In [33]:
import pyspark
from pyspark import SparkContext

##############
#### EJ 1 ####
##############

sc = SparkContext.getOrCreate() 


data = sc.textFile("data/calidad_aire_datos_meteo_mes.csv") 
header = data.first() 


rows = data.filter(lambda line: line != header) \
  .map(lambda line: line.split(";")) 


rows_madrid = rows.filter(lambda f: f[0] == "28")


rows_temp = rows_madrid.filter(lambda f: f[3] == "83") 


resultado_ej1 = rows_temp.filter(
    lambda f: any(f[k] == "V" for k in range(9, min(57, len(f)), 2))
).count() 
print("")
print("############ Ejercicio 1 ############")
print("")

print("Nº de registros de temperatura con >=1 hora válida:", resultado_ej1)

##############
#### EJ 2 ####
##############


# Partimos del RDD inicial y aplicamos los filtros base de provincia y magnitud
rows_temp = rows.filter(lambda f: f[0] == "28" and f[3] == "83") # [cite: 271, 280]

# Aplicación de Θ(r) y U (flatMap) seguido de M(τ_0) (reduceByKey)
resultado_ej2 = (
    rows_temp
    # flatMap utiliza composición funcional pura sin bucles 'for'
    .flatMap(
        lambda c: list(
            map(
                lambda i: (
                    f"{c[5]}-{c[6].zfill(2)}-{c[7].zfill(2)}", # Clave τ(r)
                    float(c[8 + 2 * i].replace(',', '.'))      # Valor F(v_i) [cite: 278]
                ),
                # Filtramos el rango de 24 horas evaluando la condición de expansión
                filter(lambda i: c[9 + 2 * i] == "V", range(24)) # [cite: 274]
            )
        )
    )
    # reduceByKey evalúa cada par de valores que comparten la misma fecha y conserva el mayor
    .reduceByKey(lambda a, b: a if a > b else b)
    
    # [cite_start]sortByKey ordena el RDD final cronológicamente (opcional pero recomendado para la salida) [cite: 147, 149]
    .sortByKey()
)

# [cite_start]Traemos los resultados calculados al driver en forma de lista [cite: 155, 157]
temperaturas_maximas = resultado_ej2.collect()

# Formateo e impresión funcional sin usar estructuras iterativas explícitas
print("")
print("")
print("############ Ejercicio 2 ############")
print("")
print('\n'.join(
    map(lambda x: f"Fecha: {x[0]} | Temp Máxima: {x[1]} °C", temperaturas_maximas) # [cite: 281]
))


##############
#### EJ 3 ####
##############


# M(r): Filtrado por provincia y magnitud pluviométrica (89)
rows_precip = rows.filter(lambda f: f[0] == "28" and f[3] == "89") 

# W: Transformación integral (map) para proyectar τ(r) y (σ(r), Σ(r))
# Sustituimos la comprensión de listas por composición funcional pura (map y filter)
precipitaciones_diarias = rows_precip.map(
    lambda c: (
        f"{c[5]}-{c[6].zfill(2)}-{c[7].zfill(2)}", # Clave τ(r): Fecha [cite: 283]
        (
           c[1], # Municipio [cite: 283]
           c[2], # Estación [cite: 283]
            # Σ(r): Reducción sumatoria sobre el mapeo de los índices filtrados
            sum(
                map(
                    lambda i: float(c[8 + 2 * i].replace(',', '.')), 
                    filter(lambda i: c[9 + 2 * i] == "V", range(24))
                )
            ) 
        )
    )
)

# S(τ): Reducción por clave temporal para hallar el supremo local (reduceByKey)
maximos_diarios_estacion = (
    precipitaciones_diarias
    .reduceByKey(lambda a, b: a if a[2] > b[2] else b) 
    .sortByKey() [cite: 147, 149]
)

# Ω: Reducción escalar global (reduce) para extraer el supremo absoluto 
# Compara pares completos (fecha, (municipio, estacion, precipitación)) y conserva el de mayor magnitud
maximo_absoluto = maximos_diarios_estacion.reduce(
    lambda a, b: a if a[1][2] > b[1][2] else b 
)

# [cite_start]Extracción de la topología local (collect) al driver [cite: 155, 157]
resultados_locales = maximos_diarios_estacion.collect() 

print("")
print("")
print("############ Ejercicio 3 ############")
print("")

print("--- MÁXIMOS DIARIOS POR ESTACIÓN ---")
# Impresión funcional acoplada prescindiendo del bucle for
print('\n'.join(
    map(
        lambda r: f"Fecha: {r[0]} | Municipio: {r[1][0]} | Estación: {r[1][1]} | Precipitación Total: {r[1][2]}", 
        resultados_locales
    )
[cite_start])) 

print("\n--- MÁXIMO ABSOLUTO DEL PERIODO ---")
print(f"Fecha: {maximo_absoluto[0]} | Municipio: {maximo_absoluto[1][0]} | Estación: {maximo_absoluto[1][1]} | Precipitación: {maximo_absoluto[1][2]}") 


##############
#### EJ 4 ####
##############


rows_temp = rows.filter(lambda f: f[0] == "28" and f[3] == "83")


estaciones_medias = (
    rows_temp.map(
        lambda c: (
            c[1], 
            c[2], 
            f"{c[5]}-{c[6].zfill(2)}-{c[7].zfill(2)}", 

            [float(c[8 + 2 * i].replace(',', '.')) for i in range(24) if c[9 + 2 * i] == "V"] 
        )
    )
    .filter(lambda x: len(x[3]) > 0) 
    .map(lambda x: (x[0], x[1], x[2], sum(x[3]) / len(x[3]))) 
)


estacion_A = (
    estaciones_medias
    .filter(lambda x: x[0] == "6" and x[1] == "4")
    .map(lambda x: (x[2], x[3]))
)


estacion_B = (
    estaciones_medias
    .filter(lambda x: x[0] == "5" and x[1] == "2") 
    .map(lambda x: (x[2], x[3])) 
)


comparacion_porcentual = (
    estacion_A.join(estacion_B) 
    .map(lambda j: (j[0], (j[1][1] / j[1][0]) * 100)) 
    .sortByKey() 
)


resultados_ej4 = comparacion_porcentual.collect() 
print("")
print("")
print("############ Ejercicio 4 ############")
print("")
print("--- COMPARACIÓN PORCENTUAL DE TEMPERATURAS MEDIAS ---")
for fecha, porcentaje in resultados_ej4:
    print(f"Fecha: {fecha} | Porcentaje (Est. 5-2 vs Ref 6-4): {porcentaje:.2f}%") 

ConnectionRefusedError: [Errno 111] Connection refused