In [64]:
import pandas as pd
import os

# Procesamiento de los datos de las encuestas familiares

In [None]:
# Definir la ruta base donde están los datos
base_path = "../data"

# Estructura de los archivos
years = list(range(2016, 2024))  # De 2016 a 2023
file_types = ["EPFgastos", "EPFhogar", "EPFmhogar"]

# Diccionario para almacenar los DataFrames
dataframes = {ftype: [] for ftype in file_types}

# Leer los archivos y almacenarlos
for year in years:
    year_path = os.path.join(base_path, str(year))
    for ftype in file_types:
        file_name = f"{ftype}_{year}.csv"
        file_path = os.path.join(year_path, file_name)
        
        if os.path.exists(file_path):
            df = pd.read_csv(file_path, low_memory=False, encoding="utf-8", sep="\t")  # Cargar el CSV
            df["Año"] = year  # Agregar una columna de año
            dataframes[ftype].append(df)

# Unir los archivos en DataFrames únicos por tipo
df_gastos = pd.concat(dataframes["EPFgastos"], ignore_index=True) if dataframes["EPFgastos"] else None
df_hogar = pd.concat(dataframes["EPFhogar"], ignore_index=True) if dataframes["EPFhogar"] else None
df_mhogar = pd.concat(dataframes["EPFmhogar"], ignore_index=True) if dataframes["EPFmhogar"] else None


# Procesamiento del Fichero de Miembros

In [None]:
pd.set_option('display.max_columns', None)
df_mhogar.head()

Unnamed: 0,ANOENC,NUMERO,NORDEN,CATEGMH,SUSPRIN,RELASP,EDAD,SEXO,PAISNACIM,NACIONA,PAISNACION,SITURES,ECIVILLEGAL,NORDENCO,UNION,CONVIVENCIA,NORDENPA,PAISPADRE,NORDENMA,PAISMADRE,ESTUDIOS,ESTUDRED,SITUACT,SITURED,OCU,JORNADA,PERCEP,IMPEXACP,INTERINP,NINODEP,HIJODEP,ADULTO,FACTOR,Año
0,2016,1,1,1,1,1,66,1,1,1,,1,2,2,1,1,99,1,99,1,2,1,1,1,1,1.0,1,-9.0,2,6,6,1,742.998897,2016
1,2016,1,2,1,6,2,64,6,1,1,,1,2,1,1,1,99,1,99,1,2,1,1,1,1,1.0,1,-9.0,2,6,6,1,742.998897,2016
2,2016,2,1,1,1,1,35,1,1,1,,1,2,2,1,1,99,1,99,1,7,4,3,1,2,,1,-9.0,1,6,6,1,1849.021419,2016
3,2016,2,2,1,6,2,35,6,3,1,,1,2,1,1,1,99,1,99,1,4,3,1,1,1,1.0,1,-9.0,2,6,6,1,1849.021419,2016
4,2016,3,1,1,1,1,64,1,1,1,,1,2,2,1,1,99,1,99,1,3,2,1,1,1,1.0,1,-9.0,5,6,6,1,1580.183563,2016


Contamos el numero de mujeres por casa

In [None]:
df_mhogar.isna().sum()  # Verifica los valores nulos en el DataFrame

ANOENC              0
NUMERO              0
NORDEN              0
CATEGMH             0
SUSPRIN             0
RELASP              0
EDAD                0
SEXO                0
PAISNACIM           0
NACIONA             0
PAISNACION      46545
SITURES             0
ECIVILLEGAL         0
NORDENCO            0
UNION           25096
CONVIVENCIA         0
NORDENPA            0
PAISPADRE           0
NORDENMA            0
PAISMADRE           0
ESTUDIOS         7976
ESTUDRED         7976
SITUACT          7976
SITURED          7976
OCU              7976
JORNADA         29419
PERCEP              0
IMPEXACP       148267
INTERINP        16595
NINODEP             0
HIJODEP             0
ADULTO              0
FACTOR              0
Año                 0
dtype: int64

# Procesamiento del Fichero de Hogar

In [69]:
df_hogar.columns

