**Preparar el ambiente**

In [None]:
print("PREPARANDO EL ENTORNO\n\n")
import os
 # Instalar SDK java 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Descargar Spark
!wget -q https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
# Descomprimir la versión de Spark
!tar xf spark-3.3.1-bin-hadoop3.tgz
# Establecer las variables de entorno
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"
# Descargar findspark
!pip install -q findspark
# Descargar pyspark
!pip install -q pyspark
print("\n\n******** INSTALACIÓN CORRECTA *******")

PREPARANDO EL ENTORNO




******** INSTALACIÓN CORRECTA *******


**Creamos la sesión Spark**

In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

**Creamos el contexto**

In [None]:
sc = spark.sparkContext

**Creamos el RDD**

In [None]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd_texto = sc.parallelize(['José', 'Juan', 'Lucia'])

**Función map**

In [None]:
# Restar 1 a cada elemento
rdd_resta = rdd.map(lambda x: x - 1)
rdd_resta.collect()

[0, 1, 2, 3, 4]

In [None]:
# Ver si el número es par
rdd_par = rdd.map(lambda x: x % 2 == 0)
rdd_par.collect()

[False, True, False, True, False]

In [None]:
# Pasar a mayuscula
rdd_mayuscula = rdd_texto.map(lambda x: x.upper())
rdd_mayuscula.collect()

['JOSÉ', 'JUAN', 'LUCIA']

In [None]:
# Concatenar texto
rdd_hola = rdd_texto.map(lambda _: 'Hola '+ _)
rdd_hola.collect()

['Hola José', 'Hola Juan', 'Hola Lucia']

**Función flatMap**: Similar a map, pero permite aplanar los resultados. Es decir, si existe una tupla lo separa elemento por elemento.

In [None]:
# Valor al cuadrado
# Con el lambda se genera una tupla, pero flatMap la aplana
rdd_cuadrado = rdd.flatMap(lambda x: (x, x ** 2))
rdd_cuadrado.collect()

[1, 1, 2, 4, 3, 9, 4, 16, 5, 25]

In [None]:
# Mayuscula texto
rdd_mayuscula = rdd_texto.flatMap(lambda _: (_, _.upper()))
rdd_mayuscula.collect()

['José', 'JOSÉ', 'Juan', 'JUAN', 'Lucia', 'LUCIA']

**Función filter**

In [None]:
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])
rdd_texto = sc.parallelize(['José', 'Juanqui', 'Juan', 'Lucia', 'Karla', 'Katia'])

In [None]:
# Par
rdd_par = rdd.filter(lambda x: x % 2 == 0)
rdd_par.collect()

[2, 4, 6, 8]

In [None]:
# Impar
rdd_impar = rdd.filter(lambda x: x % 2 != 0)
rdd_impar.collect()

[1, 3, 5, 7, 9]

In [None]:
# Texto que comience con K
rdd_k = rdd_texto.filter(lambda x: x.startswith('K'))
rdd_k.collect()

['Karla', 'Katia']

In [None]:
# Texto que comience con J y continue con u
rdd_filtro = rdd_texto.filter(lambda x: x.startswith('J') and x.find('u') == 1)
rdd_filtro.collect()

['Juanqui', 'Juan']

**Función coalesce**: Permite reducir particiones.

In [None]:
# RDD con 10 particiones
rdd = sc.parallelize([1,2,3,4,5], 10)
rdd.getNumPartitions()

10

In [None]:
# Reducimos particiones - No lo toma de esta forma
rdd.coalesce(5)
rdd.getNumPartitions()

10

In [None]:
# Reducimos particiones
rdd5 = rdd.coalesce(5)
rdd5.getNumPartitions()

5

**Función repartition**: Reparticiona la entrada.

**Coalesce** se usa para reducir el número de particiones. Esta es una versión
optimizada de **repartition** donde el movimiento de los datos a través de las
particiones es menor. Estas son operaciones muy costosas, no usar mucho.

In [None]:
rdd = sc.parallelize([1,2,3,4,5], 3)
rdd.getNumPartitions()

3

In [None]:
rdd7 = rdd.repartition(7)
rdd7.getNumPartitions()

7

**Función reduceByKey**: Fusiona los valores de cada clave usando una función asociativa de reducción. 

In [None]:
rdd = sc.parallelize([('casa', 2), 
                      ('parque', 1), 
                      ('que', 5), 
                      ('casa', 1), 
                      ('escuela', 2), 
                      ('casa', 1), 
                      ('que', 1)])

In [None]:
rdd.collect()

[('casa', 2),
 ('parque', 1),
 ('que', 5),
 ('casa', 1),
 ('escuela', 2),
 ('casa', 1),
 ('que', 1)]

