# Primeros pasos con Spark

## SparkContext vs SparkSession

SparkContext es el punto de entrada a Spark desde las versiones 1.x y se utiliza para crear de forma programativa RDD, acumuladores y variables broadcast en el clúster. Desde Spark 2.0, la mayoría de funcionalidades (métodos) disponibles en SparkContext también los están en SparkSession. Su objeto sc está disponible en el spark-shell y se puede crear de forma programativa mediante la clase SparkContext.

In [0]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

SparkSession se introdujo en la versión 2.0 y es el punto de entrada para crear RDD, DataFrames y DataSets. El objeto spark se encuentra disponible por defecto en el spark-shell y se puede crear de forma programativa mediante el patrón builder de SparkSession.

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 

Además, desde una sesión de Spark podemos obtener un contexto a través de la propiedad sparkContext:

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

### Hola Spark

Lo primero que debemos hacer siempre es conectarnos a la sesión de Spark, el cual le indica a Spark como acceder al clúster. Si utilizamos la imagen de Docker, debemos obtener siempre la sesión a partir de la clase SparkSession:

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate() # SparkSession de forma programativa
spark

In [0]:
sc = spark.sparkContext                    # SparkContext a partir de la sesión

In [0]:
# Suma de los 100 primeros números
rdd = sc.parallelize(range(100 + 1))
rdd.sum()

Out[23]: 5050

En cambio, si utilizamos la instalación de PySpark (o la solución de Databricks) que tenemos en la máquina virtual, directamente podemos acceder a la instancia de SparkSession a través del objeto global spark:

In [0]:
sc = spark.sparkContext     # spark es una instancia de la clase SparkSession

In [0]:
rdd = sc.parallelize(range(100 + 1))
rdd.sum()

Out[25]: 5050

## Arquitectura

Ya hemos comentado que Spark es un sistema distribuido diseñado para procesar grandes volúmenes de datos de forma rápida y eficiente. Este sistema normalmente se despliega en un conjunto de máquinas que se conocen como un clúster Spark, pudiendo estar compuesta de unas pocas máquinas o miles de ellas. Según el FAQ de Spark, el clúster más grande de Spark contiene más de 8000 nodos.

A la hora del despliegue, se utiliza un sistema de gestión de recursos como el gestor propio de Spark (conocido como Spark Standalone), Apache Mesos, Kubernetes o YARN para gestionar de forma inteligente y eficiente el clúster.

Los dos componentes principales del clúster son:

- el gestor del clúster: nodo maestro que sabe donde se localizan los trabajadores, cuanta memoria disponen y el número de cores CPU de cada nodo. Su mayor responsabilidad es orquestar el trabajo asignándolo a los diferentes nodos.

- los nodos trabajadores (workers): cada nodo ofrece recursos (memoria, CPU, etc...) al gestor del clúster y realiza las tareas que se le asignen.

### Aplicaciones Spark

Una aplicación Spark se compone de dos partes:

1. La lógica de procesamiento de los datos, la cual realizamos mediante alguna de las API que ofrece Spark (Java, Scala, Python, etc...), desde algo sencillo que realice una ETL sobre los datos a problemas más complejos que requieran múltiples iteraciones y tarden varias horas como entrenar un modelo de machine learning.

2. Driver: es el coordinador central encargado de interactuar con el clúster Spark y averiguar qué máquinas deben ejecutar la lógica de procesamiento. Para cada una de esas máquinas, el driver realiza una petición al clúster para lanzar un proceso conocido como ejecutor (executor). Además, el driver Spark es responsable de gestionar y distribuir las tareas a cada ejecutor, y si es necesario, recoger y fusionar los datos resultantes para presentarlos al usuario. Estas tareas se realizan a través de la SparkSession.

Cada ejecutor es un proceso JVM (Java Virtual Machine) dedicado para una aplicación Spark específica. Un ejecutor vivirá tanto como dure la aplicación Spark, lo cual puede ser segundos, minutos o días, dependiendo de la complejidad de la aplicación. Conviene destacar que los ejecutores son elementos aislados que no se comparten entre aplicaciones Spark, por lo que la única manera de compartir información entre diferente ejecutores es mediante un sistema de almacenamiento externo como HDFS o S3.

