# Accidentes de tráfico en Reino Unido entre 2010 y 2014 

### Disponible en Kaggle en:
https://www.kaggle.com/stefanoleone992/adm-project-road-accidents-in-uk

### Variables y significado

* Accident_Index: Accident index
* Latitude: Accident latitude
* Longitude: Accident longitude
* Region: Accident region
* Urban_or_Rural_Area: Accident area (rural or urban)
* X1st_Road_Class: Accident road class
* Driver_IMD_Decile: Road IMD Decile
* Speed_limit: Road speed limit
* Road_Type: Road type
* Road_Surface_Conditions: Road surface condition
* Weather: Weather
* High_Wind: High wind
* Lights: Road lights
* Datetime: Accident datetime
* Year: Accident year
* Season: Accident season
* Month_of_Year: Accident month
* Day_of_Month: Accident day of month
* Day_of_Week: Accident day of week
* Hour_of_Day: Accident hour of day
* Number_of_Vehicles: Accident number of vehicles
* Age_of_Driver: Driver age
* Age_of_Vehicle: Vehicle age
* Junction_Detail: Accident junction detail
* Junction_Location: Accident junction location
* X1st_Point_of_Impact: Vehicle first point of impact
* Driver_Journey_Purpose: Driver journey purpose
* Engine_CC: Vehicle engine power (in CC)
* Propulsion_Code: Vehicle propulsion code
* Vehicle_Make: Vehicle brand
* Vehicle_Category: Vehicle brand category
* Vehicle_Manoeuvre: Vehicle manoeuvre when accident happened
* Accident_Severity: Accident severity

**Nombre completo del alumno:**  

# INSTRUCCIONES 

En cada celda debes responder a la pregunta formulada, asegurándote de que el resultado queda guardado en la(s) variable(s) que por defecto vienen inicializadas a `None`. No se necesita usar variables intermedias, pero puedes hacerlo siempre que el resultado final del cálculo quede guardado exactamente en la variable que venía inicializada a None (debes reemplazar None por la secuencia de transformaciones necesarias, pero nunca cambiar el nombre de esa variable). 

**No olvides borrar la línea *raise NotImplementedError()* de cada celda cuando hayas completado la solución de esa celda y quieras probarla**.

Después de cada celda evaluable verás una celda con código. Ejecútala (no modifiques su código) y te dirá si tu solución es correcta o no. Además de esas pruebas, se realizarán algunas más (ocultas) a la hora de puntuar el ejercicio, pero evaluar dicha celda es un indicador bastante fiable acerca de si realmente has implementado la solución correcta o no. Asegúrate de que, al menos, todas las celdas indican que el código es correcto antes de enviar el notebook terminado.

**Nunca se debe redondear ninguna cantidad si no lo pide explícitamente el enunciado**

### Cada solución debe escribirse obligatoriamente en la celda habilitada para ello. Cualquier celda adicional que se haya creado durante el desarrollo deberá ser eliminada.

Si necesitas crear celdas auxiliares durante el desarrollo, puedes hacerlo pero debes asegurarte de borrarlas antes de entregar el notebook.

### Sobre el dataset anterior (accidents_uk.csv) se pide:

**Ejercicio 1 (1.5 puntos)** 
* Leerlo tratando de que Spark infiera el tipo de dato de cada columna.
* Crear una columna llamada `Age_Category` renombrando los valores de la columna `Age_of_Driver` donde los valores 1 y 2 de la columna original sean etiquetados en la columna nueva como "Adolescente", los valores 3 y 4 como "Joven", los valores 5 y 6 como "Adulto", y los valores 7 y 8 como "Anciano".
* Crear una columna llamada `hora` aplicando la función `F.hour` a la columna `"Datetime"` ya existente.
* El resultado debe guardarse **cacheado** en la variable `accidentesDF`.

In [28]:




from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, hour
from pyspark.sql.types import *

