# Práctica – ETL con Delta Lake
## Notebook de trabajo del alumno

Completa los ejercicios a partir de la guía.


## 1️⃣ Extracción de datos



### Demo
Juega con las cargas de los diferentes tipos de ficheros a continuación traídas de la guía

In [0]:
###### csv #####

In [0]:
###### json #####

In [0]:
###### parquet #####

In [0]:
##### schema ######

### Ejercicio 
Carga el archivo CSV 'vuelos.csv' que contiene información de vuelos con las siguientes columnas:
 - vuelo_id (integer)
 - origen (string)
 - destino (string)
 - fecha (date, formato 'yyyy-MM-dd')
 - retraso_minutos (integer)
 Define el schema explícitamente y utiliza los siguientes parámetros adicionales:
 - El archivo tiene encabezado.
 - El delimitador es ';'.
 - Los valores nulos están representados por 'NA'.

In [0]:
# TODO

## 2️⃣ Tipos de extracción


### Demo
Juega con los tipos de carga: total/parcial

## 3️⃣ Transformaciones


### Demo
Juega con los tipos de trasformaciones

### Ejercicio
 Dado el siguiente DataFrame de vuelos, realiza las siguientes transformaciones:
 1. Elimina las filas donde el destino sea nulo. Pista: `filter`+ `isNotNull`
 2. Estandariza los nombres de las ciudades en las columnas 'origen' y 'destino' minúsculas y sin espacios extra. Pista: `lower(trim(...))`
 3. Crea una nueva columna 'retraso_categoria' que clasifique el retraso en 'Sin retraso' (0 minutos), 'Leve' (1-15 minutos), 'Moderado' (16-60 minutos) y 'Grave' (>60 minutos).
 4. Elimina duplicados basados en 'vuelo_id'.
 5. Muestra el DataFrame final.

## 4️⃣ Delta Lake


### Demo
Jugar con las lecturas y escrituras de la tabla

Juega con las modificaciones de la tabla

Juega con el Time Travel

Juega con el optimizador

### Ejercicios

1. Crea un DF con datos de clientes. Para ello puedes utilizar la información que te doy y la funcion `list(zip(X,Y))`

In [0]:
# TODO:

nombres = ["Juan", "María", "Carlos", "Ana", "Luis", "Carmen", "José", "Laura", "Pedro", "Lucía", "Miguel", "Elena", "Javier", "Sofía", "Antonio", "Marta", "Manuel", "Isabel", "Francisco", "Patricia"]
edades = [25, 30, 22, 28, 35, 27, 40, 32, 24, 29, 33, 26, 31, 23, 36, 21, 34, 38, 37, 20]

2. Añade la columna indice. Para ello ayudate de la funcion `monotonically_increasing_id`

In [0]:
# TODO

3. Guardar como Delta Table (por path)

In [0]:
# TODO:

4. Actualiza la edad de un cliente

In [0]:
# TODO

5. Insertar un nuevo cliente

In [0]:
# TODO

6. Consulta el historial de la tabla. Posteriormente lee una version anterior con Time Travel

In [0]:
# TODO

7. Registrar como tabla del metastore

In [0]:
# TODO


8. Borra todos los registros

In [0]:
# TODO

9. Usa widgets para seleccionar dinámicamente la tabla Delta que creamos al principio del notebook ("delta") y mostrar su contenido

In [0]:
dbutils.widgets.text("tabla", "delta", "Selecciona tabla")

In [0]:
# TODO

10. Time travel con widgets, utiliza un widget tipo texto para poder leer dinamicamente la primera version de la tabla delta inicial ("delta").

In [0]:
# TODO

---
## Soluciones

### 1. Carga

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

schema = StructType([
    StructField("vuelo_id", IntegerType(), True),
    StructField("origen", StringType(), True),
    StructField("destino", StringType(), True),
    StructField("fecha", DateType(), True),
    StructField("retraso_minutos", IntegerType(), True)
])

df = spark.read.format("csv") \
    .option("header", True) \
    .option("delimiter", ";") \
    .option("nullValue", "NA") \
    .schema(schema) \
    .load(base_path + "vuelos (2).csv")

