In [2]:
import pyodbc
import pandas as pd
import numpy as np

#  Conexión a **Azure SQL**
AZURE_SERVER = 'uaxmathfis.database.windows.net'
AZURE_DATABASE = 'usecases'
AZURE_DRIVER = '{ODBC Driver 17 for SQL Server}'

azure_conn_str = f"DRIVER={AZURE_DRIVER};SERVER={AZURE_SERVER};DATABASE={AZURE_DATABASE};Authentication=ActiveDirectoryInteractive"

#  Conexión a **SQL Server LOCAL**
LOCAL_SERVER = 'localhost'
LOCAL_DATABASE = 'dwh_case1'  
LOCAL_DRIVER = '{ODBC Driver 17 for SQL Server}'

local_conn_str = f"DRIVER={LOCAL_DRIVER};SERVER={LOCAL_SERVER};DATABASE={LOCAL_DATABASE};Trusted_Connection=yes;TrustServerCertificate=yes"

#  Consulta SQL en Azure SQL
SQL_QUERY = """
WITH Mediana_KM AS (
    SELECT 
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY KM_Ultima_Revision) 
        OVER () AS Mediana_KM
    FROM 
        DATAEX.[004_rev]
    WHERE 
        KM_Ultima_Revision IS NOT NULL AND KM_Ultima_Revision > 0
)
SELECT DISTINCT
    s.CODE,                          -- Código de la venta
    s.Sales_Date,                    -- Fecha de la venta
    s.Customer_ID,                   -- ID del cliente
    s.Id_Producto,                   -- ID del producto
    s.PVP,                           -- Precio de venta al público
    s.IMPUESTOS,                     -- Impuestos
    s.COSTE_VENTA_NO_IMPUESTOS,      -- Coste de venta sin impuestos
    fp.FORMA_PAGO AS Forma_Pago,     -- Descripción de la forma de pago
    mv.MOTIVO_VENTA AS Motivo_Venta, -- Descripción del motivo de la venta
    t.TIENDA_ID,                     -- ID de la tienda
    t.TIENDA_DESC AS Tienda,         -- Descripción de la tienda
    e.Car_Age,                       -- Edad del coche (Car Age)
    c.QUEJA,                         -- Queja (si existe)
    p.Modelo,                        -- Modelo del producto
    R.DIAS_DESDE_ULTIMA_REVISION,    -- Días desde la última revisión
    COALESCE(
        CASE 
            WHEN R.KM_Ultima_Revision = 0 THEN (SELECT DISTINCT Mediana_KM FROM Mediana_KM)
            ELSE R.KM_Ultima_Revision
        END, 
        (SELECT DISTINCT Mediana_KM FROM Mediana_KM)
    ) AS KM_Ultima_Revision_Final,   -- KM de la última revisión (con reemplazo de 0 o NULL)
    co.Costetransporte,              -- Coste de transporte
    co.GastosMarketing,              -- Gastos de marketing
    co.Margendistribuidor,           -- Margen del distribuidor
    co.Comisión_Marca,               -- Comisión de la marca (si está disponible)
    logist.Fue_Lead,                 -- Indica si fue un lead
    logist.Lead_Compra,              -- Indica si el lead resultó en compra
    logist.Fue_Lead + logist.Lead_Compra AS Lead_Compra_Total, -- Total de leads que resultaron en compra

    -- Cálculo del Margen Bruto
    ROUND(s.PVP * (co.Margen) * 0.01 * (1 - s.IMPUESTOS / 100), 2) AS Margen_eur_bruto,
    
    -- Cálculo del Margen Neto
    ROUND(
        s.PVP * (co.Margen) * 0.01 * (1 - s.IMPUESTOS / 100) 
        - s.COSTE_VENTA_NO_IMPUESTOS 
        - (co.Margendistribuidor * 0.01 + co.GastosMarketing * 0.01 - co.Comisión_Marca * 0.01) * s.PVP * (1 - s.IMPUESTOS / 100) 
        - co.Costetransporte, 
        2
    ) AS Margen_eur

FROM 
    DATAEX.[001_sales] s
LEFT JOIN 
    [DATAEX].[004_rev] R ON s.CODE = R.CODE  -- Join para revisiones
LEFT JOIN 
    DATAEX.[010_forma_pago] fp ON s.FORMA_PAGO_ID = fp.FORMA_PAGO_ID  -- Join para forma de pago
LEFT JOIN 
    DATAEX.[009_motivo_venta] mv ON s.MOTIVO_VENTA_ID = mv.MOTIVO_VENTA_ID  -- Join para motivo de venta
LEFT JOIN 
    DATAEX.[011_tienda] t ON s.TIENDA_ID = t.TIENDA_ID  -- Join para tienda
LEFT JOIN 
    DATAEX.[018_edad] e ON s.CODE = e.CODE  -- Join para Car_Age
LEFT JOIN 
    DATAEX.[008_cac] c ON s.CODE = c.CODE  -- Join para quejas
LEFT JOIN 
    DATAEX.[006_producto] p ON s.Id_Producto = p.Id_Producto  -- Join para producto
LEFT JOIN 
    DATAEX.[007_costes] co ON p.Modelo = co.Modelo  -- Join para costes
LEFT JOIN 
    [usecases].DATAEX.[017_logist] logist ON s.CODE = logist.CODE;  -- Join para Fue_Lead y Lead_Compra





"""