# Creamos la sesión de Spark
spark = SparkSession.builder.appName("Accidentes").getOrCreate()

# Leer el archivo CSV
file_path = "/Users/kscortesp/Dropbox/Documentos/Estudios/Master IA/Hadoop:Spark/accidents_uk.csv"  # Asegúrate de que la ruta es correcta
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Crear la columna Age_Category
df = df.withColumn("Age_Category",
                                       when(col("Age_of_Driver").isin([1, 2]), "Adolescente") \
                                       .when(col("Age_of_Driver").isin([3, 4]), "Joven") \
                                       .when(col("Age_of_Driver").isin([5, 6]), "Adulto") \
                                       .when(col("Age_of_Driver").isin([7, 8]), "Anciano") \
                                       .otherwise("Unknown"))

# Creamos la columna hora
accidentesDF = df.withColumn("hora", hour(col("Datetime")))

# Cacheamos el DataFrame
accidentesDF.cache()


24/06/09 19:25:00 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


DataFrame[Accident_Index: string, Latitude: double, Longitude: double, Region: string, Urban_or_Rural_Area: string, X1st_Road_Class: string, Driver_IMD_Decile: int, Speed_limit: int, Road_Type: string, Road_Surface_Conditions: string, Weather: string, High_Wind: string, Lights: string, Datetime: timestamp, Year: int, Season: int, Month_of_Year: int, Day_of_Month: int, Day_of_Week: int, Hour_of_Day: double, Number_of_Vehicles: int, Age_of_Driver: int, Age_of_Vehicle: int, Junction_Detail: string, Junction_Location: string, X1st_Point_of_Impact: string, Driver_Journey_Purpose: string, Engine_CC: int, Propulsion_Code: string, Vehicle_Make: string, Vehicle_Category: string, Vehicle_Manoeuvre: string, Accident_Severity: string, Age_Category: string, hora: int]

In [29]:
from pyspark.sql.types import DoubleType
assert(accidentesDF.schema[1].dataType == DoubleType())
assert(accidentesDF.count() == 251832)

assert(dict(accidentesDF.dtypes)["Age_Category"] == "string")
collectedDF = accidentesDF.groupBy("Age_Category").count().orderBy("count").collect()
assert((collectedDF[0]["count"] == 22533) & (collectedDF[0]["Age_Category"] == "Anciano"))
assert((collectedDF[1]["count"] == 57174) & (collectedDF[1]["Age_Category"] == "Adolescente"))
assert((collectedDF[2]["count"] == 67138) & (collectedDF[2]["Age_Category"] == "Adulto"))
assert((collectedDF[3]["count"] == 104987) & (collectedDF[3]["Age_Category"] == "Joven"))


                                                                                

**Ejercicio 2 (3 puntos)** 

Partiendo de `accidentesDF`, queremos pegar (**sin hacer JOIN sino usando agregaciones sobre ventanas**) a cada accidente la siguiente información:
* Número de accidentes que ha habido *en ese mismo año con esa misma categoría de vehículo*, en una nueva columna `total_vehiculo_anio`.
* Número de accidentes que ha habido *en ese mismo año con esa misma categoría de vehículo y con esa misma situación (Junction Location)*, en una nueva columna `total_vehiculo_causa_anio`.
* Porcentaje (en tanto por uno) que supone el segundo dato sobre el primero, en la columna `porc_vehiculo_causa_anio`. Esta columna la podrás calcular tras haber calculado las dos anteriores, sin necesidad de utilizar ninguna ventana.
* Edad promedio de los accidentados *en ese mismo año con esa misma categoría de vehículo*, en una nueva columna `media_edad_vehiculo_anio`.
* Edad promedio de los accidentados *en ese mismo año con esa misma categoría de vehículo y con esa misma situación (Junction_Location)*, en una nueva columna `media_edad_vehiculo_causa_anio`.
* Desviación típica (función `F.stddev`) del dato anterior *en ese mismo año con ese mismo tipo de vehículo y con esa misma situación (Junction_Location)*, en una nuea columna `stddev_edad_vehiculo_causa_anio`.
* Guardar el DF resultante en una nueva variable llamada `accidentes_info_agregadaDF`.

