# Laboratorio 1 - Introducción a Spark y RDDs (PySpark)

Este notebook introduce los conceptos básicos de Apache Spark usando **RDDs**.

En este laboratorio vas a:
- Crear/usar un **SparkContext**.
- Entender qué es un **RDD** y por qué es *inmutable* y *distribuido*.
- Ver la diferencia entre **transformaciones** (lazy) y **acciones**.
- Practicar **caché**, **particiones** y operaciones comunes.
- Aplicar el patrón *map-reduce* a un caso real: **ventas por tienda**.

**Dataset del ejemplo aplicado:** `transacciones_retail_large.csv` (debe estar en el mismo directorio del notebook o ajusta el path).


## 1. SparkContext

El **SparkContext** (habitualmente `sc`) es el punto de entrada clásico para trabajar con RDDs.
En muchos entornos de notebooks `sc` ya existe. Si no existe, lo creamos con `getOrCreate()`.


In [1]:
import os
import sys

# Esto le dice a Python: "Usa el Java que está dentro de este entorno de Anaconda"
os.environ["JAVA_HOME"] = sys.prefix
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin;" + os.environ["PATH"]

In [2]:
from pyspark import SparkContext

# En notebooks, normalmente puedes reutilizar el contexto existente.
sc = SparkContext.getOrCreate()
sc

: 

## 2. RDDs (Resilient Distributed Datasets)

Un **RDD** es una colección distribuida e inmutable de elementos. Spark registra *cómo* llegar a ese RDD (su *lineage*) y solo ejecuta cuando haces una **acción**.

### 2.1 Crear RDDs
Una forma rápida es con `parallelize`, útil para ejemplos pequeños.


In [None]:
intro_rdd = sc.parallelize(range(20), 8)  # 8 particiones
print(f"Particiones: {intro_rdd.getNumPartitions()}")
print(f"Primeros elementos: {intro_rdd.take(5)}")
intro_rdd.setName("RDD de ejemplo")
print(f"Nombre: {intro_rdd.name()}")

### 2.2 Caché

El caché es útil cuando vas a reutilizar el mismo RDD en varias acciones. Recuerda que el caché se materializa al ejecutar una acción.


In [None]:
intro_rdd.cache()
intro_rdd.count()  # acción que materializa

### 2.3 Particiones

Las particiones determinan el paralelismo. Puedes inspeccionarlas con `glom()`.


In [None]:
print(f"Número de particiones: {intro_rdd.getNumPartitions()}")
print(f"Datos por partición: {intro_rdd.glom().collect()}")

### 2.4 Repartition vs Coalesce

- `repartition(n)` puede hacer **shuffle completo**.
- `coalesce(n)` intenta reducir particiones con menos movimiento de datos (ideal para bajar particiones).


In [None]:
repartitioned = intro_rdd.repartition(6)
print(f"Particiones (repartition): {repartitioned.getNumPartitions()}")
print(repartitioned.glom().collect())

In [None]:
coalesced = intro_rdd.coalesce(6)
print(f"Particiones (coalesce): {coalesced.getNumPartitions()}")
print(coalesced.glom().collect())

## 3. Transformaciones y acciones

Ejemplo rápido:
- **Transformaciones**: `filter`, `map`, `flatMap`, `distinct`, etc.
- **Acciones**: `count`, `collect`, `take`, `reduce`, `saveAsTextFile`, etc.


In [None]:
transformed = (intro_rdd
               .filter(lambda x: x % 2 == 0)
               .map(lambda x: x + 1))

# Nada se ejecuta hasta que hacemos una acción:
print(transformed.collect())

In [None]:
sum_even = (intro_rdd
            .filter(lambda x: x % 2 == 0)
            .reduce(lambda a, b: a + b))
print(sum_even)

## 4. Ejemplo aplicado (Init): Ventas totales por tienda con RDD

A continuación está el ejemplo base adaptado a tu caso retail (el que empieza por `Init_...`). Calcula `venta_total` por `tienda_id` usando el patrón **map-reduce**.

**Columnas esperadas del CSV:**
`transaccion_id,timestamp,tienda_id,producto_id,categoria,cantidad,precio_unitario,metodo_pago`


In [None]:
# Basado en Init_RDD_PySpark.py
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

rdd_raw = sc.textFile("transacciones_retail_large.csv")
header = rdd_raw.first()
rdd_no_header = rdd_raw.filter(lambda linea: linea != header)

def parsear_linea(linea):
    campos = linea.split(",")
    return (
        campos[2],      # tienda_id
        campos[4],      # categoria
        int(campos[5]), # cantidad
        float(campos[6])# precio_unitario
    )

rdd_parsed = rdd_no_header.map(parsear_linea)
rdd_kv = rdd_parsed.map(lambda x: (x[0], x[2] * x[3]))
rdd_totales = rdd_kv.reduceByKey(lambda a, b: a + b)
rdd_ordenado = rdd_totales.sortBy(lambda x: x[1], ascending=False)

top10 = rdd_ordenado.take(10)
print(f"{'TIENDA':<15} | {'VENTA TOTAL':>15}")
print("-" * 33)
for tienda, total in top10:
    print(f"{tienda:<15} | ${total:>14.2f}")

# Nota: evita collect() si el resultado es muy grande.

## 5. Ejercicios sugeridos

1. Calcula **venta_total por categoría**.
2. Calcula **venta_total por método de pago**.
3. Calcula el **ticket promedio por tienda** (venta_total / número de transacciones).
4. ¿Qué cambia si en vez de `reduceByKey` usas `groupByKey`? ¿Por qué suele ser peor?


## 6. Cierre

Cuando termines, puedes detener el contexto (opcional en notebooks):


In [None]:
# sc.stop()