# **Proceso ETL Automático: Extracción desde Azure y Carga en SQL Server Local.**
Este notebook realiza un proceso **ETL (Extracción, Transformación y Carga)** entre dos bases de datos SQL Server: una en Azure y otra local. Se conecta a ambas fuentes, extrae datos usando archivos SQL, los transforma y los carga en tablas equivalentes dentro de una base de datos local, creando las tablas desde cero con claves primarias y foráneas definidas.

## **Importación de librerías y configuración general.**
Se cargan las librerías necesarias (incluidas en el archivo `requirements.txt`) para trabajar con bases de datos, manipulación de datos y archivos. También se desactivan ciertos mensajes de advertencia que podrían ensuciar la salida.

In [8]:
import pyodbc
import pandas as pd
import numpy as np
import os
import warnings
warnings.filterwarnings("ignore", category=UserWarning)

## **Conexiones a bases de datos.**
Se definen y configuran las cadenas de conexión para acceder tanto a la **base de datos remota en Azure** como a una **base de datos local**. Las cadenas de conexión incluyen el driver ODBC, el nombre del servidor, la base de datos destino y los métodos de autenticación correspondientes.
- **Azure:** Utilizan autenticación interactiva con Azure Active Directory.
- **LocalHost:** Autenticación integrada de Windows (Trusted Connection).

In [1]:
# Azure connection string.
AZURE_SERVER = 'uaxmathfis.database.windows.net'
AZURE_DATABASE = 'usecases'
AZURE_DRIVER = '{ODBC Driver 17 for SQL Server}'

azure_conn_str = f"DRIVER={AZURE_DRIVER};SERVER={AZURE_SERVER};DATABASE={AZURE_DATABASE};Authentication=ActiveDirectoryInteractive"

In [2]:
# Local connection string.
LOCAL_SERVER = 'localhost'
LOCAL_DATABASE = 'dwh_case1'
LOCAL_DRIVER = '{ODBC Driver 17 for SQL Server}'

local_conn_str = f"DRIVER={LOCAL_DRIVER};SERVER={LOCAL_SERVER};DATABASE={LOCAL_DATABASE};Trusted_Connection=yes;TrustServerCertificate=yes"

## **Definición de consultas SQL.**
Se construye un diccionario que asocia los nombres lógicos de las tablas destino con los archivos `.sql` donde se encuentran las sentencias SQL de extracción. Estas consultas se ejecutarán sobre la base de datos de Azure para obtener los datos fuente.

In [3]:
query_folder = "../database/dwh"
queries = {
    "dim_geo": "load_geo.sql",
    "dim_product": "load_product.sql",
    "dim_time": "load_time.sql",
    "dim_client": "load_client.sql",
    "fact_sales": "load_fact.sql"
}

### **Definición de claves primarias y foráneas.**
Se declaran las claves primarias de cada tabla para garantizar integridad a nivel de entidad.
Además, se especifican las claves foráneas necesarias para mantener la integridad referencial entre la tabla de hechos y las dimensiones del modelo estrella (star schema). Esta metadata será utilizada al momento de generar dinámicamente las sentencias `CREATE TABLE`.

In [4]:
# Primary keys for each table.
primary_keys = {
    "fact_sales": ["CODE"],
    "dim_client": ["Customer_ID"],
    "dim_geo": ["TIENDA_ID"],
    "dim_product": ["Id_Producto"],
    "dim_time": ["Fecha"]
}

In [5]:
# Foreign keys for each table.
foreign_keys = {
    "fact_sales": {
        "Customer_ID": "dim_client(Customer_ID)",
        "TIENDA_ID": "dim_geo(TIENDA_ID)",
        "Id_Producto": "dim_product(Id_Producto)",
        "Sales_Date": "dim_time(Fecha)"
    }
}

## **Generación dinámica de sentencias.**
Se implementa una función que construye sentencias `CREATE TABLE` en T-SQL a partir de la estructura del DataFrame resultante de la extracción. La función determina los tipos de datos SQL basándose en los tipos de datos de pandas/numpy, y añade restricciones de clave primaria y foráneas cuando corresponda.

In [6]:
def create_table_sql(table_name, df):
    # Definición de los tipos de datos SQL para cada columna del DataFrame: Por defecto tipo TEXTO.
    col_defs = []
    for col in df.columns:
        if np.issubdtype(df[col].dtype, np.datetime64):
            col_defs.append(f'[{col}] DATE')
        elif df[col].dtype == np.float32:
            col_defs.append(f'[{col}] FLOAT')
        elif df[col].dtype == np.int32:
            col_defs.append(f'[{col}] INT')
        else:
            col_defs.append(f'[{col}] NVARCHAR(255)')

    # Agregación clave primaria si corresponde.
    pk = ", PRIMARY KEY (" + ", ".join(primary_keys[table_name]) + ")" if table_name in primary_keys else ""
    # Agregación claves foráneas si corresponde.
    fk = ""
    if table_name in foreign_keys:
        for col, ref in foreign_keys[table_name].items():
            fk += f", FOREIGN KEY ({col}) REFERENCES {ref}"

    return f"CREATE TABLE {table_name} ({', '.join(col_defs)}{pk}{fk});"

La siguiente función se encarga de **eliminar las tablas existentes** en la base de datos local antes de volver a crearlas durante el proceso ETL. Se eliminan en un **orden específico** que respeta las dependencias de claves foráneas, evitando errores de integridad referencial.

In [7]:
def drop_tables_in_order(cursor, conn):
    drop_order = ["fact_sales", "dim_time", "dim_product", "dim_geo", "dim_client"]
    for table in drop_order:
        # Verifica si la tabla existe en el esquema actual.
        check_exists_query = f"""
        IF OBJECT_ID('{table}', 'U') IS NOT NULL
            DROP TABLE {table};
        """
        try:
            cursor.execute(check_exists_query)
            conn.commit()
        except Exception as e:
            print(f"Error al intentar eliminar la tabla {table}: {e}")

