# U5_Actividad con PySpark

TECNM Campus La Laguna

Big Data

**Alumno: 18131209 - ADAME SANDOVAL JOSE MISAEL**

# ¿Qué es Spark?

Apache Spark es una tecnología de cómputo de clústeres excepcional, diseñada para cálculos rápidos. Depende de Hadoop MapReduce y extiende el modelo de MapReduce para utilizarlo de manera efectiva para más tipos de cálculos, que incorporan preguntas intuitivas y manejo de flujos. El elemento fundamental de Spark es su agrupamiento en memoria que expande el ritmo de preparación de una aplicación. Spark puede procesar cantidades de datos en el orden de terabytes incluso petabytes.

Spark utiliza Hadoop de dos maneras diferentes: una es para almacenamiento y la segunda para el manejo de procesos. Solo porque Spark tiene su propia administración de clústeres, utiliza Hadoop para el objetivo de almacenamiento.

Spark está diseñado para cubrir una amplia variedad de cargas restantes, por ejemplo, aplicaciones de clústeres, cálculos iterativos, preguntas intuitivas y transmisión. Además de soportar todas estas tareas restantes en un marco particular, disminuye el peso de la administración de mantener aparatos aislados.

## ¿Qué es PySpark?

PySpark es una interfaz para Apache Spark en Python. No sólo permite escribir aplicaciones Spark utilizando las APIs de Python, sino que también proporciona el shell PySpark para analizar interactivamente sus datos en un entorno distribuido. PySpark soporta la mayoría de las características de Spark como Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) y Spark Core.

## ¿Para qué es Spark?

Apache Spark es un sistema de computación en clúster muy veloz. Proporciona el conjunto de API de alto nivel, a saber, Java, Scala, Python y R para el desarrollo de aplicaciones. Apache Spark es una herramienta para ejecutar rápidamente aplicaciones Spark.

## Características

* Está integrado con Apache Hadoop.
* Trabaja en memoria, con lo que se consigue mucha mayor velocidad de procesamiento .
* También permite trabajar en disco. De esta manera si por ejemplo tenemos un fichero muy grande o una cantidad de información que no cabe en memoria, la herramienta permite almacenar parte en disco, lo que hace perder velocidad. Esto hace que tengamos que intentar encontrar el equilibrio entre lo que se almacena en memoria y lo que se almacena en disco, para tener una buena velocidad y para que el coste no sea demasiado elevado, ya que la memoria siempre es bastante más cara que el disco.
* Nos proporciona API para Java, Scala, Python y R.
* Permite el procesamiento en tiempo real, con un módulo llamado Spark Streaming, que combinado con Spark SQL nos va a permitir el procesamiento en tiempo real de los datos. Conforme vayamos inyectando los datos podemos ir transformándolos y volcándolos a un resultado final.
* **Resilient Distributed Dataset (RDD):** Usa la evaluación perezosa, lo que significa es que todas las transformaciones que vamos realizando sobre los RDD, no se resuelven, si no que se van almacenando en un grafo acíclico dirigido (DAG) , y cuando ejecutamos una acción, es decir, cuando la herramienta no tenga más opción que ejecutar todas las transformaciones, será cuando se ejecuten.

 # SparkSession
 
SparkSession introducido en la versión 2.0, es un punto de entrada a la funcionalidad subyacente de PySpark con el fin de crear programáticamente PySpark RDD, DataFrame. Su objeto spark está disponible por defecto en pyspark-shell y puede ser creado programáticamente usando SparkSession.

### Funcionalidad y características

SparkSession es una clase combinada para todos los diferentes contextos que teníamos antes de la versión 2.0 (SQLContext y HiveContext, etc.). Desde la versión 2.0, SparkSession puede utilizarse en sustitución de SQLContext, HiveContext y otros contextos definidos antes de la versión 2.0.

Como se mencionó al principio SparkSession es un punto de entrada a PySpark y la creación de una instancia de SparkSession sería la primera declaración que escribirías para programar con RDD, DataFrame y Dataset. La SparkSession se creará utilizando los patrones del constructor SparkSession.builder.

Aunque SparkContext solía ser un punto de entrada antes de la versión 2.0, no se ha sustituido completamente por SparkSession, muchas características de SparkContext siguen estando disponibles y se utilizan en Spark 2.0 y posteriores. También debes saber que SparkSession crea internamente SparkConfig y SparkContext con la configuración proporcionada con SparkSession.

Puedes crear tantos objetos SparkSession como quieras utilizando SparkSession.builder o SparkSession.newSession.

### Ejemplo

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
                    .appName('Pruebita') \
                    .getOrCreate()

**master()** - Si se está ejecutando en el clúster es necesario utilizar el nombre de su maestro como un argumento a master(). por lo general, sería ya sea yarn o mesos depende de la configuración de su clúster.

Utilice **local[x]** cuando se ejecuta en modo Standalone. x debe ser un valor entero y debe ser mayor que 0; esto representa cuántas particiones debe crear cuando se utiliza RDD, DataFrame, y Dataset. Idealmente, el valor de x debería ser el número de núcleos de la CPU que tiene.

**appName()** - Se utiliza para establecer el nombre de su aplicación.

**getOrCreate()** - Devuelve un objeto SparkSession si ya existe, crea uno nuevo si no existe.

Nota: El objeto SparkSession "spark" está disponible por defecto en el shell de PySpark.

In [2]:
type(spark)

pyspark.sql.session.SparkSession

In [3]:
#version - Devuelve la versión de Spark en la que se está ejecutando tu aplicación, probablemente la versión de Spark con la 
#que está configurado tu cluster
spark.version 

'3.1.1'

In [4]:
#getActiveSession() - devuelve una sesión activa de Spark.
spark.getActiveSession()

In [5]:
#sql - Devuelve un DataFrame después de ejecutar el SQL mencionado.
spark.sql

<bound method SparkSession.sql of <pyspark.sql.session.SparkSession object at 0x0000023766A54F10>>

In [6]:
#stop() - Detener el SparkContext actual.
spark.stop()

# SparkContext

Punto de entrada principal para la funcionalidad de Spark. Un SparkContext representa la conexión a un clúster de Spark, y puede utilizarse para crear RDDs, acumuladores y variables de difusión en ese clúster.

### Funcionalidad y características

SparkContext utiliza Py4J para lanzar una JVM y crea un JavaSparkContext. Por defecto, PySpark tiene SparkContext disponible como 'sc', por lo que crear un nuevo SparkContext no funcionará.

Los siguientes son los parámetros de un SparkContext.

* Master - Es la URL del cluster al que se conecta.
* appName - Nombre de su trabajo.
* sparkHome - Directorio de instalación de Spark.
* pyFiles - Los archivos .zip o .py a enviar al cluster y añadir al PYTHONPATH.
* Environment - Variables de entorno de los nodos de trabajo.
* batchSize - El número de objetos Python representados como un único objeto Java. Establezca 1 para deshabilitar el batching, 0 para elegir automáticamente el tamaño del batch basado en el tamaño de los objetos, o -1 para utilizar un tamaño de batch ilimitado.
* Serializador - Serializador RDD.
* Conf - Un objeto de L{SparkConf} para establecer todas las propiedades de Spark.
* Gateway - Utilizar una puerta de enlace y JVM existente, de lo contrario inicializar una nueva JVM.
* JSC - La instancia de JavaSparkContext.
* profiler_cls - Una clase de Profiler personalizada utilizada para hacer el profiling (el valor por defecto es pyspark.profiler.BasicProfiler).

Entre los parámetros anteriores, master y appname son los más utilizados. 

### Ejemplo

In [7]:
import pyspark

In [8]:
sc = pyspark.SparkContext(appName="Tarea_Prueba") # sc objeto que apunta a SC al cluster local

In [9]:
type(sc)

pyspark.context.SparkContext

In [10]:
#applicationId - Devuelve un ID único de una aplicación Spark
sc.applicationId

'local-1623998704925'

In [11]:
#master - Devuelve el master que se estableció al crear SparkContext
sc.master

'local[1]'

In [12]:
#appName - Devuelve el nombre de la aplicación que se dio al crear SparkContext
sc.appName

'Tarea_Prueba'

In [13]:
#getOrCreate - Crea o devuelve un SparkContext
sc.getOrCreate

<bound method SparkContext.getOrCreate of <class 'pyspark.context.SparkContext'>>

In [14]:
#sc.stop()

# Parallelize

PySpark parallelize() es una función en SparkContext y se utiliza para crear un RDD a partir de una colección de listas. Pero ¿Qué es un RDD?

**Resilient Distributed Datasets (RDD)** es una estructura de datos fundamental de PySpark, es una colección distribuida inmutable de objetos. Cada conjunto de datos en RDD se divide en particiones lógicas, que pueden ser calculadas en diferentes nodos del clúster.

* PySpark Parallelizing es una colección existente en el driver del programa.

La función parallelize() también tiene otra firma que adicionalmente toma un argumento entero para especificar el número de particiones. Las particiones son unidades básicas de paralelismo en PySpark

In [15]:
# Creando un RDD de 7 enteros: se crea en la memoria del proceso driver
r = sc.parallelize([1,2,3,4,5,6,7])
type(r)

pyspark.rdd.RDD

In [16]:
# Creando un RDD de 4 cadenas de texto
r = sc.parallelize(["Hola", "Maestra", "Lamia", "<3"])
type(r)

pyspark.rdd.RDD

# Lectura de archivos TXT

Cuando se quiere cargar un archivo plano a Spark se puede usar el método textFile(). Dicho método, nos devuelve un objeto de tipo <class ‘pyspark.rdd.RDD’>. El método sparkContext.textFile() se utiliza para leer un archivo de texto desde HDFS, S3 y cualquier sistema de archivos soportado por Hadoop, este método toma la ruta como argumento y opcionalmente toma un número de particiones como segundo argumento.

### Ejemplo de lectura

Es un .txt de un Web Scrapping sobre Romeo y Julieta

In [17]:
#usar el método textFile() para cargarlo a un RDD.
rdd = sc.textFile("C:\\Users\\jmas_\\Jupyter\\Datasets\\ryj.txt")
print(rdd)

C:\Users\jmas_\Jupyter\Datasets\ryj.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0


In [18]:
#se aplica la acción collect() sobre el RDD podemos ver como fueron cargados los datos.
rdd.collect()

['The Project Gutenberg EBook of Romeo and Juliet, by William Shakespeare',
 '',
 '',
 '*******************************************************************',
 "THIS EBOOK WAS ONE OF PROJECT GUTENBERG'S EARLY FILES PRODUCED AT A",
 'TIME WHEN PROOFING METHODS AND TOOLS WERE NOT WELL DEVELOPED. THERE',
 'IS AN IMPROVED EDITION OF THIS TITLE WHICH MAY BE VIEWED AS EBOOK',
 '(#1513) at https://www.gutenberg.org/ebooks/1513',
 '*******************************************************************',
 '',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or',
 're-use it under the terms of the Project Gutenberg License included',
 'with this eBook or online at www.gutenberg.org/license',
 '',
 '',
 'Title: Romeo and Juliet',
 '',
 'Author: William Shakespeare',
 '',
 'Posting Date: May 25, 2012 [EBook #1112]',
 'Release Date: November, 1997  [Etext #1112]',
 '',
 'Language: English',
 '',
 '',
 '*** STAR

# Lectura de archivos CSV

Spark SQL proporciona spark.read.csv("path") para leer un archivo CSV en Spark DataFrame y dataframe.write.csv("path") para guardar o escribir en el archivo CSV. Spark soporta la lectura de archivos con pipes, comas, tabulaciones o cualquier otro delimitador/separador.

Nota: Spark admite la lectura de archivos en CSV, JSON, TEXT, Parquet, y muchos más formatos de archivo en Spark DataFrame.

### Ejemplo de lectura

Vamos a leer el archivo csv ahora usando spark.read.csv

In [19]:
#Crear un SparkSession
spark = SparkSession.builder.master("local[1]") \
                    .appName('Prueba_Leer_CSV') \
                    .getOrCreate()

#Leer el csv
#Header indica que ya tiene una cabecera y el encoding es como se va a codificar, en este caso, latin1 para que pueda leer 
#las letras ñ o con acentos las palabras
df = spark.read.csv("C:\\Users\\jmas_\\Jupyter\\Datasets\\info_sni.csv", header=True, encoding='latin1')

In [20]:
#Comprobar el tipo de dato
type(df)

pyspark.sql.dataframe.DataFrame

In [21]:
#Mostrar los registros del CSV (los 20 primeros)
df.show()

+-----+-----------------+----------------+---------------+------+--------------------------+--------------------+
|Grado|Apellido paterno |Apellido Materno|         Nombre|Nivel |Institución de adcripción |Área de conocimiento|
+-----+-----------------+----------------+---------------+------+--------------------------+--------------------+
|  DR.|            VERMA|         JAISWAL|   SURENDRA PAL|     3|      UNIVERSIDAD NACIO...|Área I: Físico-Ma...|
|  DR.|           ALONSO|         SANCHEZ|          JORGE|     3|      CENTRO DE INVESTI...|Área IV: Humanida...|
| DRA.|           AZAOLA|         GARRIDO|          ELENA|     3|      CENTRO DE INVESTI...|Área IV: Humanida...|
|  DR.|            RAMON|          ROMERO|  FIDEL ALBERTO|     2|      UNIVERSIDAD NACIO...|Área II: Biología...|
|  DR.|            PEREZ|           ANGON|   MIGUEL ANGEL|     3|      CENTRO DE INVESTI...|Área I: Físico-Ma...|
|  DR.|           MEDINA|          NOYOLA|      MAGDALENO|     3|      UNIVERSIDAD AUTON

In [22]:
#Mostrar los primeros 4 registros
df.show(4)

+-----+-----------------+----------------+-------------+------+--------------------------+--------------------+
|Grado|Apellido paterno |Apellido Materno|       Nombre|Nivel |Institución de adcripción |Área de conocimiento|
+-----+-----------------+----------------+-------------+------+--------------------------+--------------------+
|  DR.|            VERMA|         JAISWAL| SURENDRA PAL|     3|      UNIVERSIDAD NACIO...|Área I: Físico-Ma...|
|  DR.|           ALONSO|         SANCHEZ|        JORGE|     3|      CENTRO DE INVESTI...|Área IV: Humanida...|
| DRA.|           AZAOLA|         GARRIDO|        ELENA|     3|      CENTRO DE INVESTI...|Área IV: Humanida...|
|  DR.|            RAMON|          ROMERO|FIDEL ALBERTO|     2|      UNIVERSIDAD NACIO...|Área II: Biología...|
+-----+-----------------+----------------+-------------+------+--------------------------+--------------------+
only showing top 4 rows



# Creacion de RDD: vacío y con datos

RDD (Resilient Distributed Dataset) es una estructura de datos fundamental de Spark y es la principal abstracción de datos en Apache Spark y el Spark Core. Los RDD son colecciones distribuidas de objetos inmutables y tolerantes a fallos, lo que significa que una vez que se crea un RDD no se puede cambiar. Cada conjunto de datos en RDD se divide en particiones lógicas, que pueden ser calculadas en diferentes nodos del clúster.

### Caracteristicas

los RDDs son una colección de objetos similar a las colecciones en Scala, con la diferencia de que los RDDs se computan en varias JVMs dispersas en múltiples servidores físicos también llamados nodos en un cluster mientras que una colección en Scala vive en una sola JVM. Por ejemplo, si cargamos un RDD a partir de un archivo plano se creará un RDD de cadenas de texto, una por cada linea del archivo.

En Scala o Java, al declarar un RDD se debe definir el tipo de los registros:

* Lenguajes estáticos : RDD[String] en Scala y JavaRDD en Java.
* Lenguajes dinámicos : Python se pueden mezclar los tipos de datos.

RDDs han sido diseñados desde el inicio para ser distribuidos: Los registros que lo componen se repartirán entre los clúster.

Los RDDs son inmutables: no se puede modificar ni actualizar. Una vez creado, así permanece hasta que se termina la ejecución del programa

Los RDDs de Spark no son muy adecuados para aplicaciones que realizan actualizaciones en el almacén de estado, como los sistemas de almacenamiento de una aplicación web. El objetivo de los RDD es proporcionar un modelo de programación eficiente para el análisis por lotes y dejar estas aplicaciones asíncronas.

Un RDD puede estar presente en un solo SparkContext y el RDD puede tener un nombre y un identificador único (id)

### Funcionalidad

Se pueden aplicar múltiples operaciones sobre estos RDDs para lograr una determinada tarea.

Para aplicar operaciones sobre estos RDD's, hay dos formas

* **Transformación** - Estas son las operaciones que se aplican sobre un RDD para crear un nuevo RDD. Filtro, groupBy y map son ejemplos de transformaciones.
* **Acción** - Estas son las operaciones que se aplican en el RDD, que instruye a Spark para realizar el cálculo y enviar el resultado al controlador.

## Ejemplo de uno vacío

Crear un RDD vacío y también se puede utilizar parallelize() para crearlo.

In [23]:
RDDVacio = sc.emptyRDD()

In [24]:
#Mostrar que esta vacio el RDDVacio
RDDVacio.collect()

[]

In [25]:
RDDVacio2 = sc.parallelize([])

In [26]:
#Mostrar que esta vacio el RDDVacio2
RDDVacio2.collect()

[]

## Ejemplo con datos

Utilizar sparkContext.parallelize() para crear un RDD a partir de una lista o colección.

In [27]:
# Un RDD con 6 datos de tipo entero
rdd = sc.parallelize([2,4,6,8,10,12])

In [28]:
rdd.collect()

[2, 4, 6, 8, 10, 12]

In [29]:
rdd = sc.parallelize([["Hola", 2, 3, "Quiero", "Vacaciones"]])

In [30]:
rdd.collect()

[['Hola', 2, 3, 'Quiero', 'Vacaciones']]

# Particiones

Spark particiona automáticamente los RDDs y distribuye las particiones entre diferentes nodos. Una partición en Spark es un trozo atómico de datos (división lógica de los datos) almacenado en un nodo del clúster. Las particiones son unidades básicas de paralelismo en Apache Spark. Los RDDs en Apache Spark son una colección de particiones.

### Caracteristicas

* Cada máquina en un cluster de spark contiene una o más particiones.
* El número de particiones en spark es configurable y tener muy pocas o demasiadas particiones no es bueno.
* Las particiones en Spark no abarcan varias máquinas.

### Funcionalidad del particionamiento

Una forma importante de aumentar el paralelismo del procesamiento de Spark es aumentar el número de ejecutores en el clúster. Sin embargo, saber cómo deben distribuirse los datos para que el clúster pueda procesarlos de forma eficiente es extremadamente importante. El secreto para conseguirlo es el particionamiento en Spark. Apache Spark gestiona los datos a través de RDDs utilizando particiones que ayudan a paralelizar el procesamiento de datos distribuidos con un tráfico de red insignificante para el envío de datos entre ejecutores. Por defecto, Apache Spark lee los datos en un RDD desde los nodos que están cerca de él.

El particionamiento es un concepto importante en apache spark ya que determina cómo se accede a todos los recursos de hardware cuando se ejecuta cualquier trabajo. En apache spark, por defecto se crea una partición para cada partición HDFS de tamaño 64MB. Los RDDs se particionan automáticamente en spark sin intervención humana, sin embargo, a veces los programadores quieren cambiar el esquema de partición cambiando el tamaño de las particiones y el número de particiones en base a los requerimientos de la aplicación. Para el particionamiento personalizado los desarrolladores tienen que comprobar el número de ranuras en el hardware y cuántas tareas puede manejar un ejecutor para optimizar el rendimiento y lograr el paralelismo.

### Ejemplo

In [31]:
#crea una lista de 12 enteros con 3 particiones
rdd = sc.parallelize(range(12), 3)

# Shuffle

Spark SQL shuffle es un mecanismo para redistribuir o re-particionar los datos para que los datos se agrupen de manera diferente a través de las particiones, basado en el tamaño de sus datos puede necesitar reducir o aumentar el número de particiones de RDD/DataFrame usando la configuración spark.sql.shuffle.partitions o a través de código.

Shuffling es un mecanismo que Spark utiliza para redistribuir los datos entre diferentes ejecutores e incluso entre máquinas. Spark shuffling dispara operaciones de transformación como **groupByKey(), reducebyKey()**, etc.

## Caracteristicas

Spark Shuffle es una operación costosa ya que implica lo siguiente
 
* E/S de disco
* Implica la serialización y deserialización de datos
* E/S de red

Cuando se crea un RDD, Spark no almacena necesariamente los datos de todas las claves de una partición ya que en el momento de la creación no podemos establecer la clave del conjunto de datos.

## groupByKey

La función groupByKey de Spark RDD recoge los valores de cada clave en forma de iterador

Como su nombre indica la función groupByKey en Apache Spark sólo agrupa todos los valores con respecto a una única clave. A diferencia de reduceByKey no realiza ninguna operación sobre el resultado final. Simplemente agrupa los datos y los devuelve en forma de iterador. Es una operación de transformación lo que significa que su evaluación es perezosa.

Ahora bien, debido a que en el RDD fuente pueden existir múltiples claves en cualquier partición, esta función requiere hacer un shuffle en todos los datos con una misma clave a una sola partición a menos que su RDD fuente ya esté particionado por clave. Y este barajado hace que esta transformación sea una transformación más amplia.

Es ligeramente diferente a la transformación groupBy() ya que requiere un par clave-valor mientras que en groupBy() puede o no tener claves en el RDD fuente. La función de transformación groupBy() también necesita una función para formar una clave que no es necesaria en el caso de la función spark groupByKey().

#### Puntos importantes

* Apache spark groupByKey es una operación de transformación por lo que su evaluación es perezosa
* Es una operación amplia ya que shufflea los datos de múltiples particiones y crea otro RDD
* Esta operación es costosa ya que no utiliza el combinador local de una partición para reducir la transferencia de datos
* No se recomienda su uso cuando se necesita hacer una agregación posterior de los datos agrupados
* groupByKey siempre da como resultado RDDs con particiones Hash

### Ejemplo de groupByKey

In [32]:
# Ejemplo basico de groupByKey
x = sc.parallelize([
    ("EEUU", 1), ("EEUU", 2), ("Mexico", 1),
    ("Ecuador", 1), ("Mexico", 4), ("Mexico", 9),
    ("EEUU", 8), ("EEUU", 3), ("Mexico", 4),
    ("Ecuador", 6), ("Ecuador", 9), ("Ecuador", 5)], 3)
 
# groupByKey con particiones por defecto
y = x.groupByKey()

In [33]:
# Checar particiones
print('Salida: ', y.getNumPartitions()) 

Salida:  3


In [34]:
# Con particiones predefinidas
y = x.groupByKey(2)
print('Salida: ', y.getNumPartitions())

Salida:  2


In [35]:
# Imprimir salida
for t in y.collect():
    print(t[0], [v for v in t[1]])

Mexico [1, 4, 9, 4]
Ecuador [1, 6, 9, 5]
EEUU [1, 2, 8, 3]


## reduceByKey

La función reduceByKey de Spark combina los valores de cada clave utilizando una función de reducción asociativa.

Básicamente, la función reduceByKey sólo funciona para RDDs que contienen pares de elementos clave y valor (es decir, RDDs que tienen una tupla o un mapa como elemento de datos). Es una operación de transformación, lo que significa que se evalúa perezosamente. Necesitamos pasar una función asociativa como parámetro, que se aplicará al RDD fuente y creará un nuevo RDD con los valores resultantes (es decir, un par clave-valor). Esta operación es una operación amplia, ya que los datos pueden ser shuffleados a través de las particiones.

La función asociativa (que acepta dos argumentos y devuelve un único elemento) debe ser conmutativa y asociativa en su naturaleza matemática. Esto significa intuitivamente que esta función produce el mismo resultado cuando se aplica repetidamente sobre el mismo conjunto de datos RDD con múltiples particiones, independientemente del orden de los elementos. Además, realiza la fusión localmente usando la función de reducción y luego envía los registros a través de las particiones para preparar los resultados finales.

#### Puntos importantes

* reduceByKey es una operación de transformación en Spark, por lo que se evalúa perezosamente
* Es una operación amplia ya que shufflea los datos de múltiples particiones y crea otro RDD
* Antes de enviar los datos a través de las particiones, también fusiona los datos localmente usando la misma función asociativa para optimizar el barajado de datos
* Sólo puede utilizarse con RDDs que contengan elementos del tipo pares de claves y valores
* Acepta una función conmutativa y asociativa como argumento
    * La función parámetro debe tener dos argumentos del mismo tipo de datos
    * El tipo de retorno de la función también debe ser el mismo que el de los argumentos
    
### Ejemplo de reduceByKey

In [36]:
# Ejemplo basico de reduceByKey
# crear PairRDD x con pares clave-valor
x = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1),
                    ("b", 1), ("b", 1), ("b", 1), ("b", 1)], 3)


In [37]:
# Aplicar la operación reduceByKey a x
y = x.reduceByKey(lambda acum, n: acum + n)
print(y.collect())

[('b', 5), ('a', 3)]


In [38]:
# Definir la función asociativa por separado 
def sumFunc(acum, n):
    return acum + n
 
y = x.reduceByKey(sumFunc)
print(y.collect())

[('b', 5), ('a', 3)]


# Transformaciones

Las transformaciones RDD de PySpark son de evaluación perezosa y se utilizan para transformar/actualizar de un RDD a otro. Cuando se ejecuta en un RDD, el resultado es un único o varios RDD nuevos.

Dado que los RDD son inmutables por naturaleza, las transformaciones siempre crean un nuevo RDD sin actualizar uno existente, por lo que una cadena de transformaciones de RDD crea un linaje de RDD.

El linaje RDD también se conoce como gráfico de operadores RDD o gráfico de dependencia RDD.

Algunas transformaciones basicas en Spark:

* map()
* flatMap()
* filter()
* sortByKey()

## map()

PySpark MAP es una transformación en PySpark que se aplica sobre todas y cada una de las funciones de un RDD / Data Frame en una aplicación Spark. El tipo de retorno es un nuevo RDD o marco de datos donde se aplica la función Map. Se utiliza para aplicar operaciones sobre cada elemento en una aplicación PySpark como una transformación, una actualización de la columna, etc.

La operación Map es una simple transformación de Spark que toma un elemento del Data Frame / RDD y le aplica la lógica de transformación dada. Podemos definir nuestra propia lógica de transformación personalizada o la función derivada de la biblioteca y aplicarla utilizando la función map. El resultado devuelto será un nuevo RDD con el mismo número de elementos que el anterior.

#### Sintaxis

a.map (lambda x : x+1)

### Ejemplos de map()

In [39]:
# Ejemplo 1
r = sc.parallelize([1,2,3,4,5,6,7,8,10,15])
r2 = r.map(lambda x: x + 2)
r2.collect()

[3, 4, 5, 6, 7, 8, 9, 10, 12, 17]

In [40]:
# Ejemplo 2
#Sin lambda
def incremento_en_dos(x):
    return x + 2

r = sc.parallelize([1,2,3,4,5,6,7,8,10,15])
r2 = r.map(incremento_en_dos)
r2.collect()

[3, 4, 5, 6, 7, 8, 9, 10, 12, 17]

In [41]:
#Ejemplo 3
#Usando strings
r = sc.parallelize(["holi", "maestra", "Lamia", "uwu", "<3"])
r2 = r.map(lambda x: len(x))
r2.collect()

[4, 7, 5, 3, 2]

## filter()

PySpark Filter es una función en PySpark añadida para tratar los datos filtrados cuando se necesitan en un Spark Data Frame. La limpieza de datos es una tarea muy importante mientras se manejan los datos en PySpark y PYSPARK Filter viene con las funcionalidades que se pueden lograr por el mismo. PySpark Filter se aplica con el Data Frame y se utiliza para filtrar los datos en todo momento de manera que los datos necesarios se dejan para el procesamiento y el resto de los datos no se utilizan. Esto ayuda a un procesamiento más rápido de los datos ya que los datos no deseados o malos son limpiados por el uso de la operación de filtro en un marco de datos.

La condición del filtro de PySpark se aplica en el marco de datos con varias condiciones que filtran los datos basados en los datos, la condición puede ser sobre una sola condición a múltiples condiciones utilizando la función SQL. Las filas se filtran desde el RDD / Data Frame y el resultado se utiliza para su posterior procesamiento.

#### Sintaxis:

La sintaxis de la función PySpark Filter es:

**df.filter(#condicion)**


* Condicion: La condición del filtro que queremos implementar.

### Ejemplos de filter()


In [42]:
# Ejemplo 1
# Creacion del DataFrame

a = spark.createDataFrame(["Misael", "Ricardo", "Jannette","Leonel","Yessica", "Lamia"], "string").toDF("Nombre")

# Mostrar el nombre con el filtro de Leonel
a.filter(a.Nombre == "Leonel").show()

+------+
|Nombre|
+------+
|Leonel|
+------+



In [43]:
#Ejemplo 2
#filter permite seleccionar de un RDD aquellos elementos que cumplen con una condición
def es_par(x):
    for i in range(1,x):
        if x % 2 == 0:
            return True
    return False
r = sc.parallelize(range(1,101))
r2 = r.filter(es_par)
r2.collect()

[2,
 4,
 6,
 8,
 10,
 12,
 14,
 16,
 18,
 20,
 22,
 24,
 26,
 28,
 30,
 32,
 34,
 36,
 38,
 40,
 42,
 44,
 46,
 48,
 50,
 52,
 54,
 56,
 58,
 60,
 62,
 64,
 66,
 68,
 70,
 72,
 74,
 76,
 78,
 80,
 82,
 84,
 86,
 88,
 90,
 92,
 94,
 96,
 98,
 100]

## flatMap()

En Apache Spark, Spark flatMap es una de las operaciones de transformación. La operación Tr de la función Map se aplica a todos los elementos de RDD que significa conjuntos de datos distribuidos resistentes. Estos son inmutables y una colección de registros que son particionados y estos solo pueden ser creados por operaciones (operaciones que se aplican a través de todos los elementos del conjunto de datos) como filter y map. El desarrollador de operaciones en Map tiene la facilidad de crear su propia lógica de negocio personalizada. Map() es principalmente similar a flatMap() y puede devolver sólo 0 o 1 y o más elementos de la función map().

#### Sintaxis:

**RDD.flatMap(<transformation function>)**
    
La función de transformación del código de sintaxis anterior, para cada elemento fuente del RDD, puede devolver múltiples elementos del RDD.

### Funcionalidad

Un flatMap es una operación de transformación. Se devuelve un nuevo RDD con su aplicación en cada elemento del RDD como resultado. Esto da muchos resultados, lo que significa que podemos obtener uno, dos, cero y otros muchos elementos de las aplicaciones de la operación flatMap. La operación Map está un paso por detrás de la técnica de operación flatMap y es mayormente similar.

### Ejemplos de flatMap()

In [44]:
#Ejemplo 1 : aplanar las listas
import csv

#cadenas en formato csv
r = sc.parallelize(["2,4,6", "8,10,12", "14,15,16"]) 

r2 = r.flatMap(lambda s: list(csv.reader([s]))[0])
r2.collect()


['2', '4', '6', '8', '10', '12', '14', '15', '16']

In [45]:
#Ejemplo 2 : aplanar las listas
r = sc.parallelize(["Torreon,Gomez Palacio,Lerdo", "Monterrey,Santa Catarina,San Pedro Garza García,Juarez", "Tampico,Panuco",
                    "Saltillo,Ramos Arizpe,Arteaga"]) 
r2 = r.flatMap(lambda s: list(csv.reader([s]))[0])
r2.collect()

['Torreon',
 'Gomez Palacio',
 'Lerdo',
 'Monterrey',
 'Santa Catarina',
 'San Pedro Garza García',
 'Juarez',
 'Tampico',
 'Panuco',
 'Saltillo',
 'Ramos Arizpe',
 'Arteaga']

## sortByKey()

La transformación de Spark sortByKey() es una operación RDD que se utiliza para ordenar los valores de la clave por orden ascendente o descendente. La función sortByKey() opera sobre un par RDD (par clave/valor).

### Funcionalidad

sortByKey() es una transformación.

* Devuelve un RDD ordenado por Clave.
* La ordenación puede hacerse en (1) Ascendente O (2) Descendente O (3) personalizada

Funcionarán con cualquier tipo de clave K que tenga un Ordering[K] implícito en el ámbito. Los objetos Ordering ya existen para todos los tipos primitivos estándar. Los usuarios también pueden definir sus propios ordenamientos para los tipos personalizados, o para anular el ordenamiento por defecto. Se utilizará la ordenación implícita que se encuentre en el ámbito más cercano.

Cuando se llama a Dataset of (K, V) donde k es Ordered devuelve un conjunto de datos de pares (K, V) ordenados por claves en orden ascendente o descendente, según se especifique en el argumento ascendente.

### Ejemplos de sortByKey()

In [46]:
#Ejemplo 1

rdd = sc.parallelize([('a', 1), ('c', 2), ('4', 3), ('d', 4), ('5', 5), ('7', 6)])

# Muestra el elemento ordenado
rdd.sortByKey().first()

('4', 3)

In [47]:
# Muestra ordenados los elementos
rdd.sortByKey().collect()

[('4', 3), ('5', 5), ('7', 6), ('a', 1), ('c', 2), ('d', 4)]

In [48]:
#Ejemplo 2
rdd = sc.parallelize([("Durango", 1), ("Coahuila", 2), ("CDMX", 3), ("Veracruz", 4), ("Sinaloa", 5)])

rdd2 = rdd.sortByKey()
rdd2.collect()

[('CDMX', 3), ('Coahuila', 2), ('Durango', 1), ('Sinaloa', 5), ('Veracruz', 4)]

In [49]:
#Ordenación inversa
rdd2 = rdd.sortByKey(False)
rdd2.collect()

[('Veracruz', 4), ('Sinaloa', 5), ('Durango', 1), ('Coahuila', 2), ('CDMX', 3)]

# Acciones

Las acciones RDD son operaciones que devuelven los valores brutos, es decir, cualquier función RDD que devuelva algo distinto a RDD[T] se considera una acción en spark programming.

Como se mencionó en Transformaciones RDD, todas las transformaciones son perezosas, lo que significa que no se ejecutan de inmediato y las funciones de acción se activan para ejecutar las transformaciones.

## aggregate()

Agregue los elementos de cada partición, y luego los resultados de todas las particiones.

### Funcionalidad

Tres parametros: 

1. Un valor inicial para el acumulador, zeroValue que tendrá tipo C
2. una función seqop para combinar elementos de nuestro RDD (de tipo T) con el acumulador de tipo C, devolviendo un valor de tipo C (C x T -> C).
3. Una función combOp para combinar dos acumuladores de tipo C y devolver un valor de tipo C.

Cuenta las h en el RDD

### Ejemplo de aggregate()

In [50]:
r = sc.parallelize(["santos", "monterrey", "tigres", "tijuana"])
#          (int, cuenta las "s", suma los acumuladores)  => (valor_inicial, map, reduce)
r.aggregate(0, lambda c, s : c + s.count("s"), lambda c1, c2: c1 + c2)

3

## count()

La acción "count" contará el número de elementos del RDD.

### Ejemplos de count()

In [51]:
# Ejemplo 1
r = sc.parallelize(["santos", "monterrey", "tigres", "tijuana"])
r.count()

4

In [52]:
# Ejemplo 2
r = sc.parallelize([1,2,3,4,5,6,7,8,10,15])
r.count()

10

In [53]:
# Ejemplo 3
r = sc.parallelize(range(15))
r.count()

15

## countByValue()

La acción countByValue() puede utilizarse para averiguar la aparición de cada elemento en el RDD.

### Ejemplos de countByValue()

In [54]:
# Ejemplo 1
r = sc.parallelize([3, "monterrey", "tigres", "tijuana", 3])
r.countByValue()

defaultdict(int, {3: 2, 'monterrey': 1, 'tigres': 1, 'tijuana': 1})

In [55]:
# Ejemplo 2
r = sc.parallelize([1,2,2,3,3,6,7,8,10,15,15,16])
r.countByValue()

defaultdict(int, {1: 1, 2: 2, 3: 2, 6: 1, 7: 1, 8: 1, 10: 1, 15: 2, 16: 1})

In [56]:
# Ejemplo 3
r = sc.parallelize(["Misael", "Misael", "Lamia", "Chespi", "Misael", "Lamia", "Lamia", "Chespi", "Oswi", "Gil", "Gil", "Lamia"])
r.countByValue()

defaultdict(int, {'Misael': 3, 'Lamia': 4, 'Chespi': 2, 'Oswi': 1, 'Gil': 2})

## reduce()

La función reduce de Spark RDD reduce los elementos de este RDD utilizando el operador binario conmutativo y asociativo especificado.

Es una operación de acción de RDD, lo que significa que desencadenará todas las transformaciones alineadas en el RDD base (o en el DAG) que no se ejecutan y luego ejecutará la operación de acción en el último RDD. Esta operación es también una operación amplia. En el sentido de que la ejecución de esta operación resulta en la distribución de los datos a través de las múltiples particiones.

Acepta una función con (que acepta dos argumentos y devuelve un solo elemento) que debe ser Conmutativa y Asociativa en la naturaleza matemática. Esto significa intuitivamente que esta función produce el mismo resultado cuando se aplica repetidamente sobre el mismo conjunto de datos RDD con múltiples particiones, independientemente del orden de los elementos.

#### Puntos importantes

* reduce es una operación de acción en Spark, por lo que desencadena la ejecución del DAG y se ejecuta en el RDD final
* Es una operación amplia ya que baraja los datos de múltiples particiones y los reduce a un único valor
* Acepta una función conmutativa y asociativa como argumento
    * La función parámetro debe tener dos argumentos del mismo tipo de datos
    * El tipo de retorno de la función también debe ser el mismo que el de los argumentos
    
### Ejemplos de reduce()


In [57]:
# Ejemplo 1
# reducir los números del 1 al 10 sumándolos
x = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 2)
sumAcum = x.reduce(lambda acum, n: acum + n)
print(sumAcum)

55


In [58]:
# Ejemplo 2
 
# reducir los números del 1 al 10 multiplicandolos
mulAcum = x.reduce(lambda acum, n: acum * n)
print(mulAcum)

3628800


In [59]:
# definiendo una función de reduce lambda
def sumaAcumulada(acum, n):
    return acum + n
 
sumAcum = x.reduce(sumaAcumulada)
print(sumAcum)

55


# Broadcast variable

En PySpark RDD y DataFrame, las variables de broadcast son variables compartidas de sólo lectura que se almacenan en caché y están disponibles en todos los nodos de un clúster para que las tareas puedan acceder a ellas o utilizarlas. En lugar de enviar estos datos junto con cada tarea, PySpark distribuye las variables de broadcast a los trabajadores utilizando algoritmos de difusión eficientes para reducir los costes de comunicación.

## Caso de uso

Permítanme explicar con un ejemplo cuándo utilizar las variables de difusión: supongamos que se obtiene un código de país de dos letras en un archivo y se desea transformarlo en el nombre completo del estado (por ejemplo, DGO en Durango, COAH en Coahuila, etc.) mediante una búsqueda en el mapeo de referencia. En algunos casos, estos datos pueden ser de gran tamaño y es posible que haya muchas búsquedas de este tipo (como el código postal, etc.).

En lugar de distribuir esta información junto con cada tarea a través de la red (lo que supone una sobrecarga y una pérdida de tiempo), podemos utilizar la variable de difusión para almacenar en caché esta información de búsqueda en cada máquina y las tareas utilizan esta información en caché mientras ejecutan las transformaciones.

## ¿Cómo funciona PySpark Broadcast?

Las variables Broadcast se utilizan de la misma manera para RDD, DataFrame.

Cuando se ejecuta una aplicación PySpark RDD, DataFrame que tiene las variables Broadcast definidas y utilizadas, PySpark hace lo siguiente.

* PySpark rompe el trabajo en etapas que han distribuido el  shuffling y las acciones se ejecutan con en la etapa.
* Las etapas posteriores también se dividen en tareas
* Spark difunde los datos comunes (reutilizables) que necesitan las tareas dentro de cada etapa.
* Los datos difundidos se almacenan en caché en formato serializado y se deserializan antes de ejecutar cada tarea.

Deberías crear y utilizar variables de broadcast para los datos que se comparten entre múltiples etapas y tareas.

Tenga en cuenta que las variables de broadcast no se envían a los ejecutores con la llamada sc.broadcast(variable), sino que se enviarán a los ejecutores cuando se utilicen por primera vez.

## Ejemplo

In [60]:
estados = {"DGO":"Durango", "COAH":"Coahuila", "NL":"Nuevo Leon"}
broadcastEstados = sc.broadcast(estados)

data = [("Misael","Adame","Mexico","DGO"),
    ("Lamia","Hamdan","Mexico","COAH"),
    ("Leonel","Adame","Mexico","NL"),
    ("Daniel","Saldivar","Mexico","COAH")
  ]

rdd = sc.parallelize(data)

def convertir_estado(codigo):
    return broadcastEstados.value[codigo]

result = rdd.map(lambda x: (x[0],x[1],x[2],convertir_estado(x[3]))).collect()
print(result)

[('Misael', 'Adame', 'Mexico', 'Durango'), ('Lamia', 'Hamdan', 'Mexico', 'Coahuila'), ('Leonel', 'Adame', 'Mexico', 'Nuevo Leon'), ('Daniel', 'Saldivar', 'Mexico', 'Coahuila')]


# Convertir un RDD a un DataFrame

En PySpark, la función toDF() del RDD se utiliza para convertir el RDD en DataFrame. Necesitaríamos convertir RDD a DataFrame ya que DataFrame proporciona más ventajas sobre RDD. Por ejemplo, DataFrame es una colección distribuida de datos organizados en columnas con nombre, similar a las tablas de una base de datos, y proporciona mejoras de optimización y rendimiento.

## 1. Crear un RDD de PySpark

En primer lugar, crear un RDD pasando el objeto lista de Python a la función sparkContext.parallelize(). Necesitaremos este objeto rdd para todos nuestros ejemplos a continuación.

En PySpark, cuando tienes datos en una lista, lo que significa que tienes una colección de datos en la memoria del controlador de PySpark, cuando creas un RDD, esta colección va a ser paralelizada.



In [61]:
carrera = [("Sistemas",1),("Mecatronica",2),("Industrial",3),("Electronica",4),("Mecanica",5),("Electrica",6)]
rdd = sc.parallelize(carrera)

## 2. Convertir PySpark RDD a DataFrame

La conversión de PySpark RDD a DataFrame puede hacerse utilizando toDF(), createDataFrame()

### 2.1 Uso de la función rdd.toDF()

PySpark proporciona la función toDF() en RDD que puede ser utilizada para convertir RDD en Dataframe

In [62]:
df = rdd.toDF()
df.printSchema()
df.show(truncate=False)

#Por defecto, la función toDF() crea nombres de columnas como "_1" y "_2". Este fragmento produce el siguiente esquema.


root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)

+-----------+---+
|_1         |_2 |
+-----------+---+
|Sistemas   |1  |
|Mecatronica|2  |
|Industrial |3  |
|Electronica|4  |
|Mecanica   |5  |
|Electrica  |6  |
+-----------+---+



toDF() tiene otra firma que toma argumentos para definir los nombres de las columnas como se muestra a continuación.

In [63]:
carreraColumnas = ["carrera_nombre","carrera_id"]
df2 = rdd.toDF(carreraColumnas)
df2.printSchema()
df2.show(truncate=False)

#Salida

root
 |-- carrera_nombre: string (nullable = true)
 |-- carrera_id: long (nullable = true)

+--------------+----------+
|carrera_nombre|carrera_id|
+--------------+----------+
|Sistemas      |1         |
|Mecatronica   |2         |
|Industrial    |3         |
|Electronica   |4         |
|Mecanica      |5         |
|Electrica     |6         |
+--------------+----------+



### 2.2 Uso de la función createDataFrame() de PySpark

La clase SparkSession proporciona el método createDataFrame() para crear el DataFrame y toma el objeto rdd como argumento.

In [64]:
carreraDF = spark.createDataFrame(rdd, schema = carreraColumnas)
carreraDF.printSchema()
carreraDF.show(truncate=False)

root
 |-- carrera_nombre: string (nullable = true)
 |-- carrera_id: long (nullable = true)

+--------------+----------+
|carrera_nombre|carrera_id|
+--------------+----------+
|Sistemas      |1         |
|Mecatronica   |2         |
|Industrial    |3         |
|Electronica   |4         |
|Mecanica      |5         |
|Electrica     |6         |
+--------------+----------+



El resultado es el mismo que el anterior

### 2.3 Uso de createDataFrame() con el esquema StructType

Cuando se infiere el esquema, por defecto el tipo de datos de las columnas se deriva de los datos y se establece nullable a true para todas las columnas. Podemos cambiar este comportamiento suministrando el esquema usando StructType - donde podemos especificar un nombre de columna, tipo de datos y nullable para cada campo/columna.

In [65]:
from pyspark.sql.types import StructType,StructField, StringType
carreraSchema = StructType([       
    StructField('carrera_nombre', StringType(), True),
    StructField('carrera_id', StringType(), True)
])

deptDF1 = spark.createDataFrame(rdd, schema = carreraSchema)
deptDF1.printSchema()
deptDF1.show(truncate=False)

root
 |-- carrera_nombre: string (nullable = true)
 |-- carrera_id: string (nullable = true)

+--------------+----------+
|carrera_nombre|carrera_id|
+--------------+----------+
|Sistemas      |1         |
|Mecatronica   |2         |
|Industrial    |3         |
|Electronica   |4         |
|Mecanica      |5         |
|Electrica     |6         |
+--------------+----------+



También se obtiene el mismo resultado.

# Serialización de datos con PySpark "encoders"

La serialización se utiliza para ajustar el rendimiento en Apache Spark. Todos los datos que se envían a través de la red o se escriben en el disco o persisten en la memoria deben ser serializados. La serialización juega un papel importante en las operaciones costosas.

Un codificador de un solo paso que asigna una columna de índices de categoría a una columna de vectores binarios, con un único valor por fila que indica el índice de la categoría de entrada. Por ejemplo, con 5 categorías, un valor de entrada de 2,0 se asignaría a un vector de salida de [0,0, 0,0, 1,0, 0,0]. La última categoría no se incluye por defecto (configurable a través de dropLast), porque hace que las entradas del vector sumen uno, y por tanto sean linealmente dependientes. Así, un valor de entrada de 4,0 se asigna a [0,0, 0,0, 0,0, 0,0].


## Ejemplo

In [66]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml import Pipeline

from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, FeatureHasher
from pyspark.sql.functions import col

sqlCtx = SQLContext(sc)

valores = [("K1","a", 5, 'x'), ("K2","a", 5, 'x'), ("K3","b", 5, 'x'), ("K4","b", 10, 'x')]
columnas = ['key', 'alfabeto', 'd1', 'd0']
df = sqlCtx.createDataFrame(valores, columnas)
df.show()


for nf in [3, 4, 5]:
    df = df.drop('key_vector')
    encoder = FeatureHasher(numFeatures = nf, inputCols=["key"], outputCol="key_vector")
    # 'FeatureHasher' object no tiene atributo 'fit'
    df = encoder.transform(df)

    #SparseVector(int size, int[] indices, double[] values) 
    temp = df.collect()
    for i in temp:
        print(i.key_vector, i.key_vector.toArray())
        


+---+--------+---+---+
|key|alfabeto| d1| d0|
+---+--------+---+---+
| K1|       a|  5|  x|
| K2|       a|  5|  x|
| K3|       b|  5|  x|
| K4|       b| 10|  x|
+---+--------+---+---+

(3,[1],[1.0]) [0. 1. 0.]
(3,[0],[1.0]) [1. 0. 0.]
(3,[0],[1.0]) [1. 0. 0.]
(3,[1],[1.0]) [0. 1. 0.]
(4,[2],[1.0]) [0. 0. 1. 0.]
(4,[1],[1.0]) [0. 1. 0. 0.]
(4,[1],[1.0]) [0. 1. 0. 0.]
(4,[2],[1.0]) [0. 0. 1. 0.]
(5,[0],[1.0]) [1. 0. 0. 0. 0.]
(5,[3],[1.0]) [0. 0. 0. 1. 0.]
(5,[1],[1.0]) [0. 1. 0. 0. 0.]
(5,[4],[1.0]) [0. 0. 0. 0. 1.]


In [67]:
sc.stop()

# Referencias

[1]  "Apache Spark: Un poco de historia". Máster en Big Data Málaga – Advanced Analytics on Big Data de la Universidad de Málaga y el grupo de investigación Khaos. https://www.bigdata.uma.es/apache-spark-un-poco-de-historia/ (accedido el 16 de junio de 2021).

[2] "Apache Spark: Introducción para principiantes". sitiobigdata.com. https://sitiobigdata.com/2019/12/24/apache-spark-introduccion-para-principiantes/ (accedido el 16 de junio de 2021)

[3] "PySpark Documentation — PySpark 3.1.2 documentation". Apache Spark™ - Unified Analytics Engine for Big Data. https://spark.apache.org/docs/latest/api/python/ (accedido el 16 de junio de 2021).

[4] "PySpark - What is SparkSession?" Spark by {Examples}. https://sparkbyexamples.com/pyspark/pyspark-what-is-sparksession/ (accedido el 16 de junio de 2021).

[5] "SparkContext (Spark 3.1.2 JavaDoc)". Apache Spark™ - Unified Analytics Engine for Big Data. https://spark.apache.org/docs/latest/api/java/org/apache/spark/SparkContext.html (accedido el 16 de junio de 2021).

[6] "PySpark - SparkContext". TutorialsPoint. https://www.tutorialspoint.com/pyspark/pyspark_sparkcontext.htm (accedido el 16 de junio de 2021).

[7] "What is SparkContext? Explained". Spark by {Examples}. https://sparkbyexamples.com/spark/spark-sparkcontext/ (accedido el 16 de junio de 2021).

[8] "PySpark parallelize() - Create RDD from a list data — SparkByExamples". Spark by {Examples}. https://sparkbyexamples.com/pyspark/pyspark-parallelize-create-rdd/ (accedido el 16 de junio de 2021).

[9] "Apache spark RDD tutorial | learn with scala examples — spark by {examples}". Spark by {Examples}. https://sparkbyexamples.com/spark-rdd-tutorial/ (accedido el 16 de junio de 2021).

[10] "PySpark - RDD". TutorialsPoint. https://www.tutorialspoint.com/pyspark/pyspark_rdd.htm (accedido el 16 de junio de 2021).

[11] "Spark Read Text File | RDD | DataFrame". Spark by {Examples}. https://sparkbyexamples.com/spark/spark-read-text-file-rdd-dataframe/ (accedido el 16 de junio de 2021).

[12] "How To Read CSV File Using Python PySpark". NBShare. https://www.nbshare.io/notebook/187478734/How-To-Read-CSV-File-Using-Python-PySpark/ (accedido el 17 de junio de 2021).

[13] J. Lopez. "Spark — 5ta nota: Manejo de archivos de texto". Medium. https://susejzepol.medium.com/spark-5ta-nota-manejo-de-archivos-de-texto-6ba1dd4796d5 (accedido el 17 de junio de 2021).

[14] "How Data Partitioning in Spark helps achieve more parallelism?" ProjectPro. https://www.dezyre.com/article/how-data-partitioning-in-spark-helps-achieve-more-parallelism/297 (accedido el 17 de junio de 2021).

[15] "Spark SQL Shuffle Partitions". Spark by {Examples}. https://sparkbyexamples.com/spark/spark-shuffle-partitions/ (accedido el 17 de junio de 2021).

[16] Varun. "Apache Spark groupByKey Example". Back To Bazics. https://backtobazics.com/big-data/spark/apache-spark-groupbykey-example/ (accedido el 17 de junio de 2021).

[17] Varun. "Apache Spark reduceByKey Example". Back To Bazics. https://backtobazics.com/big-data/spark/apache-spark-reducebykey-example/ (accedido el 17 de junio de 2021).

[18] "PySpark RDD Transformations with examples". Spark by {Examples}. https://sparkbyexamples.com/pyspark/pyspark-rdd-transformations/ (accedido el 17 de junio de 2021).

[19] "Introduction to PySpark Map". Educba. https://www.educba.com/pyspark-map/ (accedido el 17 de junio de 2021).

[20] S. Vithal. "Basic Spark Transformations and Actions using pysparkcom". DWgeek. https://dwgeek.com/basic-spark-transformations-and-actions-using-pyspark.html/ (accedido el 17 de junio de 2021).

[21] "Introduction to PySpark Filter". Educba. https://www.educba.com/pyspark-filter/ (accedido el 17 de junio de 2021).

[22] P. Predamkar. "Spark flatMap". Educba. https://www.educba.com/spark-flatmap/ (accedido el 17 de junio de 2021).

[23] "pyspark.RDD.sortByKey — PySpark 3.1.2 documentation". Apache Spark™ - Unified Analytics Engine for Big Data. https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.sortByKey.html (accedido el 18 de junio de 2021).

[24] "Spark sortByKey() with RDD Example". Spark by {Examples}. https://sparkbyexamples.com/apache-spark-rdd/spark-sortbykey-with-rdd-example/ (accedido el 18 de junio de 2021).

[25] Gitlka. "How SortBykey operation works in Spark". edureka! https://www.edureka.co/community/54036/how-sortbykey-operation-works-in-spark (accedido el 17 de junio de 2021).

[26] "Apache Spark Quick Start Guide". O’Reilly Online Learning. https://www.oreilly.com/library/view/apache-spark-quick/9781789349108/7bb4e531-5094-4fa6-9504-aa58a1efecad.xhtml (accedido el 18 de junio de 2021).

[27]  Varun. "Apache Spark reduce Example". Back To Bazics. https://backtobazics.com/big-data/spark/apache-spark-reduce-example/ (accedido el 18 de junio de 2021).

[28] "PySpark Broadcast Variables". Spark by {Examples}. https://sparkbyexamples.com/pyspark/pyspark-broadcast-variables/ (accedido el 18 de junio de 2021).

[29] "Convert PySpark RDD to DataFrame". Spark by {Examples}. https://sparkbyexamples.com/pyspark/convert-pyspark-rdd-to-dataframe/ (accedido el 18 de junio de 2021).

[30] "One Hot Encoding from PySpark, Pandas, Category Encoders and skLearn". survival8. https://survival8.blogspot.com/2020/07/one-hot-encoding-from-pyspark-pandas.html (accedido el 18 de junio de 2021).

[31] "PySpark". TutorialsPoint. https://www.tutorialspoint.com/pyspark/pyspark_serializers.htm (accedido el 18 de junio de 2021).