Index(['ANOENC', 'NUMERO', 'CCAA', 'NUTS1', 'CAPROV', 'TAMAMU', 'DENSIDAD',
       'CLAVE', 'CLATEO', 'FACTOR',
       ...
       'FUENPRINRED', 'IMPEXAC', 'INTERIN', 'NUMPERI', 'COMIMH', 'COMISD',
       'COMIHU', 'COMIINV', 'COMITOT', 'Año'],
      dtype='object', length=189)

Seleccionamos las columnas que almacenaremos en nuestro datalake

In [None]:
columnas_df_hogar = [
    # 1️⃣ Información General
    "ANOENC", "NUMERO", "CCAA", "CAPROV", "TAMAMU", "DENSIDAD",

    # 2️⃣ Características del Hogar (excluyendo 'TAMAÑO')
    "NUMACTI", "NUMOCU", "NUMESTU",

    # 3️⃣ Datos del Sustentador Principal
    "EDADSP", "NACIONASP", "OCUSP", "ESTUDREDSP",

    # 4️⃣ Características de la Vivienda
    "REGTEN", "ZONARES", "SUPERF", "AGUACALI", "CALEF",

    # 5️⃣ Otras Viviendas a Disposición del Hogar
    "DISPOSIOV",

    # 7️⃣ Ingresos Regulares del Hogar
    "CAPROP", "CAJENA", "IMPEXAC",

    # Gasto por hogar
    "GASTOT", "FACTOR", "NUMPERI"
]

df_hogar_filtrado = df_hogar[columnas_df_hogar]
df_hogar_filtrado = df_hogar_filtrado.copy()



In [71]:
pd.set_option('display.max_columns', None)

df_hogar_filtrado.head()

Unnamed: 0,ANOENC,NUMERO,CCAA,CAPROV,TAMAMU,DENSIDAD,NUMACTI,NUMOCU,NUMESTU,EDADSP,NACIONASP,OCUSP,ESTUDREDSP,REGTEN,ZONARES,SUPERF,AGUACALI,CALEF,DISPOSIOV,CAPROP,CAJENA,IMPEXAC,GASTOT,FACTOR,NUMPERI
0,2016,1,7,6,5,3,2,2,0,66,1,1,1,2,5,96,1,1,6,1,6,1225,14239915.96,742.998897,2
1,2016,2,12,6,2,1,2,1,0,35,1,2,4,1,3,-9,1,1,6,1,1,676,82364853.85,1849.021419,2
2,2016,3,13,1,1,1,4,4,0,64,1,1,2,5,3,155,1,1,6,6,1,5868,95097473.22,1580.183563,4
3,2016,4,8,6,5,3,4,2,0,60,1,2,2,2,7,150,1,1,6,6,1,3769,25251223.53,846.366967,4
4,2016,5,13,6,1,1,1,1,0,37,2,1,3,3,3,98,1,1,6,6,1,1271,39117817.76,1359.668063,1


Convertimos en variables binarias manejables, las variables binarias

In [72]:
df_hogar_filtrado.loc[:, 'CAPROV'] = df_hogar_filtrado['CAPROV'].replace({1: 1, 6: 0})
df_hogar_filtrado.loc[:, 'AGUACALI'] = df_hogar_filtrado['AGUACALI'].replace({1: 1, 6: 0})
df_hogar_filtrado.loc[:, 'CALEF'] = df_hogar_filtrado['CALEF'].replace({1: 1, 6: 0})
df_hogar_filtrado.loc[:, 'DISPOSIOV'] = df_hogar_filtrado['DISPOSIOV'].replace({1: 1, 6: 0})
df_hogar_filtrado.loc[:, 'CAJENA'] = df_hogar_filtrado['CAJENA'].replace({1: 1, 6: 0})
df_hogar_filtrado.loc[:, 'CAPROP'] = df_hogar_filtrado['CAPROP'].replace({1: 1, 6: 0})

Manejamos la columna NACIONASP, la convertiremos en dummies:

- NACION_ESP = 1 si la persona tiene nacionalidad española (NACIONASP == 1 o 3).
- NACION_EXT = 1 si la persona tiene nacionalidad extranjera (NACIONASP == 2 o 3).
- Si tiene doble nacionalidad (NACIONASP == 3), ambas serán 1.