## **Proceso ETL.**
Se establece el flujo completo de carga de datos:
- **Extract:** Lectura y ejecución de las consultas SQL sobre Azure para obtener los datos.
- **Transform:** Eliminación columnas duplicadas. Indentificación columnas de tipo fecha (evitando confundirlas con campos numéricos como año). Ajuste tipos de datos (int32, float32) para optimizar espacio. Relleno de valores nulos con ceros para evitar errores al insertar.
- **Load:** Eliminación de las tablas destino en la base local si ya existen. Creación nuevamente con las restricciones adecuadas. Realización con una inserción masiva de datos utilizando `fast_executemany` para optimizar la carga.

In [9]:
try:
    # Conexión a las bases de datos.
    conn_azure = pyodbc.connect(azure_conn_str)
    conn_local = pyodbc.connect(local_conn_str)
    print("Conexiones correctamente establecidas.\n")

    with conn_local.cursor() as cursor:
        drop_tables_in_order(cursor, conn_local)
    # Procesamiento de cada tabla definida en el diccionario de Queries.
    for table_name, file in queries.items():
        print(f"Procesando: {table_name}")
        query_path = os.path.join(query_folder, file)
        with open(query_path, "r", encoding="utf-8") as f:
            sql_query = f.read()

        # Ejecución de la consulta sobre la base de datos de Azure.
        df = pd.read_sql(sql_query, conn_azure)

        # Eliminación de las columnas duplicadas.
        if df.columns.duplicated().any():
            print(f"Columnas duplicadas en {table_name}: {df.columns[df.columns.duplicated()].tolist()}")
            df = df.loc[:, ~df.columns.duplicated()]

        # Detección de las columnas tipo DATE para convertirlas adecuadamente.
        for col in df.columns:
            if df[col].dtype == object or df[col].dtype == "string":
                sample_values = df[col].astype(str).sample(min(len(df), 30), random_state=42)
                # Saltar si parece una columna numérica (para no confundir INT con DATE).
                if sample_values.str.isdigit().mean() > 0.8:
                    continue
                try:
                    parsed = pd.to_datetime(sample_values, errors='coerce')
                    if parsed.notna().sum() > 0.9 * len(sample_values):
                        df[col] = pd.to_datetime(df[col], errors='coerce')
                except:
                    pass
        # Si el DataFrame está vacío, se salta.
        if df.empty:
            print(f"La tabla {table_name} no devolvió resultados.\n")
            continue
        print(f"   - Filas obtenidas: {df.shape[0]}")
        print(f"   - Columnas: {df.columns.tolist()}")

        # Limpieza de valores nulos y tipos de datos.
        for col in df.columns:
            df[col] = df[col].replace(r'^\s*$', np.nan, regex=True) # Reemplazar espacios en blanco por NaN.
            if pd.api.types.is_numeric_dtype(df[col]):
                sentinel = 0
                df[col] = df[col].fillna(sentinel)
            elif pd.api.types.is_datetime64_any_dtype(df[col]):
                pass
            else:
                df[col] = df[col].fillna("N/A")
        for col in df.select_dtypes(include=['float64']).columns:
            df[col] = df[col].astype(np.float32)
        for col in df.select_dtypes(include=['int64']).columns:
            df[col] = df[col].astype(np.int32)

        # Creación de la tabla en la base de datos local.
        with conn_local.cursor() as cursor:
            create_sql = create_table_sql(table_name, df)
            cursor.execute(create_sql)
            conn_local.commit()
            print(f"   - Tabla {table_name} creada correctamente.")

            placeholders = ', '.join(['?' for _ in df.columns])
            insert_sql = f"INSERT INTO {table_name} VALUES ({placeholders})"
            cursor.fast_executemany = True
            cursor.executemany(insert_sql, df.values.tolist())
            conn_local.commit()
            print(f"   - {df.shape[0]} filas insertadas.\n")

except Exception as e:
    print(f"Error: {e}")

finally:
    if 'conn_azure' in locals():
        conn_azure.close()
    if 'conn_local' in locals():
        conn_local.close()

print("ETL completado.")

Conexiones correctamente establecidas.

Procesando: dim_geo
   - Filas obtenidas: 12
   - Columnas: ['TIENDA_ID', 'TIENDA_DESC', 'PROV_DESC', 'ZONA']
   - Tabla dim_geo creada correctamente.
   - 12 filas insertadas.

Procesando: dim_product
   - Filas obtenidas: 404
   - Columnas: ['Id_Producto', 'Code_', 'Kw', 'TIPO_CARROCERIA', 'TRANSMISION_ID', 'Equipamiento', 'FUEL', 'Margen', 'Costetransporte', 'Margendistribuidor', 'GastosMarketing', 'Mantenimiento_medio', 'Comisión_Marca']
   - Tabla dim_product creada correctamente.
   - 404 filas insertadas.

Procesando: dim_time
   - Filas obtenidas: 3652
   - Columnas: ['Fecha', 'InicioMes', 'FinMes', 'Dia', 'Diadelasemana', 'Diadelesemana_desc', 'Mes', 'Mes_desc', 'Año', 'Añomes', 'Week', 'Trimestre', 'SemanaDelMes', 'DiaDelAño', 'DiaDelTrimestre', 'Findesemana', 'Festivo', 'Laboral']
   - Tabla dim_time creada correctamente.
   - 3652 filas insertadas.

Procesando: dim_client
   - Filas obtenidas: 44053
   - Columnas: ['Customer_ID', 'Eda