In [1]:
# We are using black as a code formatter
%load_ext nb_black

<IPython.core.display.Javascript object>

In [4]:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine

# load parquet file as dataframe
df1 = pd.read_parquet(
    "https://github.com/jorgeav527/life-expectancy/blob/main/pre_process_data/df_unpd_%26_twb.parquet?raw=true",
    engine="pyarrow",
)
df1.reset_index(inplace=True)

# load parquet file as dataframe
df2 = pd.read_parquet(
    "https://github.com/jorgeav527/life-expectancy/blob/main/data_lake/paises_del_mundo.parquet?raw=true",
    engine="pyarrow",
)
df2.reset_index(inplace=True)

<IPython.core.display.Javascript object>

In [5]:
df1.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8796 entries, 0 to 8795
Data columns (total 53 columns):
 #   Column                                             Non-Null Count  Dtype   
---  ------                                             --------------  -----   
 0   countryiso3code                                    8796 non-null   object  
 1   date                                               8796 non-null   object  
 2   porcentaje_de_bosque                               8337 non-null   float64 
 3   emisiones_co2                                      7564 non-null   float64 
 4   capacidad_estadística                              3238 non-null   float64 
 5   pib_ppa_prec_inter                                 7686 non-null   float64 
 6   pib_pc_usd_actuales                                8130 non-null   float64 
 7   pib_pc_prec_inter                                  7683 non-null   float64 
 8   INB_percapita                                      7558 non-null   float64 
 9

<IPython.core.display.Javascript object>

