# 0. Extracción, transformación y carga (ETL) - Notebook

In [9]:
#Imports
import os
import re
import fitz  # PyMuPDF
import pandas as pd
from pathlib import Path
import pathlib
import sqlite3
import time
import math

## 1. ETL nº1 - Estadísticas de elecciones generales (archivos PDF)

### 1.1. - Generación de DataFrame unificado de resultados electorales

In [None]:
# Obtener la ruta raíz del proyecto dinámicamente (asumiendo que estás ejecutando el notebook dentro del repo)
ROOT_DIR = Path().resolve()
while not (ROOT_DIR / 'data').exists() and ROOT_DIR != ROOT_DIR.parent:
    ROOT_DIR = ROOT_DIR.parent

# Ruta a la carpeta de PDFs
pdf_dir = ROOT_DIR / 'data' / 'raw'

# Buscar automáticamente todos los PDFs en la carpeta
pdf_paths = sorted(pdf_dir.glob("*.pdf"))

def extraer_datos(pdf_path):
    doc = fitz.open(pdf_path)
    texto = "\n".join(page.get_text() for page in doc)

    # Extraer mes y año desde "Congreso : Mes Año"
    match_fecha = re.search(r"Congreso\s*:\s*(\w+)\s+(\d{4})", texto)
    mes = match_fecha.group(1).capitalize() if match_fecha else None
    año = match_fecha.group(2) if match_fecha else None
    año_int = int(año) if año else 0

    # Lista de datos generales a buscar (sin candidaturas)
    patrones_datos = [
        "Población",
        "Votantes a las 14:00",
        "Votantes a las 18:00",
        "Votantes a las 20:00",
        "Total censo electoral",
        "Votantes",
        "Abstenciones",
    ]
    patron = rf"({'|'.join(patrones_datos)})\s*\n?([\d\.]+)"
    datos = re.findall(patron, texto)

    # === EXTRAER PORCENTAJES DE LA PRIMERA PÁGINA ===
    primera_pagina_texto = doc[0].get_text()

    match_ultimo_num = list(re.finditer(r"\b\d{1,2}\b\s*$", primera_pagina_texto, re.MULTILINE))
    if match_ultimo_num:
        inicio_bloque = match_ultimo_num[-1].end()
        bloque_final = primera_pagina_texto[inicio_bloque:]
    else:
        bloque_final = ""

    porcentajes = re.findall(r"(\d{1,3}[.,]?\d{1,2})%", bloque_final)
    porcentajes = [float(p.replace(",", ".")) for p in porcentajes]

    # === ASIGNACIÓN ===
    datos_finales = []
    asignar_pct = False
    pct_index = 0

    for clave, valor in datos:
        porcentaje = None

        if "Votantes a las 14:00" in clave:
            asignar_pct = True

        if asignar_pct:
            if clave.strip() == "Total censo electoral":
                if año_int in [1979, 1982]:
                    porcentaje = None
                else:
                    porcentaje = None
                    pct_index += 1
            else:
                if pct_index < len(porcentajes):
                    porcentaje = porcentajes[pct_index]
                    pct_index += 1

        datos_finales.append({
            "Mes": mes,
            "Año": año,
            "Dato": clave.strip(),
            "Valor": int(valor.replace(".", "")),
            "Porcentaje": porcentaje
        })

    return pd.DataFrame(datos_finales)


# Procesar todos los PDFs
dfs = [extraer_datos(path) for path in pdf_paths]
df_pdf_final = pd.concat(dfs, ignore_index=True)

# Mostrar resultado
print(df_pdf_final)


       Mes   Año                   Dato     Valor  Porcentaje
0    Marzo  1979              Población  37757534         NaN
1    Marzo  1979   Votantes a las 14:00   9665733       36.02
2    Marzo  1979   Votantes a las 18:00  15250686       56.83
3    Marzo  1979  Total censo electoral  26836490         NaN
4    Marzo  1979               Votantes  18259192       68.04
..     ...   ...                    ...       ...         ...
98   Julio  2023   Votantes a las 18:00  18671607       53.13
99   Julio  2023   Votantes a las 20:00  24744016       70.41
100  Julio  2023  Total censo electoral  37469458         NaN
101  Julio  2023               Votantes  24952447       66.59
102  Julio  2023           Abstenciones  12517011       33.41