![](https://community.cloud.databricks.com/files/shared_uploads/notebook_1/1.jpg)

Así pues, Spark utiliza una arquitectura maestro/esclavo, donde el driver es el maestro, y los ejecutores los esclavos. Cada uno de estos componentes se ejecutan como un proceso independiente en el clúster Spark. Por lo tanto, una aplicación Spark se compone de un driver y múltiples ejecutores. Cada ejecutor realiza lo que se le pide en forma de tareas, ejecutando cada una de ellas en un núcleo CPU separado. Así es como el procesamiento paralelo acelera el tratamiento de los datos. Además, cada ejecutor, bajo petición de la lógica de la aplicación, se responsabiliza de cachear un fragmento de los datos en memoria y/o disco.

Al lanzar una aplicación Spark, podemos indicar el número de ejecutores que necesita la aplicación, así como la cantidad de memoria y número de núcleos que debería tener cada ejecutor.

![2](files/2.png)

### Job, Stage y Task

Cuando creamos una aplicación Spark, por debajo, se distinguen los siguientes elementos:

- Job (trabajo): computación paralela compuesta de múltiples tareas que se crean tras una acción de Spark (save, collect, etc...). Al codificar nuestro código mediante PySpark, el driver convierte la aplicación Spark en uno o más jobs, y a continuación, estos jobs los transforma en un DAG (grafo). Este grafo, en esencia, es el plan de ejecución, donde cada elemento dentro del DAG puede implicar una o varias stages (escenas).

- Stage (escena): cada job se divide en pequeños conjuntos de tareas que forman un escenario. Como parte del grafo, las stages se crean a partir de si las operaciones se pueden realizar de forma paralela o de forma secuencial. Como no todas las operaciones pueden realizarse en una única stage, en ocasiones se dividen en varias, normalmente debido a los límites computacionales de los diferentes ejecutores.

- Task (tarea): unidad de trabajo más pequeña que se envía a los ejecutores Spark. Cada escenario se compone de varias tareas. Cada una de las tareas se asigna a un único núcleo y trabaja con una única partición de los datos. Por ello, un ejecutor con 16 núcleos puede tener 16 o más tareas trabajando en 16 o más particiones en paralelo.

![3](files/3.png)

### DataFrame

La principal abstracción de los datos en Spark es el Dataset. Se pueden crear desde las fuentes de entrada de Hadoop (como ficheros que provienen de HDFS o S3) o mediante transformaciones de otros Datasets. Dado el cariz de Python, no necesitamos que los Dataset estén fuertemente tipados, por eso, todos los Dataset que usemos serán Dataset[Row] (si trabajásemos mediante Java o Scala sí deberíamos indicar el tipo de sus datos), y por consistencia con el concepto de Pandas y R, los llamaremos DataFrame.

Por ejemplo, veamos cómo podemos crear un DataFrame a partir de un fichero de texto:

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate() 

quijoteTxt = spark.read.text("dbfs:/FileStore/el_quijote.txt")
quijoteTxt.count()  # número de filas del DataFrame - 2186


Out[13]: 2186

#### Inicia una sesión de Spark:

Usamos `SparkSession.builder.getOrCreate()` para crear o recuperar una sesión de Spark, que es esencial para trabajar con DataFrames y RDDs en PySpark. Esta sesión te permite interactuar con el clúster de Spark.
Lee un archivo de texto desde DBFS:

Utilizamos `spark.read.text()` para leer un archivo de texto. En este caso, estamos leyendo el archivo el_quijote.txt que está almacenado en el sistema de archivos DBFS (Databricks File System). Este archivo se convierte en un DataFrame, donde cada línea del archivo será una fila en el DataFrame.
Cuenta el número de líneas:

Luego, usamos el método `.count()` para contar cuántas líneas (filas) tiene el archivo en el DataFrame. En este caso, el archivo el_quijote.txt tiene 2186 líneas, por lo que el resultado de count() será 2186.
#### En resumen:
Este código simplemente lee un archivo de texto desde DBFS, lo convierte en un DataFrame, y cuenta cuántas filas (líneas) tiene ese archivo. Es una forma rápida de verificar el tamaño de un archivo de texto en términos de número de líneas y ver cómo se puede manipular con Spark.




In [0]:
quijoteTxt.first()  # primera fila - Row(value='DON QUIJOTE DE LA MANCHA')

Out[14]: Row(value='DON QUIJOTE DE LA MANCHA')

In [0]:
# Transformamos un DataFrame en otro nuevo
lineasConQuijote = quijoteTxt.filter(quijoteTxt.value.contains("Quijote"))  # DataFrame con las líneas que contiene la palabra Quijote
lineasConQuijote.count()  # cantidad de líneas con la palabra Quijote - 584

Out[15]: 584

In [0]:
# Las transformaciones se pueden encadenar
quijoteTxt.filter(quijoteTxt.value.contains("Quijote")).count()     # idem - 584

Out[16]: 584

Estudiaremos los DataFrame en profundidad en las próximas clases

### Spark UI (Jupyter Lab)

Si accedemos a la dirección 'http://127.0.0.1:4040/', veremos un interfaz gráfico donde podemos monitorizar y analizar el código Spark ejecutado. La barra superior muestra un menú con las opciones para visualizar los jobs, stages, el almacenamiento, el entorno y sus variables de configuración, y finalmente los ejecutores:

![4](files/4.png)

![5](files/5.png)

### Spark UI en Databricks

Para acceder a la herramienta de monitorización en Databricks, una vez creado un clúster, en la opción Compute/Calcular podremos seleccionar el clúster creado y en la pestaña IU de Spark acceder al mismo interfaz gráfico:

![](files/6.png)

![](files/image.png)

# Actividad

En otro notebook hacer los ejemplos de código anteriores utilizando Scala:
- Los ejemplos con Scala estan en el `notebook_scala`

# RDD

Un RDD (Resilient Distributed Datasets) es una estructura de datos que abstrae los datos para su procesamiento en paralelo.

Antes de Spark 2.0, los RDD eran el interfaz principal para interactuar con los datos.

Se trata de una colección de elementos tolerantes a fallos que son immutables (una vez creados, no se pueden modificar) y diseñados para su procesamiento distribuido. Cada conjunto de datos en los RDD se divide en particiones lógicas, que se pueden calcular en diferentes nodos del clúster.

Hay dos formas de crear un RDD:

- Paralelizando una colección ya existente en nuestra aplicación Spark.

- Referenciando un dataset de un sistema externo como HDFS, HBase, etc...

Sobre los RDD se pueden realizar dos tipos de operaciones:

- Acción: devuelven un valor tras ejecutar una computación sobre el conjunto de datos.

- Transformación: es una operación perezosa (lazy) que crea un nuevo conjunto de datos a partir de otro RDD/Dataset, tras realizar un filtrado, join, etc...

> **¿ Esta obsoleto RDD?**
> 
> Antes de la versión 2.0, el principal interfaz para programar en Spark eran los RDD. Tras la versión 2.0, fueron sustituidos por los Dataset, que son RDD fuertemente tipados que además están optimizados a bajo nivel. Aunque el interfaz RDD todavía tiene soporte, sin embargo, se recomienda el uso de los Dataset por su mejor rendimiento.
> Contexto y evolución
RDD (Resilient Distributed Dataset): Es la abstracción fundamental que Apache Spark introdujo inicialmente. Los RDDs son colecciones distribuidas de objetos que pueden ser procesadas en paralelo. Proporcionan una API de bajo nivel y permiten un control detallado sobre las transformaciones y acciones en el procesamiento de datos distribuidos.

> DataFrame API: Introducida en Spark 1.3, la API de DataFrame se basa en el concepto de tablas de bases de datos. Ofrece una capa de abstracción más alta que los RDDs y permite realizar operaciones de manera declarativa. Los DataFrames también ofrecen optimizaciones automáticas gracias al Catalyst Optimizer y el uso de Tungsten para la gestión de memoria, lo que hace que el procesamiento sea más eficiente.

>Dataset API: Introducida en Spark 1.6 y diseñada para el uso en Scala y Java, combina las ventajas de los RDDs (tipos de datos específicos y seguridad de tipo) con las optimizaciones de los DataFrames. La API de Dataset es más fuerte en cuanto a tipo, lo que mejora la seguridad de tipo y reduce errores en tiempo de compilación.
>¿Por qué se prefieren DataFrames y Datasets sobre RDDs?
>Optimización: Las APIs de DataFrame y Dataset son optimizadas automáticamente por el Catalyst Optimizer, lo que permite ejecutar código de forma mucho más eficiente.

> Facilidad de uso: DataFrames y Datasets proporcionan una API de más alto nivel, que es más intuitiva y se asemeja al lenguaje SQL, haciéndolas más accesibles para usuarios no especializados en programación funcional.

> Mejor manejo de memoria: La gestión de memoria en DataFrames y Datasets está mejorada por el proyecto Tungsten, que reduce el consumo de memoria y mejora el rendimiento general de Spark.

> Compatibilidad con SQL: Los DataFrames y Datasets están diseñados para trabajar de manera fluida con SQL, lo cual facilita el análisis y manipulación de datos estructurados.

> ¿Cuándo usar RDDs?
> Aunque los DataFrames y Datasets son generalmente recomendados, los RDDs aún son útiles en ciertas situaciones:

> Procesamiento de datos no estructurados o semiestructurados: Cuando necesitas realizar operaciones complejas en datos que no pueden ser representados fácilmente como DataFrames o Datasets, los RDDs pueden ser una opción.
Transformaciones personalizadas: Si necesitas un control detallado sobre las transformaciones o si las optimizaciones automáticas de Catalyst no son aplicables, RDDs te ofrecen más control.
API de bajo nivel: Los RDDs proporcionan una API de bajo nivel que puede ser útil para desarrolladores avanzados que necesitan manipular datos distribuidos de forma muy específica.

## Acciones

En Apache Spark, las acciones son operaciones que se ejecutan sobre RDDs, DataFrames o Datasets y que desencadenan el cómputo real en el clúster. Es decir, las acciones inician el procesamiento y devuelven un valor o resultado al controlador del programa o escriben datos a un almacenamiento externo.

Las acciones contrastan con las transformaciones, que son operaciones que definen cómo deben transformarse los datos pero no ejecutan inmediatamente el procesamiento. Las transformaciones son "perezosas" (lazy), mientras que las acciones desencadenan la ejecución.

A continuación vamos a revisar las acciones más comunes. Puedes consultar todas las acciones disponibles en la  [documentación oficial](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)
de Spark:

> Recuerda importar el contexto:

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

### Parallelize

En Apache Spark, parallelize es un método utilizado para crear un RDD (Resilient Distributed Dataset) a partir de una colección de datos en el controlador del programa. Permite distribuir los datos en paralelo entre los nodos del clúster para realizar operaciones distribuidas.



Podemos crear RDD directamente desde cero sin necesidad de leer los datos desde un fichero. Para ello, a partir de un `SparkContext` podemos utilizar `parallelize` sobre una lista.

Esta acción divide una colección de elementos entre los nodos de nuestro clústers. Por ejemplo:

In [0]:
miRDD = sc.parallelize([1,2,3,4,5,6,7,8,9])

lista  = ['Hola', 'Adiós', 'Hasta luego']
listaRDD = sc.parallelize(lista) # Creamos un RDD a partir de una lista
listaRDD4 = sc.parallelize(lista,4) # Creamos un RDD con 4 particiones

- `sc.parallelize([1,2,3,4,5,6,7,8,9]`: Crea un RDD a partir de una lista de números.
- `sc.parallelize(lista)`: Crea un RDD a partir de una lista de cadenas de texto.
- `sc.parallelize(lista, 4)`: Crea un RDD con 4 particiones a partir de la lista.

`sc.parallelize` convierte una lista local en un RDD distribuido, y el número de particiones indica cómo se dividirán los datos entre los nodos.

In [0]:
lista

Out[3]: ['Hola', 'Adiós', 'Hasta luego']

In [0]:
listaRDD

Out[4]: ParallelCollectionRDD[1] at readRDDFromInputStream at PythonRDD.scala:435

- `Out[n]`: Muestra directamente los valores de la lista, ya que es un RDD creado a partir de una lista simple de Python.
- `Out[n]`: Muestra el objeto `RDD` en sí mismo (representación interna de un RDD en Spark), porque no se ha realizado ninguna acción sobre él para recuperar los datos almacenados en el RDD.

In [0]:
listaRDD4

Out[5]: ParallelCollectionRDD[2] at readRDDFromInputStream at PythonRDD.scala:435

La salida `ParallelCollectionRDD[n] at readRDDFromInputStream` es simplemente la forma en que Spark te muestra un RDD cuando no has realizado una acción sobre él. Para obtener los datos de un RDD, debes usar acciones como `collect()`, `take()`, o `count()`, dependiendo de lo que necesites hacer.

### Take y Sample

Son métodos utilizados en RDDs, DataFrames y Datasets para interactuar con los datos. Ambos sirven para trabajar con subconjuntos, pero tienen usos y características diferentes. Una vez tenemos un RDD, y sobre él queremos recuperar un número determinado de resultados, de forma similar a limit en SQL, tenemos la acción `take`:

In [0]:
miRDD.take(3)       # [1, 2, 3]]

Out[6]: [1, 2, 3]

- El comando `miRDD.take(3)` obtiene los primeros 3 elementos del RDD miRDD y te los devuelve como una lista.
- Salida: `[1, 2, 3]` porque esos son los primeros 3 elementos de la colección original de números.

In [0]:
listaRDD.take(2)    # ['Hola', 'Adiós']

Out[7]: ['Hola', 'Adiós']

Otra opción es utilizar `sample` para obtener una muestra de los datos, aunque en este caso no es una acción sino una transformación, ya que va a obtener información posiblemente de varias particiones, teniendo que barajar los datos (**shuffle**):

In [0]:
miRDDmuestra = miRDD.sample(False, 0.5)

- `sample(False, 0.5)` selecciona aleatoriamente el 50% de los elementos de miRDD sin reemplazo.
- El nuevo RDD (`miRDDmuestra`) contiene esta muestra aleatoria.

In [0]:
miRDDmuestra.collect()  # [2, 4, 6, 7, 8, 9] / [1, 2, 3, 4, 6] / [5, 8, 9]

Out[9]: [1, 5, 6, 9]

- `miRDD.sample(False, 0.5)` crea un RDD con una muestra aleatoria de aproximadamente el 50% de los elementos de `miRDD` sin reemplazo.
- `miRDDmuestra.collect()` obtiene todos los elementos de esa muestra y los devuelve como una lista de Python.
- La salida `[1, 5, 6, 9]` es una muestra aleatoria de los elementos de miRDD. El conjunto exacto de elementos varía con cada ejecución, debido a la naturaleza aleatoria del muestreo.

Esta transformación recibe varios parámetros:

- `withReplacement`: booleano para indicar si queremos elementos repetidos
- `fraction` : valor entre 0 y 1 que expresa la probabilidad de elegir cada elemento
- opcionalmente se le puede pasar un tercer valor con la semilla

Así pues, en el ejemplo anterior, cada llamada a sample ha generado un RDD diferente, sin valores repetidos, pero con un tamaño de RDD variable.

**Muestra Estratificada**  

Si necesitamos que nuestra muestra esté estratificada para que los datos no estén sesgados podemos utilizar `sampleByKey`:

¿Qué es una muestra estratificada en este contexto?

- Muestreo estratificado asegura que cada grupo (estrato) de datos esté representado adecuadamente según la proporción que se desea en la muestra.
- `sampleByKey()` realiza un muestreo estratificado de un RDD de tuplas, donde la clave se usa para definir los estratos y se aplica un muestreo independiente a cada estrato según las probabilidades especificadas.
- Esto es útil cuando se quiere asegurar que las diferentes categorías de datos estén representadas proporcionalmente en la muestra, lo que ayuda a evitar sesgos.

In [0]:
datos = sc.parallelize([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')])

# indicamos la probabilidad para cada clave
fracciones = {1: 0.1, 2: 0.6, 3: 0.3}

approxSample = datos.sampleByKey(False, fracciones)     # [(2, 'c'), (2, 'd')] el resultado puede variar
print(approxSample.collect())

[(2, 'c'), (2, 'e')]


#### Probabilidad de selección para cada clave:

- **Para la clave 1**: La probabilidad de selección es 0.1. Esto significa que aproximadamente el 10% de los elementos con la clave 1 serán seleccionados. Dado que hay 2 elementos con la clave 1, el 10% de 2 es 0.2, lo que sugiere que es probable que se seleccione 1 elemento de esa clave, pero cuál de esos elementos es seleccionado es aleatorio.

- **Para la clave 2**: La probabilidad de selección es 0.6. El 60% de 3 elementos son 1.8, lo que significa que probablemente se seleccionarán 2 de los 3 elementos, pero qué elementos son seleccionados es aleatorio.

- **Para la clave 3**: La probabilidad de selección es 0.3. Como hay solo un elemento con la clave 3, el 30% de ese único elemento hace que haya una alta probabilidad de que se seleccione ese elemento.


Para obtener una muestra mediante una acción, tenemos la opción `takeSample` que funciona de forma similar pero sin hacer `shuffle` 
y devuelve una lista:

In [0]:
miRDDmuestraT = miRDD.takeSample(False, 5)
print(miRDDmuestraT)  # [1, 8, 9, 7, 2] el resultado puede variar

[1, 9, 7, 6, 8]


El primer parámetro vuelve a indicar si hay repetidos, pero el segundo fija la cantidad de elementos a devolver.

Por último, mediante `top` obtenemos los primeros elementos una vez ordenado el RDD:

In [0]:
miRDD.top(3)    # [9, 8, 7]

Out[16]: [9, 8, 7]

De forma similar, tenemos `takeOrdered` que recupera la cantidad de registros necesarios pero ordenados ascendentemente (al contrario que `top`), con la opción de ordenarlos descendentemente (similar a `top`):

In [0]:
miRDD.takeOrdered(3)    # [1, 2, 3]

Out[17]: [1, 2, 3]

In [0]:
miRDD.takeOrdered(3, lambda x: -x)    # [9, 8, 7]

Out[18]: [9, 8, 7]

### Explicación de las funciones lambda en Python

Las **funciones lambda** en Python son funciones anónimas que se pueden definir en una sola línea. Su sintaxis es:

`lambda argumentos: expresión`

#### Características:
- No requieren nombre (son anónimas).
- Pueden tener múltiples argumentos, pero solo una expresión.
- Se utilizan principalmente para funciones simples y como argumentos en otras funciones como `map()`, `filter()`, `sorted()`, entre otras.

#### Relación con el código proporcionado:

En el código:

`miRDD.takeOrdered(3, lambda x: -x)`

La función `lambda` toma un elemento `x` y devuelve su valor negativo (`-x`). Esto permite ordenar los elementos de manera descendente, seleccionando los tres más grandes. La función `takeOrdered()` luego toma los primeros tres elementos después de ordenarlos de mayor a menor.

#### Resumen:
Las funciones lambda son funciones simples y anónimas que se definen en una sola línea y se usan principalmente cuando se necesitan funciones breves y rápidas, como argumentos en otras funciones.

### Ejemplos básicos de funciones lambda en Python

1. **Suma de dos números**:
   Una función lambda que recibe dos números y devuelve su suma:
  ```python
  suma = lambda a, b: a + b
  ```

   En este caso, `lambda` toma los argumentos `a` y `b` y devuelve su suma `a + b`.

2. **Función para multiplicar un número por 2**:
   Una función lambda que multiplica un número por 2:

  ```python
  multiplicar_por_dos = lambda x: x * 2
  ```

   Aquí, la función toma un argumento `x` y devuelve `x * 2`.

3. **Filtrar números pares de una lista**:
   Usando `filter()` y una función lambda para obtener solo los números pares de una lista:

  ```python
  numeros_pares = filter(lambda x: x % 2 == 0, lista)
  ```

   En este ejemplo, la lambda filtra los números de la lista que son divisibles entre 2 (números pares).

4. **Obtener el cuadrado de un número**:
   Una función lambda para calcular el cuadrado de un número:

  ```python
  cuadrado = lambda x: x ** 2
  ```

   Esta función toma un número `x` y devuelve su cuadrado (`x ** 2`).

5. **Ordenar una lista de tuplas por el segundo elemento**:
   Usando `sorted()` con una función lambda para ordenar tuplas por el segundo elemento:

  ```python
  lista_ordenada = sorted(lista_tuplas, key=lambda x: x[1])
  ```

   La lambda toma una tupla `x` y devuelve el segundo elemento (`x[1]`), que se usa para ordenar las tuplas en la lista.

#### Resumen:
Las funciones lambda en Python son útiles para realizar operaciones simples y rápidas en una sola línea, como realizar cálculos, filtrar elementos de una lista o ordenar datos de manera flexible.


> Explicar las funciones lambda de python. Añada ejemplos simples

Hay que tener cuidado si el conjunto de datos es muy grande, porque tanto `take` como `takeSample`, `takeOrdered` y `top` llevarán todos los datos a memoria.

> ¿Que alternativas tengo?

> Funciones lambda
> 
> *Spark* hace un uso extensivo de las funciones *lambda de Python* para simplificar el código.
> Una función lambda no es más que una función anónima condensada en una sola línea:

In [0]:
# par1 y par2 son los parámetros de entrada, y expresión el código de la función
lambda par1, par2: expresión

Out[19]: <function __main__.<lambda>(par1, par2)>

#### Explicación de la salida:
- `<function __main__.<lambda>(par1, par2)>`: Es una representación del objeto función lambda que acabas de definir. No ejecuta la función, solo muestra su definición.
- `__main__`: Indica que la función lambda ha sido definida en el contexto del script o del entorno interactivo principal.
- `<lambda>`: Especifica que la función es anónima (es decir, no tiene nombre) y fue creada con `lambda`.
- `(par1, par2)`: Son los parámetros que la función lambda acepta.
#### En resumen:
- La salida muestra que has creado una función lambda que toma dos parámetros (`par1` y `par2`), pero aún no la has ejecutado ni asignado a una variable. La salida simplemente muestra la referencia a la función que acabas de definir.

> Por ejemplo, si tuviéramos una función que suma dos números, podríamos hacerla en Python de estas dos maneras:

In [0]:
def suma(x,y):
    return(x + y)

sumaL = lambda x,y : x + y

res = suma(3, 4)
resL = sumaL(3, 4)

A lo largo de los ejemplos de esta sesión y las siguientes nos iremos familiarizando con su uso

### Collect

Es una acción que se utiliza para recuperar todos los elementos de un RDD, DataFrame o Dataset desde los nodos distribuidos del clúster y traerlos al controlador (la máquina que ejecuta tu aplicación Spark).

Un fallo muy posible a la hora de mostrar los datos de un RDD es utilizar `rdd.foreach(print)` o `rdd.map(print)`.

En una única máquina, esta operación generaría la salida esperada mostrando todos los elementos del RDD. Sin embargo, al trabajar en un clúster, la salida a stdout la realizarían los diferentes nodos y no el nodo principal. Así pues, para mostrar todos los elementos de un RDD/DataFrame/Dataset hemos de emplear el método collect, el cual primero mostrará los RDD del nodo principal (driver node), y luego para cada nodo del cluster mostrará sus datos.

In [0]:
rdd.collect()

Out[26]: [0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 48,
 49,
 50,
 51,
 52,
 53,
 54,
 55,
 56,
 57,
 58,
 59,
 60,
 61,
 62,
 63,
 64,
 65,
 66,
 67,
 68,
 69,
 70,
 71,
 72,
 73,
 74,
 75,
 76,
 77,
 78,
 79,
 80,
 81,
 82,
 83,
 84,
 85,
 86,
 87,
 88,
 89,
 90,
 91,
 92,
 93,
 94,
 95,
 96,
 97,
 98,
 99,
 100]

> **Out of memory**
> 
> Hay que tener mucho cuidado, ya que nos podemos quedar fácilmente sin memoria, ya que `collect` se trae los datos de todos los ejecutores a un único nodo, el que ésta ejecutando el código (driver).

Si sólo necesitamos mostrar unos pocos elementos, un enfoque más seguro es utilizar `take`:

In [0]:
rdd.take(100)

Out[27]: [0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 48,
 49,
 50,
 51,
 52,
 53,
 54,
 55,
 56,
 57,
 58,
 59,
 60,
 61,
 62,
 63,
 64,
 65,
 66,
 67,
 68,
 69,
 70,
 71,
 72,
 73,
 74,
 75,
 76,
 77,
 78,
 79,
 80,
 81,
 82,
 83,
 84,
 85,
 86,
 87,
 88,
 89,
 90,
 91,
 92,
 93,
 94,
 95,
 96,
 97,
 98,
 99]

### Transformaciones

Las transformaciones son operaciones aplicadas a un RDD, DataFrame o Dataset que producen un nuevo conjunto de datos. Estas operaciones son perezosas (lazy), lo que significa que no se ejecutan inmediatamente, sino que solo se registran en un plan lógico de ejecución. Las transformaciones no procesan los datos directamente hasta que se llama a una acción.

En Spark, las estructuras de datos son inmutables, de manera que una vez creadas no se pueden modificar. Para poder modificar un RDD/DataFrame hace falta realizar una transformación, siendo el modo de expresar la lógica de negocio mediante Spark.

Todas las transformaciones en Spark se evalúan de manera perezosa (lazy evaluation), de manera que los resultados no se computan inmediatamente, sino que se retrasa el cálculo hasta que el valor sea necesario. Para ello, se van almacenando los pasos necesarios y se ejecutan únicamente cuando una acción requiere devolver un resultado al driver. Este diseño facilita un mejor rendimiento (por ejemplo, imagina que tras una operación map se realiza un reduce y en vez de devolver todo el conjunto de datos tras el map, sólo le enviamos al driver el resultado de la reducción).

> Explicación: La salida `<function __main__.<lambda>(par1, par2)>` muestra que has definido una función lambda anónima en el entorno principal (`__main__`), que acepta dos parámetros (`par1` y `par2`). Esta salida no ejecuta la función, solo muestra su definición.

Así pues, las acciones provocan la evaluación de todas las transformaciones previas que se habían evaluado de forma perezosa y estaban a la espera.

Por defecto, cada transformación RDD/DataSet se puede recalcular cada vez que se ejecute una acción. Sin embargo, podemos persistir un RDD en memoria mediante los métodos persist (o cache), de manera que Spark mantendrá los datos para un posterior acceso más eficiente. También podemos persistir un RDD en disco o replicarlo en múltiples nodos.

### Tipos de transformaciones

Existen dos tipos de transformaciones, dependiendo de las dependencias entre las particiones de datos:

- Transformaciones **Narrow**: consisten en dependencias estrechas en las que cada partición de entrada contribuye a una única partición de salida.

#### Explicación:
- **Dependencia estrecha**: En este tipo de transformación, cada partición de datos de entrada se procesa de manera independiente y su resultado se envía a una única partición de salida.
- **Una partición de entrada a una partición de salida**: Cada unidad de trabajo de entrada (en una partición) genera una única unidad de trabajo de salida, sin la necesidad de redistribuir los datos entre múltiples particiones.

- Transformaciones **Wide**: consisten en dependencias anchas de manera que varias particiones de entrada contribuyen a muchas otras particiones de salida, es decir, cada partición de salida depende de diferentes particiones de entrada. Este proceso también se conoce como shuffle, ya que Spark baraja los datos entre las particiones del clúster.

#### Explicación simplificada:
- **Dependencia ancha (Wide Dependency)**: En este tipo de transformación, las particiones de entrada no se procesan de manera independiente. Los datos de varias particiones de entrada pueden combinarse para generar una salida, lo que requiere mover datos entre particiones diferentes.
- **Shuffle**: El proceso de redistribuir los datos entre particiones en el clúster. Esto es necesario cuando los datos de diferentes particiones deben ser agrupados o combinados. El shuffle es costoso en términos de tiempo y recursos, ya que implica mover grandes volúmenes de datos a través del clúster.

![7](https://community.cloud.databricks.com/files/7.png)

Con las transformaciones narrow, Spark realiza un pipeline de las dependencias, de manera que si especificamos múltiples filtros sobre DataFrames/RDD, se realizarán todos en memoria.

Esto no sucede con las transformaciones wide, ya que al realizar un shuffle los resultados se persisten en disco.

El shuffle ocurre cuando Spark necesita repartir los datos de forma que ciertas claves o valores relacionados estén en la misma partición. Esto es necesario en transformaciones como groupByKey, reduceByKey, join, etc., donde los cálculos dependen de combinar datos relacionados que podrían estar en diferentes particiones.

> Cuidado con shuffle
> Las operaciones shuffle son computacionalmente caras, ya que implican E/S en disco, serialización de datos y E/S en red. Para organizar los datos previos al shuffle, Spark genera un conjunto de tareas (tareas map para organizar los datos, y reduce para agregar los resultados).
> Internamente, el resultado de las tareas map se mantienen en memoria hasta que no caben. Entonces, se ordenan en la partición destino y se persisten en un único archivo. En la fase de reducción, las tareas leen los bloques ordenados que son relevantes.
> Las operaciones reduceByKey y aggregateByKey son de las que más memoria consumen, al tener que crear las estructuras de datos para organizar los registros en las tareas de map, y luego generar los resultados agregados en la de reduce. Si los datos no caben en memoria, Spark los lleva a disco, incurriendo en operaciones adicionales de E/S en disco y del recolector de basura.

A continuación veremos las diferentes transformaciones que podemos realizar con Spark.

### Transformaciones Narrow

Para los siguientes ejemplo, utilizaremos el siguiente fichero de `empleados.txt` que contiene para cada empleado los siguiente cinco campos:

- el nombre

- un array de ciudades donde trabaja separadas por coma

- una estructura compuesta del sexo y la edad separadas de nuevo por comas

- un array de mapas con sus destrezas y calificación

- y finalmente un array de cargos y su consiguiente array de destrezas

### Map

 Es una transformación que aplica una función específica a cada elemento de un RDD, y devuelve un nuevo conjunto de datos donde cada elemento de salida es el resultado de aplicar la función a su correspondiente elemento de entrada. Es una de las transformaciones estrechas (narrow transformations), lo que significa que cada partición de salida depende de una única partición de entrada. Esto lo hace más eficiente porque no requiere un shuffle (reorganización de datos entre particiones).

La transformación [map](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.map.html) aplica la función recibida a cada elemento del RDD, de manera que vamos a poder añadir una nueva columna, modificar una existente, etc...

Por ejemplo, si la entrada es un RDD que contiene `[1, 2, 3, 4, 5]`, al hacer `rdd.map(lambda x: x*2)` obtendríamos un nuevo RDD cuyos valores contiene su doble, es decir, `[2, 4, 6, 8, 10]`:

In [0]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
resultRDD = rdd.map(lambda x: x*2)
resultRDD.collect()          # [2, 4, 6, 8, 10]

Out[28]: [2, 4, 6, 8, 10]

El código crea un RDD con los valores `[1, 2, 3, 4, 5]`, aplica una transformación con `map()` para multiplicar cada número por 2, y luego recopila los resultados en una lista, obteniendo `[2, 4, 6, 8, 10]`.

(No hacer los codigos relativos a Hadoop o HDFS) Mediante la función textFile podemos cargar un archivo. Supongamos que tenemos cargado en Hadoop el archivo empleados.txt:

In [0]:
rddLocal = sc.textFile("empleados.txt")
rdd = sc.textFile("hdfs://iabd-virtualbox:9000/user/iabd/datos/empleados.txt") 
rdd.count()                 # 4 - cantidad de líneas
resultRDD = rdd.map(len)    # obtenemos la cantidad de caracteres de cada línea
resultRDD.collect()         # [61, 52, 60, 50]

Si sobre el fichero de empleados, realizamos un split para separar los campos haciendo uso de map mediante una función lambda:

In [0]:
resultMap = rddLocal.map(lambda x: x.split("|"))
resultMap.collect()

Obtenemos una lista de listas por cada fila del fichero, cada lista conteniendo los cincos campos de cada empleado:


[['Michael',
  'Montreal,Toronto',
  'Male,30',
  'DB:80',
  'Product:Developer\x04Lead'],
 ['Will', 'Montreal', 'Male,35', 'Perl:85', 'Product:Lead,Test:Lead'],
 ['Shelley', 'New York', 'Female,27', 'Python:80', 'Test:Lead,COE:Architect'],
 ['Lucy', 'Vancouver', 'Female,57', 'Sales:89,HR:94', 'Sales:Lead']]

### FlatMap

Es una transformación que aplica una función a cada elemento de un RDD, y genera cero, uno o más elementos por cada entrada. A diferencia de map, que siempre genera exactamente un elemento por entrada, flatMap "aplana" los resultados en un solo conjunto de datos. Es particularmente útil cuando se necesita dividir, expandir o descomponer los elementos de un conjunto de datos en varios elementos.

La transformación `flatMap` es muy similar a la anterior, pero en vez de devolver una lista con un elemento por cada entrada, devuelve una única lista deshaciendo las colecciones en elementos individuales:

In [0]:
rdd = sc.textFile("/FileStore/empleados.txt") 
resultFM = rdd.flatMap(lambda x: x.split("|"))
resultFM.collect()
# Obtendríamos cada atributo separado y todos dentro de la misma lista

Out[29]: ['Michael',
 'Montreal,Toronto',
 'Male,30',
 'DB:80',
 'Product:Developer\x04Lead',
 'Will',
 'Montreal',
 'Male,35',
 'Perl:85',
 'Product:Lead,Test:Lead',
 'Shelley',
 'New York',
 'Female,27',
 'Python:80',
 'Test:Lead,COE:Architect',
 'Lucy',
 'Vancouver',
 'Female,57',
 'Sales:89,HR:94',
 'Sales:Lead']

### Filter

Es una transformación que se utiliza para filtrar elementos de un RDD según una condición específica. Solo los elementos que cumplen la condición se incluyen en el resultado, mientras que los demás se descartan.

Es una transformación estrecha (narrow), lo que significa que los elementos de salida de cada partición dependen únicamente de los elementos de entrada de la misma partición. Esto lo hace eficiente porque no requiere un shuffle (reorganización de datos entre nodos).

Permite filtrar los elementos que cumplen una condición mediante `filter`:

In [0]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
resultRDD = rdd.filter(lambda x: x%2==0)
resultRDD.collect()     # [2, 4]

Out[30]: [2, 4]

También podemos anidar diferentes transformaciones. Para este ejemplo, vamos a crear tuplas formadas por un número y su cuadrado, y luego quitar los que no coincide el número con su cuadrado (sólo coinciden el 0 y el 1), y luego aplanarlo en una lista:

In [0]:
rdd10 = sc.parallelize(range(10+1))
rddPares = rdd10.map(lambda x: (x, x**2)).filter(lambda x: (x[0] != x[1])).flatMap(lambda x: x)
rddPares.collect()      # [2, 4, 3, 9, 4, 16, 5, 25, 6, 36, 7, 49, 8, 64, 9, 81, 10, 100]

Out[31]: [2, 4, 3, 9, 4, 16, 5, 25, 6, 36, 7, 49, 8, 64, 9, 81, 10, 100]

#### Explicación del código:

1. `rddPares = rdd10.map(lambda x: (x, x**2))`:
   - La transformación `map()` toma cada elemento de `rdd10` y lo transforma en una tupla `(x, x**2)`, donde `x` es el valor original y `x**2` es su cuadrado.
   - Si `rdd10` contiene valores del 1 al 10, después de esta operación, `rddPares` tendrá la forma:
     `(1, 1), (2, 4), (3, 9), (4, 16), (5, 25), (6, 36), (7, 49), (8, 64), (9, 81), (10, 100)`

2. `rddPares.filter(lambda x: (x[0] != x[1]))`:
   - Luego, se aplica la transformación `filter()` para eliminar las tuplas donde el primer valor (`x[0]`) es igual al segundo valor (`x[1]`).
   - Esto elimina las tuplas en las que el número es igual a su cuadrado, es decir, elimina `(1, 1)`, dejando solo las tuplas donde el número no es igual a su cuadrado:
     `(2, 4), (3, 9), (4, 16), (5, 25), (6, 36), (7, 49), (8, 64), (9, 81), (10, 100)`

3. `rddPares.flatMap(lambda x: x)`:
   - La transformación `flatMap()` "aplana" el contenido de cada tupla. En lugar de mantener las tuplas, expande cada una de ellas en elementos individuales.
   - Por ejemplo, la tupla `(2, 4)` se convertirá en los dos elementos `2` y `4`. El resultado de aplicar `flatMap()` será una lista de todos los números individuales de las tuplas:
     `[2, 4, 3, 9, 4, 16, 5, 25, 6, 36, 7, 49, 8, 64, 9, 81, 10, 100]`

4. `rddPares.collect()`:
   - La acción `collect()` recopila todos los elementos de `rddPares` y los devuelve como una lista en la máquina local.

#### Salida:
El resultado de `rddPares.collect()` será:

`[2, 4, 3, 9, 4, 16, 5, 25, 6, 36, 7, 49, 8, 64, 9, 81, 10, 100]`

#### Resumen:
Este código realiza las siguientes operaciones sobre el RDD `rdd10`:
1. `map()`: Crea tuplas `(x, x**2)` con cada número y su cuadrado.
2. `filter()`: Elimina las tuplas donde el número es igual a su cuadrado.
3. `flatMap()`: Expande las tuplas en una secuencia de elementos individuales.
4. `collect()`: Recoge los resultados finales en una lista.

La salida final es una lista de números alternados: el número original y su cuadrado.


Veamos otro ejemplo. Retomamos los datos de los empleados y si queremos filtrar los empleados que son hombres, primero separamos por las | y nos quedamos con el tercer elemento que contiene el sexo y la edad. A continuación, separamos por la coma para quedarnos en el sexo en la posición 0 y la edad en el 1, y comparamos con el valor deseado:

In [0]:

rdd = sc.textFile("/FileStore/empleados.txt") 
# Filtrar registros donde el campo de género contiene "Male"
hombres = rdd.filter(lambda x: len(x.split("|")) > 2 and x.split("|")[2].split(",")[0] == "Male")

# Mostrar los resultados
print(hombres.collect())

['Michael|Montreal,Toronto|Male,30|DB:80|Product:Developer\x04Lead', 'Will|Montreal|Male,35|Perl:85|Product:Lead,Test:Lead']


#### Explicación del código:

1. `rdd = sc.textFile("/FileStore/shared_uploads/notebook_1/empleados.txt")`:
   - Este código lee el archivo `empleados.txt` desde la ruta especificada y lo convierte en un RDD de texto.
   - El archivo `empleados.txt` se asume que tiene datos separados por líneas, y cada línea corresponde a un registro de un empleado.

2. `hombres = rdd.filter(lambda x: len(x.split("|")) > 2 and x.split("|")[2].split(",")[0] == "Male")`:
   - La transformación `filter()` se usa para seleccionar las líneas del RDD que cumplen con la condición especificada.
   - **`x.split("|")`**: Esta parte divide cada línea en una lista usando el carácter `|` como delimitador. Se asume que cada línea tiene campos separados por este carácter.
     - Ejemplo: Una línea puede verse como `ID|Nombre|Género|Edad`.
   - **`len(x.split("|")) > 2`**: Asegura que la línea tiene más de dos campos (para evitar errores si la línea está vacía o no tiene los campos esperados).
   - **`x.split("|")[2].split(",")[0] == "Male"`**: 
     - Toma el tercer campo (índice `2`), que corresponde al campo de género, y lo divide usando la coma `,` como delimitador.
     - La división con `split(",")` sugiere que el campo de género puede contener información adicional (como múltiples géneros o etiquetas), y selecciona el primer valor (índice `0`) para comparar si es igual a `"Male"`.

3. `print(hombres.collect())`:
   - La acción `collect()` obtiene todos los registros del RDD filtrado, que cumplen la condición de tener el género `"Male"`, y los devuelve como una lista en la máquina local.
   - **`print(hombres.collect())`** muestra esos registros en la salida estándar.

#### Resumen:
Este código lee un archivo `empleados.txt`, filtra las líneas donde el género de los empleados es "Male" (hombre), y luego imprime esos registros. La condición de filtrado asegura que se procesen correctamente solo las líneas que tienen más de dos campos y que el género esté correctamente etiquetado como "Male".


Obteniendo:

['Michael|Montreal,Toronto|Male,30|DB:80|Product:Developer\x04Lead',
 'Will|Montreal|Male,35|Perl:85|Product:Lead,Test:Lead']

### Distinct

Si utilizamos [distinct](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.distinct.html) eliminaremos los elementos repetidos:

### Trabajando con conjuntos

**Union**

Mediante [union](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.union.html) unimos dos RDD en uno:

In [0]:
rdd1 = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize([5,6,7,8])
resultRDD = rdd1.union(rdd2)
resultRDD.collect()     # [1, 2, 3, 4, 5, 6, 7, 8]

Out[34]: [1, 2, 3, 4, 5, 6, 7, 8]

**Intersection**

Mediante [intersection](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.intersection.html), obtendremos los elementos que tengan en común:

**Subtract**

Mediante [subtract](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.subtract.html), obtendremos los elementos propios que no estén en el RDD recibido:

In [0]:
rdd1 = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize([3,4,5,6])
resultRDD = rdd1.subtract(rdd2)
resultRDD.collect()     # [1, 2]

Out[35]: [1, 2]

### Actividad:

Si tenemos dos RDD (A y B):

In [0]:
rddA = sc.parallelize([1,2,3,4])
rddB = sc.parallelize([3,4,5,6])

¿Cómo conseguimos los elementos que están en A y no B y los de B que no están en A? (es decir [1, 2, 5, 6])):

In [0]:
# Escriba la respuesta aqui

# Elementos en A pero no en B
soloEnA = rddA.subtract(rddB)

# Elementos en B pero no en A
soloEnB = rddB.subtract(rddA)

# Unir los resultados para obtener todos los elementos que están en A o en B pero no en ambos
resultado = soloEnA.union(soloEnB)

# Mostrar el resultado
print(resultado.collect())  # [1, 2, 5, 6]


[1, 2, 5, 6]


### RDD de Pares

Una técnica muy común a la hora de trabajar con RDD es hacerlo con elementos que tienen el formato (`clave, valor`), pudiendo las claves y los valores ser de cualquier tipo.

In [0]:
listaTuplas = [(1,'a'), (2,'b'), (3,'c'), (4,'d')]
rddTuplas= sc.parallelize(listaTuplas)

Sobre estos RDD podemos realizar algoritmos MapReduce para muchas funciones de procesamiento de datos, como es la agrupación, ordenación, join, count, etc...

Para generar un RDD de pares, además de crearlo nosotros a partir de una lista, podemos emplear las siguientes operaciones:

- [zip](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.zip.html): une dos RDDs del mismo tamaño:

In [0]:
lista1 = ['a','b','c','e','f','g','h']
lista2 = [4, 5, 6, 7, 8, 9, 10]
rddZip = sc.parallelize(lista1).zip(sc.parallelize(lista2)).collect()
# [('a', 4), ('b', 5), ('c', 6), ('e', 7), ('f', 8), ('g', 9), ('h', 10)]

In [0]:
rddZipSecuencia = sc.parallelize(zip(lista1,range(len(lista1)))) # usando el tamaño de la lista
# [('a', 0), ('b', 1), ('c', 2), ('e', 3), ('f', 4), ('g', 5), ('h', 6)]

- `map`: asignando a cada elemento un valor o cálculo sobre él mismo:

In [0]:
lista  = ['Hola', 'Adiós', 'Hasta luego']
rddMap = sc.parallelize(lista).map(lambda x: (x, len(x)))
# [('Hola', 4), ('Adiós', 5), ('Hasta luego', 11)]

- `keyBy`: permite crear las claves a partir de cada elemento:

In [0]:
rddKeyBy = sc.parallelize(lista).keyBy(lambda x: x[0])  # creamos una clave con la primera letra
# [('H', 'Hola'), ('A', 'Adiós'), ('H', 'Hasta luego')]

### Actividad:

A partir de la lista `"Perro Gato Loro Pez León Tortuga Gallina"`

1. Crea un RDD a partir de esta lista
2. Convierte el RDD normal en un RDD de pares cuya clave sea la primera letra del animal
3. Crea otro RDD de pares pero poniendo como clave un número incremental
4. ¿Y si queremos que el índice incremental empiece en 100?

#### Solucion

In [0]:
# Coloca tus respuestas aqui

# Lista de animales
animales = ["Perro", "Gato", "Loro", "Pez", "León", "Tortuga", "Gallina"]

# Crear un RDD a partir de la lista
rddAnimales = sc.parallelize(animales)

# Convertir el RDD normal en un RDD de pares con la primera letra como clave
rddParesLetra = rddAnimales.map(lambda x: (x[0], x))

# Crear otro RDD de pares con un índice incremental
rddParesIndice = rddAnimales.zipWithIndex()

# Si queremos que el índice empiece en 100, usamos zipWithIndex con un offset
rddParesIndice100 = rddAnimales.zipWithIndex().map(lambda x: (x[1] + 100, x[0]))

# Mostrar los resultados
print(rddParesLetra.collect())
print(rddParesIndice.collect())
print(rddParesIndice100.collect())


[('P', 'Perro'), ('G', 'Gato'), ('L', 'Loro'), ('P', 'Pez'), ('L', 'León'), ('T', 'Tortuga'), ('G', 'Gallina')]
[('Perro', 0), ('Gato', 1), ('Loro', 2), ('Pez', 3), ('León', 4), ('Tortuga', 5), ('Gallina', 6)]
[(100, 'Perro'), (101, 'Gato'), (102, 'Loro'), (103, 'Pez'), (104, 'León'), (105, 'Tortuga'), (106, 'Gallina')]


Sobre los RDD de pares, podemos realizar las siguientes transformaciones:

- [`keys`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.keys.html): devuelve las claves
- [`values`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.values.html): devuelve los valores
- [`mapValues`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.mapValues.html): Aplica la función sobre los valores
- [`flatMapValues`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.flatMapValues.html) Aplica la función sobre los valores y los aplana.
  

A continuación se muestra un fragmento de código para poner en práctica las transformaciones comentadas:

In [0]:
listaTuplas = [('a',1), ('z',3), ('b',4), ('c',3), ('a',4)]
rddTuplas = sc.parallelize(listaTuplas)

claves = rddTuplas.keys()       # ['a', 'z', 'b', 'c', 'a']
valores = rddTuplas.values()    # [1, 3, 4, 3, 4]

rddMapValues = rddTuplas.mapValues(lambda x: (x,x*2))
# [('a', (1, 2)), ('z', (3, 6)), ('b', (4, 8)), ('c', (3, 6)), ('a', (4, 8))]
rddFMV = rddTuplas.flatMapValues(lambda x: (x,x*2))
# [('a', 1),
#  ('a', 2),
#  ('z', 3),
#  ('z', 6),
#  ('b', 4),
# ...

### Transformaciones Wide

Las siguientes transformaciones, además de trabajar con RDD de pares, mezclan los datos de las particiones mediante el shuffle de los elementos.

Para los siguientes ejemplos, utilizaremos un fichero de ventas [`pdi_sales.csv`](https://tajamar365.sharepoint.com/:x:/s/3405-MasterIA2024-2025/EdbX1Ds2BShClAxQaHHZYyYBwu5DY1Li3jvXLN_Z7KbhHg?e=Vj3xFq) (versión extendida) / [`pdi_sales_small.csv`](https://tajamar365.sharepoint.com/:x:/s/3405-MasterIA2024-2025/EdbX1Ds2BShClAxQaHHZYyYBwu5DY1Li3jvXLN_Z7KbhHg?e=Vj3xFq) (versión reducida) el cual tiene el siguiente formato:

ProductID;Date;Zip;Units;Revenue;Country
725;1/15/1999;41540          ;1;115.5;Germany
787;6/6/2002;41540          ;1;314.9;Germany
...

### ReduceByKey

Mediante la transformación [`reducedByKey`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.reduceByKey.html) los datos se calculan utilizando una función de reducción a partir de la clave combinando en la misma máquina las parejas con la misma clave antes de que los datos se barajen.

Vamos a comenzar con un ejemplo sencillo, contando cuantas ventas se han realizado en cada país, o lo que es lo mismo, las veces que aparece cada palabra en el fichero:

In [0]:
rdd = sc.textFile("/FileStore/pdi_sales_small.csv")
# Recogemos el país y las unidades de las ventas
parPais1 = rdd.map(lambda x: (x.split(";")[-1].strip(), 1))
# parPais1.collect()
# [('Country', 1),
#  ('Germany', 1),
#  ('Germany', 1),
#  ('Germany', 1), ...

Hemos creado un RDD de pares compuesto por el nombre del país y el número uno, para luego en la fase de reducción sumar estos valores. Pero si nos fijamos, el archivo csv contiene el encabezado con los datos, el cual debemos quitar:

In [0]:
header = parPais1.first()
parPais1SinHeader = parPais1.filter(lambda linea: linea != header)
# parPais1SinHeader.collect()
# [('Germany', 1),
#  ('Germany', 1),
#  ('Germany', 1), ...

Y finalmente, ya podemos reducir por la clave:

In [0]:
paisesTotal = parPais1SinHeader.reduceByKey(lambda a,b: a+b)
# paisesTotal.collect()
# [('Mexico', 30060), ('France', 30060), ('Germany', 30059), ('Canada', 30060)]

> Funciones lambda en reduce
> Al aplicar una transformación de tipo reduce, la función lambda recibirá dos parámetros, siendo el primero el valor acumulado y el segundo el valor del elemento a operar.

Veamos otro ejemplo, en este caso vamos a calcular el total de unidades vendidas por país, de manera que vamos a coger el nombre del país (Country) y las unidades (Units) vendidas:

In [0]:
rdd = sc.textFile("/FileStore/pdi_sales_small.csv")
# Recogemos el país y las unidades de las ventas
paisesUnidades = rdd.map(lambda x: (x.split(";")[-1].strip(), x.split(";")[3]))
# Le quitamos el encabezado
header = paisesUnidades.first()
paisesUnidadesSinHeader = paisesUnidades.filter(lambda linea: linea != header)
# Pasamos las unidades a un número entero
paisesUnidadesInt = paisesUnidadesSinHeader.map(lambda x: (x[0], int(x[1])))
# Reducimos por el país y sumamos las unidades
paisesTotalUnidades = paisesUnidadesInt.reduceByKey(lambda a,b: a+b)
paisesTotalUnidades.collect()

Out[53]: [('Mexico', 31095), ('France', 31739), ('Germany', 31746), ('Canada', 31148)]

### Actividad:

Dada la siguiente lista de compra:

In [0]:
lista = [('pan',3), ('agua',2), ('azúcar',1), ('leche',2), ('pan',1), ('cereales',3), ('agua',0.5), ('leche',2), ('filetes',5)]

**Calcula**

1. El total que se ha gastado por cada producto  
2. Cuánto es lo máximo que se ha pagado por cada producto

In [0]:
# Crear el RDD a partir de la lista
rddCompras = sc.parallelize(lista)

# 1. Calcular el total gastado por cada producto
# Usamos reduceByKey para sumar los valores de cada clave (producto)
totalGastado = rddCompras.reduceByKey(lambda x, y: x + y)
print("Total gastado por cada producto:")
print(totalGastado.collect())

# 2. Calcular lo máximo que se ha pagado por cada producto
# Usamos reduceByKey para obtener el valor máximo de cada clave (producto)
maxPagado = rddCompras.reduceByKey(lambda x, y: max(x, y))
print("\nLo máximo que se ha pagado por cada producto:")
print(maxPagado.collect())

Total gastado por cada producto:
[('agua', 2.5), ('pan', 4), ('leche', 4), ('cereales', 3), ('filetes', 5), ('azúcar', 1)]

Lo máximo que se ha pagado por cada producto:
[('agua', 2), ('pan', 3), ('leche', 2), ('cereales', 3), ('filetes', 5), ('azúcar', 1)]


### GroupByKey

Permite agrupar los datos a partir de una clave, repartiendo los resultados (shuffle) entre todos los nodos:

In [0]:
rdd = sc.textFile("/FileStore/pdi_sales_small.csv")
# Creamos un RDD de pares con el nombre del país como clave, y una lista con los valores
ventas = rdd.map(lambda x: (x.split(";")[-1].strip(), x.split(";")))
# Quitamos el primer elemento que es el encabezado del CSV
header = paisesUnidades.first()
paisesUnidadesSinHeader = paisesUnidades.filter(lambda linea: linea != header)
# Agrupamos las ventas por nombre del país
paisesAgrupados = ventas.groupByKey()
paisesAgrupados.collect()
# Obtendremos para cada pais, un iterable con todos sus datos:

Out[56]: [('Country', <pyspark.resultiterable.ResultIterable at 0x7fbd769dc6d0>),
 ('Mexico', <pyspark.resultiterable.ResultIterable at 0x7fbd768dfca0>),
 ('France', <pyspark.resultiterable.ResultIterable at 0x7fbd76a06df0>),
 ('Germany', <pyspark.resultiterable.ResultIterable at 0x7fbd779f5700>),
 ('Canada', <pyspark.resultiterable.ResultIterable at 0x7fbd88641d30>)]

Podemos transformar los iterables a una lista:

In [0]:
paisesAgrupadosLista = paisesAgrupados.map(lambda x: (x[0], list(x[1])))
paisesAgrupadosLista.collect()

Out[57]: [('Country', [['ProductID', 'Date', 'Zip', 'Units', 'Revenue', 'Country']]),
 ('Mexico',
  [['2235', '1/15/1999', '8650           ', '1', '65.6', 'Mexico '],
   ['837', '2/15/1999', '8650           ', '1', '840', 'Mexico '],
   ['491', '2/15/1999', '8650           ', '1', '815.3', 'Mexico '],
   ['426', '5/31/2002', '8650           ', '1', '843.1', 'Mexico '],
   ['400', '6/6/2002', '8650           ', '1', '734.7', 'Mexico '],
   ['1131', '2/15/1999', '8650           ', '1', '419.9', 'Mexico '],
   ['2277', '3/15/1999', '8650           ', '1', '251.9', 'Mexico '],
   ['491', '3/15/1999', '8650           ', '1', '815.3', 'Mexico '],
   ['467', '6/8/2002', '8650           ', '1', '900.6', 'Mexico '],
   ['565', '4/12/2003', '2000           ', '1', '890.7', 'Mexico '],
   ['604', '4/12/2003', '2000           ', '1', '494.8', 'Mexico '],
   ['940', '3/15/1999', '2000           ', '1', '687.7', 'Mexico '],
   ['2277', '1/15/1999', '8640           ', '1', '251.9', 'Mexico '],
   ['7

### Actividad 

- día 1: pan 3€, agua 2€, azúcar 1€, leche 2€, pan 4€
- día 2: pan 1€, cereales 3€, agua 0.5€, leche 2€, filetes 5€
- día 3: filetes 2€, cereales 1€

Dada la siguiente lista de compra:

In [0]:
dia1 = [('pan',3), ('agua',2), ('azúcar',1), ('leche',2), ('pan',4)]
dia2 = [('pan',1), ('cereales',3), ('agua',0.5), ('leche',2), ('filetes',5)]
dia3 = [('filetes',2), ('cereales',1)]

1. ¿Cómo obtenemos lo que hemos gastado en cada producto?
2. ¿Y el gasto medio que hemos realizado en cada uno de ellos?

In [0]:
# Respuestas aqui

# Crear RDDs a partir de las listas
rddDia1 = sc.parallelize(dia1)
rddDia2 = sc.parallelize(dia2)
rddDia3 = sc.parallelize(dia3)

# Unión de todos los RDDs
rddTodosDias = rddDia1.union(rddDia2).union(rddDia3)

# 1. Calcular lo que hemos gastado en cada producto
# Sumamos los valores para cada clave (producto)
gastoTotalPorProducto = rddTodosDias.reduceByKey(lambda x, y: x + y)
print("Gasto total por producto:")
print(gastoTotalPorProducto.collect())

# 2. Calcular el gasto medio que hemos realizado en cada uno de los productos
# Primero, necesitamos contar las compras realizadas para cada producto
# Usamos un RDD de conteo y otro RDD de suma, luego dividimos los totales entre los conteos
conteoPorProducto = rddTodosDias.mapValues(lambda x: 1).reduceByKey(lambda x, y: x + y)
sumaPorProducto = rddTodosDias.reduceByKey(lambda x, y: x + y)

# Unimos los dos RDDs (suma y conteo) para obtener el gasto medio
gastoMedioPorProducto = sumaPorProducto.join(conteoPorProducto).mapValues(lambda x: round(x[0] / x[1], 2))
print("\nGasto medio por producto:")
print(gastoMedioPorProducto.collect())

Gasto total por producto:
[('agua', 2.5), ('leche', 4), ('pan', 8), ('cereales', 4), ('filetes', 7), ('azúcar', 1)]

Gasto medio por producto:
[('agua', 1.25), ('leche', 2.0), ('pan', 2.67), ('cereales', 2.0), ('filetes', 3.5), ('azúcar', 1.0)]


> Mejor `reduceByKey` que `groupByKey`  
> Si el tipo de operación a realizar es posible mediante una operación de reduce, su rendimiento será una solución más eficiente. Más información en el artículo [Avoid Group By](https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html)

### SortByKey

`sortByKey` permite ordenar los datos a partir de una clave. Los pares de la misma máquina se ordenan primero por la misma clave, y luego los datos de las diferentes particiones se barajan.  
Para ello crearemos una tupla, siendo el primer elemento un valor numérico por el cual ordenar, y el segundo el dato asociado.  
Vamos a partir del ejemplo anterior para ordenar los paises por la cantidad de ventas:  


In [0]:
# Ejemplo anterior
rdd = sc.textFile("/FileStore/pdi_sales.csv")
paisesUnidades = rdd.map(lambda x: (x.split(";")[-1].strip(), x.split(";")[3]))
header = paisesUnidades.first()
paisesUnidadesSinHeader = paisesUnidades.filter(lambda linea: linea != header)
paisesTotalUnidades = paisesUnidadesSinHeader.reduceByKey(lambda a,b: int(a)+int(b))

In [0]:
# Le damos la vuelta a la lista
unidadesPaises = paisesTotalUnidades.map(lambda x: (x[1],x[0]))
unidadesPaises.collect()

Out[65]: [(223463, 'Mexico'),
 (327730, 'France'),
 (244265, 'Germany'),
 (77609, 'Canada')]

Y a continuación los ordenamos:

In [0]:
unidadesPaisesOrdenadas = unidadesPaises.sortByKey()
unidadesPaisesOrdenadas.collect()

Out[66]: [(77609, 'Canada'),
 (223463, 'Mexico'),
 (244265, 'Germany'),
 (327730, 'France')]

Y comprobamos el resultado:

In [0]:
[(77609, 'Canada'),
 (223463, 'Mexico'),
 (244265, 'Germany'),
 (327730, 'France')]

Out[67]: [(77609, 'Canada'),
 (223463, 'Mexico'),
 (244265, 'Germany'),
 (327730, 'France')]

Si quisiéramos obtener los datos en orden descendente, le pasamos False a la transformación:

In [0]:
unidadesPaisesOrdenadasDesc = unidadesPaises.sortByKey(False)

### SortBy

Mediante `sortBy` podemos ordenar los datos indicando nosotros la función de ordenación:

In [0]:
paisesTotalUnidades.sortBy(lambda x: x[1]).collect()

Out[69]: [('Canada', 77609),
 ('Mexico', 223463),
 ('Germany', 244265),
 ('France', 327730)]

Si queremos ordenar descendentemente, le pasamos un segundo parámetro con valor `False` (indica si la ordenación es ascendente):

In [0]:
paisesTotalUnidades.sortBy(lambda x: x[1], False).collect()

Out[70]: [('France', 327730),
 ('Germany', 244265),
 ('Mexico', 223463),
 ('Canada', 77609)]

> **Join**  
> Aunque los RDD permitan realizar operaciones join, realmente este tipo de operaciones se realizan mediante DataFrames, por lo que omitimos su explicación en esta sesión y la dejamos para la siguiente.

### Particiones

Spark organiza los datos en particiones, considerándolas divisiones lógicas de los datos entre los nodos del clúster. Por ejemplo, si el almacenamiento se realiza en HDFS, cada partición se asigna a un bloque.

Cada una de las particiones va a llevar asociada una tarea de ejecución, de manera que a más particiones, mayor paralelización del proceso.

Veamos con código como podemos trabajar con las particiones:

In [0]:
rdd = sc.parallelize([1,1,2,2,3,3,4,5])
rdd.getNumPartitions() # 4

Out[74]: 8

In [0]:
rdd = sc.parallelize([1,1,2,2,3,3,4,5], 2)
rdd.getNumPartitions() # 2

Out[73]: 2

In [0]:
rddE = sc.textFile("/FileStore/empleados.txt")
rddE.getNumPartitions() # 2

Out[77]: 2

In [0]:
rddE = sc.textFile("/FileStore/empleados.txt", 3)
rddE.getNumPartitions() # 3

Out[78]: 3

La mayoría de operaciones / transformaciones / acciones que trabajan con los datos admiten un parámetro extra indicando la cantidad de particiones con las que queremos trabajar.

### MapPartitions

A diferencia de la transformación map que se invoca por cada elemento del RDD/DataSet, mapPartitions se llama por cada partición.

La función que recibe como parámetro recogerá como entrada un iterador con los elementos de cada partición:

In [0]:
rdd = sc.parallelize([1,1,2,2,3,3,4,5], 2)

In [0]:
def f(iterator): yield sum(iterator)
resultadoRdd = rdd.mapPartitions(f)
resultadoRdd.collect()  # [6, 15]

Out[80]: [6, 15]

In [0]:
resultadoRdd2 = rdd.mapPartitions(lambda iterator: [list(iterator)])
resultadoRdd2.collect() # [[1, 1, 2, 2], [3, 3, 4, 5]]

Out[81]: [[1, 1, 2, 2], [3, 3, 4, 5]]

En el ejemplo, ha dividido los datos en dos particiones, la primera con [1, 1, 2, 2] y la otra con [3, 3, 4, 5], y de ahí el resultado de sumar sus elementos es [6, 15].

### mapPartitionsWithIndex

De forma similar al caso anterior, pero ahora [mapPartitionsWithIndex](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.mapPartitionsWithIndex.html) recibe una función cuyos parámetros son el índice de la partición y el iterador con los datos de la misma:

In [0]:
def mpwi(indice, iterador):
    return [(indice, list(iterador))]

resultadoRdd = rdd.mapPartitionsWithIndex(mpwi)
resultadoRdd.collect()
# [(0, [1, 1, 2, 2]), (1, [3, 3, 4, 5])]

Out[82]: [(0, [1, 1, 2, 2]), (1, [3, 3, 4, 5])]

### Modificando las particiones

Podemos modificar la cantidad de particiones mediante dos transformaciones *wide*: `coalesce` y `repartition`.

Mediante [`coalesce`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.coalesce.html) podemos obtener un nuevo RDD con la cantidad de particiones a reducir:

In [0]:
rdd = sc.parallelize([1,1,2,2,3,3,4,5], 3)
rdd.getNumPartitions() # 3

Out[83]: 3

In [0]:
rdd1p = rdd.coalesce(1)
rdd1p.getNumPartitions() # 2

Out[84]: 1

En cambio, mediante repartition podemos obtener un nuevo RDD con la cantidad exacta de particiones deseadas (al reducir las particiones, repartition realiza un shuffle para redistribuir los datos, por lo tanto, si queremos reducir la cantidad de particiones, es más eficiente utilizar *coalesce*):

In [0]:
rdd = sc.parallelize([1,1,2,2,3,3,4,5], 3)
rdd.getNumPartitions() # 3

Out[85]: 3

In [0]:
rdd2p = rdd.repartition(2)
rdd2p.getNumPartitions() # 2

Out[86]: 2

### Actividades

En Python, para saber si una cadena contiene una letra puedes usar el operador in:

In [0]:
nombre = "Aitor Medrano"
print("i" in nombre) # True
print("u" in nombre) # False
print("Med" in nombre) # True
print("ai" in nombre) # False (case sensitive)

True
False
True
False


En las siguientes actividades vamos a familiarizarnos con el uso de Spark con RDD y las diferentes acciones y transformaciones disponibles.

**1.** A partir de la lista siguiente ['Alicante','Elche','Valencia','Madrid','Barcelona','Bilbao','Sevilla']:

In [0]:
# Lista de ciudades
ciudades = ['Alicante', 'Elche', 'Valencia', 'Madrid', 'Barcelona', 'Bilbao', 'Sevilla']

# Crear un RDD a partir de la lista de ciudades
rddCiudades = sc.parallelize(ciudades)

a. Almacena sólo las ciudades que tengan la letra e en su nombre y muéstralas.

In [0]:
# Filtrar las ciudades que contienen la letra 'e' en su nombre
ciudadesConE = rddCiudades.filter(lambda ciudad: 'e' in ciudad.lower())

# Mostrar las ciudades filtradas
print(ciudadesConE.collect())

['Alicante', 'Elche', 'Valencia', 'Barcelona', 'Sevilla']


b. Muestra las ciudades que tienen la letra e y el número de veces que aparece en cada nombre. Por ejemplo ('Elche', 2).

In [0]:
# Contar cuántas veces aparece la letra 'e' en cada ciudad
conteoE = ciudadesConE.map(lambda ciudad: (ciudad, ciudad.lower().count('e')))

# Mostrar las ciudades con el conteo de la letra 'e'
print(conteoE.collect())

[('Alicante', 1), ('Elche', 2), ('Valencia', 1), ('Barcelona', 1), ('Sevilla', 1)]


c. Averigua las ciudades que solo tengan una única e.

In [0]:
# Filtrar las ciudades que tienen exactamente una 'e'
ciudadesConUnaE = conteoE.filter(lambda ciudad_count: ciudad_count[1] == 1)

# Mostrar las ciudades con exactamente una 'e'
print(ciudadesConUnaE.collect())

[('Alicante', 1), ('Valencia', 1), ('Barcelona', 1), ('Sevilla', 1)]


d. Nos han enviado una nueva lista pero no han separado bien las ciudades. Reorganiza la lista y colócalas correctamente, y cuenta las apariciones de la letra e de cada ciudad. ciudades_mal = [['Alicante.Elche','Valencia','Madrid.Barcelona','Bilbao.Sevilla'],['Murcia','San Sebastián','Melilla.Aspe']]

In [0]:
# Lista de ciudades mal separadas
ciudades_mal = [['Alicante.Elche','Valencia','Madrid.Barcelona','Bilbao.Sevilla'],
                ['Murcia','San Sebastián','Melilla.Aspe']]

# Reorganizar la lista separando las ciudades correctamente
# 'flatMap' se usa para aplanar la lista resultante
ciudades_reorganizadas = sc.parallelize(ciudades_mal).flatMap(lambda x: [ciudad for ciudad in x if '.' in ciudad])

# Separar correctamente las ciudades unidas por '.'
ciudades_separadas = ciudades_reorganizadas.flatMap(lambda x: x.split('.'))

# Contar cuántas veces aparece la letra 'e' en cada ciudad
conteoE = ciudades_separadas.map(lambda ciudad: (ciudad, ciudad.lower().count('e')))

# Mostrar el resultado
print(conteoE.collect())



[('Alicante', 1), ('Elche', 2), ('Madrid', 0), ('Barcelona', 1), ('Bilbao', 0), ('Sevilla', 1), ('Melilla', 1), ('Aspe', 1)]


**2.** Dada una lista de elementos desordenados y algunos repetidos, devolver una muestra de 5 elementos, que estén en la lista, sin repetir y ordenados descendentemente.

In [0]:
# Lista de elementos
lista = [4, 6, 34, 7, 9, 2, 3, 4, 4, 21, 4, 6, 8, 9, 7, 8, 5, 4, 3, 22, 34, 56, 98]

# Convertir la lista en un RDD y eliminar duplicados
rdd = sc.parallelize(lista).distinct()

# Obtener una muestra de 5 elementos de la lista sin repetición
muestra = rdd.takeSample(False, 5)

# Ordenar la muestra de manera descendente
muestra_ordenada = sorted(muestra, reverse=True)

# Mostrar la muestra ordenada
print("Muestra ordenada:", muestra_ordenada)


Muestra ordenada: [56, 21, 6, 5, 2]


a. Selecciona el elemento mayor de la lista resultante.  

In [0]:
# Seleccionar el elemento mayor de la muestra
max_elemento = max(muestra_ordenada)

# Mostrar el elemento mayor
print("Elemento mayor:", max_elemento)

Elemento mayor: 56


b. Muestra los dos elementos menores.

In [0]:
# Ordenar la muestra de manera ascendente para obtener los menores
muestra_ordenada_asc = sorted(muestra)

# Mostrar la muestra ordenada ascendentemente
print("Muestra ordenada ascendentemente:", muestra_ordenada_asc)

# Seleccionar los dos elementos menores de la muestra
dos_menores = muestra_ordenada_asc[:2]

# Mostrar los dos elementos menores
print("Dos elementos menores:", dos_menores)

Muestra ordenada ascendentemente: [2, 5, 6, 21, 56]
Dos elementos menores: [2, 5]


**3.** A partir de las siguientes listas:

- Inglés: hello, table, angel, cat, dog, animal, chocolate, dark, doctor, hospital, computer.
- Español: hola, mesa, angel, gato, perro, animal, chocolate, oscuro, doctor, hospital, ordenador.

In [0]:
# Listas de palabras en inglés y español
ingles = ['hello', 'table', 'angel', 'cat', 'dog', 'animal', 'chocolate', 'dark', 'doctor', 'hospital', 'computer']
espanol = ['hola', 'mesa', 'angel', 'gato', 'perro', 'animal', 'chocolate', 'oscuro', 'doctor', 'hospital', 'ordenador']

# Crear un RDD con las tuplas de traducción
rdd = sc.parallelize(list(zip(ingles, espanol)))


Una vez creado un RDD con tuplas de palabras y su traducción (puedes usar zip para unir dos listas):

[('hello', 'hola'),
 ('table', 'mesa'),
 ('angel', 'angel'),
 ('cat', 'gato')...

Averigua:  

a. Palabras que se escriben igual en inglés y en español. 

In [0]:
# Filtrar las palabras que se escriben igual en inglés y español
iguales = rdd.filter(lambda x: x[0] == x[1])

# Mostrar las palabras que se escriben igual
print(iguales.collect())


[('angel', 'angel'), ('animal', 'animal'), ('chocolate', 'chocolate'), ('doctor', 'doctor'), ('hospital', 'hospital')]


b. Palabras que en español son distintas que en inglés.  

In [0]:
# Filtrar las palabras que son diferentes en inglés y español
diferentes = rdd.filter(lambda x: x[0] != x[1])

# Mostrar las palabras diferentes
print(diferentes.collect())

[('hello', 'hola'), ('table', 'mesa'), ('cat', 'gato'), ('dog', 'perro'), ('dark', 'oscuro'), ('computer', 'ordenador')]


c. Obtén una única lista con las palabras en ambos idiomas que son distintas entre ellas (['hello', 'hola', 'table', ...).  

In [0]:
# Obtener una lista con las palabras que son distintas en inglés y español
distintas = rdd.filter(lambda x: x[0] != x[1]).flatMap(lambda x: x)

# Mostrar las palabras distintas
print(distintas.collect())

['hello', 'hola', 'table', 'mesa', 'cat', 'gato', 'dog', 'perro', 'dark', 'oscuro', 'computer', 'ordenador']


d. Haz dos grupos con todas las palabras, uno con las que empiezan por vocal y otro con las que empiecen por consonante. 

In [0]:
# Definir las vocales
vocales = ['a', 'e', 'i', 'o', 'u']

# Filtrar las palabras que empiezan con vocal (en inglés o español)
vocales_group = rdd.filter(lambda x: x[0][0].lower() in vocales or x[1][0].lower() in vocales)

# Filtrar las palabras que empiezan con consonante (en inglés o español)
consonantes_group = rdd.filter(lambda x: x[0][0].lower() not in vocales and x[1][0].lower() not in vocales)

# Mostrar los resultados
print("Palabras que empiezan por vocal:", vocales_group.collect())
print("Palabras que empiezan por consonante:", consonantes_group.collect())

Palabras que empiezan por vocal: [('angel', 'angel'), ('animal', 'animal'), ('dark', 'oscuro'), ('computer', 'ordenador')]
Palabras que empiezan por consonante: [('hello', 'hola'), ('table', 'mesa'), ('cat', 'gato'), ('dog', 'perro'), ('chocolate', 'chocolate'), ('doctor', 'doctor'), ('hospital', 'hospital')]


**4.** A partir del fichero de [El Quijote](https://tajamar365.sharepoint.com/:t:/s/3405-MasterIA2024-2025/ERkcxiK7gt5Fr2KDytLuTXYBI3LdxtCa6IWeNvOTHFxbYQ?e=hCkatQ):

a. Crear un RDD a partir del fichero y crea una lista con todas las palabras del documento.   

In [0]:
import re

# Crear un RDD a partir del archivo
rdd = sc.textFile("/FileStore/el_quijote.txt")

# Crear un RDD con todas las palabras del documento, limpiadas y en minúsculas
palabras_rdd = rdd.flatMap(lambda x: re.findall(r'\b\w+\b', x.lower()))  # Reemplazamos split por expresión regular que elimina puntuación

# Mostrar las primeras 10 palabras para comprobar
print(palabras_rdd.take(10))


['don', 'quijote', 'de', 'la', 'mancha', 'miguel', 'de', 'cervantes', 'saavedra', 'primera']


b. ¿Cuantas veces aparece la palabra Dulcinea (independientemente de si está en mayúsculas o minúsculas)? ¿Y Rocinante? (86 y 120 ocurrencias respectivamente) 

In [0]:
# Contar cuántas veces aparece "Dulcinea"
dulcinea_count = palabras_rdd.filter(lambda x: x.lower() == "dulcinea").count()

# Contar cuántas veces aparece "Rocinante"
rocinante_count = palabras_rdd.filter(lambda x: x.lower() == "rocinante").count()

# Mostrar los resultados
print(f"Dulcinea aparece {dulcinea_count} veces.")
print(f"Rocinante aparece {rocinante_count} veces.")

Dulcinea aparece 90 veces.
Rocinante aparece 120 veces.


c. Devuelve una lista ordenada según el número de veces que sale cada palabra de más a menos (las primeras ocurrencias deben ser [('que', 10731), ('de', 9035), ('y', 8668), ('la', 5014), ...).  

In [0]:
# Contar las ocurrencias de cada palabra
conteo_palabras = palabras_rdd.map(lambda palabra: (palabra, 1)).reduceByKey(lambda a, b: a + b)

# Ordenar las palabras por el número de ocurrencias (de mayor a menor)
ordenado_por_frecuencia = conteo_palabras.map(lambda x: (x[1], x[0])).sortByKey(False).map(lambda x: (x[1], x[0]))  # Volver a cambiar la clave por la palabra

# Mostrar las primeras 10 palabras ordenadas por su frecuencia
print(ordenado_por_frecuencia.take(10))

[('que', 11134), ('de', 9164), ('y', 8690), ('a', 7968), ('la', 5104), ('en', 4053), ('el', 3858), ('no', 3159), ('se', 2655), ('los', 2154)]


**5.** Dada una cadena que contiene una lista de nombres Juan, Jimena, Luis, Cristian, Laura, Lorena, Cristina, Jacobo, Jorge, una vez transformada la cadena en una lista y luego en un RDD:  

[('J', ['Juan', 'Jimena', 'Jacobo', 'Jorge']),
('L', ['Luis', 'Laura', 'Lorena']),
('C', ['Cristian', 'Cristina'])]

In [0]:
# Lista de nombres
nombres = ['Juan', 'Jimena', 'Luis', 'Cristian', 'Laura', 'Lorena', 'Cristina', 'Jacobo', 'Jorge']

# Crear un RDD a partir de la lista de nombres
rdd = sc.parallelize(nombres)


a. Agrúpalos según su inicial, de manera que tengamos tuplas formadas por la letra inicial y todos los nombres que comienzan por dicha letra:

In [0]:
# Agrupar los nombres por la primera letra
agrupados_por_inicial = rdd.groupBy(lambda x: x[0]).mapValues(list)

# Mostrar el resultado
print(agrupados_por_inicial.collect())

[('J', ['Juan', 'Jimena', 'Jacobo', 'Jorge']), ('C', ['Cristian', 'Cristina']), ('L', ['Luis', 'Laura', 'Lorena'])]


b. De la lista original, obtén una muestra de 5 elementos sin repetir valores.

In [0]:
# Obtener una muestra de 5 elementos sin repetir
muestra_sin_repetir = rdd.distinct().take(5)

# Mostrar el resultado
print(muestra_sin_repetir)

['Jimena', 'Juan', 'Cristian', 'Jorge', 'Luis']


c. Devuelve una muestra de datos de aproximadamente la mitad de registros que la lista original con datos que pudieran llegar a repetirse.

In [0]:
# Obtener una muestra de aproximadamente la mitad de los registros con repetición
muestra_con_repeticion = rdd.sample(withReplacement=True, fraction=0.5).collect()

# Mostrar el resultado
print(muestra_con_repeticion)

['Jimena', 'Cristina', 'Jorge', 'Jorge', 'Jorge']


**6.** En una red social sobre cine, tenemos un fichero [ratings.txt](https://tajamar365.sharepoint.com/:t:/s/3405-MasterIA2024-2025/EaYOt-hRpDBIl2DrlGnLx38Bqolu9xt7f4weY1We7jrkCQ?e=EwltPA) compuesta por el código de la película, el código del usuario, la calificación asignada y el timestamp de la votación con el siguiente formato:

1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968

Se pide crear dos script en Python, así como los comandos necesarios para ejecutarlos (mediante spark-submit) para:

a. Obtener para cada película, la nota media de todas sus votaciones.  

In [0]:
# calificacion_media.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Cargamos el archivo de ratings
ratings_rdd = sc.textFile("/FileStore/ratings.txt")

# Paso 1: Dividir las líneas por "::" y seleccionar el id de la película y la calificación
ratings_rdd = ratings_rdd.map(lambda x: x.split("::"))

# Paso 2: Crear un RDD de tuplas (película_id, calificación)
movie_ratings_rdd = ratings_rdd.map(lambda x: (int(x[0]), float(x[2])))

# Paso 3: Sumar las calificaciones y contar las votaciones por película
sum_ratings = movie_ratings_rdd.reduceByKey(lambda x, y: x + y)
count_ratings = movie_ratings_rdd.mapValues(lambda x: 1).reduceByKey(lambda x, y: x + y)

# Paso 4: Unir las sumas y los conteos para calcular la calificación media
average_ratings = sum_ratings.join(count_ratings).mapValues(lambda x: x[0] / x[1])

# Paso 5: Mostrar la calificación media de cada película
print("Calificación media por película:")
for movie, avg_rating in average_ratings.collect():
    print(f"Película {movie}: {avg_rating:.2f}")


Calificación media por película:
Película 2: 3.71
Película 4: 4.19
Película 6: 3.90
Película 8: 3.88
Película 10: 4.11
Película 12: 3.83
Película 14: 3.32
Película 16: 3.03
Película 18: 3.65
Película 20: 4.08
Película 22: 3.07
Película 24: 3.95
Película 26: 2.96
Película 28: 3.76
Película 30: 3.49
Película 32: 3.62
Película 34: 3.87
Película 36: 4.20
Película 38: 3.58
Película 40: 3.45
Película 42: 3.74
Película 44: 3.63
Película 46: 4.22
Película 48: 3.07
Película 50: 3.07
Película 52: 3.56
Película 54: 4.03
Película 56: 3.97
Película 58: 3.97
Película 60: 3.41
Película 62: 3.57
Película 64: 4.15
Película 66: 3.85
Película 68: 3.75
Película 70: 3.70
Película 72: 3.70
Película 74: 4.05
Película 76: 4.17
Película 78: 3.66
Película 80: 3.90
Película 82: 4.02
Película 84: 3.84
Película 86: 4.31
Película 88: 4.15
Película 90: 3.51
Película 92: 2.76
Película 94: 3.86
Película 96: 3.60
Película 98: 3.80
Película 100: 3.03
Película 102: 3.13
Película 104: 3.35
Película 106: 4.02
Película 108:

b. Películas cuya nota media sea superior a 3.

In [0]:
from pyspark.sql import SparkSession

# Usamos el SparkSession preexistente de Databricks
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Cargamos el archivo de ratings
ratings_rdd = sc.textFile("/FileStore/ratings.txt")

# Paso 1: Dividir las líneas por "::" y seleccionar el id de la película y la calificación
ratings_rdd = ratings_rdd.map(lambda x: x.split("::"))

# Paso 2: Crear un RDD de tuplas (película_id, calificación)
movie_ratings_rdd = ratings_rdd.map(lambda x: (int(x[0]), float(x[2])))

# Paso 3: Sumar las calificaciones y contar las votaciones por película
sum_ratings = movie_ratings_rdd.reduceByKey(lambda x, y: x + y)
count_ratings = movie_ratings_rdd.mapValues(lambda x: 1).reduceByKey(lambda x, y: x + y)

# Paso 4: Unir las sumas y los conteos para calcular la calificación media
average_ratings = sum_ratings.join(count_ratings).mapValues(lambda x: x[0] / x[1])

# Paso 5: Filtrar las películas cuya calificación media sea superior a 3
movies_above_3 = average_ratings.filter(lambda x: x[1] > 3)

# Paso 6: Mostrar las películas cuya calificación media es superior a 3
print("Películas con calificación media superior a 3:")
for movie, avg_rating in movies_above_3.collect():
    print(f"Película {movie}: {avg_rating:.2f}")


Películas con calificación media superior a 3:
Película 2: 3.71
Película 4: 4.19
Película 6: 3.90
Película 8: 3.88
Película 10: 4.11
Película 12: 3.83
Película 14: 3.32
Película 16: 3.03
Película 18: 3.65
Película 20: 4.08
Película 22: 3.07
Película 24: 3.95
Película 28: 3.76
Película 30: 3.49
Película 32: 3.62
Película 34: 3.87
Película 36: 4.20
Película 38: 3.58
Película 40: 3.45
Película 42: 3.74
Película 44: 3.63
Película 46: 4.22
Película 48: 3.07
Película 50: 3.07
Película 52: 3.56
Película 54: 4.03
Película 56: 3.97
Película 58: 3.97
Película 60: 3.41
Película 62: 3.57
Película 64: 4.15
Película 66: 3.85
Película 68: 3.75
Película 70: 3.70
Película 72: 3.70
Película 74: 4.05
Película 76: 4.17
Película 78: 3.66
Película 80: 3.90
Película 82: 4.02
Película 84: 3.84
Película 86: 4.31
Película 88: 4.15
Película 90: 3.51
Película 94: 3.86
Película 96: 3.60
Película 98: 3.80
Película 100: 3.03
Película 102: 3.13
Película 104: 3.35
Película 106: 4.02
Película 108: 3.14
Película 110: 3.

# Proyecto

A continuación planteamos el siguiente proyecto para realizar en clase:

**1.** Tenemos las calificaciones de las asignaturas de [matemáticas](https://tajamar365.sharepoint.com/:t:/s/3405-MasterIA2024-2025/EWlo3ON8o4dLjZAmnJa4E2gBG2zbPGb2_kx0UJaqNiMzCA?e=8XRr7X), [inglés](https://tajamar365.sharepoint.com/:t:/s/3405-MasterIA2024-2025/EWbI5WURy9RMk5kPpMVU6HoBIKSpzjWjdSz982dWS3nboQ?e=C5TONj) y [física](https://tajamar365.sharepoint.com/:t:/s/3405-MasterIA2024-2025/EYNDTyKO2UdOt2t3GPKCZ0UBt9-lasNRDclinzD-dCleBw?e=l9MIiu) de los alumnos del instituto en 3 documentos de texto. A partir de estos ficheros:

a. Crea 3 RDD de pares, uno para cada asignatura, con los alumnos y sus notas

In [0]:
# Cargar los datos para cada asignatura (asumiendo que los archivos tienen la estructura 'nombre,nota')
rdd_fisica = sc.textFile("/FileStore/notas_fisica.txt").map(lambda x: (x.split(",")[0], float(x.split(",")[1])))
rdd_ingles = sc.textFile("/FileStore/notas_ingles.txt").map(lambda x: (x.split(",")[0], float(x.split(",")[1])))
rdd_mates = sc.textFile("/FileStore/notas_mates.txt").map(lambda x: (x.split(",")[0], float(x.split(",")[1])))

# a. Crear 3 RDD de pares para cada asignatura

# Notas de Física
print("Notas de Física:")
fisica_sample = rdd_fisica.collect()  # Toma todos los registros de Física
for alumno, nota in fisica_sample:
    print(f"Alumno: {alumno}, Nota: {nota}")

# Notas de Inglés
print("\nNotas de Inglés:")
ingles_sample = rdd_ingles.collect()  # Toma todos los registros de Inglés
for alumno, nota in ingles_sample:
    print(f"Alumno: {alumno}, Nota: {nota}")

# Notas de Matemáticas
print("\nNotas de Matemáticas:")
mates_sample = rdd_mates.collect()  # Toma todos los registros de Matemáticas
for alumno, nota in mates_sample:
    print(f"Alumno: {alumno}, Nota: {nota}")


Notas de Física:
Alumno: Angel, Nota: 9.0
Alumno: Maria, Nota: 3.0
Alumno: Ramon, Nota: 7.0
Alumno: Jorge, Nota: 5.0
Alumno: Susana, Nota: 9.0
Alumno: Anabel, Nota: 2.0
Alumno: Pedro, Nota: 2.0
Alumno: Rocio, Nota: 5.0
Alumno: Carlos, Nota: 4.0
Alumno: Rocio, Nota: 7.0
Alumno: Triana, Nota: 3.0
Alumno: Andres, Nota: 4.0
Alumno: Fernando, Nota: 9.0
Alumno: Leonardo, Nota: 6.0
Alumno: Oscar, Nota: 5.0
Alumno: Isabel, Nota: 8.0
Alumno: Jose Juan, Nota: 3.0
Alumno: Nicolas, Nota: 7.0
Alumno: Alejandro, Nota: 3.0
Alumno: Rosa, Nota: 8.0

Notas de Inglés:
Alumno: Angel, Nota: 4.0
Alumno: Maria, Nota: 6.0
Alumno: Ramon, Nota: 8.0
Alumno: Jorge, Nota: 5.0
Alumno: Susana, Nota: 2.0
Alumno: Anabel, Nota: 7.0
Alumno: Rocio, Nota: 4.0
Alumno: Carlos, Nota: 8.0
Alumno: Triana, Nota: 4.0
Alumno: Andres, Nota: 6.0
Alumno: Fernando, Nota: 7.0
Alumno: Leonardo, Nota: 4.0
Alumno: Oscar, Nota: 3.0
Alumno: Isabel, Nota: 7.0
Alumno: Jose Juan, Nota: 3.0
Alumno: Nicolas, Nota: 5.0
Alumno: Alejandro, Nota: 7

b. Crea un solo RDD con todas las notas  

In [0]:
# b. Crear un solo RDD con todas las notas

# Unir los tres RDD (Fisica, Ingles, y Mates)
rdd_completo = rdd_fisica.union(rdd_ingles).union(rdd_mates)

# Mostrar los primeros 10 resultados del RDD completo
print("RDD con todas las notas (primeros 10):")
for alumno, nota in rdd_completo.take(10):
    print(f"Alumno: {alumno}, Nota: {nota}")

RDD con todas las notas (primeros 10):
Alumno: Angel, Nota: 9.0
Alumno: Maria, Nota: 3.0
Alumno: Ramon, Nota: 7.0
Alumno: Jorge, Nota: 5.0
Alumno: Susana, Nota: 9.0
Alumno: Anabel, Nota: 2.0
Alumno: Pedro, Nota: 2.0
Alumno: Rocio, Nota: 5.0
Alumno: Carlos, Nota: 4.0
Alumno: Rocio, Nota: 7.0


c. ¿Cuál es la nota más baja que ha tenido cada alumno?

In [0]:
# c. ¿Cuál es la nota más baja que ha tenido cada alumno?

# Unir las tres asignaturas en un solo RDD
union_rdd = rdd_fisica.union(rdd_ingles).union(rdd_mates)

# Agrupar por alumno y calcular la nota más baja
nota_baja = union_rdd.groupByKey().mapValues(lambda x: min(x))

# Mostrar las primeras 10 notas más bajas por alumno
print("Nota más baja por alumno (primeros 10):")
for alumno, baja in nota_baja.take(10):
    print(f"Alumno: {alumno}, Nota más baja: {baja}")


Nota más baja por alumno (primeros 10):
Alumno: Angel, Nota más baja: 4.0
Alumno: Carlos, Nota más baja: 4.0
Alumno: Anabel, Nota más baja: 2.0
Alumno: Jorge, Nota más baja: 5.0
Alumno: Susana, Nota más baja: 2.0
Alumno: Andres, Nota más baja: 4.0
Alumno: Jose Juan, Nota más baja: 3.0
Alumno: Rocio, Nota más baja: 4.0
Alumno: Fernando, Nota más baja: 5.0
Alumno: Oscar, Nota más baja: 3.0


d. ¿Cuál es la nota media de cada alumno?

In [0]:
# d. ¿Cuál es la nota media de cada alumno?

# Unir las tres asignaturas en un solo RDD
union_rdd = rdd_fisica.union(rdd_ingles).union(rdd_mates)

# Agrupar por alumno (key) y calcular la media de las notas
nota_media = union_rdd.groupByKey().mapValues(lambda x: round(sum(x) / len(x), 2))

# Mostrar las primeras 10 notas medias por alumno
print("Nota media por alumno (primeros 10):")
for alumno, media in nota_media.take(10):
    print(f"Alumno: {alumno}, Nota media: {media}")


Nota media por alumno (primeros 10):
Alumno: Angel, Nota media: 6.33
Alumno: Carlos, Nota media: 5.33
Alumno: Anabel, Nota media: 5.67
Alumno: Jorge, Nota media: 6.67
Alumno: Susana, Nota media: 6.67
Alumno: Andres, Nota media: 4.67
Alumno: Jose Juan, Nota media: 3.67
Alumno: Rocio, Nota media: 5.5
Alumno: Fernando, Nota media: 7.0
Alumno: Oscar, Nota media: 5.0


e. ¿Cuantos estudiantes suspende cada asignatura? 

In [0]:
# e. ¿Cuántos estudiantes suspende cada asignatura?
suspensos_fisica = rdd_fisica.filter(lambda x: x[1] < 5).count()
suspensos_ingles = rdd_ingles.filter(lambda x: x[1] < 5).count()
suspensos_mates = rdd_mates.filter(lambda x: x[1] < 5).count()

print("Suspensos en Física:", suspensos_fisica)
print("Suspensos en Inglés:", suspensos_ingles)
print("Suspensos en Matemáticas:", suspensos_mates)

Suspensos en Física: 8
Suspensos en Inglés: 7
Suspensos en Matemáticas: 7


f. ¿En qué asignatura suspende más gente?

In [0]:
# f. ¿En qué asignatura suspende más gente?

# Contar los suspensos en cada asignatura
suspensos_fisica = rdd_fisica.filter(lambda x: x[1] < 5).count()
suspensos_ingles = rdd_ingles.filter(lambda x: x[1] < 5).count()
suspensos_mates = rdd_mates.filter(lambda x: x[1] < 5).count()

# Comparar el número de suspensos
max_suspensos = max(suspensos_fisica, suspensos_ingles, suspensos_mates)

# Imprimir el resultado según la asignatura con más suspensos
if max_suspensos == suspensos_fisica:
    print("Más suspensos en Física:", suspensos_fisica)
elif max_suspensos == suspensos_ingles:
    print("Más suspensos en Inglés:", suspensos_ingles)
else:
    print("Más suspensos en Matemáticas:", suspensos_mates)


Más suspensos en Física: 8


g. Total de notables o sobresalientes por alumno, es decir, cantidad de notas superiores o igual a 7.  

In [0]:
# g. Total de notables o sobresalientes por alumno (notas >= 7)
# Unir los RDD de física, inglés y mates, filtrar las notas >= 7
notables_por_alumno = rdd_fisica.union(rdd_ingles).union(rdd_mates) \
    .filter(lambda x: x[1] >= 7) \
    .map(lambda x: (x[0], 1)) \
    .reduceByKey(lambda x, y: x + y)

# Recoger los resultados
notables_por_alumno_result = notables_por_alumno.collect()

# Imprimir los resultados
print("Notables o sobresalientes por alumno:")
for alumno, total in notables_por_alumno_result:
    print(f"{alumno}: {total}")


Notables o sobresalientes por alumno:
Angel: 1
Carlos: 1
Anabel: 2
Susana: 2
Jorge: 1
Rocio: 1
Fernando: 2
Isabel: 3
Alejandro: 1
Oscar: 1
Ramon: 2
Nicolas: 1
Rosa: 2


h. ¿Qué alumno no se ha presentado a inglés?

In [0]:
# h. ¿Qué alumno no se ha presentado a inglés?
# Recoger los alumnos que se han presentado a inglés
alumnos_ingles = rdd_ingles.map(lambda x: x[0]).collect()

# Unir los RDD de física y mates, obtener los alumnos y eliminar los que están en inglés
alumnos_no_ingles = rdd_fisica.union(rdd_mates).map(lambda x: x[0]).distinct().subtract(sc.parallelize(alumnos_ingles))

# Recoger los resultados
alumnos_no_ingles_result = alumnos_no_ingles.collect()

# Imprimir los resultados
print("Alumnos que no se han presentado a Inglés:")
for alumno in alumnos_no_ingles_result:
    print(alumno)


Alumnos que no se han presentado a Inglés:
Pedro


i. ¿A cuántas asignaturas se ha presentado cada alumno?

In [0]:
# i. ¿A cuántas asignaturas se ha presentado cada alumno?
asignaturas_presentadas = rdd_fisica.map(lambda x: (x[0], 1)) \
                                    .union(rdd_ingles.map(lambda x: (x[0], 1))) \
                                    .union(rdd_mates.map(lambda x: (x[0], 1)))

# Contamos cuántas asignaturas se ha presentado cada alumno
asignaturas_presentadas_count = asignaturas_presentadas.reduceByKey(lambda x, y: x + y)

# Recoger los resultados
asignaturas_presentadas_result = asignaturas_presentadas_count.collect()

# Imprimir los resultados ordenados
print("Número de asignaturas a las que se ha presentado cada alumno:")
for alumno, num_asignaturas in sorted(asignaturas_presentadas_result, key=lambda x: x[1], reverse=True):
    print(f"{alumno}: {num_asignaturas}")


Número de asignaturas a las que se ha presentado cada alumno:
Rocio: 4
Angel: 3
Carlos: 3
Anabel: 3
Jorge: 3
Susana: 3
Andres: 3
Jose Juan: 3
Fernando: 3
Oscar: 3
Isabel: 3
Alejandro: 3
Ramon: 3
Leonardo: 3
Nicolas: 3
Maria: 3
Triana: 3
Rosa: 3
Pedro: 2


j. Obtén un RDD con cada alumno con sus notas

In [0]:
# j. Obtener un RDD con cada alumno y sus notas en todas las asignaturas
notas_totales = rdd_fisica.fullOuterJoin(rdd_ingles).fullOuterJoin(rdd_mates).mapValues(
    lambda x: (
        x[0][0] if x[0][0] is not None else 0,  # Nota de Física (si existe, sino 0)
        x[0][1] if x[0][1] is not None else 0,  # Nota de Inglés (si existe, sino 0)
        x[1] if x[1] is not None else 0          # Nota de Matemáticas (si existe, sino 0)
    )
)

# Recoger los resultados
notas_totales_result = notas_totales.collect()

# Imprimir los resultados de forma ordenada
print("Notas totales por alumno (todos los registros):")
for alumno, notas in notas_totales_result:
    print(f"{alumno}: Física = {notas[0]}, Inglés = {notas[1]}, Matemáticas = {notas[2]}")


Notas totales por alumno (todos los registros):
Carlos: Física = 4.0, Inglés = 8.0, Matemáticas = 4.0
Angel: Física = 9.0, Inglés = 4.0, Matemáticas = 6.0
Anabel: Física = 2.0, Inglés = 7.0, Matemáticas = 8.0
Jorge: Física = 5.0, Inglés = 5.0, Matemáticas = 10.0
Susana: Física = 9.0, Inglés = 2.0, Matemáticas = 9.0
Andres: Física = 4.0, Inglés = 6.0, Matemáticas = 4.0
Jose Juan: Física = 3.0, Inglés = 3.0, Matemáticas = 5.0
Oscar: Física = 5.0, Inglés = 3.0, Matemáticas = 7.0
Rocio: Física = 5.0, Inglés = 4.0, Matemáticas = 6.0
Rocio: Física = 7.0, Inglés = 4.0, Matemáticas = 6.0
Fernando: Física = 9.0, Inglés = 7.0, Matemáticas = 5.0
Isabel: Física = 8.0, Inglés = 7.0, Matemáticas = 8.0
Alejandro: Física = 3.0, Inglés = 7.0, Matemáticas = 5.0
Ramon: Física = 7.0, Inglés = 8.0, Matemáticas = 4.5
Pedro: Física = 2.0, Inglés = 0, Matemáticas = 5.0
Leonardo: Física = 6.0, Inglés = 4.0, Matemáticas = 1.0
Nicolas: Física = 7.0, Inglés = 5.0, Matemáticas = 2.0
Maria: Física = 3.0, Inglés = 6