<a href="https://colab.research.google.com/github/mlaricobar/Spark-Course/blob/master/%5BC-01-ES%5D%20Spark%20SQL%20ba%CC%81sico.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Global data variables
SANDBOX_NAME = # Sandbox Name
DATA_PATH = "/data/sandboxes/" + SANDBOX_NAME + "/data/data/" 


# SparkSession & SparkContext



En versiones de Spark anteriores a 2.0.0, __sparkContext__ era usado como el canal para acceder a todas las funcionalidades de spark. Se encarga de gestionar la conexión con el resource manager y los ejecutores.

A partir de la versión 2.0.0, __sparkSession__ funciona como punto de entrada único para interactuar con spark, contiene toda la funcionalidad básica del _sparkContext_ y añade la API para interacturar con Dataframes. 

En el caso de Intelligence ambos objetos son creados automáticamente al iniciar un _kernel_ de _pyspark_.

In [0]:
spark

<pyspark.sql.session.SparkSession at 0x7f16b9818358>

In [0]:
spark.version

'2.1.0.1'

In [0]:
sc

<pyspark.context.SparkContext at 0x7f16ca02e240>



_sparkSession_ contiene al _sparkContext_

In [0]:
spark.sparkContext

<pyspark.context.SparkContext at 0x7f16ca02e240>



_sparkContext_ contiene la configuración de spark

In [0]:
sc.getConf().getAll()

