## Data engineering

Build an entire pipe-line combining 2 different sources and output to Power BI service. For this, you will choose 2 tables. Table A from INE and table B from UE. You will later combine in the common field, preferably, `date`.

In [1]:
%pip install eurostat
import eurostat
import requests
import pandas as pd
import datetime
import json


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.2[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


__1. Extract data from table A directly from INE using python__

You can use the notebook "State_Stats"

In [2]:
#Datos de la evolución de los indices de produccion industrial en España
url = 'https://servicios.ine.es/wstempus/js/ES/DATOS_TABLA/50902'

r = requests.get(url)
data = r.json()
#prettify
df=pd.DataFrame(data)
data=pd.DataFrame(df.iloc[0,-1])
data['date']=data[['Anyo','FK_Periodo']].apply(lambda x: f'{x.Anyo}-{x.FK_Periodo}',axis=1)
data=data.drop(['Fecha','FK_TipoDato','FK_Periodo','Anyo','Secreto'],axis=1)
data.rename(columns={'Valor':'ESP'},inplace=True)
data.head()

Unnamed: 0,ESP,date
0,113.308,2023-12
1,113.28,2023-11
2,113.676,2023-10
3,113.348,2023-9
4,113.149,2023-8


__2. Extract data from table B directly from UE using python__

In [3]:
code='prc_ipc_g20'
data_euro = eurostat.get_data_df(code)
date_values=data_euro.columns[3:]
print('\nEstado inicial de los datos: \n')
print('-'*20)
display(data_euro.head())
data_euro=data_euro.transpose().iloc[-1:1:-1]
data_euro.columns=data_euro.iloc[-1,:]
data_euro=data_euro.iloc[:-1,0:20].copy()
data_euro.reset_index(inplace=True)
data_euro.rename(columns={'index':'date'},inplace=True)
print('\nEstado FINAL de los datos: \n')
print('-'*20)
data_euro.head()


Estado inicial de los datos: 

--------------------


Unnamed: 0,freq,unit,geo\TIME_PERIOD,1994-01,1994-02,1994-03,1994-04,1994-05,1994-06,1994-07,...,2023-03,2023-04,2023-05,2023-06,2023-07,2023-08,2023-09,2023-10,2023-11,2023-12
0,M,I15,AR,38.4,38.4,38.5,38.6,38.7,38.9,39.2,...,,,,,,,,,,
1,M,I15,BR,3.3,4.6,6.6,9.4,13.5,19.9,21.3,...,153.4,154.3,154.6,154.5,154.7,155.1,155.5,,,
2,M,I15,CA,68.0,67.5,67.5,67.5,67.3,67.5,67.7,...,122.7,123.6,124.0,124.2,124.9,125.4,125.2,,,
3,M,I15,CN,50.1,51.2,51.8,53.0,53.6,54.1,54.4,...,115.1,114.9,114.8,114.6,114.8,115.1,115.3,,,
4,M,I15,DE,74.2,74.7,74.8,74.8,74.9,75.2,75.5,...,125.1,125.8,125.6,126.1,126.7,127.2,127.4,127.2,126.3,126.6



Estado FINAL de los datos: 

--------------------


geo\TIME_PERIOD,date,AR,BR,CA,CN,DE,EU27_2020,FR,G20,ID,...,IT,JP,KR,MX,RU,SA,TR,UK,US,ZA
0,2023-12,,,,,126.6,127.4,121.5,,,...,121.7,,,,,,,,,
1,2023-11,,,,,126.3,127.2,121.3,,,...,121.5,,,,,,,,,
2,2023-10,,,,,127.2,127.7,121.6,,128.4,...,122.2,,119.5,149.0,,,,,,
3,2023-09,,155.5,125.2,115.3,127.4,127.6,121.4,140.2,128.2,...,122.1,,119.1,148.4,,113.8,648.5,132.0,129.9,151.5
4,2023-08,,155.1,125.4,115.1,127.2,127.2,122.1,139.5,127.9,...,120.1,,118.4,147.8,,113.8,619.1,131.3,129.5,150.4


__3. Join both tables using the common field__

You can use `merge` or `concat`

In [4]:
data_merged=pd.merge(data, data_euro, on='date', how='inner')
data_merged.head()

Unnamed: 0,ESP,date,AR,BR,CA,CN,DE,EU27_2020,FR,G20,...,IT,JP,KR,MX,RU,SA,TR,UK,US,ZA
0,113.308,2023-12,,,,,126.6,127.4,121.5,,...,121.7,,,,,,,,,
1,113.28,2023-11,,,,,126.3,127.2,121.3,,...,121.5,,,,,,,,,
2,113.676,2023-10,,,,,127.2,127.7,121.6,,...,122.2,,119.5,149.0,,,,,,
3,109.899,2022-12,,150.2,121.0,115.1,122.0,123.2,116.7,133.6,...,121.1,,115.2,144.3,,112.5,432.5,127.2,125.2,145.2
4,109.734,2022-11,,149.3,121.7,115.2,123.5,123.4,116.8,133.5,...,120.8,,115.0,143.7,,112.3,427.4,126.7,125.6,144.6


__4. Create a calculated field named "pred_place_holder"__

You can create a dummy column of `ones`. This step is to ilustrate "the magic goes here"

In [10]:
data_merged['pred_place_holder']=1
data_merged.tail()

Unnamed: 0,ESP,date,AR,BR,CA,CN,DE,EU27_2020,FR,G20,...,JP,KR,MX,RU,SA,TR,UK,US,ZA,pred_place_holder
61,74.167,2003-11,57.3,51.5,81.5,72.4,82.2,80.1,83.6,68.7,...,96.9,74.2,63.0,33.4,65.7,39.7,75.9,77.8,55.1,1
62,73.926,2003-10,57.1,51.3,81.2,71.7,82.4,80.0,83.5,68.6,...,97.4,74.4,62.4,33.1,65.7,39.1,76.0,78.1,55.7,1
63,72.409,2002-12,55.4,47.3,79.9,70.5,82.0,78.6,81.7,66.9,...,97.4,72.1,60.8,30.2,65.4,34.1,75.2,76.3,56.0,1
64,72.169,2002-11,55.3,46.4,80.2,70.3,81.1,78.2,81.6,66.8,...,97.4,71.8,60.6,29.7,65.4,33.5,74.9,76.5,55.9,1
65,72.058,2002-10,55.0,45.0,80.0,70.4,81.4,78.3,81.6,66.6,...,97.4,71.7,60.1,29.2,65.3,32.5,74.9,76.5,55.2,1


__5. Push the resulting df to Power BI service__

You can use the notebook "Idealista"

In [6]:
#Convertimos el dataframe al formato de diccionario requerido por el data streaming
dict_list = []
valores=['ESP','FR','EU27_2020','DE']
# Iteramos sobre cada fila del DataFrame
for index, row in data_merged.iterrows():
    item_dict={}
    item_dict['Fecha']=str(datetime.datetime(day=1,month=int(row['date'].split('-')[1]),year=int(row['date'].split('-')[0])))
    for val in valores:
        # Creamos un diccionario con los valores de cada fila
        item_dict[val]=row[val]
        # Agregamos este diccionario a la lista
    dict_list.append(item_dict)

dict_list

[{'Fecha': '2023-12-01 00:00:00',
  'ESP': 113.308,
  'FR': 121.5,
  'EU27_2020': 127.4,
  'DE': 126.6},
 {'Fecha': '2023-11-01 00:00:00',
  'ESP': 113.28,
  'FR': 121.3,
  'EU27_2020': 127.2,
  'DE': 126.3},
 {'Fecha': '2023-10-01 00:00:00',
  'ESP': 113.676,
  'FR': 121.6,
  'EU27_2020': 127.7,
  'DE': 127.2},
 {'Fecha': '2022-12-01 00:00:00',
  'ESP': 109.899,
  'FR': 116.7,
  'EU27_2020': 123.2,
  'DE': 122.0},
 {'Fecha': '2022-11-01 00:00:00',
  'ESP': 109.734,
  'FR': 116.8,
  'EU27_2020': 123.4,
  'DE': 123.5},
 {'Fecha': '2022-10-01 00:00:00',
  'ESP': 109.866,
  'FR': 116.3,
  'EU27_2020': 123.3,
  'DE': 123.5},
 {'Fecha': '2021-12-01 00:00:00',
  'ESP': 103.965,
  'FR': 109.3,
  'EU27_2020': 111.6,
  'DE': 111.3},
 {'Fecha': '2021-11-01 00:00:00',
  'ESP': 102.738,
  'FR': 109.1,
  'EU27_2020': 111.1,
  'DE': 111.0},
 {'Fecha': '2021-10-01 00:00:00',
  'ESP': 102.425,
  'FR': 108.6,
  'EU27_2020': 110.5,
  'DE': 110.7},
 {'Fecha': '2020-12-01 00:00:00',
  'ESP': 97.574,
  'FR

In [7]:
url='https://api.powerbi.com/beta/78954451-ce2d-4c90-ae61-68906e409956/datasets/ad1618cf-0dcd-4a6c-806b-3fe5fa339f57/rows?experience=power-bi&key=L1FZkaTWxUp0Jj7lJdZoXkqsWt6Q2XXK0DFZ1qUSlfull0WWmeMG%2BKnohUkPrJw6u4XzPosn5fjAJxsO1jb%2F%2BQ%3D%3D'
headers = {'Content-Type': 'application/json'}

In [8]:
headers = {'Content-Type': 'application/json'}
response = requests.request(
    method="POST",
    url=url,
    headers=headers,
    data=json.dumps(dict_list)
)

__6. Paste the code you would use to automate the process using your local scheduler__

You can use the notebook "Local_deployment". You need 2 files: .py, and .bat.

In [9]:
#OPCION 1
## CONTENIDO DEL ARCHIVO.SH (EN MAC SE PUEDE DEFINIR LA EJECUCION CON AUTOMATOR)
/Users/mariolamas/anaconda3/bin/python3 /Users/mariolamas/Desktop/TecnicasDeRecogidaDeDatos/Entregable_2/entregable_2.py

#OPCION 2
##DIRECTAMENTE CON CRONTAB
### CONTENIDO DE crontrab -e
''' 
De esta forma me ejecuta todos los meses a las 9 el archivo y en caso de que haya un error me lo guarda en un archivo llamado entregable.log
'''
0 9 1 * * /Users/mariolamas/anaconda3/bin/python3 /Users/mariolamas/Desktop/TecnicasDeRecogidaDeDatos/Entregable_2/entregable_2.py >> entregable.log 2>&1

SyntaxError: invalid syntax (3650410424.py, line 3)

In [None]:
#CONTENIDO DEL ARCHIVO .PY (ADAPTACION DEL IPYNB PARA EL FLUJO AUTOMATIZADO)
#!/usr/bin/env python3
#IMPORTACION DE PAQUETES
import requests
import pandas as pd
import datetime
import numpy as np
import json
import eurostat

#CONSTANTES
URL_INE='https://servicios.ine.es/wstempus/js/ES/DATOS_TABLA/26061'
URL_POWER_BI='https://api.powerbi.com/beta/78954451-ce2d-4c90-ae61-68906e409956/datasets/1f36bc2c-4539-4f74-a5e7-ff1d7898ee51/rows?experience=power-bi&key=WyBo5NOrT%2FzlgJCPXglBxozLxm%2FMwLNKOMqY%2BLNxEuyIMGxtYysHpnIhTkEi%2FDW42TEVzd772kk68Bl6xQ2NmA%3D%3D'
URL_DATA='/Users/mariolamas/Desktop'


class Scrap():
    def __init__(self,url_ine,url_pw_bi,url_data):
        """
        INIT
        -----
        Se encarga de definir algunas constantes básicas que va a requerir la clase

        Args:
            url_ine (string): Url a la tabla del Ine
            url_pw_bi (string): Url del flujo de datos de POWERBI
            url_data (string): Url del equipo donde ubicar el archivo con los datos mergeados
        """
        self.url_ine=url_ine
        self.url_power_bi=url_pw_bi
        self.url_data=url_data
        self.code='prc_ipc_g20'
        self.headers={'Content-Type': 'application/json'}
        self.paises=['ESP','FR','EU27_2020','DE']
    
    def datos_ine_preprocess(self):
        """
        Data Ine Processed
        ------------------
        Esta funcion se encarga de recolectar los datos del INE y procesarlos

        Returns:
            pd.DataFrame: Datos del INE procesados.
        """
        r = requests.get(self.url_ine)
        data = r.json()
        #prettify
        df=pd.DataFrame(data)
        data=pd.DataFrame(df.iloc[0,-1])
        data['date']=data[['Anyo','FK_Periodo']].apply(lambda x: f'{x.Anyo}-{x.FK_Periodo}',axis=1)
        data=data.drop(['Fecha','FK_TipoDato','FK_Periodo','Anyo','Secreto'],axis=1)
        data.rename(columns={'Valor':'ESP'},inplace=True)
        return data

    def data_eur_processed(self):
        """
        Data Eurostat Processed
        -----------------------
        Esta funcion se encarga de recolectar los datos de eurostat y procesarlos

        Returns:
            pd.DataFrame: Datos de EUROSTAT procesados.
        """
        data_euro = eurostat.get_data_df(self.code)
        data_euro=data_euro.transpose().iloc[-1:1:-1]
        data_euro.columns=data_euro.iloc[-1,:]
        data_euro=data_euro.iloc[:-1,0:20].copy()
        data_euro.reset_index(inplace=True)
        data_euro.rename(columns={'index':'date'},inplace=True)
        return data_euro

    def data_merge(self,data_ine,data_eur):
        """
        Data Merge
        -----------
        Funcion encargada de mergear los datos de los Dataframes preprocesados del INE y EUROSTAT

        Args:
            data_ine (pd.DataFrame): Datos preprocesados del INE
            data_eur (pd.DataFrame): Datos preprocesados de EUROSTAT

        Returns:
            pd.DataFrame: Datos unidos del INE con EUROSTAT
        """
        data_merged=pd.merge(data_ine, data_eur, on='date', how='inner')
        data_merged['pred_place_holder']=1
        return data_merged
    
    def push_pw_bi(self,data_merged):
        """
        Push to POWERBI
        -----------------
        Este metodo se encarga de enviar los datos procesados a un streaming de datos de powerbi a traves de la url proporcionada

        Args:
            data_merged (pd.DataFrame): DataFrame con los datos limpios y procesados del INE y EUROSTAT
        """
        #Convertimos el dataframe al formato de diccionario requerido por el data streaming
        dict_list = []
        # Iteramos sobre cada fila del DataFrame
        for _, row in data_merged.iterrows():
            item_dict={}
            item_dict['Fecha']=str(datetime.datetime(day=1,month=int(row['date'].split('-')[1]),year=int(row['date'].split('-')[0])))
            for pais in self.paises:
                # Creamos un diccionario con los valores de cada fila
                item_dict[pais]=row[pais]
                # Agregamos este diccionario a la lista
            dict_list.append(item_dict)
        
        #HACEMOS EL POST REQUEST
        response = requests.request(
            method="POST",
            url=self.url_power_bi,
            headers=self.headers,
            data=json.dumps(dict_list))
    

    def guardar_info(self,data):
        """
        Guardar info
        -------------

        Esta funcion de encarga de guardar la informacion de los datos procesados en un archivo datos_scrap.json
        Args:
            data (pd.DataFrame): Datos mergeados y procesados del INE y EUROSTAT
        """
        with open(f'{self.url_data}/data_scrap.json','w') as archivo:
            json.dump(data.to_dict(),archivo)
        archivo.close()
    
    def run(self):
        """
        Run
        ----
        Este método se encarga de definir el flujo de ejecucion de las diferentes funcionalidades del scraper de datos del INE y EUROSTAT
        """
        data_ine=self.datos_ine_preprocess()
        data_eur=self.data_eur_processed()
        data_merged=self.data_merge(data_ine=data_ine,
                                    data_eur=data_eur)
        try:   
            self.push_pw_bi(self.url_power_bi,data_merged=data_merged)
        except:
            print('La URL de POWERBI introducida no es correcta')
        self.guardar_info(data_merged)


if __name__=='__main__':
    #Creamos una instancia de la clase y llamamos al método run
    scraper=Scrap(URL_INE,URL_POWER_BI,URL_DATA)
    scraper.run()
