# Limpieza del dataset

### 1. Importación de librerías
Importamos las librerías de PySpark, pandas y Azure Blob Storage necesarias para la lectura de datos de nuestro contenedor y la limpieza de nuestro dataset.


In [1]:
from pyspark.sql import SparkSession
from azure.storage.blob import BlobServiceClient
from io import BytesIO
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, when, count, min, max, approx_count_distinct, substring
from pyspark.sql import Row

StatementMeta(b2c68712-eec7-4103-8184-d5da7dcf1f19, 36, 6, Finished, Available, Finished)

### 2. Conexión a Azure Blob Storage
Se configuran las credenciales y el path de acceso al archivo .parquet ubicado en el contenedor raw-data.

In [None]:
# Parámetros de conexión
storage_account_name = "accidentesml9046401304"
# Mala práctica: deberíamos pasar la clave como variable de entorno
# No tuvimos tiempo de cambiarlo
storage_account_key = "qPblehKg+PRBtekunhfYQOHNjG0slqHHu4/A4GqcJ60iWPr/cFAI0t9n8hrdexfQuZr/vYR5k+4m+AStpXAPtg=="
container_name = "raw-data"
input = "ingesta/ingesta_output.parquet"

# Inicializar Spark
spark = SparkSession.builder \
    .appName("Limpieza") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

# Configurar acceso a Blob Storage en Spark 
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
    storage_account_key
)

StatementMeta(b2c68712-eec7-4103-8184-d5da7dcf1f19, 36, 7, Finished, Available, Finished)

### 3. Carga del dataset desde Parquet
Se lee el archivo como un DataFrame de Spark, y se cachea para evitar múltiples lecturas del mismo origen.

In [3]:
input_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{input}"

# Leer parquet desde el contenedor
df_accidentes = spark.read.format("parquet").load(input_path)
df_accidentes.cache()

print("DataFrame:")
display(df_accidentes.limit(5))

StatementMeta(b2c68712-eec7-4103-8184-d5da7dcf1f19, 36, 8, Finished, Available, Finished)

DataFrame:


SynapseWidget(Synapse.DataFrame, 6ac25af0-8e30-4db6-9efe-578123ddb6de)

### 4. Eliminación de duplicados.
Se remueven registros duplicados para asegurar la consistencia e integridad de los datos.

In [4]:
# Eliminación de duplicados
df_accidentes = df_accidentes.dropDuplicates()

StatementMeta(b2c68712-eec7-4103-8184-d5da7dcf1f19, 36, 9, Finished, Available, Finished)

### 5. Verificación de campos temporales
Exploramos los campos MES, DIA_SEMANA, HORA y ANYO para entender su rango y variedad de valores, y detectar errores de formato o valores atípicos antes de aplicar transformaciones.

In [5]:
# Revisión de campos temporales
temporal_fields = ['MES', 'DIA_SEMANA', 'HORA', 'ANYO']
for field in temporal_fields:
    if field in df_accidentes.columns:
        stats = df_accidentes.select(
            min(col(field)).alias("min"),
            max(col(field)).alias("max"),
            approx_count_distinct(col(field)).alias("unique_vals")
        ).collect()[0]
        print(f"{field}: Rango {stats['min']}-{stats['max']}, {stats['unique_vals']} valores únicos")

StatementMeta(b2c68712-eec7-4103-8184-d5da7dcf1f19, 36, 10, Finished, Available, Finished)

MES: Rango 1-12, 12 valores únicos
DIA_SEMANA: Rango 1-7, 7 valores únicos
HORA: Rango 0-23, 24 valores únicos
ANYO: Rango 2022-2022, 1 valores únicos


### 6. Correlaciones de valores de ISLA por código postal
Se crea un df con el mapeo de códigos postales y sus respectivas islas (ISLA). Se utiliza un esquema explícito para asegurar que los códigos postales se traten como strings (preservando ceros a la izquierda). Esto permite mejorar la precisión geográfica de los registros.

