# Introducción a PySpark

PySpark es la API de Python para Apache Spark, un motor de procesamiento distribuido diseñado para trabajar con grandes volúmenes de datos de forma eficiente.

Spark trabaja principalmente con DataFrames, que conceptualmente son similares a los DataFrames de pandas, pero:
- Están distribuidos
- Son inmutables
- Las operaciones son lazy (no se ejecutan hasta que se necesita el resultado)

## Preparacion entorno


Si no estuvieramos en Databricks habría que poner lo siguiente:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Intro PySpark") \
    .getOrCreate()
```


Pero nos lo podemos saltar y comenzar con esto:

Copia lo siguiente en una celda. Vamos a crear un catalogo, schema y volumen

```sql
create catalog if not exists sesion1;
create schema if not exists sesion1.sparkintro;
create volume if not exists sesion1.sparkintro.landing;
```

Trameos del Github la información al volumen

```sql
%sh
curl -L https://raw.githubusercontent.com/jmartinezceste/Databricks_master/refs/heads/main/sesion1/data/spark_intro.csv -o /Volumes/sesion1/sparkintro/landing/spark_intro.csv

%sh
curl -L https://raw.githubusercontent.com/jmartinezceste/Databricks_master/refs/heads/main/sesion1/data/dim_spark_intro.csv -o /Volumes/sesion1/sparkintro/landing/dim_spark_intro.csv
```

### Creamos el DF
Ejecuta el siguiente comando en el notebook de ejercicios

```python
from pyspark.sql import functions as F

# Read CSV from volume as a Spark DataFrame
sales_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)  # Let Spark infer column types for now
    .csv("/Volumes/sesion1/sparkintro/landing/spark_intro.csv")
)

display(sales_df)
```

Es importante entender el schema de los datos. Para ello ejecutamos
```python
sales_df.printSchema()
```

##  Select(): elegir columnas

select() sirve para proyectar columnas, es decir:
- elegir solo aquellas columnas que nos interesan
- crear nuevas columnas calculadas (junto con withColumn)

Ideas importantes:
- Es una narrow transformation: cada partición puede ejecutar el select sin necesidad de hablar con otras particiones.
- Es barata y muy común en pipelines reales.

Se parece mucho al SELECT columna1, columna2 FROM tabla de SQL.

```python
# Select a subset of columns
sales_simple_df = sales_df.select("order_id", "order_date", "country", "units_sold")
display(sales_simple_df)
```

## filter() / where(): filtrar filas


`filter()` (o su alias `where()`) nos permite quedarnos solo con las filas que cumplen una condición lógica.

Ejemplos:
- ventas solo de un país
- pedidos con más de X unidades

Características:
- También es una narrow transformation.
- Cada partición decide, con sus propias filas, qué se queda y qué se descarta.
- Spark puede aplicar técnicas como predicate pushdown para optimizar la lectura.

```python
# Filter sales only for Spain
spain_sales_df = sales_df.filter(F.col("country") == "Spain")
display(spain_sales_df)

# Filter sales with more than 3 units sold
big_orders_df = sales_df.filter(F.col("units_sold") > 3)
display(big_orders_df)
```

## withColumn(): crear o transformar columnas


`withColumn()` se usa para:
- crear columnas nuevas a partir de otras
- transformar valores de columnas existentes

En Spark los DataFrames son inmutables:
- no modificamos el DataFrame original
- siempre devolvemos un DataFrame nuevo con los cambios

Esto permite:
- razonar mejor sobre los pipelines
- que Spark reordene y optimice las operaciones internamente.

```python
from pyspark.sql import functions as

# Create a new column with total sales amount
sales_with_total_df = sales_df.withColumn(
    "total_sales",
    F.col("units_sold") * F.col("unit_price")  # Simple numeric expression
)

display(sales_with_total_df)
```

## groupBy().agg(): agregaciones


`groupBy().agg()` nos permite:
- agrupar filas por una o varias columnas (por ejemplo, país)
- calcular métricas resumidas (suma, media, máximo, mínimo, etc.)

Esta es una de las operaciones más importantes en Spark porque suele:
- requerir reorganizar los datos entre nodos del cluster
- implicar un shuffle (movimiento de datos muy costoso)

Si un alumno entiende que:
"agrupar implica juntar datos que pueden estar en máquinas distintas"
entonces tiene el modelo mental correcto de por qué ciertos pasos de Spark son caros.

```python
# Aggregate total units and total sales by country
sales_country_df = (
    sales_with_total_df
    .groupBy("country")
    .agg(
        F.sum("units_sold").alias("total_units"),
        F.sum("total_sales").alias("total_sales_amount")
    )
)

display(sales_country_df)
```

## orderBy(): ordenar datos


`orderBy()` sirve para ordenar las filas según una o varias columnas.

Problema:
- Ordenar "globalmente" en un sistema distribuido suele requerir:
    - reunir datos
    - redistribuirlos
    - y coordinar el orden final

Esto normalmente implica otro shuffle, y puede ser muy costoso en grandes volúmenes.

Por eso, en pipelines reales:
- hay que usar orderBy() solo cuando aporte valor
- y evitar ordenar datasets gigantes si no es necesario.

```python
# Order countries by total sales amount (descending)
sorted_sales_country_df = sales_country_df.orderBy(F.col("total_sales_amount").desc())
display(sorted_sales_country_df)
```

## join(): combinar datos de distintas tablas


En la práctica, casi siempre trabajamos con varias tablas:
- una tabla de hechos (ventas, transacciones, etc.)
- tablas de dimensiones (productos, clientes, países...)

`join()` nos permite cruzar estas tablas:
- por claves comunes (ej: `product`)

Conceptos importantes:
- Muchos joins implican shuffle, porque hay que alinear filas que pueden estar en nodos diferentes.
- Spark ofrece optimizaciones como el broadcast join cuando una de las tablas es suficientemente pequeña para enviarla a todos los nodos.

Entender bien los joins es clave para que Spark no "se arrastre".

```python
# Read product dimension from volume
product_dim_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/Volumes/sesion1/data/landing/product_dim.csv")
)

display(product_dim_df)
```

```python
# Join sales with product dimension on 'product'
sales_enriched_df = (
    sales_with_total_df.alias("s")
    .join(
        product_dim_df.alias("p"),
        on="product",   # Join key
        how="left"     # Keep all sales even if some product has no dimension row
    )
)

display(sales_enriched_df)
```

## write(): guardar el resultado


Un pipeline de datos útil no se queda solo en la pantalla:
- debe guardar los resultados para que puedan ser reutilizados:
    - por otros notebooks
    - por herramientas de BI
    - por servicios o modelos

En Spark usamos la API write para:
- escribir en distintos formatos (parquet, delta, csv, etc.)
- salvar en tablas gestionadas por el catálogo

Recordatorio:
Sin write, muchas veces solo estamos "jugando" con los datos.

```python
# Save aggregated sales by country as a managed Delta table
(
    sorted_sales_country_df
    .write
    .mode("overwrite")          # Overwrite existing table if it exists
    .saveAsTable("sesion1.sparkintro.sales_by_country")
)

# Check that we can read it back as a table
result_df = spark.table("sesion1.sparkintro.sales_by_country")
display(result_df)
```