[103 rows x 5 columns]


In [56]:
pd.set_option('display.max_rows', None)  # Mostrar todas las filas
print(df_pdf_final)  # Mostrar las primeras 20 filas

           Mes   Año                   Dato     Valor  Porcentaje
0        Marzo  1979              Población  37757534         NaN
1        Marzo  1979   Votantes a las 14:00   9665733       36.02
2        Marzo  1979   Votantes a las 18:00  15250686       56.83
3        Marzo  1979  Total censo electoral  26836490         NaN
4        Marzo  1979               Votantes  18259192       68.04
5        Marzo  1979           Abstenciones   8577298       31.96
6      Octubre  1982              Población  37520072         NaN
7      Octubre  1982   Votantes a las 14:00  10671403       39.75
8      Octubre  1982   Votantes a las 18:00  16861705       62.81
9      Octubre  1982  Total censo electoral  26846940         NaN
10     Octubre  1982               Votantes  21469274       79.97
11     Octubre  1982           Abstenciones   5377666       20.03
12       Junio  1986              Población  38467850         NaN
13       Junio  1986   Votantes a las 14:00  11802482       40.90
14       J

### 1.2 - Trasvase de DataFrame de resultados de elecciones generales a SQLite

In [None]:
# Ruta de salida automática en 'data/processed'
output_sqldf = ROOT_DIR / 'data' / 'processed'

# Ruta al archivo SQLite (mismo directorio que RUTA_SALIDA)
RUTA_SQLITE = output_sqldf / "resultados_elecciones.sqlite"

print(f"\n💾 Exportando DataFrame a SQLite en:\n{RUTA_SQLITE.resolve()}")

# Crear conexión y exportar
conn = sqlite3.connect(RUTA_SQLITE)
cursor = conn.cursor()

# Guardar en tabla (si existe, reemplazar)
df_pdf_final.to_sql("resultados_generales", conn, if_exists="replace", index=False)

# Añadir columnas si no existen
try:
    cursor.execute('ALTER TABLE resultados_generales ADD COLUMN "¿GobiernoFormado?" TEXT')
except Exception as e:
    print(f"ℹ️ Columna '¿GobiernoFormado?' ya existe o no se pudo añadir: {e}")

try:
    cursor.execute('ALTER TABLE resultados_generales ADD COLUMN PartidoFormadorGobierno TEXT')
except Exception as e:
    print(f"ℹ️ Columna 'PartidoFormadorGobierno' ya existe o no se pudo añadir: {e}")

# Actualizaciones
cursor.execute('UPDATE resultados_generales SET "¿GobiernoFormado?" = "Si"')
cursor.execute('UPDATE resultados_generales SET "¿GobiernoFormado?" = "No" WHERE Año = 2016 OR Mes = "Abril"')

cursor.execute("UPDATE resultados_generales SET PartidoFormadorGobierno = 'UCD' WHERE Año = 1979")
cursor.execute("UPDATE resultados_generales SET PartidoFormadorGobierno = 'PSOE' WHERE Año IN (1982, 1986, 1989, 1993, 2004, 2008, 2023)")
cursor.execute("UPDATE resultados_generales SET PartidoFormadorGobierno = 'PSOE' WHERE Año = 2019 AND Mes = 'Noviembre'")
cursor.execute("UPDATE resultados_generales SET PartidoFormadorGobierno = 'PP' WHERE Año IN (1996, 2000, 2011, 2015)")

conn.commit()
conn.close()

print("\n✅ Base de datos generada y columnas actualizadas correctamente.")


💾 Exportando DataFrame a SQLite en:
D:\AAACarpetasEmigradasDeC\Downloads\Data Science\elecciones-espana\data\processed\resultados_elecciones.sqlite

✅ Base de datos generada y columnas actualizadas correctamente.