PISTA: crear en las variables `ventana_vehiculo_anio` y `ventana_vehiculo_causa_anio` dos ventanas diferentes que definan los dos grupos distintos que intervienen en los cálculos anteriores (una de ellos con dos criterios y la otra con tres).

**Revisa el diccionario de variables y su significado para saber qué columnas debes utilizar**.

In [30]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Definimos las ventanas para los diferentes grupos
ventana_vehiculo_anio = Window.partitionBy("Year", "Vehicle_Category")
ventana_vehiculo_causa_anio = Window.partitionBy("Year", "Vehicle_Category", "Junction_Location")

# Calculamos los totales de accidentes por año y categoría de vehículo
accidentes_info_agregadaDF = accidentesDF.withColumn("total_vehiculo_anio",
                                                     F.count("Accident_Index").over(ventana_vehiculo_anio))

# Calculamos los totales de accidentes por año, categoría de vehículo y situación
accidentes_info_agregadaDF = accidentes_info_agregadaDF.withColumn("total_vehiculo_causa_anio",
                                                                   F.count("Accident_Index").over(ventana_vehiculo_causa_anio))

# Calculamos el porcentaje de accidentes con misma situación sobre total de accidentes con misma categoría y año
accidentes_info_agregadaDF = accidentes_info_agregadaDF.withColumn("porc_vehiculo_causa_anio",
                                                                   F.col("total_vehiculo_causa_anio") / F.col("total_vehiculo_anio"))

# Calculamos la edad promedio de los accidentados por año y categoría de vehículo
accidentes_info_agregadaDF = accidentes_info_agregadaDF.withColumn("media_edad_vehiculo_anio",
                                                                   F.avg("Age_of_Driver").over(ventana_vehiculo_anio))

# Calculamos la edad promedio de los accidentados por año, categoría de vehículo y situación
accidentes_info_agregadaDF = accidentes_info_agregadaDF.withColumn("media_edad_vehiculo_causa_anio",
                                                                   F.avg("Age_of_Driver").over(ventana_vehiculo_causa_anio))

# Calculamos la desviación estándar de la edad de los accidentados por año, categoría de vehículo y situación
accidentes_info_agregadaDF = accidentes_info_agregadaDF.withColumn("stddev_edad_vehiculo_causa_anio",
                                                                   F.stddev("Age_of_Driver").over(ventana_vehiculo_causa_anio))

# Guardamos el DataFrame resultante
accidentes_info_agregadaDF.cache()


DataFrame[Accident_Index: string, Latitude: double, Longitude: double, Region: string, Urban_or_Rural_Area: string, X1st_Road_Class: string, Driver_IMD_Decile: int, Speed_limit: int, Road_Type: string, Road_Surface_Conditions: string, Weather: string, High_Wind: string, Lights: string, Datetime: timestamp, Year: int, Season: int, Month_of_Year: int, Day_of_Month: int, Day_of_Week: int, Hour_of_Day: double, Number_of_Vehicles: int, Age_of_Driver: int, Age_of_Vehicle: int, Junction_Detail: string, Junction_Location: string, X1st_Point_of_Impact: string, Driver_Journey_Purpose: string, Engine_CC: int, Propulsion_Code: string, Vehicle_Make: string, Vehicle_Category: string, Vehicle_Manoeuvre: string, Accident_Severity: string, Age_Category: string, hora: int, total_vehiculo_anio: bigint, total_vehiculo_causa_anio: bigint, porc_vehiculo_causa_anio: double, media_edad_vehiculo_anio: double, media_edad_vehiculo_causa_anio: double, stddev_edad_vehiculo_causa_anio: double]

