# Dataframe rodando query SQL
### Dados relacionais com flexível e poderoso analytics

Armazenamentos de dados relacionais são fáceis de construir e consultar. Usuários e desenvolvedores geralmente preferem escrever consultas declarativas e fáceis de interpretar em uma linguagem legível semelhante a humanos, como SQL. No entanto, conforme os dados começam a aumentar em volume e variedade, a abordagem relacional não é dimensionada bem o suficiente para construir aplicativos de Big Data e sistemas analíticos. A seguir estão alguns dos principais desafios.

* Lidar com diferentes tipos e fontes de dados, que podem ser estruturados, semiestruturados e não estruturados
* Construir pipelines de ETL de e para várias fontes de dados, o que pode levar ao desenvolvimento de muitos códigos personalizados específicos, aumentando assim o débito técnico ao longo do tempo
* Ter a capacidade de realizar análises baseadas em business intelligence (BI) tradicional e análises avançadas (aprendizado de máquina, modelagem estatística, etc.), a última das quais é definitivamente desafiadora para executar em sistemas relacionais

Tivemos sucesso no domínio da análise de Big Data com Hadoop e o paradigma MapReduce. Isso era poderoso, mas geralmente lento, e dava aos usuários uma interface de programação procedural de baixo nível que exigia que as pessoas escrevessem muitos códigos até mesmo para transformações de dados muito simples. No entanto, uma vez que o Spark foi lançado, ele realmente revolucionou a forma como a análise de Big Data era feita com foco na computação in-memory, tolerância a falhas, abstrações de alto nível e facilidade de uso.

A partir de então, várias estruturas e sistemas como Hive, Pig e Shark (que evoluíram para Spark SQL) forneceram interfaces relacionais ricas e mecanismos de consulta declarativa para armazenamentos de Big Data. O desafio era que essas ferramentas eram relacionais ou baseadas em procedimentos, e não poderíamos ter o melhor dos dois mundos.

