# SparkSQL y Spark DataFrames

`SparkSQL` además de permitirnos interactuar usando `SQL` (en realidad `HQL` _Hive query language_, ver documentación [aquí](https://cwiki.apache.org/confluence/display/Hive/LanguageManual)), agrega una capa de abstracción al `RDD` y lo convierte en un `DataFrame` análogo al usado en `R` y `Python`.

Como siempre, empezamos obtniendo el `SparkContext`

In [None]:
import pyspark
sc = pyspark.SparkContext('local[*]')

Pero además, obtendremos el `SQLContext` en esta ocasión

In [None]:
sqlContext = pyspark.SQLContext(sc)

## Creando un DataFrame

Ahora leeremos el archivo de transacciones creado en el laboratorio anterior

In [None]:
! ls -lh output/raw/transacciones

**NOTA**: Si marca error la última instrucción, regresa al laboratorio anterior y crea de nuevo el archivo.

In [None]:
txs_rdd = sc.textFile("output/raw/transacciones")

In [None]:
txs_rdd.first()

Para poder obtener un `DataFrame` hay que extraer cada línea del `RDD` a un objeto `Row`

In [None]:
from pyspark.sql import Row

In [None]:
Transaccion = Row('tdc', 'comercio', 'accion', 'monto')

In [None]:
def getTransaccion(linea):
    cells = linea.split('|')
    cells[3] = int(cells[3])
    return Transaccion(*cells)

In [None]:
txs = txs_rdd.map(getTransaccion)

In [None]:
txs.first()

In [None]:
txs_df = txs.toDF()

In [None]:
txs_df

In [None]:
txs

In [None]:
txs_df.printSchema()

In [None]:
txs_df.show()

## Registrando una tabla

Al tener un `DataFrame` es posible usar la `API` o usar `SQL`. Para mostrarlo, primero registremos la tabla

In [None]:
txs_df.registerTempTable('txs')

In [None]:
sqlContext.sql("show tables").show()

In [None]:
sqlContext.sql("select * from txs limit 5").show()

Ahora que `txs` ya tiene esquema, es buena idea guardarlo

In [None]:
sqlContext.sql("select * from txs")\
          .coalesce(1)\
          .write.format("parquet")\
          .save("output/parquet/transacciones", mode="OVERWRITE")

In [None]:
sqlContext.sql("select * from txs")\
          .coalesce(1)\
          .write.format("json")\
          .save("output/json/transacciones", mode="OVERWRITE")

 ## Usando la API

Al igual que los `RDD`s, los `DataFrames` pueden ser operados mediante _transformaciones_ y _acciones_. Las **Transformaciones** son _lazy_ pero contribuyen a la planeación de la ejecución del _query_, las **Acciones** provocan la ejecución del _query_

### Transformaciones

- `filter`
- `select`
- `drop`
- `join`

### Acciones

- `count`
- `collect`
- `show`
- `head`
- `take`

In [None]:
txs_df.select(txs_df['tdc'], txs_df['accion'], txs_df['monto'], txs_df['monto'] >= 5000).show(5)

In [None]:
sqlContext.sql("select tdc, accion, monto, monto >= 5000 from txs").show(5)

In [None]:
txs_df.filter((txs_df["monto"] >= 5000) & (txs_df["accion"] == "RETIRO"))\
      .select(txs_df["tdc"], txs_df["comercio"], txs_df["monto"])\
      .orderBy(txs_df["monto"].desc())\
      .show(5)

**NOTA**: `where()` es un _alias_ para `filter()`

In [None]:
txs_df.where((txs_df["monto"] >= 5000) & (txs_df["accion"] == "RETIRO"))\
      .select(txs_df["tdc"], txs_df["comercio"], txs_df["monto"])\
      .show(5)

También es posible usar `strings` para el condicional

In [None]:
txs_df.filter("monto >= 5000 and accion = 'RETIRO'")\
      .select(txs_df["tdc"], txs_df["comercio"], txs_df["monto"])\
      .orderBy(txs_df["monto"].asc())\
      .show(5)

In [None]:
txs_df.groupBy("tdc").count().show(5)

In [None]:
txs_df.groupBy("tdc").count().show(5)

### User Defined Functions: UDF

Supongamos que queremos mostrar el  `tdc` en mayúsculas, para tal menester es necesario definir una `UDF`

In [None]:
from pyspark.sql.functions import udf
to_upper = udf(lambda s: s.upper())


In [None]:
txs_df.select(to_upper(txs_df["tdc"]).alias("Mayúsculas"), txs_df["tdc"])\
      .distinct()\
      .show(100)

Imaginemos que sólo estamos interesados en aquellas transacciones que fueron `RETIRO`s en el `SUPERCITO` por montos mayores a `9000` ya que resultan sospechosos.

In [None]:
txs_sospechosas = txs_df.filter("monto >= 9000 and accion = 'RETIRO' and comercio = 'SUPERCITO'")\
      .select(txs_df["tdc"],  txs_df["monto"])\
      .orderBy(txs_df["monto"].desc())

In [None]:
txs_sospechosas.count()

Guardemos la información para un posterior análisis

In [None]:
txs_sospechosas.coalesce(1)\
               .write.format("json")\
               .save("output/json/transacciones_sospechosas", mode="OVERWRITE")

In [None]:
txs_sospechosas.coalesce(1)\
               .write.format("parquet")\
               .save("output/parquet/transacciones_sospechosas", mode="OVERWRITE")

In [None]:
! ls -lh output/json/transacciones_sospechosas

## DataFrame desde JSON

Para esta sección usaremos los datos de proyectos soportados por el **Banco Mundial**

In [None]:
! rm -R data/world_bank*

In [None]:
! wget http://jsonstudio.com/wp-content/uploads/2014/02/world_bank.zip -P data/

In [None]:
! unzip data/world_bank.zip -d data/world_bank

In [None]:
! rm data/world_bank.zip

Creamos el `DataFrame`

In [None]:
world_bank = sqlContext.read.json("data/world_bank/world_bank.json")

Automáticamente detecta el _esquema_ de la fuente de datos

In [None]:
world_bank.printSchema()

La estructura es anidada (en lugar de bidimensional), pero observa que `Spark` no tuvo ningún problema.

In [None]:
world_bank.registerTempTable("world_bank_projects")

In [None]:
sqlContext.sql('show tables').show()

In [None]:
sqlContext.sql('select countryshortname, project_name, totalamt, totalcommamt from world_bank_projects order by countryshortname').show()

In [None]:
projects_by_country = sqlContext.sql('select countryshortname as country, count(project_name) as num_projects, sum(totalamt) as total_amount from world_bank_projects group by countryshortname order by total_amount desc')
projects_by_country.show()

## SparkSQL y Pandas

Es posible usar `Pandas` para hacer análisis, pero hay que tomar en cuenta que esto manda _todo_ el `dataset` a un sólo nodo.

In [None]:
import pandas as pd

In [None]:
projects_by_country_pd = projects_by_country.toPandas()

In [None]:
projects_by_country_pd.columns

In [None]:
projects_by_country_pd=projects_by_country_pd.set_index(['country'])

In [None]:
projects_by_country_pd.num_projects

In [None]:
projects_by_country_pd.head()

In [None]:
projects_by_country_pd.ix['Peru']

In [None]:
%pylab inline
projects_by_country_pd['num_projects'][:10].plot(kind='barh', rot=0, )