# Actividad 4 | Métricas de calidad de resultados

In [None]:
import kagglehub
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, StandardScaler, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, hour, date_format, count, round, concat_ws, rand
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, percentile_approx


## 1. Construcción de la muestra M

Primero descargaremos el dataset de forma local y creamos la sesión de PySpark.

In [None]:
# Download the latest version
path = kagglehub.dataset_download("sobhanmoosavi/us-accidents")

print("Path to dataset files:", path)

dataset_path = path + "/US_Accidents_March23.csv"

In [None]:
# Create a Spark session
spark = SparkSession.builder.master("local[*]").getOrCreate()

spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

spark

Ahora podemos cargar nuestro dataset como un dataframe de PySpark.

In [None]:
df = spark.read.csv(dataset_path, header=True, inferSchema=True)

df.show(5)

A continuación aplicamos nuestro muestreo del conjunto de datos, tal como se describió en la actividad anterior.

El primer paso consiste en crear las columnas Weather_Condition, Hora_Periodo y Tipo_Día.

In [None]:
df = df.withColumn("Weather_Type",
    when(col("Weather_Condition").isNull(), "Desconocido")
    .when(col("Weather_Condition").rlike("(?i)null|N/A"), "Desconocido")
    .when(col("Weather_Condition").rlike("(?i)Rain|Drizzle|Thunder|Storm|Snow|Sleet|Hail|Ice|Fog|Haze|Mist|Dust|Sand|Smoke|Wintry|Squall|Tornado|Ash|Funnel"), "Adverso")
    .otherwise("Favorable")
)

df = df.withColumn(
    "Hora_Periodo",
    when(hour("Start_Time") < 6, "Madrugada")
    .when(hour("Start_Time") < 18, "Alta actividad")
    .otherwise("Tarde-Noche")
)

df = df.withColumn("Dia_Semana", date_format("Start_Time", "E"))
df = df.withColumn(
    "Tipo_Día",
    when(col("Dia_Semana").isin("Sat", "Sun"), "Fin de semana").otherwise("Laboral")
)

Posteriormente, filtramos cualquier registro en el cual las columnas clave (Severity, Hora_Periodo, Tipo_Día y Weather_Type) son nulas.

In [None]:
df_filtrado = df.filter(
    (col("Severity").isNotNull()) &
    (col("Hora_Periodo").isNotNull()) &
    (col("Tipo_Día").isNotNull()) &
    (col("Weather_Type").isNotNull())
)

Ahora obtenemos los estratos a partir de las diferentes combinaciones de estas variables, al igual que la probabilidad para cada estrato.

In [None]:
total_registros = df_filtrado.count()

estratos = df_filtrado.groupBy("Severity", "Hora_Periodo", "Tipo_Día", "Weather_Type") \
    .agg(count("*").alias("frecuencia")) \
    .withColumn("probabilidad", round(col("frecuencia") / total_registros, 6)) \
    .orderBy(col("probabilidad").desc())

Para cada estrato, calculamos el número de elementos a incluir a partir del tamaño de la muestra deseado y la probabilidad para cada estrato. En este caso, buscamos una sub-muestra de 10,000 elementos.

In [None]:
# Tamaño total de muestra deseado
n_muestra = 10000

estratos = estratos.withColumn(
    "n_estrato",
    round(col("probabilidad") * n_muestra).cast("integer")
)

Unimos los dataframes con la información de los estratos con nuestro dataset.

In [None]:
# En df_filtrado (base depurada sin nulos en variables clave)
df_filtrado = df_filtrado.withColumn(
    "estrato_id",
    concat_ws("_", "Severity", "Hora_Periodo", "Tipo_Día", "Weather_Type")
)

# Igual en la tabla de estratos con probabilidades y n_estrato
estratos = estratos.withColumn(
    "estrato_id",
    concat_ws("_", "Severity", "Hora_Periodo", "Tipo_Día", "Weather_Type")
)

df_muestreo = df_filtrado.join(
    estratos.select("estrato_id", "n_estrato"),
    on="estrato_id",
    how="inner"
)

Ordenamos de forma aleatoria los elementos dentro de cada estrato.

In [None]:
# Asignar un número aleatorio y calcular el orden por estrato
df_muestreo = df_muestreo.withColumn("rand", rand(seed=42))

window = Window.partitionBy("estrato_id").orderBy("rand")

df_muestreo = df_muestreo.withColumn("row_num", row_number().over(window))

Finalmente, creamos nuestro data frame con la muestra a utilizar, incluyendo sólamente el número de elementos correspondiente a cada estrato.

