<b>Why do we need to analyze the data?</b>
- to understand our clients' behaviour
- to make business decisions based on data
- to validate our business decisions

<b>What tools are you using to analyze the data?</b>
- Excel, SQL, programming languages (Java, Python, etc.), big data tools

<b>What does Big Data mean for you?</b>
- Big Data means a volume of data so large that traditional data management tools cannot store and process it efficiently.

<b>Why SQL on Big Data?</b>
- SQL is one of the most common skills. Almost any developer knows how to write a simple SQL query.
- If a big data framework supports SQL, then suddenly almost everyone can do analysis on big data! <br>

There are many SQL tools for big data: Amazon Athena and Redshift from Amazon, BigQuery from Google, and Hive and Spark SQL as open source tools.


<img src='https://github.com/tlapusan/itdays-2019/blob/master/bigdata/resources/images/spark_logo.png?raw=true' />

Most of the time we code on a single machine (a laptop). But there are moments in a developer's life when a single machine is not powerful enough, especially when we have to process a large volume of data. <br>
One idea is to use a cluster of machines and all of their resources (CPU, RAM, disk). But the machines in a cluster have to communicate with each other, which means we would need networking, multithreading and similar skills... scary.
An ideal scenario would be to have a framework that handles this hard work and gives us the impression that we are still working on a single machine. This is what Apache Spark does!


Apache Spark is a distributed computing engine. It can process large volumes of data for tasks like batch or stream processing, SQL, machine learning and graph processing. Wow!
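To make "distributed computing" a bit more concrete, here is a minimal plain-Python sketch of the general idea: split the data into partitions, process each partition independently, then merge the partial results. It is only an analogy, not Spark code. The `records` list and the helper names `split` and `count_partition` are made up for illustration, and threads stand in for the machines of a cluster.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

# Hypothetical toy data: one job title per client record.
records = ["services", "admin.", "services", "technician", "admin.", "services"]


def split(data, n_partitions):
    """Cut the data into n_partitions slices (round-robin)."""
    return [data[i::n_partitions] for i in range(n_partitions)]


def count_partition(partition):
    """Work done independently on one partition."""
    return Counter(partition)


partitions = split(records, 3)

# Threads stand in for the machines of a cluster (an assumption of this sketch).
with ThreadPoolExecutor(max_workers=3) as pool:
    partial_counts = list(pool.map(count_partition, partitions))

# Merge the partial results into the final answer.
total = sum(partial_counts, Counter())
print(total)
```

A framework like Spark takes care of the splitting, scheduling and merging steps, so the user only writes the equivalent of `count_partition`.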

How can we deploy/use it? <br>

Even if Spark looks like a very big framework, we can install it easily on our laptops and just start coding.  
The best part... the code that we write on our laptops can be deployed and run on a cluster with hundreds of servers, without changing the processing logic ;) <br>
Supported programming languages: Java, Scala, Python, R.








# Install Spark
We can install Spark for Python (pyspark) using the pip package manager.

In [4]:
#!pip install pyspark

# Imports

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import matplotlib.pyplot as plt

# Init SparkSession
SparkSession is the entry point for each Spark application. <br>
When we instantiate a SparkSession, we create a driver process from where we can execute user-defined code on our big datasets.

<img src='https://github.com/tlapusan/itdays-2019/blob/master/bigdata/resources/images/spark_application_architecture.png?raw=true'/>


In [4]:
spark = SparkSession.builder.\
            master("local[4]").\
            appName("Spark-SQL").\
            getOrCreate()

In [5]:
spark

# Read data
Spark can read/write data in a variety of formats and sources, like CSV, JSON, Parquet and JDBC.

Dataset description: The data is related to direct marketing campaigns of a Portuguese banking institution. The marketing campaigns were based on phone calls. Often, more than one contact with the same client was required in order to assess whether the product (bank term deposit) would be subscribed ('yes') or not ('no'). <br>


In [6]:
client_bank = spark.read.parquet("../resources/data/parquet/client_bank/")
client_campaign = spark.read.parquet("../resources/data/parquet/client_campaing/")

In [7]:
type(client_bank)

pyspark.sql.dataframe.DataFrame

# Dataframe

A DataFrame is an immutable, distributed, table-like collection of data. It has a schema which defines the column names and data types.
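Immutability means that an operation such as a column rename does not change the DataFrame in place. It returns a new DataFrame, which is why this notebook later reassigns the result (`client_bank = client_bank.withColumnRenamed(...)`). The sketch below imitates that behaviour in plain Python. `MiniFrame` and `with_column_renamed` are hypothetical names for illustration only, not part of the Spark API.

```python
from dataclasses import dataclass, replace
from typing import Tuple


@dataclass(frozen=True)
class MiniFrame:
    """Toy stand-in for a DataFrame: a schema plus rows, both immutable."""
    schema: Tuple[Tuple[str, str], ...]  # (column name, data type) pairs
    rows: Tuple[tuple, ...]

    def with_column_renamed(self, old: str, new: str) -> "MiniFrame":
        # Build a new schema; the original object is left untouched.
        new_schema = tuple(
            (new if name == old else name, dtype) for name, dtype in self.schema
        )
        return replace(self, schema=new_schema)

    @property
    def columns(self):
        return [name for name, _ in self.schema]


clients = MiniFrame(
    schema=(("id", "integer"), ("default", "string")),
    rows=((0, "no"), (1, "unknown")),
)
renamed = clients.with_column_renamed("default", "default_credit")

print(clients.columns)  # the original keeps the old column name
print(renamed.columns)  # the new object carries the renamed column
```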

<img src='https://github.com/tlapusan/itdays-2019/blob/master/bigdata/resources/images/dataframe_structure.png?raw=true' width='70%'/>
 

TODO
- the role of immutability for dataframe ?
- penalty of using python in spark


In [9]:
client_bank.show(10)

+---+---+-----------+-------+-------------------+-------+-------+----+----------+
| id|age|        job|marital|          education|default|housing|loan|subscribed|
+---+---+-----------+-------+-------------------+-------+-------+----+----------+
|  0| 56|  housemaid|married|           basic.4y|     no|     no|  no|        no|
|  1| 57|   services|married|        high.school|unknown|     no|  no|        no|
|  2| 37|   services|married|        high.school|     no|    yes|  no|        no|
|  3| 40|     admin.|married|           basic.6y|     no|     no|  no|        no|
|  4| 56|   services|married|        high.school|     no|     no| yes|        no|
|  5| 45|   services|married|           basic.9y|unknown|     no|  no|        no|
|  6| 59|     admin.|married|professional.course|     no|     no|  no|        no|
|  7| 41|blue-collar|married|            unknown|unknown|     no|  no|        no|
|  8| 24| technician| single|professional.course|     no|    yes|  no|        no|
|  9| 25|   serv

In [10]:
client_bank.schema

StructType(List(StructField(id,IntegerType,true),StructField(age,IntegerType,true),StructField(job,StringType,true),StructField(marital,StringType,true),StructField(education,StringType,true),StructField(default,StringType,true),StructField(housing,StringType,true),StructField(loan,StringType,true),StructField(subscribed,StringType,true)))

In [12]:
client_bank.printSchema()

root
 |-- id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- subscribed: string (nullable = true)



Data related to bank client information: <br>
<b>id</b> - phone call id <br>
<b>age</b> - client age <br>
<b>job</b> - client job <br>
<b>marital</b> - marital status (categorical: 'divorced','married','single','unknown'; note: 'divorced' means divorced or widowed) <br>
<b>education</b> - client education 'basic.4y','basic.6y','basic.9y','high.school','illiterate','professional.course','university.degree','unknown' <br>
<b>default</b> - has credit in default? (categorical: 'no','yes','unknown'), default is failure to meet the legal obligations (or conditions) of a loan <br>
<b>housing</b> -  has housing loan? (categorical: 'no','yes','unknown') <br>
<b>loan</b> - has personal loan? (categorical: 'no','yes','unknown') <br>
<b>subscribed</b> - if the client subscribed to the bank term deposit (categorical: 'no','yes')

In [18]:
# Data related with the phone call contact
client_campaign.show(10)

+---+---------+-----+-----------+--------+
| id|  contact|month|day_of_week|duration|
+---+---------+-----+-----------+--------+
|  0|telephone|  may|        mon|     261|
|  1|telephone|  may|        mon|     149|
|  2|telephone|  may|        mon|     226|
|  3|telephone|  may|        mon|     151|
|  4|telephone|  may|        mon|     307|
|  5|telephone|  may|        mon|     198|
|  6|telephone|  may|        mon|     139|
|  7|telephone|  may|        mon|     217|
|  8|telephone|  may|        mon|     380|
|  9|telephone|  may|        mon|      50|
+---+---------+-----+-----------+--------+
only showing top 10 rows



In [19]:
client_campaign.printSchema()

root
 |-- id: integer (nullable = true)
 |-- contact: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- duration: integer (nullable = true)



<b>id</b> - phone call id <br>
<b>contact</b> - contact communication type (categorical: 'cellular','telephone') <br>
<b>month</b> - contact month of year (categorical: 'jan', 'feb', 'mar', ..., 'nov', 'dec') <br>
<b>day_of_week</b> - contact day of the week (categorical: 'mon','tue','wed','thu','fri')<br>
<b>duration</b> - contact duration, in seconds (numeric). <br>










# Data analysis
Spark offers multiple ways to analyze data. The most commonly used are the DataFrame API and Spark SQL.

## Dataframe API

### Column rename
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.withColumnRenamed

In [21]:
client_bank = client_bank.\
    withColumnRenamed("default", "default_credit").\
    withColumnRenamed("housing", "housing_loan").\
    withColumnRenamed("loan", "personal_loan")

client_bank.printSchema()

root
 |-- id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default_credit: string (nullable = true)
 |-- housing_loan: string (nullable = true)
 |-- personal_loan: string (nullable = true)
 |-- subscribed: string (nullable = true)



In [29]:
# Exercise 
# rename column 'contact' to 'contact_type' for the client_campaign dataframe

In [22]:
client_campaign = client_campaign.withColumnRenamed("contact", "contact_type")

In [23]:
client_campaign.printSchema()

root
 |-- id: integer (nullable = true)
 |-- contact_type: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- duration: integer (nullable = true)



### select columns
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.select

In [24]:
client_bank.\
    select("*").\
    show()

+---+---+-----------+--------+-------------------+--------------+------------+-------------+----------+
| id|age|        job| marital|          education|default_credit|housing_loan|personal_loan|subscribed|
+---+---+-----------+--------+-------------------+--------------+------------+-------------+----------+
|  0| 56|  housemaid| married|           basic.4y|            no|          no|           no|        no|
|  1| 57|   services| married|        high.school|       unknown|          no|           no|        no|
|  2| 37|   services| married|        high.school|            no|         yes|           no|        no|
|  3| 40|     admin.| married|           basic.6y|            no|          no|           no|        no|
|  4| 56|   services| married|        high.school|            no|          no|          yes|        no|
|  5| 45|   services| married|           basic.9y|       unknown|          no|           no|        no|
|  6| 59|     admin.| married|professional.course|            no

In [25]:
client_bank.\
    select(["age", "job", "education", "subscribed"]).\
    show()

+---+-----------+-------------------+----------+
|age|        job|          education|subscribed|
+---+-----------+-------------------+----------+
| 56|  housemaid|           basic.4y|        no|
| 57|   services|        high.school|        no|
| 37|   services|        high.school|        no|
| 40|     admin.|           basic.6y|        no|
| 56|   services|        high.school|        no|
| 45|   services|           basic.9y|        no|
| 59|     admin.|professional.course|        no|
| 41|blue-collar|            unknown|        no|
| 24| technician|professional.course|        no|
| 25|   services|        high.school|        no|
| 41|blue-collar|            unknown|        no|
| 25|   services|        high.school|        no|
| 29|blue-collar|        high.school|        no|
| 57|  housemaid|           basic.4y|        no|
| 35|blue-collar|           basic.6y|        no|
| 54|    retired|           basic.9y|        no|
| 35|blue-collar|           basic.6y|        no|
| 46|blue-collar|   

### where
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.where

In [26]:
client_bank.\
    where(F.col("age") == 23).\
    show()

+-----+---+-----------+-------+-------------------+--------------+------------+-------------+----------+
|   id|age|        job|marital|          education|default_credit|housing_loan|personal_loan|subscribed|
+-----+---+-----------+-------+-------------------+--------------+------------+-------------+----------+
|  568| 23|     admin.| single|  university.degree|            no|          no|           no|        no|
| 2640| 23|blue-collar|married|professional.course|            no|         yes|           no|        no|
| 2928| 23|   services| single|        high.school|            no|          no|           no|        no|
| 2930| 23|   services| single|        high.school|            no|          no|           no|        no|
| 3855| 23|   services|married|           basic.9y|            no|          no|           no|        no|
| 5249| 23|   services|married|           basic.9y|            no|         yes|           no|        no|
| 5910| 23|blue-collar|married|           basic.9y|    

In [30]:
client_bank.\
    where((F.col("age") == 23) & (F.col("subscribed") == "yes")).\
    show()

+-----+---+-----------+-------+-------------------+--------------+------------+-------------+----------+
|   id|age|        job|marital|          education|default_credit|housing_loan|personal_loan|subscribed|
+-----+---+-----------+-------+-------------------+--------------+------------+-------------+----------+
|13807| 23|   services| single|        high.school|            no|          no|           no|       yes|
|14220| 23|     admin.|married|           basic.9y|            no|         yes|           no|       yes|
|14815| 23|blue-collar| single|        high.school|            no|          no|           no|       yes|
|14821| 23|blue-collar| single|        high.school|            no|         yes|           no|       yes|
|15126| 23| management| single|  university.degree|            no|          no|           no|       yes|
|15187| 23| management| single|  university.degree|            no|          no|           no|       yes|
|16621| 23|     admin.| single|professional.course|    

### group by

In [31]:
# how many phone calls were successful/unsuccessful ?
client_bank.\
    groupBy(F.col("subscribed")).\
    count().\
    show()

+----------+-----+
|subscribed|count|
+----------+-----+
|        no|36548|
|       yes| 4640|
+----------+-----+



In [33]:
client_bank.\
    groupBy(F.col("subscribed")).\
    agg(
        F.count("*").alias("call_count"),
        F.min("age").alias("min_age"), 
        F.max("age").alias("max_age"),
        F.round(F.mean("age"),2).alias("mean_age")
    ).\
    show()

+----------+----------+-------+-------+--------+
|subscribed|call_count|min_age|max_age|mean_age|
+----------+----------+-------+-------+--------+
|        no|     36548|     17|     95|   39.91|
|       yes|      4640|     17|     98|   40.91|
+----------+----------+-------+-------+--------+



### order by
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.orderBy

In [35]:
client_bank.\
    orderBy(F.col("age"), ascending=True).\
    show()

+-----+---+-------+-------+-----------+--------------+------------+-------------+----------+
|   id|age|    job|marital|  education|default_credit|housing_loan|personal_loan|subscribed|
+-----+---+-------+-------+-----------+--------------+------------+-------------+----------+
|37558| 17|student| single|   basic.9y|            no|         yes|           no|        no|
|37140| 17|student| single|    unknown|            no|         yes|           no|        no|
|37579| 17|student| single|   basic.9y|            no|     unknown|      unknown|       yes|
|37539| 17|student| single|   basic.9y|            no|         yes|           no|        no|
|38274| 17|student| single|    unknown|            no|          no|          yes|       yes|
|37934| 18|student| single|    unknown|            no|     unknown|      unknown|        no|
|37916| 18|student| single|    unknown|            no|          no|           no|       yes|
|37955| 18|student| single|    unknown|            no|         yes|   

### join
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.join

In [37]:
client_campaign.printSchema()

root
 |-- id: integer (nullable = true)
 |-- contact_type: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- duration: integer (nullable = true)



In [40]:
client_bank.\
    join(client_campaign, ["id"], how="inner").\
    show(10)

+---+---+-----------+-------+-------------------+--------------+------------+-------------+----------+------------+-----+-----------+--------+
| id|age|        job|marital|          education|default_credit|housing_loan|personal_loan|subscribed|contact_type|month|day_of_week|duration|
+---+---+-----------+-------+-------------------+--------------+------------+-------------+----------+------------+-----+-----------+--------+
|  0| 56|  housemaid|married|           basic.4y|            no|          no|           no|        no|   telephone|  may|        mon|     261|
|  1| 57|   services|married|        high.school|       unknown|          no|           no|        no|   telephone|  may|        mon|     149|
|  2| 37|   services|married|        high.school|            no|         yes|           no|        no|   telephone|  may|        mon|     226|
|  3| 40|     admin.|married|           basic.6y|            no|          no|           no|        no|   telephone|  may|        mon|     151|

In [63]:
client_bank.\
    join(client_campaign, ["id"], how="inner").\
    select(["age", "education", "contact_type", "month"]).\
    where(F.col("contact_type") == "cellular").\
    show()

+---+-------------------+------------+-----+
|age|          education|contact_type|month|
+---+-------------------+------------+-----+
| 45|           basic.9y|    cellular|  jul|
| 29|        high.school|    cellular|  jul|
| 33|           basic.9y|    cellular|  jul|
| 41|  university.degree|    cellular|  jul|
| 30|        high.school|    cellular|  jul|
| 58|professional.course|    cellular|  jul|
| 31|           basic.9y|    cellular|  jul|
| 30|        high.school|    cellular|  jul|
| 40|           basic.6y|    cellular|  jul|
| 33|        high.school|    cellular|  jul|
| 38|           basic.9y|    cellular|  jul|
| 29|           basic.9y|    cellular|  jul|
| 48|           basic.4y|    cellular|  jul|
| 51|           basic.9y|    cellular|  jul|
| 51|           basic.9y|    cellular|  jul|
| 34|        high.school|    cellular|  jul|
| 46|           basic.9y|    cellular|  jul|
| 34|        high.school|    cellular|  jul|
| 55|        high.school|    cellular|  jul|
| 47|  university.degree|    cellular|  jul|
+---+-------------------+------------+-----+
only showing top

## Spark SQL
To run SQL queries against a DataFrame, we first need to register it as a temporary view.

In [41]:
client_bank.createOrReplaceTempView("client_bank")
client_campaign.createOrReplaceTempView("client_campaign")

### select

In [42]:
spark.sql("""SELECT * FROM client_bank""").show()

+---+---+-----------+--------+-------------------+--------------+------------+-------------+----------+
| id|age|        job| marital|          education|default_credit|housing_loan|personal_loan|subscribed|
+---+---+-----------+--------+-------------------+--------------+------------+-------------+----------+
|  0| 56|  housemaid| married|           basic.4y|            no|          no|           no|        no|
|  1| 57|   services| married|        high.school|       unknown|          no|           no|        no|
|  2| 37|   services| married|        high.school|            no|         yes|           no|        no|
|  3| 40|     admin.| married|           basic.6y|            no|          no|           no|        no|
|  4| 56|   services| married|        high.school|            no|          no|          yes|        no|
|  5| 45|   services| married|           basic.9y|       unknown|          no|           no|        no|
|  6| 59|     admin.| married|professional.course|            no

In [43]:
spark.sql("""SELECT age, job FROM client_bank""").show()

+---+-----------+
|age|        job|
+---+-----------+
| 56|  housemaid|
| 57|   services|
| 37|   services|
| 40|     admin.|
| 56|   services|
| 45|   services|
| 59|     admin.|
| 41|blue-collar|
| 24| technician|
| 25|   services|
| 41|blue-collar|
| 25|   services|
| 29|blue-collar|
| 57|  housemaid|
| 35|blue-collar|
| 54|    retired|
| 35|blue-collar|
| 46|blue-collar|
| 50|blue-collar|
| 39| management|
+---+-----------+
only showing top 20 rows



### where

In [44]:
spark.sql("""SELECT * 
            FROM client_bank
            WHERE age == 34 AND housing_loan == 'yes'""").show()

+----+---+-------------+--------+-------------------+--------------+------------+-------------+----------+
|  id|age|          job| marital|          education|default_credit|housing_loan|personal_loan|subscribed|
+----+---+-------------+--------+-------------------+--------------+------------+-------------+----------+
|  89| 34|       admin.| married|        high.school|            no|         yes|           no|        no|
| 106| 34|    housemaid| married|           basic.6y|            no|         yes|           no|        no|
| 138| 34|     services| married|        high.school|            no|         yes|           no|        no|
| 189| 34|       admin.| married|  university.degree|            no|         yes|           no|        no|
| 194| 34|  blue-collar|  single|           basic.9y|       unknown|         yes|           no|        no|
| 242| 34|   management| married|  university.degree|            no|         yes|           no|        no|
| 266| 34|self-employed|  single|  un

### group by

In [45]:
spark.sql("""SELECT job, count(*) AS count
            FROM client_bank
            WHERE AGE == 21
            GROUP BY job""").show()

+-------------+-----+
|          job|count|
+-------------+-----+
|   management|    1|
|self-employed|    2|
|      student|   54|
|  blue-collar|   15|
|       admin.|   12|
|   technician|    2|
|     services|   12|
|    housemaid|    1|
|   unemployed|    3|
+-------------+-----+



In [50]:
spark.sql("""SELECT job, count(*) AS count, round(mean(age), 3) AS mean_age
            FROM client_bank
            WHERE subscribed == 'no'
            GROUP BY job""").show()

+-------------+-----+--------+
|          job|count|mean_age|
+-------------+-----+--------+
|   management| 2596|   42.31|
|      retired| 1286|  59.926|
|      unknown|  293|  45.375|
|self-employed| 1272|  40.177|
|      student|  600|  26.397|
|  blue-collar| 8616|  39.582|
| entrepreneur| 1332|  41.703|
|       admin.| 9070|   38.22|
|   technician| 6013|    38.6|
|     services| 3646|   38.09|
|    housemaid|  954|  44.705|
|   unemployed|  870|  39.845|
+-------------+-----+--------+



### order by

In [52]:
spark.sql("""SELECT * 
            FROM client_bank
            ORDER BY age ASC""").show()

+-----+---+-------+-------+-----------+--------------+------------+-------------+----------+
|   id|age|    job|marital|  education|default_credit|housing_loan|personal_loan|subscribed|
+-----+---+-------+-------+-----------+--------------+------------+-------------+----------+
|37558| 17|student| single|   basic.9y|            no|         yes|           no|        no|
|37140| 17|student| single|    unknown|            no|         yes|           no|        no|
|37579| 17|student| single|   basic.9y|            no|     unknown|      unknown|       yes|
|37539| 17|student| single|   basic.9y|            no|         yes|           no|        no|
|38274| 17|student| single|    unknown|            no|          no|          yes|       yes|
|37934| 18|student| single|    unknown|            no|     unknown|      unknown|        no|
|37916| 18|student| single|    unknown|            no|          no|           no|       yes|
|37955| 18|student| single|    unknown|            no|         yes|   

### join

In [98]:
spark.sql("""SELECT cb.id, cc.id, cb.age, cb.education, cc.contact_type
            FROM client_bank AS cb INNER JOIN client_campaign AS cc ON cb.id == cc.id""").show()

+---+---+---+-------------------+------------+
| id| id|age|          education|contact_type|
+---+---+---+-------------------+------------+
|  0|  0| 56|           basic.4y|   telephone|
|  1|  1| 57|        high.school|   telephone|
|  2|  2| 37|        high.school|   telephone|
|  3|  3| 40|           basic.6y|   telephone|
|  4|  4| 56|        high.school|   telephone|
|  5|  5| 45|           basic.9y|   telephone|
|  6|  6| 59|professional.course|   telephone|
|  7|  7| 41|            unknown|   telephone|
|  8|  8| 24|professional.course|   telephone|
|  9|  9| 25|        high.school|   telephone|
| 10| 10| 41|            unknown|   telephone|
| 11| 11| 25|        high.school|   telephone|
| 12| 12| 29|        high.school|   telephone|
| 13| 13| 57|           basic.4y|   telephone|
| 14| 14| 35|           basic.6y|   telephone|
| 15| 15| 54|           basic.9y|   telephone|
| 16| 16| 35|           basic.6y|   telephone|
| 17| 17| 46|           basic.6y|   telephone|
| 18| 18| 50|           basic.9y|   telephone|
| 19| 19| 39|           basic.9y

### subselect

In [103]:
spark.sql("""SELECT age, education, count(*) as count
            FROM (
                SELECT cb.id, cc.id, cb.age, cb.education, cc.contact_type
                FROM client_bank AS cb INNER JOIN client_campaign AS cc ON cb.id == cc.id
                )
            GROUP BY age, education
            ORDER BY age DESC""").show()

+---+-------------------+-----+
|age|          education|count|
+---+-------------------+-----+
| 98|           basic.4y|    2|
| 95|           basic.6y|    1|
| 94|           basic.9y|    1|
| 92|            unknown|    4|
| 91|  university.degree|    2|
| 89|           basic.4y|    2|
| 88|           basic.4y|   20|
| 88|  university.degree|    1|
| 88|        high.school|    1|
| 87|           basic.4y|    1|
| 86|            unknown|    2|
| 86|           basic.4y|    4|
| 86|           basic.9y|    1|
| 86|professional.course|    1|
| 85|           basic.4y|   13|
| 85|professional.course|    2|
| 84|        high.school|    1|
| 84|           basic.4y|    4|
| 84|            unknown|    1|
| 84|           basic.9y|    1|
+---+-------------------+-----+
only showing top 20 rows



# Combine SQL with Dataframe API

In [111]:
spark.sql("""SELECT age, job, education
            FROM client_bank
            WHERE job == 'services'""").\
    where(F.col("age").between(20,50)).\
    where(F.col("education") == "high.school").show()

+---+--------+-----------+
|age|     job|  education|
+---+--------+-----------+
| 37|services|high.school|
| 25|services|high.school|
| 25|services|high.school|
| 34|services|high.school|
| 45|services|high.school|
| 33|services|high.school|
| 43|services|high.school|
| 35|services|high.school|
| 41|services|high.school|
| 34|services|high.school|
| 39|services|high.school|
| 39|services|high.school|
| 38|services|high.school|
| 36|services|high.school|
| 43|services|high.school|
| 44|services|high.school|
| 28|services|high.school|
| 33|services|high.school|
| 40|services|high.school|
| 33|services|high.school|
+---+--------+-----------+
only showing top 20 rows



# Spark SQL clients
https://spark.apache.org/docs/latest/sql-distributed-sql-engine.html

- SparkSession
- JDBC/ODBC ([BI tools](https://docs.databricks.com/bi/index.html))
- command-line 

# How does Spark run SQL code?
An SQL query declares what we want, but it does not specify the exact steps to run. Spark needs to convert the SQL into a query plan, which is a sequence of execution steps. <br>
This process was not invented by Spark; it happens in all SQL engines.

<img src='https://github.com/tlapusan/itdays-2019/blob/master/bigdata/resources/images/sql_catalyst.png?raw=true'/>



<b>Parsed logical plan</b> checks the query syntax. <br>
<b>Analyzed logical plan</b> checks that the referenced tables and columns exist. <br>
<b>Optimized logical plan</b> applies optimisations to the logical plan, like pushing down predicates or column selections. <br>
<b>Physical plan</b> specifies exactly how the plan will be executed on the cluster.
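The analysis and optimisation steps can be illustrated with a toy query plan in plain Python. This is a simplified sketch and not how Spark's optimizer is implemented: the tuple-based plan format and the functions `analyze` and `collapse_projects` are assumptions made for this example. It shows one check (unknown columns are rejected) and one rewrite rule (a stack of column selections collapses into a single one).

```python
# A logical plan as nested tuples (hypothetical format for this sketch):
#   ("relation", [columns])
#   ("project", [columns], child_plan)


def analyze(plan):
    """Return the output columns; fail if a selected column cannot be resolved."""
    kind = plan[0]
    if kind == "relation":
        return list(plan[1])
    if kind == "project":
        available = analyze(plan[2])
        for col in plan[1]:
            if col not in available:
                raise ValueError(
                    f"cannot resolve '{col}' given input columns: {available}"
                )
        return list(plan[1])
    raise ValueError(f"unknown plan node: {kind}")


def collapse_projects(plan):
    """Rewrite rule: Project(Project(x)) becomes Project(x), keeping the outer columns."""
    if plan[0] == "relation":
        return plan
    child = collapse_projects(plan[2])
    if child[0] == "project":
        child = child[2]  # the inner column selection is redundant
    return ("project", plan[1], child)


relation = ("relation", ["id", "age", "job", "education"])
plan = (
    "project", ["id", "age"],
    ("project", ["id", "age", "education"],
     ("project", ["id", "age", "job", "education"], relation)),
)

analyze(plan)                       # passes: every column can be resolved
optimized = collapse_projects(plan)
print(optimized)

try:
    analyze(("project", ["id2"], relation))
except ValueError as err:
    print(err)                      # the analysis step rejects the unknown column
```

The rewrite is safe here only because the toy plan has no renamed or computed columns; a real optimizer has to handle those cases as well.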

<img src='https://github.com/tlapusan/itdays-2019/blob/master/bigdata/resources/images/query_plan_states.png?raw=true'/>

In [55]:
# https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.explain
spark.sql("SELECT id, age, education FROM client_bank").explain(extended=True)

== Parsed Logical Plan ==
'Project ['id, 'age, 'education]
+- 'UnresolvedRelation `client_bank`

== Analyzed Logical Plan ==
id: int, age: int, education: string
Project [id#0, age#1, education#4]
+- SubqueryAlias `client_bank`
   +- Project [id#0, age#1, job#2, marital#3, education#4, default_credit#152, housing_loan#162, loan#7 AS personal_loan#172, subscribed#8]
      +- Project [id#0, age#1, job#2, marital#3, education#4, default_credit#152, housing#6 AS housing_loan#162, loan#7, subscribed#8]
         +- Project [id#0, age#1, job#2, marital#3, education#4, default#5 AS default_credit#152, housing#6, loan#7, subscribed#8]
            +- Relation[id#0,age#1,job#2,marital#3,education#4,default#5,housing#6,loan#7,subscribed#8] parquet

== Optimized Logical Plan ==
Project [id#0, age#1, education#4]
+- Relation[id#0,age#1,job#2,marital#3,education#4,default#5,housing#6,loan#7,subscribed#8] parquet

== Physical Plan ==
*(1) Project [id#0, age#1, education#4]
+- *(1) FileScan parquet [id

## check for parsed logical plan

In [57]:
spark.sql("SELECT id, age, education FROM2 client_bank").explain()

ParseException: "\nextraneous input 'client_bank' expecting <EOF>(line 1, pos 32)\n\n== SQL ==\nSELECT id, age, education FROM2 client_bank\n--------------------------------^^^\n"

## check for analyzed logical plan

In [60]:
spark.sql("SELECT id2, age, education FROM client_bank").explain()

AnalysisException: "cannot resolve '`id2`' given input columns: [client_bank.personal_loan, client_bank.id, client_bank.default_credit, client_bank.education, client_bank.job, client_bank.age, client_bank.marital, client_bank.housing_loan, client_bank.subscribed]; line 1 pos 7;\n'Project ['id2, age#1, education#4]\n+- SubqueryAlias `client_bank`\n   +- Project [id#0, age#1, job#2, marital#3, education#4, default_credit#152, housing_loan#162, loan#7 AS personal_loan#172, subscribed#8]\n      +- Project [id#0, age#1, job#2, marital#3, education#4, default_credit#152, housing#6 AS housing_loan#162, loan#7, subscribed#8]\n         +- Project [id#0, age#1, job#2, marital#3, education#4, default#5 AS default_credit#152, housing#6, loan#7, subscribed#8]\n            +- Relation[id#0,age#1,job#2,marital#3,education#4,default#5,housing#6,loan#7,subscribed#8] parquet\n"

## optimized logical plan

In [63]:
spark.sql("""SELECT id, age, education 
            FROM client_bank
            WHERE age == 3""").explain(extended=True)

== Parsed Logical Plan ==
'Project ['id, 'age, 'education]
+- 'Filter ('age = 3)
   +- 'UnresolvedRelation `client_bank`

== Analyzed Logical Plan ==
id: int, age: int, education: string
Project [id#0, age#1, education#4]
+- Filter (age#1 = 3)
   +- SubqueryAlias `client_bank`
      +- Project [id#0, age#1, job#2, marital#3, education#4, default_credit#152, housing_loan#162, loan#7 AS personal_loan#172, subscribed#8]
         +- Project [id#0, age#1, job#2, marital#3, education#4, default_credit#152, housing#6 AS housing_loan#162, loan#7, subscribed#8]
            +- Project [id#0, age#1, job#2, marital#3, education#4, default#5 AS default_credit#152, housing#6, loan#7, subscribed#8]
               +- Relation[id#0,age#1,job#2,marital#3,education#4,default#5,housing#6,loan#7,subscribed#8] parquet

== Optimized Logical Plan ==
Project [id#0, age#1, education#4]
+- Filter (isnotnull(age#1) && (age#1 = 3))
   +- Relation[id#0,age#1,job#2,marital#3,education#4,default#5,housing#6,loan#7,su

## API vs SQL query plan 

In [64]:
client_bank.select(["age","job","marital","education"]).\
    where(F.col("age") > 30).\
    where(F.col("job") == "management").explain()

== Physical Plan ==
*(1) Project [age#1, job#2, marital#3, education#4]
+- *(1) Filter (((isnotnull(age#1) && isnotnull(job#2)) && (age#1 > 30)) && (job#2 = management))
   +- *(1) FileScan parquet [age#1,job#2,marital#3,education#4] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/tudorlapusan/Documents/workspaces/workshops/itdays-2019/bigdata/res..., PartitionFilters: [], PushedFilters: [IsNotNull(age), IsNotNull(job), GreaterThan(age,30), EqualTo(job,management)], ReadSchema: struct<age:int,job:string,marital:string,education:string>


In [65]:
spark.sql("""
        SELECT age, job, marital, education
        FROM client_bank
        WHERE age > 30 AND job =='management'""").explain()

== Physical Plan ==
*(1) Project [age#1, job#2, marital#3, education#4]
+- *(1) Filter (((isnotnull(age#1) && isnotnull(job#2)) && (age#1 > 30)) && (job#2 = management))
   +- *(1) FileScan parquet [age#1,job#2,marital#3,education#4] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/tudorlapusan/Documents/workspaces/workshops/itdays-2019/bigdata/res..., PartitionFilters: [], PushedFilters: [IsNotNull(age), IsNotNull(job), GreaterThan(age,30), EqualTo(job,management)], ReadSchema: struct<age:int,job:string,marital:string,education:string>


# Spark SQL vs RDBMS
Why can't we use databases with lots of disks and CPUs to do large-scale analytics? <br>
The answer comes from a trend in disk drives: seek time is improving more slowly than transfer rate.

If the data access pattern is dominated by seeks, reading or writing a large portion of the dataset takes longer than streaming through it, because streaming operates at the transfer rate. For small reads or updates, seeking to the few affected records is faster.
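A back-of-the-envelope calculation shows how large the gap can be. All figures below are assumed round numbers chosen for illustration (1 TB dataset, 100-byte records, 100 MB/s transfer rate, 10 ms per seek); they are not measurements.

```python
# Assumed, illustrative hardware figures (not measurements).
DATASET_BYTES = 10**12          # 1 TB
RECORD_BYTES = 100              # size of one record
TRANSFER_BYTES_PER_S = 10**8    # 100 MB/s sequential transfer rate
SEEK_MS = 10                    # one seek per randomly accessed record

n_records = DATASET_BYTES // RECORD_BYTES


def stream_seconds():
    """Time to read the whole dataset sequentially."""
    return DATASET_BYTES / TRANSFER_BYTES_PER_S


def seek_seconds(records_touched):
    """Time to access the given number of records with one seek each."""
    return records_touched * SEEK_MS / 1000


full_scan = stream_seconds()
one_percent = seek_seconds(n_records // 100)

# Number of randomly accessed records at which seeking costs as much as a full scan.
break_even_records = full_scan * 1000 / SEEK_MS

print(f"full sequential scan : {full_scan / 3600:.1f} hours")
print(f"seek to 1% of records: {one_percent / 86400:.1f} days")
print(f"break-even           : {break_even_records:,.0f} records "
      f"({break_even_records / n_records:.4%} of the dataset)")
```

Under these assumptions, touching even a small fraction of the records through seeks costs more than streaming through the whole dataset, while touching a handful of records is much cheaper with seeks.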

In many ways, Spark SQL and RDBMS complement each other. Spark SQL is very good for analysing the whole dataset (ad hoc queries) and RDBMS is good for point queries or updates.

# Spark operations
In Spark we have two types of operations: transformations and actions. <br>
Transformations are the operations used to express the business logic of a Spark application. <br>
Actions are the operations that trigger the execution of a pipeline of transformations.

Lazy evaluation: Spark computes the transformations only at the last moment, when you actually call an action on them. This way, Spark can look at the whole set of transformations and try to optimise it.
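The difference between transformations and actions can be sketched with a small lazy collection in plain Python. `LazyData` is a hypothetical class written for this illustration, not Spark code: `where` and `select` only record what should happen, and nothing touches the rows until `collect` or `count` is called. The sketch does not try to optimise the recorded steps.

```python
class LazyData:
    """Toy lazy collection: transformations are recorded, actions run them."""

    def __init__(self, rows, steps=()):
        self._rows = rows
        self._steps = steps  # recorded transformations, not yet applied

    # Transformations: return a new LazyData and touch no data.
    def where(self, predicate):
        return LazyData(self._rows, self._steps + (("where", predicate),))

    def select(self, *columns):
        return LazyData(self._rows, self._steps + (("select", columns),))

    # Actions: run the recorded pipeline.
    def collect(self):
        result = self._rows
        for kind, arg in self._steps:
            if kind == "where":
                result = [row for row in result if arg(row)]
            else:  # "select"
                result = [{c: row[c] for c in arg} for row in result]
        return list(result)

    def count(self):
        return len(self.collect())


evaluated = []  # ids of the rows the predicate has actually looked at


def is_24(row):
    evaluated.append(row["id"])
    return row["age"] == 24


rows = [{"id": 0, "age": 56}, {"id": 8, "age": 24}, {"id": 9, "age": 25}]

pipeline = LazyData(rows).where(is_24).select("id")  # transformations only
print(len(evaluated))    # no row has been evaluated yet
print(pipeline.count())  # the action triggers the pipeline
print(len(evaluated))    # now every row has been checked
```

Because the whole pipeline is known before anything runs, an engine has the chance to reorder or merge steps first. The timings in the cells below show the same effect in Spark: the transformation returns almost immediately, while the action does the real work.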




In [39]:
# transformation
%timeit -r1 -n1 print(client_bank.\
                            where(F.col("age")==24)) 

DataFrame[id: int, age: int, job: string, marital: string, education: string, default: string, housing: string, loan: string, subscribed: string, subscribed_int: int]
52.8 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [40]:
# action
%timeit -r1 -n1 print(client_bank.\
                            where(F.col("age")==24).\
                            count())

463
142 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


# Spark SQL functions
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions

# Spark SQL resources
Book - https://www.amazon.com/Spark-Definitive-Guide-Processing-Simple/dp/1491912219 <br>
Videos :
- https://databricks.com/sparkaisummit/north-america/sessions
- https://databricks.com/session/from-basic-to-advanced-aggregate-operators-in-apache-spark-sql-2-2-by-examples-and-their-catalyst-optimizations-continues


Projection pushdown, filter pushdown, and partition pruning: https://drill.apache.org/docs/parquet-filter-pushdown/