In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col,isnan, when, count
from pyspark.sql.window import Window
from pyspark.sql.types import DateType
from datetime import datetime, timedelta
import sys



# Configuración detallada para Spark
conf = SparkConf().setAppName("ClusterConfigExample") 

# Iniciar el SparkContext
sc = SparkContext(conf=conf)

# Crear la SparkSession
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

In [None]:

df = spark.read.csv('datos_combinados_Alcala_Henares.csv', header=True, inferSchema=True)
df.show(2)
df.printSchema()
print(df.count())

+----------+----------+-----------------+---------+-------+----+----+----+--------+----+--------+---+--------+-----+---------+-------+-----+---------+-----+---------+
|     fecha|indicativo|           nombre|provincia|altitud|tmed|prec|tmin|horatmin|tmax|horatmax|dir|velmedia|racha|horaracha|hrMedia|hrMax|horaHrMax|hrMin|horaHrMin|
+----------+----------+-----------------+---------+-------+----+----+----+--------+----+--------+---+--------+-----+---------+-------+-----+---------+-----+---------+
|2019-01-01|     3170Y|ALCALA DE HENARES|   MADRID|    605| 5,3| 0,0|-4,1|   08:05|14,7|   15:31| 99|     0,8|  3,3|   Varias|     62|   91|    08:30|   38|    13:40|
|2019-01-02|     3170Y|ALCALA DE HENARES|   MADRID|    605| 4,6| 0,0|-5,5|   07:04|14,6|   15:31|  8|     1,1|  3,3|    09:10|     65|   93|    08:30|   39|    15:40|
+----------+----------+-----------------+---------+-------+----+----+----+--------+----+--------+---+--------+-----+---------+-------+-----+---------+-----+---------

### Exploratorio
- Eliminar las columnas que no se vayan a utilizar.
- Cambiar el tipo de dato.
- Tranformar las columnas pertinentes.

In [3]:
# Columnas del dataframe: todas son strings, hay que cambiar el tipo de dato a fecha y numericos
# 1. Elimino dichas columnas por ser innecesarias
# 2. Cambio el tipo de dato y remplazo ',' por '.'
# 3. La columna fecha la convierto a date
columnas = ["indicativo","provincia","nombre","horatmax","horatmin","horaracha","horaHrMax","horaHrMin","altitud","dir","prec","racha","hrMin","hrMax"]
df = df.drop(*columnas)
for columna in df.columns:
    if columna in ["tmed","tmin","tmax","velmedia"]:
        df = df.withColumn(columna, F.regexp_replace(F.col(columna), ",", ".").cast("double"))
    elif columna == "fecha":
        df = df.withColumn(columna, F.to_date(F.col(columna), "yyyy-MM-dd"))
        
        
        

- Columnas finales:
    - Fecha
    - Temperatura media
    - Temperatura mínima
    - Temperatura máxima
    - Velocidad media del viento
    - Humedad realtiva media

In [4]:
# Columnas del dataframe al hacer las operaciones anteriores
df.show(2)

+----------+----+----+----+--------+-------+
|     fecha|tmed|tmin|tmax|velmedia|hrMedia|
+----------+----+----+----+--------+-------+
|2019-01-01| 5.3|-4.1|14.7|     0.8|     62|
|2019-01-02| 4.6|-5.5|14.6|     1.1|     65|
+----------+----+----+----+--------+-------+
only showing top 2 rows



In [5]:
# Esquema con los tipos de datos cambiados
df.printSchema()

root
 |-- fecha: date (nullable = true)
 |-- tmed: double (nullable = true)
 |-- tmin: double (nullable = true)
 |-- tmax: double (nullable = true)
 |-- velmedia: double (nullable = true)
 |-- hrMedia: integer (nullable = true)



- Genero una lista de días entre dos rangos de fechas para comprobar que no falten fechas en el dataframe original

In [6]:
# Generar rango de fechas en una lista
# from pyspark.sql.types import DateType
# from datetime import datetime, timedelta
start_date = datetime(2019, 1, 1)
end_date = datetime(2025, 2, 11)
date_range_list = [(start_date + timedelta(days=i)).strftime("%Y-%m-%d") for i in range((end_date - start_date).days + 1)]

# Crear un DataFrame en PySpark con las fechas del rango
date_range_df = spark.createDataFrame([(d,) for d in date_range_list], ["fecha"]).withColumn("fecha", F.to_date(F.col("fecha"), "yyyy-MM-dd"))

# Encontrar las fechas que están en el rango pero no en df_henares
missing_dates_df = date_range_df.subtract(df.select("fecha"))