![spark-1](https://opensource.com/sites/default/files/uploads/2_hadoop-vs-spark.png)

No entanto, no mundo real, a maioria dos pipelines de dados e analíticos pode envolver uma combinação de código relacional e procedural. Forçar os usuários a escolher um deles acaba complicando as coisas e aumentando os esforços do usuário no desenvolvimento, construção e manutenção de diferentes aplicativos e sistemas. O Apache Spark SQL se baseia no esforço SQL-on-Spark mencionado anteriormente, chamado Shark. Em vez de forçar os usuários a escolher entre uma API relacional ou procedural, o Spark SQL tenta permitir que os usuários combinem perfeitamente as duas e executem consulta, recuperação e análise de dados em escala no Big Data.

### Spark SQL
O Spark SQL basicamente tenta preencher a lacuna entre os dois modelos que mencionamos anteriormente - os modelos relacionais e procedurais.

Spark SQL fornece uma API DataFrame que pode executar operações relacionais em fontes de dados externas e coleções distribuídas integradas do Spark - em escala!

Para oferecer suporte a uma ampla variedade de fontes de dados e algoritmos em Big Data, o Spark SQL apresenta um novo otimizador extensível chamado Catalyst, que torna mais fácil adicionar fontes de dados, regras de otimização e tipos de dados para análises avançadas, como aprendizado de máquina.
Essencialmente, o Spark SQL aproveita o poder do Spark para realizar cálculos distribuídos e robustos na memória em grande escala no Big Data.

Spark SQL fornece desempenho SQL de última geração e também mantém a compatibilidade com todas as estruturas e componentes existentes suportados pelo Apache Hive (uma estrutura de armazenamento de Big Data popular), incluindo formatos de dados, funções definidas pelo usuário (UDFs) e o metastore. Além disso, ele também ajuda a ingerir uma ampla variedade de formatos de dados de fontes de Big Data e data warehouses corporativos como JSON, Hive, Parquet e assim por diante, e realizar uma combinação de operações relacionais e procedimentais para análises mais complexas e avançadas.


![Spark-2](https://cdn-images-1.medium.com/max/2000/1*OY41hGbe4IB9-hHLRPuCHQ.png)

### Spark SQL with Dataframe API is fast
O Spark SQL demonstrou ser extremamente rápido, até mesmo comparável aos mecanismos baseados em C ++, como o Impala.

![spark_speed](https://opensource.com/sites/default/files/uploads/9_spark-dataframes-vs-rdds-and-sql.png)

O gráfico a seguir mostra um bom resultado de benchmark de DataFrames vs. RDDs em diferentes linguagens, o que dá uma perspectiva interessante sobre como os DataFrames podem ser otimizados.

![spark-speed-2](https://opensource.com/sites/default/files/uploads/10_comparing-spark-dataframes-and-rdds.png)

Por que o Spark SQL é tão rápido e otimizado? O motivo é por causa de um novo otimizador extensível, ** Catalyst **, baseado em construções de programação funcional em Scala.

O design extensível do Catalyst tem duas finalidades.

* Facilita a adição de novas técnicas e recursos de otimização ao Spark SQL, especialmente para lidar com diversos problemas em torno de Big Data, dados semiestruturados e análises avançadas
* Facilidade de poder estender o otimizador - por exemplo, adicionando regras específicas da fonte de dados que podem enviar filtragem ou agregação em sistemas de armazenamento externo ou suporte para novos tipos de dados

O Catalyst oferece suporte à otimização baseada em regras e em custos. Embora os otimizadores extensíveis tenham sido propostos no passado, eles normalmente exigiam uma linguagem complexa de domínio específico para especificar regras. Normalmente, isso leva a uma curva de aprendizado significativa e carga de manutenção. Em contraste, o Catalyst usa recursos padrão da linguagem de programação Scala, como correspondência de padrões, para permitir que os desenvolvedores usem a linguagem de programação completa, ao mesmo tempo que facilita a especificação das regras.

![catalyst-2](https://cdn-images-1.medium.com/max/1500/1*81ZOMxCci-tM2b-HNUX6Ww.png)

### Useful references for this Notebook
* [PySpark in Jupyter Notebook — Working with Dataframe & JDBC Data Sources](https://medium.com/@thucnc/pyspark-in-jupyter-notebook-working-with-dataframe-jdbc-data-sources-6f3d39300bf6)
* [PySpark - Working with JDBC Sqlite database](http://mitzen.blogspot.com/2017/06/pyspark-working-with-jdbc-sqlite.html)

### Create a SparkSession and read the a stock price dataset CSV

In [4]:
import findspark
findspark.init()

import pyspark as sp

In [7]:
from pyspark import SparkContext as sc
from pyspark.sql import SparkSession

In [8]:
spark1 = SparkSession.builder.appName('SQL').getOrCreate()

In [9]:
df = spark1.read.csv('Data/appl_stock.csv',inferSchema=True,header=True)

In [10]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



### Create a temporary view

In [11]:
df.createOrReplaceTempView('stock')

### Now run a simple SQL query directly on this view. It returns a DataFrame.

In [12]:
result = spark1.sql("SELECT * FROM stock LIMIT 5")
result

DataFrame[Date: string, Open: double, High: double, Low: double, Close: double, Volume: int, Adj Close: double]

In [13]:
result.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [14]:
result.show()

+----------+----------+----------+------------------+------------------+---------+------------------+
|      Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+----------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+----------+----------+----------+------------------+------------------+---------+------------------+



### Run slightly more complex queries
How many entries in the `Close` field are higher than 500?

In [15]:
count_greater_500 = spark1.sql("SELECT COUNT(Close) FROM stock WHERE Close > 500").show()

+------------+
|count(Close)|
+------------+
|         403|
+------------+



What is the average `Open` values of all the entries where `Volume` is either greater than 120 million or less than 110 million?

In [16]:
avg_1 = spark1.sql("SELECT AVG(Open) FROM stock WHERE Volume > 120000000 OR Volume < 110000000").show()

+------------------+
|         avg(Open)|
+------------------+
|309.12406365290224|
+------------------+



### Read a file (and create dataframe) by directly running a `spark.sql` method on the file
Notice the syntax of `csv.<path->filename.csv>` inside the SQL query

In [17]:
df_sales = spark1.sql("SELECT * FROM csv.`Data/sales_info.csv`")

In [18]:
df_sales.show()

+-------+-------+-----+
|    _c0|    _c1|  _c2|
+-------+-------+-----+
|Company| Person|Sales|
|   GOOG|    Sam|  200|
|   GOOG|Charlie|  120|
|   GOOG|  Frank|  340|
|   MSFT|   Tina|  600|
|   MSFT|    Amy|  124|
|   MSFT|Vanessa|  243|
|     FB|   Carl|  870|
|     FB|  Sarah|  350|
|   APPL|   John|  250|
|   APPL|  Linda|  130|
|   APPL|   Mike|  750|
|   APPL|  Chris|  350|
+-------+-------+-----+



### Read tables from a local SQLite database file using `JDBC` connection
For this read, we will use the famous chinook DB in SQLite tutorial. You can [download the file from here](http://www.sqlitetutorial.net/sqlite-sample-database/). Remember to unzip it in a suitable directory. You will need the path to this file later.

#### First `cd` to the directory where all the PySpark jar files are kept. Then download the SQLite jar file from the [given URL](https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc)

In [None]:
!cd /home/tirtha/Spark/spark-2.3.1-bin-hadoop2.7/jars/
!curl https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.28.0/sqlite-jdbc-3.28.0.jar

#### Define driver, path to local .db file, and append that path to `jdbc:sqlite` to construct the `url`

In [20]:
driver = "org.sqlite.JDBC"
path = "Data/chinook.db"
url = "jdbc:sqlite:" + path

#### Define `tablename` to be read

In [21]:
tablename = "albums"

In [None]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)

In [None]:
df_albums = spark1.read.format("jdbc").option("url", url).option("dbtable", tablename).option("driver", driver).load()

In [17]:
df_albums.show()

+-------+--------------------+--------+
|AlbumId|               Title|ArtistId|
+-------+--------------------+--------+
|      1|For Those About T...|       1|
|      2|   Balls to the Wall|       2|
|      3|   Restless and Wild|       2|
|      4|   Let There Be Rock|       1|
|      5|            Big Ones|       3|
|      6|  Jagged Little Pill|       4|
|      7|            Facelift|       5|
|      8|      Warner 25 Anos|       6|
|      9|Plays Metallica B...|       7|
|     10|          Audioslave|       8|
|     11|        Out Of Exile|       8|
|     12| BackBeat Soundtrack|       9|
|     13|The Best Of Billy...|      10|
|     14|Alcohol Fueled Br...|      11|
|     15|Alcohol Fueled Br...|      11|
|     16|       Black Sabbath|      12|
|     17|Black Sabbath Vol...|      12|
|     18|          Body Count|      13|
|     19|    Chemical Wedding|      14|
|     20|The Best Of Buddy...|      15|
+-------+--------------------+--------+
only showing top 20 rows



In [18]:
df_albums.printSchema()

root
 |-- AlbumId: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- ArtistId: integer (nullable = true)



#### Don't forget to create temp view to use later

In [19]:
df_albums.createTempView('albums')

#### Read another table - `artists`

In [20]:
tablename = "artists"

In [21]:
df_artists = spark1.read.format("jdbc").option("url", url).option("dbtable", tablename).option("driver", driver).load()

In [22]:
df_artists.show()

+--------+--------------------+
|ArtistId|                Name|
+--------+--------------------+
|       1|               AC/DC|
|       2|              Accept|
|       3|           Aerosmith|
|       4|   Alanis Morissette|
|       5|     Alice In Chains|
|       6|Antônio Carlos Jobim|
|       7|        Apocalyptica|
|       8|          Audioslave|
|       9|            BackBeat|
|      10|        Billy Cobham|
|      11| Black Label Society|
|      12|       Black Sabbath|
|      13|          Body Count|
|      14|     Bruce Dickinson|
|      15|           Buddy Guy|
|      16|      Caetano Veloso|
|      17|       Chico Buarque|
|      18|Chico Science & N...|
|      19|        Cidade Negra|
|      20|        Cláudio Zoli|
+--------+--------------------+
only showing top 20 rows



In [23]:
df_artists.createTempView('artists')

#### Test if SQL query is working fine

In [24]:
spark1.sql("SELECT * FROM artists WHERE length(Name)<10 LIMIT 10").show()

+--------+---------+
|ArtistId|     Name|
+--------+---------+
|       1|    AC/DC|
|       2|   Accept|
|       3|Aerosmith|
|       9| BackBeat|
|      15|Buddy Guy|
|      26|  Azymuth|
|      36|  O Rappa|
|      37| Ed Motta|
|      46|Jorge Ben|
|      50|Metallica|
+--------+---------+



### Join the `albums` and `artists` tables in a single dataframe using SQL query (and order by `ArtistId`)

In [25]:
df_combined = spark1.sql("SELECT * FROM artists LEFT JOIN albums ON artists.ArtistId=albums.ArtistId order by artists.ArtistId")

In [26]:
df_combined.show()

+--------+--------------------+-------+--------------------+--------+
|ArtistId|                Name|AlbumId|               Title|ArtistId|
+--------+--------------------+-------+--------------------+--------+
|       1|               AC/DC|      1|For Those About T...|       1|
|       1|               AC/DC|      4|   Let There Be Rock|       1|
|       2|              Accept|      2|   Balls to the Wall|       2|
|       2|              Accept|      3|   Restless and Wild|       2|
|       3|           Aerosmith|      5|            Big Ones|       3|
|       4|   Alanis Morissette|      6|  Jagged Little Pill|       4|
|       5|     Alice In Chains|      7|            Facelift|       5|
|       6|Antônio Carlos Jobim|      8|      Warner 25 Anos|       6|
|       6|Antônio Carlos Jobim|     34|Chill: Brazil (Di...|       6|
|       7|        Apocalyptica|      9|Plays Metallica B...|       7|
|       8|          Audioslave|    271|         Revelations|       8|
|       8|          

### What's the difference between temporary and global SQL views? 

#### A temporary view does not persist (shared) across multiple sessions

In [27]:
df_artists.createOrReplaceTempView("temp_artists")

df_temp = spark1.sql("SELECT * FROM temp_artists LIMIT 10")
df_temp.show()

+--------+--------------------+
|ArtistId|                Name|
+--------+--------------------+
|       1|               AC/DC|
|       2|              Accept|
|       3|           Aerosmith|
|       4|   Alanis Morissette|
|       5|     Alice In Chains|
|       6|Antônio Carlos Jobim|
|       7|        Apocalyptica|
|       8|          Audioslave|
|       9|            BackBeat|
|      10|        Billy Cobham|
+--------+--------------------+



#### A new session is created but the temp view `temp_artists` cannot be accessed

In [28]:
spark2 = SparkSession.builder.appName('SQL2').getOrCreate()

#### We use `try...except` to catch the error and display a generic message

In [29]:
try:
    df_temp = spark2.sql("SELECT * FROM temp_artists LIMIT 10")
except:
    print("Error happened in this execution")

#### Now, a global view is created in this session
Global temporary view is tied to a system preserved database `global_temp`. So the view name must be referenced as such.

In [30]:
tablename = "artists"
df_artists = spark2.read.format("jdbc").option("url", url).option("dbtable", tablename).option("driver", driver).load()

In [31]:
df_artists.createOrReplaceGlobalTempView("global_artists")

df_global = spark2.sql("SELECT * FROM global_temp.global_artists LIMIT 10")
df_global.show()

+--------+--------------------+
|ArtistId|                Name|
+--------+--------------------+
|       1|               AC/DC|
|       2|              Accept|
|       3|           Aerosmith|
|       4|   Alanis Morissette|
|       5|     Alice In Chains|
|       6|Antônio Carlos Jobim|
|       7|        Apocalyptica|
|       8|          Audioslave|
|       9|            BackBeat|
|      10|        Billy Cobham|
+--------+--------------------+



#### Start a new session. The view `global_artists` can be accessed across the sessions

In [32]:
spark3 = SparkSession.builder.appName('SQL3').getOrCreate()

In [33]:
df_global = spark3.sql("SELECT * FROM global_temp.global_artists LIMIT 10")
df_global.show()

+--------+--------------------+
|ArtistId|                Name|
+--------+--------------------+
|       1|               AC/DC|
|       2|              Accept|
|       3|           Aerosmith|
|       4|   Alanis Morissette|
|       5|     Alice In Chains|
|       6|Antônio Carlos Jobim|
|       7|        Apocalyptica|
|       8|          Audioslave|
|       9|            BackBeat|
|      10|        Billy Cobham|
+--------+--------------------+

