In [61]:
import pyodbc
import pandas as pd
import numpy as np
import os
import warnings
warnings.filterwarnings("ignore", category=UserWarning)

In [None]:
# Azure SQL source database: the read side of the ETL.
AZURE_SERVER = 'uaxmathfis.database.windows.net'
AZURE_DATABASE = 'usecases'
AZURE_DRIVER = '{ODBC Driver 17 for SQL Server}'

# ActiveDirectoryInteractive authentication: no password is stored in the notebook.
azure_conn_str = ";".join([
    f"DRIVER={AZURE_DRIVER}",
    f"SERVER={AZURE_SERVER}",
    f"DATABASE={AZURE_DATABASE}",
    "Authentication=ActiveDirectoryInteractive",
])

In [63]:
# Local SQL Server target database: the write side of the ETL.
LOCAL_SERVER = 'localhost'
LOCAL_DATABASE = 'dwh_case1'
LOCAL_DRIVER = '{ODBC Driver 17 for SQL Server}'

# Trusted_Connection=yes uses the current Windows login; the server
# certificate is accepted via TrustServerCertificate=yes.
local_conn_str = ";".join(
    f"{key}={value}"
    for key, value in {
        "DRIVER": LOCAL_DRIVER,
        "SERVER": LOCAL_SERVER,
        "DATABASE": LOCAL_DATABASE,
        "Trusted_Connection": "yes",
        "TrustServerCertificate": "yes",
    }.items()
)

In [64]:
# Folder containing one extraction .sql file per warehouse table
# (relative paths resolve against the notebook's working directory).
query_folder = "../Database/Dimensional"

# Target table -> query file. Order matters: the ETL loop creates tables in
# this order, and Facts_Table (last) declares foreign keys to the dimensions,
# so every dimension must already exist when the fact table is created.
queries = dict([
    ("Dim_geo", "Dim_Geo.sql"),
    ("Dim_product", "Dim_Product.sql"),
    ("Dim_time", "Dim_Time.sql"),
    ("Dim_client", "Dim_Cli.sql"),
    ("Facts_Table", "Dim_Fact.sql"),
])

In [65]:
# Primary-key column list per table; read by generate_create_table_sql.
primary_keys = dict(
    Facts_Table=["CODE"],
    Dim_client=["Customer_ID"],
    Dim_geo=["TIENDA_ID"],
    Dim_product=["Id_Producto"],
    Dim_time=["Date"],
)

In [66]:
# Foreign keys per table: local column -> "ReferencedTable(ReferencedColumn)".
# Only the fact table references other tables; read by generate_create_table_sql.
foreign_keys = {
    "Facts_Table": dict(
        Customer_ID="Dim_client(Customer_ID)",
        TIENDA_ID="Dim_geo(TIENDA_ID)",
        Id_Producto="Dim_product(Id_Producto)",
        Sales_Date="Dim_time(Date)",
    ),
}

In [67]:
import numpy as np

def _sql_type_for(dtype):
    """Map a pandas/numpy dtype to a SQL Server column type."""
    if pd.api.types.is_bool_dtype(dtype):
        return "BIT"
    if pd.api.types.is_datetime64_any_dtype(dtype):
        return "DATE"
    if pd.api.types.is_integer_dtype(dtype):
        # SQL Server INT is signed 32-bit: wider ints and uint32 need BIGINT.
        fits_int = dtype.itemsize < 4 or (dtype.itemsize == 4 and dtype.kind == "i")
        return "INT" if fits_int else "BIGINT"
    if pd.api.types.is_float_dtype(dtype):
        return "FLOAT"
    return "NVARCHAR(255)"


def generate_create_table_sql(table_name, df, pk_map=None, fk_map=None):
    """Build a SQL Server ``CREATE TABLE`` statement for ``df``.

    Column types follow the DataFrame dtypes: bool -> BIT, datetime -> DATE,
    integers -> INT (BIGINT when the dtype may exceed 32-bit signed range),
    floats -> FLOAT, everything else -> NVARCHAR(255).

    Args:
        table_name: Name of the table to create; also the lookup key into
            ``pk_map`` and ``fk_map``.
        df: DataFrame whose columns, in order, become the table columns.
        pk_map: Mapping ``table -> list of primary-key columns``. Defaults to
            the notebook-level ``primary_keys`` dict.
        fk_map: Mapping ``table -> {column: "RefTable(RefColumn)"}``. Defaults
            to the notebook-level ``foreign_keys`` dict.

    Returns:
        The complete ``CREATE TABLE`` statement as a string.
    """
    if pk_map is None:
        pk_map = primary_keys
    if fk_map is None:
        fk_map = foreign_keys

    # df.dtypes pairs each column name with its dtype, in column order.
    column_definitions = [
        f"[{column}] {_sql_type_for(dtype)}" for column, dtype in df.dtypes.items()
    ]

    # Primary key, bracketed like the column definitions. An empty list is
    # skipped because "PRIMARY KEY ()" is invalid SQL.
    pk_columns = pk_map.get(table_name) or []
    primary_key = (
        f", PRIMARY KEY ({', '.join(f'[{col}]' for col in pk_columns)})"
        if pk_columns else ""
    )

    # Foreign keys; the reference string ("Table(Column)") is used verbatim.
    foreign_key_constraints = "".join(
        f", FOREIGN KEY ([{col}]) REFERENCES {ref}"
        for col, ref in fk_map.get(table_name, {}).items()
    )

    return f"CREATE TABLE {table_name} ({', '.join(column_definitions)}{primary_key}{foreign_key_constraints});"


