# SparkSQL and Spark DataFrames

Besides letting us interact through `SQL` (in practice a dialect largely compatible with `HQL`, the _Hive query language_; see its documentation [here](https://cwiki.apache.org/confluence/display/Hive/LanguageManual)), `SparkSQL` adds a layer of abstraction on top of the `RDD` and turns it into a `DataFrame`, analogous to the data frames used in `R` and `Python`.

We recommend using the following link as a reference: http://spark.apache.org/docs/2.4.3/api/python/pyspark.sql.html

As always, we start by getting the `SparkContext`

In [3]:
import pyspark
sc = pyspark.SparkContext('local[*]')

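This time we will also get the `SQLContext` (in Spark 2.x, `SparkSession` is the recommended entry point, and `SQLContext` is kept for backward compatibility)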

In [4]:
sqlContext = pyspark.SQLContext(sc)

## Creating a DataFrame

Now we will read the transactions file created in the previous lab

In [5]:
! ls -lh output/raw/transacciones

'ls' is not recognized as an internal or external command,
operable program or batch file.


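**NOTE**: If the previous command reports that the file does not exist, go back to the previous lab and create the file again. On Windows, `ls` is not available (hence the message above); use `! dir output\raw\transacciones` instead.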

In [6]:
txs_rdd = sc.textFile("output/raw/transacciones")

In [7]:
txs_rdd.first()

'fc746fc2-3160-4f83-b53e-fe020a8c6f9a|RESTAURANTE EL TRABAJO|COMPRA|9274'

In [8]:
type(txs_rdd)

pyspark.rdd.RDD

To get a `DataFrame`, each line of the `RDD` has to be converted into a `Row` object

In [8]:
from pyspark.sql import Row

In [9]:
Transaccion = Row('tdc', 'comercio', 'accion', 'monto')

In [10]:
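def getTransaccion(linea):
    # Split the pipe-delimited line: tdc|comercio|accion|monto
    cells = linea.split('|')
    cells[3] = int(cells[3])  # cast monto to int so Spark infers a numeric column
    return Transaccion(*cells)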


In [11]:
txs = txs_rdd.map(getTransaccion)

In [12]:
txs.first()

Row(tdc='fc746fc2-3160-4f83-b53e-fe020a8c6f9a', comercio='RESTAURANTE EL TRABAJO', accion='COMPRA', monto=9274)
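The parsing step can be checked without starting Spark. Below is a plain-Python sketch (not Spark code) in which `collections.namedtuple` stands in for `Row`; `parse_transaccion` is a hypothetical name for a function equivalent to `getTransaccion`, applied to the sample line printed above.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row, with the same field names as Transaccion
Transaccion = namedtuple("Transaccion", ["tdc", "comercio", "accion", "monto"])

def parse_transaccion(linea):
    """Split a pipe-delimited line into four fields and cast the amount to int."""
    tdc, comercio, accion, monto = linea.split("|")
    return Transaccion(tdc, comercio, accion, int(monto))

sample = "fc746fc2-3160-4f83-b53e-fe020a8c6f9a|RESTAURANTE EL TRABAJO|COMPRA|9274"
tx = parse_transaccion(sample)
print(tx.comercio, tx.monto)  # RESTAURANTE EL TRABAJO 9274
```

Unpacking into four names makes a malformed line (more or fewer than four fields) raise `ValueError` instead of producing a short record; inside Spark, such an error would only surface when an action runs.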

In [13]:
txs_df = txs.toDF()  # build a Spark DataFrame from the RDD of Rows

In [14]:
txs_df

DataFrame[tdc: string, comercio: string, accion: string, monto: bigint]

In [20]:
txs

PythonRDD[9] at RDD at PythonRDD.scala:53

In [21]:
txs_df.printSchema()

root
 |-- tdc: string (nullable = true)
 |-- comercio: string (nullable = true)
 |-- accion: string (nullable = true)
 |-- monto: long (nullable = true)
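`toDF()` infers the column types from the Python values in the `Row`s: a Python `int` becomes `bigint` (which `printSchema()` displays as `long`) and a `str` becomes `string`. The sketch below is a deliberately simplified, hypothetical mapping that covers only a few basic types; Spark's actual inference (in `pyspark.sql.types`) handles many more, including nested types and `None`.

```python
# Hypothetical, simplified mapping from Python value types to Spark SQL type names
SIMPLE_TYPE_NAMES = {
    bool: "boolean",
    int: "bigint",    # printSchema() displays this type as "long"
    float: "double",
    str: "string",
}

def infer_simple_schema(row):
    """Return (column, type name) pairs for a dict of column -> sample value."""
    return [(name, SIMPLE_TYPE_NAMES[type(value)]) for name, value in row.items()]

sample_row = {
    "tdc": "fc746fc2-3160-4f83-b53e-fe020a8c6f9a",
    "comercio": "RESTAURANTE EL TRABAJO",
    "accion": "COMPRA",
    "monto": 9274,
}
for name, type_name in infer_simple_schema(sample_row):
    print(f"{name}: {type_name}")
```

Without the `int(cells[3])` cast in `getTransaccion`, `monto` would have been inferred as `string`, since every field produced by `split` is a string.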



In [22]:
txs_df.show()

+--------------------+--------------------+------+-----+
|                 tdc|            comercio|accion|monto|
+--------------------+--------------------+------+-----+
|fc746fc2-3160-4f8...|RESTAURANTE EL TR...|COMPRA| 9274|
|e434d65e-cb57-43e...|RESTAURANTE EL TR...|COMPRA|  702|
|e434d65e-cb57-43e...|       ARENA COLISEO|COMPRA| 3070|
|8ef61de7-b609-4e6...|           SUPERCITO|COMPRA| 2128|
|fc746fc2-3160-4f8...|RESTAURANTE EL TR...|RETIRO| 4397|
|8ef61de7-b609-4e6...|RESTAURANTE EL TR...|RETIRO| 9042|
|cb679bdc-e5c4-476...|       ARENA COLISEO|RETIRO| 4549|
|8ef61de7-b609-4e6...|       ARENA COLISEO|RETIRO|  466|
|8ef61de7-b609-4e6...|           SUPERCITO|COMPRA| 8565|
|cb679bdc-e5c4-476...|       ARENA COLISEO|COMPRA| 1015|
|fc746fc2-3160-4f8...|RESTAURANTE EL TR...|COMPRA| 4119|
|e434d65e-cb57-43e...|       ARENA COLISEO|RETIRO| 8176|
|d03e74dd-0666-441...|RESTAURANTE EL TR...|RETIRO| 6224|
|8ef61de7-b609-4e6...|       ARENA COLISEO|COMPRA| 2085|
|8ef61de7-b609-4e6...|       AR

## Registering a table

Once we have a `DataFrame`, we can work with it through the `API` or through `SQL`. To show this, let's first register it as a temporary table

In [23]:
txs_df.createOrReplaceTempView('txs')  # replaces registerTempTable, deprecated since Spark 2.0

In [24]:
sqlContext.sql("show tables").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |      txs|       true|
+--------+---------+-----------+



In [26]:
sqlContext.sql("select * from txs limit 5").show()

+--------------------+--------------------+------+-----+
|                 tdc|            comercio|accion|monto|
+--------------------+--------------------+------+-----+
|fc746fc2-3160-4f8...|RESTAURANTE EL TR...|COMPRA| 9274|
|e434d65e-cb57-43e...|RESTAURANTE EL TR...|COMPRA|  702|
|e434d65e-cb57-43e...|       ARENA COLISEO|COMPRA| 3070|
|8ef61de7-b609-4e6...|           SUPERCITO|COMPRA| 2128|
|fc746fc2-3160-4f8...|RESTAURANTE EL TR...|RETIRO| 4397|
+--------------------+--------------------+------+-----+



Now that `txs` has a schema, it is a good idea to save it; `coalesce(1)` merges the data into a single partition, so each format is written as a single file

In [27]:
sqlContext.sql("select * from txs")\
          .coalesce(1)\
          .write.format("parquet")\
          .save("output/parquet/transacciones", mode="OVERWRITE")

In [28]:
sqlContext.sql("select * from txs")\
          .coalesce(1)\
          .write.format("json")\
          .save("output/json/transacciones", mode="OVERWRITE")

## Using the API

Like `RDD`s, `DataFrames` are operated on through _transformations_ and _actions_. **Transformations** are _lazy_: they only contribute to planning the execution of the _query_. **Actions** trigger the execution of the _query_.

### Transformations

- `filter`
- `select`
- `drop`
- `join`

### Actions

- `count`
- `collect`
- `show`
- `head`
- `take`
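Python generators give a rough analogy for this behaviour (an analogy only: Spark also optimizes the whole plan before running it). In the sketch, the two "transformation" steps build a pipeline without touching the data, and the records are read only when an "action", here `list()`, consumes the pipeline. The `reads` counter is a hypothetical instrument that makes this visible; the amounts are the first five `monto` values shown earlier.

```python
reads = 0  # how many records have actually been read from the source

def source(values):
    """Yield records one at a time, counting each read."""
    global reads
    for v in values:
        reads += 1
        yield v

montos = [9274, 702, 3070, 2128, 4397]

# "Transformations": build the pipeline; nothing is read yet
pipeline = (m for m in source(montos) if m >= 3000)
pipeline = (m * 2 for m in pipeline)
reads_before_action = reads
print(reads_before_action)  # 0

# "Action": consuming the pipeline triggers the reads
result = list(pipeline)
print(reads, result)  # 5 [18548, 6140, 8794]
```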

In [30]:
txs_df.select(txs_df['tdc'], txs_df['accion'], txs_df['monto'], txs_df['monto'] >= 5000).show(5)

+--------------------+------+-----+---------------+
|                 tdc|accion|monto|(monto >= 5000)|
+--------------------+------+-----+---------------+
|fc746fc2-3160-4f8...|COMPRA| 9274|           true|
|e434d65e-cb57-43e...|COMPRA|  702|          false|
|e434d65e-cb57-43e...|COMPRA| 3070|          false|
|8ef61de7-b609-4e6...|COMPRA| 2128|          false|
|fc746fc2-3160-4f8...|RETIRO| 4397|          false|
+--------------------+------+-----+---------------+
only showing top 5 rows



In [31]:
sqlContext.sql("select tdc, accion, monto, monto >= 5000 from txs").show(5)

+--------------------+------+-----+-------------------------------+
|                 tdc|accion|monto|(monto >= CAST(5000 AS BIGINT))|
+--------------------+------+-----+-------------------------------+
|fc746fc2-3160-4f8...|COMPRA| 9274|                           true|
|e434d65e-cb57-43e...|COMPRA|  702|                          false|
|e434d65e-cb57-43e...|COMPRA| 3070|                          false|
|8ef61de7-b609-4e6...|COMPRA| 2128|                          false|
|fc746fc2-3160-4f8...|RETIRO| 4397|                          false|
+--------------------+------+-----+-------------------------------+
only showing top 5 rows
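The SQL version shows, in its column header, that Spark casts the literal `5000` to `BIGINT` (the type of `monto`) before comparing. The same kind of column expression can be tried with Python's built-in `sqlite3` module, as an analogy with a different SQL engine (not Spark): the query runs over an in-memory table holding the five rows shown above, with `tdc` shortened to its first segment, and SQLite reports the comparison as `1`/`0` where Spark shows `true`/`false`.

```python
import sqlite3

# In-memory SQLite table with the five rows shown above (tdc shortened)
con = sqlite3.connect(":memory:")
con.execute("create table txs (tdc text, comercio text, accion text, monto integer)")
con.executemany(
    "insert into txs values (?, ?, ?, ?)",
    [
        ("fc746fc2", "RESTAURANTE EL TRABAJO", "COMPRA", 9274),
        ("e434d65e", "RESTAURANTE EL TRABAJO", "COMPRA", 702),
        ("e434d65e", "ARENA COLISEO", "COMPRA", 3070),
        ("8ef61de7", "SUPERCITO", "COMPRA", 2128),
        ("fc746fc2", "RESTAURANTE EL TRABAJO", "RETIRO", 4397),
    ],
)
# Same projection as the Spark SQL query; SQLite returns the comparison as 1/0
filas = con.execute("select tdc, accion, monto, monto >= 5000 from txs").fetchall()
con.close()
for fila in filas:
    print(fila)
```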



In [32]:
txs_df.filter((txs_df["monto"] >= 5000) & (txs_df["accion"] == "RETIRO"))\
      .select(txs_df["tdc"], txs_df["comercio"], txs_df["monto"])\
      .orderBy(txs_df["monto"].desc())\
      .show(5)

+--------------------+--------------------+-----+
|                 tdc|            comercio|monto|
+--------------------+--------------------+-----+
|cb679bdc-e5c4-476...|       ARENA COLISEO| 9996|
|fc746fc2-3160-4f8...|RESTAURANTE EL TR...| 9988|
|d03e74dd-0666-441...|RESTAURANTE EL TR...| 9986|
|d03e74dd-0666-441...|RESTAURANTE EL TR...| 9986|
|cb679bdc-e5c4-476...|RESTAURANTE EL TR...| 9984|
+--------------------+--------------------+-----+
only showing top 5 rows
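The `filter` → `select` → `orderBy(desc)` chain can be mirrored in plain Python to make each step explicit. This sketch (not Spark code) runs over the first 14 rows printed by `txs_df.show()`, with `tdc` shortened to its first segment; on that small sample only three rows pass the filter.

```python
# First 14 rows printed by txs_df.show(); tdc shortened to its first segment
rows = [
    ("fc746fc2", "RESTAURANTE EL TRABAJO", "COMPRA", 9274),
    ("e434d65e", "RESTAURANTE EL TRABAJO", "COMPRA", 702),
    ("e434d65e", "ARENA COLISEO", "COMPRA", 3070),
    ("8ef61de7", "SUPERCITO", "COMPRA", 2128),
    ("fc746fc2", "RESTAURANTE EL TRABAJO", "RETIRO", 4397),
    ("8ef61de7", "RESTAURANTE EL TRABAJO", "RETIRO", 9042),
    ("cb679bdc", "ARENA COLISEO", "RETIRO", 4549),
    ("8ef61de7", "ARENA COLISEO", "RETIRO", 466),
    ("8ef61de7", "SUPERCITO", "COMPRA", 8565),
    ("cb679bdc", "ARENA COLISEO", "COMPRA", 1015),
    ("fc746fc2", "RESTAURANTE EL TRABAJO", "COMPRA", 4119),
    ("e434d65e", "ARENA COLISEO", "RETIRO", 8176),
    ("d03e74dd", "RESTAURANTE EL TRABAJO", "RETIRO", 6224),
    ("8ef61de7", "ARENA COLISEO", "COMPRA", 2085),
]

# filter (monto >= 5000 and RETIRO) -> select (tdc, comercio, monto) -> orderBy(monto desc)
retiros_altos = sorted(
    ((tdc, comercio, monto) for tdc, comercio, accion, monto in rows
     if monto >= 5000 and accion == "RETIRO"),
    key=lambda r: r[2],
    reverse=True,
)
for r in retiros_altos:
    print(r)
```

Without an explicit `orderBy`, Spark makes no promise about row order, which is why the `where()` output that follows lists rows in a different order.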



**NOTE**: `where()` is an _alias_ of `filter()`

In [33]:
txs_df.where((txs_df["monto"] >= 5000) & (txs_df["accion"] == "RETIRO"))\
      .select(txs_df["tdc"], txs_df["comercio"], txs_df["monto"])\
      .show(5)

+--------------------+--------------------+-----+
|                 tdc|            comercio|monto|
+--------------------+--------------------+-----+
|8ef61de7-b609-4e6...|RESTAURANTE EL TR...| 9042|
|e434d65e-cb57-43e...|       ARENA COLISEO| 8176|
|d03e74dd-0666-441...|RESTAURANTE EL TR...| 6224|
|e434d65e-cb57-43e...|           SUPERCITO| 9743|
|cb679bdc-e5c4-476...|       ARENA COLISEO| 7390|
+--------------------+--------------------+-----+
only showing top 5 rows



The condition can also be written as a SQL expression `string`

In [34]:
txs_df.filter("monto >= 5000 and accion = 'RETIRO'")\
      .select(txs_df["tdc"], txs_df["comercio"], txs_df["monto"])\
      .orderBy(txs_df["monto"].asc())\
      .show(5)

+--------------------+--------------------+-----+
|                 tdc|            comercio|monto|
+--------------------+--------------------+-----+
|cb679bdc-e5c4-476...|       ARENA COLISEO| 5002|
|fc746fc2-3160-4f8...|           SUPERCITO| 5002|
|8ef61de7-b609-4e6...|           SUPERCITO| 5004|
|e434d65e-cb57-43e...|RESTAURANTE EL TR...| 5005|
|fc746fc2-3160-4f8...|RESTAURANTE EL TR...| 5005|
+--------------------+--------------------+-----+
only showing top 5 rows



In [35]:
txs_df.groupBy("tdc").count().show(5)

+--------------------+-----+
|                 tdc|count|
+--------------------+-----+
|cb679bdc-e5c4-476...| 1922|
|e434d65e-cb57-43e...| 2000|
|fc746fc2-3160-4f8...| 1987|
|8ef61de7-b609-4e6...| 2037|
|d03e74dd-0666-441...| 2054|
+--------------------+-----+
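`groupBy("tdc").count()` counts the rows in each group. On a small sample, `collections.Counter` does the same in plain Python; the sketch below uses the `tdc` values (first segment only) of the first 14 rows printed by `txs_df.show()`, so its counts are naturally far smaller than the ones over the full file.

```python
from collections import Counter

# tdc column (first segment) of the first 14 rows printed by txs_df.show()
tdcs = ["fc746fc2", "e434d65e", "e434d65e", "8ef61de7", "fc746fc2", "8ef61de7",
        "cb679bdc", "8ef61de7", "8ef61de7", "cb679bdc", "fc746fc2", "e434d65e",
        "d03e74dd", "8ef61de7"]

# Equivalent of groupBy("tdc").count() on this small sample
conteo = Counter(tdcs)
for tdc, n in conteo.items():
    print(tdc, n)
```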



### Aggregations

In [36]:
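# Caution: importing sum, max, min and round this way shadows the Python built-ins in this notebook
from pyspark.sql.functions import sum, avg, max, min, round, count, col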


In [37]:
# another way to write the count: groupBy plus agg
txs_df.groupBy("tdc").agg(count('tdc').alias('cant_tx')).show(5)

+--------------------+-------+
|                 tdc|cant_tx|
+--------------------+-------+
|cb679bdc-e5c4-476...|   1922|
|e434d65e-cb57-43e...|   2000|
|fc746fc2-3160-4f8...|   1987|
|8ef61de7-b609-4e6...|   2037|
|d03e74dd-0666-441...|   2054|
+--------------------+-------+



In [38]:
# For each merchant, get the average transaction amount (regardless of whether it is RETIRO or COMPRA)
txs_df.groupBy('comercio').agg(avg('monto').alias('monto_promedio')).show()

+--------------------+-----------------+
|            comercio|   monto_promedio|
+--------------------+-----------------+
|           SUPERCITO|4951.982918789332|
|RESTAURANTE EL TR...|4986.653122198984|
|       ARENA COLISEO|5074.468335343788|
+--------------------+-----------------+
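An average per group can be computed by keeping a running `(total, count)` pair per key and dividing at the end. This plain-Python sketch (not Spark code) does that for the `(comercio, monto)` pairs of the first 14 rows printed by `txs_df.show()`; it avoids the built-in `sum`, which the import above shadows in the notebook.

```python
from collections import defaultdict

# (comercio, monto) pairs from the first 14 rows printed by txs_df.show()
muestra = [
    ("RESTAURANTE EL TRABAJO", 9274), ("RESTAURANTE EL TRABAJO", 702),
    ("ARENA COLISEO", 3070), ("SUPERCITO", 2128),
    ("RESTAURANTE EL TRABAJO", 4397), ("RESTAURANTE EL TRABAJO", 9042),
    ("ARENA COLISEO", 4549), ("ARENA COLISEO", 466),
    ("SUPERCITO", 8565), ("ARENA COLISEO", 1015),
    ("RESTAURANTE EL TRABAJO", 4119), ("ARENA COLISEO", 8176),
    ("RESTAURANTE EL TRABAJO", 6224), ("ARENA COLISEO", 2085),
]

# Keep a running [total, count] per merchant instead of a running average
parciales = defaultdict(lambda: [0, 0])
for comercio, monto in muestra:
    parciales[comercio][0] += monto
    parciales[comercio][1] += 1

promedios = {c: total / n for c, (total, n) in parciales.items()}
print(promedios)
```

Partial `(total, count)` pairs from different partitions can simply be added together, whereas averages of partitions of different sizes cannot be averaged directly; that is why this representation suits a distributed `avg`.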



### Exercise
For each merchant, get and show the maximum amount per action.

In [39]:
txs_df.groupby('comercio','accion').agg(max('monto')).show()

+--------------------+------+----------+
|            comercio|accion|max(monto)|
+--------------------+------+----------+
|           SUPERCITO|COMPRA|      9999|
|           SUPERCITO|RETIRO|      9983|
|       ARENA COLISEO|RETIRO|      9996|
|RESTAURANTE EL TR...|COMPRA|      9986|
|RESTAURANTE EL TR...|RETIRO|      9988|
|       ARENA COLISEO|COMPRA|      9984|
+--------------------+------+----------+
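Grouping by two columns just means the group key is a pair. The plain-Python sketch below (not Spark code) keeps the largest `monto` per `(comercio, accion)` key over the first 14 rows printed by `txs_df.show()`, using an explicit comparison instead of the built-in `max`, which the earlier import shadows. A combination with no rows in the data, like (`SUPERCITO`, `RETIRO`) in this small sample, simply does not appear, just as `groupBy` only produces groups that exist.

```python
# (comercio, accion, monto) from the first 14 rows printed by txs_df.show()
muestra = [
    ("RESTAURANTE EL TRABAJO", "COMPRA", 9274), ("RESTAURANTE EL TRABAJO", "COMPRA", 702),
    ("ARENA COLISEO", "COMPRA", 3070), ("SUPERCITO", "COMPRA", 2128),
    ("RESTAURANTE EL TRABAJO", "RETIRO", 4397), ("RESTAURANTE EL TRABAJO", "RETIRO", 9042),
    ("ARENA COLISEO", "RETIRO", 4549), ("ARENA COLISEO", "RETIRO", 466),
    ("SUPERCITO", "COMPRA", 8565), ("ARENA COLISEO", "COMPRA", 1015),
    ("RESTAURANTE EL TRABAJO", "COMPRA", 4119), ("ARENA COLISEO", "RETIRO", 8176),
    ("RESTAURANTE EL TRABAJO", "RETIRO", 6224), ("ARENA COLISEO", "COMPRA", 2085),
]

maximos = {}
for comercio, accion, monto in muestra:
    clave = (comercio, accion)
    # Keep the larger of the stored maximum and the current amount
    if clave not in maximos or monto > maximos[clave]:
        maximos[clave] = monto

for clave, monto in maximos.items():
    print(clave, monto)
```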



### Exercise
Get and show the 10 clients with the most transactions per merchant, sorted in descending order

In [40]:
df1 = txs_df.groupBy('tdc','comercio').agg(count('tdc').alias('cantidad_tx'))

In [41]:
df1.orderBy(df1['cantidad_tx'].desc()).show(10,truncate=False)

+------------------------------------+----------------------+-----------+
|tdc                                 |comercio              |cantidad_tx|
+------------------------------------+----------------------+-----------+
|e434d65e-cb57-43ef-a16e-b3a6ee226144|RESTAURANTE EL TRABAJO|710        |
|d03e74dd-0666-4418-bec6-df04651842ba|ARENA COLISEO         |702        |
|d03e74dd-0666-4418-bec6-df04651842ba|SUPERCITO             |694        |
|8ef61de7-b609-4e65-9e0b-fd00eb8251a8|SUPERCITO             |688        |
|8ef61de7-b609-4e65-9e0b-fd00eb8251a8|ARENA COLISEO         |676        |
|8ef61de7-b609-4e65-9e0b-fd00eb8251a8|RESTAURANTE EL TRABAJO|673        |
|fc746fc2-3160-4f83-b53e-fe020a8c6f9a|SUPERCITO             |668        |
|fc746fc2-3160-4f83-b53e-fe020a8c6f9a|ARENA COLISEO         |661        |
|d03e74dd-0666-4418-bec6-df04651842ba|RESTAURANTE EL TRABAJO|658        |
|fc746fc2-3160-4f83-b53e-fe020a8c6f9a|RESTAURANTE EL TRABAJO|658        |
+------------------------------------+----------------------+-----------+
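The solution above counts per (`tdc`, `comercio`) pair and then ranks all pairs globally. A plain-Python sketch of the same count-then-rank logic, over the pairs in the first 14 rows printed by `txs_df.show()`, is shown below; `Counter.most_common(n)` plays the role of `orderBy(...desc()).show(n)`. If the exercise is read instead as "the top clients within each merchant", Spark would typically need a window function (`Window.partitionBy('comercio')` with `row_number()`) rather than a single global sort.

```python
from collections import Counter

# (tdc first segment, comercio) pairs from the first 14 rows printed by txs_df.show()
pares = [
    ("fc746fc2", "RESTAURANTE EL TRABAJO"), ("e434d65e", "RESTAURANTE EL TRABAJO"),
    ("e434d65e", "ARENA COLISEO"), ("8ef61de7", "SUPERCITO"),
    ("fc746fc2", "RESTAURANTE EL TRABAJO"), ("8ef61de7", "RESTAURANTE EL TRABAJO"),
    ("cb679bdc", "ARENA COLISEO"), ("8ef61de7", "ARENA COLISEO"),
    ("8ef61de7", "SUPERCITO"), ("cb679bdc", "ARENA COLISEO"),
    ("fc746fc2", "RESTAURANTE EL TRABAJO"), ("e434d65e", "ARENA COLISEO"),
    ("d03e74dd", "RESTAURANTE EL TRABAJO"), ("8ef61de7", "ARENA COLISEO"),
]

# groupBy('tdc', 'comercio') + count, then a global descending sort, keeping the top 3
top = Counter(pares).most_common(3)
for (tdc, comercio), n in top:
    print(tdc, comercio, n)
```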

### User Defined Functions: UDF

Suppose we want to show `tdc` in uppercase. Spark already provides a built-in `upper()` function for this, but it makes a simple example of how to define a `UDF`

In [42]:
from pyspark.sql.functions import udf
to_upper = udf(lambda s: s.upper())

In [43]:
txs_df.select(to_upper(txs_df["tdc"]).alias("Mayúsculas"), txs_df["tdc"])\
      .distinct()\
      .show(100) 

+--------------------+--------------------+
|          Mayúsculas|                 tdc|
+--------------------+--------------------+
|8EF61DE7-B609-4E6...|8ef61de7-b609-4e6...|
|CB679BDC-E5C4-476...|cb679bdc-e5c4-476...|
|D03E74DD-0666-441...|d03e74dd-0666-441...|
|FC746FC2-3160-4F8...|fc746fc2-3160-4f8...|
|E434D65E-CB57-43E...|e434d65e-cb57-43e...|
+--------------------+--------------------+
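A Python UDF receives each value as an ordinary Python object, and a SQL `NULL` arrives as `None`, so `lambda s: s.upper()` would raise an error on a null `tdc`. A null-safe function could look like the sketch below (`to_upper_safe` is a hypothetical name); it can be tested locally before wrapping it as `udf(to_upper_safe)`. For this particular task, the built-in `pyspark.sql.functions.upper` also avoids the cost of sending rows to a Python worker.

```python
def to_upper_safe(s):
    """Uppercase a string, passing None (SQL NULL) through unchanged."""
    return s.upper() if s is not None else None

# Local check of the plain Python function before wrapping it with udf(...)
print(to_upper_safe("8ef61de7-b609-4e65-9e0b-fd00eb8251a8"))  # 8EF61DE7-B609-4E65-9E0B-FD00EB8251A8
print(to_upper_safe(None))  # None
```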



### Exercise
Using a UDF, create a new dataframe (`txs_df2`) with a new column (`prefijo_tdc`) containing the first segment of `tdc`, and show it

In [44]:
cortar = lambda s: s.split('-')[0]  # keep the first segment of the tdc

In [45]:
pref_tdc = udf(cortar)  # reuse the function above as a UDF (return type defaults to string)

In [46]:
txs_df2 = txs_df.select('*', pref_tdc(txs_df['tdc']).alias('prefijo_tdc'))

In [47]:
txs_df2.show(10)

+--------------------+--------------------+------+-----+-----------+
|                 tdc|            comercio|accion|monto|prefijo_tdc|
+--------------------+--------------------+------+-----+-----------+
|fc746fc2-3160-4f8...|RESTAURANTE EL TR...|COMPRA| 9274|   fc746fc2|
|e434d65e-cb57-43e...|RESTAURANTE EL TR...|COMPRA|  702|   e434d65e|
|e434d65e-cb57-43e...|       ARENA COLISEO|COMPRA| 3070|   e434d65e|
|8ef61de7-b609-4e6...|           SUPERCITO|COMPRA| 2128|   8ef61de7|
|fc746fc2-3160-4f8...|RESTAURANTE EL TR...|RETIRO| 4397|   fc746fc2|
|8ef61de7-b609-4e6...|RESTAURANTE EL TR...|RETIRO| 9042|   8ef61de7|
|cb679bdc-e5c4-476...|       ARENA COLISEO|RETIRO| 4549|   cb679bdc|
|8ef61de7-b609-4e6...|       ARENA COLISEO|RETIRO|  466|   8ef61de7|
|8ef61de7-b609-4e6...|           SUPERCITO|COMPRA| 8565|   8ef61de7|
|cb679bdc-e5c4-476...|       ARENA COLISEO|COMPRA| 1015|   cb679bdc|
+--------------------+--------------------+------+-----+-----------+
only showing top 10 rows
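Before wrapping a function in `udf`, it is cheap to test it locally on a few values. The sketch checks a plain-Python equivalent of `cortar` on the full `tdc` values printed in the top-10 exercise output. As with `upper`, Spark also has a built-in alternative that avoids Python UDF overhead: `split(col('tdc'), '-').getItem(0)`, with `split` from `pyspark.sql.functions`.

```python
def cortar(s):
    """Return the first '-'-separated segment of a tdc (same logic as the lambda above)."""
    return s.split('-')[0]

# Full tdc values printed in the top-10 exercise output
tdcs = [
    "e434d65e-cb57-43ef-a16e-b3a6ee226144",
    "d03e74dd-0666-4418-bec6-df04651842ba",
    "8ef61de7-b609-4e65-9e0b-fd00eb8251a8",
    "fc746fc2-3160-4f83-b53e-fe020a8c6f9a",
]
prefijos = [cortar(t) for t in tdcs]
print(prefijos)  # ['e434d65e', 'd03e74dd', '8ef61de7', 'fc746fc2']
```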



Suppose we are only interested in `RETIRO` transactions at `SUPERCITO` for amounts of `9000` or more, since they look suspicious.

In [48]:
txs_sospechosas = txs_df.filter("monto >= 9000 and accion = 'RETIRO' and comercio = 'SUPERCITO'")\
      .select(txs_df["tdc"],  txs_df["monto"])\
      .orderBy(txs_df["monto"].desc())

In [51]:
txs_sospechosas.count()

148

Let's save the data for later analysis

In [52]:
txs_sospechosas.coalesce(1)\
               .write.format("json")\
               .save("output/json/transacciones_sospechosas", mode="OVERWRITE")

In [53]:
txs_sospechosas.coalesce(1)\
               .write.format("parquet")\
               .save("output/parquet/transacciones_sospechosas", mode="OVERWRITE")
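`coalesce(1)` leaves a single `part-*` file inside each output directory. For JSON, Spark writes _JSON Lines_: one object per line, which is also what `read.json` expects by default (a single pretty-printed JSON document needs the `multiLine=True` option). The sketch writes and reads that layout with Python's standard library; the records reuse values printed earlier and have the same columns as `txs_sospechosas`, but they are not the actual suspicious set, and the file name is a hypothetical stand-in for Spark's part file.

```python
import json
import os
import tempfile

# Illustrative records with the columns of txs_sospechosas (tdc, monto);
# values are taken from earlier outputs, not from the real suspicious set
registros = [
    {"tdc": "8ef61de7-b609-4e65-9e0b-fd00eb8251a8", "monto": 9042},
    {"tdc": "e434d65e-cb57-43ef-a16e-b3a6ee226144", "monto": 8176},
]

with tempfile.TemporaryDirectory() as carpeta:
    ruta = os.path.join(carpeta, "part-00000.json")  # hypothetical part-file name
    # Write JSON Lines: one object per line
    with open(ruta, "w", encoding="utf-8") as f:
        for registro in registros:
            f.write(json.dumps(registro) + "\n")
    # Read it back line by line, skipping blank lines
    with open(ruta, encoding="utf-8") as f:
        lineas = f.read().splitlines()
    leidos = [json.loads(linea) for linea in lineas if linea.strip()]

print(len(lineas), leidos == registros)  # 2 True
```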

In [None]:
! ls -lh output/json/transacciones_sospechosas

## DataFrame from JSON

For this section we will use data on projects supported by the **World Bank**

In [None]:
! rm -R data/world_bank*

In [None]:
! wget https://github.com/bradenrc/sparksql_pot/raw/master/world_bank.zip -P data/

In [None]:
! unzip data/world_bank.zip -d data/world_bank

In [None]:
! rm data/world_bank.zip

We create the `DataFrame`

In [54]:
world_bank = sqlContext.read.json("world_bank.json")  # adjust the path if the file was unzipped into data/world_bank/

The _schema_ of the data source is detected automatically

In [55]:
world_bank.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- approvalfy: string (nullable = true)
 |-- board_approval_month: string (nullable = true)
 |-- boardapprovaldate: timestamp (nullable = true)
 |-- borrower: string (nullable = true)
 |-- closingdate: timestamp (nullable = true)
 |-- country_namecode: string (nullable = true)
 |-- countrycode: string (nullable = true)
 |-- countryname: string (nullable = true)
 |-- countryshortname: string (nullable = true)
 |-- docty: string (nullable = true)
 |-- envassesmentcategorycode: string (nullable = true)
 |-- grantamt: long (nullable = true)
 |-- ibrdcommamt: long (nullable = true)
 |-- id: string (nullable = true)
 |-- idacommamt: long (nullable = true)
 |-- impagency: string (nullable = true)
 |-- lendinginstr: string (nullable = true)
 |-- lendinginstrtype: string (nullable = true)
 |-- lendprojectcost: long (nullable = true)
 |-- majorsector_percent: array (nullable = true)
 |    |-- element: struct (cont

The structure is nested (rather than flat and two-dimensional), yet note that `Spark` handled it without any problem.
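Spark lets you reach into such structures directly: struct fields are selected with dot notation (`struct_column.field`), and array columns such as `majorsector_percent` can be expanded into one row per element with `pyspark.sql.functions.explode`. The plain-Python sketch below only illustrates what dot notation does on one nested record; `get_path` and the placeholder values are hypothetical.

```python
def get_path(record, path):
    """Follow a dot-separated path through nested dicts.

    Returns None when a key is missing, similar to how a field absent
    from one JSON record shows up as null in Spark.
    """
    value = record
    for key in path.split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value

# Hypothetical record shaped like part of the inferred schema (placeholder values)
registro = {"_id": {"$oid": "<oid>"}, "countryshortname": "<country>"}

print(get_path(registro, "_id.$oid"))          # <oid>
print(get_path(registro, "countryshortname"))  # <country>
print(get_path(registro, "_id.missing"))       # None
```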

In [56]:
world_bank.createOrReplaceTempView("world_bank_projects")  # replaces registerTempTable, deprecated since Spark 2.0

In [57]:
sqlContext.sql('show tables').show()

+--------+-------------------+-----------+
|database|          tableName|isTemporary|
+--------+-------------------+-----------+
|        |                txs|       true|
|        |world_bank_projects|       true|
+--------+-------------------+-----------+



In [58]:
sqlContext.sql('select countryshortname, project_name, totalamt, totalcommamt from world_bank_projects order by countryshortname').show()

+----------------+--------------------+---------+------------+
|countryshortname|        project_name| totalamt|totalcommamt|
+----------------+--------------------+---------+------------+
|     Afghanistan|Afghanistan: Nati...|        0|   100000000|
|     Afghanistan|Afghanistan Agric...|        0|    74730000|
|     Afghanistan|Afghanistan: Safe...| 12500000|    12500000|
|     Afghanistan|Afghanistan - Sec...| 55000000|    55000000|
|     Afghanistan|AF: Development P...| 50000000|    50000000|
|     Afghanistan|Afghanistan: Syst...|100000000|   100000000|
|          Africa|Additional Financ...| 60000000|    60000000|
|          Africa|West Africa  Regi...|        0|    10000000|
|          Africa|West Africa Regio...| 60000000|    60000000|
|          Africa|Southern Africa T...|213000000|   213000000|
|          Africa|Nile Cooperation ...|        0|    15300000|
|          Africa|RCIP4 - Regional ...| 22000000|    22000000|
|          Africa|Regional Rusumo F...|339900000|   339

In [59]:
projects_by_country = sqlContext.sql("""
    select countryshortname as country,
           count(project_name) as num_projects,
           sum(totalamt) as total_amount
    from world_bank_projects
    group by countryshortname
    order by total_amount desc
""")
projects_by_country.show()

+--------------------+------------+------------+
|             country|num_projects|total_amount|
+--------------------+------------+------------+
|               India|          16|  2595700000|
|              Brazil|           9|  2326200000|
|           Indonesia|          19|  2045500000|
|             Vietnam|          17|  1832400000|
|          Bangladesh|          12|  1566500000|
|               China|          19|  1540000000|
|              Poland|           1|  1307800000|
|              Turkey|           4|  1301000000|
|            Ethiopia|           4|  1245000000|
|             Nigeria|           7|  1220000000|
|              Africa|          11|   987900000|
|            Colombia|           3|   950000000|
|               Kenya|           6|   875000000|
|             Morocco|          12|   793200000|
|            Pakistan|           9|   744000000|
|             Myanmar|           3|   660000000|
|          Mozambique|          11|   657000000|
|            Tanzani

In [60]:
projects_by_country.printSchema()

root
 |-- country: string (nullable = true)
 |-- num_projects: long (nullable = false)
 |-- total_amount: long (nullable = true)



### Exercise
Starting from the `projects_by_country` dataframe, find the average amount per project for Uruguay and Bolivia

In [61]:
projects_by_country.filter(projects_by_country['country'].isin('Uruguay', 'Bolivia'))\
                   .select('country', (col('total_amount') / col('num_projects')).alias('promedio_por_pais'))\
                   .show()

+-------+-----------------+
|country|promedio_por_pais|
+-------+-----------------+
|Uruguay|           1.22E8|
|Bolivia|            3.7E7|
+-------+-----------------+



### Exercise
Starting from the `projects_by_country` dataframe, find the 5 countries with the highest average amount per project

In [62]:
df = projects_by_country

In [67]:
df1 = df.select('country', (col('total_amount') / col('num_projects')).alias('promedio_por_pais'))

In [68]:
df1.orderBy(df1['promedio_por_pais'].desc()).show(5,truncate=False)

+-----------------------+-------------------+
|country                |promedio_por_pais  |
+-----------------------+-------------------+
|Poland                 |1.3078E9           |
|Turkey                 |3.2525E8           |
|Colombia               |3.166666666666667E8|
|Ethiopia               |3.1125E8           |
|Egypt, Arab Republic of|2.927E8            |
+-----------------------+-------------------+
only showing top 5 rows
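`total_amount / num_projects` divides two `bigint` columns, and Spark SQL's `/` returns a `double` in that case, which is why the results print in scientific notation such as `1.3078E9`. The plain-Python sketch below repeats the computation on the 17 rows visible in the truncated `projects_by_country.show()` output. Its top four match the result above; its fifth place differs because `Egypt, Arab Republic of` falls in the truncated part of that listing.

```python
# (country, num_projects, total_amount) for the rows visible in projects_by_country.show()
paises = [
    ("India", 16, 2595700000), ("Brazil", 9, 2326200000),
    ("Indonesia", 19, 2045500000), ("Vietnam", 17, 1832400000),
    ("Bangladesh", 12, 1566500000), ("China", 19, 1540000000),
    ("Poland", 1, 1307800000), ("Turkey", 4, 1301000000),
    ("Ethiopia", 4, 1245000000), ("Nigeria", 7, 1220000000),
    ("Africa", 11, 987900000), ("Colombia", 3, 950000000),
    ("Kenya", 6, 875000000), ("Morocco", 12, 793200000),
    ("Pakistan", 9, 744000000), ("Myanmar", 3, 660000000),
    ("Mozambique", 11, 657000000),
]

# "/" on two ints gives a float in Python 3, like Spark's "/" on bigint columns
promedios = [(pais, total / n) for pais, n, total in paises]
top = sorted(promedios, key=lambda p: p[1], reverse=True)[:5]
for pais, promedio in top:
    print(f"{pais}: {promedio:.4e}")
```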



## SparkSQL and Pandas

`Pandas` can also be used for analysis, but keep in mind that `toPandas()` sends the _entire_ `dataset` to a single node (the driver), so it only suits results that fit in that node's memory.

In [89]:
import pandas as pd

In [137]:
projects_by_country_pd = projects_by_country.toPandas()

In [138]:
projects_by_country_pd.columns

Index(['country', 'num_projects', 'total_amount'], dtype='object')

In [139]:
projects_by_country_pd=projects_by_country_pd.set_index(['country'])

In [140]:
projects_by_country_pd.num_projects

country
India                 16
Brazil                 9
Indonesia             19
Vietnam               17
Bangladesh            12
                      ..
Mongolia               2
Namibia                1
Jamaica                2
West Bank and Gaza     6
Vanuatu                3
Name: num_projects, Length: 118, dtype: int64

In [141]:
projects_by_country_pd.head()

            num_projects  total_amount
country
India                 16    2595700000
Brazil                 9    2326200000
Indonesia             19    2045500000
Vietnam               17    1832400000
Bangladesh            12    1566500000


In [142]:
projects_by_country_pd.loc['Peru']

num_projects            6
total_amount    125000000
Name: Peru, dtype: int64

In [143]:
projects_by_country_pd['num_projects'][:10].plot(kind='barh', rot=0)

<matplotlib.axes._subplots.AxesSubplot at 0x21ea20acac8>