In [73]:
df_hogar_filtrado['NACIONASP'].value_counts()

1    153414
2      8350
3      4358
Name: NACIONASP, dtype: int64

In [74]:
df_hogar_filtrado['NACION_ESP'] = df_hogar_filtrado['NACIONASP'].isin([1, 3]).astype(int)
df_hogar_filtrado['NACION_EXT'] = df_hogar_filtrado['NACIONASP'].isin([2, 3]).astype(int)

df_hogar_filtrado.drop(columns=['NACIONASP'], inplace=True)

Estudios del sustentador principal, lo modificaremos a si tiene educación superior o no, y la situación laboral a si está trabajando o no.

In [75]:
df_hogar_filtrado['EDUC_SUPERIOR'] = (df_hogar_filtrado['ESTUDREDSP'] == 4).astype(int)
df_hogar_filtrado.drop(columns=['ESTUDREDSP'], inplace=True)

In [76]:
# Reemplazo de valores en 'OCUSP' los valores 2 se reemplazan por 0
df_hogar_filtrado['OCUSP'] = df_hogar_filtrado['OCUSP'].replace({2: 0})

In [77]:
# La variable TAMAMU esta categorica de 1 a 5 y se le da la vuelta a la escala
df_hogar_filtrado['TAMAMU'] = df_hogar_filtrado['TAMAMU'].replace({1: 5, 2: 4, 3: 3, 4: 2, 5: 1})

# Lo mismo para la variable DENSIDAD
df_hogar_filtrado['DENSIDAD'] = df_hogar_filtrado['DENSIDAD'].replace({1: 3, 2: 2, 3: 1})

# Lo mismo para la variable ZONARES del 1 al 7
df_hogar_filtrado['ZONARES'] = df_hogar_filtrado['ZONARES'].replace({1: 7, 2: 6, 3: 5, 4: 4, 5: 3, 6: 2, 7: 1})

# Lo mismo para la variable REGTEN del 1 al 6
df_hogar_filtrado['REGTEN'] = df_hogar_filtrado['REGTEN'].replace({1: 6, 2: 5, 3: 4, 4: 3, 5: 2, 6: 1})

In [78]:
df_hogar_filtrado.head()

Unnamed: 0,ANOENC,NUMERO,CCAA,CAPROV,TAMAMU,DENSIDAD,NUMACTI,NUMOCU,NUMESTU,EDADSP,OCUSP,REGTEN,ZONARES,SUPERF,AGUACALI,CALEF,DISPOSIOV,CAPROP,CAJENA,IMPEXAC,GASTOT,FACTOR,NUMPERI,NACION_ESP,NACION_EXT,EDUC_SUPERIOR
0,2016,1,7,0,1,1,2,2,0,66,1,5,3,96,1,1,0,1,0,1225,14239915.96,742.998897,2,1,0,0
1,2016,2,12,0,4,3,2,1,0,35,0,6,5,-9,1,1,0,1,1,676,82364853.85,1849.021419,2,1,0,1
2,2016,3,13,1,5,3,4,4,0,64,1,2,5,155,1,1,0,0,1,5868,95097473.22,1580.183563,4,1,0,0
3,2016,4,8,0,1,1,4,2,0,60,0,5,1,150,1,1,0,0,1,3769,25251223.53,846.366967,4,1,0,0
4,2016,5,13,0,5,3,1,1,0,37,1,4,5,98,1,1,0,0,1,1271,39117817.76,1359.668063,1,0,1,0


Unimos el numero de mujeres y hombres por casa

In [79]:
# Filtrar solo filas válidas
df_validos = df_mhogar[df_mhogar["ADULTO"].isin([1, 6]) & df_mhogar["SEXO"].isin([1, 6])]

# Clasificar personas según sexo y NADULTO
def clasificar(row):
    if row["ADULTO"] == 1 and row["SEXO"] == 1:
        return "NADUL_MAS"
    elif row["ADULTO"] == 1 and row["SEXO"] == 6:
        return "NADUL_FEM"
    elif row["ADULTO"] == 6 and row["SEXO"] == 1:
        return "NNINO_MAS"
    elif row["ADULTO"] == 6 and row["SEXO"] == 6:
        return "NNINO_FEM"

