## 1. Introducción a las Variables Compartidas en Spark

En Spark, cuando se ejecutan operaciones distribuidas en un clúster, a menudo es necesario compartir variables entre los nodos del clúster de manera eficiente. Spark ofrece dos tipos de variables compartidas: Broadcast Variables y Accumulators.

## 2. Broadcast Variables

**Concepto**

Las **Broadcast Variables** son variables que se pueden distribuir a todos los nodos del clúster de manera eficiente. En lugar de enviar una copia de la variable a cada tarea, Spark distribuye la variable una sola vez y la reutiliza en cada nodo. Esto mejora la eficiencia y reduce la sobrecarga de red.

**Cuándo usarlo**:

- Cuando tienes datos de solo lectura (no modificables) que deben ser accesibles en múltiples nodos.
- Ideal para grandes colecciones de datos que no cambian durante el procesamiento.
  
**Cómo funciona**:
  
- Spark optimiza el envío de grandes conjuntos de datos a los trabajadores, almacenándolos en la memoria de cada nodo.


### a) Usando Broadcast con RDD:
Cuando usas RDDs en Spark, puedes aplicar la variable broadcast a las transformaciones que se realicen en el RDD, como map(), filter(), flatMap(), entre otras. La ventaja de utilizar el broadcast en este contexto es que puedes compartir grandes cantidades de datos a través de todas las particiones del RDD sin duplicarlos.

**Ejemplo con RDD**:

In [None]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.broadcast.Broadcast

// Crear la sesión de Spark
val spark = SparkSession.builder().appName("BroadcastRDDExample").master("local[*]").getOrCreate()

// Crear un mapa pequeño que se transmitirá a todos los nodos
val productos = Map(1 -> "Laptop", 2 -> "Teléfono", 3 -> "Tablet")
val productosBroadcast: Broadcast[Map[Int, String]] = spark.sparkContext.broadcast(productos)

// Crear un RDD a partir de una Seq
val data = Seq((1, 10), (2, 20), (3, 30))
val rdd = spark.sparkContext.parallelize(data)

// Usar el broadcast en el RDD
val resultado = rdd.map(x => {
  val config = productosBroadcast.value  // Acceder a la variable broadcast
  s"Producto: ${config.getOrElse(x._1, "Desconocido")}, Cantidad: ${x._2}"
})

// Recoger e imprimir el resultado
resultado.collect().foreach(println)


#### **Explicación**:

**1. Iniciar SparkSession**:

In [None]:
val spark = SparkSession.builder().appName("BroadcastRDDExample").master("local[*]").getOrCreate()

- Aquí estamos creando una sesión de Spark. Es como la puerta de entrada para trabajar con Spark en Scala.

- local[*] significa que vamos todos los núcleos de CPU para ejecutar el código en modo local.

**2. Definir los datos a transmitir (broadcast)**:

In [None]:
val productos = Map(1 -> "Laptop", 2 -> "Teléfono", 3 -> "Tablet")

- productos es un mapa con datos (en este caso, productos electrónicos) que queremos compartir con todos los nodos de Spark.

**3. Crear la variable de broadcast**:

In [None]:
val productosBroadcast: Broadcast[Map[Int, String]] = spark.sparkContext.broadcast(productos)

- Aquí usamos broadcast para distribuir productos a todos los nodos. Ahora todos los nodos pueden acceder a esta información sin tener que enviarla repetidamente.

**4. Crear un RDD (Resilient Distributed Dataset)**:

In [None]:
// Crear un RDD a partir de una Seq
val data = Seq((1, 10), (2, 20), (3, 30))
val rdd = spark.sparkContext.parallelize(data)

- Creamos un RDD a partir de una secuencia de números. Piensa en un RDD como una colección de datos distribuidos que Spark puede procesar en paralelo.

**5. Uso de la variable de broadcast dentro de un map**:

In [None]:
// Usar el broadcast en el RDD
val resultado = rdd.map(x => {
  val config = productosBroadcast.value  // Acceder a la variable broadcast
  s"Producto: ${config.getOrElse(x._1, "Desconocido")}, Cantidad: ${x._2}"
})

- Aquí estamos procesando cada elemento del RDD (x). Usamos productosBroadcast.value para acceder a los datos que hemos distribuido a todos los nodos.
- Esto nos permite realizar operaciones con los datos de productos en cada parte del RDD.

**6. Mostrar el resultado**:

In [None]:
// Recoger e imprimir el resultado
resultado.collect().foreach(println)

- Después de procesar los datos, los imprimimos.

In [None]:
- Explicación de s"Producto: ${config.getOrElse(x._1, "Desconocido")}, Cantidad: ${x._2}"

- config.getOrElse(x._1, "Desconocido"):
- config: Es la variable que almacena la información de productosBroadcast
- getOrElse(x._1, "Desconocido"): Este es un método de los mapas en Scala que busca un valor por su clave (x._1 en este caso)
- x._1 representa el ID del producto. Por ejemplo, si x es (1, 10), entonces x._1 sería 1.
- x._2 es la cantidad de productos, es decir, el segundo valor en el par. Por ejemplo, si x es (1, 10), entonces x._2 sería 10

### b) Usando Broadcast con DataFrame:

En el caso de los DataFrames, es bastante común realizar transformaciones más complejas usando las funciones de Spark SQL (como select(), filter(), join(), etc.). Al igual que en los RDDs, broadcast en DataFrames es útil para evitar la duplicación de datos pequeños (como tablas de referencia) al realizar transformaciones.

**Ejemplo con DataFrame**:

In [None]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.functions._

// Crear la sesión de Spark
val spark = SparkSession.builder().appName("BroadcastDataFrameExample").master("local[*]").getOrCreate()

// Crear un mapa pequeño que se transmitirá a todos los nodos
val productos = Map(1 -> "Laptop", 2 -> "Teléfono", 3 -> "Tablet")
val productosBroadcast: Broadcast[Map[Int, String]] = spark.sparkContext.broadcast(productos)

// Crear un DataFrame de ejemplo
val data = Seq((1, 10), (2, 20), (3, 30))
val df = spark.createDataFrame(data).toDF("id_producto", "cantidad")

// Crear un UDF para acceder a la variable broadcast
val getProductoNombre = udf((id: Int) => productosBroadcast.value.getOrElse(id, "Desconocido"))

// Aplicar el UDF al DataFrame
val resultado = df.withColumn("nombre_producto", getProductoNombre($"id_producto"))
resultado.show()


#### Explicación:
- En este ejemplo, creamos un DataFrame a partir de una Seq.
- Usamos un UDF (User Defined Function) para acceder a los valores del broadcast. El UDF se aplica a una columna del DataFrame para agregar la columna nombre_producto que se obtiene a partir del mapa productosBroadcast.
- La variable broadcast se distribuye a todas las particiones del clúster, mejorando la eficiencia al evitar duplicar el mapa en cada tarea.

### Diferencias principales entre RDD y DataFrame al usar broadcast:

| Característica                  | RDD                                                                 | DataFrame                                                                 |
|---------------------------------|---------------------------------------------------------------------|---------------------------------------------------------------------------|
| **Abstracción**                 | Menos optimizado; operaciones como `map()`, `filter()`, etc.        | Más optimizado para trabajar con columnas y operaciones SQL               |
| **Uso de Broadcast**            | `broadcast` se aplica directamente sobre los datos distribuidos (RDD)| `broadcast` se puede usar dentro de transformaciones como `withColumn` y funciones SQL |
| **Funciones de optimización**   | Menos optimización interna                                          | Spark optimiza internamente el plan de ejecución de DataFrames (Catalizador) |
| **Flexibilidad**                | Más flexible, pero menos eficiente para grandes datasets            | Más eficiente para trabajar con grandes datasets y operaciones complejas  |
| **Interoperabilidad con SQL**   | No tiene integración nativa con SQL                                 | Totalmente integrado con SQL y funciones de agregación                    |