# 🔹 Nombre de la tabla en SQL Server Local
NEW_TABLE_NAME = "DATAEX.FACT_SALES"

try:
    #  Conectar a Azure SQL
    print(f"Conectando a Azure SQL...")
    conn_azure = pyodbc.connect(azure_conn_str)
    
    # 🔹 Ejecutar la consulta en Azure SQL
    print(f"Ejecutando consulta en Azure SQL...")
    df = pd.read_sql(SQL_QUERY, conn_azure)

    if df.empty:
        print(f" La consulta no devolvió resultados. No se creará la tabla en SQL Server Local.")
    else:
        print(f"   - Datos extraídos: {df.shape[0]} filas")



        #  Convertir NaN en columnas numéricas a 0
        df = df.fillna(0)

        #  Convertir valores numéricos problemáticos
        for col in df.select_dtypes(include=['float64']).columns:
            df[col] = df[col].astype(np.float32)  # Reducir precisión
        
        for col in df.select_dtypes(include=['int64']).columns:
            df[col] = df[col].astype(np.int32)  # Evitar valores fuera de rango
        
        #  Conectar a SQL Server Local
        print(f"Conectando a SQL Server Local...")
        conn_local = pyodbc.connect(local_conn_str)
        
        with conn_local.cursor() as cursor:
            # 🔹 Eliminar la tabla si ya existe
            drop_table_sql = f"DROP TABLE IF EXISTS {NEW_TABLE_NAME}"
            cursor.execute(drop_table_sql)
            conn_local.commit()
            print(f"   - Tabla eliminada si existía.")

            # 🔹 Crear la tabla en SQL Server Local con tipos de datos ajustados
            create_table_sql = f"""
            CREATE TABLE {NEW_TABLE_NAME} (
                {', '.join([
                    f'[{col}] FLOAT' if df[col].dtype == np.float32 
                    else f'[{col}] INT' if df[col].dtype == np.int32 
                    else f'[{col}] NVARCHAR(255)' for col in df.columns
                ])}
            );
            """
            cursor.execute(create_table_sql)
            conn_local.commit()
            print(f" Tabla {NEW_TABLE_NAME} creada correctamente en SQL Server Local.")

            # Insertar los datos en SQL Server Local
            placeholders = ', '.join(['?' for _ in df.columns])
            insert_sql = f"INSERT INTO {NEW_TABLE_NAME} VALUES ({placeholders})"

            cursor.fast_executemany = True
            cursor.executemany(insert_sql, df.values.tolist())
            conn_local.commit()

            print(f" {df.shape[0]} filas insertadas en {NEW_TABLE_NAME}.")

except Exception as e:
    print(f" Error: {e}")

finally:
    if 'conn_azure' in locals():
        conn_azure.close()
    if 'conn_local' in locals():
        conn_local.close()

print("\n ¡Proceso completado!")


Conectando a Azure SQL...
Ejecutando consulta en Azure SQL...


  df = pd.read_sql(SQL_QUERY, conn_azure)


   - Datos extraídos: 58049 filas
Conectando a SQL Server Local...
   - Tabla eliminada si existía.
 Tabla DATAEX.FACT_SALES creada correctamente en SQL Server Local.
 58049 filas insertadas en DATAEX.FACT_SALES.

 ¡Proceso completado!


In [5]:
import pyodbc
import pandas as pd
import numpy as np
from typing import Dict, List

