# Una introducción a Spark con Python

#### Rafael Caballero, modificado por Pablo C. Cañizares

Casi todo el código extraído del libro *Big Data con Python*

## 0: Preparación


#### Windows

El mismo código debería funcionar en otros sistemas 

* 
* Instrucciones para linux: https://blog.sicara.com/get-started-pyspark-jupyter-guide-tutorial-ae2fe84f594f
* Instrucciones para windows: https://medium.com/@ashish1512/how-to-setup-apache-spark-pyspark-on-jupyter-ipython-notebook-3330543ab307
* Instrucciones para mac: https://sparkbyexamples.com/pyspark/how-to-install-pyspark-on-mac/

En el laboratorio tenemos el problema de que no podemos hacer nuestras propias instalaciones; por ello podemos copiar la carpeta spark hlocal\tdm y ejecutar el siguiente código

# copiar la carpeta spark en c:\hlocal\tdm
import os

# cambiamos las variables del sistema
spark = "/opt/homebrew/bin/spark"
# en el path se añade
path = os.environ.get("PATH")
path = path + ";" + spark + "\\bin;"
os.environ["PATH"] = path
os.environ["SPARK_HOME"] = spark
os.environ["HADOOP_HOME"] = spark

os.environ["PYSPARK_DRIVER_PYTHON"] = "jupyter"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"

# si da problema con collect quizás haya que poner java_home
os.environ["JAVA_HOME"] = "/usr/bin/java"
os.environ["PATH"] = os.environ.get("JAVA_HOME") + "//bin;" + path


In [1]:
import findspark

findspark.init()

import pyspark  # only run after findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.sql("""select 'spark' as hola """)
df.show()


24/10/02 17:30:30 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 10.9.20.158 instead (on interface wlp0s20f3)
24/10/02 17:30:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/02 17:30:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+-----+
| hola|
+-----+
|spark|
+-----+



## 1: Acciones

Al importar pyspark ya tenemos una variable spark con la conexión con Spark creada. Para manejar RDDs nos interesará extraer de ella el llamado *sparkContext*:

In [2]:
sc = spark.sparkContext
r = sc.parallelize([1, 2, 3])  # Crea un RDD con los valores 1,2,3

Veamos el tipo de la variable r

In [3]:
type(r)

pyspark.rdd.RDD

In [4]:
cuadrados = sc.parallelize([i * i for i in range(100)])
l = cuadrados.take(5)
l[2:]

[4, 9, 16]

24/10/02 17:30:44 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


### Acciones  collect, take y count

Las *acciones* son las que lanzan un cómputo.
** collect ** recupera todo un RDD como si se tratara de un array

*Ojo*: Si hacemos esto nos traemos toda la colección al ordenador en el que estamos, ¡eso no es big data!

In [5]:
r = sc.parallelize([i * i for i in range(100)])
r.collect()  # Falla en algunas versiones de Java!!!!!!

[0,
 1,
 4,
 9,
 16,
 25,
 36,
 49,
 64,
 81,
 100,
 121,
 144,
 169,
 196,
 225,
 256,
 289,
 324,
 361,
 400,
 441,
 484,
 529,
 576,
 625,
 676,
 729,
 784,
 841,
 900,
 961,
 1024,
 1089,
 1156,
 1225,
 1296,
 1369,
 1444,
 1521,
 1600,
 1681,
 1764,
 1849,
 1936,
 2025,
 2116,
 2209,
 2304,
 2401,
 2500,
 2601,
 2704,
 2809,
 2916,
 3025,
 3136,
 3249,
 3364,
 3481,
 3600,
 3721,
 3844,
 3969,
 4096,
 4225,
 4356,
 4489,
 4624,
 4761,
 4900,
 5041,
 5184,
 5329,
 5476,
 5625,
 5776,
 5929,
 6084,
 6241,
 6400,
 6561,
 6724,
 6889,
 7056,
 7225,
 7396,
 7569,
 7744,
 7921,
 8100,
 8281,
 8464,
 8649,
 8836,
 9025,
 9216,
 9409,
 9604,
 9801]

Es mucho más normal recuperar solo unos cuantos elementos. De esto se encarga *take(n)* que devuelve los n primeros miembros del rdd

In [6]:
print(r.take(10))

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


Por último, *count* nos dice de cuántos elementos consta un RDD

In [9]:
texto = sc.textFile("quijote.txt")

