
# PySpark en Google Colab: instalación y primeros pasos

Este cuaderno está diseñado **exclusivamente para Google Colab**.  
Incluye instalación de **PySpark**, creación de la **SparkSession**, ejemplos prácticos con **RDDs** y notas específicas para Colab.



## 0) Notas rápidas para Colab

- Colab reinicia el entorno al desconectarse; si vuelves a abrir el cuaderno, **reinstala** dependencias.
- Ejecuta las celdas **en orden** (de arriba hacia abajo).
- La interfaz **Spark UI** en `localhost:4040` no es accesible directamente desde Colab; más abajo mostramos cómo obtener la URL (puede no ser navegable).



## 1) Instalación de PySpark

Ejecuta esta celda para instalar PySpark en el entorno actual.


In [None]:

# Instalación en el runtime de Colab
!pip -q install pyspark
print("Instalación completa.")



## 2) Verificar Java

Colab suele traer Java preinstalado. Verifícalo:


In [None]:

!java -version



## 3) Crear la `SparkSession` y obtener el `SparkContext`

`SparkSession` es el punto de entrada para Spark en Python.  
`SparkContext` permite trabajar con **RDDs** directamente.


In [None]:

from pyspark.sql import SparkSession

# Configuración básica para Colab. Puedes ajustar memoria si lo requieres.
spark = (SparkSession.builder
         .appName("Colab_PySpark_Basico")
         # .config("spark.driver.memory", "3g")  # Descomenta si necesitas más memoria
         .getOrCreate())

sc = spark.sparkContext

print("Versión de Spark:", spark.version)
print("Master:", sc.master)

# URL de la Spark UI (en Colab puede no ser accesible externamente)
try:
    print("Spark UI:", sc.uiWebUrl)
except Exception:
    pass



## 4) RDD desde una lista y *lazy evaluation*

Las **transformaciones** no se ejecutan inmediatamente; Spark construye un plan de ejecución que se dispara cuando invocamos una **acción**.


In [None]:

# Datos de ejemplo
data = list(range(1, 11))

# Crear RDD distribuido
rdd = sc.parallelize(data)

# Definimos transformaciones (todavía NO se ejecutan)
pares = rdd.filter(lambda x: x % 2 == 0)
doble = pares.map(lambda x: x * 2)

# Una ACCIÓN fuerza la ejecución
resultado = doble.collect()
resultado



## 5) Acciones frecuentes sobre RDDs

Ejemplos de `take`, `sum`, `mean` y `stdev` para observar resultados inmediatos (acciones).


In [None]:

primeros = rdd.take(3)
suma = rdd.sum()
promedio = rdd.mean()
desviacion = rdd.stdev()

print("Primeros 3 elementos:", primeros)
print("Suma total:", suma)
print("Promedio:", promedio)
print("Desviación estándar:", round(desviacion, 4))



## 6) Mini *word count* con limpieza mínima

Flujo típico para texto distribuido:
1. Separar palabras con `flatMap`.
2. Limpiar tokens (solo alfanumérico) y filtrar los muy cortos.
3. Contar con `map` y `reduceByKey`.
4. Ordenar por frecuencia y mostrar las más comunes.


In [None]:

texto = [
    "Hola mundo desde Spark",
    "Procesamiento distribuido con PySpark es eficiente",
    "Hola Spark hola datos",
]

rdd_texto = sc.parallelize(texto)
palabras = rdd_texto.flatMap(lambda linea: linea.lower().split())

# Limpieza: deja solo alfanumérico; descarta tokens cortos
limpias = palabras.map(lambda w: ''.join(ch for ch in w if ch.isalnum())) \
                  .filter(lambda w: len(w) >= 3)

pares = limpias.map(lambda w: (w, 1))
conteo = pares.reduceByKey(lambda a, b: a + b)

ordenado = conteo.sortBy(lambda kv: kv[1], ascending=False)
ordenado.take(10)



## 7) Particiones y rendimiento (experimento rápido)

El número de particiones influye en el **paralelismo**. Este experimento compara el tiempo de `count()` bajo distintos valores.


In [None]:

import time

n = 2_000_000
rdd_big = sc.parallelize(range(n), numSlices=4)

for p in [2, 4, 8, 16]:
    t0 = time.time()
    _ = rdd_big.repartition(p).count()
    dt = time.time() - t0
    print(f"particiones={p:>2} -> tiempo={dt:.3f}s")



## 8) (Opcional) Montar Google Drive

Si quieres **guardar** resultados o **leer** archivos desde tu Drive:


In [None]:

# from google.colab import drive
# drive.mount('/content/drive')

# Ejemplo de escritura si montaste el Drive:
# conteo.saveAsTextFile('/content/drive/MyDrive/salida_conteo')



## 9) Cierre ordenado de la sesión


In [None]:

spark.stop()
print("Sesión Spark detenida.")
