# 📘 1. Introducción
En este notebook aprenderemos a:
- Leer datos **estructurados** (CSV, Parquet).
- Leer datos **semiestructurados** (JSON, anidado).
- Manejar **esquemas explícitos** para optimización.
- Usar **funciones de PySpark** para transformar datos.
- Aplicar técnicas de **optimización en Databricks**: cache, repartition, broadcast, Delta Lake.

📂 2. Importación de librerías y configuración inicial

In [0]:
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import pyspark.sql.functions as F

# 📊 3. Lectura de datos estructurados (CSV público)

In [0]:
# URL del archivo
url = "https://raw.githubusercontent.com/owid/covid-19-data/refs/heads/master/public/data/owid-covid-data.csv"
# ⚠️ Nota: no se puede leer directamente con spark.read.csv(url) debido a restricciones de permisos en Databricks.
# Por eso lo cargamos primero con Pandas y luego lo convertimos a Spark DataFrame.

# Leer con pandas
pdf = pd.read_csv(url)

# Convertir a Spark DataFrame
df_tmp = spark.createDataFrame(pdf)

# Definir esquema manualmente (solo las columnas que queremos)
schema = StructType([
    StructField("iso_code", StringType(), True),
    StructField("continent", StringType(), True),
    StructField("location", StringType(), True),
    StructField("date", StringType(), True),          # puede cambiarse a DateType si se requiere
    StructField("total_cases", DoubleType(), True),
    StructField("new_cases", DoubleType(), True),
    StructField("total_deaths", DoubleType(), True),
    StructField("population", DoubleType(), True)
])

# Seleccionar y castear columnas al esquema definido usando F.col
df_covid = (
    df_tmp
    .select(
        F.col("iso_code").cast(StringType()),
        F.col("continent").cast(StringType()),
        F.col("location").cast(StringType()),
        F.col("date").cast(StringType()),     # o DateType si prefieres
        F.col("total_cases").cast(DoubleType()),
        F.col("new_cases").cast(DoubleType()),
        F.col("total_deaths").cast(DoubleType()),
        F.col("population").cast(DoubleType())
    )
)

# Mostrar datos
df_covid.show(5)
df_covid.printSchema()

# 🛠 4. Transformaciones en datos estructurados

In [0]:
df_covid = df_covid.withColumn("date", F.to_date("date", "yyyy-MM-dd"))
df_covid = df_covid.withColumn("cases_per_million", (F.col("total_cases") / F.col("population")) * 1e6)
df_colombia = df_covid.filter(F.col("location") == "Colombia")
display(df_colombia)

# 📂 5. Lectura de datos semiestructurados (JSON público)

In [0]:
# Leer el JSON desde el volumen
df_weather = spark.read.json("/Volumes/workspace/dbtest/dataclase4/weather.json")

# Mostrar esquema y datos
df_weather.printSchema()
df_weather.show(5, truncate=False)

In [0]:
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, DoubleType
from pyspark.sql import SparkSession
import json

spark = SparkSession.builder.appName("WeatherJSON").getOrCreate()

with open("/Volumes/workspace/dbtest/dataclase4/weather.json", "r") as f:
    data = json.load(f)

schema = StructType([
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("generationtime_ms", DoubleType(), True),
    StructField("hourly", StructType([
        StructField("time", ArrayType(StringType()), True),
        StructField("temperature_2m", ArrayType(DoubleType()), True)
    ]), True)
])

df_weather = spark.createDataFrame([data], schema=schema)
df_weather.printSchema()
df_weather.show(2, truncate=False)


# 🔍 6. Manejo de datos anidados en JSON
En la parte de arriba trabajamos con un archivo JSON que contiene información meteorológica. El JSON tiene datos anidados, es decir, estructuras internas como diccionarios y listas (arrays). Por ejemplo, dentro de la clave "hourly" hay dos arrays: "time" y "temperature_2m".

Este tipo de datos requiere pasos específicos para poder analizarlos con PySpark.

Paso a Paso

1. Lectura del JSON
Primero se carga el archivo JSON en memoria, convirtiéndolo en un diccionario de Python que se puede manipular.