In [68]:
def drop_existing_tables(cursor, conn, tables_to_drop=None):
    """Drop each listed user table from the local database if it exists.

    Tables are dropped one by one, in order; Facts_Table comes first in the
    default list because it holds the foreign keys that reference the
    dimension tables.

    Args:
        cursor: Open cursor on the local database.
        conn: Connection owning ``cursor``; committed after each successful drop.
        tables_to_drop: Optional ordered list of table names. Defaults to the
            five warehouse tables, fact table first.

    A failing drop is reported and its transaction rolled back, then the
    remaining tables are still processed.
    """
    if tables_to_drop is None:
        tables_to_drop = ["Facts_Table", "Dim_time", "Dim_product", "Dim_geo", "Dim_client"]

    for tbl in tables_to_drop:
        # type = 'U' restricts the existence check to user tables.
        drop_query = f"""
        IF EXISTS (SELECT 1 FROM sys.objects WHERE object_id = OBJECT_ID('{tbl}') AND type = 'U')
            DROP TABLE {tbl};
        """
        try:
            cursor.execute(drop_query)
            conn.commit()
        except Exception as err:
            print(f"Error al eliminar la tabla {tbl}: {err}")
            # Do not leave the failed statement's transaction open for later work.
            conn.rollback()


In [69]:
def _convert_date_like_columns(frame, sample_size=30, numeric_ratio=0.8, date_ratio=0.9):
    """Convert object/string columns that mostly parse as dates to datetime64.

    A reproducible sample (``random_state=42``) of up to ``sample_size`` values
    is inspected per column. Columns whose sample is mostly digit-only strings
    are skipped to avoid conversion errors. If more than ``date_ratio`` of the
    sample parses, the whole column is converted and unparseable values become
    NaT. Modifies ``frame`` in place and returns it.
    """
    for column in frame.columns:
        dtype = frame[column].dtype
        if not (pd.api.types.is_object_dtype(dtype) or isinstance(dtype, pd.StringDtype)):
            continue
        sampled_values = frame[column].astype(str).sample(
            min(len(frame), sample_size), random_state=42
        )
        if sampled_values.str.isdigit().mean() > numeric_ratio:
            continue
        try:
            parsed_dates = pd.to_datetime(sampled_values, errors="coerce")
            if parsed_dates.notna().sum() > date_ratio * len(sampled_values):
                frame[column] = pd.to_datetime(frame[column], errors="coerce")
        except (ValueError, TypeError, OverflowError) as err:
            # to_datetime can still raise for some inputs despite errors="coerce";
            # keep the column as text and report it instead of hiding the error.
            print(f"   - No se pudo convertir la columna {column} a fecha: {err}")
    return frame


def _fill_missing_values(frame):
    """Turn blank strings into NaN, then fill missing values per dtype.

    Numeric columns get the sentinel -1, datetime columns their most frequent
    value (left as NaT if the column has no valid dates at all), and all other
    columns the string "N/A". Modifies ``frame`` in place and returns it.
    """
    for column in frame.columns:
        series = frame[column].replace(r'^\s*$', np.nan, regex=True)
        if pd.api.types.is_numeric_dtype(series):
            series = series.fillna(-1)
        elif pd.api.types.is_datetime64_any_dtype(series):
            modes = series.mode(dropna=True)
            # An all-NaT column has no mode; mode()[0] would raise KeyError.
            if not modes.empty:
                series = series.fillna(modes.iloc[0])
        else:
            series = series.fillna("N/A")
        frame[column] = series
    return frame