display(df)

## 3. Trasformaciones

In [0]:
from pyspark.sql import Row
from pyspark.sql.functions import col, trim, lower, when

# Crear DataFrame de ejemplo
data = [
    Row(vuelo_id=1, origen=" Madrid ", destino="Barcelona", fecha="2026-01-07", retraso_minutos=0),
    Row(vuelo_id=2, origen="Sevilla", destino=" Valencia ", fecha="2026-01-07", retraso_minutos=10),
    Row(vuelo_id=3, origen="Bilbao", destino=None, fecha="2026-01-07", retraso_minutos=5),
    Row(vuelo_id=4, origen="Madrid", destino="Barcelona", fecha="2026-01-07", retraso_minutos=70),
    Row(vuelo_id=4, origen="Madrid", destino="Barcelona", fecha="2026-01-07", retraso_minutos=70), # Duplicado
    Row(vuelo_id=5, origen=" Valencia", destino="Sevilla", fecha="2026-01-07", retraso_minutos=30)
]

df_ej = spark.createDataFrame(data)



# 1. Eliminar filas con destino nulo
df_ej = df_ej.filter(col("destino").isNotNull())

# 2. Estandarizar nombres de ciudades
df_ej = df_ej.withColumn("origen", lower(trim(col("origen")))) \
             .withColumn("destino", lower(trim(col("destino"))))

# 3. Crear columna de categoría de retraso
df_ej = df_ej.withColumn(
    "retraso_categoria",
    when(col("retraso_minutos") == 0, "Sin retraso")
    .when((col("retraso_minutos") > 0) & (col("retraso_minutos") <= 15), "Leve")
    .when((col("retraso_minutos") > 15) & (col("retraso_minutos") <= 60), "Moderado")
    .otherwise("Grave")
)

# 4. Eliminar duplicados por vuelo_id
df_ej = df_ej.dropDuplicates(["vuelo_id"])

# 5. Mostrar el DataFrame final
display(df_ej)

## 4. Tablas Delta

In [0]:
# 1
combinados = list(zip(nombres, edades))
df = spark.createDataFrame(combinados, ["nombre", "edad"])
display(df)

# 2
from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("indice", monotonically_increasing_id())
display(df)

#3
df.write.format("delta").mode("overwrite").save(base_path + "clientes")

#4
from delta.tables import DeltaTable
delta_clientes = DeltaTable.forPath(spark, base_path + "clientes")

# Aumentar edad de Laura (id = 1)
delta_clientes.update(
    condition="indice = 1",
    set={"edad": "edad + 1"}
)

#5
from pyspark.sql.functions import current_timestamp
nuevos_clientes_df = spark.createDataFrame(
    [(4, "Marta", 30)],
    ["indice", "nombre", "edad"]
).withColumn("fecha_registro", current_timestamp())

delta_clientes.alias("t").merge(
    nuevos_clientes_df.alias("s"),
    "t.indice = s.indice"
).whenNotMatchedInsertAll().execute()

display(delta_clientes.toDF())

#6
# Ver historial de cambios
display(delta_clientes.history())

# Leer la versión inicial (version 0)
clientes_v0 = spark.read.format("delta").option("versionAsOf", 0).load(base_path + "clientes")
display(clientes_v0)

#7
df.write.format("delta").mode("overwrite").saveAsTable("ceste.clientes")

#8
spark.sql("DROP TABLE IF EXISTS ceste.delta")
# O si fuera en SQL solo
# DROP TABLE IF EXISTS ceste.delta
spark.sql("SELECT * FROM ceste.clientes")

#9
tabla = dbutils.widgets.get("tabla")
df = spark.read.format("delta").load(f"/Volumes/workspace/ceste/archivos/{tabla}")
display(df)

#10
dbutils.widgets.text("version", "0", "Versión de la tabla")
version = dbutils.widgets.get("version")

df_version = spark.read.format("delta") \
    .option("versionAsOf", int(version)) \
    .load("/Volumes/workspace/ceste/archivos/delta")

display(df_version)