df_validos["categoria"] = df_validos.apply(clasificar, axis=1)

# Contar por hogar y año
conteo_personas = df_validos.groupby(["NUMERO", "Año", "categoria"]).size().unstack(fill_value=0).reset_index()

# Renombrar la columna Año por ANOENC
conteo_personas.rename(columns={"Año": "ANOENC"}, inplace=True)

# Asegurar que todas las columnas estén presentes aunque no existan en los datos
for col in ["NADUL_MAS", "NADUL_FEM", "NNINO_MAS", "NNINO_FEM"]:
    if col not in conteo_personas.columns:
        conteo_personas[col] = 0

# Unir al df_hogar_filtrado
df_hogar_filtrado = df_hogar_filtrado.merge(conteo_personas, on=["NUMERO", "ANOENC"], how="left")

# Rellenar nulos por 0
for col in ["NADUL_MAS", "NADUL_FEM", "NNINO_MAS", "NNINO_FEM"]:
    df_hogar_filtrado[col] = df_hogar_filtrado[col].fillna(0).astype(int)


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_validos["categoria"] = df_validos.apply(clasificar, axis=1)


# Procesamiento del Fichero de Gastos

In [80]:
df_gastos.head()

Unnamed: 0,ANOENC,NUMERO,CODIGO,GASTO,PORCENDES,PORCENIMP,CANTIDAD,GASTOMON,GASTNOM1,GASTNOM2,GASTNOM3,GASTNOM4,GASTNOM5,FACTOR,Año
0,2016,1,1111,37.22,100.0,100.0,,37.22,,,,,,742.998897,2016
1,2016,1,1112,14.79,100.0,100.0,,14.79,,,,,,742.998897,2016
2,2016,1,1113,113964.81,0.5,83.0,45869.56,113964.81,,,,,,742.998897,2016
3,2016,1,1114,54614.37,0.69,0.69,5811.31,54614.37,,,,,,742.998897,2016
4,2016,1,1115,73.85,100.0,100.0,,73.85,,,,,,742.998897,2016


In [81]:
# Lista de columnas sdeleccioandas del DataFrame según la estructura proporcionada
columnas_df_gastos = [
    "ANOENC", "NUMERO", "CODIGO", "GASTO", "FACTOR"
]

df_gastos_filtrado = df_gastos[columnas_df_gastos]
df_gastos_filtrado = df_gastos_filtrado.copy()

Añadimos el factor temporal

In [None]:
# Cargar el CSV con códigos y factores temporales (separado por tabulaciones)
df_factors = pd.read_csv("../data/codes_factors.csv", sep="\t", dtype={"CODIGO": str})

# Convertir ambos códigos a string para evitar errores de tipo
df_gastos_filtrado["CODIGO"] = df_gastos_filtrado["CODIGO"].astype(str)
df_factors["CODIGO"] = df_factors["CODIGO"].astype(str)

# Hacer el merge basado en la columna CODIGO
df_gastos_filtrado = df_gastos_filtrado.merge(df_factors, on="CODIGO", how="left")

# Mostrar las primeras filas para verificar
df_gastos_filtrado.head()

Unnamed: 0,ANOENC,NUMERO,CODIGO,GASTO,FACTOR,FACTOR_TEMPORAL
0,2016,1,1111,37.22,742.998897,Bisemanal
1,2016,1,1112,14.79,742.998897,Bisemanal
2,2016,1,1113,113964.81,742.998897,Bisemanal
3,2016,1,1114,54614.37,742.998897,Bisemanal
4,2016,1,1115,73.85,742.998897,Bisemanal


In [83]:
df_factors.head()

Unnamed: 0,CODIGO,FACTOR_TEMPORAL
0,1111,Bisemanal
1,1112,Bisemanal
2,1113,Bisemanal
3,1114,Bisemanal
4,1115,Bisemanal


Procesamos el codigo

In [84]:
# Contar cuántas veces aparece el número 1 en la columna "NUMERO"
conteo_numero_1 = df_gastos['NUMERO'].eq(1).sum()

# Mostrar el resultado
print(f"El número 1 aparece {conteo_numero_1} veces en la columna 'NUMERO'.")