In [31]:
r = accidentes_info_agregadaDF.select(F.mean("total_vehiculo_anio").alias("total_vehiculo_anio"),
                                  F.mean("total_vehiculo_causa_anio").alias("total_vehiculo_causa_anio"),
                                  F.mean("porc_vehiculo_causa_anio").alias("porc_vehiculo_causa_anio"),
                                  F.mean("media_edad_vehiculo_causa_anio").alias("media_edad_vehiculo_causa_anio"),
                                 ).first()
assert(round(r.total_vehiculo_anio, 2) == 33843.98)
assert(round(r.total_vehiculo_causa_anio, 2) == 8185.52)
assert(round(r.porc_vehiculo_causa_anio, 2) == 0.25)
assert(round(r.media_edad_vehiculo_causa_anio, 2) == 3.90)

                                                                                

**Ejercicio 3 (1 punto)** Queremos saber si el tipo de vehículo está relacionado con la hora del día a la que se tienen más accidentes, y si es diferente entre cada tipo de vehículo. Para ello, partiendo de nuevo de `accidentesDF` 
* Crear un nuevo DF con tantas filas como horas del día existen, y tantas columnas como categorías de vehículo más una (que será justamente la hora). En cada casilla, debe contener el número de accidentes ocurridos a esa hora del día con ese tipo de vehículo.
* Ordenar el DF en base a la hora de menor a mayor.
* Almacenar el DF resultante en la variable `accidentes_hora_vehiculo`.

In [38]:
# Creamos un DataFrame pivoteando los datos para obtener el número de accidentes por hora y tipo de vehículo
accidentes_hora_vehiculo = accidentesDF.groupBy("hora").pivot("Vehicle_Category").count()

# Ordenamos el DataFrame por la hora del día de menor a mayor
accidentes_hora_vehiculo = accidentes_hora_vehiculo.orderBy("hora")

# Guardamos el DataFrame resultante
accidentes_hora_vehiculo.cache()
accidentes_hora_vehiculo.printSchema()
accidentes_hora_vehiculo.show()

root
 |-- hora: integer (nullable = true)
 |-- Bus/minibus: long (nullable = true)
 |-- Car: long (nullable = true)
 |-- Motorcycle: long (nullable = true)
 |-- Other: long (nullable = true)
 |-- Taxi: long (nullable = true)
 |-- Van: long (nullable = true)

+----+-----------+-----+----------+-----+----+----+
|hora|Bus/minibus|  Car|Motorcycle|Other|Taxi| Van|
+----+-----------+-----+----------+-----+----+----+
|   0|         10| 2438|       232|    5| 322|  97|
|   1|          8| 1743|       105|    6| 263|  78|
|   2|          3| 1270|        76|    5| 248|  61|
|   3|          5| 1122|        64|    4| 222|  64|
|   4|          1|  884|        73|    6| 112|  62|
|   5|          6| 1364|       216|    3|  82| 139|
|   6|          1| 3319|       595|   15| 101| 418|
|   7|         19| 8435|      1484|   21| 182| 972|
|   8|         44|15192|      1896|   37| 309|1161|
|   9|         27|10067|      1119|   39| 253| 906|
|  10|         24| 9086|      1072|   41| 220| 814|
|  11|       

24/06/09 19:27:27 WARN CacheManager: Asked to cache already cached data.


In [36]:
acc = accidentes_hora_vehiculo.collect()
assert(acc[0].hora == 0 and acc[0].Taxi == 324)
assert(acc[10].hora == 10 and acc[10].Other == 41)
assert(acc[15].hora == 15 and acc[15].Motorcycle == 1860)
assert(acc[19].hora == 19 and acc[19].Car == 10886)

AssertionError: 

