# ETL: Limpieza de Datos de Viviendas en Barcelona (PySpark)

## Importamos las librer√≠as necesarias


In [28]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import re


# Crear SparkSession
spark = SparkSession.builder \
    .appName("ETL_Housing_Barcelona") \
    .getOrCreate()

print("‚úÖ SparkSession creada correctamente")
print(f"Versi√≥n de Spark: {spark.version}")


‚úÖ SparkSession creada correctamente
Versi√≥n de Spark: 4.0.1


## EXTRACT: Cargar los datos


## EXPLORACI√ìN INICIAL DEL DATASET

### An√°lisis descriptivo y detecci√≥n de problemas


In [29]:
# Cargar el CSV
df_raw = spark.read.csv(
    "../data/housing-barcelona.csv",
    header=True,
    inferSchema=False  # Leer todo como string primero
)

print("‚úÖ El dataframe se ha creado correctamente")
print(f"Filas: {df_raw.count()}")
print(f"Columnas: {len(df_raw.columns)}")
print(f"\nColumnas: {df_raw.columns}")
print(f"\nPrimeras filas:")
df_raw.show(5, truncate=False)


‚úÖ El dataframe se ha creado correctamente
Filas: 10000
Columnas: 20

Columnas: ['listing_id', 'operation', 'district', 'neighborhood', 'address', 'surface_m2', 'rooms', 'bathrooms', 'price_eur', 'price_per_m2', 'floor', 'elevator', 'balcony', 'furnished', 'condition', 'energy_certificate', 'has_parking', 'latitude', 'longitude', 'agency']

Primeras filas:
+----------+---------+----------+---------------+-----------------+----------+-----+---------+---------+------------+------+--------+-------+---------+----------+------------------+-----------+--------+---------+---------------+
|listing_id|operation|district  |neighborhood   |address          |surface_m2|rooms|bathrooms|price_eur|price_per_m2|floor |elevator|balcony|furnished|condition |energy_certificate|has_parking|latitude|longitude|agency         |
+----------+---------+----------+---------------+-----------------+----------+-----+---------+---------+------------+------+--------+-------+---------+----------+------------------+-

In [30]:
# Informaci√≥n general del dataset
print("=== INFORMACI√ìN GENERAL DEL DATASET ===\n")
total_rows = df_raw.count()
total_cols = len(df_raw.columns)
print(f"Dimensiones: {total_rows} filas √ó {total_cols} columnas")
print(f"\nEsquema original:")
df_raw.printSchema()


=== INFORMACI√ìN GENERAL DEL DATASET ===

Dimensiones: 10000 filas √ó 20 columnas