#### ¿Cuándo usar cada uno?
- **RDD**: Si necesitas mayor control sobre las transformaciones y tu trabajo implica operaciones más manuales, puedes preferir trabajar con RDDs. Sin embargo, cuando trabajas con RDDs, generalmente no aprovechas al máximo las optimizaciones que ofrece Spark para DataFrames.

- **DataFrame**: Si tu trabajo implica grandes conjuntos de datos con operaciones SQL o transformaciones complejas (como agregaciones, joins, etc.), lo más eficiente es usar DataFrames, ya que Spark realiza optimizaciones automáticas (como la ejecución de planes de consulta optimizados a través del Catalyst Optimizer).

- **RDD**: Se usa en un nivel más bajo, sin optimizaciones SQL, y es más adecuado cuando necesitas un control más directo sobre las transformaciones y acciones.

- **DataFrame**: Es más adecuado para trabajar con datos estructurados y aprovechar optimizaciones y funciones SQL, haciendo el código más conciso y eficiente.
  
- **Broadcast**: funciona en ambos casos, pero la implementación varía ligeramente dependiendo de si estás trabajando con RDDs o DataFrames. En DataFrames, puedes usar funciones SQL y UDFs, mientras que en RDDs trabajas con transformaciones como map() o filter().

###  Cuándo Broadcast NO es útil en Big Data
#### Si los datos a compartir son muy grandes:

  - Broadcast es eficiente cuando el mapa o la tabla que se comparte es pequeña (unos MB).
    
  - Si intentas compartir un mapa con millones de registros, puedes agotar la memoria de los ejecutores y perder rendimiento.
    
    
#### Si ya puedes hacer un JOIN eficiente en Spark:

  - En lugar de usar Broadcast, muchas veces es mejor hacer un JOIN con una tabla bien particionada y optimizada.  

## 3. Accumulators

**Concepto**

Los Accumulators son variables que pueden ser solo agregadas, no leídas. Están diseñados para realizar sumas, conteos u otras operaciones de agregación de manera eficiente en un entorno distribuido. Los acumuladores pueden ser utilizados para hacer seguimientos de información a medida que las tareas se distribuyen.

**Cuándo usarlo**:

Cuando necesitas agregar datos de manera global (por ejemplo, contadores de errores, sumas acumulativas) a través de las tareas distribuidas.

**Cómo funciona**:

Los acumuladores permiten que varias tareas modifiquen una variable de manera segura y simultánea. Sin embargo, solo el driver puede leer su valor final. Los trabajadores pueden solo sumar o agregar a los acumuladores, pero no pueden leer su valor.

### a) Usando Accumulators con RDD

In [None]:
import org.apache.spark.sql.SparkSession

// Inicializar Spark
val spark = SparkSession.builder().appName("Accumulator Example").master("local[2]").getOrCreate()

// Crear un acumulador de tipo Long
val negativeCount = spark.sparkContext.longAccumulator("Negative Count")

// Crear un RDD de ejemplo
val numbers = spark.sparkContext.parallelize(Seq(1, -2, 3, -4, 5, -6, 7))

// Realizar una operación y sumar en el acumulador
numbers.foreach(x => {
  if (x < 0) {
    negativeCount.add(1)
  }
})

