# Spark - RDD - Basic Transformations

In [10]:
#!pip install findspark

In [14]:
import findspark

findspark.init()

import pyspark

sc = pyspark.SparkContext(appName="RDDBasics")

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=RDDBasics, master=local[*]) created by __init__ at /tmp/ipykernel_10934/1179285508.py:4 

In [None]:
sc

# Creando un RDD con 3 lineas de texto

* __Resilient__ - se va enviar los datos y las funciones que se quieren ejecutar a varios ordenadroes, pero cada trozito de datos es unico. Spark guardará los datos crudos y el planning de ejecución (incluye las funciones) y en caso de falla de una de los Workers se lo envia a otro.
* __Distributed__ - Separar los datos en Trabajadores (Workers). Un worker no es un slave, un worker es un proceso (PID) lo que quiere decir que en un "slave" pueden y normalmente hay mas de uno worker. Razón para poder usar todos los procesadores de la dicha maquina.
* __Dataset__ - Datos no estructurados - acepta cualquier tipo de datos.

In [None]:
rdd_lines = sc.parallelize(["linea 1 Python",
                            "linea 2 Python",
                            "linea 3 Spark"] )

In [None]:
type(rdd_lines)

In [None]:
rdd_lines

In [None]:
rdd_lines.collect()

## Ojo: rdd_lines.collect() es un "code smell"
Es decir tu codigo huele mal, porque estas trayendo 100% de los datos a Python, seguro que quieres hacer eso?

In [None]:
type(rdd_lines.collect())

In [None]:
rdd_numbers = sc.parallelize([1,2,3,3] )

In [None]:
rdd_numbers

.collect() returns the whole rdd()

In [12]:
rdd_lines.collect()

['linea 1 Python', 'linea 2 Python', 'linea 3 Spark']

In [13]:
rdd_numbers.collect() # Action that returns the whole RDD

NameError: name 'rdd_numbers' is not defined

In [None]:
rdd_lines.take(10)

In [None]:
rdd_numbers.take(10)

In [None]:
sc.parallelize([1,2,3,3]).take(10)

In [None]:
contador = rdd_numbers.count()
contador

In [None]:
contador

* Una __acción__ termina con el procesamiento en Spark y devuelve una respuesta al master con un tipo de datos de Python.
* Una __transformación__ no hace trigger (no dispara, no acciona) el procesamiento de Spark (es lazy evaluated),  lo quiere decir que si compruebo el tipo será algo de Spark no de Python.

## map vs. flatMap transformation
El flatMap reduce la dimensión de la lista y si es una lista de una dimensión, rompe la string.

In [None]:
rdd_lines.map(lambda x:x.split(" ")).take(4)

In [None]:
rdd_lines.flatMap(lambda x:x.split(" ")).take(10)

In [None]:
rdd_lines.map(lambda x: x.split(' ')).collect()

In [None]:
sc.parallelize(["linea 1 Python","linea 2 Python y Python","linea 3 Spark"]).map(lambda x:x.split(' ')). \
filter(lambda x: "Python" in x).count()

In [None]:
sc.parallelize(["linea 1 Python","linea 2 Python y Python","linea 3 Spark"]).flatMap(lambda x:x.split(' ')). \
filter(lambda x: "Python" in x).count()

In [None]:
rdd_lines.flatMap(lambda x: x.split(' ')).collect()

In [None]:
rdd_lines.map(lambda x:x).collect()

In [None]:
rdd_lines.flatMap(lambda x:x).collect()

## filter transformation

In [None]:
for item in ["linea 1 Python","linea 2 Python","linea 3 Spark"]:
    print("Python" in item)

In [None]:
list(filter(lambda x: "Python" in x ,["linea 1 Python","linea 2 Python","linea 3 Spark"])) 
# Esto es Python, no Spark!

In [None]:
rdd_lines.filter(lambda x: "Python" in x).take(10)

# Juntando flatMap y filter

In [None]:
sc.parallelize(["linea 1 Python","linea 2 Python y Python y Python","linea 3 Spark"]). \
flatMap(lambda x:x.split(' ')). \
filter(lambda x: "Python" in x).take(10)

In [None]:
rdd_lines.flatMap(lambda x: x.split(' ')).filter(lambda x: "Python" in x).collect()