# Configuración de conexiones
CONFIG = {
    'azure': {
        'server': 'uaxmathfis.database.windows.net',
        'database': 'usecases',
        'driver': '{ODBC Driver 17 for SQL Server}',
        'auth': 'Authentication=ActiveDirectoryInteractive'
    },
    'local': {
        'server': 'localhost',
        'database': 'dwh_case1',
        'driver': '{ODBC Driver 17 for SQL Server}',
        'auth': 'Trusted_Connection=yes;TrustServerCertificate=yes'
    }
}

# Especificación de claves
TABLE_CONFIG = {
    'table_name': 'DATAEX.FACT_SALES',
    'primary_key': 'CODE',
    'foreign_keys': [
        {'column': 'Customer_ID', 'reference_table': 'DATAEX.DIM_CLIENTE', 'reference_column': 'Customer_ID'},
        {'column': 'Id_Producto', 'reference_table': 'DATAEX.DIM_PRODUCTO', 'reference_column': 'Id_Producto'},
        {'column': 'Sales_Date', 'reference_table': 'DATAEX.DIM_TIEMPO', 'reference_column': 'Date'},
        {'column': 'TIENDA_ID', 'reference_table': 'DATAEX.DIM_LUGAR', 'reference_column': 'TIENDA_ID'}
    ]
}

In [6]:
def detect_column_types(df: pd.DataFrame) -> Dict[str, str]:
    """Detecta los tipos de columna de pandas y los mapea a tipos SQL Server"""
    type_mapping = {
        'object': 'NVARCHAR(255)',
        'int64': 'INT',
        'float64': 'FLOAT',
        'datetime64[ns]': 'DATETIME',
        'bool': 'BIT'
    }
    
    column_types = {}
    for col in df.columns:
        pandas_type = str(df[col].dtype)
        sql_type = type_mapping.get(pandas_type, 'NVARCHAR(255)')
        
        # Ajustes especiales para columnas conocidas
        if col in ['CODE', 'Id_Producto']:
            sql_type = 'NVARCHAR(255)'
        elif col in ['PVP', 'IMPUESTOS', 'COSTE_VENTA_NO_IMPUESTOS']:
            sql_type = 'DECIMAL(18,2)'
            
        column_types[col] = sql_type
    
    return column_types

In [None]:
def create_table_with_schema(conn, table_name: str, column_types: Dict[str, str]):
    """
    Crea una tabla en SQL Server con el esquema especificado, manejando adecuadamente
    la eliminación previa de la tabla si existe y validando los parámetros.
    
    Args:
        conn: Conexión a la base de datos
        table_name: Nombre completo de la tabla (incluyendo esquema si es necesario)
        column_types: Diccionario con los tipos de datos para cada columna
        
    Raises:
        ValueError: Si los parámetros no son válidos
        pyodbc.Error: Si hay errores en la operación SQL
    """
    # Validación de parámetros
    if not table_name or '.' not in table_name:
        raise ValueError("El nombre de la tabla debe incluir el esquema (ej: 'DATAEX.FACT_SALES')")
    
    if not column_types:
        raise ValueError("El diccionario de tipos de columnas no puede estar vacío")
    
    # Construcción del SQL para creación de la tabla
    columns_sql = []
    pk_column = TABLE_CONFIG.get('primary_key', '')
    fk_columns = [fk['column'] for fk in TABLE_CONFIG.get('foreign_keys', [])]
    
    for col, col_type in column_types.items():
        # Determinar si la columna debe ser NOT NULL
        is_pk = col == pk_column
        is_fk = col in fk_columns
        nullable = 'NOT NULL' if is_pk or is_fk else 'NULL'
        
        # Asegurar que las columnas PK y FK tengan tipos válidos
        if is_pk and 'varchar' not in col_type.lower() and 'char' not in col_type.lower():
            print(f"Advertencia: La columna PK '{col}' tiene tipo {col_type}. Se recomienda usar tipo texto para claves.")
        
        columns_sql.append(f"[{col}] {col_type} {nullable}")
    
    # SQL para creación de tabla
    create_sql = f"""
    IF OBJECT_ID('{table_name}', 'U') IS NOT NULL 
    BEGIN
        PRINT 'Eliminando tabla existente {table_name}...'
        DROP TABLE {table_name}
    END
    
    CREATE TABLE {table_name} (
        {',\n        '.join(columns_sql)}
    );
    
    PRINT 'Tabla {table_name} creada exitosamente con {len(columns_sql)} columnas';
    """
    
    try:
        with conn.cursor() as cursor:
            # Ejecutar en una transacción
            cursor.execute("BEGIN TRANSACTION")
            
            # Eliminar y crear la tabla
            cursor.execute(create_sql)
            
            # Validar que la tabla se creó correctamente
            cursor.execute(f"""
                SELECT COUNT(*) 
                FROM INFORMATION_SCHEMA.TABLES 
                WHERE TABLE_SCHEMA + '.' + TABLE_NAME = '{table_name}'
            """)
            if cursor.fetchone()[0] == 0:
                raise pyodbc.Error(f"No se pudo crear la tabla {table_name}")
            
            conn.commit()
            print(f"✓ Tabla {table_name} creada exitosamente")
            
    except pyodbc.Error as e:
        conn.rollback()
        error_msg = f"Error al crear tabla {table_name}: {str(e)}"
        print(f"✗ {error_msg}")
        raise pyodbc.Error(error_msg) from e