In [6]:
# Mapeo de código postal a ISLA
data_cpislas = [
    ("Formentera", "07024", 4),
    ("Ibiza", "07026", 3),
    ("Ibiza", "07046", 3),
    ("Ibiza", "07050", 3),
    ("Ibiza", "07048", 3),
    ("Ibiza", "07054", 3),
    ("Mallorca", "07001", 1),
    ("Mallorca", "07003", 1),
    ("Mallorca", "07004", 1),
    ("Mallorca", "07005", 1),
    ("Mallorca", "07901", 1),
    ("Mallorca", "07006", 1),
    ("Mallorca", "07007", 1),
    ("Mallorca", "07008", 1),
    ("Mallorca", "07009", 1),
    ("Mallorca", "07010", 1),
    ("Mallorca", "07011", 1),
    ("Mallorca", "07012", 1),
    ("Mallorca", "07013", 1),
    ("Mallorca", "07014", 1),
    ("Mallorca", "07016", 1),
    ("Mallorca", "07017", 1),
    ("Mallorca", "07018", 1),
    ("Mallorca", "07019", 1),
    ("Mallorca", "07020", 1),
    ("Mallorca", "07021", 1),
    ("Mallorca", "07022", 1),
    ("Mallorca", "07025", 1),
    ("Mallorca", "07027", 1),
    ("Mallorca", "07028", 1),
    ("Mallorca", "07029", 1),
    ("Mallorca", "07030", 1),
    ("Mallorca", "07031", 1),
    ("Mallorca", "07033", 1),
    ("Mallorca", "07034", 1),
    ("Mallorca", "07035", 1),
    ("Mallorca", "07036", 1),
    ("Mallorca", "07038", 1),
    ("Mallorca", "07039", 1),
    ("Mallorca", "07040", 1),
    ("Mallorca", "07041", 1),
    ("Mallorca", "07044", 1),
    ("Mallorca", "07042", 1),
    ("Mallorca", "07043", 1),
    ("Mallorca", "07045", 1),
    ("Mallorca", "07059", 1),
    ("Mallorca", "07049", 1),
    ("Mallorca", "07051", 1),
    ("Mallorca", "07053", 1),
    ("Mallorca", "07055", 1),
    ("Mallorca", "07056", 1),
    ("Mallorca", "07057", 1),
    ("Mallorca", "07058", 1),
    ("Mallorca", "07047", 1),
    ("Mallorca", "07060", 1),
    ("Mallorca", "07061", 1),
    ("Mallorca", "07062", 1),
    ("Mallorca", "07063", 1),
    ("Mallorca", "07065", 1),
    ("Menorca", "07002", 2),
    ("Menorca", "07064", 2),
    ("Menorca", "07015", 2),
    ("Menorca", "07023", 2),
    ("Menorca", "07032", 2),
    ("Menorca", "07037", 2),
    ("Menorca", "07902", 2),
    ("Menorca", "07052", 2),
    ("Fuerteventura", "35003", 8),
    ("Fuerteventura", "35007", 8),
    ("Fuerteventura", "35014", 8),
    ("Fuerteventura", "35015", 8),
    ("Fuerteventura", "35017", 8),
    ("Fuerteventura", "35030", 8),
    ("Gran Canaria", "35001", 7),
    ("Gran Canaria", "35002", 7),
    ("Gran Canaria", "35020", 7),
    ("Gran Canaria", "35005", 7),
    ("Gran Canaria", "35006", 7),
    ("Gran Canaria", "35008", 7),
    ("Gran Canaria", "35009", 7),
    ("Gran Canaria", "35011", 7),
    ("Gran Canaria", "35012", 7),
    ("Gran Canaria", "35013", 7),
    ("Gran Canaria", "35016", 7),
    ("Gran Canaria", "35019", 7),
    ("Gran Canaria", "35021", 7),
    ("Gran Canaria", "35022", 7),
    ("Gran Canaria", "35023", 7),
    ("Gran Canaria", "35025", 7),
    ("Gran Canaria", "35026", 7),
    ("Gran Canaria", "35027", 7),
    ("Gran Canaria", "35032", 7),
    ("Gran Canaria", "35031", 7),
    ("Gran Canaria", "35033", 7),
    ("Lanzarote", "35004", 9),
    ("Lanzarote", "35010", 9),
    ("Lanzarote", "35018", 9),
    ("Lanzarote", "35024", 9),
    ("Lanzarote", "35028", 9),
    ("Lanzarote", "35029", 9),
    ("Lanzarote", "35034", 9),
    ("Gomera", "38002", 12),
    ("Gomera", "38003", 12),
    ("Gomera", "38021", 12),
    ("Gomera", "38036", 12),
    ("Gomera", "38049", 12),
    ("Gomera", "38050", 12),
    ("Hierro", "38013", 13),
    ("Hierro", "38901", 13),
    ("Hierro", "38048", 13),
    ("La Palma", "38007", 11),
    ("La Palma", "38008", 11),
    ("La Palma", "38009", 11),
    ("La Palma", "38014", 11),
    ("La Palma", "38016", 11),
    ("La Palma", "38024", 11),
    ("La Palma", "38027", 11),
    ("La Palma", "38029", 11),
    ("La Palma", "38030", 11),
    ("La Palma", "38033", 11),
    ("La Palma", "38037", 11),
    ("La Palma", "38045", 11),
    ("La Palma", "38047", 11),
    ("La Palma", "38053", 11),
    ("Tenerife", "38001", 10),
    ("Tenerife", "38004", 10),
    ("Tenerife", "38005", 10),
    ("Tenerife", "38006", 10),
    ("Tenerife", "38010", 10),
    ("Tenerife", "38011", 10),
    ("Tenerife", "38012", 10),
    ("Tenerife", "38015", 10),
    ("Tenerife", "38017", 10),
    ("Tenerife", "38018", 10),
    ("Tenerife", "38019", 10),
    ("Tenerife", "38020", 10),
    ("Tenerife", "38022", 10),
    ("Tenerife", "38025", 10),
    ("Tenerife", "38026", 10),
    ("Tenerife", "38028", 10),
    ("Tenerife", "38031", 10),
    ("Tenerife", "38032", 10),
    ("Tenerife", "38023", 10),
    ("Tenerife", "38034", 10),
    ("Tenerife", "38035", 10),
    ("Tenerife", "38038", 10),
    ("Tenerife", "38039", 10),
    ("Tenerife", "38040", 10),
    ("Tenerife", "38041", 10),
    ("Tenerife", "38042", 10),
    ("Tenerife", "38043", 10),
    ("Tenerife", "38044", 10),
    ("Tenerife", "38046", 10),
    ("Tenerife", "38051", 10),
    ("Tenerife", "38052", 10),
]