2. Identificación de los datos anidados
Se observa que dentro del JSON hay estructuras internas (diccionarios y arrays), como "hourly", que contiene "time" y "temperature_2m". Estos arrays representan horas y temperaturas correspondientes.

3. Extracción de los arrays internos
Para poder trabajar con ellos en Spark, se extraen los arrays de horas y temperaturas de forma separada, dejando listas planas que luego se pueden transformar en filas del DataFrame.

4. Creación de un DataFrame con esquema explícito
Se define un esquema que indica a Spark el tipo de cada array (por ejemplo, strings para las horas y números decimales para las temperaturas). Esto es necesario porque Spark no puede inferir automáticamente los tipos de datos anidados complejos.

5. Transformación de arrays en filas individuales
Para analizar los datos por hora, se combinan los arrays de horas y temperaturas y se “explota” cada par en una fila independiente. Esto convierte los datos anidados en un DataFrame plano, más fácil de manipular y analizar.

6. Agregar información adicional
Se incorporan columnas adicionales como la latitud y longitud para que cada fila tenga la ubicación asociada con cada hora y temperatura.

7. Visualización de los resultados
Finalmente, se observa la estructura del DataFrame y algunas filas para confirmar que los datos fueron transformados correctamente y están listos para análisis posteriores.

In [0]:
import json
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, DoubleType, StructType, StructField

# Leer JSON desde archivo
json_file_path = "/Volumes/workspace/dbtest/dataclase4/weather.json"
with open(json_file_path, "r") as f:
    data = json.load(f)

latitude = data["latitude"]
longitude = data["longitude"]

# Crear DataFrame con esquema explícito
schema = StructType([
    StructField("time_array", ArrayType(StringType()), True),
    StructField("temp_array", ArrayType(DoubleType()), True)
])

df_temp = spark.createDataFrame(
    [(data["hourly"]["time"], data["hourly"]["temperature_2m"])],
    schema=schema
)

# Combinar arrays y convertir cada par en una fila usando funciones de F
# Selecciona y combina los arrays de horas y temperaturas
df_weather = df_temp.select(
    # F.arrays_zip("time_array", "temp_array") combina los dos arrays en un solo array de structs,
    # donde cada struct contiene un par (hora, temperatura)
    # F.explode() "explota" cada struct del array en una fila separada
    F.explode(F.arrays_zip("time_array", "temp_array")).alias("row")
).select(
    # F.col("row.time_array") accede al campo 'time_array' dentro del struct creado por arrays_zip
    # Se renombra la columna como 'time'
    F.col("row.time_array").alias("time"),

    # F.col("row.temp_array") accede al campo 'temp_array' dentro del struct
    # Se renombra la columna como 'temperature'
    F.col("row.temp_array").alias("temperature")
)

# Agregar latitud y longitud usando F.lit
df_weather = df_weather.withColumn("latitude", F.lit(latitude)) \
                       .withColumn("longitude", F.lit(longitude))

# Mostrar resultados
df_weather.printSchema()
df_weather.show(10, truncate=False)

# ⚡ 7. Optimización en Databricks / PySpark

En el procesamiento distribuido con Spark, la **optimización** es fundamental para mejorar el rendimiento y reducir costos de cómputo.  
Aquí explicamos tres técnicas clave:

### 🗂 1. `cache()` / `persist()`
- Cuando ejecutamos transformaciones en un DataFrame, Spark no guarda los resultados inmediatamente (evaluación diferida).  
- Si el mismo DataFrame se usa varias veces, Spark lo recalcularía cada vez.  
- Usar `cache()` o `persist()` permite **almacenar en memoria** los resultados intermedios, evitando recomputaciones.  
- 🚀 Beneficio: acelera consultas repetidas a costa de usar más memoria.

### 2. repartition() y coalesce()

- Spark divide los datos en particiones, que son las unidades que se distribuyen entre los nodos del clúster.
- repartition(n, col) → redistribuye los datos en n particiones de forma balanceada (puede implicar un shuffle costoso).
- coalesce(n) → reduce el número de particiones sin shuffle completo, útil cuando queremos consolidar archivos de salida.
- 🚀 Beneficio: controlar el número de particiones evita problemas como subutilización (muy pocas particiones) o sobrecarga (demasiadas particiones pequeñas).