In [None]:
df_muestra_final = df_muestreo.filter(col("row_num") <= col("n_estrato"))

Una vez construida la muestra, podemos persistir el dataframe para que PySpark optimice las transformaciones posteriores. Al persistir el DataFrame, nos aseguramos de "congelar" su estado actual, de modo que cualquier operación posterior tenga un punto de partida definido.

In [None]:
df_muestra_final = df_muestra_final.persist()

In [None]:
df_muestra_final.summary()

## 2. Construcción Train - Test

Previo a la construcción de los conjuntos train y test, vamos a realizar una limpieza de datos.
Esto consiste en:
- Eliminar columnas con metadatos que no proporcionarán valor a ningún modelo
- Eliminar columnas que son redundantes con otras columnas en el conjunto de datos
- Eliminar columnas donde más del 5% de los registros son valores faltantes
- De las columnas restantes, eliminar los registros donde existan valores faltantes

In [None]:
# Comenzamos con la limpieza de las columnas agregadas para el sub muestreo
cols_to_drop = ['ID', 'estrato_id', 'n_estrato', 'rand', 'row_num', 'rand', 'Source']


In [None]:
# Ahora las columnas irrelevantes o redundantes.
cols_to_drop += ['Start_Lng', 'End_Lng', 'Start_Lat', 'End_Lat', 'Street', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone', 'Airport_Code', 'Weather_Timestamp', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight', 'Wind_Chill(F)', 'Description', 'Wind_Direction', 'Sunrise_Sunset']


In [None]:
# Ignoramos las columnas que ya identificamos como columnas a remover
cols = [c for c in df_muestra_final.columns if c not in cols_to_drop]
total = df_muestra_final.count()

for c in cols:
    n_missing = df_muestra_final.filter(col(c).isNull()).count()
    if n_missing > (total*0.05):
        print(f'Dropping column {c}')
        cols_to_drop.append(c)


In [None]:
cols = [c for c in df_muestra_final.columns if c not in cols_to_drop]
df_muestra_final = df_muestra_final.dropna(subset=cols)

Una vez realizada la limpieza de datos, podemos proceder a crear nuestros conjuntos de train y test. Para este ejercicio, se utilizará una proporción de 80/20, la cual es comúnmnente utilizada en problemas de aprendizaje de máquina.

In [None]:
train_data,test_data = df_muestra_final.randomSplit([0.8,0.2], seed = 42)
print(f"""Existen {train_data.count()} instancias en el conjunto train, y {test_data.count()} en el conjunto test""")

## 3. Selección de métricas para medir calidad de resultados

El modelo de aprendizaje automático a crear durante este ejercicio será un modelo de clasificación binaria. La intención de este modelo es predecir si un accidente será de baja o alta severidad, basado en las condiciones bajo las cuales sucedió.

El caso de uso hipotético para este modelo es por parte de los equipos de respuesta a accidentes; la intención será que, una vez recibido un reporte de accidente, puedan utilizar este modelo para predecir su severidad y priorizar los recursos de respuesta de forma apropiada e informada.

Debido a que se trata de un modelo de clasificación binaria, se utilizarán las siguientes métricas para evaluar la calidad del modelo:
- Accuracy
- Precision
- Recall
- F1-Score

Además, considerando el contexto del problema, la métrica para la cual se busca optimizar será recall. Esto es debido a que se busca reducir la cantidad de falsos negativos; es decir, reducir la cantidad de accidentes de alta severidad que son catalogados como baja severidad. El razonamiento detrás de esta decisión es que el costo de un falso negativo (no atender inmediatamente un accidente grave) es mayor que el costo de un falso positivo (atender inmediatamente un accidente leve).

## 4. Entrenamiento de Modelos de Aprendizaje

Antes de comenzar a entrenar nuestro modelo, se realizará preprocesamiento básico del dataset con la finalidad de prepararlo para el entrenamiento.

La primera transformación consiste en crear una columna "Minutes" calculada a partir del tiempo de inicio y final de los accidentes. En el caso de uso hipotético en el que los equipos de respuesta utilizarán este modelo, la columna se calculará a partir de la hora a la que se reportó el accidente y la hora actual.

In [None]:
train_data = train_data.withColumn('Minutes', (col('End_Time').cast('long') - col('Start_Time').cast('long')) / 60)
test_data = train_data.withColumn('Minutes', (col('End_Time').cast('long') - col('Start_Time').cast('long')) / 60)

Ahora procedemos a eliminar outliers mediante la técnica IQR. Esto lo aplicaremos a las columnas Minutes y Distance.