In [None]:
rdd_lines.filter(lambda x: "Python" in x).flatMap(lambda x: x.split(' ')).collect()

In [None]:
#sc.p.map.filter.flatMap.map.......

### Cache
* Los caches en Spark se hacen en los workers (es en la memoria del PID) 
* Los caches son volatiles (cerrando Spark todo se pierden)
* Si pides un cache, es una proposión, si Spark no lo puede hacer, no lo hará!
* Si los caches no se utilizan en algun momento los borra
* Si los caches molestan a nuevos procesamiento los manda a disco duro

In [None]:
rdd_lines.cache()

## distinct transformation

In [None]:
rdd_numbers = sc.parallelize([3,3,2,1] )
rdd_numbers.take(10)

In [None]:
rdd_numbers.distinct().collect()

In [None]:
rdd_numbers.map(lambda x:x**2).filter(lambda x:x>3).distinct().count()#.take(10)

## sample without replacement

In [9]:
rdd_numbers.sample(False,0.5).collect()

NameError: name 'rdd_numbers' is not defined

## sample with replacement

In [None]:
rdd_numbers.sample(True,3).collect()

---

# Transformation SET OPERATIONS

In [None]:
rdd_numbers.collect()

In [None]:
rdd_more_numbers = sc.parallelize([3,4,2,5])
rdd_more_numbers.collect()

## union - como un append sin mas lista1.append(lista2)

In [None]:
rdd_numbers.union(rdd_more_numbers).collect()

## intersection - INNER JOIN

In [None]:
rdd_numbers.intersection(rdd_more_numbers).collect()

## subtraction - Left Outer join

In [None]:
rdd_numbers.subtract(rdd_more_numbers).collect()

## cartesian product

In [None]:
rdd_numbers.cartesian(rdd_more_numbers).collect()

In [None]:
rdd_numbers.cartesian(rdd_more_numbers).map(lambda x: x[0]/x[1]).take(100)

In [None]:
rdd_numbers.cartesian(rdd_more_numbers).map(lambda x:x[0]+x[1]).collect()

## Exercise 1) sum rdd1 + rdd2

Expected result:
['(1+3)=4',
 '(1+4)=5',
 '(2+3)=5',
 '(2+4)=6',
 '(1+2)=3',
 '(1+5)=6',
 '(2+2)=4',
 '(2+5)=7',
 '(3+3)=6',
 '(3+4)=7',
 '(3+3)=6',
 '(3+4)=7',
 '(3+2)=5',
 '(3+5)=8',
 '(3+2)=5',
 '(3+5)=8']


In [None]:
rdd1 = sc.parallelize([1,2,3,3])

In [None]:
rdd2 = sc.parallelize([3,4,2,5])

## Exercise 2) Explain what is the code doing, input (values and technology/type), technologies involved and output (values and technology/type)

rdd = sc.parallelize([1,2,3,4])

rdd.map(lambda x: x * 2).collect()

## Exercise 3) What is wrong with the following code and how to fix it?

len(sc.parallelize([1,2,3,4]).map(lambda x: x * 2).collect())

---

# Spark - RDD - Basic Actions

## collect

In [None]:
rdd_numbers = sc.parallelize([1,2,3,3])

In [None]:
rdd_numbers.collect()

## count

In [None]:
rdd_numbers.count()

## countByValue - same as value_counts() in DataFrame in Pandas

In [None]:
rdd_more_numbers = sc.parallelize([4,5,2,7])

In [None]:
rdd_many_numbers = rdd_numbers.union(rdd_more_numbers)

In [None]:
rdd_many_numbers.collect()

In [None]:
rdd_many_numbers.countByValue()

In [None]:
#df.value_counts()

In [None]:
rdd_lines.flatMap(lambda x:x.split(" ")).countByValue()#take(10)

## Ejercicio 4) calcular el numero de ocurrencia de cada palabras en rdd_lines

In [None]:
rdd_lines = sc.parallelize(["linea 1 Python","linea 2 Python","linea 3 Spark"] )

In [None]:
rdd_lines.flatMap(lambda x: x.split(' ')).countByValue()#.take(10) 