# Esquema explícito para asegurar que CP es texto y no pierde ceros
schema = StructType([
    StructField("NOMBRE_ISLA", StringType(), True),
    StructField("CP", StringType(), True),        # Muy importante: CP como String
    StructField("ISLA", IntegerType(), True),
])

# Crear DataFrame
df_cpislas = spark.createDataFrame(data_cpislas, schema)

# Mostrar para verificar
df_cpislas.show()

StatementMeta(b2c68712-eec7-4103-8184-d5da7dcf1f19, 36, 11, Finished, Available, Finished)

+-----------+-----+----+
|NOMBRE_ISLA|   CP|ISLA|
+-----------+-----+----+
| Formentera|07024|   4|
|      Ibiza|07026|   3|
|      Ibiza|07046|   3|
|      Ibiza|07050|   3|
|      Ibiza|07048|   3|
|      Ibiza|07054|   3|
|   Mallorca|07001|   1|
|   Mallorca|07003|   1|
|   Mallorca|07004|   1|
|   Mallorca|07005|   1|
|   Mallorca|07901|   1|
|   Mallorca|07006|   1|
|   Mallorca|07007|   1|
|   Mallorca|07008|   1|
|   Mallorca|07009|   1|
|   Mallorca|07010|   1|
|   Mallorca|07011|   1|
|   Mallorca|07012|   1|
|   Mallorca|07013|   1|
|   Mallorca|07014|   1|
+-----------+-----+----+
only showing top 20 rows