// Mostrar el valor del acumulador (solo se puede leer en el driver)
println(s"Total negativos: ${negativeCount.value}"// Detener Spark


#### **Explicación**:

**1. Iniciar SparkSession**:

// Inicializar Spark
val spark = SparkSession.builder().appName("Accumulator Example").master("local[2]").getOrCreate()

**2. Crear el acumulador**:

In [None]:
val negativeCount = spark.sparkContext.longAccumulator("Negative Count")

- Creamos un acumulador llamado negativeCount que solo podrá contar. Aquí estamos usando un acumulador de tipo Long, que solo puede sumar valores enteros.

**3. Crear el RDD de números**:

In [None]:
val numbers = spark.sparkContext.parallelize(Seq(1, -2, 3, -4, 5, -6, 7))

- Creamos un RDD con algunos números, incluidos algunos negativos.

**4. Contar números negativos utilizando el acumulador**:

In [None]:
numbers.foreach(x => {
  if (x < 0) {
    negativeCount.add(1) //add(1): En este caso, se está utilizando el método add(1) para incrementar el valor del acumulador en 1 cada vez que se 
  }                      //encuentra un número negativo.
})


- En este paso, estamos iterando a través de cada número del RDD. Si el número es negativo (x < 0), agregamos 1 al acumulador (negativeCount.add(1)).
- El acumulador se actualiza solo, y los trabajadores no pueden leer su valor, solo pueden sumarle.

**5. Leer el valor del acumulador**:

In [None]:
println(s"Total negativos: ${negativeCount.value}")

- Una vez que todas las tareas están completas, leemos el valor del acumulador. Esto solo se puede hacer en el driver (el nodo que controla todo el proceso).


### b) Usando Accumulators con Dataframe

La diferencia clave es que con DataFrames, trabajamos con columnas y no con los elementos individuales de una secuencia. Aun así, el enfoque con el acumulador no cambia demasiado; solo que en lugar de recorrer el RDD, lo haremos con una acción foreach en un DataFrame.

In [None]:
import org.apache.spark.sql.SparkSession

// Inicializar Spark
val spark = SparkSession.builder().appName("Accumulator Example").master("local[2]").getOrCreate()

// Crear un acumulador de tipo Long
val negativeCount = spark.sparkContext.longAccumulator("Negative Count")

// Crear un DataFrame de ejemplo
import spark.implicits._  // Para permitir la conversión de un Seq a DataFrame
val numbersDF = Seq(1, -2, 3, -4, 5, -6, 7).toDF("number")

// Realizar una operación y sumar en el acumulador
numbersDF.foreach(row => {
  val x = row.getAs[Int]("number")
  if (x < 0) {
    negativeCount.add(1)
  }
})

// Mostrar el valor del acumulador (solo se puede leer en el driver)
println(s"Total negativos: ${negativeCount.value}")

// Detener Spark
spark.stop()

**Explicación**

**1. Inicializar Spark**: Usamos SparkSession para trabajar con DataFrames.

In [None]:
val spark = SparkSession.builder().appName("Accumulator Example").master("local[2]").getOrCreate()

**2. Crear un acumulador**: Creamos un acumulador de tipo Long para contar los números negativos

In [None]:
val negativeCount = spark.sparkContext.longAccumulator("Negative Count")

**3. Crear un DataFrame de ejemplo**: En lugar de usar un RDD, creamos un DataFrame a partir de una secuencia de números.

In [None]:
val numbersDF = Seq(1, -2, 3, -4, 5, -6, 7).toDF("number")

**4. Iterar a través del DataFrame**: Usamos foreach para iterar a través de las filas del DataFrame. Accedemos a cada valor de la columna "number" y verificamos si es negativo.

In [None]:
numbersDF.foreach(row => {
  val x = row.getAs[Int]("number")
  if (x < 0) {
    negativeCount.add(1)
  }
})


- row.getAs[Int]("number") extrae el valor de la columna "number" como un tipo Int para trabajar con él.

**5. Mostrar el valor del acumulador**: Al final, mostramos el valor del acumulador que contiene la cantidad de números negativos encontrados.

In [None]:
println(s"Total negativos: ${negativeCount.value}")

### Cuándo Accumulators NO son útiles en Big Data

#### Si necesitas valores exactos:

  - Los acumuladores no garantizan precisión en entornos distribuidos.

  - Spark puede fallar en una tarea y volver a ejecutarla, incrementando el acumulador más de una vez.

#### Si puedes usar groupBy() en su lugar:

  - En lugar de un Accumulator, muchas veces un groupBy().agg() es más eficiente y preciso.