## **Introducción a Spark usando Python** 


![](http://spark.apache.org/images/spark-logo.png)


### La API de Python



Spark está escrito en Scala, que se compila a bytecode de Java, pero puedes escribir código en Python para comunicarte con la máquina virtual de Java a través de una biblioteca llamada `py4j`. Python tiene la API más completa, pero puede ser algo limitante si necesitas utilizar un método que no está disponible o si necesitas escribir un código especializado. La latencia asociada con la comunicación entre Python y la JVM a veces puede hacer que el código se ejecute más lentamente.  

Una excepción a esto es la biblioteca SparkSQL, que tiene un motor de planificación de ejecución que precompila las consultas. Incluso con esta optimización, hay casos en los que el código puede ejecutarse más lentamente que la versión nativa en Scala.  
La recomendación general para el código en PySpark es utilizar los métodos integrados tanto como sea posible y evitar llamadas excesivamente frecuentes (iterativas) a los métodos de Spark. Si necesitas escribir código de alto rendimiento o especializado, intenta hacerlo en Scala.  

Pero bueno, sabemos que Python es increíble, y sus bibliotecas de visualización son mucho mejores. ¡Así que la decisión es tuya!. 


### Objetivos


En este cuaderno, revisaremos los conceptos básicos de Apache Spark y PySpark. Comenzaremos creando el `SparkContext` y el `SparkSession`. Luego, crearemos un `RDD` y aplicaremos algunas transformaciones y acciones básicas. Finalmente, demostraremos los conceptos básicos de `DataFrames` y `SparkSQL`.  

Después de terminar este cuaderno, serás capaz de:  

* Crear el `SparkContext` y el `SparkSession`  
* Crear un `RDD` y aplicar algunas transformaciones y acciones básicas a los `RDDs`  
* Demostrar el uso básico de `DataFrames` y `SparkSQL`  



----


### Configuración



Para este cuaderno, usaremos Python y Spark (PySpark). Estas bibliotecas deberían estar instaladas en tu entorno de trabajo

In [12]:
# Instalando paquetes requeridos
!pip install pyspark
!pip install findspark



In [13]:
import findspark
findspark.init()

In [14]:
# PySpark es la API de Spark para Python. En este cuaderno, usamos PySpark para inicializar el contexto de Spark. 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

### Ejercicio 1 - Spark Context y Spark Session  



En este ejercicio, crearás el contexto de Spark e inicializarás la sesión de Spark necesaria para `SparkSQL` y `DataFrames`.  
`SparkContext` es el punto de entrada para las aplicaciones de Spark y contiene funciones para crear `RDDs`, como `parallelize()`.  
`SparkSession` es necesario para `SparkSQL` y operaciones con `DataFrame`. 

#### Tarea 1: Creando la sesión y contexto de spark


In [18]:
# Creación de una clase de contexto de Spark
sc = SparkContext()

# Creación de una sesión de Spark
spark = SparkSession \
    .builder \
    .appName("Ejemplo básico de DataFrames con Python Spark") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

### Tarea 2: Inicializar la sesión de Spark  
Para trabajar con *DataFrames*, solo necesitamos verificar que la instancia de la sesión de Spark ha sido creada.


In [20]:
if 'spark' in locals() and isinstance(spark, SparkSession):
    print("SparkSession está activa y lista para usar.")
else:
    print("SparkSession no está activa. Por favor, crea una SparkSession.")

SparkSession está activa y lista para usar.


####  Ejercicio 2: RDDs  
En este ejercicio trabajaremos con *Resilient Distributed Datasets* (RDDs). Los RDDs son la abstracción de datos primitiva de Spark y utilizamos conceptos de programación funcional para crearlos y manipularlos.  


#### Tarea 1: Crear un RDD  
Con fines de demostración, creamos un RDD llamando a `sc.parallelize()`.  
Creamos un RDD que contiene enteros del 1 al 30.


In [23]:
data = range(1,30)
# primer elemento del iterador
print(data[0])
len(data)
xrangeRDD = sc.parallelize(data, 4)
xrangeRDD

1


PythonRDD[1] at RDD at PythonRDD.scala:53

#### Tarea 2: Transformaciones



Una transformación es una operación sobre un RDD que da como resultado un nuevo RDD. El RDD transformado se genera rápidamente porque se evalúa de manera *perezosa* (*lazy evaluation*), lo que significa que el cálculo no se realiza inmediatamente cuando se genera el nuevo RDD.  
El RDD contendrá una serie de transformaciones o instrucciones de cálculo, que solo se ejecutarán cuando se llame a una acción.  

En esta transformación, reducimos cada elemento del RDD en 1. Nota el uso de la función *lambda*. Luego filtramos el RDD para que solo contenga elementos menores a 10.



In [26]:
subRDD = xrangeRDD.map(lambda x: x-1)
filteredRDD = subRDD.filter(lambda x : x<10)


#### Tarea 3: Acciones 



Una transformación devuelve un resultado al *driver*. Ahora aplicamos la acción `collect()` para obtener la salida de la transformación.


In [29]:
print(filteredRDD.collect())
filteredRDD.count()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:789)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1049)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:774)
	... 32 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:789)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1049)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:774)
	... 32 more