In [6]:
df2.rename(
    columns={
        "id": "Iso3",
    },
    inplace=True,
)
df2.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 175 entries, 0 to 174
Data columns (total 8 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   index         175 non-null    int64 
 1   Iso3          175 non-null    object
 2   iso2Code      175 non-null    object
 3   name          175 non-null    object
 4   longitude     175 non-null    object
 5   latitude      175 non-null    object
 6   region.id     175 non-null    object
 7   region.value  175 non-null    object
dtypes: int64(1), object(7)
memory usage: 11.1+ KB


<IPython.core.display.Javascript object>

In [7]:
df_copy_df1 = (
    df1[["countryiso3code", "date", "INB_percapita", "nivel_ingreso"]]
    .copy()
    .replace("", np.nan)
)

df_copy_df1[["nivel_ingreso_codes"]] = (
    df_copy_df1[["nivel_ingreso"]]
    .apply(lambda col: pd.Categorical(col).codes)
    .replace(-1, np.nan)
)
df_copy_df1[["countryiso3code_codes"]] = (
    df_copy_df1[["countryiso3code"]]
    .apply(lambda col: pd.Categorical(col).codes)
    .replace(-1, np.nan)
)
df_copy_df1.rename(
    columns={
        "countryiso3code": "Iso3",
    },
    inplace=True,
)
df_copy_df1_merge_df2 = pd.merge(df_copy_df1, df2, how="inner", on="Iso3")
df_copy_df1_merge_df2

Unnamed: 0,Iso3,date,INB_percapita,nivel_ingreso,nivel_ingreso_codes,countryiso3code_codes,index,iso2Code,name,longitude,latitude,region.id,region.value
0,AFG,2020,500.0,low_income,0.0,2.0,1,AF,Afganistán,69.1761,34.5228,SAS,Asia meridional
1,AFG,2019,520.0,low_income,0.0,2.0,1,AF,Afganistán,69.1761,34.5228,SAS,Asia meridional
2,AFG,2018,510.0,low_income,0.0,2.0,1,AF,Afganistán,69.1761,34.5228,SAS,Asia meridional
3,AFG,2017,530.0,low_income,0.0,2.0,1,AF,Afganistán,69.1761,34.5228,SAS,Asia meridional
4,AFG,2016,550.0,low_income,0.0,2.0,1,AF,Afganistán,69.1761,34.5228,SAS,Asia meridional
...,...,...,...,...,...,...,...,...,...,...,...,...,...
5420,ZMB,1994,350.0,low_income,0.0,259.0,174,ZM,Zambia,28.2937,-15.3982,SSF,África al sur del Sahara
5421,ZMB,1993,380.0,low_income,0.0,259.0,174,ZM,Zambia,28.2937,-15.3982,SSF,África al sur del Sahara
5422,ZMB,1992,360.0,low_income,0.0,259.0,174,ZM,Zambia,28.2937,-15.3982,SSF,África al sur del Sahara
5423,ZMB,1991,390.0,low_income,0.0,259.0,174,ZM,Zambia,28.2937,-15.3982,SSF,África al sur del Sahara


<IPython.core.display.Javascript object>

### Dataframe ingresos

In [8]:
df_ingresos = df_copy_df1_merge_df2[
    ["countryiso3code_codes", "date", "INB_percapita", "nivel_ingreso_codes"]
].copy()
df_ingresos.rename(
    columns={
        "countryiso3code_codes": "pais_id",
        "date": "año",
        "nivel_ingreso_codes": "nivel_id",
    },
    inplace=True,
)
df_ingresos

Unnamed: 0,pais_id,año,INB_percapita,nivel_id
0,2.0,2020,500.0,0.0
1,2.0,2019,520.0,0.0
2,2.0,2018,510.0,0.0
3,2.0,2017,530.0,0.0
4,2.0,2016,550.0,0.0
...,...,...,...,...
5420,259.0,1994,350.0,0.0
5421,259.0,1993,380.0,0.0
5422,259.0,1992,360.0,0.0
5423,259.0,1991,390.0,0.0


<IPython.core.display.Javascript object>

### DataFrame países

In [10]:
df_paises = df_copy_df1[["countryiso3code_codes", "Iso3"]].copy()
df_paises.drop_duplicates(inplace=True, ignore_index=True)
df_paises.rename(
    columns={
        "countryiso3code_codes": "id",
    },
    inplace=True,
)
df_paises_merge_df2 = pd.merge(df_paises, df2, how="left", on="Iso3")
df_paises_merge_df2.dropna(inplace=True)
df_paises_merge_df2
df_paises_agregados = df_paises_merge_df2[
    ["id", "iso2Code", "Iso3", "name", "longitude", "latitude", "region.value"]
].copy()
df_paises_agregados.rename(
    columns={
        "name": "nombre",
        "Longitude": "longitud",
        "Latitude": "latitud",
        "region.value": "región",
    },
    inplace=True,
)
# df_paises
df_paises_agregados

Unnamed: 0,id,iso2Code,Iso3,nombre,longitude,latitude,región
45,2.0,AF,AFG,Afganistán,69.1761,34.5228,Asia meridional
46,5.0,AL,ALB,Albania,19.8172,41.3317,Europa y Asia central
47,60.0,DZ,DZA,Argelia,3.05097,36.7397,Oriente Medio y Norte de África
48,11.0,AS,ASM,Samoa Americana,-170.691,-14.2846,Asia oriental y el Pacífico
49,6.0,AD,AND,Andorra,1.5218,42.5075,Europa y Asia central
...,...,...,...,...,...,...,...
251,246.0,US,USA,Estados Unidos,-95.712891,37.0902,América del Norte
253,247.0,UZ,UZB,Uzbekistán,69.269,41.3052,Europa y Asia central
255,249.0,VE,VEN,Venezuela,-69.8371,9.08165,América Latina y el Caribe
259,257.0,YE,YEM,"Yemen, Rep. del",44.2075,15.352,Oriente Medio y Norte de África


<IPython.core.display.Javascript object>

### Dataframe niveles

In [11]:
df_niveles = df_copy_df1[["nivel_ingreso_codes", "nivel_ingreso"]].copy()
df_niveles.dropna(inplace=True)
df_niveles.drop_duplicates(inplace=True, ignore_index=True)
df_niveles.rename(
    columns={
        "nivel_ingreso_codes": "id",
        "nivel_ingreso": "cuartil",
    },
    inplace=True,
)
df_niveles["cuartil"] = df_niveles["cuartil"].replace(
    ["lower_middle_income", "low_income", "upper_middle_income", "high_income"],
    [
        "ingresos_medios_bajos",
        "ingresos_bajos",
        "ingresos_medios_altos",
        "ingresos_altos",
    ],
)
df_niveles

Unnamed: 0,id,cuartil
0,1.0,ingresos_medios_bajos
1,0.0,ingresos_bajos
2,2.0,ingresos_medios_altos
3,3.0,ingresos_altos


<IPython.core.display.Javascript object>

### indices

In [12]:
df_copy_df2 = (
    df1[df1.columns.difference(["nivel_ingreso", "INB_percapita"])]
    .copy()
    .replace("", np.nan)
)
df_copy_df2[["countryiso3code_codes"]] = (
    df_copy_df2[["countryiso3code"]]
    .apply(lambda col: pd.Categorical(col).codes)
    .replace(-1, np.nan)
)
df_copy_df2.rename(
    columns={
        "countryiso3code": "Iso3",
    },
    inplace=True,
)
df_copy_df2_merge_df2 = pd.merge(
    df_copy_df2, df_paises_agregados, how="inner", on="Iso3"
)
df_indices = (
    df_copy_df2_merge_df2[
        df_copy_df2_merge_df2.columns.difference(
            [
                "Iso3",
                "nombre",
                "longitude",
                "latitude",
                "región",
                "countryiso3code_codes",
            ]
        )
    ]
    .copy()
    .reset_index()
)
df_indices.rename(
    columns={
        "index": "id",
        "id": "pais_id",
    },
    inplace=True,
)
df_indices.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5425 entries, 0 to 5424
Data columns (total 53 columns):
 #   Column                                             Non-Null Count  Dtype  
---  ------                                             --------------  -----  
 0   id                                                 5425 non-null   int64  
 1   acceso_agua_potable(%)                             3591 non-null   float64
 2   acceso_servicios_sanitarios(%)                     3554 non-null   float64
 3   camas_hospitales_c/1000personas                    2466 non-null   float64
 4   cambio_de_la_población                             5301 non-null   float64
 5   cambio_natural_población                           5301 non-null   float64
 6   capacidad_estadística                              2177 non-null   float64
 7   date                                               5425 non-null   object 
 8   densidad_población_por_kilómetro_cuadrado)         5301 non-null   float64
 9   duración