El número 1 aparece 562 veces en la columna 'NUMERO'.


In [85]:
df_gastos_filtrado.head()

Unnamed: 0,ANOENC,NUMERO,CODIGO,GASTO,FACTOR,FACTOR_TEMPORAL
0,2016,1,1111,37.22,742.998897,Bisemanal
1,2016,1,1112,14.79,742.998897,Bisemanal
2,2016,1,1113,113964.81,742.998897,Bisemanal
3,2016,1,1114,54614.37,742.998897,Bisemanal
4,2016,1,1115,73.85,742.998897,Bisemanal


# Procesamiento de Indicadores Externos

In [None]:
tipo_interes_df = pd.read_csv("../data/tipos_interes_bce.csv", sep="\t", encoding="utf-8")
temperaturas_extremas_df = pd.read_csv("../data/temperaturas_comunidades.csv", sep="\t", encoding="utf-8")
tasa_paro_df = pd.read_csv("../data/tasa_paro_comunidad_2016_2023.csv", sep="\t", encoding="ISO-8859-1", decimal=",")
inflacion_df = pd.read_csv("../data/inflacion_espana_2016_2023.csv", sep="\t", encoding="utf-8")

In [87]:
temperaturas_extremas_df.head()

Unnamed: 0,CCAA,Comunidad_Autonoma,Año,Tmax_max,Tmin_min
0,1,Andalucía,2016,44.6,-6.9
1,1,Andalucía,2017,44.4,-7.4
2,1,Andalucía,2018,43.5,-5.1
3,1,Andalucía,2019,40.6,-8.6
4,1,Andalucía,2020,42.9,-8.8


In [88]:
tasa_paro_df.head()

Unnamed: 0,Comunidades y Ciudades Autónomas,Sexo,Tiempo de residencia en el municipio,Periodo,Total
0,01 Andalucía,Ambos sexos,Total,2023,18.3
1,01 Andalucía,Ambos sexos,Total,2022,19.4
2,01 Andalucía,Ambos sexos,Total,2021,22.5
3,01 Andalucía,Ambos sexos,Total,2020,21.2
4,01 Andalucía,Ambos sexos,Total,2019,21.1


In [89]:
codigos_comunidad = {
    "Andalucía": 1, "Aragón": 2, "Asturias, Principado de": 3, "Balears, Illes": 4,
    "Canarias": 5, "Cantabria": 6, "Castilla y León": 8, "Castilla-La Mancha": 7,
    "Cataluña": 9, "Comunitat Valenciana": 10, "Extremadura": 11, "Galicia": 12,
    "Madrid, Comunidad de": 13, "Murcia, Región de": 14, "Navarra, Comunidad Foral de": 15,
    "País Vasco": 16, "Rioja, La": 17, "Ceuta": 18, "Melilla": 19
}

# Agregar código de comunidad
inflacion_df["CCAA"] = inflacion_df["Comunidad Autónoma"].map(codigos_comunidad)



In [90]:
inflacion_df.head()


Unnamed: 0,Año,Comunidad Autónoma,Inflación (%),CCAA
0,2016,Nacional,1.6,
1,2016,Andalucía,1.6,1.0
2,2016,Aragón,1.6,2.0
3,2016,"Asturias, Principado de",1.6,3.0
4,2016,"Balears, Illes",1.6,4.0


In [91]:
tipo_interes_df.head()

Unnamed: 0,Año,Tipo_Interes
0,2016,0.0
1,2017,0.0
2,2018,0.0
3,2019,0.0
4,2020,0.0


In [92]:

# Normalizar nombres de columna para facilitar el merge
tasa_paro_df = tasa_paro_df.rename(columns={"Comunidades y Ciudades Autónomas": "Comunidad_Autonoma", "Periodo": "Año", "Total": "Tasa_Paro"})
inflacion_df = inflacion_df.rename(columns={"Inflación (%)": "Inflacion"})

# Extraer el código de comunidad en tasa_paro_df
tasa_paro_df["CCAA"] = tasa_paro_df["Comunidad_Autonoma"].str.extract(r"^0?(\d{1,2})").astype(int)