### 7. Corrección condicional de valores nulos en ISLA
Se reemplazan valores nulos de la columna ISLA con 0 si pertenecen a provincias insulares (7, 35, 38). Luego, se hace un join con el mapeo de códigos postales para intentar rellenar ese 0 con la ISLA correcta, si existe.

In [7]:
# Reemplazar ISLA nula por 0 solo si COD_PROVINCIA está entre 7, 35 o 38
df_accidentes = df_accidentes.withColumn(
    "ISLA",
    when(
        (col("ISLA").isNull()) & (col("COD_PROVINCIA").isin(7, 35, 38)),
        0
    ).otherwise(col("ISLA"))
)

# Join entre accidentes e islas por código postal
df_joined = df_accidentes.join(
    df_cpislas.select(col("CP").alias("COD_MUNICIPIO_ISLA"), col("ISLA").alias("ISLA_CORRECTA")),
    df_accidentes["COD_MUNICIPIO"] == col("COD_MUNICIPIO_ISLA"),
    how="left"
)

# Sustituir ISLA = 0 por el valor correcto si existe en el mapping
df_accidentes = df_joined.withColumn(
    "ISLA",
    when(col("ISLA") == 0, col("ISLA_CORRECTA")).otherwise(col("ISLA"))
).drop("COD_MUNICIPIO_ISLA", "ISLA_CORRECTA")

display(df_accidentes.select("COD_PROVINCIA", "CARRETERA", "ISLA").limit(20))

StatementMeta(b2c68712-eec7-4103-8184-d5da7dcf1f19, 36, 12, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d3fa251b-b353-4070-a0e4-f2b7e4a08307)

### 8. Corrección de ISLA usando el prefijo de la carretera
Algunos registros aún quedan con ISLA = 0. Se intenta inferir su valor en base a un patrón de abreviatura de carretera (ABV_CARRETERA) y el código de provincia. Esto se logra mediante un join condicional, donde se compara el prefijo de la carretera con un segundo mapeo manual que relaciona abreviaturas con islas.

In [8]:
data_carreteraisla = [
    Row(COD_PROVINCIA=7, ABV_CARRETERA='Ma', ISLA=1),
    Row(COD_PROVINCIA=7, ABV_CARRETERA='Me', ISLA=2),
    Row(COD_PROVINCIA=7, ABV_CARRETERA='EI', ISLA=3),
    Row(COD_PROVINCIA=7, ABV_CARRETERA='PM', ISLA=4),
    Row(COD_PROVINCIA=35, ABV_CARRETERA='GC', ISLA=7),
    Row(COD_PROVINCIA=35, ABV_CARRETERA='FV', ISLA=8),
    Row(COD_PROVINCIA=35, ABV_CARRETERA='LZ', ISLA=9),
    Row(COD_PROVINCIA=38, ABV_CARRETERA='TF', ISLA=10),
    Row(COD_PROVINCIA=38, ABV_CARRETERA='LP', ISLA=11),
    Row(COD_PROVINCIA=38, ABV_CARRETERA='GM', ISLA=12),
    Row(COD_PROVINCIA=38, ABV_CARRETERA='HI', ISLA=13),
]

df_carreteraisla = spark.createDataFrame(data_carreteraisla) \
    .withColumnRenamed("ISLA", "ISLA_CARRETERA")\
    .alias("mapa")

# Renombramos columnas de df_carreteraisla para que no choquen
df_carreteraisla_ren = df_carreteraisla \
    .withColumnRenamed("ISLA", "ISLA_CARRETERA") \
    .withColumnRenamed("COD_PROVINCIA", "COD_PROV_CARRETERA") \
    .withColumnRenamed("ABV_CARRETERA", "ABV_CARRETERA_CARRETERA")