Esquema original:
root
 |-- listing_id: string (nullable = true)
 |-- operation: string (nullable = true)
 |-- district: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- address: string (nullable = true)
 |-- surface_m2: string (nullable = true)
 |-- rooms: string (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- price_eur: string (nullable = true)
 |-- price_per_m2: string (nullable = true)
 |-- floor: string (nullable = true)
 |-- elevator: string (nullable = true)
 |-- balcony: string (nullable = true)
 |-- furnished: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- energy_certificate: string (nullable = true)
 |-- has_parking: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- agency: string (nullable = true)



In [31]:
# An√°lisis de valores faltantes
print("=== AN√ÅLISIS DE VALORES FALTANTES ===\n")
from pyspark.sql.functions import col, when, isnan, isnull, count
import builtins

# Calcular total_rows si no est√° definido (por si se ejecuta esta celda antes que la anterior)
if 'total_rows' not in globals():
    total_rows = df_raw.count()


null_counts = {}
for col_name in df_raw.columns:
    null_count = df_raw.filter(col(col_name).isNull() | (col(col_name) == '') | (col(col_name) == '?') | 
                                (col(col_name) == 'N/A') | (col(col_name) == 'NULL') | 
                                (col(col_name) == 'null') | (col(col_name) == 'unknown')).count()
    if null_count > 0:
        null_counts[col_name] = null_count

# Ordenar por cantidad de valores faltantes
sorted_null_counts = sorted(null_counts.items(), key=lambda x: x[1], reverse=True)
for col_name, count_val in sorted_null_counts:
    percentage = (count_val / total_rows) * 100
    print(f"  {col_name}: {count_val} valores faltantes ({percentage:.2f}%)")

total_missing = builtins.sum(null_counts.values())
print(f"\nTotal de valores faltantes detectados: {total_missing}")


=== AN√ÅLISIS DE VALORES FALTANTES ===

  latitude: 5055 valores faltantes (50.55%)
  price_per_m2: 5041 valores faltantes (50.41%)
  longitude: 4919 valores faltantes (49.19%)
  rooms: 4004 valores faltantes (40.04%)
  bathrooms: 3992 valores faltantes (39.92%)
  surface_m2: 3983 valores faltantes (39.83%)
  furnished: 3907 valores faltantes (39.07%)
  price_eur: 3347 valores faltantes (33.47%)
  listing_id: 3321 valores faltantes (33.21%)
  balcony: 2579 valores faltantes (25.79%)
  condition: 2533 valores faltantes (25.33%)
  elevator: 2451 valores faltantes (24.51%)
  has_parking: 2447 valores faltantes (24.47%)
  energy_certificate: 2234 valores faltantes (22.34%)
  address: 1966 valores faltantes (19.66%)
  neighborhood: 1425 valores faltantes (14.25%)
  agency: 1422 valores faltantes (14.22%)
  operation: 1399 valores faltantes (13.99%)
  floor: 1277 valores faltantes (12.77%)

Total de valores faltantes detectados: 57302


In [32]:
# An√°lisis de valores √∫nicos y duplicados
print("=== AN√ÅLISIS DE VALORES √öNICOS Y DUPLICADOS ===\n")
duplicate_count = df_raw.count() - df_raw.dropDuplicates().count()
print(f"Filas duplicadas: {duplicate_count}")

print(f"\nValores √∫nicos por columna:")
for col_name in df_raw.columns:
    unique_count = df_raw.select(col_name).distinct().count()
    print(f"  {col_name}: {unique_count} valores √∫nicos")


=== AN√ÅLISIS DE VALORES √öNICOS Y DUPLICADOS ===

Filas duplicadas: 0

Valores √∫nicos por columna:
  listing_id: 6680 valores √∫nicos
  operation: 7 valores √∫nicos
  district: 14 valores √∫nicos
  neighborhood: 14 valores √∫nicos
  address: 1148 valores √∫nicos
  surface_m2: 405 valores √∫nicos
  rooms: 10 valores √∫nicos
  bathrooms: 7 valores √∫nicos
  price_eur: 4644 valores √∫nicos
  price_per_m2: 4427 valores √∫nicos
  floor: 8 valores √∫nicos
  elevator: 8 valores √∫nicos
  balcony: 8 valores √∫nicos
  furnished: 5 valores √∫nicos
  condition: 8 valores √∫nicos
  energy_certificate: 9 valores √∫nicos
  has_parking: 8 valores √∫nicos
  latitude: 2108 valores √∫nicos
  longitude: 2362 valores √∫nicos
  agency: 7 valores √∫nicos


In [33]:
# Detecci√≥n de valores problem√°ticos
print("=== DETECCI√ìN DE VALORES PROBLEM√ÅTICOS ===\n")
valores_problematicos = ['?', 'N/A', 'n/a', 'NULL', 'null', 'unknown', 'Unknown', '']

for col_name in df_raw.columns:
    problematic_count = df_raw.filter(col(col_name).isin(valores_problematicos)).count()
    if problematic_count > 0:
        print(f"{col_name}: {problematic_count} valores problem√°ticos detectados")
        
# Verificar espacios en blanco al inicio/final (usando trim)
print("\nVerificando espacios en blanco:")
for col_name in df_raw.columns:
    # Contar filas donde el valor trimado es diferente al original
    has_spaces = df_raw.filter(trim(col(col_name)) != col(col_name)).count()
    if has_spaces > 0:
        print(f"  {col_name}: {has_spaces} valores con espacios al inicio/final")


=== DETECCI√ìN DE VALORES PROBLEM√ÅTICOS ===

operation: 1399 valores problem√°ticos detectados
district: 686 valores problem√°ticos detectados
neighborhood: 1425 valores problem√°ticos detectados
address: 1966 valores problem√°ticos detectados
surface_m2: 3983 valores problem√°ticos detectados
rooms: 4004 valores problem√°ticos detectados
bathrooms: 3992 valores problem√°ticos detectados
price_eur: 3347 valores problem√°ticos detectados
price_per_m2: 5041 valores problem√°ticos detectados
floor: 1277 valores problem√°ticos detectados
elevator: 2451 valores problem√°ticos detectados
balcony: 2579 valores problem√°ticos detectados
furnished: 3907 valores problem√°ticos detectados
condition: 2533 valores problem√°ticos detectados
energy_certificate: 2234 valores problem√°ticos detectados
has_parking: 2447 valores problem√°ticos detectados
latitude: 5055 valores problem√°ticos detectados
longitude: 4919 valores problem√°ticos detectados
agency: 1422 valores problem√°ticos detectados

Veri

## TRANSFORM: Limpieza de Datos

### Paso 1: Crear copia para trabajar


In [34]:
# Crear copia del dataframe (en PySpark, los DataFrames son inmutables, as√≠ que trabajamos directamente)
df_clean = df_raw
print(f"‚úÖ Dataframe listo para trabajar. Filas: {df_clean.count()}")


‚úÖ Dataframe listo para trabajar. Filas: 10000


### Paso 2: Eliminar espacios (strip) en columnas de texto


In [35]:
# Aplicar trim() a todas las columnas (elimina espacios al inicio y final)
for col_name in df_clean.columns:
    df_clean = df_clean.withColumn(col_name, trim(col(col_name)))

print("‚úÖ Espacios eliminados de todas las columnas")


‚úÖ Espacios eliminados de todas las columnas


### Paso 3: Reemplazar valores vac√≠os por NULL


In [36]:
# Reemplazar valores que representan "vac√≠o" por NULL (None en PySpark)
valores_vacios = ['', ' ', 'nan', 'None', 'N/A', 'n/a', 'NULL', 'null', '?', 'unknown']

for col_name in df_clean.columns:
    df_clean = df_clean.withColumn(
        col_name,
        when(col(col_name).isin(valores_vacios) | col(col_name).isNull(), None)
        .otherwise(col(col_name))
    )

print("‚úÖ Valores vac√≠os convertidos a NULL")
print(f"\nValores NULL por columna:")
null_counts = {}
for col_name in df_clean.columns:
    null_count = df_clean.filter(col(col_name).isNull()).count()
    if null_count > 0:
        null_counts[col_name] = null_count

for col_name, count in sorted(null_counts.items(), key=lambda x: x[1], reverse=True):
    print(f"  {col_name}: {count}")


‚úÖ Valores vac√≠os convertidos a NULL

Valores NULL por columna:
  latitude: 5055
  price_per_m2: 5041
  longitude: 4919
  rooms: 4004
  bathrooms: 3992
  surface_m2: 3983
  furnished: 3907
  address: 3904
  price_eur: 3347
  listing_id: 3321
  balcony: 2579
  condition: 2533
  elevator: 2451
  has_parking: 2447
  energy_certificate: 2234
  neighborhood: 1425
  agency: 1422
  operation: 1399
  floor: 1277


### Paso 4: Convertir tipos de datos adecuados

Primero definimos UDFs (User Defined Functions) para las funciones personalizadas


In [37]:
# Definir UDFs para extraer n√∫meros y convertir texto

# UDF para extraer n√∫meros de strings
def extract_number_udf(value):
    """Extrae el primer n√∫mero de un string"""
    if value is None:
        return None
    value_str = str(value)
    numbers = re.findall(r'\d+\.?\d*', value_str)
    if numbers:
        return float(numbers[0])
    return None

# UDF para convertir texto a n√∫mero
def text_to_number_udf(value):
    """Convierte texto a n√∫mero"""
    if value is None:
        return None
    value_str = str(value).lower().strip()
    
    text_map = {
        'one': 1, 'two': 2, 'three': 3, 'four': 4, 'five': 5,
        'six': 6, 'seven': 7, 'eight': 8, 'nine': 9, 'ten': 10
    }
    
    if value_str in text_map:
        return text_map[value_str]
    
    if '+' in value_str:
        nums = re.findall(r'\d+', value_str)
        if nums:
            return int(nums[0])
    
    numbers = re.findall(r'\d+\.?\d*', value_str)
    if numbers:
        return float(numbers[0])
    return None

# UDF para extraer precio
def extract_price_udf(value):
    """Extrae precio num√©rico"""
    if value is None:
        return None
    value_str = str(value).replace('‚Ç¨', '').replace('.', '').replace(',', '.').strip()
    numbers = re.findall(r'\d+\.?\d*', value_str)
    if numbers:
        return float(numbers[0])
    return None

# UDF para extraer precio por m¬≤
def extract_price_m2_udf(value):
    """Extrae precio por m¬≤"""
    if value is None:
        return None
    value_str = str(value).replace('‚Ç¨/m2', '').replace('‚Ç¨/m¬≤', '').replace('.', '').replace(',', '.').strip()
    numbers = re.findall(r'\d+\.?\d*', value_str)
    if numbers:
        return float(numbers[0])
    return None

# Registrar UDFs
extract_number = udf(extract_number_udf, DoubleType())
text_to_number = udf(text_to_number_udf, DoubleType())
extract_price = udf(extract_price_udf, DoubleType())
extract_price_m2 = udf(extract_price_m2_udf, DoubleType())

print("‚úÖ UDFs definidas y registradas")


‚úÖ UDFs definidas y registradas


In [38]:
# Limpiar columnas num√©ricas usando las UDFs

# Limpiar surface_m2
if 'surface_m2' in df_clean.columns:
    df_clean = df_clean.withColumn('surface_m2', extract_number('surface_m2'))

# Limpiar rooms
if 'rooms' in df_clean.columns:
    df_clean = df_clean.withColumn('rooms', text_to_number('rooms'))

# Limpiar bathrooms
if 'bathrooms' in df_clean.columns:
    df_clean = df_clean.withColumn('bathrooms', text_to_number('bathrooms'))

# Limpiar price_eur
if 'price_eur' in df_clean.columns:
    df_clean = df_clean.withColumn('price_eur', extract_price('price_eur'))

# Limpiar price_per_m2
if 'price_per_m2' in df_clean.columns:
    df_clean = df_clean.withColumn('price_per_m2', extract_price_m2('price_per_m2'))

# Convertir coordenadas a num√©rico
if 'latitude' in df_clean.columns:
    df_clean = df_clean.withColumn('latitude', col('latitude').cast('double'))
if 'longitude' in df_clean.columns:
    df_clean = df_clean.withColumn('longitude', col('longitude').cast('double'))

print("‚úÖ Columnas num√©ricas limpiadas y convertidas")
print(f"\nTipos de datos num√©ricos:")
df_clean.select('surface_m2', 'rooms', 'bathrooms', 'price_eur', 'price_per_m2', 'latitude', 'longitude').printSchema()


‚úÖ Columnas num√©ricas limpiadas y convertidas

Tipos de datos num√©ricos:
root
 |-- surface_m2: double (nullable = true)
 |-- rooms: double (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- price_eur: double (nullable = true)
 |-- price_per_m2: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [39]:
# Convertir columnas que deben ser enteros
if 'rooms' in df_clean.columns:
    df_clean = df_clean.withColumn('rooms', col('rooms').cast('int'))
if 'bathrooms' in df_clean.columns:
    df_clean = df_clean.withColumn('bathrooms', col('bathrooms').cast('int'))

print("‚úÖ Columnas convertidas a enteros")


‚úÖ Columnas convertidas a enteros


In [40]:
# Asegurar que las columnas de texto sean string
text_cols = ['listing_id', 'operation', 'district', 'neighborhood', 'address', 
             'floor', 'condition', 'energy_certificate', 'agency']

for col_name in text_cols:
    if col_name in df_clean.columns:
        df_clean = df_clean.withColumn(
            col_name,
            when(col(col_name).isNull() | (col(col_name) == 'nan'), None)
            .otherwise(col(col_name).cast('string'))
        )

print("‚úÖ Columnas de texto convertidas a string")


‚úÖ Columnas de texto convertidas a string


In [41]:
# Convertir columnas booleanas
boolean_cols = ['elevator', 'balcony', 'furnished', 'has_parking']

for col_name in boolean_cols:
    if col_name in df_clean.columns:
        df_clean = df_clean.withColumn(
            col_name,
            when(lower(trim(col(col_name))).isin(['y', 'yes', 's√≠', 'si', 's', '1', 'true']), True)
            .when(lower(trim(col(col_name))).isin(['n', 'no', '0', 'false']), False)
            .otherwise(None)
        )

print("‚úÖ Columnas booleanas convertidas")


‚úÖ Columnas booleanas convertidas


### Paso 5: Rellenar valores faltantes


In [42]:
# Rellenar valores faltantes
# Para columnas de texto: rellenar con "{nombre_columna} empty"
# Para columnas num√©ricas: rellenar con la media
# Para columnas booleanas: rellenar con False

import builtins  # Para usar round() de Python en lugar del de PySpark

print("=== RELLENANDO VALORES FALTANTES ===\n")

# Obtener esquema para identificar tipos
schema = df_clean.schema

# Crear diccionario para fillna
fill_dict = {}

# Rellenar columnas de texto (StringType)
for field in schema.fields:
    if isinstance(field.dataType, StringType):
        fill_dict[field.name] = f"{field.name} empty"
        print(f"‚úÖ {field.name}: valores rellenados con '{fill_dict[field.name]}'")

# Rellenar columnas num√©ricas con la media
numeric_cols = ['surface_m2', 'rooms', 'bathrooms', 'price_eur', 'price_per_m2', 'latitude', 'longitude']

for col_name in numeric_cols:
    if col_name in df_clean.columns:
        # Calcular media
        mean_value = df_clean.agg(avg(col(col_name)).alias('mean')).collect()[0]['mean']
        
        if mean_value is not None:
            # Si es int, redondear usando round() de Python (builtins)
            if col_name in ['rooms', 'bathrooms']:
                mean_value = int(builtins.round(mean_value))
                fill_dict[col_name] = mean_value
                print(f"‚úÖ {col_name}: valores rellenados con media = {mean_value} (entero)")
            else:
                fill_dict[col_name] = mean_value
                print(f"‚úÖ {col_name}: valores rellenados con media = {mean_value:.2f}")

# Rellenar columnas booleanas con False
for field in schema.fields:
    if isinstance(field.dataType, BooleanType):
        fill_dict[field.name] = False
        print(f"‚úÖ {field.name}: valores rellenados con False")

# Aplicar fillna
df_clean = df_clean.fillna(fill_dict)

# Verificar valores NULL restantes
null_count = 0
for col_name in df_clean.columns:
    col_null_count = df_clean.filter(col(col_name).isNull()).count()
    if col_null_count > 0:
        null_count += col_null_count

print(f"\n‚úÖ Todos los valores faltantes han sido rellenados")
print(f"Valores NULL restantes: {null_count}")


=== RELLENANDO VALORES FALTANTES ===

‚úÖ listing_id: valores rellenados con 'listing_id empty'
‚úÖ operation: valores rellenados con 'operation empty'
‚úÖ district: valores rellenados con 'district empty'
‚úÖ neighborhood: valores rellenados con 'neighborhood empty'
‚úÖ address: valores rellenados con 'address empty'
‚úÖ floor: valores rellenados con 'floor empty'
‚úÖ condition: valores rellenados con 'condition empty'
‚úÖ energy_certificate: valores rellenados con 'energy_certificate empty'
‚úÖ agency: valores rellenados con 'agency empty'
‚úÖ surface_m2: valores rellenados con media = 106.58
‚úÖ rooms: valores rellenados con media = 3 (entero)
‚úÖ bathrooms: valores rellenados con media = 1 (entero)
‚úÖ price_eur: valores rellenados con media = 263348.96
‚úÖ price_per_m2: valores rellenados con media = 281963.15
‚úÖ latitude: valores rellenados con media = 41.19
‚úÖ longitude: valores rellenados con media = 2.08
‚úÖ elevator: valores rellenados con False
‚úÖ balcony: valores rellena

### Paso 6: Capitalizar texto (Title Case)


In [43]:
# Capitalizar texto en columnas de string (Title Case)
# UDF para capitalizar texto
def capitalize_text_udf(value):
    """Capitaliza la primera letra de cada palabra"""
    if value is None:
        return None
    # Convertir a string y capitalizar cada palabra
    return str(value).title()

capitalize_text = udf(capitalize_text_udf, StringType())

# Aplicar capitalizaci√≥n a columnas de texto (excepto las que tienen valores "X empty")
text_cols_to_capitalize = ['operation', 'district', 'neighborhood', 'condition', 'energy_certificate']

for col_name in text_cols_to_capitalize:
    if col_name in df_clean.columns:
        # Solo capitalizar si no es un valor "empty"
        df_clean = df_clean.withColumn(
            col_name,
            when(col(col_name).contains(" empty"), col(col_name))
            .otherwise(initcap(col(col_name)))  # initcap capitaliza la primera letra de cada palabra
        )

print("‚úÖ Texto capitalizado (Title Case)")


‚úÖ Texto capitalizado (Title Case)


### Verificaci√≥n: Comparaci√≥n antes/despu√©s


In [44]:
# Mostrar ejemplos de limpieza
print("=== EJEMPLOS DE LIMPIEZA ===\n")
print("ANTES (RAW):")
df_raw.select('surface_m2', 'rooms', 'bathrooms', 'price_eur', 'price_per_m2', 'elevator', 'district').show(10, truncate=False)
print("\nDESPU√âS (CLEAN):")
df_clean.select('surface_m2', 'rooms', 'bathrooms', 'price_eur', 'price_per_m2', 'elevator', 'district').show(10, truncate=False)


=== EJEMPLOS DE LIMPIEZA ===

ANTES (RAW):
+----------+-----+---------+---------+------------+--------+-------------------+
|surface_m2|rooms|bathrooms|price_eur|price_per_m2|elevator|district           |
+----------+-----+---------+---------+------------+--------+-------------------+
|89 m¬≤     |?    |2        |?        |4240 ‚Ç¨/m2   |Y       |Unknown            |
|171       |N/A  |1        |?        |7920.91     |?       |Eixampl            |
|?         |2+   |?        |317642 ‚Ç¨ |?           |Y       |Sant Mart√≠         |
|N/A       |three|two      |N/A      |5484 ‚Ç¨/m2   |N       |SANTS              |
|?         |2+   |?        |N/A      |?           |S√≠      |SANTS              |
|127 m¬≤    |three|2        |491626 ‚Ç¨ |N/A         |Y       |Ciutat Vella       |
|?         |2+   |two      |N/A      |?           |N       |Sants-Montju√Øc     |
|?         |three|?        |1282371  |4093 ‚Ç¨/m2   |Y       |Sarri√†-Sant Gervasi|
|127 m¬≤    |2+   |3        |?        |6630.1     

### Resumen de la transformaci√≥n


In [45]:
print("=== RESUMEN DE LA TRANSFORMACI√ìN ===\n")
print(f"Filas: {df_clean.count()}")
print(f"Columnas: {len(df_clean.columns)}")
print(f"\nEsquema del dataset limpio:")
df_clean.printSchema()
print(f"\nPrimeras filas del dataset limpio:")
df_clean.show(5, truncate=False)


=== RESUMEN DE LA TRANSFORMACI√ìN ===

Filas: 10000
Columnas: 20

Esquema del dataset limpio:
root
 |-- listing_id: string (nullable = false)
 |-- operation: string (nullable = false)
 |-- district: string (nullable = false)
 |-- neighborhood: string (nullable = false)
 |-- address: string (nullable = false)
 |-- surface_m2: double (nullable = false)
 |-- rooms: integer (nullable = false)
 |-- bathrooms: integer (nullable = false)
 |-- price_eur: double (nullable = false)
 |-- price_per_m2: double (nullable = false)
 |-- floor: string (nullable = false)
 |-- elevator: boolean (nullable = false)
 |-- balcony: boolean (nullable = false)
 |-- furnished: boolean (nullable = false)
 |-- condition: string (nullable = false)
 |-- energy_certificate: string (nullable = false)
 |-- has_parking: boolean (nullable = false)
 |-- latitude: double (nullable = false)
 |-- longitude: double (nullable = false)
 |-- agency: string (nullable = false)


Primeras filas del dataset limpio:
+----------------

## LOAD: Guardar datos limpios


In [46]:
# Guardar el dataframe limpio como un solo archivo CSV
import os
import glob
import pandas as pd
import shutil

output_path = "../data/housing-barcelona-clean-pyspark.csv"
temp_dir = "../data/temp_pyspark_output"

# Eliminar el directorio de salida si existe (por si hay una ejecuci√≥n anterior)
if os.path.exists(output_path):
    if os.path.isdir(output_path):
        shutil.rmtree(output_path)
    else:
        os.remove(output_path)

# Eliminar directorio temporal si existe
if os.path.exists(temp_dir):
    shutil.rmtree(temp_dir)

# Guardar en directorio temporal con una sola partici√≥n
df_clean.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_dir)

# Encontrar el archivo CSV generado (PySpark crea archivos como part-00000-*.csv)
csv_files = glob.glob(os.path.join(temp_dir, "part-*.csv"))

if csv_files:
    # Leer el archivo CSV generado
    df_pandas = pd.read_csv(csv_files[0])
    
    # Guardar como un solo archivo CSV
    df_pandas.to_csv(output_path, index=False)
    
    # Eliminar el directorio temporal y su contenido
    shutil.rmtree(temp_dir)
    
    print(f"‚úÖ Datos limpios guardados en: {output_path}")
    print(f"\nArchivo guardado exitosamente con {len(df_pandas)} filas y {len(df_pandas.columns)} columnas")
else:
    print("‚ùå Error: No se encontr√≥ el archivo CSV generado")


‚úÖ Datos limpios guardados en: ../data/housing-barcelona-clean-pyspark.csv

Archivo guardado exitosamente con 10000 filas y 20 columnas


In [None]:
# CARGAR EN SQLITE: Crear Datawarehouse en SQLite
import sqlite3
from sqlalchemy import create_engine

print("=== CARGA EN SQLITE: CREANDO DATAWAREHOUSE ===\n")

# Convertir DataFrame de PySpark a Pandas para guardar en SQLite
print("üìä Convirtiendo DataFrame de PySpark a Pandas...")
df_pandas_clean = df_clean.toPandas()
print(f"‚úÖ DataFrame convertido: {len(df_pandas_clean)} filas")

# Ruta de la base de datos SQLite
db_path = "../warehouse/warehouse_pyspark.db"

# Crear conexi√≥n usando SQLAlchemy (requerido para to_sql)
engine = create_engine(f'sqlite:///{db_path}', echo=False)

# Preparar datos para tablas dimensionales
print("\nüìä Preparando datos para tablas dimensionales...")

# Tabla dim_district
df_dim_district = pd.DataFrame({
    'district_name': df_pandas_clean['district'].unique()
}).dropna()
df_dim_district = df_dim_district[df_dim_district['district_name'] != 'district empty']
print(f"‚úÖ dim_district: {len(df_dim_district)} distritos √∫nicos")

# Tabla dim_neighborhood
df_dim_neighborhood = df_pandas_clean[['neighborhood', 'district']].drop_duplicates()
df_dim_neighborhood = df_dim_neighborhood[
    (df_dim_neighborhood['neighborhood'] != 'neighborhood empty') &
    (df_dim_neighborhood['district'] != 'district empty')
].rename(columns={'neighborhood': 'neighborhood_name', 'district': 'district_name'})
print(f"‚úÖ dim_neighborhood: {len(df_dim_neighborhood)} barrios √∫nicos")

# Tabla dim_operation
df_dim_operation = pd.DataFrame({
    'operation_type': df_pandas_clean['operation'].unique()
}).dropna()
df_dim_operation = df_dim_operation[df_dim_operation['operation_type'] != 'operation empty']
print(f"‚úÖ dim_operation: {len(df_dim_operation)} tipos de operaci√≥n √∫nicos")

# Tabla dim_agency
df_dim_agency = pd.DataFrame({
    'agency_name': df_pandas_clean['agency'].unique()
}).dropna()
df_dim_agency = df_dim_agency[df_dim_agency['agency_name'] != 'agency empty']
print(f"‚úÖ dim_agency: {len(df_dim_agency)} agencias √∫nicas")

# Tabla dim_condition
df_dim_condition = pd.DataFrame({
    'condition_type': df_pandas_clean['condition'].unique()
}).dropna()
df_dim_condition = df_dim_condition[df_dim_condition['condition_type'] != 'condition empty']
print(f"‚úÖ dim_condition: {len(df_dim_condition)} condiciones √∫nicas")

# Tabla dim_energy_certificate
df_dim_energy_certificate = pd.DataFrame({
    'certificate_type': df_pandas_clean['energy_certificate'].unique()
}).dropna()
df_dim_energy_certificate = df_dim_energy_certificate[
    df_dim_energy_certificate['certificate_type'] != 'energy_certificate empty'
]
print(f"‚úÖ dim_energy_certificate: {len(df_dim_energy_certificate)} certificados √∫nicos")

# Guardar tablas dimensionales en SQLite
print("\nüíæ Guardando tablas dimensionales en SQLite...")
df_dim_district.to_sql('dim_district', engine, if_exists='replace', index=False)
df_dim_neighborhood.to_sql('dim_neighborhood', engine, if_exists='replace', index=False)
df_dim_operation.to_sql('dim_operation', engine, if_exists='replace', index=False)
df_dim_agency.to_sql('dim_agency', engine, if_exists='replace', index=False)
df_dim_condition.to_sql('dim_condition', engine, if_exists='replace', index=False)
df_dim_energy_certificate.to_sql('dim_energy_certificate', engine, if_exists='replace', index=False)
print("‚úÖ Tablas dimensionales guardadas")

# Preparar tabla de hechos (fact_housing)
print("\nüìä Preparando tabla de hechos...")
df_fact_housing = df_pandas_clean.copy()
print(f"‚úÖ fact_housing: {len(df_fact_housing)} filas preparadas")

# Guardar tabla de hechos en SQLite
print("\nüíæ Guardando tabla de hechos en SQLite...")
df_fact_housing.to_sql('fact_housing', engine, if_exists='replace', index=False)
print("‚úÖ Tabla de hechos guardada")

print(f"\n‚úÖ Datawarehouse creado exitosamente en: {db_path}")
print(f"   ‚Ä¢ 1 tabla de hechos: fact_housing")
print(f"   ‚Ä¢ 6 tablas de dimensiones: dim_district, dim_neighborhood, dim_operation, dim_agency, dim_condition, dim_energy_certificate")


## CREACI√ìN DEL DATAWAREHOUSE

### Generaci√≥n de DDLs para las tablas del Datawarehouse


In [47]:
# Generar DDL para el Datawarehouse basado en el esquema de PySpark
def pyspark_dtype_to_sql(dtype):
    """Convierte tipos de datos de PySpark a tipos SQL"""
    dtype_str = str(dtype).lower()
    if 'int' in dtype_str:
        return "INTEGER"
    elif 'double' in dtype_str or 'float' in dtype_str:
        return "REAL"
    elif 'boolean' in dtype_str or 'bool' in dtype_str:
        return "BOOLEAN"
    elif 'timestamp' in dtype_str or 'date' in dtype_str:
        return "TIMESTAMP"
    else:
        return "TEXT"

# Crear DDL para la tabla principal
ddl_statements = []
ddl_statements.append("-- ============================================")
ddl_statements.append("-- DDL para Datawarehouse - Housing Barcelona")
ddl_statements.append("-- Generado desde ETL con PySpark")
ddl_statements.append("-- ============================================\n")

ddl_statements.append("-- ============================================\n")
ddl_statements.append("-- TABLA DE HECHOS\n")
ddl_statements.append("-- ============================================\n")
ddl_statements.append("-- ============================================\n")
ddl_statements.append("-- TABLAS DIMENSIONALES\n")
ddl_statements.append("-- ============================================\n")
ddl_statements.append("-- Tabla dimensional: dim_district")
ddl_statements.append("CREATE TABLE IF NOT EXISTS dim_district (")
ddl_statements.append("    district_id INTEGER PRIMARY KEY AUTOINCREMENT,")
ddl_statements.append("    district_name TEXT UNIQUE NOT NULL")
ddl_statements.append(");\n")

ddl_statements.append("-- Tabla dimensional: dim_neighborhood")
ddl_statements.append("CREATE TABLE IF NOT EXISTS dim_neighborhood (")
ddl_statements.append("    neighborhood_id INTEGER PRIMARY KEY AUTOINCREMENT,")
ddl_statements.append("    neighborhood_name TEXT UNIQUE NOT NULL,")
ddl_statements.append("    district_id INTEGER,")
ddl_statements.append("    FOREIGN KEY (district_id) REFERENCES dim_district(district_id)")
ddl_statements.append(");\n")

ddl_statements.append("-- Tabla dimensional: dim_operation")
ddl_statements.append("CREATE TABLE IF NOT EXISTS dim_operation (")
ddl_statements.append("    operation_id INTEGER PRIMARY KEY AUTOINCREMENT,")
ddl_statements.append("    operation_type TEXT UNIQUE NOT NULL")
ddl_statements.append(");\n")

ddl_statements.append("-- Tabla dimensional: dim_agency")
ddl_statements.append("CREATE TABLE IF NOT EXISTS dim_agency (")
ddl_statements.append("    agency_id INTEGER PRIMARY KEY AUTOINCREMENT,")
ddl_statements.append("    agency_name TEXT UNIQUE NOT NULL")
ddl_statements.append(");\n")

ddl_statements.append("-- Tabla dimensional: dim_condition")
ddl_statements.append("CREATE TABLE IF NOT EXISTS dim_condition (")
ddl_statements.append("    condition_id INTEGER PRIMARY KEY AUTOINCREMENT,")
ddl_statements.append("    condition_type TEXT UNIQUE NOT NULL")
ddl_statements.append(");\n")

ddl_statements.append("-- Tabla dimensional: dim_energy_certificate")
ddl_statements.append("CREATE TABLE IF NOT EXISTS dim_energy_certificate (")
ddl_statements.append("    certificate_id INTEGER PRIMARY KEY AUTOINCREMENT,")
ddl_statements.append("    certificate_type TEXT UNIQUE NOT NULL")
ddl_statements.append(");\n")

# Crear √≠ndices para mejorar el rendimiento
ddl_statements.append("-- √çndices para mejorar el rendimiento de consultas")
ddl_statements.append("CREATE INDEX IF NOT EXISTS idx_fact_price ON fact_housing(price_eur);")
ddl_statements.append("CREATE INDEX IF NOT EXISTS idx_fact_surface ON fact_housing(surface_m2);")

# Unir todas las declaraciones
ddl_sql = "\n".join(ddl_statements)

# Guardar DDL en archivo
import os
import builtins
import stat

# Usar ruta relativa desde el notebook (que est√° en notebooks/)
ddl_file_path = "../warehouse/modelo_datawarehouse_pyspark.sql"

# Obtener la ruta absoluta y normalizarla
ddl_file_abs = os.path.abspath(ddl_file_path)
warehouse_dir = os.path.dirname(ddl_file_abs)

# Verificar y crear el directorio warehouse de forma robusta
success = False
try:
    # Verificar si el directorio existe
    if not os.path.exists(warehouse_dir):
        print(f"üìÅ Creando directorio: {warehouse_dir}")
        # Crear el directorio y todos los padres necesarios
        os.makedirs(warehouse_dir, exist_ok=True, mode=0o777)
        print(f"‚úÖ Directorio creado exitosamente")
    else:
        print(f"üìÅ Directorio ya existe: {warehouse_dir}")
    
    # Verificar permisos de escritura
    if not os.access(warehouse_dir, os.W_OK):
        print(f"‚ö†Ô∏è No hay permisos de escritura en: {warehouse_dir}")
        # Intentar cambiar permisos
        try:
            os.chmod(warehouse_dir, 0o777)
            print(f"‚úÖ Permisos actualizados")
        except Exception as perm_error:
            print(f"‚ö†Ô∏è No se pudieron cambiar permisos: {perm_error}")
    
    # Intentar escribir el archivo
    print(f"üíæ Escribiendo archivo en: {ddl_file_abs}")
    with builtins.open(ddl_file_abs, 'w', encoding='utf-8') as f:
        f.write(ddl_sql)
    success = True
    print("‚úÖ DDL del Datawarehouse generado exitosamente")
    print(f"üìÑ Archivo guardado en: {ddl_file_abs}\n")
    
except (FileNotFoundError, PermissionError, OSError) as e:
    print(f"‚ùå Error al escribir en {ddl_file_abs}")
    print(f"   Error: {e}")
    print(f"   Directorio existe: {os.path.exists(warehouse_dir) if warehouse_dir else 'N/A'}")
    print(f"   Permisos de escritura: {os.access(warehouse_dir, os.W_OK) if os.path.exists(warehouse_dir) else 'N/A'}")
    print(f"\nüí° El directorio warehouse/ puede no estar montado correctamente en Docker.")
    print(f"   Verifica la configuraci√≥n del volumen en docker-compose.yml")

if success:
    print("=== DDL GENERADO ===")
    print(ddl_sql)
ddl_statements.append("-- Tabla de hechos: fact_housing")
ddl_statements.append("CREATE TABLE IF NOT EXISTS fact_housing (")
ddl_statements.append("    listing_id TEXT PRIMARY KEY,")
ddl_statements.append("    operation_id INTEGER,")
ddl_statements.append("    FOREIGN KEY (operation_id) REFERENCES dim_operation(operation_id),")
ddl_statements.append("    district_id INTEGER,")
ddl_statements.append("    FOREIGN KEY (district_id) REFERENCES dim_district(district_id),")
ddl_statements.append("    neighborhood_id INTEGER,")
ddl_statements.append("    FOREIGN KEY (neighborhood_id) REFERENCES dim_neighborhood(neighborhood_id),")
ddl_statements.append("    address TEXT,")
ddl_statements.append("    surface_m2 REAL,")
ddl_statements.append("    rooms INTEGER,")
ddl_statements.append("    bathrooms INTEGER,")
ddl_statements.append("    price_eur REAL,")
ddl_statements.append("    price_per_m2 REAL,")
ddl_statements.append("    floor TEXT,")
ddl_statements.append("    elevator BOOLEAN,")
ddl_statements.append("    balcony BOOLEAN,")
ddl_statements.append("    furnished BOOLEAN,")
ddl_statements.append("    condition_id INTEGER,")
ddl_statements.append("    FOREIGN KEY (condition_id) REFERENCES dim_condition(condition_id),")
ddl_statements.append("    energy_certificate_id INTEGER,")
ddl_statements.append("    FOREIGN KEY (energy_certificate_id) REFERENCES dim_energy_certificate(certificate_id),")
ddl_statements.append("    has_parking BOOLEAN,")
ddl_statements.append("    latitude REAL,")
ddl_statements.append("    longitude REAL,")
ddl_statements.append("    agency_id INTEGER,")
ddl_statements.append("    FOREIGN KEY (agency_id) REFERENCES dim_agency(agency_id)")
ddl_statements.append(");\n")



üìÅ Directorio ya existe: /app/warehouse
üíæ Escribiendo archivo en: /app/warehouse/modelo_datawarehouse_pyspark.sql
‚úÖ DDL del Datawarehouse generado exitosamente
üìÑ Archivo guardado en: /app/warehouse/modelo_datawarehouse_pyspark.sql

=== DDL GENERADO ===
-- DDL para Datawarehouse - Housing Barcelona
-- Generado desde ETL con PySpark

-- Tabla de hechos: fact_housing
CREATE TABLE IF NOT EXISTS fact_housing (
    listing_id TEXT PRIMARY KEY,
    operation_id INTEGER,
    FOREIGN KEY (operation_id) REFERENCES dim_operation(operation_id),
    district_id INTEGER,
    FOREIGN KEY (district_id) REFERENCES dim_district(district_id),
    neighborhood_id INTEGER,
    FOREIGN KEY (neighborhood_id) REFERENCES dim_neighborhood(neighborhood_id),
    address TEXT,
    surface_m2 REAL,
    rooms INTEGER,
    bathrooms INTEGER,
    price_eur REAL,
    price_per_m2 REAL,
    floor TEXT,
    elevator BOOLEAN,
    balcony BOOLEAN,
    furnished BOOLEAN,
    condition_id INTEGER,
    FOREIGN KEY (