missing_dates_df.sort("fecha").show()

+----------+
|     fecha|
+----------+
|2019-09-16|
|2019-09-17|
|2019-09-18|
|2019-10-06|
|2019-10-08|
|2019-12-31|
|2020-10-20|
|2020-10-21|
|2025-02-11|
+----------+



In [7]:
# Meter las fechas que faltan en el dataframe 
df_union = df.join(missing_dates_df, on="fecha", how="outer")
df_union.show()

+----------+----+----+----+--------+-------+
|     fecha|tmed|tmin|tmax|velmedia|hrMedia|
+----------+----+----+----+--------+-------+
|2019-01-01| 5.3|-4.1|14.7|     0.8|     62|
|2019-01-02| 4.6|-5.5|14.6|     1.1|     65|
|2019-01-03| 6.0|-2.8|14.7|     1.1|     63|
|2019-01-04| 5.0|-4.2|14.3|     0.6|     60|
|2019-01-05| 4.5|-5.8|14.8|     1.1|     58|
|2019-01-06| 4.7|-7.1|16.5|     1.4|     51|
|2019-01-07| 6.0|-5.5|17.4|     0.8|     43|
|2019-01-08| 5.5|-5.3|16.3|     0.8|     55|
|2019-01-09| 5.4|-3.6|14.4|     1.9|     61|
|2019-01-10| 2.2|-5.0| 9.5|     3.1|     60|
|2019-01-11| 1.4|-7.1| 9.9|     2.2|     54|
|2019-01-12| 3.2|-5.5|11.9|     1.9|     63|
|2019-01-13| 3.3|-5.8|12.4|     0.6|     65|
|2019-01-14| 6.6|-4.2|17.4|     0.3|     64|
|2019-01-15| 6.2|-3.2|15.5|     0.3|     64|
|2019-01-16| 3.7|-4.5|11.9|     1.9|     73|
|2019-01-17| 2.8|-2.7| 8.3|     0.0|     85|
|2019-01-18| 1.0|-5.1| 7.2|     0.3|     77|
|2019-01-19|-0.2|-6.0| 5.6|     1.4|     85|
|2019-01-2

In [8]:
# Comprobar que se han metido correctamente (solo va a mostrar 3 registros)
df_filtered = df_union.filter((F.col("fecha") >= "2019-09-16") & (F.col("fecha") <= "2019-09-18"))

df_filtered.show()

+----------+----+----+----+--------+-------+
|     fecha|tmed|tmin|tmax|velmedia|hrMedia|
+----------+----+----+----+--------+-------+
|2019-09-16|NULL|NULL|NULL|    NULL|   NULL|
|2019-09-17|NULL|NULL|NULL|    NULL|   NULL|
|2019-09-18|NULL|NULL|NULL|    NULL|   NULL|
+----------+----+----+----+--------+-------+



In [9]:
# Agrupar por todas las columnas y contar
duplicados = df_union.groupBy(df_union.columns).count().filter(col("count") > 1)

# Mostrar los duplicados
if duplicados.count() > 0:
    print("Hay valores duplicados")
    duplicados.show()
else:
    print("No hay valores duplicados")

No hay valores duplicados


### Detección de nulos

In [10]:
# from pyspark.sql.functions import col,isnan, when, count
# Nulos en cada columna
columnas = ["tmed","tmin","tmax","velmedia","hrMedia"]
df_union.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in columnas]
   ).show()


+----+----+----+--------+-------+
|tmed|tmin|tmax|velmedia|hrMedia|
+----+----+----+--------+-------+
|  52|  52|  52|      43|     43|
+----+----+----+--------+-------+



In [11]:
# Muestra de los registros que tienen tmin nulo, coincide que aquí se concentran la mayoría de nulos
df_union.filter(F.col("tmin").isNull()).show(60)