SyntaxError: f-string expression part cannot include a backslash (2164766844.py, line 8)

In [None]:
def add_constraints(conn, table_name: str):
    """Añade las PK y FK especificadas en la configuración"""
    try:
        with conn.cursor() as cursor:
            # Añadir PRIMARY KEY
            pk_sql = f"""
            ALTER TABLE {table_name}
            ADD CONSTRAINT [PK_{table_name.replace('.', '_')}] 
            PRIMARY KEY ({TABLE_CONFIG['primary_key']})
            """
            cursor.execute(pk_sql)
            
            # Añadir FOREIGN KEYS
            for fk in TABLE_CONFIG['foreign_keys']:
                fk_sql = f"""
                ALTER TABLE {table_name}
                ADD CONSTRAINT [FK_{table_name.replace('.', '')}_{fk['column']}] 
                FOREIGN KEY ({fk['column']}) 
                REFERENCES {fk['reference_table']}({fk['reference_column']})
                """
                cursor.execute(fk_sql)
            
            conn.commit()
        print("Constraints añadidas exitosamente")
    except Exception as e:
        print(f"Error al añadir constraints: {str(e)}")
        conn.rollback()
        raise

In [None]:
def migrate_table(sql_file_path: str):
    """Función principal que ejecuta todo el proceso de migración"""
    try:
        # Leer consulta SQL
        with open(sql_file_path, 'r', encoding='utf-8') as f:
            sql_query = f.read()
        
        # Conectar a Azure y obtener datos
        with pyodbc.connect(
            f"DRIVER={CONFIG['azure']['driver']};"
            f"SERVER={CONFIG['azure']['server']};"
            f"DATABASE={CONFIG['azure']['database']};"
            f"{CONFIG['azure']['auth']}"
        ) as azure_conn:
            df = pd.read_sql(sql_query, azure_conn)
            
            # Detectar tipos de columnas
            column_types = detect_column_types(df)
            
            # Conectar a SQL local y crear tabla
            with pyodbc.connect(
                f"DRIVER={CONFIG['local']['driver']};"
                f"SERVER={CONFIG['local']['server']};"
                f"DATABASE={CONFIG['local']['database']};"
                f"{CONFIG['local']['auth']}"
            ) as local_conn:
                # Crear tabla
                create_table_with_schema(local_conn, TABLE_CONFIG['table_name'], column_types)
                
                # Insertar datos
                with local_conn.cursor() as cursor:
                    # Preparar datos para inserción
                    df = df.replace({np.nan: None})
                    placeholders = ', '.join(['?'] * len(df.columns))
                    insert_sql = f"INSERT INTO {TABLE_CONFIG['table_name']} VALUES ({placeholders})"
                    
                    # Insertar en bloques para mejor performance
                    cursor.fast_executemany = True
                    cursor.executemany(insert_sql, df.values.tolist())
                    local_conn.commit()
                    print(f"{len(df)} registros insertados")
                
                # Añadir constraints
                add_constraints(local_conn, TABLE_CONFIG['table_name'])
    
    except Exception as e:
        print(f"Error en el proceso de migración: {str(e)}")
    finally:
        print("Proceso completado")

# Ejecutar migración
if __name__ == "__main__":
    migrate_table("DATAEX.FACT_SALES.sql")