## 2. ETL nº2 - Resultados de elecciones por municipio (archivos XLSX)

### 2.1. - Generación de XLSX unificado de resultados electorales por municipio

In [None]:
# ================= CONFIGURACIÓN =================
# Buscar la carpeta 'data/raw' desde el directorio actual hacia arriba
ROOT_DIR = Path().resolve()
while not (ROOT_DIR / 'data').exists() and ROOT_DIR != ROOT_DIR.parent:
    ROOT_DIR = ROOT_DIR.parent

# Ruta a la carpeta de entrada
xlsx_dir = ROOT_DIR / 'data' / 'raw'

# Buscar automáticamente todos los archivos .xlsx que empiecen por 'Generales_por_Municipio_'
xlsx_paths = sorted(xlsx_dir.glob("Generales_por_Municipio_*.xlsx"))

# Ruta de salida automática en 'data/processed'
output_dir = ROOT_DIR / 'data' / 'processed'
output_dir.mkdir(parents=True, exist_ok=True)  # Crear la carpeta si no existe
output_path = output_dir / 'Elecciones_Consolidadas.xlsx'

# ================= FUNCIONES =================
def leer_archivos(archivos):
    dfs = []
    start_time = time.time()

    print(f"Procesando {len(archivos)} archivos...")

    for i, archivo in enumerate(archivos, 1):
        try:
            archivo_nombre = os.path.basename(archivo)
            print(f"\r [{i}/{len(archivos)}] {archivo_nombre[:30]}... | {time.time() - start_time:.1f}s", end='')

            df = pd.read_excel(archivo, engine='openpyxl', header=None)
            dfs.append(df)
        except Exception as e:
            print(f"\n⚠️ Error en {archivo}: {str(e)}")
            continue

    print("\n\n Concatenando datos...")
    df_total = pd.concat(dfs, ignore_index=True)

    # === Insertar columnas vacías para Año y Mes ===
    df_total.insert(0, "Año", None)
    df_total.insert(1, "Mes", None)

    # === Detectar líneas con "Congreso | Mes Año" y propagar ===
    año_actual = None
    mes_actual = None
    for i in range(len(df_total)):
        valor = str(df_total.iloc[i, 2])
        if isinstance(valor, str) and "congreso |" in valor.lower():
            partes = valor.split("|")
            if len(partes) >= 2:
                fecha = partes[1].strip()
                partes_fecha = fecha.split()
                if len(partes_fecha) == 2:
                    mes_actual = partes_fecha[0].capitalize()
                    año_actual = partes_fecha[1]

        if año_actual and mes_actual:
            df_total.at[i, "Año"] = año_actual
            df_total.at[i, "Mes"] = mes_actual

    df_total["Año"] = df_total["Año"].ffill()
    df_total["Mes"] = df_total["Mes"].ffill()

    return df_total

def guardar_con_valores(df, ruta_salida):
    import xlsxwriter

    print(f"\n Guardando archivo con participación calculada en:\n{ruta_salida}")

    if os.path.exists(ruta_salida):
        os.remove(ruta_salida)

    def a_float_strict(val):
        if isinstance(val, str):
            val = val.strip().replace(".", "").replace(",", ".")
        return float(val)

    participacion = []
    for idx, row in df.iterrows():
        try:
            val8 = a_float_strict(row[7])
            val9 = a_float_strict(row[8])
            resultado = val9 / val8 
            participacion.append(round(resultado, 4))
        except:
            participacion.append("")

    df.insert(11, "Participación", participacion)

    with pd.ExcelWriter(ruta_salida, engine='xlsxwriter') as writer:
        workbook = writer.book
        worksheet = workbook.add_worksheet("Resultados")
        writer.sheets["Resultados"] = worksheet

        format_pct = workbook.add_format({'num_format': '0.00%;[Red]-0.00%'})

        # === Buscar la fila real con encabezados ===
        fila_encabezado = df[df.iloc[:, 2].astype(str).str.contains("Nombre de Comunidad", case=False, na=False)].index.min()

        worksheet.write(0, 0, "Año")
        worksheet.write(0, 1, "Mes")

        if pd.notna(fila_encabezado):
            for col in range(2, 11):  # C-K
                valor = df.iloc[fila_encabezado, col]
                worksheet.write(0, col, valor)
        else:
            print("⚠️ No se detectó la fila de encabezados correctamente.")

        worksheet.write(0, 11, "Participación")

        # === Escribir datos desde fila 2 ===
        for row in range(len(df)):
            for col in range(len(df.columns)):
                val = df.iloc[row, col]
                try:
                    if col == 11 and pd.notna(val) and math.isfinite(float(val)):
                        worksheet.write_number(row + 1, col, float(val), format_pct)
                    else:
                        worksheet.write(row + 1, col, val)
                except:
                    worksheet.write(row + 1, col, "")

    print(f"\n✅ Archivo guardado: {os.path.abspath(ruta_salida)}")