<IPython.core.display.Javascript object>

### Creación de la base de datos

In [13]:
# create postgresql engine
DATABASE_URL = "postgresql://dnsajfigxozxvj:6b41d1c0158a5c6844e040d68b7ec94b2534e1462259681f9807589d65d320d4@ec2-3-213-66-35.compute-1.amazonaws.com:5432/dcgv9p01litrvi"  # Remote Heroku
# DATABASE_URL = "postgresql://lupvjmdf:cyxuY_-w0bSTg36HynGOWCqY0Ylyg7w6@babar.db.elephantsql.com/lupvjmdf"  # Remote Elephant
# DATABASE_URL = "postgresql://postgres:jorgimetro527@localhost/fastapi" # Local

engine = create_engine(DATABASE_URL)

<IPython.core.display.Javascript object>

In [14]:
# table ingreso (hecho)
df_ingresos.to_sql(
    "ingreso", con=engine, index=True, if_exists="replace", index_label="id"
)

425

<IPython.core.display.Javascript object>

In [15]:
# tabla nivel (dimensión)
df_niveles.to_sql(
    "nivel", con=engine, index=False, if_exists="replace", index_label="id"
)

4

<IPython.core.display.Javascript object>

In [16]:
# tabla pais (dimension)
df_paises_agregados.to_sql(
    "pais", con=engine, index=False, if_exists="replace", index_label="id"
)

175

<IPython.core.display.Javascript object>

In [17]:
# table ingreso (hecho)
df_indices.to_sql(
    "indice", con=engine, index=False, if_exists="replace", index_label="id"
)

425

<IPython.core.display.Javascript object>

Create parquet files

In [18]:
df_ingresos.to_parquet(f"../data_ware_house/ingreso.parquet")
df_niveles.to_parquet(f"../data_ware_house/nivel.parquet")
df_paises_agregados.to_parquet(f"../data_ware_house/pais.parquet")
df_indices.to_parquet(f"../data_ware_house/indice.parquet")

OSError: Cannot save file into a non-existent directory: '../data_ware_house'

<IPython.core.display.Javascript object>

## muchas gracias por la introduccion ravi y buenos días con todos, acontinuación mostraré de forma rápida pero efectiva la arquitectura del proyecto, como también diferentes soluciones al problema de la pipeline sugerida.

## Como sabemos estamos extrayendo los datos mediante las API's del banco mundial, las naciones unidas, y la organización mundial de la salud, pero estamos preparados para merger otras fuentes de data.

## La primera dificultad que nos vino a la mente y experimentamos, fue el fallo de conexión al sevidor y algunas alteraciones en la data que no estaba propiamente documentada en la API. Es por eso que tomamos la decisión de usar git lfs como un tipo datalake para albergar archivos en formato parquet derivados tanto de la extracción como del preprocesamiento de datos. Asu vez y tambien solo para recalcar estamos usando github flow para el seguimiento del proyecto.

## Estos datos preprocesados podian ser consumibles directamente de manera remota por  powerBI, pero decidimos implementar un datawarehouse empleando dos aproximaciones, la primera fue la creación de una api integrada con una base de datos postgres dockerizada y enviada a produccion, de manera local demoraba mucho en los requests y por otro lado no hace mucho sentido extraer datos de una API para generar otra.

## La segunda solo la base de datos albergada en elephantsql, ventajas 20 MB, pero al tomar cada tabla como una conexión no se puede completar la descarga a powerBI. Herokupostgres, por otro lado, completa la descarga, pero al tener una limitación de solo 10000 filas compiladas también es una decisión que debe evaluarse.

## Estas bases de datos tienen como fuente AWS

## Por otro lado, tenemos a Airflow una plataforma de planificación de flujo de trabajos utilizada en ingenieria de datos; una solución que está en proceso de ejecución, ya que se encarga de la descarga de datos, el preprocesamiento, el monitoreo y la generación de un reporte. Fácil de integrar con hive, haddop, kubernetes, AWS o google clould, resolvería problemas como fallos en la extracción, ejecución de tareas dependientes, escalabilidad, la entrega continua automatiza y la integración continua de los datos.

## Este fue el resultado de la conexión exitosa de herokupostgres y el schema de la base de datos relacional cargada en powerBI.