In [None]:
rdd_reducido = rdd.reduceByKey(lambda x,y: x + y)

In [None]:
rdd_reducido.collect()

[('parque', 1), ('que', 6), ('casa', 4), ('escuela', 2)]

**Ejercicios**

1. Cree un RDD llamado lenguajes que contenga los siguientes lenguajes de programación: Python, R, C, Scala, Rugby y SQL.

In [None]:
lenguajes = sc.parallelize(['Python', 'R', 'C', 'Scala', 'Rugby', 'SQL'])
lenguajes.collect()

['Python', 'R', 'C', 'Scala', 'Rugby', 'SQL']

1.a. Obtenga un nuevo RDD a partir del RDD lenguajes donde todos los lenguajes de programación estén en mayúsculas.

In [None]:
lenguajes_mayuscula = lenguajes.map(lambda _: _.upper())
lenguajes_mayuscula.collect()

['PYTHON', 'R', 'C', 'SCALA', 'RUGBY', 'SQL']

1.b. Obtenga un nuevo RDD a partir del RDD lenguajes donde todos los lenguajes de programación estén en minúsculas.

In [None]:
lenguaje_minuscula = lenguajes.map(lambda _: _.lower())
lenguaje_minuscula.collect()

['python', 'r', 'c', 'scala', 'rugby', 'sql']

1.c. Cree un nuevo RDD que solo contenga aquellos lenguajes de programación que comiencen con la letra R.

In [None]:
lenguajes_r = lenguajes.filter(lambda _: _.startswith('R'))
lenguajes_r.collect()

['R', 'Rugby']

2. Cree un RDD llamado pares que contenga los números pares existentes en el intervalo [20;30].

In [None]:
arreglo = []
for i in range(20, 31):
  if(i % 2 == 0):
    arreglo.append(i)
arreglo

[20, 22, 24, 26, 28, 30]

In [None]:
pares = sc.parallelize(arreglo)
pares.collect()

[20, 22, 24, 26, 28, 30]

2.a. Cree el RDD llamado sqrt, este debe contener la raíz cuadrada de los elementos que componen el RDD pares.

In [None]:
import numpy as np
sqrt = pares.map(lambda x: np.sqrt(x))
sqrt.collect()

[4.47213595499958,
 4.69041575982343,
 4.898979485566356,
 5.0990195135927845,
 5.291502622129181,
 5.477225575051661]

2.b. Obtenga una lista compuesta por los números pares en el intervalo [20;30] y sus respectivas raíces cuadradas. Un ejemplo del resultado deseado para el intervalo [50;60] sería la lista [50, 7.0710678118654755, 52, 7.211102550927978, 54, 7.3484692283495345, 56, 7.483314773547883, 58, 7.615773105863909, 60, 7.745966692414834].

In [None]:
lista = []
lista = pares.flatMap(lambda x: (x, np.sqrt(x))).collect()
print(lista)

[20, 4.47213595499958, 22, 4.69041575982343, 24, 4.898979485566356, 26, 5.0990195135927845, 28, 5.291502622129181, 30, 5.477225575051661]


2.c. Eleve el número de particiones del RDD sqrt a 20.

In [None]:
sqrt20 = sqrt.repartition(20)
sqrt20.getNumPartitions()

20

2.d. Si tuviera que disminuir el número de particiones luego de haberlo establecido en 20, ¿qué función utilizaría para hacer más eficiente su código?: **Coalesce**

3. Cree un RDD del tipo clave valor a partir de los datos adjuntos como recurso a esta lección. Tenga en cuenta que deberá procesar el RDD leído para obtener el resultado solicitado. Supongamos que el RDD resultante de tipo clave valor refleja las transacciones realizadas por número de cuentas. Obtenga el monto total por cada cuenta.

In [None]:
rdd_texto = sc.textFile('./transacciones.txt')
rdd_texto.collect()

['(1001, 52.3)',
 '(1005, 20.8)',
 '(1001, 10.1)',
 '(1004, 52.7)',
 '(1005, 20.7)',
 '(1002, 85.3)',
 '(1004, 20.9)']

In [None]:
def proceso(s):
  return(tuple(s.replace('(', '').replace(')', '').split(', ')))

In [None]:
rdd_llave_valor = rdd_texto.map(proceso)
rdd_llave_valor.collect()

[('1001', '52.3'),
 ('1005', '20.8'),
 ('1001', '10.1'),
 ('1004', '52.7'),
 ('1005', '20.7'),
 ('1002', '85.3'),
 ('1004', '20.9')]

In [None]:
rdd_llave_valor = rdd_llave_valor.reduceByKey(lambda x, y: float(x) + float(y))
rdd_llave_valor.collect()

[('1002', '85.3'), ('1001', 62.4), ('1005', 41.5), ('1004', 73.6)]