### **PySpark**

**Por: José María Manzano Ortega**.

## **ÍNDICE**


En este *notebook*:
1. Se aprende a trabajar con Apache Spark mediante Python.
2. Se ven las opciones que tenemos para trabajar con los datos en Spark.
3. Se cargarán los datos desde un fichero y se trabajará con estos conjuntos de datos de Big Data.


Contenidos:
1. Spark con Python (PySpark): Instalación y primer programa.    
2. Sesión en PySpark.
3. Lectura de datos de un fichero.
4. Trabajar con DataFrames.  
5. Aplicar consultas SQL sobre DataFrames.

##**1. SPARK CON PYTHON (PYSPARK): Instalación y primer programa.**

###**1.1. Instalar Java, Apache Spark, PySpark y Findspark.**

Los programas de python que vamos a ver necesitarán tener instalado **Java** y **Apache Spark**. Java es necesario para poder ejecutar Spark.
Con el siguiente código, le indicamos a Google Colaboratory que instale Apache Spark (Java ya está instalado).

In [1]:
#Nos descargamos Apache Spark con Hadoop
!wget -q --show-progress https://downloads.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz

#Instalamos Apache Spark
!tar xf spark-3.5.3-bin-hadoop3.tgz



Python ya está instalado. Ahora instalamos [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html) que, como hemos dicho, nos permite usar Spark desde Python.
También vamos a tener que instalar [`Findspark`](https://github.com/minrk/findspark), que es
 una biblioteca de Python que nos ayuda a usar Spark en un cuaderno (*notebook*).

In [2]:
!pip install pyspark   #Instalamos Spark para Python
!pip install findspark #Instalamos FindSpark



Una vez que están todos los programas y bibliotecas están instaladas, lo siguiente que debemos hacer es darle valores a las variables de entorno, para indicar dónde están instalados Java y Spark. Estas variables de entorno, las podemos modificar cuando instalemos Spark y Java y, la forma de darles valores, depende de nuestro sistema operativo (Windows, Mac, Linux). No obstante, en estos ejemplos, nosotros lo haremos desde Python.

También hay que tener en cuenta que hay que especificar la ruta completa de dónde están instalados ambos programas.

In [3]:
#Declaramos las variables de entorno
import os
#os.environ["JAVA_HOME"] = "/usr/lib/jvm/default-java"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"

Ya sólo queda decirle a `findspark` la carpeta en la que está Spark, para enlazarlo con Python.

In [4]:
#Indicamos donde está Spark
import findspark
findspark.init("spark-3.5.3-bin-hadoop3")#SPARK_HOME

###**1.2. Primer programa en Spark**


Una vez que en nuestro entorno de trabajo tenemos todo instalado y configurado, ya podemos empezar a trabajar con Spark desde Python. En el inicio de todo programa en PySpark, tendremos que crear una sesión.

In [5]:
#Creamos una sesión de Spark para poder trabajar
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[2]").getOrCreate()

Ya está todo preparado para probar Spark. Para ello, vamos a crear una variable binaria de forma aleatoria con un DataFrame de Pandas.

Lo convertiremos a un DataFrame de Spark para comprobar que todo funciona y lo mostraremos por pantalla.

In [6]:
import random
import pandas as pd

#Creamos una variable aleatoria de valores binarios
varBinaria=[{"Binaria": random.choice("01")} for x in range(100)]

#Convertimos la variable en un DataFrame de Pandas
df = pd.DataFrame(varBinaria)

#Convertimos el DataFrame de Pandas a Spark, y lo mostramos
ddf = spark.createDataFrame(df)
ddf.show(10)

+-------+
|Binaria|
+-------+
|      1|
|      1|
|      0|
|      1|
|      0|
|      1|
|      0|
|      1|
|      1|
|      0|
+-------+
only showing top 10 rows



Los DataFrames de Sparks son muy similares a los DataFrame de Pandas. Aunque hay una diferencia notable, y es que los DataFrame de Spark pueden estar repartidos por varios ordenadores y los de Pandas sólo pueden estar en un solo ordenador. No obstante, se puede pasar de uno al otro sin mucho problema e incluso de forma eficiente con [Apache Arrow](https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html), siempre que los datos quepan en la memoria de sólo un ordenador.



Veamos cómo:

In [7]:
# Como antes, guardamos la variableBinaria en un DataFrame de Pandas
pdf = pd.DataFrame(varBinaria)

# Convertimos el DataFrame de Pandas a Spark
sdf = spark.createDataFrame(pdf)

# Convertimos el DataFrame de Spark DataFrame a uno de Pandas y lo mostramos
psdf = sdf.select("*").toPandas()
psdf.head()


Unnamed: 0,Binaria
0,1
1,1
2,0
3,1
4,0


##**2. SESIÓN EN PYSPARK.**


La puerta de entrada a toda la funcionalidad de Spark es la clase `SparkSession`. Para crear una sesión en Spark, usamos `SparkSession.builder`. En informática, una sesión es un intercambio de información interactiva semipermanente entre dos o más entidades, en nuestro caso entre Spark y Python.  Los atributos más comunes usados con [`SparkSession.builder`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession.Builder) son:

*   `appName(nombre)` − Para darle un nombre a la aplicación.

*   `config(clave, valor)` − Para darle un valor a una propiedad de la configuración.

*   `master(valor)` − Para indicar la dirección en Internet (URL) del nodo maestro del clúster de ordenadores. Para ejecutarlo en nuestro ordenador personal se hace con `local` para ejecutarlo de forma local con una sola hebra de ejecución. Con `local[n]` se ejecuta localmente con `n` hebras. Si usamos `local[*]` ejecuta Spark en nuestro ordenador personal con tantas hebras como permita nuestra máquina.

*   `getOrCreate()` - Este método obtiene una sesión de Spark existente o, si no hay ninguna, la crea.

In [8]:
#Creamos una sesión de Spark para poder trabajar
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Ejemplo básico PySpark") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()


##**3. LECTURA DE DATOS DE UN FICHERO.**

Vamos a aprender cómo cargar un fichero en un DataFrame de Spark. Para ello vamos a trabajar con una versión del problema **SUSY**.

El conjunto de datos **SUSY**, compuesto por 5,000,000 de instancias y 18 atributos. El objetivo es distinguir entre una señal que produce partículas supersimétricas (SUSY), valor de la variable clase igual a 1, y una que no, valor de la clase igual a cero.

No obstante, para aprender, vamos a usar sólo 10,0000 instancias para entrenamiento y otro tanto para test.

Antes que nada, vamos a cargar los datos en Google Colab.




In [9]:
#Nos descargamos los ficheros de datos en Google Colab
!wget -nv --no-check-certificate 'https://docs.google.com/uc?export=download&id=1HOrM49tCLA_NqHyD_ps_cv483FPN7aWo' -O susy-10k-tra.csv
!wget -nv --no-check-certificate 'https://docs.google.com/uc?export=download&id=1HT80d5cwU7HMi2XK8CNgxgHvxRZEZB_d' -O susy-10k-tst.csv
#Vamos a mostrar la cabecera de los ficheros y dos filas, para comprobar
#que se han bajado correctamente.
!head -3 susy-10k-tra.csv
!head -3 susy-10k-tst.csv

2024-09-26 14:45:16 URL:https://drive.usercontent.google.com/download?id=1HOrM49tCLA_NqHyD_ps_cv483FPN7aWo&export=download [3463157/3463157] -> "susy-10k-tra.csv" [1]
2024-09-26 14:45:19 URL:https://drive.usercontent.google.com/download?id=1HT80d5cwU7HMi2XK8CNgxgHvxRZEZB_d&export=download [3469204/3469204] -> "susy-10k-tst.csv" [1]
uno,dos,tres,cuatro,cinco,seis,siete,ocho,nueve,diez,once,doce,trece,catorce,quince,dieciseis,diecisiete,dieciocho,clase
0.6433419585227966,1.3615427017211914,0.6828370690345764,0.48453789949417114,-0.32587578892707825,-0.8524075746536255,1.145269513130188,0.8196632862091064,0.40248867869377136,1.325475811958313,0.7232717275619507,0.6548330783843994,0.8034266233444214,0.37070924043655396,0.7308408617973328,0.5756281018257141,1.4113106727600098,0.19226300716400146,1.0
1.2374616861343384,-0.4839438199996948,1.3800559043884277,1.6685774326324463,0.4818389117717743,-0.6302737593650818,0.6783822178840637,-0.43130290508270264,0.34755226969718933,-0.014657160267233

Una vez que lo tenemos en disco vamos a usar el método `read` de la sesión (variable `spark`) para leer ambos ficheros.

Si nos fijamos, para leer el primer fichero, se usan métodos con distintas opciones, por ejemplo, con `load()` indicamos el nombre del fichero. Con `format()` se indica que es un fichero CSV. Con `option()` le indicamos que obtenga el esquema de los datos (`inferSchema`) y que el fichero tiene una primera fila de cabecera con el nombre  las columnas (`header`). Se pueden indicar muchas más opciones, como la codificación, los separadores, comentarios, etc.

También podemos abrir distintos tipos de formatos de ficheros (`json, parquet, orc, libsvm, csv, text`) en local, o desde una base de datos (usando `JDBC`) o usando HDFS.

En la lectura del segundo fichero, hemos usado directamente el método `csv()` para leerlo. Al final de leerlos mostramos las 3 primeras líneas con el método `show()`.




In [10]:
#Leemos el conjunto de entrenamiento
dfTra = spark.read \
    .format("csv") \
    .option("inferSchema",True) \
    .option("header", True) \
    .load("susy-10k-tra.csv")
dfTra.show(3)

#Leemos el conjunto de test
dfTst = spark.read.csv('susy-10k-tst.csv',
                     inferSchema=True,
                     header=True)
dfTst.show(3)

+------------------+-------------------+-------------------+-------------------+--------------------+-------------------+------------------+--------------------+-------------------+--------------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-------------------+-----+
|               uno|                dos|               tres|             cuatro|               cinco|               seis|             siete|                ocho|              nueve|                diez|              once|              doce|             trece|            catorce|            quince|         dieciseis|        diecisiete|          dieciocho|clase|
+------------------+-------------------+-------------------+-------------------+--------------------+-------------------+------------------+--------------------+-------------------+--------------------+------------------+------------------+------------------+---------------

##**4. TRABAJAR CON DATAFRAMES.**

Ya que tenemos nuestros datos en DataFrames de Spark, vamos a aprender a trabajar con ellos.


###**4.1. Navegando por los DataFrames.**

Para ver el número de filas que tenemos, podemos usar el método `.count()` y para ver el número de columnas (variables) podemos usar la propiedad `.columns` y ver su tamaño. Para ver nombre y el tipo de dato de cada columna, esto es, el esquema, podemos usar el método `.printSchema()`.

Para mostrar el DataFrame podemos usar el método `.show()` como hemos hecho anteriormente. También lo podemos usar en combinación del método `.describe()` que nos calcula un resumen estadístico del DataFrame.



In [11]:
#Mostramos el tamaño del fichero de entrenamiento
print ("TRAIN: ", len(dfTra.columns), "variables y" , dfTra.count(), "instancias.")
#Mostramos el esquema y un resumen estadístico del fichero de entrenamiento
print ("Esquema de dfTra:")
dfTra.printSchema()
print ("Resumen estadístico dfTra:")
dfTra.describe().show()

#Mostramos el tamaño del fichero de test
print ("\nTEST: ", len(dfTst.columns), "variables y" , dfTst.count(), "instancias.")
#Mostramos el esquema y un resumen estadístico del fichero de test
print ("Esquema de dfTst:")
dfTst.printSchema()
print ("Resumen estadístico dfTst:")
dfTst.describe().show()

TRAIN:  19 variables y 10000 instancias.
Esquema de dfTra:
root
 |-- uno: double (nullable = true)
 |-- dos: double (nullable = true)
 |-- tres: double (nullable = true)
 |-- cuatro: double (nullable = true)
 |-- cinco: double (nullable = true)
 |-- seis: double (nullable = true)
 |-- siete: double (nullable = true)
 |-- ocho: double (nullable = true)
 |-- nueve: double (nullable = true)
 |-- diez: double (nullable = true)
 |-- once: double (nullable = true)
 |-- doce: double (nullable = true)
 |-- trece: double (nullable = true)
 |-- catorce: double (nullable = true)
 |-- quince: double (nullable = true)
 |-- dieciseis: double (nullable = true)
 |-- diecisiete: double (nullable = true)
 |-- dieciocho: double (nullable = true)
 |-- clase: double (nullable = true)

Resumen estadístico dfTra:
+-------+-------------------+--------------------+-------------------+-------------------+--------------------+--------------------+--------------------+-------------------+--------------------+----


Para trabajar con los Dataframes podemos quedarnos con una o varias columnas usando el método `select()`. Para identificar a las columnas podemos usar una cadena con el nombre de la columna, o usando el nombre de la columna como una propiedad del DataFrame, usando el DataFrame como un diccionario e indicando como clave el nombre de la columna, o bien, usando el DataFrame e indicando el número de columna, teniendo en cuenta que empezamos a contar desde cero.

In [12]:
#Veamos distintas formas de quedarnos con las variables de entrada segunda,
#cuarta y sexta.
dfTra.select('dos','cuatro','seis').show(5)
dfTra.select(dfTra.dos,dfTra.cuatro,dfTra.seis).show(5)
dfTra.select(dfTra['dos'],dfTra['cuatro'],dfTra['seis']).show(5)
dfTra.select(dfTra[1],dfTra[3],dfTra[5]).show(5)


+-------------------+-------------------+-------------------+
|                dos|             cuatro|               seis|
+-------------------+-------------------+-------------------+
| 1.3615427017211914|0.48453789949417114|-0.8524075746536255|
|-0.4839438199996948| 1.6685774326324463|-0.6302737593650818|
|  0.783230721950531|  1.475182294845581|0.43646135926246643|
| 0.9232335686683655|  0.519090473651886|-1.6212221384048462|
|-0.4539290964603424|  2.352872848510742|0.03150284290313721|
+-------------------+-------------------+-------------------+
only showing top 5 rows

+-------------------+-------------------+-------------------+
|                dos|             cuatro|               seis|
+-------------------+-------------------+-------------------+
| 1.3615427017211914|0.48453789949417114|-0.8524075746536255|
|-0.4839438199996948| 1.6685774326324463|-0.6302737593650818|
|  0.783230721950531|  1.475182294845581|0.43646135926246643|
| 0.9232335686683655|  0.519090473651886|-1.6


También podemos filtrar los datos que queremos mostrar con el método `filter()`, agrupar valores comunes con el método `groupby()`, u ordenar los datos con el método `sort()`.

In [13]:
#Nos quedamos con aquellas filas donde el valor del atributo "dos" es positivo.
dfTra.filter(dfTra.dos>0).show(5)

#Agrupamos los distintos valores de la variable de salida.
dfTra.groupby(dfTra['clase']).count().show()

#Ordenamos los valores del atributo "dos".
dfTra.select(dfTra[0], 'dos').sort('dos').show(5)


+------------------+-------------------+--------------------+-------------------+--------------------+--------------------+-------------------+-------------------+-------------------+--------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-------------------+-------------------+-----+
|               uno|                dos|                tres|             cuatro|               cinco|                seis|              siete|               ocho|              nueve|                diez|              once|              doce|             trece|            catorce|            quince|         dieciseis|         diecisiete|          dieciocho|clase|
+------------------+-------------------+--------------------+-------------------+--------------------+--------------------+-------------------+-------------------+-------------------+--------------------+------------------+------------------+------------------+-------

###**4.2 Modificando los DataFrames.**

Con el método `withColumn()` se puede cambiar el tipo de una columna, cambiar sus valores  y también puede utilizarse para crear una nueva columna. Para cambiar el nombre de una columna lo haremos con el método  `withColumnRenamed()`.

Para eliminar una o varias columnas podemos usar el método `drop()` y con `dropDuplicates()` podemos eliminar aquellas filas que sean iguales, dejando sólo una de ellas.

Con el método `join()` podremos unir tablas.

In [14]:
#Vamos cambiar la variable clase de dfTra de Real a Entero
dfTra.withColumn("clase",dfTra.clase.cast("Integer")).show(5)

#Vamos a multiplicar por dos los valores de la primera columna de dfTra
dfTra.withColumn("uno",dfTra.uno*2).show(5)

#Vamos a añadir una columna con los valores de la primera columa de dfTra por 3
dfTra.withColumn("porTres",dfTra.tres*3).show(5)

#Vamos a cambiar el nombre de la variable clase label
dfTra.withColumnRenamed("clase","label").show(5)

#Voy a eliminar la primera columna de dfTra
dfTra.drop('uno').show(5)

#Voy a eliminar la primera columna de dfTra y luego eliminar filas repetidas
dfTra.drop('uno').dropDuplicates().show(5)

#Nota: para que el cambio fuera permanente tendría que guardarse (son inmutables)
#dfTra=dfTra.drop('uno').dropDuplicates()
#dfTra.show(5)


+------------------+-------------------+--------------------+-------------------+--------------------+-------------------+-------------------+--------------------+-------------------+--------------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-------------------+-----+
|               uno|                dos|                tres|             cuatro|               cinco|               seis|              siete|                ocho|              nueve|                diez|              once|              doce|             trece|            catorce|            quince|         dieciseis|        diecisiete|          dieciocho|clase|
+------------------+-------------------+--------------------+-------------------+--------------------+-------------------+-------------------+--------------------+-------------------+--------------------+------------------+------------------+------------------+---------

Veamos ahora un ejemplo de un `join` o unión cartesiana de DataFrames.

In [15]:
#Voy a añadir una nueva columna con el número de instancia
from pyspark.sql import functions
dftmp=dfTra.withColumn("id", functions.monotonically_increasing_id())
dftmp.show(5)

#En df1 me voy a quedar con todas las columnas menos la clase
lista=dftmp.columns[:-2]
lista.insert(0,"id")
df1=dftmp.select(lista)
df1.show(5)

#En df1 me voy a quedar con el id y la clase
df2=dftmp.select("id","clase")
df2.show(5)

#Voy a añadir la clase a la tabla de atributos usando el atributo "id"
df1.join(df2, df1.id == df2.id).show(5)

+------------------+-------------------+--------------------+-------------------+--------------------+-------------------+-------------------+--------------------+-------------------+--------------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-------------------+-----+---+
|               uno|                dos|                tres|             cuatro|               cinco|               seis|              siete|                ocho|              nueve|                diez|              once|              doce|             trece|            catorce|            quince|         dieciseis|        diecisiete|          dieciocho|clase| id|
+------------------+-------------------+--------------------+-------------------+--------------------+-------------------+-------------------+--------------------+-------------------+--------------------+------------------+------------------+------------------+-

##**5. APLICAR CONSULTAS SQL SOBRE DATAFRAMES.**

De forma análoga a como hemos trabajado con los DataFrames en el apartado anterior podemos hacerlo usando consultas SQL. Si el alumno tiene experiencia con SQL, usar SQL puede ser la mejor opción.

Para poder trabajar con los DataFrames como tablas tendremos que definir vistas con el método `createOrReplaceTempView()`. Una vez que hayamos definido una vista y usando el método `sql()` de la sesión, podremos trabajar cómodamente usando SQL.

In [16]:
#Creamos vistas SQL para los DataFrame
dfTra.createOrReplaceTempView("tablaTrain")
dfTst.createOrReplaceTempView("tablaTest")

#Mostramos las tablas que hay
spark.sql("SHOW TABLES").show()

#Equivalente a dfTra.select('dos','cuatro','seis').show(5)
sqlDF = spark.sql("SELECT dos,cuatro,seis FROM tablaTrain")
sqlDF.show(5)

#Equivalente a dfTra.groupby(dfTra['clase']).count().show(5)
spark.sql("SELECT clase, count(clase) FROM tablaTrain GROUP BY clase").show(5)

#Equivalente dfTra.select('uno', 'dos').sort('dos').show(5)
spark.sql("SELECT uno,dos FROM tablaTrain ORDER BY dos").show(5)

#Ejemplo join ci
df1.createOrReplaceTempView("tabla1")
df2.createOrReplaceTempView("tabla2")
#Equivalente a df1.join(df2, df1.id == df2.id).show(5)
spark.sql("SELECT * FROM tabla1,tabla2 WHERE tabla1.id == tabla2.id").show(5)

+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
|         | tablatest|       true|
|         |tablatrain|       true|
+---------+----------+-----------+

+-------------------+-------------------+-------------------+
|                dos|             cuatro|               seis|
+-------------------+-------------------+-------------------+
| 1.3615427017211914|0.48453789949417114|-0.8524075746536255|
|-0.4839438199996948| 1.6685774326324463|-0.6302737593650818|
|  0.783230721950531|  1.475182294845581|0.43646135926246643|
| 0.9232335686683655|  0.519090473651886|-1.6212221384048462|
|-0.4539290964603424|  2.352872848510742|0.03150284290313721|
+-------------------+-------------------+-------------------+
only showing top 5 rows

+-----+------------+
|clase|count(clase)|
+-----+------------+
|  0.0|        5093|
|  1.0|        4907|
+-----+------------+

+-------------------+-------------------+
|                uno|                