# Actividad 10: Las diferentes API de Spark
### Ivonis Florindo

## Primera Parte: Spark sobre datos de automóviles.

In [26]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

import pandas as pd

In [None]:
# Se crea una sesión de Spark
spark = SparkSession.builder \
    .appName("Actividad10_cars") \
    .getOrCreate()

# Se lee el CSV de automóviles
df = spark.read.csv("/kaggle/input//cars-csv/cars.csv", header=True, inferSchema=True, sep=";")

# Ver el esquema
df.printSchema()

# Mostrar las 5 primeras filas
df.show(5)

root
 |-- Car: string (nullable = true)
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Displacement: double (nullable = true)
 |-- Horsepower: double (nullable = true)
 |-- Weight: decimal(4,0) (nullable = true)
 |-- Acceleration: double (nullable = true)
 |-- Model: integer (nullable = true)
 |-- Origin: string (nullable = true)

+--------------------+----+---------+------------+----------+------+------------+-----+------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0|  3504|        12.0|   70|    US|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0|  3693|        11.5|   70|    US|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0|  3436|        11.0|   70|    US|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0| 

In [28]:
# Mostrar columnas "Car" y "Cylinders cuyo origen sea "Europe""
df.select("Car", "Cylinders") \
  .filter(df.Origin == "Europe") \
  .show()


+--------------------+---------+
|                 Car|Cylinders|
+--------------------+---------+
|Citroen DS-21 Pallas|        4|
|Volkswagen 1131 D...|        4|
|         Peugeot 504|        4|
|         Audi 100 LS|        4|
|            Saab 99e|        4|
|            BMW 2002|        4|
|Volkswagen Super ...|        4|
|           Opel 1900|        4|
|         Peugeot 304|        4|
|           Fiat 124B|        4|
|Volkswagen Model 111|        4|
|   Volkswagen Type 3|        4|
|     Volvo 145e (sw)|        4|
| Volkswagen 411 (sw)|        4|
|    Peugeot 504 (sw)|        4|
|     Renault 12 (sw)|        4|
|Volkswagen Super ...|        4|
|Fiat 124 Sport Coupe|        4|
|            Fiat 128|        4|
|          Opel Manta|        4|
+--------------------+---------+
only showing top 20 rows



In [None]:
# Se obtiene la media de Horsepower, "Weight" y "Acceleration" por "Origin"
df.groupBy("Origin").agg(
    F.avg("Horsepower").alias("Media_Horsepower"),
    F.avg("Weight").alias("Media_Weight"),
    F.avg("Acceleration").alias("Media_Acceleration")
).show()


+------+------------------+------------+------------------+
|Origin|  Media_Horsepower|Media_Weight|Media_Acceleration|
+------+------------------+------------+------------------+
|Europe| 78.78082191780823|   2431.4932| 16.82191780821918|
|    US|118.01181102362204|   3372.7008|14.942519685039361|
| Japan| 79.83544303797468|   2221.2278|16.172151898734175|
+------+------------------+------------+------------------+



In [None]:
# Se crea nueva columna con el ratio entre potencia y peso
# lo que en argot automovilístico se conoce como Power-to-weight ratio"
df = df.withColumn("PowerToWeightRatio", F.col("Horsepower") / F.col("Weight"))

# Se crea una ventana particionada por cilindros y ordenada por PowerToWeightRatio descendente
windowSpec = Window.partitionBy("Cylinders").orderBy(F.col("PowerToWeightRatio").desc())

# Se añade un número de fila para cada grupo
df_top = df.withColumn("row_num", row_number().over(windowSpec))

# Se filtra solo la primera fila de cada grupo (el top 1 por cilindros)
df_top.filter(F.col("row_num") == 1) \
    .select("Cylinders", "Car", "Horsepower", "Weight", "PowerToWeightRatio") \
    .orderBy("Cylinders") \
    .show(truncate=False)

# Se calcula la media del Power-to-weight ratio por número de cilindros
df.groupBy("Cylinders") \
  .agg(F.avg("PowerToWeightRatio").alias("AvgPowerToWeight")) \
  .show()