# ================= EJECUCIÓN =================
if __name__ == "__main__":
    try:
        df_final = leer_archivos(xlsx_paths)
        print(f"\n Datos consolidados: {len(df_final):,} filas")
        print("Columnas disponibles:", df_final.columns.tolist())

        guardar_con_valores(df_final, output_path)

    except Exception as e:
        print(f"\n❌ Error en el proceso principal: {str(e)}")
        print("🛠 Posibles soluciones:")
        print("- Verifica que todos los archivos de entrada existan")
        print("- Asegúrate de tener permisos de escritura en la ruta de salida")
        print("- Revisa que los archivos tengan la misma estructura de columnas")


Procesando 15 archivos...
 [15/15] Generales_por_Municipio_2023.x... | 68.7s

 Concatenando datos...

 Datos consolidados: 121,468 filas
Columnas disponibles: ['Año', 'Mes', 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110]

 Guardando archivo con participación calculada en:
D:\AAACarpetasEmigradasDeC\Downloads\Data Science\elecciones-espana\data\processed\Elecciones_Consolidadas.xlsx

✅ Archivo guardado: D:\AAACarpetasEmigradasDeC\Downloads\Data Science\elecciones-espana\data\processed\Elecciones_Consolidadas.xlsx


### 2.2. - Trasvase de XLSX a SQlite, incluyendo modificación de nombres de columnas

In [None]:
# Ruta de salida automática en 'data/processed'
output_sql = ROOT_DIR / 'data' / 'processed'

# Ruta al archivo SQLite (mismo directorio que RUTA_SALIDA)
RUTA_SQLITE = output_sql / "Elecciones_Consolidadas.sqlite"

# Diccionario de columnas a renombrar si existen como números
column_renames = {
    "0": "Nombre_de_Comunidad",
    "1": "Codigo_de_Provincia",
    "2": "Nombre_de_Provincia",
    "3": "Codigo_de_Municipio",
    "4": "Nombre_de_Municipio",
    "5": "Poblacion",
    "6": "Numero_de_Mesas",
    "7": "Total_Censo_Electoral",
    "8": "Total_Votantes"
}

try:
    # Crear conexión
    conn = sqlite3.connect(RUTA_SQLITE)
    cursor = conn.cursor()

    # Exportar DataFrame a tabla SQL (sin índice)
    df_final.to_sql("resultados_municipales", conn, if_exists="replace", index=False)

    print(f"\n✅ Base de datos SQLite creada en:\n{os.path.abspath(RUTA_SQLITE)}")

    # === Verificar columnas numéricas ===
    cursor.execute("PRAGMA table_info(resultados_municipales);")
    columnas_sql = [row[1] for row in cursor.fetchall()]
    columnas_numericas = [col for col in columnas_sql if col in column_renames]

    if columnas_numericas:
        print(f"\nℹ️ Se detectaron columnas numéricas: {columnas_numericas}")
        for vieja, nueva in column_renames.items():
            if vieja in columnas_sql:
                try:
                    cursor.execute(f'ALTER TABLE resultados_municipales RENAME COLUMN "{vieja}" TO "{nueva}"')
                    print(f"✅ Renombrada: '{vieja}' → '{nueva}'")
                except Exception as e:
                    print(f"⚠️ No se pudo renombrar '{vieja}': {e}")
    else:
        print("✅ Las columnas ya están correctamente nombradas.")

    conn.commit()
    conn.close()

