## Carga de datasets a PostgreSQL
Objetivo: disponibilidad de datos via web

In [1]:
from glob import glob
import json
import pandas as pd
from sqlalchemy import create_engine

In [2]:
def cargar_df_a_postgres(df: pd.DataFrame, nombre_tabla: str, uri_db: str, if_exists='replace') -> None:
    """Carga un archivo parquet a una nueva tabla en PostgreSQL,
    si la tabla no exite. Si la tabla existe, el parquet se anexa.
    """
    # Crear un motor de base de datos
    engine = create_engine(uri_db)
    # Anexar el dataframe a la tabla SQL
    df.to_sql(nombre_tabla, engine, if_exists=if_exists, index=False)

    # Cerrar la conexión
    engine.dispose()

In [3]:
def transforms_de_compatibilidad(df: pd.DataFrame, nom_archivo: str) -> pd.DataFrame:
    """transforms_de_compatibilidad
    """
    if nom_archivo == 'y_business':
        df['attributes'] = df['attributes'].apply(json.dumps)
        df['categories'] = df['categories'].apply(lambda x: str(x).split(',')).apply(json.dumps)
        df['hours'] = df['hours'].apply(json.dumps)

    elif nom_archivo == 'y_checkin':
        df.set_index('business_id', inplace=True)
        df['date'] = df['date'].str.split(',')
        df = df.explode('date')
        df['date'] = pd.to_datetime(df['date'], format='mixed')
        df.reset_index(inplace=True)

    elif nom_archivo == 'y_review':
        pass

    elif nom_archivo == 'y_user':
        df['elite'] = df['elite'].apply(lambda x: str(x).split(',')).apply(lambda x: json.dumps(x) if x[0] else None)
        df['friends'] = df['friends'].apply(lambda x: str(x).split(',')).apply(lambda x: json.dumps(x) if x[0] != "None" else None)
        df.reset_index(drop=True, inplace=True)

    elif nom_archivo == 'y_tip':
        pass

    elif nom_archivo == 'g_sitios':
        df['category'] = df['category'].apply(lambda x: json.dumps(x.tolist()) if x is not None else None)
        df['hours'] =  df['hours'].apply((lambda x: json.dumps(dict(x.tolist())) if x is not None else None))
        df['relative_results'] = df['relative_results'].apply(lambda x: json.dumps(x.tolist()) if x is not None else None)
        df.drop('MISC', axis=1, inplace=True)
        df.drop('url', axis=1, inplace=True)

    elif nom_archivo == 'g_review':
        df['time'] = pd.to_datetime(df['time'], unit='ms')
        df['rating'] = df['rating'].astype('int16')
        df['text'] = df['text'].apply(lambda x: x.replace('\x00', ' ') if isinstance(x, str) else x)
        df.drop('pics', axis=1, inplace=True)
        df.drop('resp', axis=1, inplace=True)
        
    return df

### Credenciales base de datos

In [8]:
# Credenciales PostgreSQL
db_user = 'henry'
db_passwd = 'Onetwo12!'
uri_base_datos = f'postgresql://{db_user}:{db_passwd}@localhost:5432/yelp_goog'

### Parquet únicos

In [None]:
# Crear lista de parquets en /data/raw/
parquets_unicos = glob(f'../data/raw/*.parquet')
parquets_unicos

In [None]:
for p in parquets_unicos:
    nom_tabla = p.split('/')[-1].split('.')[0].replace('-','_')
    print(f'Procesando {nom_tabla}')
    df = pd.read_parquet(p)
    cargar_df_a_postgres(transforms_de_compatibilidad(df, nom_tabla), nom_tabla, uri_base_datos)
    print(f'{nom_tabla=} cargado a PostgreSQL')

### Parquet fragmentados

In [5]:
# Crear lista de parquets en /data/raw/*/
parquets_frag = glob(f'../data/raw/*/*.parquet')
parquets_frag.sort()
parquets_frag

['../data/raw/g-review/g-review_California.parquet',
 '../data/raw/g-review/g-review_District_of_Columbia.parquet',
 '../data/raw/g-review/g-review_Florida.parquet',
 '../data/raw/g-review/g-review_Georgia.parquet',
 '../data/raw/g-review/g-review_Illinois.parquet',
 '../data/raw/g-review/g-review_New_Jersey.parquet',
 '../data/raw/g-review/g-review_New_York.parquet',
 '../data/raw/g-review/g-review_Pennsylvania.parquet',
 '../data/raw/g-review/g-review_Texas.parquet',
 '../data/raw/g-review/g-review_Virginia.parquet',
 '../data/raw/y-review/y-review_01.parquet',
 '../data/raw/y-review/y-review_02.parquet',
 '../data/raw/y-review/y-review_03.parquet',
 '../data/raw/y-user/y-user_01.parquet',
 '../data/raw/y-user/y-user_02.parquet',
 '../data/raw/y-user/y-user_03.parquet',
 '../data/raw/y-user/y-user_04.parquet']

In [7]:
ultimo_nom_tabla = ''
for p in parquets_frag:
    nom_tabla = p.split('/')[-2].replace('-','_')
    tipo_transaccion = 'append' if ultimo_nom_tabla == nom_tabla else 'replace'
    print(nom_tabla, tipo_transaccion)
    
    df = pd.read_parquet(p)
    cargar_df_a_postgres(transforms_de_compatibilidad(df, nom_tabla), nom_tabla, uri_base_datos, if_exists=tipo_transaccion)
    print(f'{p}\ncargado a {nom_tabla=} en PostgreSQL', end='\n')

    ultimo_nom_tabla = nom_tabla

g_review replace
../data/raw/g-review/g-review_California.parquet
cargado a nom_tabla='g_review' en PostgreSQL
g_review append
../data/raw/g-review/g-review_District_of_Columbia.parquet
cargado a nom_tabla='g_review' en PostgreSQL
g_review append
../data/raw/g-review/g-review_Florida.parquet
cargado a nom_tabla='g_review' en PostgreSQL
g_review append
../data/raw/g-review/g-review_Georgia.parquet
cargado a nom_tabla='g_review' en PostgreSQL
g_review append
../data/raw/g-review/g-review_Illinois.parquet
cargado a nom_tabla='g_review' en PostgreSQL
g_review append
../data/raw/g-review/g-review_New_Jersey.parquet
cargado a nom_tabla='g_review' en PostgreSQL
g_review append
../data/raw/g-review/g-review_New_York.parquet
cargado a nom_tabla='g_review' en PostgreSQL
g_review append
../data/raw/g-review/g-review_Pennsylvania.parquet
cargado a nom_tabla='g_review' en PostgreSQL
g_review append
../data/raw/g-review/g-review_Texas.parquet
cargado a nom_tabla='g_review' en PostgreSQL
g_review app