+---------+-------------------------------+----------+------+--------------------+
|Cylinders|Car                            |Horsepower|Weight|PowerToWeightRatio  |
+---------+-------------------------------+----------+------+--------------------+
|3        |Mazda RX3                      |90.0      |2124  |0.0423728813559322  |
|4        |BMW 2002                       |113.0     |2234  |0.050581915846016116|
|5        |Audi 5000                      |103.0     |2830  |0.036395759717314485|
|6        |Buick Regal Sport Coupe (turbo)|165.0     |3445  |0.047895500725689405|
|8        |Buick Estate Wagon (sw)        |225.0     |3086  |0.0729099157485418  |
+---------+-------------------------------+----------+------+--------------------+

+---------+--------------------+
|Cylinders|    AvgPowerToWeight|
+---------+--------------------+
|        6| 0.03163238614784404|
|        3|0.041441818290915214|
|        5|0.026973551761855744|
|        4|0.033217183371144564|
|        8| 0.0387285

## Segunda Parte: Spark sobre datos de automóviles.

In [34]:
# Imports
from pyspark.sql.functions import expr
from pyspark.sql import Row

In [None]:
# Se carga el CSV de calidad del aire de enero 2020 (separador ; europeo)

df_aire = spark.read.csv(
    "/kaggle/input/air-qualy/ene_mo20.csv",
    sep=";",
    header=True,
    inferSchema=True
)

# Se verificanican algunas columnas clave (fecha + primeras horas)
df_aire.select("ANO", "MES", "DIA", "H01", "H02", "H03").show(5)
print(df_aire.columns)

+----+---+---+---+---+---+
| ANO|MES|DIA|H01|H02|H03|
+----+---+---+---+---+---+
|2020|  1|  1|7.0|8.0|9.0|
|2020|  1|  2|8.0|8.0|7.0|
|2020|  1|  3|9.0|8.0|7.0|
|2020|  1|  4|7.0|5.0|6.0|
|2020|  1|  5|8.0|6.0|5.0|
+----+---+---+---+---+---+
only showing top 5 rows

['PROVINCIA', 'MUNICIPIO', 'ESTACION', 'MAGNITUD', 'PUNTO_MUESTREO', 'ANO', 'MES', 'DIA', 'H01', 'V01', 'H02', 'V02', 'H03', 'V03', 'H04', 'V04', 'H05', 'V05', 'H06', 'V06', 'H07', 'V07', 'H08', 'V08', 'H09', 'V09', 'H10', 'V10', 'H11', 'V11', 'H12', 'V12', 'H13', 'V13', 'H14', 'V14', 'H15', 'V15', 'H16', 'V16', 'H17', 'V17', 'H18', 'V18', 'H19', 'V19', 'H20', 'V20', 'H21', 'V21', 'H22', 'V22', 'H23', 'V23', 'H24', 'V24']


In [None]:
# Se reestructura el DataFrame de "ancho" a "largo" con stack()
# Se convierten las columnas H01–H24 en dos columnas nuevas: HORA y VALOR
columnas_stack = ",".join([f"'{i}', H{str(i).zfill(2)}" for i in range(1, 25)])

df_largo = df_aire.select(
    "PROVINCIA", "MUNICIPIO", "ESTACION", "MAGNITUD", "ANO", "MES", "DIA",
    expr(f"stack(24, {columnas_stack}) as (HORA, VALOR)")
)

# Se muestran los primeros resultados del nuevo formato largo
df_largo.select("ESTACION", "MAGNITUD", "DIA", "HORA", "VALOR").show(10, truncate=False)


+--------+--------+---+----+-----+
|ESTACION|MAGNITUD|DIA|HORA|VALOR|
+--------+--------+---+----+-----+
|4       |1       |1  |1   |7.0  |
|4       |1       |1  |2   |8.0  |
|4       |1       |1  |3   |9.0  |
|4       |1       |1  |4   |8.0  |
|4       |1       |1  |5   |6.0  |
|4       |1       |1  |6   |6.0  |
|4       |1       |1  |7   |5.0  |
|4       |1       |1  |8   |5.0  |
|4       |1       |1  |9   |4.0  |
|4       |1       |1  |10  |5.0  |
+--------+--------+---+----+-----+
only showing top 10 rows