[('spark.intelligence.kernel', 'e66f6ea7-56fae21de60f2a63aca4987a'),
 ('spark.mesos.executor.docker.volumes',
  '/etc/resolv.conf:/etc/resolv.conf:ro'),
 ('spark.mesos.secret', 'DrE6o5r67qnf38H9K5KgGhoeRHqwU6sVNBVStHqw'),
 ('spark.files',
  'file:/var/sds/intelligence/graphs/graphfiles.zip,file:/var/sds/intelligence/bigdl/bigdl-0.5.0-python-api.zip'),
 ('spark.shuffle.reduceLocality.enabled', 'false'),
 ('spark.submit.pyFiles',
  '/var/sds/intelligence/graphs/graphfiles.zip,/var/sds/intelligence/bigdl/bigdl-0.5.0-python-api.zip'),
 ('spark.driver.memory', '1g'),
 ('spark.driver.host', '192.168.192.90'),
 ('spark.cores.max', '1'),
 ('spark.master', 'mesos://leader.mesos:5050'),
 ('spark.sql.sources.parallelPartitionDiscovery.threshold', '10000000'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.scheduler.minRegisteredResourcesRatio', '1.0'),
 ('spark.submit.deployMode', 'client'),
 ('spark.mesos.role', 'infesp'),
 ('spark.mesos.executor.docker.image',
  'nexus.daas.work.es.et


# DataFrames
Es una colección inmutable y distribuida de datos organizados en columnas de forma tabular que permite una abstracción de los datos a alto nivel.


__Características__

- Conjunto de filas (Rows) con un esquema de datos (Schema)
- Colección distribuida de datos organizados en filas y columnas nombradas.
- Conceptualmente equivalente a una tabla en una BD relacional, o a un data frame de R o Python (pandas), pero con optimizaciones avanzadas para soportar aplicaciones Big Data y Data Science.
- Pueden construirse a partir de distintas fuentes: ficheros de datos estructurados, tablas de Hive, BD externas, o RDDs
- DataFrame API disponible en Scala, Java, Python, y R



En SparkSQL cada __transformación__ sobre un DataFrame se añade a lo que se conoce como *query plan*. Cuando se aplica una __acción__ sobre el DataFrame el *Catalyst Optimizer* analiza el *query plan* e intenta optimizarlo, luego selecciona el plan físico (transformación a operaciones RDD de bajo nivel) más eficiente para la ejecución del plan, y lo ejecuta. 

Se puede consultar el *query plan* usando la función `explain()` sobre el DataFrame.



## Creación de DataFrames
Normalmente un DataFrame se crea leyendo datos desde una fuente externa (S3, HDFS, etc). Por ejemplo para leer el fichero 'Building_Permits.csv' almacenado en el Sandbox Data:

In [0]:
# Cargar un file .csv
file_name = "Building_Permits.csv"

buildings_df = spark.read.csv(DATA_PATH + file_name, sep=',', header=True, inferSchema=True)

In [0]:
buildings_df.show(3)

+-------------+-----------+----------------------+--------------------+-----+---+-------------+--------------------+-----------+-------------+----+-----------+--------------------+--------------+-------------------+----------+-----------+--------------+--------------------------------+-----------------------+--------------------------+--------------------------+-----------------------------+----------------+----------------------+--------------+------------+-------------------+--------------+------------+--------------+--------+---------------+--------------------------+--------------------------------------+--------------------------+--------------------------------------+-----------+-------------------+-----------------------------------+-------+--------------------+-------------+
|Permit Number|Permit Type|Permit Type Definition|Permit Creation Date|Block|Lot|Street Number|Street Number Suffix|Street Name|Street Suffix|Unit|Unit Suffix|         Description|Current Status|Current Sta

 

En el caso de leer ficheros parquet no es necesario indicar una cabecera ni separador. Esta iformación ya está contenida en los propios ficheros parquet. Por ejemplo:

In [0]:
file_path = "parquet/online_retail"

online_df = spark.read.parquet(DATA_PATH + file_path)

In [0]:
online_df.show(5)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01/12/2010 8:26|     2,55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01/12/2010 8:26|     3,39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01/12/2010 8:26|     2,75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01/12/2010 8:26|     3,39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01/12/2010 8:26|     3,39|     17850|United Kingdom|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
only showing top 5 rows





También se puede crear un DataFrame de Spark a partir de un DataFrame de Pandas.

In [0]:
import pandas as pd

df = pd.DataFrame({'city': ['Madrid', 'Birmingham', 'Barcelona', 'Mexico City'], 
                   'population': [3.2, 0.2, 1.6, 8.8]})
df

Unnamed: 0,city,population
0,Madrid,3.2
1,Birmingham,0.2
2,Barcelona,1.6
3,Mexico City,8.8


In [0]:
df_s = spark.createDataFrame(df)
df_s

DataFrame[city: string, population: double]

In [0]:
#df_s.show.toPandas()

df_s.show()

+-----------+----------+
|       city|population|
+-----------+----------+
|     Madrid|       3.2|
| Birmingham|       0.2|
|  Barcelona|       1.6|
|Mexico City|       8.8|
+-----------+----------+




## Información básica de DataFrames



### Previsualización

`show` es un método que muestra por pantalla _n_ filas del DataFrame.

In [0]:
online_df.show(6)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01/12/2010 8:26|     2,55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01/12/2010 8:26|     3,39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01/12/2010 8:26|     2,75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01/12/2010 8:26|     3,39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01/12/2010 8:26|     3,39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|01/12/2010 8:26|     7,65|     17850|United Kingdom|
+---------+---------+--------------------+--------+-------------



### Dimensions

En Spark, no existe un método *shape*, por lo que hay que contar por separados las filas y las columnas.

In [0]:
buildings_df.count()

198900



`columns` es un atributo que contiene los nombres de las columnas del DataFrame.

In [0]:
buildings_df.columns

['Permit Number',
 'Permit Type',
 'Permit Type Definition',
 'Permit Creation Date',
 'Block',
 'Lot',
 'Street Number',
 'Street Number Suffix',
 'Street Name',
 'Street Suffix',
 'Unit',
 'Unit Suffix',
 'Description',
 'Current Status',
 'Current Status Date',
 'Filed Date',
 'Issued Date',
 'Completed Date',
 'First Construction Document Date',
 'Structural Notification',
 'Number of Existing Stories',
 'Number of Proposed Stories',
 'Voluntary Soft-Story Retrofit',
 'Fire Only Permit',
 'Permit Expiration Date',
 'Estimated Cost',
 'Revised Cost',
 'Existing Use',
 'Existing Units',
 'Proposed Use',
 'Proposed Units',
 'Plansets',
 'TIDF Compliance',
 'Existing Construction Type',
 'Existing Construction Type Description',
 'Proposed Construction Type',
 'Proposed Construction Type Description',
 'Site Permit',
 'Supervisor District',
 'Neighborhoods - Analysis Boundaries',
 'Zipcode',
 'Location',
 'Record ID']

In [0]:
len(buildings_df.columns)

43



### Schema

El schema de un dataframe nos muestra como se interpretaran los datos. Esto no significa que los datos estén así. _schema_ es un atributo del objeto, no un método. _printSchema()_ es un método que muestra una versión más entendidible del _schema_.

In [0]:
# Es un método que te arroja cual es la estructura de la tabla.
# Similar a un info()

buildings_df.schema

StructType(List(StructField(Permit Number,StringType,true),StructField(Permit Type,IntegerType,true),StructField(Permit Type Definition,StringType,true),StructField(Permit Creation Date,StringType,true),StructField(Block,StringType,true),StructField(Lot,StringType,true),StructField(Street Number,IntegerType,true),StructField(Street Number Suffix,StringType,true),StructField(Street Name,StringType,true),StructField(Street Suffix,StringType,true),StructField(Unit,IntegerType,true),StructField(Unit Suffix,StringType,true),StructField(Description,StringType,true),StructField(Current Status,StringType,true),StructField(Current Status Date,StringType,true),StructField(Filed Date,StringType,true),StructField(Issued Date,StringType,true),StructField(Completed Date,StringType,true),StructField(First Construction Document Date,StringType,true),StructField(Structural Notification,StringType,true),StructField(Number of Existing Stories,StringType,true),StructField(Number of Proposed Stories,String

In [0]:
# Empleado para visualizar el esquema de la tabla
buildings_df.printSchema()

root
 |-- Permit Number: string (nullable = true)
 |-- Permit Type: integer (nullable = true)
 |-- Permit Type Definition: string (nullable = true)
 |-- Permit Creation Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- Lot: string (nullable = true)
 |-- Street Number: integer (nullable = true)
 |-- Street Number Suffix: string (nullable = true)
 |-- Street Name: string (nullable = true)
 |-- Street Suffix: string (nullable = true)
 |-- Unit: integer (nullable = true)
 |-- Unit Suffix: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Current Status: string (nullable = true)
 |-- Current Status Date: string (nullable = true)
 |-- Filed Date: string (nullable = true)
 |-- Issued Date: string (nullable = true)
 |-- Completed Date: string (nullable = true)
 |-- First Construction Document Date: string (nullable = true)
 |-- Structural Notification: string (nullable = true)
 |-- Number of Existing Stories: string (nullable = true)
 |-- Number of



### dtypes

El atributo `dtypes` contiene los nombres de las columnas del dataframe junto con su tipo. Esto permite seleccionar nombres de columnas basados en el tipo, normalmente las variables categóricas (string y boolean) tienen tratamientos distintos a las numéricas (enteras y decimales).

In [0]:
online_df.dtypes

[('InvoiceNo', 'string'),
 ('StockCode', 'string'),
 ('Description', 'string'),
 ('Quantity', 'int'),
 ('InvoiceDate', 'string'),
 ('UnitPrice', 'string'),
 ('CustomerID', 'int'),
 ('Country', 'string')]

In [0]:
categorial_columns = [c for c,t in online_df.dtypes if t in ['string', 'bool']]
categorial_columns

['InvoiceNo',
 'StockCode',
 'Description',
 'InvoiceDate',
 'UnitPrice',
 'Country']

In [0]:
numerical_columns = [c for c,t in online_df.dtypes if t in ['int', 'double']]
numerical_columns

['Quantity', 'CustomerID']


## Acciones

Las acciones principales de Spark que se van a tratar son las siguientes:
    
    - show()
    - count()
    - first()
    - take()
    - collect()
    - toPandas()
    - write()


### show
Muestra por pantalla _n_ filas del DataFrame (20 por defecto). Es una llamada a un `print`, no permite almacenar el resultado en una variable. El parámetro `truncate` limita el número de caracteres de cada campo.

In [0]:
online_df.show(5)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01/12/2010 8:26|     2,55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01/12/2010 8:26|     3,39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01/12/2010 8:26|     2,75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01/12/2010 8:26|     3,39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01/12/2010 8:26|     3,39|     17850|United Kingdom|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
only showing top 5 rows



In [0]:
online_df.show(5, truncate=False)

+---------+---------+-----------------------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate    |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+---------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |01/12/2010 8:26|2,55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |01/12/2010 8:26|3,39     |17850     |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |01/12/2010 8:26|2,75     |17850     |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |01/12/2010 8:26|3,39     |17850     |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |01/12/2010 8:26|3,39     |17850     |United Kingdom|
+---------+---------+---------------------------

 

### count

Cuenta el número de filas del DataFrame

In [0]:
online_df.count()

541909

In [0]:
buildings_df.count()

198900

 

### first

Devuelve una única fila del DataFrame como un objeto de tipo `Row`. A los elementos del objeto `Row` se puede acceder tanto por nombre como por posición.

In [0]:
single_row = online_df.first()
single_row

Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, InvoiceDate='01/12/2010 8:26', UnitPrice='2,55', CustomerID=17850, Country='United Kingdom')

In [0]:
type(single_row)

pyspark.sql.types.Row

In [0]:
single_row[1]

'85123A'

In [0]:
single_row['StockCode']

'85123A'



### take

Devuelve _n_ filas del DataFrame como una lista de objetos `Row`.

In [0]:
row_list = online_df.take(4)
row_list

[Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, InvoiceDate='01/12/2010 8:26', UnitPrice='2,55', CustomerID=17850, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='71053', Description='WHITE METAL LANTERN', Quantity=6, InvoiceDate='01/12/2010 8:26', UnitPrice='3,39', CustomerID=17850, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='84406B', Description='CREAM CUPID HEARTS COAT HANGER', Quantity=8, InvoiceDate='01/12/2010 8:26', UnitPrice='2,75', CustomerID=17850, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='84029G', Description='KNITTED UNION FLAG HOT WATER BOTTLE', Quantity=6, InvoiceDate='01/12/2010 8:26', UnitPrice='3,39', CustomerID=17850, Country='United Kingdom')]

In [0]:
row_list[1]

Row(InvoiceNo='536365', StockCode='71053', Description='WHITE METAL LANTERN', Quantity=6, InvoiceDate='01/12/2010 8:26', UnitPrice='3,39', CustomerID=17850, Country='United Kingdom')

In [0]:
row_list[1][2]

'WHITE METAL LANTERN'

In [0]:
row_list[1]['Description']

'WHITE METAL LANTERN'

 

### collect

Vuelca en un solo nodo todos los datos y los almacena en formato lista de Rows. Funciona como un `take` sin límite. Es una función a evitar salvo en ocasiones muy específicas donde sea necesario tener todos los datos en local y no haya otra forma de gestionarlos. Utilizar únicamente con datos filtrados y/o agregados.

In [0]:
all_rows = online_df.collect()

In [0]:
len(all_rows)

541909

In [0]:
all_rows[:4]

[Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, InvoiceDate='01/12/2010 8:26', UnitPrice='2,55', CustomerID=17850, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='71053', Description='WHITE METAL LANTERN', Quantity=6, InvoiceDate='01/12/2010 8:26', UnitPrice='3,39', CustomerID=17850, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='84406B', Description='CREAM CUPID HEARTS COAT HANGER', Quantity=8, InvoiceDate='01/12/2010 8:26', UnitPrice='2,75', CustomerID=17850, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='84029G', Description='KNITTED UNION FLAG HOT WATER BOTTLE', Quantity=6, InvoiceDate='01/12/2010 8:26', UnitPrice='3,39', CustomerID=17850, Country='United Kingdom')]



### toPandas

Vuelca en un nodo todos los datos como un DataFrame de Pandas. Se deben seguir las mismas restricciones que con `collect`.

In [0]:
online_pandas = online_df.toPandas()

In [0]:
type(online_pandas)

pandas.core.frame.DataFrame

In [0]:
online_pandas.head(3)

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,01/12/2010 8:26,255,17850.0,United Kingdom
1,536365,71053,WHITE METAL LANTERN,6,01/12/2010 8:26,339,17850.0,United Kingdom
2,536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,01/12/2010 8:26,275,17850.0,United Kingdom


 

### write

Guarda la información de la tabla en fichero. Se puede escribir tanto en texto plano (CSV) como en formato parquet.

In [0]:
online_df.write.csv(DATA_PATH + 'online_reail.csv')

In [0]:
online_df.write.parquet(DATA_PATH + 'online_retail.parquet')