**Ejercicio 4 (1 punto)** Partiendo de la variable `accidentes_hora_vehiculo` creada en el ejercicio anterior, crear un nuevo DF de **una sola fila** y tantas columnas como categorías de vehículos (es decir, 6). Debe contener, para cada columna, una *pareja del número de accidentes máximo que ocurre a lo largo del día, y la hora a la que se produjeron*. Para ello, en lugar de ir aplicando la función `F.max` a cada columna del DF anterior (dentro de una llamada a `select`), aplícala en cada momento lo que devuelve la función `F.struct(nombreColumna, "hora"`), es decir, `F.max(F.struct(nombreColumna, "hora"))`. De esta forma, estarás creando (al vuelo) un objeto columna de parejas, cuyo primer elemento de cada pareja es el número total de accidentes indicado en esa columna, y cuyo segundo elemento es la hora del día a la que se ha producido. La función `F.max` aplicada a una columna de tipo parejas tendrá en cuenta, por defecto, solamente el primer elemento de cada pareja para ordenar, así que escogerá la pareja que tiene un mayor número de accidentes ya que ese valor es el primer elemento de cada pareja, pero lo mostrará como pareja, con lo que veremos la hora del día a la que va aparejado ese número de accidentes.

Cada columna de pares mostrada por F.max debe renombrarse exactamente con el nombre de la categoría de vehículo a la que corresponde esa pareja. 

El DF resultante debe quedar guardado en la variable `hora_max_accidentes_vehiculo_df`

PISTA: la solución es simplemente una operación `select` que incluye dentro la creación de 6 columnas al vuelo haciendo 6 llamadas a la función `F.max(F.struct(..., "hora"))`, y haciendo `alias` sobre el objeto columna devuelto por cada una de estas llamadas, para que cada una de esas 6 columnas creadas al vuelo se llame igual que su categoría de vehículo.

In [16]:
from pyspark.sql import functions as F

# Crear el nuevo DataFrame con una fila y tantas columnas como categorías de vehículos
hora_max_accidentes_vehiculo_df = accidentes_hora_vehiculo.select(
    [F.max(F.struct(F.col(col), F.col("hora"))).alias(col) for col in accidentes_hora_vehiculo.columns if col != "hora"]
)

# Mostrar el esquema y el contenido del DataFrame resultante (opcional)
hora_max_accidentes_vehiculo_df.printSchema()
hora_max_accidentes_vehiculo_df.show(truncate=False)


root
 |-- Bus/minibus: struct (nullable = true)
 |    |-- Bus/minibus: long (nullable = true)
 |    |-- hora: integer (nullable = true)
 |-- Car: struct (nullable = true)
 |    |-- Car: long (nullable = true)
 |    |-- hora: integer (nullable = true)
 |-- Motorcycle: struct (nullable = true)
 |    |-- Motorcycle: long (nullable = true)
 |    |-- hora: integer (nullable = true)
 |-- Other: struct (nullable = true)
 |    |-- Other: long (nullable = true)
 |    |-- hora: integer (nullable = true)
 |-- Taxi: struct (nullable = true)
 |    |-- Taxi: long (nullable = true)
 |    |-- hora: integer (nullable = true)
 |-- Van: struct (nullable = true)
 |    |-- Van: long (nullable = true)
 |    |-- hora: integer (nullable = true)





+-----------+-----------+----------+--------+---------+----------+
|Bus/minibus|Car        |Motorcycle|Other   |Taxi     |Van       |
+-----------+-----------+----------+--------+---------+----------+
|{56, 15}   |{19961, 17}|{2751, 17}|{64, 13}|{389, 16}|{1233, 16}|
+-----------+-----------+----------+--------+---------+----------+



                                                                                

In [17]:
assert(len(hora_max_accidentes_vehiculo_df.columns) == 6)
assert(sum([1 for c in ["Bus/minibus", "Car", "Motorcycle", "Other", "Taxi", "Van"]
          if c in hora_max_accidentes_vehiculo_df.columns]) == 6)
