In [1]:
import ast
import os

import h3
import pandas as pd
from sqlalchemy import create_engine, Column, Integer, Float, String, DateTime, MetaData, Table, Text

class ETL:
    """Summarise shared-car locations per H3 cell and store the result in SQLite.

    The pipeline reads the ``shared_car_locations`` table, aggregates its rows
    per H3 cell and month, collapses that into one row per cell and appends the
    result to ``output_table_name`` in the same database file.
    """

    # H3 resolution used to bucket coordinates into cells.
    H3_RESOLUTION = 9
    # Rows reporting fewer cars than this are dropped before aggregating.
    MIN_TOTAL_CARS = 2

    def __init__(self, db_name: str, output_table_name: str):
        """Prepare the SQLite engine.

        Args:
            db_name: File name of the SQLite database inside the ``databases``
                folder (the folder is created when missing).
            output_table_name: Name of the table that receives the summary.
        """
        self.db_name = db_name
        self.output_table_name = output_table_name

        db_folder = 'databases'
        os.makedirs(db_folder, exist_ok=True)

        # Honour the requested file name instead of a hard-coded 'data.db'.
        db_path = os.path.join(db_folder, self.db_name)

        self.engine = create_engine(f'sqlite:///{db_path}')
        self.metadata = MetaData()
        self.df = None

    def run(self):
        """Run the three stages in order: extract, transform, load."""
        self.extract()
        self.transform()
        self.load()

    def extract(self):
        """Read the source rows from the database into ``self.df``."""
        print('Inicio de extracción de datos.')
        query = '''
        SELECT 
            latitude AS lat,
            longitude AS long,
            total_cars,
            cars_list,
            timestamp
        FROM shared_car_locations;
        '''

        with self.engine.connect() as conn:
            self.df = pd.read_sql(query, conn)
            print('Datos extraidos correctamente.')

    @staticmethod
    def _parse_cars_list(value):
        """Turn one stored ``cars_list`` value into a Python list.

        Lists pass through unchanged, strings are parsed as Python literals and
        anything else becomes an empty list.
        """
        if isinstance(value, list):
            return value
        if isinstance(value, str):
            # SECURITY: this text comes from the database. The previous code ran
            # eval() on it, which executes arbitrary expressions; literal_eval
            # only accepts plain literals and raises on anything else.
            return ast.literal_eval(value)
        return []

    @staticmethod
    def _unique_cars(car_lists):
        """Flatten an iterable of lists into one list without duplicates.

        First-seen order is kept so the result is the same on every run, and
        the work is linear instead of the quadratic ``sum(lists, [])``.
        """
        return list(dict.fromkeys(car for cars in car_lists for car in cars))

    @staticmethod
    def _h3_functions():
        """Return ``(to_cell, to_latlng)`` for the installed h3 version.

        h3 v4 renamed ``geo_to_h3``/``h3_to_geo`` to
        ``latlng_to_cell``/``cell_to_latlng``; prefer the new names and fall
        back to the old ones.
        """
        to_cell = getattr(h3, 'latlng_to_cell', None) or h3.geo_to_h3
        to_latlng = getattr(h3, 'cell_to_latlng', None) or h3.h3_to_geo
        return to_cell, to_latlng

    def transform(self):
        """Aggregate ``self.df`` into one summary row per H3 cell.

        The result replaces ``self.df`` and has the columns ``h3_index``,
        ``Avg_Cars``, ``Cars`` (comma-separated ids), ``Avg_Bookings``,
        ``Num_Cars``, ``latitude`` and ``longitude``.
        """
        print('Inicio de transformación de datos.')
        # .copy() so the column assignments below act on an independent frame
        # rather than on a filtered view of the extracted data.
        df = self.df[self.df['total_cars'] >= self.MIN_TOTAL_CARS].copy()

        # Make sure every cars_list value is a real list.
        df['cars_list'] = df['cars_list'].apply(self._parse_cars_list)

        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df['month'] = df['timestamp'].dt.strftime('%Y-%m')

        to_cell, to_latlng = self._h3_functions()
        df['h3_index'] = [
            to_cell(lat, lon, self.H3_RESOLUTION)
            for lat, lon in zip(df['lat'], df['long'])
        ]

        # One row per (cell, month): total cars reported and distinct car ids.
        df_month = (
            df.groupby(['h3_index', 'month'])
            .agg(
                n_bookings=('total_cars', 'sum'),
                cars_list=('cars_list', self._unique_cars),
            )
            .reset_index()
        )
        df_month['n_bookings'] = df_month['n_bookings'].round(1)
        df_month['num_cars'] = df_month['cars_list'].apply(len)

        # Distinct months in the whole filtered dataset, not per cell.
        n_months = df_month['month'].nunique()

        # One row per cell.
        df_group = (
            df_month.groupby('h3_index')
            .agg(
                Avg_Cars=('num_cars', 'sum'),
                Cars=('cars_list', self._unique_cars),
                Avg_Bookings=('n_bookings', 'mean'),
            )
            .reset_index()
        )
        df_group['Avg_Cars'] = (df_group['Avg_Cars'] / n_months).round(1)
        df_group['Avg_Bookings'] = df_group['Avg_Bookings'].round(1)

        # Count the cars while 'Cars' is still a list; counting after the join
        # would store the length of the joined string instead.
        num_cars = df_group['Cars'].apply(len)
        df_group['Cars'] = df_group['Cars'].apply(lambda cars: ','.join(map(str, cars)))
        df_group['Num_Cars'] = num_cars

        # Split the (lat, lng) pair returned by h3 into separate columns.
        centers = df_group['h3_index'].apply(to_latlng)
        df_group['latitude'] = centers.apply(lambda center: center[0])
        df_group['longitude'] = centers.apply(lambda center: center[1])

        self.df = df_group

    def load(self):
        """Append ``self.df`` to the output table without altering its structure.

        NOTE: rows are appended (``if_exists='append'``), so running the
        pipeline more than once inserts the summary rows again.
        """
        print('Inicio de carga de datos en la base de datos.')
        self.create_table()

        # Work on a copy so formatting for storage does not rewrite self.df.
        frame = self.df.copy()

        # Store datetimes as 'YYYY-MM-DD HH:MM:SS' strings.
        for col in frame.columns:
            if pd.api.types.is_datetime64_any_dtype(frame[col]):
                frame[col] = frame[col].dt.strftime('%Y-%m-%d %H:%M:%S')

        # engine.begin() commits on success and rolls back on error.
        with self.engine.begin() as conn:
            frame.to_sql(self.output_table_name, conn, if_exists='append', index=False)
        print('Datos insertados en la base de datos.')

    @staticmethod
    def _sql_type_for(series):
        """Map a pandas column to the SQLAlchemy type used for its table column."""
        dtype_name = str(series.dtype)
        if 'int' in dtype_name:
            return Integer
        if 'float' in dtype_name:
            return Float
        if 'datetime' in dtype_name:
            return DateTime
        # astype(str) keeps this safe for dtypes that have no .str accessor.
        if series.astype(str).str.len().gt(255).any():
            return Text
        return String  # Everything else is stored as text.

    def create_table(self):
        """Create the output table, with an auto-increment ``id``, if it does not exist."""
        # Declaring the same Table twice on one MetaData raises, so only
        # declare it the first time this instance gets here.
        if self.output_table_name not in self.metadata.tables:
            columns = [
                Column('id', Integer, primary_key=True, autoincrement=True)
            ]
            # Infer the SQL column types from the pandas dtypes.
            for col in self.df.columns:
                columns.append(Column(col, self._sql_type_for(self.df[col])))

            Table(self.output_table_name, self.metadata, *columns)

        # create_all skips tables that already exist in the database.
        self.metadata.create_all(self.engine)
        print(f"Tabla '{self.output_table_name}' creada o ya existente.")

In [2]:
# Build the pipeline for the summary table and run extract -> transform -> load.
etl = ETL(
    db_name='data.db',
    output_table_name='shared_car_locations_summary',
)
etl.run()

Inicio de extracción de datos.
Datos extraidos correctamente.
Inicio de transformación de datos.
Inicio de carga de datos en la base de datos.
Tabla 'shared_car_locations_summary' creada o ya existente.
Datos insertados en la base de datos.