except Exception as e:
    print(f"\n❌ Error al exportar a SQLite: {e}")
    print("Verifica si el archivo está abierto o bloqueado por otro proceso.")



✅ Base de datos SQLite creada en:
D:\AAACarpetasEmigradasDeC\Downloads\Data Science\elecciones-espana\data\processed\Elecciones_Consolidadas.sqlite

ℹ️ Se detectaron columnas numéricas: ['0', '1', '2', '3', '4', '5', '6', '7', '8']
✅ Renombrada: '0' → 'Nombre_de_Comunidad'
✅ Renombrada: '1' → 'Codigo_de_Provincia'
✅ Renombrada: '2' → 'Nombre_de_Provincia'
✅ Renombrada: '3' → 'Codigo_de_Municipio'
✅ Renombrada: '4' → 'Nombre_de_Municipio'
✅ Renombrada: '5' → 'Poblacion'
✅ Renombrada: '6' → 'Numero_de_Mesas'
✅ Renombrada: '7' → 'Total_Censo_Electoral'
✅ Renombrada: '8' → 'Total_Votantes'


## 3. ETL nº3 - Renta por municipios de España (archivo CSV)

### 3.1. - Trasvase de datos CSV transformados a DataFrame y SQLite

In [None]:
# Detectar la raíz del proyecto (carpeta que contiene /data)
ROOT_DIR = Path().resolve()
while not (ROOT_DIR / 'data').exists() and ROOT_DIR != ROOT_DIR.parent:
    ROOT_DIR = ROOT_DIR.parent

# Rutas relativas portables
ruta_csv = ROOT_DIR / 'data' / 'raw' / 'Datos_Renta_Municipios.csv'
ruta_sqlite = ROOT_DIR / 'data' / 'processed' / 'Datos_Renta_Municipios.sqlite'
nombre_tabla = "Datos_Renta_Municipios"

# Verificar que el CSV existe
if not ruta_csv.exists():
    raise FileNotFoundError(f"❌ El archivo CSV no existe: {ruta_csv}")

# Leer el CSV
df = pd.read_csv(ruta_csv, sep=';', encoding='utf-8-sig')
print(f"📥 CSV cargado con {len(df)} filas.")

# Asegurarse de que la carpeta 'processed' existe
ruta_sqlite.parent.mkdir(parents=True, exist_ok=True)

# Conectar a SQLite y crear tabla
conn = sqlite3.connect(ruta_sqlite)
df.to_sql(nombre_tabla, conn, if_exists='replace', index=False)
print(f"Tabla '{nombre_tabla}' creada en '{ruta_sqlite}'.")

# Eliminar filas donde 'Distritos' o 'Tota' no sean NULL
cursor = conn.cursor()
cursor.execute(f"DELETE FROM {nombre_tabla} WHERE Distritos IS NOT NULL;")
conn.commit()
print("🧹 Filas con 'Distritos' no nulos eliminadas.")

# Leer solo las columnas útiles
query = f"""
    SELECT Municipios, [Indicadores de renta media], Periodo, Total
    FROM {nombre_tabla}
"""
df_limpio = pd.read_sql(query, conn)
conn.close()

# Mostrar resultados
print(f"✅ Datos finales cargados. {len(df_limpio)} filas, columnas: {df_limpio.columns.tolist()}")
df_limpio.head()


📥 CSV cargado con 2670912 filas.
Tabla 'Datos_Renta_Municipios' creada en 'D:\AAACarpetasEmigradasDeC\Downloads\Data Science\elecciones-espana\data\processed\Datos_Renta_Municipios.sqlite'.
🧹 Filas con 'Distritos' no nulos eliminadas.
✅ Datos finales cargados. 390624 filas, columnas: ['Municipios', 'Indicadores de renta media', 'Periodo', 'Total']