### 📡 3. broadcast()

- En un join, si una de las tablas es pequeña, se puede replicar en todos los nodos en vez de hacer un shuffle completo.
- broadcast(df) le dice a Spark que use esa estrategia.
- 🚀 Beneficio: reduce drásticamente el tiempo y el costo de joins cuando trabajamos con tablas de referencia pequeñas.

In [0]:
# Se almacena el DataFrame en caché para que las operaciones posteriores lo lean desde memoria
# en lugar de recalcularlo desde la fuente original. Esto mejora el rendimiento si se reutiliza varias veces.
df_covid.cache()

# Se fuerza la materialización del cache ejecutando una acción (count),
# así se asegura que los datos ya queden precargados en memoria.
df_covid.count()

# Se redistribuyen los datos en 10 particiones con base en la columna "location".
# Esto busca mejorar la paralelización y el rendimiento de futuros joins o escrituras,
# aunque el número fijo (10) puede no ser óptimo según el tamaño real del dataset.
df_covid = df_covid.repartition(10, "location")

# Se importa la función de broadcast para optimizar joins.
from pyspark.sql.functions import broadcast

# Se seleccionan las columnas "location" y "population" de forma única.
# Este dataset será mucho más pequeño que df_covid, ideal para usarse como tabla de broadcast.
df_pop = df_covid.select("location", "population").distinct()

# Se realiza un join entre el dataset grande (df_covid) y el pequeño (df_pop).
# El broadcast asegura que df_pop se copie a cada nodo, evitando un shuffle costoso.
df_joined = df_covid.join(broadcast(df_pop), "location")

# Se guarda df_covid en formato Delta en el path del Data Lake.
# El modo "overwrite" reemplaza lo que había antes, garantizando la versión más actualizada.
df_covid.write.format("delta").mode("overwrite").save("/mnt/delta/covid")

# Se vuelve a leer el dataset desde el Delta Lake.
# Esto asegura trabajar con datos consistentes y preparados para consultas posteriores.
df_delta = spark.read.format("delta").load("/mnt/delta/covid")

# 📈 8. Consultas analíticas (Window Functions)

Las **Window Functions** en Spark permiten hacer cálculos avanzados sobre un conjunto de filas **relacionadas** con la fila actual, sin necesidad de agrupar toda la tabla.  

📌 En otras palabras: nos permiten calcular métricas como acumulados, rankings o valores anteriores/siguientes, mientras seguimos viendo todas las filas originales.

### ⚙️ ¿Cómo funcionan?
- Se definen con un **Window Specification** que indica:
  - **PARTITION BY** → cómo dividir los datos en grupos (ej: por país).
  - **ORDER BY** → cómo ordenar dentro de cada grupo (ej: por fecha).

### 🛠 Tipos comunes de funciones de ventana:

1. Cálculo con filas anteriores o siguientes

- lag(col, n) → trae el valor de la fila anterior (n pasos atrás).
- lead(col, n) → trae el valor de la fila siguiente (n pasos adelante).

2. Cálculos acumulativos

sum(), avg(), min(), max() dentro de una ventana ordenada.

3. Funciones de ranking

- row_number() → numera las filas en orden.
- rank() → ranking con posibles empates.
- dense_rank() → ranking consecutivo sin saltos.

In [0]:
from pyspark.sql.window import Window

window_spec = Window.partitionBy("location").orderBy("date")
df_covid = df_covid.withColumn("daily_cases", F.col("total_cases") - F.lag("total_cases").over(window_spec))
df_covid.filter(F.col("location") == "Colombia").select("date", "daily_cases").show(10)

# ✅ 9. Conclusiones
- Los **datos estructurados** (CSV, Parquet) son más fáciles de manejar pero requieren cuidado en el **esquema** y **particiones**.
- Los **datos semiestructurados** (JSON) necesitan transformación de estructuras anidadas.
- La **optimización** en Databricks combina técnicas de Spark (cache, repartition, broadcast) con almacenamiento eficiente (**Delta Lake**).