### Transformaciones 0 o más? Porque el resultado de una transformación de un objeto de Spark siempre es el mismo objeto, en el caso actual RDD en otro RDD.
### Acciones - 0 o 1? Porque una acción devulve la respuesta a Python en un objeto de Python, ya no hay nade de Spark funcionando!

## take - same as head() in DataFrame in Pandas

In [None]:
rdd_many_numbers.take(2)

## top - return the highest values

In [None]:
rdd_more_numbers = sc.parallelize([3,4,5,2,5])

In [None]:
rdd_more_numbers.top(3)

In [None]:
rdd_more_numbers.take(3)

### Ejercicio 5) coger los 3 valores unicos máximos.

In [None]:
rdd_more_numbers.distinct().top(3)

In [None]:
rdd_lines.distinct().take(3)

In [None]:
rdd_lines.distinct().top(2)

## takeOrdered

In [None]:
rdd_more_numbers.collect()

In [None]:
rdd_more_numbers.take(3)

In [None]:
rdd_more_numbers.takeOrdered(3,lambda x: -x) # Descending

In [None]:
rdd_more_numbers.takeOrdered(3,lambda x: x) # Ascending

In [None]:
rdd_more_numbers.takeOrdered(3) # Ascending

### Ejercicio 6) coger 3 valores por orden, los pares primero en orden descreciente y los impares en orden cresciente.

### Ejercicio 7) coger 3 valores por orden, los pares primero en orden descreciente y los impares a continuación en orden tambien decresciente.

---

# Persist

In [None]:
print(dir())

In [None]:
a = [1,2,3,3] # En casi todas las lenguajes de programación las variables son persistentes dentro de su ambito!

In [None]:
print(dir())

In [None]:
rdd_numbers = sc.parallelize([1,2,3,3]) 
# Un RDD de Spark es volatil!!!! Solo existe cuando hay una acción y luego desaparece

In [None]:
rdd_more_numbers.persist

In [None]:
rdd_more_numbers.takeOrdered(rdd_more_numbers.count()) # Ascending

In [None]:
rdd_more_numbers.unpersist

---

In [None]:
rdd_more_numbers.takeOrdered(3,lambda x: -x**4 if x % 2 == 0 else -x) # Ascending

## takeSample

In [None]:
rdd_many_numbers.collect()

In [None]:
rdd_many_numbers.count()

In [None]:
rdd_many_numbers.take(10)

In [None]:
rdd_many_numbers.takeSample(False,20,seed=321) #Without replacement

In [None]:
rdd_many_numbers.takeSample(True,20,seed=321) #With replacement

In [None]:
for semilla in range(20):
    print(rdd_many_numbers.takeSample(True,10,seed=semilla)) #With replacement

### Ejercicio 8) crear 10 listas aleatorias con 10 elementos de rdd_many_numbers usando semillas diferentes y unirlas todas en un unico rdd. ojo: se puede hacer persistent alguna de ellas?

---

# Spark - RDD - Reduce Actions - Reducing the whole list to a single value

## reduce

In [None]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))


SELECT COUNT(*) FROM tabla1 - la acción COUNT - es un agregador en SQL, en Python/Spark llamamos de reductor o reduce

In [None]:
rdd = sc.parallelize([1, 2, 3, 4]) 
rdd.reduce(lambda a, b: a + b)

In [None]:
(1*2)*(3*4)

In [None]:
rdd_many_numbers.collect()

In [None]:
rdd_many_numbers.reduce(lambda a, b: a * b)

## fold - the same as reduce, but you can provide a starting value

In [None]:
sc.parallelize([1,25,8,4,2]).fold(0,lambda a,b:a+b)

In [None]:
1+25+8+4+2

In [None]:
sc.parallelize([1,25,8,4,2]).fold(1,lambda a,b:a+b)

In [None]:
min([1,2,3,4,5])

In [None]:
max([1,2,3,4,5])

## aggregate

In [None]:
sc.parallelize([1,2,3,4,5]).reduce(lambda a,b:(a+b)/2)

In [None]:
# sc.parallelize([1,2,3,4,5]).aggregate(INICIALIZADORES,REDUCE_DENTRO_WORKER, REDUCE_ENTRE_WORKERS)