# Hacemos el join solo para actualizar ISLA cuando es 0
df_joined = df_accidentes.join(
    df_carreteraisla_ren,
    (df_accidentes["ISLA"] == 0) &
    (df_accidentes["COD_PROVINCIA"] == df_carreteraisla_ren["COD_PROV_CARRETERA"]) &
    (substring(df_accidentes["CARRETERA"], 1, 2) == df_carreteraisla_ren["ABV_CARRETERA_CARRETERA"]),
    how="left"
)

# Reemplazamos ISLA si encontramos una coincidencia
df_accidentes = df_joined.withColumn(
    "ISLA",
    when((col("ISLA") == 0) & col("ISLA_CARRETERA").isNotNull(), col("ISLA_CARRETERA"))
    .otherwise(col("ISLA"))
).drop("ISLA_CARRETERA", "COD_PROV_CARRETERA", "ABV_CARRETERA_CARRETERA")

# Forzar tipo entero
df_accidentes = df_accidentes.withColumn("ISLA", col("ISLA").cast(IntegerType()))

# Mostrar resultado
display(df_accidentes.select("COD_PROVINCIA", "CARRETERA", "ISLA").limit(20))


StatementMeta(b2c68712-eec7-4103-8184-d5da7dcf1f19, 36, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5c71dc21-d868-4061-88a7-de5e34a0a148)

### 9. Reemplazo de valores nulos
Presuponemos que la mayor parte de los nulos de nuestro dataset es por falta de información. La mayoría de nuestras columnas son categóricas, por lo que los nulos simplemente los imputamos por la categoría Sin Información específica de la columna.    

Las columnas que creamos nosotros a partir del PDF (tráfico) no las vamos a tratar. A futuro tendríamos que encontrar otra fuente de datos con más registros y obtener mejor información del tráfico.

Los nulos de KM y CRUCE_CARRETERA tampoco los tratamos porque son nulos por falta de información. 

In [9]:
# Reemplazar nulos

# ISLA si la provincia es insular se reemplaza por 0 = Isla sin identificar
df_accidentes = df_accidentes.withColumn("ISLA", when((col("ISLA").isNull()) & (col("COD_PROVINCIA").isin(7, 35, 38)), 0).otherwise(col("ISLA")))
# ISLA si la provincia NO es insular se reemplaza por 14 = Peninsular
df_accidentes = df_accidentes.withColumn("ISLA", when(col("ISLA").isNull(), "14").otherwise(col("ISLA")))

# NUDO_INFO se reemplaza por 999 = Sin especificar
df_accidentes = df_accidentes.withColumn("NUDO_INFO", when(col("NUDO_INFO").isNull(), "999").otherwise(col("NUDO_INFO")))

#CONDICION_NIEBLA se reemplaza por 999 = Sin especificar
df_accidentes = df_accidentes.withColumn("CONDICION_NIEBLA", when(col("CONDICION_NIEBLA").isNull(), "999").otherwise(col("CONDICION_NIEBLA")))

#CONDICION_VIENTO se reemplaza por 0 = No se aprecia viento fuerte
df_accidentes = df_accidentes.withColumn("CONDICION_VIENTO", when(col("CONDICION_VIENTO").isNull(), "0").otherwise(col("CONDICION_VIENTO")))

StatementMeta(b2c68712-eec7-4103-8184-d5da7dcf1f19, 36, 14, Finished, Available, Finished)

### 10. Conversión de tipos
Las columnas corregidas son convertidas a tipo IntegerType, asegurando un esquema consistente y compatible con nuestros análisis posteriores.

In [10]:
df_accidentes = df_accidentes.withColumn("ISLA", col("ISLA").cast(IntegerType()))
df_accidentes = df_accidentes.withColumn("NUDO_INFO", col("NUDO_INFO").cast(IntegerType()))
df_accidentes = df_accidentes.withColumn("CONDICION_NIEBLA", col("CONDICION_NIEBLA").cast(IntegerType()))
df_accidentes = df_accidentes.withColumn("CONDICION_VIENTO", col("CONDICION_VIENTO").cast(IntegerType()))

StatementMeta(b2c68712-eec7-4103-8184-d5da7dcf1f19, 36, 15, Finished, Available, Finished)

### 11. Outliers