# Unir los DataFrames por Código de Comunidad y Año
indicadores_externos = temperaturas_extremas_df.merge(
    tasa_paro_df[["CCAA", "Año", "Tasa_Paro"]],
    on=["CCAA", "Año"],
    how="left"
).merge(
    inflacion_df[["CCAA", "Año", "Inflacion"]],
    on=["CCAA", "Año"],
    how="left"
).merge(
    tipo_interes_df,
    on="Año",
    how="left"
)

In [93]:
indicadores_externos.rename(columns={'Año' : 'ANOENC'}, inplace=True)


In [94]:
indicadores_externos.head()

Unnamed: 0,CCAA,Comunidad_Autonoma,ANOENC,Tmax_max,Tmin_min,Tasa_Paro,Inflacion,Tipo_Interes
0,1,Andalucía,2016,44.6,-6.9,29.7,1.6,0.0
1,1,Andalucía,2017,44.4,-7.4,26.9,1.1,0.0
2,1,Andalucía,2018,43.5,-5.1,24.7,1.0,0.0
3,1,Andalucía,2019,40.6,-8.6,21.1,0.5,0.0
4,1,Andalucía,2020,42.9,-8.8,21.2,-0.3,0.0


# Creamos nuestro datalake

In [95]:
import os
import pandas as pd

def crear_datalake(df_gastos, df_hogar, df_indicadores, output_dir="DataLake"):
    """
    Crea un DataLake estructurado por año, con tres archivos por año:
    - external_indicators.tsv: Todos los indicadores por comunidad autónoma.
    - homes.tsv: Todas las características del hogar excluyendo el año.
    - family_expenses.tsv: Toda la información de gastos excluyendo el año.

    Parámetros:
    - df_gastos (DataFrame): DataFrame con los datos de gasto (contiene Año y Número de hogar).
    - df_hogar (DataFrame): DataFrame con las características del hogar (contiene Año y CCAA).
    - df_indicadores (DataFrame): DataFrame con los datos de comunidad (contiene Año y CCAA).
    - output_dir (str): Directorio base donde se creará el datalake.
    """

    # Asegurar que la carpeta base del DataLake existe
    os.makedirs(output_dir, exist_ok=True)

    # Convertir tipos de datos para evitar problemas
    df_hogar['CCAA'] = df_hogar['CCAA'].astype(str)
    df_indicadores['CCAA'] = df_indicadores['CCAA'].astype(str)
    df_gastos['NUMERO'] = df_gastos['NUMERO'].astype(str)
    df_gastos['ANOENC'] = df_gastos['ANOENC'].astype(int)
    df_hogar['ANOENC'] = df_hogar['ANOENC'].astype(int)
    df_indicadores['ANOENC'] = df_indicadores['ANOENC'].astype(int)
    
    # Obtener todos los años únicos
    anos = df_hogar['ANOENC'].unique()

    for anoenc in anos:
        # Crear la carpeta del año si no existe
        carpeta_ano = os.path.join(output_dir, str(anoenc))
        os.makedirs(carpeta_ano, exist_ok=True)
        
        # Filtrar los datos por año
        df_indicadores_ano = df_indicadores[df_indicadores['ANOENC'] == anoenc].drop(columns=['ANOENC'])
        df_hogar_ano = df_hogar[df_hogar['ANOENC'] == anoenc].drop(columns=['ANOENC'])
        df_gastos_ano = df_gastos[df_gastos['ANOENC'] == anoenc].drop(columns=['ANOENC'])
        
        # Guardar los archivos en formato TSV con manejo de errores
        try:
            df_indicadores_ano.to_csv(os.path.join(carpeta_ano, "external_indicators.tsv"), sep="\t", index=False)
            df_hogar_ano.to_csv(os.path.join(carpeta_ano, "homes.tsv"), sep="\t", index=False)
            df_gastos_ano.to_csv(os.path.join(carpeta_ano, "family_expenses.tsv"), sep="\t", index=False)
        except OSError as e:
            print(f"❌ Error al guardar archivos en {carpeta_ano}: {e}")

    print(f"✅ DataLake creado en {output_dir}")


In [96]:
crear_datalake(df_gastos_filtrado, df_hogar_filtrado, indicadores_externos)

✅ DataLake creado en DataLake