In [None]:
# Calculate Q1 and Q3
quantiles = train_data.select(
    percentile_approx('Minutes', [0.25, 0.75], 10000).alias('quantiles')
).collect()[0]['quantiles']

Q1 = quantiles[0]
Q3 = quantiles[1]
IQR = Q3 - Q1

# Filter out outliers
train_data = train_data.filter(
    (col('Minutes') >= Q1 - 1.5 * IQR) &
    (col('Minutes') <= Q3 + 1.5 * IQR)
)
test_data = test_data.filter(
    (col('Minutes') >= Q1 - 1.5 * IQR) &
    (col('Minutes') <= Q3 + 1.5 * IQR)
)

In [None]:
# Calculate Q1 and Q3
quantiles = train_data.select(
    percentile_approx('Distance(mi)', [0.25, 0.75], 10000).alias('quantiles')
).collect()[0]['quantiles']

Q1 = quantiles[0]
Q3 = quantiles[1]
IQR = Q3 - Q1

# Filter out outliers
train_data = train_data.filter(
    (col('Distance(mi)') >= Q1 - 1.5 * IQR) &
    (col('Distance(mi)') <= Q3 + 1.5 * IQR)
)
test_data = test_data.filter(
    (col('Distance(mi)') >= Q1 - 1.5 * IQR) &
    (col('Distance(mi)') <= Q3 + 1.5 * IQR)
)

Ahora procedemos a crear nuestra columna objetivo a partir de la columna Severity. Para este ejercicio, severidades 1 y 2 se considerarán como accidentes leves, mientras que las severidades 3 y 4 se considerarán accidentes graves.

In [None]:
train_data = train_data.withColumn('IsSevere', (col('Severity') > 2).cast("bool"))
test_data = test_data.withColumn('IsSevere', (col('Severity') > 2).cast("bool"))

Ahora bien, procedemos a crear nuestro vector de características. Para este ejercicio, consideraremos las variables numéricas Distance, Visibility y Minutes; así como las variables categóricas Weather_Type, Hora_Periodo y Tipo_Día.

Para las variables numéricas, aplicamos escalamiento estándar.

In [None]:
cols_to_scale = ['Distance(mi)', 'Visibility(mi)', 'Minutes']

vectorizer = VectorAssembler(inputCols=cols_to_scale, outputCol="numerical_features")
train_data = vectorizer.transform(train_data)
test_data = vectorizer.transform(test_data)

scaler = StandardScaler(inputCol="numerical_features", outputCol="scaled_features")
fitted_scaler = scaler.fit(train_data)
train_data = fitted_scaler.transform(train_data)
test_data = fitted_scaler.transform(test_data)


Por su parte, las variables categóricas deben ser indexadas primero, para posteriormente codificar mediante One Hot encoding.

In [None]:
categorical_columns = ['Weather_Type', 'Dia_Semana', 'Tipo_Día', 'Hora_Periodo']

# Primero convertimos todas las columnas a índices
for c in categorical_columns:
    indexer = StringIndexer(inputCol=c, outputCol=f"{c}_index", handleInvalid='keep')
    fitted_indexer = indexer.fit(train_data)
    train_data = fitted_indexer.transform(train_data)
    test_data = fitted_indexer.transform(test_data)

In [None]:
categorical_index_cols = [f"{c}_index" for c in categorical_columns]

encoder = OneHotEncoder(inputCols=categorical_index_cols, outputCols=[f"{c}_vector" for c in categorical_columns], handleInvalid='keep')
fitted_encoder = encoder.fit(train_data)
train_data = fitted_encoder.transform(train_data)
test_data = fitted_encoder.transform(test_data)

Ahora creamos un solo vector que contenga las características para entrenar nuestro modelo.

In [None]:
cols_to_vectorize = ['Weather_Type_vector', 'Tipo_Día_vector', 'Hora_Periodo_vector', 'Severity', 'scaled_features']
vectorizer = VectorAssembler(inputCols=cols_to_vectorize, outputCol="input_features")
train_data = vectorizer.transform(train_data)
test_data = vectorizer.transform(test_data)

Finalmente, podemos crear un modelo de clasificación.

In [None]:
log_regression = LogisticRegression(featuresCol='input_features', labelCol='IsSevere').fit(train_data)

In [None]:
predictions = log_regression.evaluate(test_data)

evaluator = BinaryClassificationEvaluator(labelCol='IsSevere')
result = evaluator.evaluate(predictions.predictions)

## 5. Análisis de resultados