r2 = hora_max_accidentes_vehiculo_df.first()
assert(r2["Bus/minibus"][0] == 56 and r2["Bus/minibus"][1] == 15)
assert(r2["Car"][0] == 19961 and r2["Car"][1] == 17)
assert(r2["Motorcycle"][0] == 2751 and r2["Motorcycle"][1] == 17)
assert(r2["Other"][0] == 64 and r2["Other"][1] == 13)
assert(r2["Taxi"][0] == 389 and r2["Taxi"][1] == 16)
assert(r2["Van"][0] == 1233 and r2["Van"][1] == 16)

**Ejercicio 5 (2 puntos)** Vamos a preprocesar algunas variables para prepararlas para un posible algoritmo predictivo. Partiendo de `accidentesDF` se pide:
* Crear en la variable `journey_purpose_indexer` un StringIndexer para la variable "Driver_Journey_Purpose" y que cree una nueva columna de salida `purpose_indexed`. Debe ser capaz de lidiar con etiquetas nunca vistas a la hora de hacer la codificación de un nuevo dataset (que no se eliminen dichas filas ni tampoco salte un error).
* Crear en la variable `cars_involved_binarizer` un Binarizer de la variable `Number_of_Vehicles` que tenga `threshold=2.5` puesto que en la mayoría de los accidentes están involucrados 1 o 2 coches. Queremos pasarla a una variable binaria donde el 0.0 represente justamente que ha habido 1 o 2 coches involucrados, y el 1.0 represente que ha habido 3 o más coches involucrados. El binarizador debe crear como salida una nueva columna llamada `number_vehicles_binarized`
* Crear en la variable `vector_assembler` un VectorAssembler que colapse en una nueva columna de tipo vector las columnas `purpose_indexed`, `number_vehicles_binarized` y `Speed_limit`. La nueva columna debe llamarse `features`. 
* Crear en la variable `pipeline` un pipeline que contenga **exclusivamente** las tres etapas anteriores. **NO DEBE CONTENER NINGÚN ALGORITMO PREDICTIVO**.
* "Entrenar" ese pipeline con el DF `accidentesDF` y guardar el resultado en la variable `pipeline_model`. **No debe hacerse ningún tipo de división de los datos en entrenamiento y test**. Aunque el método sea "entrenar", en realidad sólo estamos ajustando etapas de pre-procesamiento.

In [18]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, Binarizer, VectorAssembler

# Crear el StringIndexer para la variable "Driver_Journey_Purpose"
journey_purpose_indexer = StringIndexer(
    inputCol="Driver_Journey_Purpose",
    outputCol="purpose_indexed",
    handleInvalid="keep"  # Lidiar con etiquetas nunca vistas
)

# Crear el Binarizer para la variable "Number_of_Vehicles"
cars_involved_binarizer = Binarizer(
    inputCol="Number_of_Vehicles",
    outputCol="number_vehicles_binarized",
    threshold=2.5  # Umbral para convertir en variable binaria
)

# Crear el VectorAssembler para las columnas de características
vector_assembler = VectorAssembler(
    inputCols=["purpose_indexed", "number_vehicles_binarized", "Speed_limit"],
    outputCol="features"
)

# Crear el pipeline con las tres etapas anteriores
pipeline = Pipeline(stages=[
    journey_purpose_indexer,
    cars_involved_binarizer,
    vector_assembler
])

# Ajustar el pipeline con el DataFrame accidentesDF
pipeline_model = pipeline.fit(accidentesDF)


                                                                                

In [19]:
from pyspark.ml.feature import StringIndexer, Binarizer, VectorAssembler
from pyspark.ml import Pipeline, PipelineModel

assert(isinstance(journey_purpose_indexer, StringIndexer))
assert(journey_purpose_indexer.getInputCol() == "Driver_Journey_Purpose" and 
       journey_purpose_indexer.getOutputCol() == "purpose_indexed" and
       journey_purpose_indexer.getHandleInvalid() == "keep")