####  Tarea 4: Almacenamiento en caché de datos 


Este simple ejemplo muestra cómo crear un RDD y almacenarlo en caché. ¡Observa la mejora de velocidad **10x**!  
Si deseas ver el tiempo de cómputo real, accede a la interfaz de Spark UI en `host:4040`. Notarás que el segundo cálculo tomó mucho menos tiempo.



In [None]:
import time 

test = sc.parallelize(range(1,50000),4)
test.cache()

t1 = time.time()
# el primer count activará la evaluación del count *y* almacenará en caché
count1 = test.count()
dt1 = time.time() - t1
print("dt1: ", dt1)


t2 = time.time()
# el segundo count opera solo sobre los datos en caché
count2 = test.count()
dt2 = time.time() - t2
print("dt2: ", dt2)

#test.count()

### Ejercicio 3: DataFrames y SparkSQL 


Para trabajar con el potente motor SQL de Apache Spark, necesitas una *Spark Session*.  
Ya la creamos en el primer ejercicio, así que verifiquemos que la sesión sigue activa.


In [None]:
spark

#### Tarea 1: Crea tu primero Dataframe!



Puedes crear un conjunto de datos estructurado (similar a una tabla de base de datos) en Spark.  
Una vez hecho esto, puedes utilizar herramientas SQL avanzadas para consultar y unir tus *DataFrames*.


In [None]:
# Descarga los datos primero en un archivo local `people.json`
!curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/people.json >> people.json

In [None]:
# Lee el conjunto de datos en un dataframe de Spark usando la función `read.json()`
df = spark.read.json("people.json").cache()

In [None]:
# Imprime el DataFrame y su esquema de datos
df.show()
df.printSchema()

In [None]:
# Registra el DataFrame como una vista SQL temporal
df.createTempView("people")

#### Tarea 2: Explorar los datos usando funciones de DataFrame y SparkSQL  
En esta sección, exploramos los conjuntos de datos utilizando funciones tanto de *DataFrames* como de consultas SQL correspondientes con *SparkSQL*.  
Nota las diferentes formas de lograr la misma tarea.


In [None]:
# Seleccionar y mostrar columnas básicas de datos

df.select("name").show()
df.select(df["name"]).show()
spark.sql("SELECT name FROM people").show()

In [None]:
# Realizar filtrado básico

df.filter(df["age"] > 21).show()
spark.sql("SELECT age, name FROM people WHERE age > 21").show()

In [None]:
# Realizar agregación básica de datos

df.groupBy("age").count().show()
spark.sql("SELECT age, COUNT(age) as count FROM people GROUP BY age").show()

----


### Pregunta 1 - RDDs


Crea un RDD con enteros del 1 al 50.  
Aplica una transformación para multiplicar cada número por 2, obteniendo un RDD que contenga los primeros 50 números pares.  



In [None]:
# Codigo inicial
# numbers = range(1, 50)
# numbers_RDD = ...
# even_numbers_RDD = numbers_RDD.map(lambda x: ..)

In [None]:
# Codigo 

Doble-click **aquí** para la solución.

<!-- La respuesta esta abajo:
numbers = range(1, 50) 
numbers_RDD = sc.parallelize(numbers) 
even_numbers_RDD = numbers_RDD.map(lambda x: x * 2)
print( even_numbers_RDD.collect()) 
-->


#### Pregunta 2 - DataFrames y SparkSQL


Similar al archivo `people.json`, ahora lee el archivo `people2.json` en el cuaderno, cárgalo en un *DataFrame* y aplica operaciones SQL para determinar la edad promedio en nuestro archivo `people2`. 


In [None]:
# codigo inicial
# !curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/people2.json >> people2.json
# df = spark.read...
# df.createTempView..
# spark.sql("SELECT ...")

In [None]:
# Codigo

Haz doble clic **aquí** para ver una pista.

<!-- La pista está abajo:

1. La consulta SQL "SELECT AVG(nombre_columna) FROM..." se puede utilizar para encontrar el valor promedio de una columna.  
2. Otra posible forma es utilizar las operaciones de *DataFrame* `select()` y `mean()`.
-->


Haz doble clic **aquí** para ver la solución.

<!-- La respuesta está abajo:
!curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/people2.json >> people2.json
df = spark.read.json("people2.json").cache()
df.createTempView("people2")
result = spark.sql("SELECT AVG(age) from people2")
result.show()
-->


### Pregunta 3 - SparkSession


Cierra la *SparkSession* que creamos para este cuaderno.  


In [None]:
# Codigo

Haz doble clic **aquí** para ver la solución.

<!-- La respuesta está abajo:

spark.stop() detendrá la sesión de Spark.

-->  