# sc.parallelize([1,2,3,4,5]).aggregate((0),lambda acumlulador,valor_de_linea:x, lambda x,y:x)
t = sc.parallelize([1,2,3,4,5]).aggregate((0,0,1), \
                                      lambda x,y:(x[0]+y,x[1]+1,x[2]*y), \
                                      lambda x,y:(x[0]+y[0],x[1]+y[1],x[2]*y[2]))
# x es el acumulador
# y son los valores


In [None]:
t[0]/t[1]

In [None]:
t[2]**(1/t[1])

In [None]:
t = sc.parallelize([1,2,3,4,5]).aggregate(
  (0, 0), # INICIALIZADOR DE LOS DOS CONTADORES 
  (lambda acc, value: (acc[0] + value, acc[1] + 1)), # REDUCE dentro del WORKER
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))) #REDUCE entre WORKERS

In [None]:
t

In [None]:
t[0]/t[1]

## reduce by Key

In [None]:
[(1,2), (3,4), (3,6)]

In [None]:
d = {1:2, 3:4,3:6}
d

In [None]:
rdd = sc.parallelize({1:2, 3:4,3:6})
rdd.collect()

In [None]:
rdd.reduceByKey(lambda a, b: a + b).collect()

In [None]:
[("Python",20),("Python",50),("Python",35),("Spark",23)]

In [None]:
sc.parallelize([(None,30),(4+4j,50),(4+4j,35),(4+4j,23)]).\
reduceByKey(lambda x,y: x+y).\
collect()

In [None]:
sc.parallelize([("Python",20),("Python",50),("Python",35),("Spark",23)]).\
reduceByKey(lambda x,y: x+y).\
collect()

In [None]:
[("Python",(20,3)),("Python",(50,4)),("Python",(35,2)),("Spark",(20,3))]

In [None]:
sc.parallelize([("Python",(20,3)),("Python",(50,4)),("Python",(35,2)),("Spark",(20,3))]).\
reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1])).\
collect()

## Persistent (Catching)

In [None]:
rdd.persist

In [None]:
rdd.count()

## Cache

In [None]:
rdd_cached_lines = rdd_lines.cache()

In [None]:
rdd_cached_lines.collect()

In [None]:
rdd_cached_lines.count()

---

# Example 1

In [None]:
lines = sc.parallelize(["linea 500 Python","linea 404 Python","linea 404 Spark"] )

In [None]:
lines.map(lambda x: x.split(' ')).filter(lambda x : "404" in x).map(lambda word : (word, 1)).collect()

In [None]:
lines.flatMap(lambda x: x.split(' ')).collect()

In [None]:
lines.map(lambda x: x.split(' ')).collect()

In [None]:
lines.flatMap(lambda x: x.split(' ')).filter(lambda x : "404" in x).collect()

In [None]:
lines.flatMap(lambda x: x.split(' ')).\
filter(lambda x : "404" in x).\
map(lambda word : (word, 1)).collect()

In [None]:
from operator import add
#lambda x,y: x+y

In [None]:
lines.flatMap(lambda x: x.split(' ')).\
filter(lambda x : "404" in x).\
map(lambda word : (word, 1)).\
reduceByKey(add).collect()

In [None]:
lines.flatMap(lambda x: x.split(' ')).filter(lambda x : "404" in x or "500" in x).count()

In [None]:
from operator import add

In [None]:
rdd = lines.flatMap(lambda x: x.split(' '))
rdd.map(lambda x:(x,1)).reduceByKey(add).collect()

In [None]:
lines.flatMap(lambda x: x.split(' '))\
.filter(lambda x : "404" in x or "500" in x).map(lambda word : (word, 1)) \
.reduceByKey(add).collect()

In [None]:
lines.flatMap(lambda x: x.split(' ')).filter(lambda x : "404" in x or "500" in x).map(lambda word : (word, 1)) \
.reduceByKey(lambda x,y: x+y).collect()

# Ejercicio - Usando %3, sumar los número entre 1 y 1000000 divisible por %3==0, %3==1 o %3==2 (como si fuera por grupo). Hasta las 11:00
respuesta esperada: (166666833333, 166667166667, 166666500000)

# Ejercicio 2) Hacer el anterior juntando las claves 0 y 1
respuesta esperada: (333334000000, 166666500000)