# Working with Spark SQL


### DATA 608: Winter 2019 

**Usman Alim ([ualim@ucalgary.ca](mailto:ualim@ucalgary.ca))** 

Further Reading:

* **Spark SQL, DataFrames and Datasets Guide** [ver. 2.2.0](https://spark.apache.org/docs/2.2.0/sql-programming-guide.html)
* For a more in-depth treatment, please consult **[Mastering Apache Spark](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/)**, by Jacek Laskowski.

## Introduction

- Spark SQL DataFrames are conceptually similar to pandas DataFrames. Under the hood they are quite different: they are implemented in Scala, run on the JVM, and are distributed across the cluster.

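- Evaluations are lazy: transformations only build up a plan, and nothing runs until an action (e.g. `count()` or `show()`) is called. Use `df.explain()` to inspect the evaluation plan.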

- A loaded DataFrame _does not_ reside on the driver node. It is distributed.

- Spark DataFrames can be converted to pandas DataFrames. However, pandas DataFrames _are not distributed_ and reside on the driver node. Be aware of memory limitations. 

- DataFrames can be cached for efficiency.

- We can run SQL queries on a DataFrame, and also on files (that support them) directly to return DataFrames.

- Grouping, partitioning and bucketing operations are available.

- We can run built-in transformations on columns, or supply user defined functions (UDFs). 
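
The lazy-evaluation point above can be illustrated with a small plain-Python sketch. This is not Spark code: `LazyFrame`, `map_values` and `is_long` are hypothetical names made up for this illustration, and the real planner is far more sophisticated. The sketch only shows that transformations record a plan, and that work happens when an action (`collect`) is called.

```python
class LazyFrame:
    """Toy stand-in for a lazily evaluated DataFrame (plain Python, not Spark)."""

    def __init__(self, rows, plan=()):
        self._rows = rows    # stands in for the data source
        self._plan = plan    # recorded transformations, not yet executed

    def filter(self, predicate):
        # Transformation: extend the plan, do no work.
        return LazyFrame(self._rows, self._plan + (("filter", predicate),))

    def map_values(self, func):
        # Transformation: extend the plan, do no work.
        return LazyFrame(self._rows, self._plan + (("map", func),))

    def explain(self):
        # Describe the recorded plan without running it.
        return " -> ".join(["scan"] + [kind for kind, _ in self._plan])

    def collect(self):
        # Action: only now is the plan applied to the data.
        out = iter(self._rows)
        for kind, func in self._plan:
            out = filter(func, out) if kind == "filter" else map(func, out)
        return list(out)


calls = []  # records every row the predicate actually inspects

def is_long(distance):
    calls.append(distance)
    return distance > 2.0

trips = LazyFrame([0.3, 1.57, 11.5, 2.0, 12.4])
long_trips = trips.filter(is_long).map_values(lambda d: round(d * d, 2))

calls_before = len(calls)      # still 0: no row has been touched yet
plan = long_trips.explain()    # "scan -> filter -> map"
result = long_trips.collect()  # the action triggers the computation
print(plan, result)
```

In Spark, the analogous inspection is done with `explain()` on a DataFrame, and actions such as `count()`, `show()` or `toPandas()` play the role of `collect` here.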

## Outline

- [Basic DataFrame Operations](#basicOps)
- [Grouping](#grouping)
- [Query pushdown](#queryPush)
- [User Defined Functions](#UDFs)
- [Sorting and Partitioning](#sortAndPart)

In [36]:
## Before starting, let's obtain a spark context and load a DataFrame from a file.

# Please make sure that you are using an appropriate allocation.

import os
import atexit
import sys

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
import findspark
from sparkhpc import sparkjob

#Exit handler to clean up the Spark cluster if the script exits or crashes
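def exitHandler(sj, sc):
    # Stop the Spark context first, then the cluster job. Errors are
    # ignored so that one failed cleanup step does not block the other.
    try:
        print('Trapped Exit cleaning up Spark Context')
        sc.stop()
    except Exception:
        pass
    try:
        print('Trapped Exit cleaning up Spark Job')
        sj.stop()
    except Exception:
        pass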

findspark.init()

#Parameters for the Spark cluster
nodes=3
tasks_per_node=8 
memory_per_task=1024 #1 gig per process, adjust accordingly
# Please estimate walltime carefully to keep unused Spark clusters from sitting 
# idle so that others may use the resources.
walltime="3:00" #3 hours
os.environ['SBATCH_PARTITION']='parallel' #Set the appropriate ARC partition

sj = sparkjob.sparkjob(
     ncores=nodes*tasks_per_node,
     cores_per_executor=tasks_per_node,
     memory_per_core=memory_per_task,
     walltime=walltime
    )

sj.wait_to_start()
sc = sj.start_spark()

#Register the exit handler                                                                                                     
atexit.register(exitHandler,sj,sc)

#You need this line if you want to use SparkSQL
sqlCtx=SQLContext(sc)
print( sc.uiWebUrl )


INFO:sparkhpc.sparkjob:Submitted batch job 644099

INFO:sparkhpc.sparkjob:Submitted cluster 0


http://cn066:4040


In [37]:
## Load a DataFrame from a parquet file

df = sqlCtx.read.parquet('yellow_tripdata_2012-01.parquet')
df.printSchema()

# Where's the data? Inspect the webUI to confirm.

root
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)



## <a name="basicOps"></a>Basic DataFrame Operations

- Head and Summary Views

- Interoperating with Pandas

- Caching

- Selecting Columns and Subsampling

- Filtering and running SQL Queries

In [3]:
## Summaries

print("Number of Partitions = " + str(df.rdd.getNumPartitions()))
print("Number of Rows = " + str(df.count()))


# This will return a specified number of Rows as a list
head = df.head(10)

# Produces a summary DataFrame. This is an expensive operation!
summary = df.describe()

print(type(head))
print(type(summary))


Number of Partitions = 24
Number of Rows = 14969132
<class 'list'>
<class 'pyspark.sql.dataframe.DataFrame'>


In [4]:
# Use show() to print the contents of a DataFrame. By default, show()
# prints only the first 20 rows and truncates cell values longer than 
# 20 characters. Note that show() needs to send data to the driver. 

summary.show()

# Can print the head as follows:

#for r in head:
#    print(r)

+-------+---------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+-----------------+------------+-----------------+-------------------+--------------------+------------------+-------------------+------------------+
|summary|vendor_id|   passenger_count|     trip_distance| pickup_longitude|  pickup_latitude|         rate_code|store_and_fwd_flag| dropoff_longitude| dropoff_latitude|payment_type|      fare_amount|          surcharge|             mta_tax|        tip_amount|       tolls_amount|      total_amount|
+-------+---------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+-----------------+------------+-----------------+-------------------+--------------------+------------------+-------------------+------------------+
|  count| 14969132|          14969132|          14969132|         14969132|         14969132|          

### Interoperating with Pandas

In [20]:
# The output is not very pretty. For pretty printing of "small" 
# DataFrames, we can convert the DataFrames to pandas. 

# **Please be aware that this will send data over to the driver.**

import pandas as pd

# Convert the head to a pandas DataFrame and display
display( pd.DataFrame( head, columns=df.columns ) )


# The toPandas() function will convert a Spark SQL DataFrame to a 
# pandas DataFrame
display(summary.toPandas())

Unnamed: 0,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount
0,CMT,2012-01-13 02:58:17,2012-01-13 03:01:22,1,0.3,-73.984653,40.738615,1,N,-73.9782,40.73578,CRD,3.7,0.5,0.5,0.94,0.0,5.64
1,VTS,2012-01-29 15:03:00,2012-01-29 15:09:00,1,1.57,-73.975,40.761627,1,,-73.98697,40.74527,CSH,6.1,0.0,0.5,0.0,0.0,6.6
2,VTS,2012-01-29 15:10:00,2012-01-29 15:21:00,5,1.44,-74.009915,40.706,1,,-73.99239,40.715028,CSH,7.7,0.0,0.5,0.0,0.0,8.2
3,CMT,2012-01-27 10:01:36,2012-01-27 10:31:36,2,11.5,-73.873191,40.774122,1,N,-73.985416,40.738456,CSH,26.5,0.0,0.5,0.0,4.8,31.8
4,CMT,2012-01-27 08:31:52,2012-01-27 08:34:41,3,0.6,-74.006152,40.733198,1,N,-74.002346,40.740652,CSH,4.1,0.0,0.5,0.0,0.0,4.6
5,VTS,2012-01-29 21:46:00,2012-01-29 21:50:00,1,0.82,-73.995695,40.75324,1,,-74.001877,40.72837,CSH,4.5,0.5,0.5,0.0,0.0,5.5
6,CMT,2012-01-27 11:38:41,2012-01-27 11:50:33,2,2.0,-73.971298,40.758012,1,N,-73.987042,40.736773,CSH,8.5,0.0,0.5,0.0,0.0,9.0
7,VTS,2012-01-30 15:31:00,2012-01-30 15:40:00,3,1.75,-73.95382,40.781912,1,,-73.974537,40.77827,CSH,7.3,0.0,0.5,0.0,0.0,7.8
8,CMT,2012-01-27 13:29:29,2012-01-27 13:53:53,2,12.4,-74.007392,40.707024,1,N,-73.861793,40.768209,CRD,28.5,0.0,0.5,7.25,0.0,36.25
9,VTS,2012-01-30 14:32:00,2012-01-30 14:37:00,2,0.67,-74.0017,40.715783,1,,-74.0017,40.715783,CSH,4.9,0.0,0.5,0.0,0.0,5.4


Unnamed: 0,summary,vendor_id,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount
0,count,14969132,14969132.0,14969132.0,14969132.0,14969132.0,14969132.0,7653218,14969101.0,14969101.0,14969132,14969132.0,14969132.0,14969132.0,14969132.0,14969132.0,14969132.0
1,mean,,1.660380909193666,2.759083671651823,-72.3260192571207,39.83961440960158,1.0333543721840384,,-72.3086903591726,39.82731597026834,,9.813487671830249,0.3163416489346209,0.4986664557437265,1.007816524698906,0.1838356305496524,11.820147965159078
2,stddev,,1.3379145071687684,3.299857300487808,22.33616519600959,18.81914482613753,0.3265105968053811,,22.49179847678353,18.93444675812748,,7.807497705755949,0.3653733487399404,0.025787474329171,1.7413468860060222,0.9765922774635036,9.339575781038857
3,min,CMT,0.0,0.0,-3442.019565,-3547.909255,0.0,N,-3442.031232,-3511.137963,CRD,2.5,0.0,0.0,0.0,0.0,2.5
4,max,VTS,208.0,100.0,3459.015435,3394.788702,221.0,Y,3460.426853,3405.993395,NOC,500.0,5.0,0.5,200.0,20.0,540.0


### Caching 

In [28]:
## Caching

# Up until this point, nothing is cached. To make the DataFrame persist
# we need to cache it. Caching depends on the storage level.

df.cache()
df.show(5)

# Please inspect the webUI to see how much data is cached. 
# Since evaluations are lazy, caching is also lazy.

#
# df.unpersist() and sqlCtx.clearCache() can be used to clear cache. 
# For the most part though, you may want to let Spark handle the cache.


+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+---------+------------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|pickup_longitude|pickup_latitude|rate_code|store_and_fwd_flag|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+---------+------------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|      CMT|2012-01-13 02:58:17|2012-01-13 03:01:22|              1|          0.3|      -73.984653|      40.738615|        1|                 N|         -73.9782|        40.73578|         CRD|        3.7|      0.5|    0.5|  

In [10]:
# An operation that uses the entire DataFrame will result in full caching
display( df.describe().toPandas() )

# Again, inspect the web UI to verify that this is indeed the case.

Unnamed: 0,summary,vendor_id,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount
0,count,14969132,14969132.0,14969132.0,14969132.0,14969132.0,14969132.0,7653218,14969101.0,14969101.0,14969132,14969132.0,14969132.0,14969132.0,14969132.0,14969132.0,14969132.0
1,mean,,1.660380909193666,2.759083671651823,-72.3260192571207,39.8396144096016,1.0333543721840384,,-72.3086903591726,39.82731597026834,,9.813487671830243,0.3163416489346209,0.4986664557437265,1.007816524698906,0.1838356305496524,11.820147965159078
2,stddev,,1.3379145071687684,3.2998573004878087,22.336165196009585,18.81914482613753,0.3265105968053809,,22.49179847678353,18.93444675812748,,7.8074977057559485,0.3653733487399404,0.025787474329171,1.7413468860060222,0.9765922774635036,9.339575781038857
3,min,CMT,0.0,0.0,-3442.019565,-3547.909255,0.0,N,-3442.031232,-3511.137963,CRD,2.5,0.0,0.0,0.0,0.0,2.5
4,max,VTS,208.0,100.0,3459.015435,3394.788702,221.0,Y,3460.426853,3405.993395,NOC,500.0,5.0,0.5,200.0,20.0,540.0


### Takeaway: Cache tables that you will be using often!

### Selecting Columns and Subsampling


In [11]:
# Column selection is fairly straightforward. 

# We can select one or more columns using the select() method to 
# return another dataframe with the selected columns.
df.select('passenger_count','trip_distance').show()


+---------------+-------------+
|passenger_count|trip_distance|
+---------------+-------------+
|              1|          0.3|
|              1|         1.57|
|              5|         1.44|
|              2|         11.5|
|              3|          0.6|
|              1|         0.82|
|              2|          2.0|
|              3|         1.75|
|              2|         12.4|
|              2|         0.67|
|              1|         0.96|
|              2|         1.44|
|              1|          2.9|
|              1|          1.3|
|              1|         1.28|
|              2|         1.83|
|              1|         1.09|
|              1|         2.93|
|              1|          0.5|
|              1|          1.5|
+---------------+-------------+
only showing top 20 rows



In [12]:
# Use the sample() method to subsample a DataFrame. A subsampling
# fraction needs to be specified. Sampling with replacement is also 
# supported.

# This is a convenient method as we can work with a small subset of the
# data on the driver.

localDF = df.sample(1.0e-4).toPandas()
print( len(localDF) )
localDF.head()

1498


Unnamed: 0,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount
0,VTS,2012-01-17 18:15:00,2012-01-17 18:30:00,1,2.22,-73.989383,40.728542,1,,-74.002963,40.748787,CRD,9.7,1.0,0.5,2.0,0.0,13.2
1,CMT,2012-01-17 07:28:51,2012-01-17 07:35:51,1,1.4,-73.953942,40.780949,1,N,-73.96919,40.769316,CSH,6.1,0.0,0.5,0.0,0.0,6.6
2,VTS,2012-01-08 11:42:00,2012-01-08 11:58:00,1,5.41,-73.969997,40.797155,1,,-73.990055,40.745817,CRD,15.3,0.0,0.5,1.5,0.0,17.3
3,VTS,2012-01-23 12:46:00,2012-01-23 12:59:00,1,2.42,-73.97471,40.752042,1,,-73.998038,40.724322,CSH,9.3,0.0,0.5,0.0,0.0,9.8
4,VTS,2012-01-10 19:36:00,2012-01-10 19:43:00,1,1.41,-73.990927,40.760862,1,,-73.97854,40.764918,CSH,6.1,1.0,0.5,0.0,0.0,7.6
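
The sample above has 1498 rows rather than exactly 14969132 × 1e-4 ≈ 1497, because the fraction is not an exact target size. The plain-Python sketch below assumes the usual model for sampling without replacement, in which each row is kept independently with probability `fraction`. `bernoulli_sample` and the seed value are hypothetical, chosen only for this illustration; this is not Spark's implementation.

```python
import random

def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)  # a fixed seed makes the sample repeatable
    return [row for row in rows if rng.random() < fraction]

n_rows = 200_000
fraction = 5e-3

sample = bernoulli_sample(range(n_rows), fraction, seed=608)
repeat = bernoulli_sample(range(n_rows), fraction, seed=608)

# The size is random: close to n_rows * fraction, but rarely equal to it.
print(len(sample), "rows sampled; expected about", n_rows * fraction)
print("same seed gives the same sample:", sample == repeat)
```

Passing a seed is the usual way to make a subsample reproducible across runs.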


### Filtering and running SQL Queries

In [13]:
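# Use the filter() method with a boolean expression (as a string)
# to filter rows. 

# Column objects can also be used. Note that select() with a column
# expression does not filter: it returns a new boolean column, as shown
# below. Passing the same expression to filter() would filter rows;
# compound conditions need & and | with parentheses, e.g.
# df.filter((df.passenger_count > 8) & (df.total_amount < 5)).

# column object (yields a boolean column, not filtered rows)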
df.select(df.passenger_count > 8).show()

# filtering using a string
display(df.filter("passenger_count > 8").toPandas())
display(df.filter("passenger_count > 8 and total_amount < 5").toPandas())

+---------------------+
|(passenger_count > 8)|
+---------------------+
|                false|
|                false|
|                false|
|                false|
|                false|
|                false|
|                false|
|                false|
|                false|
|                false|
|                false|
|                false|
|                false|
|                false|
|                false|
|                false|
|                false|
|                false|
|                false|
|                false|
+---------------------+
only showing top 20 rows



Unnamed: 0,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount
0,VTS,2012-01-04 08:14:00,2012-01-04 08:55:00,65,18.52,-73.964368,40.80706,2,,-73.789448,40.643452,CSH,45.0,0.0,0.5,0.0,4.8,50.3
1,VTS,2012-01-03 12:53:00,2012-01-03 13:04:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,8.5,0.0,0.0,0.0,0.0,8.5
2,VTS,2012-01-24 10:19:00,2012-01-24 10:26:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.9,0.0,0.0,0.0,0.0,3.9
3,VTS,2012-01-06 17:14:00,2012-01-06 17:14:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.3,0.0,0.0,0.0,0.0,3.3
4,VTS,2012-01-06 16:21:00,2012-01-06 16:21:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.3,0.0,0.0,0.0,0.0,3.3
5,VTS,2012-01-28 17:53:00,2012-01-28 17:53:00,9,0.01,0.0,0.0,1,,0.0,0.0,CSH,2.5,0.0,0.5,0.0,0.0,3.0
6,VTS,2012-01-06 16:36:00,2012-01-06 16:36:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.3,0.0,0.0,0.0,0.0,3.3
7,VTS,2012-01-06 17:21:00,2012-01-06 17:21:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.3,1.8,0.0,0.0,0.0,5.1
8,VTS,2012-01-06 17:19:00,2012-01-06 17:19:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.3,1.8,0.0,0.0,0.0,5.1
9,VTS,2012-01-06 17:25:00,2012-01-06 17:25:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.3,1.8,0.0,0.0,0.0,5.1


Unnamed: 0,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount
0,VTS,2012-01-24 10:19:00,2012-01-24 10:26:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.9,0.0,0.0,0.0,0.0,3.9
1,VTS,2012-01-06 17:14:00,2012-01-06 17:14:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.3,0.0,0.0,0.0,0.0,3.3
2,VTS,2012-01-06 16:21:00,2012-01-06 16:21:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.3,0.0,0.0,0.0,0.0,3.3
3,VTS,2012-01-28 17:53:00,2012-01-28 17:53:00,9,0.01,0.0,0.0,1,,0.0,0.0,CSH,2.5,0.0,0.5,0.0,0.0,3.0
4,VTS,2012-01-06 16:36:00,2012-01-06 16:36:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.3,0.0,0.0,0.0,0.0,3.3
5,VTS,2012-01-12 04:58:00,2012-01-12 05:04:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,2.7,0.0,0.0,0.0,0.0,2.7
6,VTS,2012-01-11 09:39:00,2012-01-11 09:42:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,2.7,0.0,0.0,0.0,0.0,2.7
7,VTS,2012-01-06 15:42:00,2012-01-06 15:42:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.3,0.0,0.0,0.0,0.0,3.3
8,VTS,2012-01-06 16:30:00,2012-01-06 16:30:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.3,0.0,0.0,0.0,0.0,3.3
9,VTS,2012-01-06 17:09:00,2012-01-06 17:09:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.3,0.0,0.0,0.0,0.0,3.3


In [14]:
# Alternatively, we can run an SQL query on a table that has been 
# registered with the SQL Context. Temporary views are session scoped.

df.createOrReplaceTempView("taxi_table")
sqlCtx.sql("select * from taxi_table where passenger_count > 8 and total_amount < 5").toPandas()

Unnamed: 0,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount
0,VTS,2012-01-24 10:19:00,2012-01-24 10:26:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.9,0.0,0.0,0.0,0.0,3.9
1,VTS,2012-01-06 17:14:00,2012-01-06 17:14:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.3,0.0,0.0,0.0,0.0,3.3
2,VTS,2012-01-06 16:21:00,2012-01-06 16:21:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.3,0.0,0.0,0.0,0.0,3.3
3,VTS,2012-01-28 17:53:00,2012-01-28 17:53:00,9,0.01,0.0,0.0,1,,0.0,0.0,CSH,2.5,0.0,0.5,0.0,0.0,3.0
4,VTS,2012-01-06 16:36:00,2012-01-06 16:36:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.3,0.0,0.0,0.0,0.0,3.3
5,VTS,2012-01-12 04:58:00,2012-01-12 05:04:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,2.7,0.0,0.0,0.0,0.0,2.7
6,VTS,2012-01-11 09:39:00,2012-01-11 09:42:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,2.7,0.0,0.0,0.0,0.0,2.7
7,VTS,2012-01-06 15:42:00,2012-01-06 15:42:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.3,0.0,0.0,0.0,0.0,3.3
8,VTS,2012-01-06 16:30:00,2012-01-06 16:30:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.3,0.0,0.0,0.0,0.0,3.3
9,VTS,2012-01-06 17:09:00,2012-01-06 17:09:00,208,0.0,0.0,0.0,1,,0.0,0.0,CSH,3.3,0.0,0.0,0.0,0.0,3.3


## <a name="grouping"></a>Grouping

- We can use the DataFrame API or an SQL query to group.

- Grouping is usually followed by an aggregation.

- Grouping and aggregations are implemented as MapReduce-style operations.
    
    - First, each partition is aggregated locally, in parallel, by the grouping column (Map). 
    - Then the partial results for each group are combined across partitions (Reduce). 
    - Data is shuffled between the two stages.

- By default, Spark uses 200 shuffle partitions. This default can hurt performance: it creates many tiny tasks for small results and may give too few, oversized tasks for very large datasets.

- An exchange stage is introduced in the computation. You can see it in the webUI. 

- Use `sqlCtx.setConf("spark.sql.shuffle.partitions", NUM)` to adjust the number of shuffle partitions. 
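
The two-stage scheme described above can be sketched in plain Python. This is a toy model rather than Spark's implementation: the function names, the tiny dataset and the use of Python's `hash()` for routing are all assumptions made for illustration. It computes a grouped average by keeping a partial `(sum, count)` per key in each partition, exchanging the partials by key, and merging them.

```python
from collections import defaultdict

def partial_aggregate(partition):
    """Map stage: per-partition (sum, count) for each grouping key."""
    acc = defaultdict(lambda: [0.0, 0])
    for key, value in partition:
        acc[key][0] += value
        acc[key][1] += 1
    return dict(acc)

def shuffle(partials, num_shuffle_partitions):
    """Exchange: send all partial results of a key to the same reducer."""
    buckets = [defaultdict(list) for _ in range(num_shuffle_partitions)]
    for partial in partials:
        for key, sum_count in partial.items():
            buckets[hash(key) % num_shuffle_partitions][key].append(sum_count)
    return buckets

def final_aggregate(bucket):
    """Reduce stage: merge the partial (sum, count) pairs into an average."""
    return {key: sum(s for s, _ in parts) / sum(c for _, c in parts)
            for key, parts in bucket.items()}

# Two input partitions of (vendor_id, trip_distance) pairs.
partitions = [
    [("CMT", 0.3), ("VTS", 1.57), ("VTS", 1.44)],
    [("CMT", 11.5), ("CMT", 0.6), ("VTS", 0.82)],
]

partials = [partial_aggregate(p) for p in partitions]   # runs per partition
averages = {}
for bucket in shuffle(partials, num_shuffle_partitions=4):
    averages.update(final_aggregate(bucket))            # runs per reducer

print(averages)
```

Only the small partial results cross the exchange, not the raw rows, which is why aggregating before the shuffle keeps the data movement small.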

In [15]:
# Let's group by one column and inspect the webUI
df.groupBy("vendor_id").avg().toPandas()


Unnamed: 0,vendor_id,avg(passenger_count),avg(trip_distance),avg(pickup_longitude),avg(pickup_latitude),avg(rate_code),avg(dropoff_longitude),avg(dropoff_latitude),avg(fare_amount),avg(surcharge),avg(mta_tax),avg(tip_amount),avg(tolls_amount),avg(total_amount)
0,CMT,1.236418,2.709657,-72.60489,39.995571,1.029201,-72.62546,40.007885,9.732456,0.313442,0.498562,1.001951,0.176026,11.722437
1,VTS,2.103958,2.810798,-72.034246,39.676443,1.0377,-71.977267,39.638394,9.898268,0.319375,0.498776,1.013953,0.192007,11.922379


In [16]:
# Can group by more than one column
groupDF = df.groupBy("vendor_id", "rate_code").avg()
print( groupDF.rdd.getNumPartitions() )
groupDF.toPandas()


200


Unnamed: 0,vendor_id,rate_code,avg(passenger_count),avg(trip_distance),avg(pickup_longitude),avg(pickup_latitude),avg(rate_code),avg(dropoff_longitude),avg(dropoff_latitude),avg(fare_amount),avg(surcharge),avg(mta_tax),avg(tip_amount),avg(tolls_amount),avg(total_amount)
0,VTS,2,2.112493,15.957462,-70.75607,39.062924,2.0,-70.136381,38.668283,44.998846,1.3e-05,0.499903,4.894423,3.53383,53.927016
1,VTS,1,2.106614,2.58572,-72.105996,39.714918,1.0,-72.067415,39.687853,9.177339,0.32546,0.499958,0.935342,0.124737,11.062835
2,VTS,8,1.0,0.0,0.0,0.0,8.0,0.0,0.0,222.22,0.0,0.0,0.0,0.0,222.22
3,CMT,1,1.234477,2.506841,-72.61054,39.998678,1.0,-72.627965,40.009385,9.095502,0.318447,0.499987,0.926046,0.117693,10.957674
4,VTS,0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,2.5,0.5,0.5,0.2,0.0,3.7
5,VTS,6,1.0,3.044091,-55.465565,30.555038,6.0,-53.80038,29.625237,3.318182,0.0,0.5,0.252273,0.045455,4.115909
6,VTS,5,1.128864,3.480528,-54.385431,29.966011,5.0,-52.192519,28.673542,50.959504,0.000272,0.245541,5.306323,2.129684,58.641325
7,CMT,221,2.0,1.5,-73.973576,40.74799,221.0,-73.970317,40.763811,6.9,0.0,0.5,1.85,0.0,9.25
8,CMT,2,1.349313,15.648359,-72.243772,39.797362,2.0,-72.462226,39.918189,44.972062,0.000172,0.499971,5.208871,3.490189,54.171266
9,CMT,210,1.111111,9.111111,-73.921819,40.746435,210.0,-73.968054,40.754434,23.666667,0.722222,0.5,1.985556,1.6,28.474444


In [19]:
# Can also run grouped aggregations through SQL
sqlCtx.sql("SELECT vendor_id, MAX(passenger_count), AVG(trip_distance) FROM taxi_table GROUP BY vendor_id").toPandas()

Unnamed: 0,vendor_id,max(passenger_count),avg(trip_distance)
0,CMT,7,2.709657
1,VTS,208,2.810798


## <a name="queryPush"></a>Query Pushdown

- This is the name given to the idea of running a query directly on a parquet file (or any other supported format).

- Since parquet files are schema aware and stored in a columnar format,  we don't need to load the entire data into memory and then filter. Spark can _push_ the query down to the file and only retrieve the relevant data.
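
To make the idea concrete, here is a plain-Python sketch of a toy columnar "file". It is not the Parquet format or Spark's reader: the layout, the `scan` function and the sample values are assumptions for illustration. It shows the two savings involved: whole row groups are skipped using stored min/max statistics, and only the requested columns are touched.

```python
# Each row group stores its columns separately, plus (min, max) statistics
# for total_amount, similar in spirit to the metadata kept in a Parquet file.
row_groups = [
    {"stats": {"total_amount": (5.64, 8.2)},
     "columns": {"vendor_id": ["CMT", "VTS", "VTS"],
                 "trip_distance": [0.3, 1.57, 1.44],
                 "total_amount": [5.64, 6.6, 8.2]}},
    {"stats": {"total_amount": (4.6, 36.25)},
     "columns": {"vendor_id": ["CMT", "CMT", "CMT"],
                 "trip_distance": [11.5, 0.6, 12.4],
                 "total_amount": [31.8, 4.6, 36.25]}},
]

def scan(row_groups, wanted_columns, filter_column, lower_bound):
    """Return rows with filter_column > lower_bound, reading as little as possible."""
    rows, groups_read = [], 0
    for group in row_groups:
        _, col_max = group["stats"][filter_column]
        if col_max <= lower_bound:
            continue  # predicate pushdown: no row here can match, skip the group
        groups_read += 1
        keep = [value > lower_bound for value in group["columns"][filter_column]]
        # Column pruning: only the requested columns are materialised.
        selected = [group["columns"][name] for name in wanted_columns]
        rows += [values for values, ok in zip(zip(*selected), keep) if ok]
    return rows, groups_read

rows, groups_read = scan(row_groups, ["vendor_id", "trip_distance"],
                         filter_column="total_amount", lower_bound=10)
print(rows)         # [('CMT', 11.5), ('CMT', 12.4)]
print(groups_read)  # 1
```

The first row group is never read because its maximum `total_amount` is below the threshold.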

In [27]:
# Query pushdown example. Let's load only the columns and rows that 
# satisfy a query.

df2 = sqlCtx.read.parquet('yellow_tripdata_2012-01.parquet')
df2.createOrReplaceTempView("yellow_table")

query = ("SELECT vendor_id, passenger_count, trip_distance FROM " 
        "yellow_table WHERE total_amount > 10")
sqlDF = sqlCtx.sql(query)
sqlDF.cache()
sqlDF.describe().show()

# Inspect the WebUI to verify

+-------+---------+------------------+-----------------+
|summary|vendor_id|   passenger_count|    trip_distance|
+-------+---------+------------------+-----------------+
|  count|  6285182|           6285182|          6285182|
|   mean|     null|1.6734665758286713|4.864476793193921|
| stddev|     null|1.3254869127181699|4.153943678722601|
|    min|      CMT|                 0|              0.0|
|    max|      VTS|                65|            100.0|
+-------+---------+------------------+-----------------+



## <a name="UDFs"></a>User Defined Functions (UDFs)

- UDFs allow us to run custom functions on Spark SQL DataFrame columns.

- They are similar to pandas DataFrame transformations.

- A Python UDF is serialized and shipped to the executors, where it is applied to the rows of each partition in parallel.

- Spark SQL has many useful built-in column functions, which are usually faster than Python UDFs because they run inside the JVM. Please see the API [here](https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html).

- We can also write custom UDFs in Python.

- `groupBy` also supports UDFs.


In [31]:
# Example: Extracting day of month from a timestamp column

from pyspark.sql.functions import dayofmonth

# Add a column that is the result of applying a transformation.
df.select('pickup_datetime', dayofmonth('pickup_datetime').alias('day')).show()

+-------------------+---+
|    pickup_datetime|day|
+-------------------+---+
|2012-01-13 02:58:17| 13|
|2012-01-29 15:03:00| 29|
|2012-01-29 15:10:00| 29|
|2012-01-27 10:01:36| 27|
|2012-01-27 08:31:52| 27|
|2012-01-29 21:46:00| 29|
|2012-01-27 11:38:41| 27|
|2012-01-30 15:31:00| 30|
|2012-01-27 13:29:29| 27|
|2012-01-30 14:32:00| 30|
|2012-01-30 11:57:00| 30|
|2012-01-30 14:32:00| 30|
|2012-01-27 13:26:01| 27|
|2012-01-30 14:31:00| 30|
|2012-01-30 14:30:00| 30|
|2012-01-30 14:31:00| 30|
|2012-01-30 10:45:00| 30|
|2012-01-30 14:21:00| 30|
|2012-01-27 13:27:02| 27|
|2012-01-27 14:15:24| 27|
+-------------------+---+
only showing top 20 rows



### Python UDF Steps

- Define the return type of the output (optional). If it is not specified, Spark does not infer it: the return type defaults to `StringType`.
- Define a Python function that takes one or more column entries as input and returns an output consistent with the declared return type.
- Register the function as a UDF.
- Use the UDF.
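
Because a UDF is an ordinary Python function, it can be written and checked locally before it is registered. The sketch below is one way to do that, under the assumption that SQL nulls reach the function as `None`; `squared_distance` is a hypothetical name. The registration step is shown only as a comment, since it needs a running Spark session.

```python
def squared_distance(distance):
    """Square a trip distance and truncate it to an int; pass nulls through."""
    if distance is None:   # a null column entry arrives as None
        return None
    return int(distance * distance)

# With a live Spark session, the function would then be wrapped as:
#   from pyspark.sql.functions import udf
#   from pyspark.sql.types import IntegerType
#   dist_udf = udf(squared_distance, IntegerType())

# Local check on a few values, including a null.
column = [0.3, 1.57, 11.5, None]
checked = [squared_distance(d) for d in column]
print(checked)  # [0, 2, 132, None]
```

A lambda without the `None` check would raise an error on the first null value, so a guard like this is worth adding for columns that may contain nulls.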



In [32]:
# Example: Let's compute the square of trip distance as an Integer

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

dist_udf = udf(lambda x: int(x*x), IntegerType())

df.select('trip_distance', dist_udf('trip_distance').alias('trip_distance2')).show()


+-------------+--------------+
|trip_distance|trip_distance2|
+-------------+--------------+
|          0.3|             0|
|         1.57|             2|
|         1.44|             2|
|         11.5|           132|
|          0.6|             0|
|         0.82|             0|
|          2.0|             4|
|         1.75|             3|
|         12.4|           153|
|         0.67|             0|
|         0.96|             0|
|         1.44|             2|
|          2.9|             8|
|          1.3|             1|
|         1.28|             1|
|         1.83|             3|
|         1.09|             1|
|         2.93|             8|
|          0.5|             0|
|          1.5|             2|
+-------------+--------------+
only showing top 20 rows



## <a name="sortAndPart"></a>Sorting and Partitioning

- Sorting and partitioning rearrange data according to a specified criterion.

- They are considered more advanced operations as they result in a data shuffle, which can be expensive for large data sets. 

- However, careful pre-partitioning can improve query performance.

- Think before you shuffle!
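
The following plain-Python sketch shows what partitioning by a key amounts to. It assumes hash partitioning with Python's `hash()` standing in for Spark's own hash function, and `assign_partition` and `layout` are hypothetical helpers. All rows with the same key land in the same partition, but distinct keys may share a partition, so the number of partitions is not the number of groups.

```python
from collections import defaultdict

def assign_partition(key, num_partitions):
    # Stand-in for the engine's hash function; only the principle matters here.
    return hash(key) % num_partitions

def layout(keys, num_partitions):
    """Group keys by the partition they would be sent to."""
    partitions = defaultdict(list)
    for key in keys:
        partitions[assign_partition(key, num_partitions)].append(key)
    return dict(partitions)

# Day of month of the first ten pickup timestamps shown earlier.
days = [13, 29, 29, 27, 27, 29, 27, 30, 27, 30]

wide = layout(days, 31)    # one distinct day per non-empty partition here
narrow = layout(days, 4)   # days 13 and 29 end up sharing partition 1

print(wide)
print(narrow)
```

Every row has to be routed to its target partition, which is the shuffle cost that the list above warns about.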

In [45]:
# Code for sorting is straightforward but the devil is in the details.
# Again, inspect the webUI to see what's going on.

dfSorted = df.sort('trip_distance')
dfSorted.cache()
dfSorted.sample(1e-4).select('trip_distance').show()

+-------------+
|trip_distance|
+-------------+
|          0.0|
|         0.05|
|         0.05|
|         0.06|
|         0.07|
|         0.07|
|         0.07|
|         0.09|
|         0.09|
|          0.1|
|         0.12|
|         0.15|
|         0.17|
|          0.2|
|          0.2|
|         0.21|
|         0.21|
|         0.25|
|         0.25|
|         0.27|
+-------------+
only showing top 20 rows



In [65]:
# Partitioning allows you to control how your data partitions are laid 
# out in memory (and disk if you write out partitioned data)

# Example: Let's partition by day using the built-in dayofmonth function
partDF = df.withColumn('day', dayofmonth(df.pickup_datetime)).repartition(31,'day')
partDF.cache()
partDF.describe().toPandas()
partDF.select('day').distinct().show(31)

+---+
|day|
+---+
|  7|
| 31|
|  9|
| 24|
| 22|
| 20|
|  6|
| 19|
|  5|
| 29|
|  2|
|  1|
| 16|
| 21|
| 12|
|  8|
| 26|
| 25|
| 23|
| 14|
| 17|
| 30|
| 28|
| 13|
|  3|
| 10|
|  4|
| 11|
| 15|
| 27|
| 18|
+---+



In [67]:
# Parquet output can be partitioned on disk. Without partitionBy(), each 
# in-memory partition is simply written as a separate part file. With 
# partitionBy(), rows are written to one subdirectory per distinct value
# of the partition column (e.g. day=1, day=2, ...).

print( partDF.rdd.getNumPartitions() )
partDF.write.partitionBy("day").parquet('yellow_tripdata_2012-01-partitioned.parquet')

!ls yellow_tripdata_2012-01-partitioned.parquet

31
_SUCCESS  day=12  day=16  day=2   day=23  day=27  day=30  day=6
day=1	  day=13  day=17  day=20  day=24  day=28  day=31  day=7
day=10	  day=14  day=18  day=21  day=25  day=29  day=4   day=8
day=11	  day=15  day=19  day=22  day=26  day=3   day=5   day=9
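
The `day=N` directory names above encode the partition value, so a reader can decide which directories to open from the names alone. The plain-Python sketch below illustrates that idea on a shortened, hand-copied listing. `parse_partition` and `prune` are hypothetical helpers, not part of Spark.

```python
# A shortened copy of the directory listing shown above.
listing = ["_SUCCESS", "day=1", "day=12", "day=29", "day=30", "day=31", "day=7"]

def parse_partition(name):
    """Return (column, value) for a 'column=value' directory, else None."""
    if "=" not in name:
        return None  # e.g. the _SUCCESS marker file
    column, value = name.split("=", 1)
    return column, int(value)

def prune(listing, column, predicate):
    """Keep only the directories whose partition value satisfies the predicate."""
    kept = []
    for name in listing:
        parsed = parse_partition(name)
        if parsed and parsed[0] == column and predicate(parsed[1]):
            kept.append(name)
    return kept

to_read = prune(listing, "day", lambda day: day >= 30)
print(to_read)  # ['day=30', 'day=31']
```

A query that filters on `day` against the partitioned output would, in the same spirit, only need the matching subdirectories.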


In [68]:
# Don't forget to stop your cluster when you are done

sc.stop()
sj.stop()

INFO:sparkhpc.sparkjob:
