# **Big Data**
#BD01 Spark Resilient Distributed Datasets (RDD)

In [48]:
# Esto solo lo utilizamos para instalar las librerias
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
!tar xf spark-3.5.5-bin-hadoop3.tgz
!pip install -q findspark

In [49]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.5-bin-hadoop3"

In [50]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
sc = SparkSession.builder.master("local[*]").getOrCreate()

## RDDs

Los Resilient Distributed Datasets (RDD) son colecciones de objetos JVM inmutables que se distribuyen en un clúster de Apache Spark.

Un RDD es el tipo de conjunto de datos más fundamental de Apache Spark; cualquier acción en un Spark DataFrame finalmente se traduce en una ejecución altamente optimizada de transformaciones y acciones en RDD.

Los datos en un RDD se dividen en trozos basados en una clave y luego se dispersan en todos los nodos ejecutores. Los RDD pueden recuperarse rápidamente de cualquier problema, ya que los mismos fragmentos de datos se replican en varios nodos ejecutores. Por lo tanto, incluso si un ejecutor falla, otro seguirá procesando los datos. Esto le permite realizar sus cálculos funcionales contra su conjunto de datos muy rápidamente al aprovechar el poder de múltiples nodos. Los RDD mantienen un registro de todos los pasos de ejecución aplicados a cada fragmento. Esto, además de la replicación de datos, acelera los cálculos y, si algo sale mal, los RDD aún pueden recuperar la parte de los datos perdidos debido a un error del ejecutor.

Si bien es común perder un nodo en entornos distribuidos (por ejemplo, debido a problemas de conectividad, problemas de hardware), la distribución y replicación de los datos protege contra la pérdida de datos, mientras que el linaje de datos permite que el sistema se recupere rápidamente.


**Particions**: Los RDD son una colección de varios datos si no pueden caber en un solo nodo, deben dividirse en varios nodos. Entonces significa que cuanto mayor sea el número de particiones, mayor será el paralelismo. Estas particiones de un RDD se distribuyen por todos los nodos de la red.

**Operaciones con RDD**: Hay dos tipos de operaciones que puede realizar en un RDD: **Transformaciones** y **Acciones**. La transformación aplica alguna función en un RDD y crea un nuevo RDD, no modifica el RDD en el que aplica la función (recuerde que los RDD son inmutables). Además, el nuevo RDD mantiene un puntero a su RDD principal.

Una **acción** se utiliza para guardar el resultado en alguna ubicación o para mostrarlo. También puede imprimir la información del linaje RDD usando el comando

Un RDD puede ser pensado como un conjunto de transformaciones y una acción que collecta el resultado.

A continuacion mostramos el grafo aciclico dirigido del ciclo de vida de un RDD.
<img src="https://drive.google.com/uc?export=view&id=1nlwjvcpNhFZ0YCJ4aBgl8ioAvBeOdZoi" width=600 height=400/>

**DAGScheduler** es la capa de programación de Apache Spark que implementa la programación orientada por etapas. Transforma un plan de ejecución lógico (es decir, el linaje RDD de dependencias construidas usando transformaciones RDD) en un plan de ejecución físico (usando etapas).

<img src="https://drive.google.com/uc?export=view&id=1WZRvLgBh4IZF0_15St3yT2pfVrDVOWo_" width=600 height=400/>

el  **DAGScheduler** divide el gráfico en varias etapas, las etapas se crean en función de las transformaciones. Las transformaciones estrechas se agruparán (en tuberías) juntas en una sola etapa.





## Creemos RDDs

In [51]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [52]:
myRDD = sc.sparkContext.parallelize(
 [('Amber', 22), ('Alfred', 23), ('Skye',4), ('Albert', 12), ('Amber', 9)]
)

In [53]:
myRDD.take(2)

[('Amber', 22), ('Alfred', 23)]

El metodo parallelize() crea una collección paralelizada. Esto permite que Spark distribuya los datos en varios nodos, en lugar de depender de un solo nodo para procesar los datos:

<img src="https://drive.google.com/uc?export=view&id=1lcecAAov0cIpEcVcJ9WCbKyANgJqXva-" width=600 height=400/>



## Reading data from files

In [54]:
path ='/content/drive/MyDrive/DATA SCIENCE UDD 2024/MODULO 8/'
myRDD = sc.sparkContext.textFile(path + 'airport-codes-na.txt')

In [55]:
myRDD.take(5)

['City\tState\tCountry\tIATA',
 'Abbotsford\tBC\tCanada\tYXX',
 'Aberdeen\tSD\tUSA\tABR',
 'Abilene\tTX\tUSA\tABI',
 'Akron\tOH\tUSA\tCAK']

In [56]:
myRDD.count()

527

In [57]:
myRDD = sc.sparkContext.textFile(path + 'airport-codes-na.txt').map(lambda line: line.split("\t"))

In [58]:
myRDD.getNumPartitions()

2

In [59]:
myRDD = sc.sparkContext.textFile(path + 'airport-codes-na.txt', minPartitions=4, use_unicode=True).map(lambda line: line.split("\t"))

In [60]:
myRDD.take(5)

[['City', 'State', 'Country', 'IATA'],
 ['Abbotsford', 'BC', 'Canada', 'YXX'],
 ['Aberdeen', 'SD', 'USA', 'ABR'],
 ['Abilene', 'TX', 'USA', 'ABI'],
 ['Akron', 'OH', 'USA', 'CAK']]

In [61]:
myRDD.getNumPartitions()

4

## <font color='green'>**Ejercicio 1**</font>

Lea el data set departuredelays.csv. Con un numero minimo e 8 particiones y realice una transformacion map la cual realice un split de cada  fila.

Posteriormente imprima 5 elementos y el numero de particiones.

In [62]:
type(myRDD)

In [63]:
myRDD = sc.sparkContext.textFile(path + 'departuredelays.csv').map(lambda line: line.split(","))
myRDD.count()

1391579

In [64]:
myRDD = sc.sparkContext.textFile(path + 'departuredelays.csv', minPartitions=8).map(lambda line: line.split(","))
myRDD.count()

1391579

In [65]:
myRDD.take(5)

[['date', 'delay', 'distance', 'origin', 'destination'],
 ['01011245', '6', '602', 'ABE', 'ATL'],
 ['01020600', '-8', '369', 'ABE', 'DTW'],
 ['01021245', '-2', '602', 'ABE', 'ATL'],
 ['01020605', '-4', '602', 'ABE', 'ATL']]

In [66]:
myRDD.getNumPartitions()

8

## <font color='green'>**Fin Ejercicio 1**</font>

#### *Using DataFrame*
Note, that its faster (2.44s for DF, 2.96s for RDD w/ 8 partitions) while DF also takes into account of the header and can infer the schema

In [67]:
myDF = sc.read.csv(path + 'departuredelays.csv', header=True, inferSchema=True)
myDF.count()

1391578

In [68]:
myDF.show()

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
|1030605|    0|     602|   ABE|        ATL|
|1041243|   10|     602|   ABE|        ATL|
|1040605|   28|     602|   ABE|        ATL|
|1051245|   88|     602|   ABE|        ATL|
|1050605|    9|     602|   ABE|        ATL|
|1061215|   -6|     602|   ABE|        ATL|
|1061725|   69|     602|   ABE|        ATL|
|1061230|    0|     369|   ABE|        DTW|
|1060625|   -3|     602|   ABE|        ATL|
|1070600|    0|     369|   ABE|        DTW|
|1071725|    0|     602|   ABE|        ATL|
|1071230|    0|     369|   ABE|        DTW|
|1070625|    0|     602|   ABE|        ATL|
|1071219|    0|     569|   ABE|        ORD|
|1080600|    0|     369|   ABE| 

In [69]:
myDF.rdd.getNumPartitions()

2

In [70]:
myDF.printSchema()

root
 |-- date: integer (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)



## RDD Transformations

Una vez creado un RDD, es frecuente usar el método take() para devolver los valores a la consola o notebook. take() es una **acción* RDD. Tenga en cuenta que un enfoque común en PySpark es usar collect(), que devuelve todos los valores en su RDD desde los nodos de trabajo de Spark al controlador. Existen implicaciones de rendimiento cuando se trabaja con una gran cantidad de datos, ya que esto se traduce en grandes volúmenes de datos que se transfieren desde los nodos de trabajo de Spark al controlador. Para pequeñas cantidades de datos (como aquí), esto está perfectamente bien, pero, como una cuestión de costumbre, casi siempre debería usar el método take(n) en su lugar; devuelve los primeros n elementos del RDD en lugar de todo el conjunto de datos. Es un método más eficiente porque primero escanea una partición y usa esas estadísticas para determinar el número de particiones necesarias para devolver los resultados.
<img src="https://drive.google.com/uc?export=view&id=1QWs2K13TW0wT2HK1h-4AkaOzKCFd3pFD" width=800 height=400/>



#### Getting Ready

In [71]:
airports = sc.sparkContext.textFile(path + 'airport-codes-na.txt').map(lambda line: line.split("\t"))
airports.take(5)

[['City', 'State', 'Country', 'IATA'],
 ['Abbotsford', 'BC', 'Canada', 'YXX'],
 ['Aberdeen', 'SD', 'USA', 'ABR'],
 ['Abilene', 'TX', 'USA', 'ABI'],
 ['Akron', 'OH', 'USA', 'CAK']]

In [72]:
flights = sc.sparkContext.textFile(path + 'departuredelays.csv').map(lambda line: line.split(","))
flights.take(5)

[['date', 'delay', 'distance', 'origin', 'destination'],
 ['01011245', '6', '602', 'ABE', 'ATL'],
 ['01020600', '-8', '369', 'ABE', 'DTW'],
 ['01021245', '-2', '602', 'ABE', 'ATL'],
 ['01020605', '-4', '602', 'ABE', 'ATL']]

### map()

Los componentes clave de esta transformación de mapa son:

1. lambda: una función anónima (es decir, una función definida sin un
nombre) compuesto por una sola expresión
2. split: Estamos usando la función split de PySpark (dentro de pyspark.sql.functions) para dividir una cadena alrededor de un patrón de expresión regular; en este caso, nuestro delimitador es una pestaña (es decir, \ t)

In [73]:
airports.map(lambda c: (c[0], c[1])).take(5)

[('City', 'State'),
 ('Abbotsford', 'BC'),
 ('Aberdeen', 'SD'),
 ('Abilene', 'TX'),
 ('Akron', 'OH')]

### filter()

La transformación de filter(f) devuelve un nuevo RDD basado en la selección de elementos para lo cual la función f devuelve verdadero.

In [74]:
airports.map(lambda c: (c[0], c[1])).filter(lambda c: c[1] == "WA").take(5)


[('Bellingham', 'WA'),
 ('Moses Lake', 'WA'),
 ('Pasco', 'WA'),
 ('Pullman', 'WA'),
 ('Seattle', 'WA')]

### flatMap()

La transformación flatMap (f) es similar a map, pero el nuevo RDD se aplana
todos los elementos (es decir, una secuencia de eventos).

In [75]:
airports.filter(lambda c: c[1] == "WA").map(lambda c: (c[0], c[1])).flatMap(lambda x: x).take(10)

['Bellingham',
 'WA',
 'Moses Lake',
 'WA',
 'Pasco',
 'WA',
 'Pullman',
 'WA',
 'Seattle',
 'WA']

### distinct()

La transformación distinct() devuelve un nuevo RDD que contiene los distintos
elementos del RDD de origen.

In [76]:
airports.map(lambda c: c[2]).distinct().take(5)

['Country', 'Canada', 'USA']

### sample()

La transformación de sample(withReplacement, fraction, seed) muestrea una fracción de los datos, con o sin reemplazo (el parámetro withReplacement), basándose en una semilla aleatoria.

In [77]:
flights.map(lambda c: c[3]).sample(False, 0.001, 123).take(5)

['ABQ', 'AEX', 'AGS', 'ANC', 'ATL']

### join()

La transformación de leftOuterJoin(RDD) devuelve un RDD de (key, (val_left, val_right)) al llamar a RDD (key, val_left) y RDD (key, val_right). Los Outer joins se admiten a través de left outer join, right outer join, y full outer join.

In [78]:
flights.map(lambda c: (c[3], c[0])).take(5)

[('origin', 'date'),
 ('ABE', '01011245'),
 ('ABE', '01020600'),
 ('ABE', '01021245'),
 ('ABE', '01020605')]

In [79]:
flights.take(5)

[['date', 'delay', 'distance', 'origin', 'destination'],
 ['01011245', '6', '602', 'ABE', 'ATL'],
 ['01020600', '-8', '369', 'ABE', 'DTW'],
 ['01021245', '-2', '602', 'ABE', 'ATL'],
 ['01020605', '-4', '602', 'ABE', 'ATL']]

In [80]:
airports.map(lambda c: (c[3], c[1])).take(5)

[('IATA', 'State'), ('YXX', 'BC'), ('ABR', 'SD'), ('ABI', 'TX'), ('CAK', 'OH')]

In [81]:
flt = flights.map(lambda c: (c[3], c[0]))
air = airports.map(lambda c: (c[3], c[1]))
flt.join(air).take(5)

[('ADQ', ('01011710', 'AK')),
 ('ADQ', ('01021710', 'AK')),
 ('ADQ', ('01020815', 'AK')),
 ('ADQ', ('01031710', 'AK')),
 ('ADQ', ('01030815', 'AK'))]

In [82]:
flt = flights.map(lambda c: (c[3], c[0]))
air = airports.map(lambda c: (c[3], c[1]))
flt.join(air)

PythonRDD[142] at RDD at PythonRDD.scala:53

### repartition()

La transformación de repartition(n) reparte el RDD en n particiones mediante la reorganización aleatoria y la distribución uniforme de los datos a través de la red. Esto puede mejorar el rendimiento al ejecutar más subprocesos paralelos al mismo tiempo

In [83]:
flights.getNumPartitions()

2

In [84]:
flights2 = flights.repartition(8)
flights2.getNumPartitions()

8

In [85]:
rdd = sc.sparkContext.parallelize([1, 2, 3, 4], 4)
def f(splitIndex, iterator): yield splitIndex
rdd.mapPartitionsWithIndex(f).sum()


6

## <font color='green'>**Ejercicio 2**</font>

Otro tipo e datos interesante en spark corresponde al dataframe.

1. utilce en sc, el metodo read.json para leer el archivo people.json.
2. Posteriormente muestre los datos.
3. Visualice el esquema con printSchema
4. Realice un describe
5 Cree una nueva columna que se llame 3x age en la cual se almacena el valor de la edad multiplicado por 3.



In [86]:
df = sc.read.json(path + 'people.json')

In [87]:
# Muestre la  data
df.show()

+----+--------+
| age|    name|
+----+--------+
|NULL| Michael|
|  30|    Andy|
|  19|  Justin|
|  11|     Ana|
|  44|Patricia|
|  89|     Leo|
+----+--------+



In [88]:
# view schema
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [89]:
df.describe().show()

+-------+------------------+--------+
|summary|               age|    name|
+-------+------------------+--------+
|  count|                 5|       6|
|   mean|              38.6|    NULL|
| stddev|30.778239065937477|    NULL|
|    min|                11|     Ana|
|    max|                89|Patricia|
+-------+------------------+--------+



In [90]:
df.withColumn('3x age', df['age']*3).show()

+----+--------+------+
| age|    name|3x age|
+----+--------+------+
|NULL| Michael|  NULL|
|  30|    Andy|    90|
|  19|  Justin|    57|
|  11|     Ana|    33|
|  44|Patricia|   132|
|  89|     Leo|   267|
+----+--------+------+



## <font color='green'>**Fin ejercicio 2**</font>

## RDD Actions

Same Getting Ready as Transformations

In [91]:
# take(n)
airports.take(3)

[['City', 'State', 'Country', 'IATA'],
 ['Abbotsford', 'BC', 'Canada', 'YXX'],
 ['Aberdeen', 'SD', 'USA', 'ABR']]

In [92]:
# collect()
airports.filter(lambda c: c[1] == "WA").collect()

[['Bellingham', 'WA', 'USA', 'BLI'],
 ['Moses Lake', 'WA', 'USA', 'MWH'],
 ['Pasco', 'WA', 'USA', 'PSC'],
 ['Pullman', 'WA', 'USA', 'PUW'],
 ['Seattle', 'WA', 'USA', 'SEA'],
 ['Spokane', 'WA', 'USA', 'GEG'],
 ['Walla Walla', 'WA', 'USA', 'ALW'],
 ['Wenatchee', 'WA', 'USA', 'EAT'],
 ['Yakima', 'WA', 'USA', 'YKM']]

In [93]:
# reduce(f)
flights\
   .filter(lambda c: c[3] == 'SEA' and c[4] == 'SFO')\
   .map(lambda c: int(c[1]))\
   .reduce(lambda x, y: x + y)

22293

In [94]:
flights.take(5)

[['date', 'delay', 'distance', 'origin', 'destination'],
 ['01011245', '6', '602', 'ABE', 'ATL'],
 ['01020600', '-8', '369', 'ABE', 'DTW'],
 ['01021245', '-2', '602', 'ABE', 'ATL'],
 ['01020605', '-4', '602', 'ABE', 'ATL']]

In [95]:
airports.filter(lambda c: c[1] == "WA").count()

9

In [96]:
# count
airports.filter(lambda c: c[1] == "WA").count()

9

In [97]:
# saveAsTextFile
airports.saveAsTextFile("/tmp/denny/airports")

## <font color='green'>**Ejercicio 3**</font>

**Estime el valor de $\pi$ utilizando spark:**

Este cálculo se basa en la siguiente heurística: Por definición, π es el área A Círculo de un círculo con radio r = 1 (generalmente, $πr^2$ es el área de un círculo de radio r).

Luego, se circunscribe este círculo unitario con un cuadrado cuya área es igual a $A_{Square} = 4$. La razón de estas dos áreas equivale a $\frac{A_{Circle}}{A_{Square}} = \frac{π}{4}$ y da la probabilidad geométrica de que un punto dentro del cuadrado se encuentre dentro del círculo.

Supongamos ahora que elegimos un gran número $n$ de puntos al azar dentro del cuadrado circunscrito, por ejemplo, lanzando dardos o dejando caer gotas de lluvia sobre él. Un cierto número $n_{in}$ de estos puntos terminará dentro del área descrita por el círculo, mientras que el número restante $n_{out}$ de estos puntos quedará fuera de él (pero dentro del cuadrado). Por lo tanto, $n_{in} + n_{out} = n$  y la probabilidad de que un punto se encuentre dentro del área del círculo es $\frac{n_{in}}{n}$.

Entonces heuristicamente uno tiene $\frac{A_{Circle}}{A_{Square}} \approx \frac{n_{in}}{n}$ con lo cual podemos estimar pi.

In [98]:
def f(_):
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0
partitions = 10
n = 100000 * partitions

In [99]:
import sys
from random import random
from operator import add
count = sc.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Aproximadamente pi es %f" % (4.0 * count / n))
sc.stop()

Aproximadamente pi es 3.149120


## <font color='green'>**Fin Ejercicio 3**</font>

## <font color='green'>**Ejercicio 4**</font>

Construya con Spark una rutina que permita realizar el conteo de palabras.

1. Lea un archivo text con spark.read.text
2. Realice un map de cada una de las lines leidas rdd.map(lambda r: r[0])
3. Para cada linea, realice un flatMap, dpnde se realiza un split por espacio.
4. Luego volvemos a mapear cada palabra a la tupla (palabra,1)
5 Realice un reducedByKey(add) para contar las repeticiones de cada palaba.
6. Finalmente realice un collect e imprima los resultados palabra, cantidad.

In [100]:
from google.colab import files
_file_ = files.upload()

KeyboardInterrupt: 

In [114]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()


lines = spark.read.text('/content/drive/MyDrive/DATA SCIENCE UDD 2024/MODULO 8/hola.txt').rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
output = counts.collect()
output.sort(reverse=False)
for (word, count) in output:
    print("%s: %i" % (word, count))
spark.stop()
%time

: 6
Aenean: 1
Aliquam: 2
Class: 1
Cras: 3
Curabitur: 1
Donec: 4
Duis: 6
Fusce: 2
Integer: 3
Interdum: 1
Lorem: 1
Maecenas: 4
Mauris: 2
Morbi: 2
Nam: 1
Nulla: 3
Nullam: 3
Pellentesque: 2
Phasellus: 3
Praesent: 2
Quisque: 1
Sed: 6
Suspendisse: 1
Vestibulum: 3
Vivamus: 1
a: 9
ac: 2
ad: 1
adipiscing: 1
aliquam: 4
aliquet: 2
amet: 4
amet,: 2
ante: 3
aptent: 1
arcu: 1
at: 6
at,: 2
at.: 1
auctor: 4
auctor.: 1
augue: 3
augue,: 1
augue.: 1
bibendum: 8
blandit: 2
commodo: 4
consectetur: 1
consequat: 1
consequat,: 1
conubia: 1
convallis: 3
cubilia: 1
curae;: 1
diam: 2
dictum: 1
dignissim: 4
dolor: 3
dolor,: 1
dolor.: 1
dui: 2
dui,: 1
dui.: 2
efficitur: 1
efficitur.: 3
egestas: 1
eget: 6
eget,: 2
eleifend: 2
elementum: 1
elit.: 3
enim: 2
erat: 1
eros: 2
est: 3
et: 9
et,: 1
eu: 6
eu.: 1
euismod,: 1
ex: 1
ex.: 1
facilisis: 2
fames: 1
faucibus: 4
faucibus.: 2
felis: 1
felis.: 1
fermentum: 2
fermentum,: 1
fermentum.: 1
finibus.: 1
fringilla: 2
fringilla.: 1
gravida: 1
gravida.: 1
hendrerit: 1
himenaeo

In [105]:
type(lines)

## <font color='green'>**Ejercicio 4**</font>