+----------+----+----+----+--------+-------+
|     fecha|tmed|tmin|tmax|velmedia|hrMedia|
+----------+----+----+----+--------+-------+
|2019-09-16|NULL|NULL|NULL|    NULL|   NULL|
|2019-09-17|NULL|NULL|NULL|    NULL|   NULL|
|2019-09-18|NULL|NULL|NULL|    NULL|   NULL|
|2019-09-22|NULL|NULL|NULL|    NULL|   NULL|
|2019-09-23|NULL|NULL|NULL|    NULL|   NULL|
|2019-10-04|NULL|NULL|NULL|    NULL|   NULL|
|2019-10-05|NULL|NULL|NULL|    NULL|   NULL|
|2019-10-06|NULL|NULL|NULL|    NULL|   NULL|
|2019-10-07|NULL|NULL|NULL|    NULL|   NULL|
|2019-10-08|NULL|NULL|NULL|    NULL|   NULL|
|2019-12-03|NULL|NULL|NULL|     3.9|     75|
|2019-12-30|NULL|NULL|NULL|     0.8|     76|
|2019-12-31|NULL|NULL|NULL|    NULL|   NULL|
|2020-04-28|NULL|NULL|NULL|     2.5|     70|
|2020-04-29|NULL|NULL|NULL|     5.0|     65|
|2020-04-30|NULL|NULL|NULL|    NULL|   NULL|
|2020-05-01|NULL|NULL|NULL|    NULL|   NULL|
|2020-05-02|NULL|NULL|NULL|    NULL|   NULL|
|2020-05-03|NULL|NULL|NULL|    NULL|   NULL|
|2020-05-0

## IMPUTACIÓN DE NULOS
- Lo que se va a intentar es si detecta nulos coger el valor anterior que no sea nulo e imputarlo

In [12]:
# from pyspark.sql.window import Window
# import sys
# ---------------------------------------------------TMED---------------------------------------------------------------
# Definir una ventana ordenada por fecha
window_spec = Window.orderBy("fecha")

# Obtener el valor anterior no nulo
window_prev = window_spec.rowsBetween(-sys.maxsize, -1)
df_prueba = df_union.withColumn("prev_tmed", F.last(F.col("tmed"), ignorenulls=True).over(window_prev))

# Obtener el valor siguiente no nulo
window_next = window_spec.rowsBetween(1, sys.maxsize)
df_prueba = df_prueba.withColumn("next_tmed", F.first(F.col("tmed"), ignorenulls=True).over(window_next))

# Calcular la media entre el valor anterior y el siguiente
df_prueba = df_prueba.withColumn("imputed_tmed", (F.col("prev_tmed") + F.col("next_tmed")) / 2)

# Rellenar solo los valores nulos en 'tmed' con la media calculada
df_prueba = df_prueba.withColumn("tmed", F.when(F.col("tmed").isNull(), F.col("imputed_tmed")).otherwise(F.col("tmed")))

# Eliminar las columnas auxiliares ('prev_tmed', 'next_tmed', 'imputed_tmed')
df_prueba = df_prueba.drop("prev_tmed", "next_tmed", "imputed_tmed")

# ---------------------------------------------------TMIN---------------------------------------------------------------
# Definir una ventana ordenada por fecha
window_spec = Window.orderBy("fecha")

# Obtener el valor anterior no nulo
window_prev = window_spec.rowsBetween(-sys.maxsize, -1)
df_prueba = df_prueba.withColumn("prev_tmin", F.last(F.col("tmin"), ignorenulls=True).over(window_prev))

# Obtener el valor siguiente no nulo
window_next = window_spec.rowsBetween(1, sys.maxsize)
df_prueba = df_prueba.withColumn("next_tmin", F.first(F.col("tmin"), ignorenulls=True).over(window_next))

# Calcular la media entre el valor anterior y el siguiente
df_prueba = df_prueba.withColumn("imputed_tmin", (F.col("prev_tmin") + F.col("next_tmin")) / 2)

# Rellenar solo los valores nulos en 'tmed' con la media calculada
df_prueba = df_prueba.withColumn("tmin", F.when(F.col("tmin").isNull(), F.col("imputed_tmin")).otherwise(F.col("tmin")))

# Eliminar las columnas auxiliares ('prev_tmed', 'next_tmed', 'imputed_tmed')
df_prueba = df_prueba.drop("prev_tmin", "next_tmin", "imputed_tmin")

# ---------------------------------------------------TMAX---------------------------------------------------------------
# Definir una ventana ordenada por fecha
window_spec = Window.orderBy("fecha")

# Obtener el valor anterior no nulo
window_prev = window_spec.rowsBetween(-sys.maxsize, -1)
df_prueba = df_prueba.withColumn("prev_tmax", F.last(F.col("tmax"), ignorenulls=True).over(window_prev))

# Obtener el valor siguiente no nulo
window_next = window_spec.rowsBetween(1, sys.maxsize)
df_prueba = df_prueba.withColumn("next_tmax", F.first(F.col("tmax"), ignorenulls=True).over(window_next))

# Calcular la media entre el valor anterior y el siguiente
df_prueba = df_prueba.withColumn("imputed_tmax", (F.col("prev_tmax") + F.col("next_tmax")) / 2)

