# Semana 2: Introducción a Spark con Scala

## Contexto del Ejercicio: Mi Porfolio como Data Engineer

Este notebook forma parte de la actividad **"Mi porfolio como Data Engineer"** descrita en el syllabus (páginas 18-19). 

El objetivo es continuar la construcción de tu **base de conocimiento** (porfolio). Puedes utilizar este notebook como:
*   **Guía**: Para entender la arquitectura de Spark y su API.
*   **Base**: Para experimentar con diferentes transformaciones y acciones.
*   **Complemento**: A tu repositorio de código en GitHub.

---

En esta segunda semana, nos adentraremos en el desarrollo de aplicaciones distribuidas utilizando Apache Spark. Veremos los conceptos fundamentales, trabajaramos con RDDs (Spark Core) y DataFrames (Spark SQL).

## 1. Desarrollo de aplicaciones con Apache Spark

Apache Spark es un motor de análisis unificado para el procesamiento de datos a gran escala.

### Conceptos Clave
*   **Driver**: El proceso principal que ejecuta tu aplicación (el `main`), crea el `SparkContext`/`SparkSession` y coordina las tareas.
*   **Executor**: Procesos que se ejecutan en los nodos del clúster, responsables de ejecutar las tareas y almacenar datos en memoria o disco.
*   **Cluster Manager**: Gestor de recursos (e.g., Standalone, YARN, Kubernetes) que asigna recursos a la aplicación.

### Ventajas
*   **Velocidad**: Ejecución en memoria, mucho más rápido que MapReduce tradicional.
*   **Facilidad de uso**: APIs de alto nivel en Scala, Java, Python y R.
*   **Unificado**: Soporta SQL, Streaming, ML y Graph en un solo motor.

In [2]:
import org.apache.spark.sql.SparkSession

// Inicialización de SparkSession (El punto de entrada a Spark)
val spark = SparkSession.builder()
  .appName("Semana2_Porfolio")
  .master("local[*]") // Ejecutar localmente usando todos los cores disponibles
  //.master("spark://spark-master:7077") // Si activas este modo obtendrás algunos errores por la integración de Ammonite y Spark
  // Memoria del Driver (donde se recolectan los resultados de .collect())
  .config("spark.driver.memory", "2g") 
  // Memoria de cada Executor
  .config("spark.executor.memory", "2g")
  // Memoria adicional por encima del heap (útil para evitar errores de overhead)
  .config("spark.executor.memoryOverhead", "512m")
  //.config("deploy-mode","client")
  .getOrCreate()

spark.sparkContext.setLogLevel("ERROR") // Reducir el ruido en los logs

println(s"Spark Version: ${spark.version}")

Spark Version: 4.1.1