assert(isinstance(cars_involved_binarizer, Binarizer))
assert(cars_involved_binarizer.getInputCol() == "Number_of_Vehicles" and 
       cars_involved_binarizer.getOutputCol() == "number_vehicles_binarized" and
       cars_involved_binarizer.getThreshold() == 2.5)

assert(isinstance(pipeline, Pipeline))
assert(len(pipeline.getStages()) == 3)             # el pipeline debe tener solamente tres etapas
assert(journey_purpose_indexer in pipeline.getStages() 
       and cars_involved_binarizer in pipeline.getStages() 
       and vector_assembler in pipeline.getStages())

assert(isinstance(pipeline_model, PipelineModel))

**Ejercicio 6 (1.5 puntos)** Queremos ver cuál es la forma de transporte involucrada en más accidentes en cada región de Reino Unido. Para ello, partiendo de `accidentesDF` se pide:

* Crear un DF con tantas filas como Regiones distintas existen y tantas columnas como categorías de vehículo más una (la de la región, que estará a la izquierda). En cada casilla debe calcularse una **tripleta** (columna de tipo estructura, que se crea con `F.struct(col1, col2, col3)`) formada por:
  * Número de accidentes en esa región y tipo vehículo,
  * Edad media del conductor redondeada a 2 cifras decimales, y
  * Número medio de coches involucrados redondeado a 2 cifras decimales). 
* Ordenar el DF alfabéticamente de menor a mayor en base a la columna `"Region"`
* Guardar el DF resultante en la variable `numero_edad_coches_df`.
* Para visualizarlo mejor, y puesto que el tamaño del DF que hemos obtenido como resultado está acotado por el número de regiones distintas existentes y por el número de categorías de vehículos existentes, pasar dicho DF a un dataframe de Pandas en la variable `numero_edad_coches_pd` y mostrarlo por pantalla.

PISTA: para construir la columna de tipo estructura dentro de la función `agg(...)` se puede utilizar `F.struct(F.funcionagregacion(...), F.funcionagregacion(...), F.funcionagregacion(...))`. 

PISTA: en vez de pasarle a la función `F.struct` directamente la columna resultante de la agregación, pásale en caso necesario `F.round(F.nombrefuncion(...), 2)` para que ya esté redondeada.

En el resultado puedes observar fenómenos como por ejemplo: 
* Los conductores de autobús son los que en promedio tienen siempre más edad, mientras que los de moto son los más jóvenes, como era de esperar. 
* Los accidentes de moto son los que menos coches involucran en promedio, en torno a 1.80, lo que quiere decir que hay muchos accidentes que los tiene el propio conductor sin que intervenga otro vehículo (condiciones atmosféricas, etc). Los de Taxi parecen estar bastante por debajo que los accidentes de coche, lo que indica que, mientras que un accidente de coche con frecuencia implica la interacción con otro vehículo, en los taxis hay aún bastantes accidentes donde no necesariamente hay otro vehículo implicado y por eso el promedio todavía no se acerca tanto a 2.
* Es llamativo que en Gales haya unos promedios tan bajos en el número de vehículos involucrados en todas las categorías, en especial en los accidentes de coche y moto, lo que indica que muchos accidentes se producen sin causa de otro vehículo. Puede tener relación directa con que la edad media de los conductores es bastante superior a la de otros medios, y por eso son propensos a tener un accidente por mala conducción o relacionado con las facultades físicas o cognitivas del conductor.

In [24]:
from pyspark.sql import functions as F

# Agrupar por Región y Vehicle_Category, y calcular las tripletas
agg_df = accidentesDF.groupBy("Region", "Vehicle_Category").agg(
    F.count("*").alias("num_accidentes"),
    F.round(F.avg("Age_of_Driver"), 2).alias("edad_media"),
    F.round(F.avg("Number_of_Vehicles"), 2).alias("num_vehiculos_medio")
)

