# SparkSQL

In [1]:
from pyspark.sql import SparkSession

# Build the Spark session, or reuse the one already alive in this kernel.
# The SparkSession wraps a SparkContext, so there is no need to create one by hand.
spark = (
    SparkSession.builder
    .appName("MiApp")
    .getOrCreate()
)

24/05/12 08:51:16 WARN Utils: Your hostname, miguel-9051-900-0098 resolves to a loopback address: 127.0.1.1; using 192.168.1.58 instead (on interface enp6s0)
24/05/12 08:51:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/12 08:51:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Creación de DataFrames

In [2]:
# Build a DataFrame from a list of tuples plus a list of column names.
l = [('a',3.14), ('b', 9.4), ('a',2.7)]
headers = ['id','value']
df = spark.createDataFrame(l, headers)
n = 3 # number of rows to display (reused by later cells)
df.show(n) # <-- shows the first n rows of the DataFrame (20 by default)

df.printSchema() # <-- Spark infers a schema from the data; printSchema displays it

                                                                                

+---+-----+
| id|value|
+---+-----+
|  a| 3.14|
|  b|  9.4|
|  a|  2.7|
+---+-----+

root
 |-- id: string (nullable = true)
 |-- value: double (nullable = true)



In [3]:
# A column mixing a float and a bool: schema inference cannot merge both types.
l = [('a',3.14), ('b',True)]
headers = ['id','value']
## df = spark.createDataFrame(l, headers)
## This call would raise an exception:
## TypeError: field value: Can not merge type <class 'pyspark.sql.types.DoubleType'> and <class 'pyspark.sql.types.BooleanType'>

Si no le pasamos los nombres de las columnas (la cabecera), Spark las nombrará automáticamente de forma consecutiva: `_1`, `_2`, `_3`…

In [4]:
# Without column names Spark names the columns _1, _2, ...
# NOTE: `n` is defined in an earlier cell (In [2]); run that cell first on a fresh kernel.
l = [('a',1), ('b', 2), ('a',3)]
df = spark.createDataFrame(l)
df.show(n)

df.printSchema() # <-- Spark still infers the schema (Python int becomes long)

+---+---+
| _1| _2|
+---+---+
|  a|  1|
|  b|  2|
|  a|  3|
+---+---+

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)



In [5]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType

l = [('a',3.14), ('b', 9.4), (None, 5.7)]
# To assign an explicit schema to a DataFrame we build a StructType object
# containing one StructField per column of the DataFrame.
schema = StructType([
  #           StructField takes 3 parameters:
  #           column name, data type, nullable (whether the column accepts null values)
  StructField('id',   StringType(), True),
  StructField('value', FloatType(), False)
])
df = spark.createDataFrame(l, schema)
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- value: float (nullable = false)



In [6]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType

l = [('a',3.14), ('b', 9.4), ('a', True)]
schema = StructType([
  StructField('id',   StringType(), True),
  StructField('value', FloatType(), False)
])
## df = spark.createDataFrame(l, schema) ## Raises an exception
## TypeError: field value: FloatType can not accept object True in type <class 'bool'>

# Fails because the 'value' column is declared FloatType and only accepts float values

SparkContext: a partir de la sesión podemos obtener el `SparkContext` y crear DataFrames desde RDDs.

In [7]:
# SparkContext underlying the session; used below to build RDDs.
sc = spark.sparkContext

In [8]:
# DataFrame built from an RDD of tuples plus the column names.
r = sc.parallelize([('a',3.14), ('b', 9.4), ('a', 2.7)])
df = spark.createDataFrame(r, ['id','value'])
df.show()
df.printSchema()

+---+-----+
| id|value|
+---+-----+
|  a| 3.14|
|  b|  9.4|
|  a|  2.7|
+---+-----+

root
 |-- id: string (nullable = true)
 |-- value: double (nullable = true)



## Creación de un DataFrame a partir de un RDD

In [9]:
# Create a DataFrame from an RDD
import csv

raw = (
  sc.textFile("../../data/Cap7/titanic.csv") # read the text file line by line
    .map(lambda s: list(csv.reader([s]))[0]) # parse each CSV line into a list of fields
    .filter(lambda l: l[0] != 'PassengerId') # drop the header row; only data rows remain
) # <---- this is the RDD
# NOTE(review): parsing line by line assumes no quoted field spans several lines.

df = spark.createDataFrame(raw)  # no names given: columns _1.._12, all strings
df.show(3)
df.printSchema()
headers = ['PassengerId','Survived','Pclass','Name','Sex','Age','SibSp','Parch','Ticket','Fare','Cabin','Embarked']
df = spark.createDataFrame(raw, headers)  # same data, now with explicit column names
df.show(3)
df.printSchema()

+---+---+---+--------------------+------+---+---+---+----------------+-------+---+---+
| _1| _2| _3|                  _4|    _5| _6| _7| _8|              _9|    _10|_11|_12|
+---+---+---+--------------------+------+---+---+---+----------------+-------+---+---+
|  1|  0|  3|Braund, Mr. Owen ...|  male| 22|  1|  0|       A/5 21171|   7.25|   |  S|
|  2|  1|  1|Cumings, Mrs. Joh...|female| 38|  1|  0|        PC 17599|71.2833|C85|  C|
|  3|  1|  3|Heikkinen, Miss. ...|female| 26|  0|  0|STON/O2. 3101282|  7.925|   |  S|
+---+---+---+--------------------+------+---+---+---+----------------+-------+---+---+
only showing top 3 rows

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)
 |-- _4: string (nullable = true)
 |-- _5: string (nullable = true)
 |-- _6: string (nullable = true)
 |-- _7: string (nullable = true)
 |-- _8: string (nullable = true)
 |-- _9: string (nullable = true)
 |-- _10: string (nullable = true)
 |-- _11: string (nu

## Lectura y escritura de DataFrames desde ficheros

In [10]:
# Read a CSV using its first line as header and letting Spark infer column types.
df = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



En el caso de JSON no le pasamos parámetros, pero `spark.read.json` también puede recibirlos (por ejemplo, un esquema), igual que `spark.read.csv`.
Si no le pasamos ninguno no hay problema, ya que Spark infiere el esquema a partir de los datos.

In [11]:
df = spark.read.json('../../data/Cap7/tweets.json') # no options passed: Spark infers the schema (nested objects become structs)
df.printSchema()
df.show()

root
 |-- RT_count: long (nullable = true)
 |-- text: string (nullable = true)
 |-- user: struct (nullable = true)
 |    |-- followers_count: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- verified: boolean (nullable = true)

+--------+--------------+----------------+
|RT_count|          text|            user|
+--------+--------------+----------------+
|       2|   #Tengosueño| {3, Pepe, true}|
|      45|  #VivaElLunes|{15, Ana, false}|
|     100|¡Gol de Señor!|  {2, Eva, true}|
+--------+--------------+----------------+



In [12]:
## Reads every matching file; each line becomes one row of the DataFrame.
## The DataFrame has a single string column named 'value'.
df = spark.read.text('../../data/Cap7/*.txt')
df.printSchema()
df.show()

root
 |-- value: string (nullable = true)

+--------------------+
|               value|
+--------------------+
|En el anterior ap...|
|y que se crean me...|
|antes de poder co...|
|un RDD (o varios)...|
|Estos RDDs se cre...|
|o a partir de fic...|
|creará un RDD y s...|
|diferentes proces...|
|En el anterior ap...|
|y que se crean me...|
|antes de poder co...|
|un RDD (o varios)...|
|Estos RDDs se cre...|
|o a partir de fic...|
|creará un RDD y s...|
|diferentes proces...|
+--------------------+



In [13]:
## Reads every matching file; with wholetext=True each whole file becomes one row.
## The DataFrame has a single string column named 'value'.
df = spark.read.text('../../data/Cap7/*.txt', wholetext=True)
df.printSchema()
df.show()

root
 |-- value: string (nullable = true)

+--------------------+
|               value|
+--------------------+
|En el anterior ap...|
|En el anterior ap...|
+--------------------+



## Almacenamiento de DataFrames

In [14]:
# Write a 50-row DataFrame into the folder /tmp/csv
l = [('a',3.14)] * 50
df = spark.createDataFrame(l, ['id','value'])
df.printSchema()
df.write.csv('/tmp/csv', header=True, mode='overwrite') # absolute path under the filesystem root; Spark writes a folder of part files

root
 |-- id: string (nullable = true)
 |-- value: double (nullable = true)



Si queremos cambiar el separador podemos usar el parámetro `sep`, y para incluir la cabecera en cada fichero utilizaríamos `header=True`.

Otro parámetro interesante es el modo de escritura `mode`, que nos permite configurar qué hacer si la carpeta ya existe. Acepta 4 valores: `append` añade los nuevos datos a la carpeta (como ficheros adicionales), `overwrite` sobrescribe completamente su contenido, `ignore` no escribe nada si la carpeta ya existe, y `error` (o `errorifexists`, el valor por defecto) lanza una excepción si la carpeta ya existía.

El método `spark.read` permite pasar como parámetro una carpeta, por lo que no es un problema que Spark escriba varios ficheros a la vez (uno por partición): al leer la carpeta se cargan todos ellos.

### CSV

In [15]:
# Read the previous DataFrame back from the folder /tmp/csv
df = spark.read.csv('/tmp/csv', header=True, inferSchema=True) # reads every file in that folder
df.printSchema()
df.show(4)
df.count()  # last expression: the notebook displays the row count

root
 |-- id: string (nullable = true)
 |-- value: double (nullable = true)

+---+-----+
| id|value|
+---+-----+
|  a| 3.14|
|  a| 3.14|
|  a| 3.14|
|  a| 3.14|
+---+-----+
only showing top 4 rows



50

### JSON

In [16]:
# Write a 50-row DataFrame as JSON into the folder /tmp/json
l = [('a',3.14)] * 50
df = spark.createDataFrame(l, ['id','value'])
df.printSchema()
print(df.rdd.getNumPartitions())  # number of partitions; presumably one part file per non-empty partition is written
df.write.json('/tmp/json', mode='overwrite')

root
 |-- id: string (nullable = true)
 |-- value: double (nullable = true)

12


In [17]:
# Read the previous DataFrame back from the folder /tmp/json
df = spark.read.json('/tmp/json') # no header/inferSchema options needed: each JSON record carries its field names
df.printSchema()
df.show(4)
df.count()  # last expression: the notebook displays the row count

root
 |-- id: string (nullable = true)
 |-- value: double (nullable = true)

+---+-----+
| id|value|
+---+-----+
|  a| 3.14|
|  a| 3.14|
|  a| 3.14|
|  a| 3.14|
+---+-----+
only showing top 4 rows



50

Crear un DataFrame de pandas a partir de un DataFrame de Spark

In [18]:
# NOTE: toPandas() requires pandas to be installed in the driver's Python environment.
# Dump a DataFrame to pandas (all rows are collected to the driver)
l = [('a',3.14)] * 12
df = spark.createDataFrame(l, ['id','value'])
df_pandas = df.toPandas()

## DataFrames y MongoDB

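Para poder cargar y salvar DataFrames en MongoDB es necesario lanzar ```pyspark``` configurando el conector de Mongo-Spark. Para ello hay que pasar a ```pyspark``` el parámetro `--packages` que se muestra más abajo (si la sesión se crea desde Python, el equivalente es la opción de configuración `spark.jars.packages`, fijada antes de crear la sesión). Antes, MongoDB debe estar instalado y en ejecución: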

Iniciar MongoDB:
```shell
sudo systemctl start mongod
```

Verificar que está activo:

```shell
sudo systemctl status mongod
```

Empezar a usarlo:

```shell
mongosh
```

Cuando termines, puedes detenerlo con:

```shell
sudo systemctl stop mongod
```


Documentación de instalación de MongoDB: [aquí](https://www.mongodb.com/docs/manual/tutorial/install-mongodb-on-ubuntu/)


```
pyspark --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.2 --master local[*]
```
        
El nombre del conector corresponde a _coordenadas_ Maven:
 * **2.11** porque el conector utiliza la versión 2.11 de Scala
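 * **2.2.2** es la versión concreta del conector. La rama 2.2.x del conector está pensada para Spark 2.2.x; con Spark 3.x hay que usar una versión más reciente compilada para Scala 2.12/2.13 (consultar la tabla de compatibilidad del conector). En la serie 10.x el formato de la fuente de datos pasa a llamarse `mongodb` en lugar de `com.mongodb.spark.sql.DefaultSource`.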
 
El parámetro ```--master``` es el usual. En este caso lanzamos ```pyspark``` en modo local y utilizando tantos procesos _workers_ como núcleos tenga nuestra CPU.

Se puede encontrar más información en:
 * https://docs.mongodb.com/spark-connector/master/
 * https://docs.mongodb.com/spark-connector/master/python-api/
 
**Supondremos que existe un servidor MongoDB ejecutándose en local (IP 127.0.0.1) y escuchando en el puerto por defecto (27017).**

In [19]:
# Dump a DataFrame into a MongoDB collection
ciudades = spark.createDataFrame([("Madrid",  3182981), ("Barcelona", 1620809), ("Valencia",787808),
                                  ("Sevilla", 689434), ("Zaragoza", 664938), ("Málaga",569002), 
                                  ("Murcia",443243), ("Palma",406492) ], ["nombre", "habitantes"])
ciudades.show()
ciudades.printSchema()

# NOTE(review): in this run the save fails with DATA_SOURCE_NOT_FOUND (see output): the
# Mongo-Spark connector jar is not on the session's classpath. The session must be started
# with a connector version matching the installed Spark/Scala (pyspark --packages ... or the
# spark.jars.packages config set before the session is created).
(ciudades.write.format("com.mongodb.spark.sql.DefaultSource")
              .option("uri","mongodb://127.0.0.1/test.ciudades").save()
)
# *Important*: the collection test.ciudades must not exist. To add to an existing collection, use mode("append"):
### ciudades.write.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/test.ciudades").mode("append").save()

+---------+----------+
|   nombre|habitantes|
+---------+----------+
|   Madrid|   3182981|
|Barcelona|   1620809|
| Valencia|    787808|
|  Sevilla|    689434|
| Zaragoza|    664938|
|   Málaga|    569002|
|   Murcia|    443243|
|    Palma|    406492|
+---------+----------+

root
 |-- nombre: string (nullable = true)
 |-- habitantes: long (nullable = true)



Py4JJavaError: An error occurred while calling o298.save.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: com.mongodb.spark.sql.DefaultSource. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:863)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:257)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: com.mongodb.spark.sql.DefaultSource.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 16 more


In [None]:
# Load a DataFrame from a MongoDB collection
# NOTE(review): needs the same Mongo-Spark connector on the classpath as the write cell.
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/test.ciudades").load()
df.show()
df.printSchema()

: 

In [None]:
# Load a DataFrame from a heterogeneous MongoDB collection (documents have different fields)
from pymongo import MongoClient #pip install pymongo

# Drop the collection test.usuarios and insert two heterogeneous documents
client = MongoClient('127.0.0.1')
col = client['test']['usuarios']
col.drop()
col.insert_many([{'nombre':'ana', 'edad':33}, {'nombre':'pedro','altura':150}])

# Load a DataFrame from the collection test.usuarios
# NOTE(review): presumably the inferred schema combines all fields (nombre, edad, altura)
# with nulls where a document lacks one — verify once the connector is available.
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/test.usuarios").load()
df.show()
df.printSchema()

: 

In [None]:
# Aggregation pipeline handed to the connector via the "pipeline" option:
# keep only cities with more than 500000 inhabitants.
pipeline = "{'$match': {'habitantes': {$gt:500000}}}"
masde500mil = (spark.read.format("com.mongodb.spark.sql.DefaultSource")
                        .option("uri","mongodb://127.0.0.1/test.ciudades")
                        .option("pipeline", pipeline).load()
              )
masde500mil.show()

: 

## Inspección de DataFrames

In [None]:
# Summary statistics of the DataFrame's columns with describe()
l = [('a',3.14)] * 12
df = spark.createDataFrame(l, ['id','value'])
df.describe().show()
type(df.printSchema())  # printSchema() prints the schema and returns None, so this displays NoneType

: 

In [None]:
# Number of rows of the DataFrame
l = [('a',3.14), ('b',2.0), ('c',4.5)]
df = spark.createDataFrame(l, ['id','value'])
df.count()

: 

In [None]:
# describe() restricted to some columns (numeric and string columns alike)
df = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
df.describe(['Age','Fare','Sex','Cabin']).show()

: 

## Filtrado de DataFrames

In [None]:
## Removing columns with drop(): by name(s), by attribute, or by indexing
titanic = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
print(titanic.columns)
df2 = titanic.drop('PassengerId','Name','Cabin')
print(df2.columns)
df3 = titanic.drop(titanic.PassengerId)
print(df3.columns)
df4 = titanic.drop(titanic['PassengerId'])
print(df4.columns)

: 

In [None]:
## Selecting columns with select()
titanic = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
print(titanic.columns)
df2 = titanic.select('Survived', 'Pclass', 'Age')
print(df2.columns)

: 

In [None]:
# Removing duplicate rows: over all columns, or considering only the listed columns
titanic = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
print(titanic.count())
df = titanic.dropDuplicates()
print(df.count())
df = titanic.dropDuplicates(['Sex'])  # one row kept per distinct value of Sex
print(df.count())

: 

In [None]:
# Removing rows that contain null values
titanic = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
print(titanic.count())
df = titanic.dropna()
print(df.count())
# Dropping the Cabin column first keeps rows whose only missing value was Cabin
# (Cabin appears empty in many rows of the earlier outputs).
df = titanic.drop('Cabin').dropna()
print(df.count())

: 

In [None]:
titanic = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
# Filtering with a SQL expression string...
df = titanic.filter( 'Survived = 1')
print(df.count())
# ...or with the equivalent Column expression. The column is taken from `titanic`, the
# DataFrame being filtered, not from the previously filtered `df`.
df = titanic.filter(titanic.Survived == 1)
print(df.count())

: 

In [None]:
# Compound conditions: SQL string with AND, or Column expressions combined with &.
# Each comparison needs parentheses because & binds tighter than == and > in Python.
# Relies on `titanic` loaded in the previous cell.
df = titanic.filter( 'Survived = 1 AND Sex = "female" AND Age > 20')
print(df.count())
df = titanic.filter( (titanic.Survived == 1) & (titanic['Sex'] == 'female') & (titanic.Age > 20))
print(df.count())

: 

In [None]:
# Rows whose cabin has 3 characters: the first is an uppercase A or B and the last a 2.
# Uses a regular expression, either in SQL (RLIKE) or via Column.rlike().
# Relies on `titanic` loaded in an earlier cell.
df = titanic.filter('Cabin RLIKE "^[AB].2$"')
print(df.count())
df = titanic.filter(titanic.Cabin.rlike('^[AB].2$'))
print(df.count())

: 

## Combinación de DataFrames

In [None]:
# union(): rows of both DataFrames (duplicates are kept)
df1 = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
df2 = spark.createDataFrame([(3,'marta'),(1,'ana')],['id','nombre'])
df = df1.union(df2)
df.show()
: 

In [None]:
df1 = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
df1.printSchema()
df2 = spark.createDataFrame([(3.0,'marta'),(1.0,'ana')],['id','nombre'])
df2.printSchema()
df = df1.union(df2) ## No problem: long and double are compatible => double
df.show()
df.printSchema()

: 

In [None]:
df1 = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
df1.printSchema()
df2 = spark.createDataFrame([('3','marta'),('1','ana')],['id','nombre'])
df2.printSchema()
df = df1.union(df2) ## No problem: long and string are compatible => string
df.show()
df.printSchema()

: 

In [None]:
df1 = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
df1.printSchema()
df2 = spark.createDataFrame([(True,'marta'),(False,'ana')],['id','nombre'])
df2.printSchema()
## df = df1.union(df2) ## Exception: boolean and long cannot be unified
##AnalysisException: "Union can only be performed on tables with the compatible column types. boolean <> bigint at the first column of the second table

: 

In [None]:
df1 = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
df1.printSchema()
df2 = spark.createDataFrame([(3,'marta',33),(4,'señor',44)],['id','nombre','edad'])
df2.printSchema()
## df = df1.union(df2)  ## Exception: different number of columns
## AnalysisException: "Union can only be performed on tables with the same number of columns, but the first table has 2 columns and the second table has 3 columns

: 

In [None]:
# intersect(): rows present in both DataFrames
df1 = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
df2 = spark.createDataFrame([(3,'marta'),(1,'ana')],['id','nombre'])
df = df1.intersect(df2)
df.show()

: 

In [None]:
# subtract(): rows of df1 that do not appear in df2
df1 = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
df2 = spark.createDataFrame([(3,'marta'),(1,'ana')],['id','nombre'])
df = df1.subtract(df2)
df.show()

: 

In [None]:
## Inner join of two DataFrames on the 'id' column
users = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
users.show()
age = spark.createDataFrame([(1,36),(2,30)],['id','edad'])
age.show()
df = users.join(age,'id')
df.show()

: 

In [None]:
## Inner join where the 'id' column holds strings on one side and integers on the other
users = spark.createDataFrame([('1','ana'),('2','jose')],['id','nombre'])
users.printSchema()
age = spark.createDataFrame([(1,36),(2,30)],['id','edad'])
age.printSchema()
df = users.join(age,'id') ## No problem: string and long are compatible => string
df.printSchema()
df.show()

: 

In [None]:
## Inner join requiring equality on several columns at once
users = spark.createDataFrame([(1,'ana','golf'),(2,'jose','polo',)],['id','nombre','deporte'])
users.show()
age = spark.createDataFrame([(1,'eva',33),(2,'jose',30)],['id','nombre','edad'])
age.show()
df = users.join(age,["id", "nombre"])
df.show()

: 

In [None]:
## Inner join using an equality expression between 2 differently named columns
## (both columns, id and ident, are kept in the result)
users = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
users.show()
age = spark.createDataFrame([(1,36),(2,30)],['ident','edad'])
age.show()
df = users.join(age,users.id == age.ident)
df.show()

: 

In [None]:
## Inner join using equality expressions over 4 columns (two pairs combined with &)
users = spark.createDataFrame([(1,'ana','golf'),(2,'jose','polo',)],['id','name','sport'])
users.show()
age = spark.createDataFrame([(1,'eva',33),(2,'jose',30)],['ident','nombre','edad'])
age.show()
df = users.join(age,(users.id == age.ident) & (users.name == age.nombre))
df.show()

: 

In [None]:
## Inner join using a complex condition
users = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
users.show()
age = spark.createDataFrame([(None,1,33),(2,5,30)],['id1','id2','edad'])
age.show()
# Match on id1 when it is not null; otherwise fall back to id2.
cond = ((age.id1.isNotNull() & (users.id == age.id1)) | 
       (age.id1.isNull() & (users.id == age.id2)))
df = users.join(age,cond)
df.show()

: 

In [None]:
## Left outer join of users and age: users without a matching id get a null 'edad'
users = spark.createDataFrame([(1,'ana'),(2,'jose'),(3,'eva')],['id','nombre'])
users.show()
age = spark.createDataFrame([(1,36),(2,30)],['id','edad'])
age.show()
df = users.join(age,'id','left_outer')
df.show()

: 

## Transformación de DataFrames

In [None]:
def load_titanic():
    """Read the Titanic CSV (header row, inferred types), drop 'Cabin', then drop rows with any null."""
    path = '../../data/Cap7/titanic.csv'
    raw = spark.read.csv(path, header=True, inferSchema=True)
    return raw.drop('Cabin').dropna()

: 

In [None]:
# Arithmetic between columns using SQL expressions (Age * 12 expresses age in months)
titanic = load_titanic()
titanic.selectExpr("Survived","SibSp + Parch As Family", "Age * 12 AS Age").show(3)

: 

In [None]:
# User-defined functions (UDFs) to transform columns
titanic = load_titanic()

def sex_to_num(s):
    """Encode the Sex value as an integer: 'female' -> 0, 'male' -> 1, anything else -> None."""
    if s == 'female':
        return 0
    if s == 'male':
        return 1
    return None

from pyspark.sql.types import IntegerType
# Register the Python function as a SQL function returning an integer column.
spark.udf.register("sex_to_num", sex_to_num, IntegerType())

titanic.selectExpr("Sex", "sex_to_num(Sex) AS Sex_num").show(3)

: 

In [None]:
# User-defined functions (UDFs) to transform columns
# NOTE: IntegerType is imported in the previous cell.
titanic = load_titanic()

# The built-in max is registered directly as a 2-argument SQL function.
spark.udf.register("max_int", max, IntegerType())

titanic.selectExpr("SibSp", "Parch", "max_int(SibSp, Parch) AS Max_Family").show(3)

: 

In [None]:
# User-defined functions (UDFs) to transform columns
titanic = load_titanic()

def scale(n,minv,maxv):
    """Min-max scale ``n`` into [0, 1] given the column's ``minv`` and ``maxv``.

    Returns None when ``n`` is None: Spark passes SQL NULL to Python UDFs as
    None, and the unguarded arithmetic would raise TypeError and fail the job.
    Raises ZeroDivisionError if ``maxv == minv``.
    """
    if n is None:
        return None
    return (n - minv) / (maxv - minv)

# The min and max of a column can be taken from the DataFrame produced by describe().
# Rows are looked up by their 'summary' label ('min' / 'max') rather than by position,
# so the code does not depend on the order of describe()'s rows.
summary = titanic.describe('Age').toPandas().set_index('summary')
min_age = float(summary.loc['min', 'Age'])
max_age = float(summary.loc['max', 'Age'])

from pyspark.sql.types import DoubleType
spark.udf.register("scale_Age", lambda x: scale(x, min_age, max_age), DoubleType())

titanic.selectExpr("Age", "scale_Age(Age) AS Scaled_Age").show(3)

: 

In [None]:
# Transforming columns with Column expressions (same result as the selectExpr version above)
titanic = load_titanic()
titanic.select(titanic.Survived,(titanic.SibSp + titanic.Parch).alias("Family"), (titanic.Age * 12).alias("Age")).show(3)

: 

In [None]:
# Transformations with UDFs applied through Column expressions (pyspark.sql.functions.udf)
# NOTE: relies on sex_to_num, scale, IntegerType and DoubleType from earlier cells.
from pyspark.sql.functions import udf
titanic = load_titanic()

sex_to_num_UDF = udf(sex_to_num, IntegerType())

max_int_UDF = udf(max, IntegerType())

# Min/max of Age looked up by describe()'s 'summary' label instead of by row position.
summary = titanic.describe('Age').toPandas().set_index('summary')
min_age = float(summary.loc['min', 'Age'])
max_age = float(summary.loc['max', 'Age'])
scale_Age_UDF = udf(lambda x : scale(x, min_age, max_age), DoubleType())

titanic.select(scale_Age_UDF(titanic.Age).alias("Scaled_Age"), 
               sex_to_num_UDF(titanic.Sex).alias("Sex_Num"),
               max_int_UDF(titanic.SibSp,titanic.Parch).alias("Max_Family")).show(3)

: 

In [None]:
# A single aggregation, over the whole DataFrame (groupBy() with no keys) or per group
titanic = load_titanic()

titanic.groupBy().count().show()
titanic.groupBy().sum('Survived').show()
titanic.groupBy('Pclass').sum('Survived').show()
titanic.groupBy('Pclass','Embarked').sum('Survived').show()

: 

In [None]:
# Several aggregations of the same kind (sum over two columns)
titanic = load_titanic()

titanic.groupBy('Pclass').sum('Survived', 'Fare').show()

: 

In [None]:
# Several different aggregation functions at once
titanic = load_titanic()

# Using a dictionary {column: aggregate function}
titanic.groupBy('Pclass').agg({'*':'count', 'Survived':'sum'}).show()

# Using functions from pyspark.sql.functions, which also allow naming the results with alias()
from pyspark.sql import functions
titanic.groupBy('Pclass').agg(functions.count('*').alias('Total'), functions.sum('Survived').alias('Survivors')).show()


: 

## SQL sobre DataFrames

In [None]:
# Running SQL that combines DataFrames registered as temporary views
users = spark.createDataFrame([(1,'ana'),(2,'jose')],['id','nombre'])
users.createOrReplaceTempView("users")
age = spark.createDataFrame([(1,36),(2,30)],['id','edad'])
age.createOrReplaceTempView("age")

spark.sql("""SELECT *
             FROM users INNER JOIN age ON users.id == age.id""").show()

: 

In [None]:
# SQL over a view, using the sex_to_num function registered with spark.udf.register in an earlier cell
titanic = load_titanic()
titanic.createOrReplaceTempView("titanic")
spark.sql("""SELECT Survived, SibSp+Parch AS Family, sex_to_num(Sex) AS Sex_Num
             FROM titanic 
             WHERE Age > 50
             """).show(3)

: 

In [None]:
# Running SQL with aggregation and ordering
titanic = load_titanic()
titanic.createOrReplaceTempView("titanic")
spark.sql("""SELECT Pclass, 
                    COUNT(*) AS Total, 
                    SUM(Survived) AS Survivors 
             FROM titanic 
             GROUP BY Pclass
             ORDER BY Pclass ASC
             """).show()

: 

# SparkML

## Clasificación con LinearSVC usando un Pipeline

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder  # OneHotEncoderEstimator was renamed OneHotEncoder in Spark 3.0
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.classification import LinearSVC
from pyspark.ml import Pipeline

# Keep the useful columns, drop rows with missing values and split into train (80%) + test (20%).
# A fixed seed makes the split (and therefore the score) reproducible.
titanic = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
titanic = titanic.drop('PassengerId', 'Name', 'Ticket','Cabin')
titanic = titanic.dropna()

train, test = titanic.randomSplit([0.8, 0.2], seed=42)

# Turn Sex and Embarked into numeric indices (alphabetical order)
indexerSex = StringIndexer(inputCol='Sex', outputCol='Sex_num', stringOrderType='alphabetAsc')
indexerEmbarked = StringIndexer(inputCol='Embarked', outputCol='Embarked_num', stringOrderType='alphabetAsc')

# One-hot encode Embarked, assemble every feature into one vector, rescale each position to [0, 1]
ohe = OneHotEncoder(inputCols=['Embarked_num'],outputCols=['Embarked_OHE'])
vec = VectorAssembler(inputCols=['Pclass', 'Sex_num', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked_OHE'], outputCol='features_raw')
sca = MinMaxScaler(inputCol='features_raw', outputCol='features')
clf = LinearSVC(featuresCol='features', labelCol='Survived')

pipeline = Pipeline(stages=[indexerSex, indexerEmbarked, ohe, vec, sca, clf])


# Fit the pipeline and use it to classify 'test'
model = pipeline.fit(train)
prediction = model.transform(test)
results = prediction.select('prediction', 'Survived')
results.show(5)


# Evaluate the classification (accuracy) with MulticlassClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
claseval = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='Survived', metricName='accuracy')
print('Score:', claseval.evaluate(prediction))


# Evaluate the classification with the RDD-based mllib API (legacy)
from pyspark.mllib.evaluation import MulticlassMetrics

rdd = results.rdd.map(lambda row: (row[0], float(row[1]))) # the Survived label must be a float,
                                                           # otherwise MulticlassMetrics fails
metrics = MulticlassMetrics(rdd)
print("Score:", metrics.accuracy)

: 

## Regresión con LinearRegression usando un Pipeline

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder  # OneHotEncoderEstimator was renamed OneHotEncoder in Spark 3.0
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# Keep the useful columns, drop rows with missing values and split into train (80%) + test (20%).
# A fixed seed makes the split (and therefore the metrics) reproducible.
titanic = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
titanic = titanic.drop('PassengerId', 'Name', 'Ticket','Cabin')
titanic = titanic.dropna()

train, test = titanic.randomSplit([0.8, 0.2], seed=42)

# Turn Sex and Embarked into numeric indices (alphabetical order)
indexerSex = StringIndexer(inputCol='Sex', outputCol='Sex_num', stringOrderType='alphabetAsc')
indexerEmbarked = StringIndexer(inputCol='Embarked', outputCol='Embarked_num', stringOrderType='alphabetAsc')

# One-hot encode Embarked, assemble the features (Fare is the target, so it is not among them),
# rescale each position to [0, 1] and regress Fare
ohe = OneHotEncoder(inputCols=['Embarked_num'],outputCols=['Embarked_OHE'])
vec = VectorAssembler(inputCols=['Pclass', 'Sex_num', 'Age', 'SibSp', 'Parch', 'Survived', 'Embarked_OHE'], outputCol='features_raw')
sca = MinMaxScaler(inputCol='features_raw', outputCol='features')
reg = LinearRegression(featuresCol='features', labelCol='Fare')

pipeline = Pipeline(stages=[indexerSex, indexerEmbarked, ohe, vec, sca, reg])


# Fit the pipeline and use it to predict fares for the rows in 'test'
model = pipeline.fit(train)
prediction = model.transform(test)
results = prediction.select('prediction', 'Fare')  # the model writes its output to 'prediction'
results.show(5)

# Evaluation with RegressionEvaluator
from pyspark.ml.evaluation import RegressionEvaluator

maeeval = RegressionEvaluator(predictionCol='prediction', labelCol='Fare', metricName='mae')
print("MAE :", maeeval.evaluate(results))
mseeval = RegressionEvaluator(predictionCol='prediction', labelCol='Fare', metricName='mse')
print("MSE :", mseeval.evaluate(results))
rmseeval = RegressionEvaluator(predictionCol='prediction', labelCol='Fare', metricName='rmse')
print("RMSE:", rmseeval.evaluate(results),'\n')

# Evaluation of the regression with the RDD-based mllib API (legacy);
# it expects (prediction, observation) pairs, which is the column order of `results`
from pyspark.mllib.evaluation import RegressionMetrics

rdd = results.rdd
metrics = RegressionMetrics(rdd)
print("MAE :", metrics.meanAbsoluteError)
print("MSE :", metrics.meanSquaredError)
print("RMSE:", metrics.rootMeanSquaredError)

: 

## Análisis de grupos con k-means usando un Pipeline

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder  # OneHotEncoderEstimator was renamed OneHotEncoder in Spark 3.0
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import ClusteringEvaluator

# Drop the columns we are not interested in and remove rows with missing values
titanic = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
titanic = titanic.drop('PassengerId', 'Name', 'Ticket','Cabin')
titanic = titanic.dropna()

# Turn Sex and Embarked into numeric indices (alphabetical order)
indexerSex = StringIndexer(inputCol='Sex', outputCol='Sex_num', stringOrderType='alphabetAsc')
indexerEmbarked = StringIndexer(inputCol='Embarked', outputCol='Embarked_num', stringOrderType='alphabetAsc')
# One-hot encoding of the Embarked column
ohe = OneHotEncoder(inputCols=['Embarked_num'],outputCols=['Embarked_OHE'])
# Combine all the columns into a single vector
vec = VectorAssembler(inputCols=['Pclass', 'Sex_num', 'Age', 'SibSp', 'Parch', 'Survived', 'Embarked_OHE','Fare'], outputCol='features_raw')
# Rescale the values of each vector position
sca = MinMaxScaler(inputCol='features_raw', outputCol='features')
# Cluster analysis. Defaults: features read from 'features', the assigned cluster index
# written to 'prediction'. A fixed seed makes the clustering reproducible.
clu = KMeans(k=3, seed=42)


pipeline = Pipeline(stages=[indexerSex, indexerEmbarked, ohe, vec, sca, clu])


# Fit the pipeline
model = pipeline.fit(titanic)
prediction = model.transform(titanic)
prediction.select('prediction').show(5)

# Get the centroids from the last model of the pipeline
print('Centroides:')
print(type(model.stages[-1].clusterCenters()[0]))
for c in model.stages[-1].clusterCenters():
    print(c)

# Evaluation with ClusteringEvaluator to obtain the silhouette coefficient
evaluator = ClusteringEvaluator()
print('Silhouette Coefficient:', evaluator.evaluate(prediction))

# mllib has no cluster-evaluation methods

: 

## Persistencia de modelos en Pipelines

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder  # OneHotEncoderEstimator was renamed OneHotEncoder in Spark 3.0
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# Keep the useful columns, drop rows with missing values and split into train (80%) + test (20%)
titanic = spark.read.csv('../../data/Cap7/titanic.csv', header=True, inferSchema=True)
titanic = titanic.drop('PassengerId', 'Name', 'Ticket','Cabin')
titanic = titanic.dropna()

train, test = titanic.randomSplit([0.8, 0.2], seed=42)

# Turn Sex and Embarked into numeric indices (alphabetical order)
indexerSex = StringIndexer(inputCol='Sex', outputCol='Sex_num', stringOrderType='alphabetAsc')
indexerEmbarked = StringIndexer(inputCol='Embarked', outputCol='Embarked_num', stringOrderType='alphabetAsc')

ohe = OneHotEncoder(inputCols=['Embarked_num'],outputCols=['Embarked_OHE'])
vec = VectorAssembler(inputCols=['Pclass', 'Sex_num', 'Age', 'SibSp', 'Parch', 'Survived', 'Embarked_OHE'], outputCol='features_raw')
sca = MinMaxScaler(inputCol='features_raw', outputCol='features')
reg = LinearRegression(featuresCol='features', labelCol='Fare')

pipeline = Pipeline(stages=[indexerSex, indexerEmbarked, ohe, vec, sca, reg])

# Fit the pipeline and use it to predict fares for the rows in 'test'
model = pipeline.fit(train)
prediction = model.transform(test)
results = prediction.select('prediction', 'Fare')
results.show(5)

# Save the fitted model. A plain model.save(path) (same as write().save(path)) fails if the
# folder already exists; overwrite() lets this cell be re-run.
model.write().overwrite().save('../../data/Cap7/regression_model')

: 

In [None]:
from pyspark.ml import PipelineModel

# Load the fitted pipeline saved by the previous cell.
# NOTE: `test` also comes from the previous cell.
loaded_model = PipelineModel.load('../../data/Cap7/regression_model')

prediction = loaded_model.transform(test)
results = prediction.select('prediction', 'Fare')  # the model writes its output to 'prediction'
results.show(5)

: 