def _downcast_numeric_columns(frame):
    """Downcast float64 -> float32 and int64 -> int32 when the values fit.

    NOTE(review): float32 keeps only ~7 significant digits, so amounts may
    lose precision; consider keeping float64 (SQL Server FLOAT is 64-bit).
    """
    int32_info = np.iinfo(np.int32)
    for column in frame.select_dtypes(include=['float64']).columns:
        frame[column] = frame[column].astype(np.float32)
    for column in frame.select_dtypes(include=['int64']).columns:
        values = frame[column]
        # astype(int32) silently wraps out-of-range values, so check first.
        if values.empty or (values.min() >= int32_info.min and values.max() <= int32_info.max):
            frame[column] = values.astype(np.int32)
        else:
            print(f"   - Columna {column} fuera del rango de INT; se mantiene como int64.")
    return frame


def _load_into_local(conn, table_name, frame):
    """Create ``table_name`` in the local database and bulk-insert ``frame``."""
    with conn.cursor() as cur:
        cur.execute(generate_create_table_sql(table_name, frame))
        conn.commit()
        print(f"   - Tabla {table_name} creada correctamente.")

        placeholders = ', '.join('?' for _ in frame.columns)
        insert_query = f"INSERT INTO {table_name} VALUES ({placeholders})"
        # Any remaining NaN/NaT (e.g. an all-NaT date column) is sent as NULL.
        rows = frame.astype(object).where(frame.notna(), None).values.tolist()
        cur.fast_executemany = True
        cur.executemany(insert_query, rows)
        conn.commit()
        print(f"   - {frame.shape[0]} filas insertadas.\n")


# Initialise explicitly so cleanup never touches connections left over from a
# previous run of this cell (module-level locals() is the notebook namespace).
azure_conn = None
local_conn = None
try:
    # Open both databases: Azure is the source, local SQL Server the target.
    azure_conn = pyodbc.connect(azure_conn_str)
    local_conn = pyodbc.connect(local_conn_str)
    print("Conexiones establecidas exitosamente.\n")

    # Start from a clean target schema.
    with local_conn.cursor() as cur:
        drop_existing_tables(cur, local_conn)

    # Extract, clean and load each table in the order defined in `queries`.
    for tbl_name, filename in queries.items():
        print(f"Procesando: {tbl_name}")
        query_file_path = os.path.join(query_folder, filename)

        with open(query_file_path, "r", encoding="utf-8") as file:
            sql_query = file.read()

        df = pd.read_sql(sql_query, azure_conn)

        # Keep only the first occurrence of duplicated column names; copy so
        # later column assignments act on an independent frame.
        if df.columns.duplicated().any():
            duplicated_cols = df.columns[df.columns.duplicated()].tolist()
            print(f"Columnas duplicadas en {tbl_name}: {duplicated_cols}")
            df = df.loc[:, ~df.columns.duplicated()].copy()

        if df.empty:
            print(f"La tabla {tbl_name} no contiene datos.\n")
            continue

        _convert_date_like_columns(df)

        print(f"   - Filas obtenidas: {df.shape[0]}")
        print(f"   - Columnas: {df.columns.tolist()}")

        _fill_missing_values(df)
        _downcast_numeric_columns(df)
        _load_into_local(local_conn, tbl_name, df)

except Exception as err:
    print(f"Se ha producido un error: {err}")

finally:
    for connection in (azure_conn, local_conn):
        if connection is not None:
            connection.close()

print("Proceso ETL finalizado.")

Conexiones establecidas exitosamente.

Procesando: Dim_geo
   - Filas obtenidas: 12
   - Columnas: ['TIENDA_ID', 'PROVINCIA_ID', 'ZONA_ID', 'TIENDA_DESC', 'PROV_DESC', 'ZONA']
   - Tabla Dim_geo creada correctamente.
   - 12 filas insertadas.

Procesando: Dim_product
Columnas duplicadas en Dim_product: ['Modelo']
   - Filas obtenidas: 404
   - Columnas: ['Id_Producto', 'Code_', 'CATEGORIA_ID', 'Modelo', 'FUEL', 'Grade_ID', 'Equipamiento', 'Costetransporte', 'GastosMarketing', 'Mantenimiento_medio', 'Comisión_Marca']
   - Tabla Dim_product creada correctamente.
   - 404 filas insertadas.

Procesando: Dim_time
   - Filas obtenidas: 3652
   - Columnas: ['Date', 'Anno', 'Annomes', 'Dia', 'Diadelasemana', 'Diadelesemana_desc', 'Festivo', 'Findesemana', 'FinMes', 'InicioMes', 'Laboral', 'Mes', 'Mes_desc', 'Week']
   - Tabla Dim_time creada correctamente.
   - 3652 filas insertadas.

Procesando: Dim_client
   - Filas obtenidas: 44053
   - Columnas: ['Customer_ID', 'Edad', 'Fecha_nacimiento', 