[32mimport [39m[36morg.apache.spark.sql.SparkSession[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.classic.SparkSession@142110eb

In [3]:
// Obtenemos el estado de los ejecutores
// Retorna un Map: "host:puerto" -> (Memoria Total, Memoria Libre)
val status = spark.sparkContext.getExecutorMemoryStatus

val hosts = status.keys.toSeq
val totalMem = status.values.map(_._1 / (1024 * 1024)).toSeq // Convertir a MB

println(s"Nodos activos detectados: ${hosts.size}")

Nodos activos detectados: 1


[36mstatus[39m: [32mcollection[39m.[32mMap[39m[[32mString[39m, ([32mLong[39m, [32mLong[39m)] = [33mMap[39m(
  [32m"967bedb628f7:33279"[39m -> ([32m1808164454L[39m, [32m1808164454L[39m)
)
[36mhosts[39m: [32mSeq[39m[[32mString[39m] = [33mList[39m([32m"967bedb628f7:33279"[39m)
[36mtotalMem[39m: [32mSeq[39m[[32mLong[39m] = [33mList[39m([32m1724L[39m)

In [2]:
// En el Driver (Jupyter)
println(System.getProperty("java.version"))

// En los Workers
spark.sparkContext.parallelize(Seq(1)).map(_ => System.getProperty("java.version")).collect().foreach(println)

17.0.18
17.0.18


## 2. Introducción al módulo Spark Core (RDDs)

RDD (Resilient Distributed Dataset) es la abstracción fundamental de Spark. Representa una colección inmutable de objetos distribuida y tolerante a fallos.

### Transformaciones vs Acciones
*   **Transformaciones (Lazy)**: Crean un nuevo RDD a partir de uno existente (ej. `map`, `filter`). No se ejecutan inmediatamente.
*   **Acciones**: Disparan la computación y devuelven un resultado al Driver o guardan datos (ej. `count`, `collect`, `saveAsTextFile`).

In [5]:
// Ejemplo Spark Core: Procesamiento de texto básico con RDDs
val datos = Seq("Spark es rapido", "Spark es genial", "Scala y Spark", "Big Data es el futuro")

// 1. Crear RDD paralelizando una colección existente
val rdd = spark.sparkContext.parallelize(datos)

// 2. Transformaciones
val palabrasRDD = rdd
  .flatMap(linea => linea.split(" ")) // Dividir frases en palabras
  .map(palabra => palabra.toLowerCase) // Convertir a minúsculas
  .filter(palabra => palabra.contains("s")) // Filtrar palabras que contienen 's'

// 3. Acción (Solo aquí se ejecuta el procesamiento)
val resultado = palabrasRDD.collect()

println("Palabras con 's':")
resultado.foreach(println)


Palabras con 's':
spark
es
spark
es
scala
spark
es


[36mdatos[39m: [32mSeq[39m[[32mString[39m] = [33mList[39m(
  [32m"Spark es rapido"[39m,
  [32m"Spark es genial"[39m,
  [32m"Scala y Spark"[39m,
  [32m"Big Data es el futuro"[39m
)
[36mrdd[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[[32mString[39m] = ParallelCollectionRDD[2] at parallelize at cmd5.sc:5
[36mpalabrasRDD[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[[32mString[39m] = MapPartitionsRDD[5] at filter at cmd5.sc:11
[36mresultado[39m: [32mArray[39m[[32mString[39m] = [33mArray[39m(
  [32m"spark"[39m,
  [32m"es"[39m,
  [32m"spark"[39m,
  [32m"es"[39m,
  [32m"scala"[39m,
  [32m"spark"[39m,
  [32m"es"[39m
)

## 3. Introducción al módulo Spark SQL (DataFrames)

Spark SQL permite consultar datos estructurados. El DataFrame es la abstracción principal aquí: es como una tabla en una base de datos relacional o un DataFrame en pandas, pero distribuido.

Los DataFrames utilizan el **Catalyst Optimizer** para optimizar automáticamente las consultas.

In [10]:
import spark.implicits._ // Import necesario para conversiones implícitas a DF

// Ejemplo Spark SQL: DataFrames

// 1. Crear DataFrame desde una secuencia de tuplas
val personas = Seq(
  ("Alice", 28, "Data Engineer"),
  ("Bob", 35, "Data Scientist"),
  ("Charlie", 23, "Data Analyst"),
  ("David", 42, "Data Engineer")
)

val df = personas.toDF("nombre", "edad", "rol")

// 2. Mostrar el esquema y los datos
df.printSchema()
df.show()

// 3. Consultas usando API de DataFrame
println("Data Engineers mayores de 25:")
df.filter($"rol" === "Data Engineer" && $"edad" > 25)
  .select("nombre", "edad")
  .show()

// 4. Agregaciones
println("Edad promedio por rol:")
df.groupBy("rol")
  .avg("edad")
  .show()

root
 |-- nombre: string (nullable = true)
 |-- edad: integer (nullable = false)
 |-- rol: string (nullable = true)

+-------+----+--------------+
| nombre|edad|           rol|
+-------+----+--------------+
|  Alice|  28| Data Engineer|
|    Bob|  35|Data Scientist|
|Charlie|  23|  Data Analyst|
|  David|  42| Data Engineer|
+-------+----+--------------+

Data Engineers mayores de 25:
+------+----+
|nombre|edad|
+------+----+
| Alice|  28|
| David|  42|
+------+----+

Edad promedio por rol:
+--------------+---------+
|           rol|avg(edad)|
+--------------+---------+
| Data Engineer|     35.0|
|Data Scientist|     35.0|
|  Data Analyst|     23.0|
+--------------+---------+



[32mimport [39m[36mspark.implicits._[39m
[36mpersonas[39m: [32mSeq[39m[([32mString[39m, [32mInt[39m, [32mString[39m)] = [33mList[39m(
  ([32m"Alice"[39m, [32m28[39m, [32m"Data Engineer"[39m),
  ([32m"Bob"[39m, [32m35[39m, [32m"Data Scientist"[39m),
  ([32m"Charlie"[39m, [32m23[39m, [32m"Data Analyst"[39m),
  ([32m"David"[39m, [32m42[39m, [32m"Data Engineer"[39m)
)
[36mdf[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mpackage[39m.[32mDataFrame[39m = [nombre: string, edad: int ... 1 more field]

In [7]:
// 5. Consultas SQL estándar
// Registramos el DataFrame como una vista temporal
df.createOrReplaceTempView("personas_view")

val sqlDF = spark.sql("SELECT rol, count(*) as total FROM personas_view GROUP BY rol")
sqlDF.show()

+--------------+-----+
|           rol|total|
+--------------+-----+
| Data Engineer|    2|
|Data Scientist|    1|
|  Data Analyst|    1|
+--------------+-----+



[36msqlDF[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mpackage[39m.[32mDataFrame[39m = [rol: string, total: bigint]

In [9]:
//*****    PROPIOS lAURA   *****//
 
// Ejemplo de data frame con Select + filtro
df.select($"nombre", $"edad")
  .where($"edad" >= 29)
  .orderBy($"edad".desc)
  .show()

// Agregaciones
df.groupBy($"rol")
  .agg(
    count(lit(1)).as("num_personas"),
    avg($"edad").as("edad_media"),
    max($"edad").as("edad_max")
  )
  .orderBy($"num_personas".desc)
  .show()




+------+----+
|nombre|edad|
+------+----+
| David|  42|
|   Bob|  35|
+------+----+

+--------------+------------+----------+--------+
|           rol|num_personas|edad_media|edad_max|
+--------------+------------+----------+--------+
| Data Engineer|           2|      35.0|      42|
|Data Scientist|           1|      35.0|      35|
|  Data Analyst|           1|      23.0|      23|
+--------------+------------+----------+--------+



## 4. Ejercicios Prácticos (Sin resolver)

Usa estos ejercicios como base para practicar y documentar en tu porfolio.

### Ejercicio 1: Manipulación de DataFrames
Crea un DataFrame a partir de una lista de productos (nombre, precio, stock). Luego:
1. Añade una columna `valor_inventario` (precio * stock).
2. Filtra los productos que tengan un stock menor a 10.
3. Muestra el resultado.

In [8]:
import spark.implicits._
import org.apache.spark.sql.functions._

// 1. Creo la lista de productos con su nombre, precio y stock.
val productos = Seq(
  ("Laptop", 1200.0, 5),
  ("Mouse", 25.0, 20),
  ("Teclado", 45.0, 8),
  ("Monitor", 300.0, 15),
  ("USB", 10.0, 50)
)

// 2. A partir de dicha lista, creo el dataframe
val dfProductos = productos.toDF("nombre", "precio", "stock")

// 3. Añado una nueva columna al dataframe "valor_inventario" que tendra el valor de multiplciar el precio por el stock.
val dfConValor = dfProductos.withColumn(
  "valor_inventario",
  col("precio") * col("stock")
)

// 4. Filtro los productos con stock menor a 10
val dfResultado = dfConValor.filter(col("stock") < 10)

// 5. Muestro el resultado
dfResultado.show()


+-------+------+-----+----------------+
| nombre|precio|stock|valor_inventario|
+-------+------+-----+----------------+
| Laptop|1200.0|    5|          6000.0|
|Teclado|  45.0|    8|           360.0|
+-------+------+-----+----------------+



[32mimport [39m[36mspark.implicits._[39m
[32mimport [39m[36morg.apache.spark.sql.functions._[39m
[36mproductos[39m: [32mSeq[39m[([32mString[39m, [32mDouble[39m, [32mInt[39m)] = [33mList[39m(
  ([32m"Laptop"[39m, [32m1200.0[39m, [32m5[39m),
  ([32m"Mouse"[39m, [32m25.0[39m, [32m20[39m),
  ([32m"Teclado"[39m, [32m45.0[39m, [32m8[39m),
  ([32m"Monitor"[39m, [32m300.0[39m, [32m15[39m),
  ([32m"USB"[39m, [32m10.0[39m, [32m50[39m)
)
[36mdfProductos[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mpackage[39m.[32mDataFrame[39m = [nombre: string, precio: double ... 1 more field]
[36mdfConValor[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mpackage[39m.[32mDataFrame[39m = [nombre: string, precio: double ... 2 more fields]
[36mdfResultado[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow

### Ejercicio 2: Consultas SQL
Registra el DataFrame de productos anterior como una vista temporal y realiza una consulta SQL que devuelva el precio medio de los productos.

In [9]:
// 1. Registro el DataFrame de productos como una vista temporal
dfProductos.createOrReplaceTempView("productos")

// 2. Ejecuto la consulta SQL para calcular el precio medio
val dfMedia = spark.sql("""
  SELECT AVG(precio) AS precio_medio
  FROM productos
""")

// 3. Muestro el resultado
dfMedia.show()


+------------+
|precio_medio|
+------------+
|       316.0|
+------------+



[36mdfMedia[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mpackage[39m.[32mDataFrame[39m = [precio_medio: double]

## 5. Windows functions: ranking por cicudad

Objetivo: Tener un ejemplo de uso de de la función windows



In [12]:
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"rol").orderBy($"edad".desc)

df.withColumn("rank_edad_rol", dense_rank().over(w))
  .orderBy($"rol", $"rank_edad_rol")
  .show()


+-------+----+--------------+-------------+
| nombre|edad|           rol|rank_edad_rol|
+-------+----+--------------+-------------+
|Charlie|  23|  Data Analyst|            1|
|  David|  42| Data Engineer|            1|
|  Alice|  28| Data Engineer|            2|
|    Bob|  35|Data Scientist|            1|
+-------+----+--------------+-------------+



[32mimport [39m[36morg.apache.spark.sql.expressions.Window[39m
[36mw[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mexpressions[39m.[32mWindowSpec[39m = org.apache.spark.sql.expressions.WindowSpec@262f15ab

## 6. UDF: Ejemplo de uso de UDF

UDF signfica función definida por el usuario.
Uso: Se suele usarla dentro de Spark SQL o DataFrames cuando las funciones estándar de Spark no son suficientes.


In [14]:
// función que segun el palametro de entrada 'edad' lo evalua y duelve una cadena
val aRangoEdad = udf((edad: Int) =>
  if (edad < 25) "joven"
  else if (edad < 35) "adulto"
  else "senior"
)


//Creo una columna en el dataframe 'rango_edad' y le calculo los valores haciendo uso de la función que acabo de definir.
df.withColumn("rango_edad", aRangoEdad($"edad"))
  .groupBy("rango_edad")
  .count()
  .orderBy(desc("count"))
  .show()


+----------+-----+
|rango_edad|count|
+----------+-----+
|    senior|    2|
|    adulto|    1|
|     joven|    1|
+----------+-----+



[36maRangoEdad[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mexpressions[39m.[32mUserDefinedFunction[39m = [33mSparkUserDefinedFunction[39m(
  f = ammonite.$sess.cmd14$Helper$$Lambda$6430/0x00007efd9d7251a0@6f0dd0a6,
  dataType = StringType,
  inputEncoders = [33mArraySeq[39m([33mSome[39m(value = PrimitiveIntEncoder)),
  outputEncoder = [33mSome[39m(value = StringEncoder),
  givenName = [32mNone[39m,
  nullable = [32mtrue[39m,
  deterministic = [32mtrue[39m
)

## 7. Lectura/escritura: Parquet

Ejemplo local (crea carpeta en el proyecto).


In [15]:
val outPath = "output/personas_parquet"

// Escritura
df.write.mode("overwrite").parquet(outPath)

// Lectura
val df2 = spark.read.parquet(outPath)
df2.show()



+-------+----+--------------+
| nombre|edad|           rol|
+-------+----+--------------+
|Charlie|  23|  Data Analyst|
|  Alice|  28| Data Engineer|
|  David|  42| Data Engineer|
|    Bob|  35|Data Scientist|
+-------+----+--------------+



[36moutPath[39m: [32mString[39m = [32m"output/personas_parquet"[39m
[36mdf2[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mpackage[39m.[32mDataFrame[39m = [nombre: string, edad: int ... 1 more field]

In [11]:
spark.stop()