# Rellenar solo los valores nulos en 'tmed' con la media calculada
df_prueba = df_prueba.withColumn("tmax", F.when(F.col("tmax").isNull(), F.col("imputed_tmax")).otherwise(F.col("tmax")))

# Eliminar las columnas auxiliares ('prev_tmed', 'next_tmed', 'imputed_tmed')
df_prueba = df_prueba.drop("prev_tmax", "next_tmax", "imputed_tmax")

# ---------------------------------------------------VELMEDIA---------------------------------------------------------------
# Definir una ventana ordenada por fecha
window_spec = Window.orderBy("fecha")

# Obtener el valor anterior no nulo
window_prev = window_spec.rowsBetween(-sys.maxsize, -1)
df_prueba = df_prueba.withColumn("prev_velmedia", F.last(F.col("velmedia"), ignorenulls=True).over(window_prev))

# Obtener el valor siguiente no nulo
window_next = window_spec.rowsBetween(1, sys.maxsize)
df_prueba = df_prueba.withColumn("next_velmedia", F.first(F.col("velmedia"), ignorenulls=True).over(window_next))

# Calcular la media entre el valor anterior y el siguiente
df_prueba = df_prueba.withColumn("imputed_velmedia", (F.col("prev_velmedia") + F.col("next_velmedia")) / 2)

# Rellenar solo los valores nulos en 'tmed' con la media calculada
df_prueba = df_prueba.withColumn("velmedia", F.when(F.col("velmedia").isNull(), F.col("imputed_velmedia")).otherwise(F.col("velmedia")))

# Eliminar las columnas auxiliares ('prev_tmed', 'next_tmed', 'imputed_tmed')
df_prueba = df_prueba.drop("prev_velmedia", "next_velmedia", "imputed_velmedia")


# ---------------------------------------------------HRMEDIA---------------------------------------------------------------
# Definir una ventana ordenada por fecha
window_spec = Window.orderBy("fecha")

# Obtener el valor anterior no nulo
window_prev = window_spec.rowsBetween(-sys.maxsize, -1)
df_prueba = df_prueba.withColumn("prev_hrMedia", F.last(F.col("hrMedia"), ignorenulls=True).over(window_prev))

# Obtener el valor siguiente no nulo
window_next = window_spec.rowsBetween(1, sys.maxsize)
df_prueba = df_prueba.withColumn("next_hrMedia", F.first(F.col("hrMedia"), ignorenulls=True).over(window_next))

# Calcular la media entre el valor anterior y el siguiente
df_prueba = df_prueba.withColumn("imputed_hrMedia", (F.col("prev_hrMedia") + F.col("next_hrMedia")) / 2)

# Rellenar solo los valores nulos en 'tmed' con la media calculada
df_prueba = df_prueba.withColumn("hrMedia", F.when(F.col("hrMedia").isNull(), F.col("imputed_hrMedia")).otherwise(F.col("hrMedia")))

# Eliminar las columnas auxiliares ('prev_tmed', 'next_tmed', 'imputed_tmed')
df_prueba = df_prueba.drop("prev_hrMedia", "next_hrMedia", "imputed_hrMedia")


In [13]:
# Nulos en cada columna
columnas = ["tmed","tmin","tmax","velmedia","hrMedia"]
df_prueba.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in columnas]
   ).show()
# df_prueba.printSchema()

+----+----+----+--------+-------+
|tmed|tmin|tmax|velmedia|hrMedia|
+----+----+----+--------+-------+
|   1|   1|   1|       1|      1|
+----+----+----+--------+-------+



In [14]:
# Es el ultimo registro
df_prueba.filter(F.col("tmed").isNull()).show()

+----------+----+----+----+--------+-------+
|     fecha|tmed|tmin|tmax|velmedia|hrMedia|
+----------+----+----+----+--------+-------+
|2025-02-11|NULL|NULL|NULL|    NULL|   NULL|
+----------+----+----+----+--------+-------+



In [15]:
df_prueba = df_prueba.dropna()

In [16]:
# Ya no hay nulos
columnas = ["tmed","tmin","tmax","velmedia","hrMedia"]
df_prueba.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in columnas]
   ).show()

+----+----+----+--------+-------+
|tmed|tmin|tmax|velmedia|hrMedia|
+----+----+----+--------+-------+
|   0|   0|   0|       0|      0|
+----+----+----+--------+-------+



In [17]:
df_prueba.write.csv("datos_Alcala_Henares_transformado.csv", header = True, mode = "overwrite")

In [18]:
spark.stop()