Unnamed: 0,Municipios,Indicadores de renta media,Periodo,Total
0,01001 Alegría-Dulantzi,Renta neta media por persona,2022,15.116
1,01001 Alegría-Dulantzi,Renta neta media por persona,2021,14.647
2,01001 Alegría-Dulantzi,Renta neta media por persona,2020,13.969
3,01001 Alegría-Dulantzi,Renta neta media por persona,2019,14.299
4,01001 Alegría-Dulantzi,Renta neta media por persona,2018,13.361


# 4. Creación de nueva tabla merged renta y municipio

In [None]:
# Agregamos la columna Municipio_Limpio a las tablas Datos_Renta_Municipios y Elecciones_Consolidadas

# ==================== CÓDIGO PARA AGREGAR MUNICIPIO_LIMPIO ====================

# Detectar la raíz del proyecto (carpeta que contiene /data)
ROOT_DIR = Path().resolve()
while not (ROOT_DIR / 'data').exists() and ROOT_DIR != ROOT_DIR.parent:
    ROOT_DIR = ROOT_DIR.parent

# Función de limpieza compatible con ambos contextos
def crear_columna_municipio_limpio_sqlite():
    # Conectar a las bases de datos
    conn_renta = sqlite3.connect(ROOT_DIR / 'data' / 'processed' / 'Datos_Renta_Municipios.sqlite')
    conn_elecciones = sqlite3.connect(ROOT_DIR / 'data' / 'processed' / 'Elecciones_Consolidadas.sqlite')

    # Crear la columna en Datos_Renta_Municipios
    conn_renta.execute("""
        CREATE TEMPORARY VIEW IF NOT EXISTS Renta_Limpio AS
        SELECT *,
               UPPER(TRIM(SUBSTR(Municipios, INSTR(Municipios, ' ') + 1))) AS Municipio_Limpio
        FROM Datos_Renta_Municipios;
    """)

    conn_renta.execute("""
        DROP TABLE IF EXISTS Datos_Renta_Municipios_Cleaned;
    """)
    conn_renta.execute("""
        CREATE TABLE Datos_Renta_Municipios_Cleaned AS
        SELECT * FROM Renta_Limpio;
    """)

    conn_renta.execute("DROP VIEW IF EXISTS Renta_Limpio;")
    conn_renta.commit()

    # Crear la columna en resultados_municipales
    conn_elecciones.execute("""
        CREATE TEMPORARY VIEW IF NOT EXISTS Elecciones_Limpio AS
        SELECT *,
               UPPER(TRIM(Nombre_de_Municipio)) AS Municipio_Limpio
        FROM resultados_municipales;
    """)

    conn_elecciones.execute("""
        DROP TABLE IF EXISTS resultados_municipales_Cleaned;
    """)
    conn_elecciones.execute("""
        CREATE TABLE resultados_municipales_Cleaned AS
        SELECT * FROM Elecciones_Limpio;
    """)

    conn_elecciones.execute("DROP VIEW IF EXISTS Elecciones_Limpio;")
    conn_elecciones.commit()

    # Cerrar conexiones
    conn_renta.close()
    conn_elecciones.close()

crear_columna_municipio_limpio_sqlite()


In [13]:
# Creamos nueva base de datos SQLite con la tabla combinada
# Detectar la raíz del proyecto (carpeta que contiene /data)
ROOT_DIR = Path().resolve()
while not (ROOT_DIR / 'data').exists() and ROOT_DIR != ROOT_DIR.parent:
    ROOT_DIR = ROOT_DIR.parent

# Rutas a las bases de datos originales
conn_renta = sqlite3.connect(ROOT_DIR / 'data' / 'processed' / 'Datos_Renta_Municipios.sqlite')
conn_elecciones = sqlite3.connect(ROOT_DIR / 'data' / 'processed' / 'Elecciones_Consolidadas.sqlite')
output_path = ROOT_DIR / 'data' / 'processed' / 'Renta_y_Participacion.sqlite'