In [None]:
# Se calcula el número de estaciones distintas presentes en el dataset
print (df_largo.select("ESTACION").distinct().count())

# Se muestran las magnitudes distintas registradas en el fichero de enero
# (cada valor representa un contaminante diferente)
df_largo.select("MAGNITUD").distinct().orderBy("MAGNITUD").show()

24
+--------+
|MAGNITUD|
+--------+
|       1|
|       6|
|       7|
|       8|
|       9|
|      10|
|      12|
|      14|
|      20|
|      30|
|      35|
|      42|
|      43|
|      44|
+--------+



In [None]:
# Diccionario solo con las magnitudes presentes en el dataset
magnitudes_dict = {
    1: "SO₂ (Dióxido de azufre)",
    6: "CO (Monóxido de carbono)",
    7: "NO (Monóxido de nitrógeno)",
    8: "NO₂ (Dióxido de nitrógeno)",
    9: "PM10 (Partículas ≤10µm)",
    10: "PM2.5 (Partículas ≤2.5µm)",
    12: "NOx (Óxidos de nitrógeno)",
    14: "O₃ (Ozono)",
    20: "Tolueno",
    35: "Benceno",
    43: "Meta-xileno",
    44: "Para-xileno"
}

# Se crea un DataFrame con los valores encontrados
magnitudes_presentes = df_largo.select("MAGNITUD").distinct().rdd.flatMap(lambda x: x).collect()

# Se genera la lista de Rows con nombre
info_magnitudes = [Row(MAGNITUD=mag, NOMBRE=magnitudes_dict.get(mag, "Desconocido")) for mag in magnitudes_presentes]

# Se crea DataFrame y se muestra
df_mags = spark.createDataFrame(info_magnitudes)
df_mags.orderBy("MAGNITUD").show(truncate=False)

+--------+--------------------------+
|MAGNITUD|NOMBRE                    |
+--------+--------------------------+
|1       |SO₂ (Dióxido de azufre)   |
|6       |CO (Monóxido de carbono)  |
|7       |NO (Monóxido de nitrógeno)|
|8       |NO₂ (Dióxido de nitrógeno)|
|9       |PM10 (Partículas ≤10µm)   |
|10      |PM2.5 (Partículas ≤2.5µm) |
|12      |NOx (Óxidos de nitrógeno) |
|14      |O₃ (Ozono)                |
|20      |Tolueno                   |
|30      |Desconocido               |
|35      |Benceno                   |
|42      |Desconocido               |
|43      |Meta-xileno               |
|44      |Para-xileno               |
+--------+--------------------------+



In [None]:
# Se filtra el número de registros correspondientes al día 18 de enero de 2020
df_largo.filter(
    (F.col("ANO") == 2020) &
    (F.col("MES") == 1) &
    (F.col("DIA") == 18)
).count()


3672

In [None]:
# Se calcula la media de dióxido de azufre (MAGNITUD 1) a las 12h de cada día

df_largo.filter(
    (F.col("MAGNITUD") == 1) & (F.col("HORA") == "12")
).groupBy("DIA") \
 .agg(F.avg("VALOR").alias("Media_SO2_12h")) \
 .orderBy("DIA") \
 .show(truncate=False)


+---+-------------+
|DIA|Media_SO2_12h|
+---+-------------+
|1  |9.5          |
|2  |11.2         |
|3  |8.6          |
|4  |5.4          |
|5  |6.0          |
|6  |7.0          |
|7  |11.1         |
|8  |12.4         |
|9  |12.8         |
|10 |6.5          |
|11 |5.6          |
|12 |8.0          |
|13 |10.3         |
|14 |8.2          |
|15 |8.1          |
|16 |8.2          |
|17 |5.9          |
|18 |5.5          |
|19 |4.5          |
|20 |4.6          |
+---+-------------+
only showing top 20 rows