# Pivotar el DataFrame para tener una columna por cada tipo de vehículo
pivot_df = agg_df.groupBy("Region").pivot("Vehicle_Category").agg(
    F.first(F.struct("num_accidentes", "edad_media", "num_vehiculos_medio")).alias("tripleta")
)

# Ordenar el DataFrame alfabéticamente por la columna "Region"
numero_edad_coches_df = pivot_df.orderBy("Region")

# Convertir el DataFrame a Pandas para visualizarlo mejor
numero_edad_coches_pd = numero_edad_coches_df.toPandas()

# Mostrar el DataFrame de Pandas
numero_edad_coches_pd


Unnamed: 0,Region,Bus/minibus,Car,Motorcycle,Other,Taxi,Van
0,East England,"(44, 4.34, 1.77)","(22813, 3.95, 2.0)","(2753, 3.17, 1.8)","(61, 4.02, 1.87)","(424, 4.43, 1.89)","(1470, 3.91, 2.12)"
1,East Midlands,"(39, 4.95, 1.85)","(17191, 3.91, 1.96)","(2084, 3.29, 1.79)","(48, 4.44, 1.92)","(362, 4.36, 1.83)","(1117, 3.98, 2.06)"
2,London,"(28, 4.71, 1.75)","(24015, 3.9, 1.92)","(5658, 3.22, 1.82)","(56, 3.89, 1.84)","(1671, 4.63, 1.79)","(2212, 3.88, 1.91)"
3,North East England,"(52, 4.46, 1.85)","(9272, 3.96, 1.96)","(742, 3.46, 1.79)","(32, 3.81, 2.06)","(279, 4.12, 1.67)","(639, 3.95, 2.0)"
4,North West England,"(66, 4.7, 1.88)","(25783, 4.01, 1.97)","(2724, 3.29, 1.82)","(66, 4.47, 1.98)","(1070, 4.33, 1.75)","(1387, 3.91, 2.01)"
5,Scotland,"(2, 4.0, 1.5)","(198, 4.24, 1.45)","(65, 4.77, 1.25)","(2, 7.0, 1.5)","(1, 5.0, 1.0)","(26, 4.0, 1.46)"
6,South East England,"(71, 4.96, 1.87)","(39410, 4.03, 2.02)","(5281, 3.28, 1.8)","(145, 3.81, 2.0)","(787, 4.42, 1.78)","(2548, 3.92, 2.1)"
7,South West England,"(37, 4.84, 1.81)","(21144, 4.11, 1.99)","(2802, 3.17, 1.88)","(67, 4.49, 2.0)","(317, 4.58, 1.74)","(1236, 3.97, 2.03)"
8,Wales,"(1, 5.0, 2.0)","(282, 4.23, 1.73)","(79, 4.35, 1.51)","(2, 2.5, 1.5)",,"(29, 3.52, 1.83)"
9,Wast Midlands,"(56, 4.7, 1.91)","(20933, 3.85, 1.97)","(2168, 3.16, 1.88)","(52, 3.96, 1.92)","(607, 4.14, 1.79)","(1436, 3.91, 2.03)"


In [25]:
assert(len(numero_edad_coches_df.columns) == 7)
assert(sum([1 for c in ["Region", "Bus/minibus", "Car", "Motorcycle", "Other", "Taxi", "Van"]
          if c in numero_edad_coches_df.columns]) == 7)
r = numero_edad_coches_df.collect()
assert(r[0].Region == "East England" and r[0].Other == (61, 4.02, 1.87))
assert(len(numero_edad_coches_df.columns) == 7 and len(r) == 11)
assert(r[0].Car == (22813, 3.95, 2.0))
assert(r[7].Motorcycle == (2802, 3.17, 1.88))
assert(r[10].Van == (1412, 3.92, 2.02))