# === 1. EXTRAER ELECCIONES ===
query_elecciones = """
SELECT 
    Municipio_Limpio AS Municipio,
    Año,
    Mes AS Mes_Elecciones,
    Participación
FROM resultados_municipales_Cleaned
WHERE Municipio_Limpio IS NOT NULL AND Participación IS NOT NULL
"""
df_elecciones = pd.read_sql_query(query_elecciones, conn_elecciones)

# Convertir participación a float
if df_elecciones['Participación'].dtype == 'object':
    df_elecciones['Participación'] = pd.to_numeric(
        df_elecciones['Participación'].str.replace(",", "."), errors='coerce'
    )

# === 2. EXTRAER RENTA ===
query_renta = """
SELECT 
    Municipio_Limpio,
    "Periodo" AS Año,
    "Indicadores de renta media" AS Indicador,
    Total
FROM Datos_Renta_Municipios_Cleaned
WHERE "Indicadores de renta media" IN (
    'Renta neta media por persona',
    'Mediana de la renta por unidad de consumo'
)
"""
df_renta = pd.read_sql_query(query_renta, conn_renta)

# Pivotear renta para tener una fila por municipio
df_renta_pivot = df_renta.pivot_table(
    index=['Municipio_Limpio', 'Año'],  # incluir Año
    columns='Indicador',
    values='Total',
    aggfunc='first'
).reset_index()

# Renombrar columnas
df_renta_pivot.columns.name = None
df_renta_pivot = df_renta_pivot.rename(columns={
    'Municipio_Limpio': 'Municipio',
    'Renta neta media por persona': 'Renta_Media',
    'Mediana de la renta por unidad de consumo': 'Renta_Mediana_Hogar'
})

# Limpiar y convertir textos como "15,00" → 15000.0
def limpiar_valor(val):
    if isinstance(val, str):
        val = val.replace(",", ".").replace(" ", "")
    return pd.to_numeric(val, errors='coerce') * 1000  # porque el valor es en miles

df_renta_pivot['Renta_Media'] = df_renta_pivot['Renta_Media'].apply(limpiar_valor)
df_renta_pivot['Renta_Mediana_Hogar'] = df_renta_pivot['Renta_Mediana_Hogar'].apply(limpiar_valor)

# === 3. UNIR ===
df_elecciones['Año'] = pd.to_numeric(df_elecciones['Año'], errors='coerce')
df_renta_pivot['Año'] = pd.to_numeric(df_renta_pivot['Año'], errors='coerce')
df_combinado = pd.merge(df_elecciones, df_renta_pivot, on=['Municipio', 'Año'], how='inner')

# === 4. CUARTILES CON ETIQUETAS (por año) ===
def asignar_cuartiles_por_año(grupo):
    grupo['Cuartil_Renta_Media'] = pd.qcut(grupo['Renta_Media'], 4, labels=['Q1', 'Q2', 'Q3', 'Q4'])
    grupo['Cuartil_Renta_Mediana'] = pd.qcut(grupo['Renta_Mediana_Hogar'], 4, labels=['Q1', 'Q2', 'Q3', 'Q4'])
    grupo['Cuartil_Participacion'] = pd.qcut(grupo['Participación'], 4, labels=['Q1', 'Q2', 'Q3', 'Q4'])
    return grupo

df_combinado = df_combinado.groupby('Año', group_keys=False).apply(asignar_cuartiles_por_año)

# === 5. REORDENAR COLUMNAS ===
df_final = df_combinado[[
    'Municipio', 'Año', 'Mes_Elecciones', 'Participación',
    'Renta_Media', 'Renta_Mediana_Hogar',
    'Cuartil_Renta_Media', 'Cuartil_Renta_Mediana', 'Cuartil_Participacion'
]]

# === 6. GUARDAR RESULTADO ===
conn_output = sqlite3.connect(output_path)
df_final.to_sql("Renta_y_Participacion", conn_output, if_exists="replace", index=False)
conn_output.close()

# Cerrar conexiones originales
conn_elecciones.close()
conn_renta.close()


  df_combinado = df_combinado.groupby('Año', group_keys=False).apply(asignar_cuartiles_por_año)
