In [1]:
# Para cargar los datos
import pandas as pd
import polars as pl
# Para acceder al file-system
import os
# Para acceder a la base de datos
import sqlite3

In [2]:
# Cargamos metadata
metadata = pl.read_parquet('../../data/google/production/metadata.parquet')

In [3]:
# Obtenemos una lista de diccionarios por fila
old_new_id = metadata.select(['gmap_id', 'company_index']).to_dicts()

In [4]:
# Creamos el diccionario que mapea old ids con las new id
map_old_new = {element['gmap_id']:element['company_index'] for element in old_new_id}

In [7]:
# Obtenemos los nombre de los estados
base_path = r'../../data/google/reviews'
estados = [file.split('.')[0] for file in os.listdir(base_path)][1:]

In [5]:
# Este diccionario contendra el estado y una lista de empresas en el mismo
estado_company = {}

# Este diccionario contendra el user_id y las empresas correspondientes a las que les realizo una review
user_company = {}

# Offset para crear la columna user_index
offset = 1
for estado in estados:
    # Cargamos la review en pandas
    df = pd.read_csv(f'../../data/google/reviews/{estado}.csv', low_memory=False)
    # Convertimos la columna user_id a string para asi poder usarla en polars
    df['user_id'] = df['user_id'].astype(str)
    # Pasamos de pandas a polars
    df = pl.from_dataframe(df)
    # Crea una nueva columna company_index con los nuevos id de las empresas
    df = df.with_columns(pl.col('gmap_id').map_dict(map_old_new).alias('company_index'))
    # Convierte a lowercase los nombres de los usuarios
    df = df.with_columns(pl.col('name').str.to_lowercase())
    # Elimina la columna company_id
    df = df.drop('company_id')
    # Elimina los valores nulos en company_index. Esto es, aquellas reviews con un incorrecto gmap_id.
    df = df.drop_nulls(subset='company_index')
    
    # Obtenemos los user_id unicos como lista
    old_user_id = df['user_id'].unique().to_list()
    # Creamos un diccionario que mapee el old_id con el new_id
    new_user_id = {old_id: int(index + offset) for index, old_id in enumerate(old_user_id)}
    # Creamos la nueva columna user_index
    df = df.with_columns(pl.col('user_id').map_dict(new_user_id).alias('user_index'))
    
    # Almacenamos los ids del estado en estado_company
    estado_company[estado] = df['company_index'].unique().to_list()

    # Ahora toca user_company dict
    for row in df.groupby('user_index').agg('company_index').to_dicts():
        # Obtenemos el id del user
        user = row['user_index']
        # Obtenemos la lista de empresas
        companies = row['company_index']
        if user not in companies:
            user_company[user] = companies
        else:
            user_company[user].extend(companies)

    # Printea cuando esta en multiplo de 10 (se utiliza para ver el progreso)
    if estados.index(estado) % 10 == 0:
        print(estados.index(estado))

    offset = len(old_user_id) + 1
        
    # Almacenamos la data en formato parquet
    df.write_parquet(f'../../data/google/production/reviews/{estado}.parquet')

0
10
20
30
40
50


In [9]:
# Creamos una tabla en la base de datos para almacenar user_company dict

# Realizamos la conexion
connection = sqlite3.connect('../../data/google/production/google.db')
cursor = connection.cursor()
# Creamos la tabla
cursor.execute('CREATE TABLE user_company(user_index BIGINT, company_index BIGINT);')

<sqlite3.Cursor at 0x20e7d7de0c0>

In [10]:
# Funcion que convierte las listas a string
def convert_to_string_list(lst):
    serialized_list = '[' + ', '.join(map(repr, lst)) + ']'
    return serialized_list

# Almacenamos toda la data en ids en la base de datos
for key, value in user_company.items():
    serialized_list = convert_to_string_list(value)
    cursor.execute('INSERT INTO user_company (user_index, company_index) VALUES (?, ?)', (key, serialized_list))

# Guardamos los cambios
connection.commit()

In [11]:
# Cerramos la conexion
connection.close()

In [7]:
# Crea un diccionario company - estado al que pertenece
company_estado = {id:estado for estado, ids in estado_company.items() for id in ids}

In [14]:
# Renombre la columna state y crea una nueva
metadata = metadata.rename(dict(state='state_iso')).with_columns(pl.col('company_index').map_dict(company_estado).alias('state'))

In [15]:
# Almacenamos metadata
metadata.write_parquet('../../data/google/production/metadata.parquet')