In [10]:
print(texto.count())
print(texto.take(10))

1347
['El ingenioso hidalgo don Quijote de la Mancha', '', '', 'TASA', '', 'Yo, Juan Gallo de Andrada, escribano de Cámara del Rey nuestro señor, de', 'los que residen en su Consejo, certifico y doy fe que, habiendo visto por', 'los señores dél un libro intitulado El ingenioso hidalgo de la Mancha,', 'compuesto por Miguel de Cervantes Saavedra, tasaron cada pliego del dicho', 'libro a tres maravedís y medio; el cual tiene ochenta y tres pliegos, que']


### Reduce, fold

*reduce* es una de las operaciones claves en Spark. 
Reduce todos los elementos de un RDD a uno solo. 
Para ello toma una función que aplica:
    * Primero a los dos primeros elementos, dando un resultado r1
    * Luego a r1 y al tercer elemento, dando r2
    * ... así hasta rn-1 y el último elemento, que dan el valor final

In [11]:
def add(x, y):
    print(x, y)
    return x + y


r = sc.parallelize(range(1, 6))
print(r.reduce(add))

1 2
3 3
6 4
10 5
15


In [12]:
# Otra forma de lograr lo mismo: funciones lambda
print(r.reduce(lambda x, y: x + y))

15


fold() es similar a reduce, excepto porque toma un 'valor cero' inicial, al que podemos llamar z. 
Dado un valor z y una lista l, la función se aplica
    * Primero a z y al primer elemento de l, dando un resultado r1
    * Luego a r1 y al segundo elemento de l, dando r2
    * ... así hasta rn-1 y el último elemento, que dan el valor final

In [13]:
print(r.fold(0, lambda x, y: x + y))  # raro!!!

15


## 2: Transformaciones

### Map

Aplica una función a todos los miembros de un RDD. La salida es otro RDD con tantos elementos como tenía la entrada

In [14]:
def longitud(s):
    return len(s)


longitudes = texto.map(longitud)
print(longitudes.count())
longitudes.take(5)

1347


[45, 0, 0, 4, 0]

In [15]:
def divide(s):
    return s.split(" ")


##################################
texto.flatMap(divide).map(lambda s: s.upper()).map(lambda s: (s, len(s))).take(20)


[('EL', 2),
 ('INGENIOSO', 9),
 ('HIDALGO', 7),
 ('DON', 3),
 ('QUIJOTE', 7),
 ('DE', 2),
 ('LA', 2),
 ('MANCHA', 6),
 ('', 0),
 ('', 0),
 ('TASA', 4),
 ('', 0),
 ('YO,', 3),
 ('JUAN', 4),
 ('GALLO', 5),
 ('DE', 2),
 ('ANDRADA,', 8),
 ('ESCRIBANO', 9),
 ('DE', 2),
 ('CÁMARA', 6)]

### Filter

Aplica una función booleana a todos los miembros de un RDD. La salida es otro RDD que solo tiene los miembros para los que la función devuelve True

In [16]:
numeros = sc.parallelize(range(50))
numeros.filter(lambda n: (n % 2) == 0).collect()

[0,
 2,
 4,
 6,
 8,
 10,
 12,
 14,
 16,
 18,
 20,
 22,
 24,
 26,
 28,
 30,
 32,
 34,
 36,
 38,
 40,
 42,
 44,
 46,
 48]

### Ejemplo complejo

In [17]:
def maxpareja(s1, s2):
    if s1[1] > s2[1]:
        r = s1
    else:
        r = s2
    return r


pareja = (
    texto.flatMap(lambda line: line.split(" "))
    .map(lambda s: s.upper())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
    .reduce(maxpareja)
)

print(pareja)

('', 840)


In [22]:
personas = list((("John", 34), ("Ana", 19), ("José", 40)))
# Creación de un RDD a partir de la lista
personasRDD = sc.parallelize(personas)

# Transformación: Filtrar personas mayores de 30 años
treintaRDD = personasRDD.filter(lambda persona: persona[1] > 30)

# Acción: Recolectar el resultado y mostrarlo
treintaRDD.collect()


[('John', 34), ('José', 40)]

## Ejercicio

leer el quijote en un RDD y cuente el número de frases que contienen el texto 'merced'.
Nota: no importa que la palabra venga dentro de otra palabra, se contará igual


In [34]:
texto.filter(lambda line: "merced" in line).count()

15