Analizamos nuestros datos y nuestros posibles outliers, pero hemos decidido que no los consideramos outlier y no vamos a tratarlos principalmente por:
- La mayoría de nuestras columnas son categóricas, por lo que no son outliers.
- Las variables numéricas son en su mayoría codificaciones numéricas de categorías, y no representan medidas continuas susceptibles a valores extremos.
- Hemos analizado los boxplots de las variables numéricas y no hemos identificado valores que se comporten como atípicos o inconsistentes dentro del contexto de nuestros datos.

### 12. Verificación de los datos
Imprimimos el esquema final y los valores únicos de ISLA, además del cruce COD_PROVINCIA - ISLA, para validar que las transformaciones se aplicaron correctamente y que no quedan inconsistencias evidentes.

In [12]:
df_accidentes.printSchema()

StatementMeta(b2c68712-eec7-4103-8184-d5da7dcf1f19, 36, 17, Finished, Available, Finished)

root
 |-- ID_ACCIDENTE: long (nullable = true)
 |-- CARRETERA: string (nullable = true)
 |-- KM: integer (nullable = true)
 |-- ANYO: long (nullable = true)
 |-- MES: long (nullable = true)
 |-- DIA_SEMANA: long (nullable = true)
 |-- HORA: long (nullable = true)
 |-- COD_PROVINCIA: long (nullable = true)
 |-- COD_MUNICIPIO: string (nullable = true)
 |-- ISLA: integer (nullable = true)
 |-- ZONA: long (nullable = true)
 |-- ZONA_AGRUPADA: long (nullable = true)
 |-- SENTIDO_1F: long (nullable = true)
 |-- TITULARIDAD_VIA: long (nullable = true)
 |-- TIPO_VIA: long (nullable = true)
 |-- TIPO_ACCIDENTE: long (nullable = true)
 |-- TOTAL_MU24H: long (nullable = true)
 |-- TOTAL_HG24H: long (nullable = true)
 |-- TOTAL_HL24H: long (nullable = true)
 |-- TOTAL_VICTIMAS_24H: long (nullable = true)
 |-- TOTAL_MU30DF: long (nullable = true)
 |-- TOTAL_HG30DF: long (nullable = true)
 |-- TOTAL_HL30DF: long (nullable = true)
 |-- TOTAL_VICTIMAS_30DF: long (nullable = true)
 |-- TOTAL_VEHICULOS:

In [13]:
df_accidentes.select("ISLA").distinct().orderBy("ISLA").show()

StatementMeta(b2c68712-eec7-4103-8184-d5da7dcf1f19, 36, 18, Finished, Available, Finished)

+----+
|ISLA|
+----+
|   0|
|   1|
|   2|
|   3|
|   4|
|   7|
|   8|
|   9|
|  10|
|  11|
|  12|
|  13|
|  14|
+----+



In [14]:
df_accidentes.select("COD_PROVINCIA", "ISLA") \
    .distinct() \
    .orderBy("COD_PROVINCIA", "ISLA") \
    .show(truncate=False)


StatementMeta(b2c68712-eec7-4103-8184-d5da7dcf1f19, 36, 19, Finished, Available, Finished)

+-------------+----+
|COD_PROVINCIA|ISLA|
+-------------+----+
|1            |14  |
|2            |14  |
|3            |14  |
|4            |14  |
|5            |14  |
|6            |14  |
|7            |0   |
|7            |1   |
|7            |2   |
|7            |3   |
|7            |4   |
|8            |14  |
|9            |14  |
|10           |14  |
|11           |14  |
|12           |14  |
|13           |14  |
|14           |14  |
|15           |14  |
|16           |14  |
+-------------+----+
only showing top 20 rows



### 13. Guardado final del parquet

In [None]:
container_name = "processed-data"
output_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/limpieza_output.parquet"

# Coalesce(1) para un solo archivo
df_accidentes.coalesce(1).write.mode("overwrite").parquet(output_path)

print(f"Data guardada en parquet en: {output_path}")

StatementMeta(, 36, -1, Cancelled, , Cancelled)