<h1>UCL School of Management</h1>
<h2>MSIN0166 Data Engineering</h2>
<h4>SparkSQL workshop - Theory & Practice</h4>

<img src="https://databricks.com/wp-content/uploads/2015/03/Screen-Shot-2015-03-23-at-3.42.56-PM.png"></img>




# What is Spark SQL?

Many data scientists, analysts, and general business intelligence users rely on interactive SQL queries for exploring data. Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine. 

- Spark SQL enables unmodified Hadoop Hive queries to run up to 100x faster on existing deployments and data. It also provides powerful integration with the rest of the Spark ecosystem (e.g., integrating SQL query processing with machine learning).

- It brings native support for SQL to Spark and streamlines the process of querying data stored both in RDDs (Spark’s distributed datasets) and in external sources. It conveniently blurs the lines between RDDs and relational tables. Unifying these powerful abstractions makes it easy for developers to intermix SQL commands querying external data with complex analytics, all within a single application. 

- Spark SQL also includes a cost-based optimizer, columnar storage, and code generation to make queries fast. At the same time, it scales to thousands of nodes and multi-hour queries using the Spark engine, which provides full mid-query fault tolerance, without having to worry about using a different engine for historical data.

*Source: Databricks*

For more details, see the Spark documentation: https://spark.apache.org/docs/latest/

<br/>

# Why use Spark SQL
- Query data stored in various formats (e.g. Parquet, Hive tables) using SQL
- Reduce the time spent processing queries
- Conduct data analysis at scale
- Reduce the time spent reading documentation (e.g. the Pandas documentation) for data cleaning, processing or querying, by using SQL instead


## Spark SQL: definition & facts




**Spark SQL**: Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine. It also provides powerful integration with the rest of the Spark ecosystem (e.g., integrating SQL query processing with machine learning).

<br/>

 **Did you know?** 

1. SQL was invented by IBM researchers Raymond Boyce and Donald Chamberlin. The language, known then as SEQUEL, was created following the publication of Edgar F. Codd's 1970 paper, "A Relational Model of Data for Large Shared Data Banks."
<br/>
<br/>
2. SQL commands can be classified into five types:

    - DDL (Data Definition Language)
    - DML (Data Manipulation Language)
    - DQL (Data Query Language)
    - DCL (Data Control Language)
    - TCL (Transaction Control Language)
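
A toy illustration of this classification, written as a hypothetical Python helper (`classify_sql` is our own name, not a library function) that looks only at a statement's first keyword, so a query starting with `WITH`, for example, is reported as unknown. It is a generic SQL sketch; not every engine supports all five families:

```python
# Hypothetical helper for illustration: map the first keyword of a SQL
# statement to one of the five command families listed above.
SQL_COMMAND_FAMILIES = {
    "DDL": {"CREATE", "ALTER", "DROP", "TRUNCATE", "RENAME"},
    "DML": {"INSERT", "UPDATE", "DELETE", "MERGE"},
    "DQL": {"SELECT"},
    "DCL": {"GRANT", "REVOKE"},
    "TCL": {"COMMIT", "ROLLBACK", "SAVEPOINT"},
}


def classify_sql(statement: str) -> str:
    """Return the command family of a statement based on its first keyword."""
    words = statement.strip().split()
    if not words:
        raise ValueError("empty SQL statement")
    keyword = words[0].upper()
    for family, keywords in SQL_COMMAND_FAMILIES.items():
        if keyword in keywords:
            return family
    return "UNKNOWN"


for stmt in [
    "CREATE TABLE trip (id INT, duration INT)",
    "INSERT INTO trip VALUES (4576, 63)",
    "SELECT * FROM trip WHERE duration < 70",
    "GRANT SELECT ON trip TO analyst",
    "COMMIT",
]:
    print(f"{classify_sql(stmt)}: {stmt}")
```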


## Prerequisite code
**Note**: Please run the cells below before any other code cell execution

Let's start by mounting Google Drive in the Colab environment. This gives us access to the data files stored on Google Drive and lets us save the output files and the remaining configuration files there:

In [None]:
# Load the Drive helper and mount
from google.colab import drive

# This will prompt for authorization.
drive.mount('/content/drive')

Mounted at /content/drive


Let's remove any old Spark installations before installing Spark 3.2.3:

In [None]:
# remove old spark installations if needed
!rm -dr spark*

rm: cannot remove 'spark*': No such file or directory


The next step is to set up the environment variables for the versions of Spark and Hadoop:

In [None]:
%env SPARK_VERSION=spark-3.2.3
%env HADOOP_VERSION=3.2
!echo $SPARK_VERSION

env: SPARK_VERSION=spark-3.2.3
env: HADOOP_VERSION=3.2
spark-3.2.3


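The next step is to install Java 8 and to download and extract Spark 3.2.3 pre-built for Hadoop 3.2. **Notice that `HADOOP_VERSION` refers to the Hadoop build of the Spark package, not to the Spark version: it must match one of the pre-built packages published for that Spark release**: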

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Older Spark releases are moved from downloads.apache.org to the Apache archive
!wget -q https://archive.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop$HADOOP_VERSION.tgz
!tar xf $SPARK_VERSION-bin-hadoop$HADOOP_VERSION.tgz
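
Both this cell and the `SPARK_HOME` cell further down assemble names from the two environment variables. A small plain-Python sketch of that naming convention, assuming the same `SPARK_VERSION`/`HADOOP_VERSION` values as the `%env` cell; the variable names `dist_name`, `download_url` and `spark_home` are ours, for illustration only, and the URL points at the Apache archive server, which keeps old releases:

```python
import os

# Assumption: same values as the %env cell; fall back to them if unset
spark_version = os.environ.get("SPARK_VERSION", "spark-3.2.3")
hadoop_version = os.environ.get("HADOOP_VERSION", "3.2")

# Name of the pre-built distribution, e.g. spark-3.2.3-bin-hadoop3.2
dist_name = f"{spark_version}-bin-hadoop{hadoop_version}"

# Archive location of the .tgz file for that release
download_url = f"https://archive.apache.org/dist/spark/{spark_version}/{dist_name}.tgz"

# Folder created by `tar xf` in /content, later used as SPARK_HOME
spark_home = "/content/" + dist_name

print(download_url)
print(spark_home)
```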

Let's install the Python libraries for working with Spark: PySpark, and findspark (which locates the Spark installation and makes it importable from Python). PySpark is pinned to the same version as the Spark distribution downloaded above:

In [None]:
!pip install -q findspark
!pip install -q pyspark==3.2.3

Next, let's set the `JAVA_HOME` and `SPARK_HOME` environment variables:

In [None]:
import os

!echo $SPARK_VERSION
%env JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

# SPARK_HOME points at the folder extracted from the .tgz archive
os.environ["SPARK_HOME"] = "/content/" + os.environ["SPARK_VERSION"] + "-bin-hadoop" + os.environ["HADOOP_VERSION"]

# Optionally make Java 8 the system-wide default:
# !update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java

# Print the versions
!java -version
!python --version
!echo $SPARK_HOME

spark-3.2.3
env: JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
openjdk version "11.0.17" 2022-10-18
OpenJDK Runtime Environment (build 11.0.17+8-post-Ubuntu-1ubuntu220.04)
OpenJDK 64-Bit Server VM (build 11.0.17+8-post-Ubuntu-1ubuntu220.04, mixed mode, sharing)
Python 3.8.10
/content/spark-3.2.3-bin-hadoop3.2


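Note that `java -version` still reports OpenJDK 11: setting `JAVA_HOME` does not change which `java` binary comes first on `PATH`. Spark 3.2 runs on both Java 8 and Java 11, so this does not prevent Spark from starting.

Once everything is set up, let's initialize `findspark`. It locates Spark through `SPARK_HOME`, adds Spark's Python libraries to `sys.path`, and raises an error if it cannot find a valid installation: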

In [None]:
import findspark
findspark.init()

Add `$JAVA_HOME/bin` to `PATH`. A shell command such as `!export PATH=...` runs in a throwaway subshell and does not change the notebook's environment, so we update `os.environ` instead:

In [None]:
import os

!ls -l

# Prepend the Java 8 binaries so that they take precedence over the default JDK
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
!echo $PATH

total 294092
drwx------  5 root root      4096 Feb 13 18:54 drive
drwxr-xr-x  1 root root      4096 Feb 10 14:33 sample_data
drwxr-xr-x 13  501 1000      4096 Nov 14 17:54 spark-3.2.3-bin-hadoop3.2
-rw-r--r--  1 root root 301136158 Nov 14 18:47 spark-3.2.3-bin-hadoop3.2.tgz


Since we are going to read files from S3, we need to install the AWS CLI and configure it with our AWS credentials. Then we install `s3fs`, which gives Pandas a file-system interface to S3 so that it can read `s3://` paths directly (Spark does not use `s3fs`; it reads S3 through the S3A connector):

In [None]:
!pip install awscli


In [None]:
!aws configure set aws_access_key_id your_access_key
!aws configure set aws_secret_access_key your_access_secret_key
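
`aws configure set` stores the keys in an INI-style file, `~/.aws/credentials`, under a `[default]` profile; this file is one of the places the default AWS credentials provider chain (used by our Spark session below) looks for keys. The sketch below reproduces that layout with Python's `configparser` in a temporary directory, so it does not touch your real AWS configuration; the values are the same dummy placeholders as above:

```python
import configparser
import os
import tempfile

# Sketch only: use a temporary folder instead of the real ~/.aws directory
aws_dir = tempfile.mkdtemp()
credentials_path = os.path.join(aws_dir, "credentials")

# One INI section per profile; `aws configure set` writes to [default]
config = configparser.ConfigParser()
config["default"] = {
    "aws_access_key_id": "your_access_key",
    "aws_secret_access_key": "your_access_secret_key",
}
with open(credentials_path, "w") as f:
    config.write(f)

# Read the profile back, as a credentials provider would
reader = configparser.ConfigParser()
reader.read(credentials_path)
profile = reader["default"]

with open(credentials_path) as f:
    print(f.read())
```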

In [None]:
!pip install s3fs


# Setting up the Spark session

We are going to use the `SparkSession`. **The Spark session is the entry point to the Spark environment**: to use Spark SQL functionality, we need to create one. The Spark session is built on top of the `SparkContext`, which is the entry point to the Spark cluster.

We only need the Spark session to read data from S3 and write output files back to S3. The session uses the Spark context internally (it is available as `spark.sparkContext`), so you don't need to manage it yourself.

In [None]:
from pyspark.sql import SparkSession

def get_spark(ACCESS_KEY=None, SECRET_KEY=None, TOKEN=None):
    spark = SparkSession.builder.master("local[4]").appName('SparkDelta') \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.jars.packages", 
                "io.delta:delta-core_2.12:1.1.0,"
                "org.apache.hadoop:hadoop-aws:3.2.3,"
                "com.amazonaws:aws-java-sdk-bundle:1.12.180") \
        .getOrCreate()

    # Hadoop settings used to access AWS S3 through the S3A connector
    spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
    spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    # Credentials: try the EC2 instance profile first, then the default AWS provider chain
    # (environment variables, ~/.aws/credentials written by `aws configure`, ...)
    spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain")

    # If we are using temporary session credentials instead, uncomment these lines:
    # spark._jsc.hadoopConfiguration().set('fs.s3a.access.key', ACCESS_KEY)
    # spark._jsc.hadoopConfiguration().set('fs.s3a.secret.key', SECRET_KEY)
    # spark._jsc.hadoopConfiguration().set("fs.s3a.session.token", TOKEN)
    # spark._jsc.hadoopConfiguration().set('fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider')
    spark._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")
    # spark.sparkContext.setLogLevel("DEBUG")
        
    return spark

In the previous code cell, we import `SparkSession` from the `pyspark.sql` module and use `SparkSession.builder` to create the Spark session. 

We set the `appName` to `SparkDelta` and the `master` to `local[4]`, which runs Spark on the local machine with four worker threads (`local[*]` would use one thread per available CPU core). 

We also configure the AWS S3 connector: `spark.jars.packages` pulls in `org.apache.hadoop:hadoop-aws:3.2.3`, together with the AWS SDK bundle and the Delta Lake package (`delta-core`). **The `hadoop-aws` version should match the Hadoop version bundled with your Spark distribution, not the Spark version itself**. 

We could also set `spark.hadoop.fs.s3a.access.key` and `spark.hadoop.fs.s3a.secret.key` to our AWS credentials directly. Instead, we rely on the AWS credentials provider chain, which picks up the credentials stored with `aws configure` earlier, so we don't need to put the keys in the Spark session configuration.

Finally, `getOrCreate()` creates the Spark session (or returns the one that is already running). We then pass some settings to the session's `hadoopConfiguration`: they tell the S3A connector to look up credentials through the provider chain, and set `fs.s3a.impl` to the S3A file system implementation so that we can read files on S3 as if they were on a file system.

In [None]:
# Let's create a spark session
spark = get_spark()

print(spark)
spark.version

<pyspark.sql.session.SparkSession object at 0x7f14b4524fd0>


'3.2.3'

## Running SQL queries programmatically

The next code examples are based on the San Francisco Bike Sharing dataset where possible. The dataset has been uploaded to our S3 bucket, but you can also get the files from Kaggle: [https://www.kaggle.com/datasets/benhamner/sf-bay-area-bike-share](https://www.kaggle.com/datasets/benhamner/sf-bay-area-bike-share)

In [None]:
# Create a DataFrame from your dataset. Here, we use the SF Bike Sharing trip dataset.
trip_df = spark.read.csv("s3a://msin0166-spark-workshop/data/sf_bike_sharing_data/trip.csv", header=True)

In [None]:
trip_df.show()

+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|4576|      63|8/29/2013 14:13|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
|4607|      70|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4130|      71|8/29/2013 10:16|Mountain View Cit...|              27|8/29/2013 10:17|Mountain View Cit...|            27|     48|       Subscriber|   97214|
|4251|      77|8/29/2013 11:29|  San Jose City Hall|      

In [None]:
# Create a SQL temporary view (more below)
trip_df.createOrReplaceTempView("trip")

In [None]:
# Query the temporary view
sqlDF = spark.sql("SELECT * FROM trip where duration < 70")
sqlDF.show()

+-----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|   id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+-----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
| 4576|      63|8/29/2013 14:13|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
| 6115|      69|8/30/2013 16:30|       2nd at Folsom|              62|8/30/2013 16:31|       2nd at Folsom|            62|    633|       Subscriber|   94107|
| 7416|      62|8/31/2013 20:52|Embarcadero at Sa...|              60|8/31/2013 20:53|Embarcadero at Sa...|            60|    511|         Customer|    null|
| 7250|      68|8/31/2013 16:53|University and Em...

In [None]:
subscribers = spark.sql("SELECT * FROM trip WHERE subscription_type = 'Subscriber'")
subscribers.show()

+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|4576|      63|8/29/2013 14:13|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
|4607|      70|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4130|      71|8/29/2013 10:16|Mountain View Cit...|              27|8/29/2013 10:17|Mountain View Cit...|            27|     48|       Subscriber|   97214|
|4251|      77|8/29/2013 11:29|  San Jose City Hall|      

In [None]:
#Count the number of subscribers
subscribers.count()

566746

## Global temporary view 

Temporary views in Spark SQL are session-scoped and disappear when the session that created them terminates. If you want a temporary view that is shared among all sessions and kept alive until the Spark application terminates, you can create a global temporary view. A global temporary view is tied to the system-preserved database `global_temp`, and we must use the qualified name to refer to it, e.g. `SELECT * FROM global_temp.view1`.
<br/>
<br/>
Source: Apache Spark documentation
https://spark.apache.org/docs/latest/sql-getting-started.html#global-temporary-view



In [None]:
# `subscribers` is only a Python DataFrame variable: it has not been registered as a view yet, so this query raises an AnalysisException
spark.sql("SELECT * FROM subscribers where duration < 70")

AnalysisException: ignored

<img src="https://media.tenor.com/images/a1343b4e30eeca94b3edf232a767bd31/tenor.gif"/>

In [None]:
# Create a temporary view of subscribers that can be further queried
subscribers.createOrReplaceTempView("subscribers")

In [None]:
#We can now query the Subscribers table
spark.sql("SELECT * FROM subscribers WHERE start_station_name LIKE \"San Jose%\" ").show()

+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|4607|      70|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4251|      77|8/29/2013 11:29|  San Jose City Hall|              10|8/29/2013 11:30|  San Jose City Hall|            10|     26|       Subscriber|   95060|
|4258|     114|8/29/2013 11:33|  San Jose City Hall|              10|8/29/2013 11:35|         MLK Library|            11|    107|       Subscriber|   95060|
|4242|     141|8/29/2013 11:25|  San Jose City Hall|      

## Reading multiple formats

Spark SQL supports reading the following formats:
- CSV
- JSON
- Avro
- Parquet
- Hive tables
- ORC tables

It can also create a JDBC connection to existing databases.

For more details, please read the Spark documentation
https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#data-sources






## Spark SQL architecture

<img src="https://d1jnx9ba8s6j9r.cloudfront.net/blog/wp-content/uploads/2016/12/Spark-SQL-Architecture-Spark-SQL-Edureka-1.png"/>

**Source** : Edureka

For a better understanding of Spark SQL, read this article: https://www.edureka.co/blog/spark-sql-tutorial/

# Inferring schema

Spark SQL can convert an RDD of `Row` objects to a DataFrame, inferring the data types. Rows are constructed by passing key/value pairs as kwargs to the `Row` class. The keys define the column names of the table, and the types are inferred from the values. By default, PySpark infers the types from the first row (looking at up to the first 100 rows if some values are `None`) rather than from the whole dataset; pass `samplingRatio` to `createDataFrame` to sample more rows.

A practical example can be found here: https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#inferring-the-schema-using-reflection

## Bucketing, Sorting and Partitioning

For file-based data sources, it is also possible to bucket and sort or partition the output. Bucketing and sorting are applicable only to persistent tables.

`bucketBy` distributes the rows of a table into a fixed number of buckets by hashing one or more columns, and `sortBy` sorts the rows within each bucket.

Bucketing is an optimization technique in Spark SQL that uses buckets and bucketing columns to determine data partitioning. When applied properly, bucketing can lead to join optimizations by avoiding shuffles (aka exchanges) of tables participating in the join.

Source: https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#bucketing-sorting-and-partitioning
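
The core idea of bucketing can be sketched in plain Python: each row goes to bucket `hash(key) mod num_buckets`, and the rows inside a bucket are sorted. This is a simplified illustration, not Spark's implementation: it uses `zlib.crc32` where Spark uses a Murmur3-based hash, four buckets instead of 42, and hypothetical rows. Because both tables use the same hash and bucket count, matching keys always land in the same bucket number, so bucket `i` only needs to be joined with bucket `i`:

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 4  # the notebook uses 42; four keeps the example small


def bucket_for(key: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Return the bucket number of a key: hash(key) mod num_buckets.

    Assumption: CRC32 stands in for Spark's Murmur3-based hash, so the bucket
    numbers differ from what Spark would produce; the principle is the same.
    """
    return zlib.crc32(key.encode("utf-8")) % num_buckets


def bucketize(rows, sort_key=None):
    """Group (key, value) rows by bucket and optionally sort each bucket."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_for(row[0])].append(row)
    if sort_key is not None:
        for bucket_rows in buckets.values():
            bucket_rows.sort(key=sort_key)  # like sortBy(...) within each bucket
    return buckets


# Hypothetical rows from two tables that share the station name as join key
trips = [("Davis at Jackson", 430), ("Beale at Market", 100), ("Davis at Jackson", 267)]
stations = [("Davis at Jackson", 15), ("Beale at Market", 19)]

trip_buckets = bucketize(trips, sort_key=lambda row: row[1])  # sort by duration
station_buckets = bucketize(stations)

# Join bucket i of one table only with bucket i of the other: no reshuffling
joined = sorted(
    (name, duration, docks)
    for bucket, trip_rows in trip_buckets.items()
    for name, duration in trip_rows
    for station_name, docks in station_buckets.get(bucket, [])
    if name == station_name
)
print(joined)
```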

In [None]:
# Bucketing and sorting: split the data into 42 buckets by start_station_name and sort each bucket by duration
subscribers.write.bucketBy(42, "start_station_name").sortBy("duration").mode("overwrite").saveAsTable("bucketed_subscribers")


In [None]:
# We can now query the bucketed table
from time import time
start_time=time()
spark.sql("SELECT * FROM bucketed_subscribers LIMIT 10").show()
end_time=time()

+------+--------+---------------+------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|    id|duration|     start_date|start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+------+--------+---------------+------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|382320|     100| 7/28/2014 7:03|   Beale at Market|              56| 7/28/2014 7:04|Temporary Transba...|            55|    417|       Subscriber|   94112|
|359590|     100| 7/11/2014 9:32|   Beale at Market|              56| 7/11/2014 9:34|Temporary Transba...|            55|    563|       Subscriber|   94105|
|339009|     100|6/25/2014 14:04|    Post at Kearny|              47|6/25/2014 14:06|   Market at Sansome|            77|    334|       Subscriber|   94105|
|308155|     100|  6/3/2014 7:47|   Beale at Market|      

In [None]:
print("Time spent to query bucketed table: ",end_time - start_time)

Time spent to query bucketed table:  0.6009030342102051


In [None]:
start_time_unbucketed=time()
spark.sql("SELECT * FROM subscribers LIMIT 10").show()
end_time_unbucketed=time()

+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|4576|      63|8/29/2013 14:13|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
|4607|      70|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4130|      71|8/29/2013 10:16|Mountain View Cit...|              27|8/29/2013 10:17|Mountain View Cit...|            27|     48|       Subscriber|   97214|
|4251|      77|8/29/2013 11:29|  San Jose City Hall|      

In [None]:
print("Time spent to query unbucketed table: ", end_time_unbucketed - start_time_unbucketed)

Time spent to query unbucketed table:  0.692385196685791


## Partitioning


In [None]:
# Partition the data by start_station_name and store it in Parquet format (more on Parquet files later; see the documentation for details)
subscribers.write.partitionBy("start_station_name").mode("overwrite").format("parquet").save("subscribers.parquet")

In [None]:
#We will now read the data
subscribers_in_parquet_format=spark.read.parquet("subscribers.parquet")
subscribers_in_parquet_format.show()

+------+--------+---------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+--------------------+
|    id|duration|     start_date|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|  start_station_name|
+------+--------+---------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+--------------------+
|386032|     544| 7/30/2014 7:22|              70| 7/30/2014 7:31|Embarcadero at Fo...|            51|    571|       Subscriber|   94303|San Francisco Cal...|
|386029|     602| 7/30/2014 7:21|              70| 7/30/2014 7:31|Embarcadero at Fo...|            51|    634|       Subscriber|   94062|San Francisco Cal...|
|386028|     807| 7/30/2014 7:20|              70| 7/30/2014 7:33|     Clay at Battery|            41|    260|       Subscriber|   94025|San Francisco Cal...|
|386024|     665| 7/30/2014 7:12|             

In [None]:
#Reading a partition from the parquet file
davis_at_jackson_subscribers=spark.read.parquet("subscribers.parquet/start_station_name=Davis at Jackson")
davis_at_jackson_subscribers.show()

+------+--------+---------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|    id|duration|     start_date|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+------+--------+---------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|386045|     215| 7/30/2014 7:28|              42| 7/30/2014 7:31|Commercial at Mon...|            45|    448|       Subscriber|   94111|
|385974|    1419| 7/30/2014 6:49|              42| 7/30/2014 7:13|    Davis at Jackson|            42|    321|       Subscriber|   94111|
|385930|     220| 7/30/2014 1:43|              42| 7/30/2014 1:47|Washington at Kearny|            46|    134|       Subscriber|   94108|
|385673|     382|7/29/2014 18:56|              42|7/29/2014 19:03|Temporary Transba...|            55|    504|       Subscriber|   94602|
|385664|     197|7/29/2014 18:52| 

In [None]:
#Checking we selected the relevant data
spark.sql("SELECT * FROM subscribers WHERE start_station_name=\'Davis at Jackson\'").show()

+-----+--------+---------------+------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|   id|duration|     start_date|start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+-----+--------+---------------+------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
| 4799|     267|8/29/2013 17:19|  Davis at Jackson|              42|8/29/2013 17:24|   Steuart at Market|            74|    399|       Subscriber|   94105|
| 4890|     347|8/29/2013 18:13|  Davis at Jackson|              42|8/29/2013 18:19|     Beale at Market|            56|    326|       Subscriber|   94123|
| 5004|     430|8/29/2013 20:05|  Davis at Jackson|              42|8/29/2013 20:12|   Market at Sansome|            77|    377|       Subscriber|   94110|
| 4462|     646|8/29/2013 13:07|  Davis at Jackson|             

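Both queries return trips starting at Davis at Jackson; the rows are listed in a different order because neither query specifies an ordering. Note, however, how the partition column is handled: when the whole `subscribers.parquet` directory is read back, `start_station_name` is moved to the end of the schema, and when a single partition directory is read, it disappears entirely, because its value is encoded in the directory name (`start_station_name=Davis at Jackson`) rather than stored in the Parquet files.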

## Practical examples

- Read CSV
- Read JSON
- Read Parquet
- Select multiple columns with PySpark vs SparkSQL query
- Filter, group by
- Join two tables
- Print schema
- Save to persistent tables
- Cache data in memory
- Programmatically specifying the schema
- Partitioning example

## Read CSV


In [None]:
# Get the Spark session (get_spark() returns the already running session, if any)
spark = get_spark()

station_csv_df=spark.read.csv("s3a://msin0166-spark-workshop/data/sf_bike_sharing_data/station.csv",header=True)
station_csv_df.show()

+---+--------------------+------------------+-------------------+----------+------------+-----------------+
| id|                name|               lat|               long|dock_count|        city|installation_date|
+---+--------------------+------------------+-------------------+----------+------------+-----------------+
|  2|San Jose Diridon ...|         37.329732|-121.90178200000001|        27|    San Jose|         8/6/2013|
|  3|San Jose Civic Ce...|         37.330698|        -121.888979|        15|    San Jose|         8/5/2013|
|  4|Santa Clara at Al...|         37.333988|        -121.894902|        11|    San Jose|         8/6/2013|
|  5|    Adobe on Almaden|         37.331415|          -121.8932|        19|    San Jose|         8/5/2013|
|  6|    San Pedro Square|37.336721000000004|        -121.894074|        15|    San Jose|         8/7/2013|
|  7|Paseo de San Antonio|         37.333798|-121.88694299999999|        15|    San Jose|         8/7/2013|
|  8| San Salvador at 1st|  

In [None]:
#Check dataframe schema
station_csv_df.printSchema()


root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- dock_count: string (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: string (nullable = true)



You can then register this DataFrame as a temporary view and query it as shown in the previous examples.

## Read JSON
**Note**: This example is based on the Yelp business dataset, provided on Moodle.

In [None]:
yelp_business= spark.read.json("s3a://msin0166-spark-workshop/data/yelp_business_data/yelp_academic_dataset_business.json")
yelp_business.show()

+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|              city|               hours|is_open|     latitude|      longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+
|2818 E Camino Ace...|{null, null, null...|1SWheh84yJXfytovI...|   Golf, Active Life|           Phoenix|                null|      0|   33.5221425|   -112.0184807|Arizona Biltmore ...|      85016|           5|  3.0|   AZ|
|30 Eglinton Avenue W|{null, null, u'fu...|QXAEGFB4oINsVuTFx...|Specialty Food, R...|       Mississauga|{9:0-1:0

In [None]:
# Register the Yelp business DataFrame as a temporary view so that we can query it with SQL
yelp_business.createOrReplaceTempView("yelp_business")

In [None]:
#How many businesses from Las Vegas are listed on our JSON based Yelp business dataset?
spark.sql("SELECT COUNT(*) FROM yelp_business WHERE city = 'Las Vegas'").show()

+--------+
|count(1)|
+--------+
|   29370|
+--------+



## Read Parquet
**Note**: This is based on the user data dataset provided on Moodle

In [None]:
users_in_parquet = spark.read.parquet("s3a://msin0166-spark-workshop/data/userdata2.parquet")

In [None]:
users_in_parquet.show()

+-------------------+----+----------+---------+--------------------+------+---------------+----------------+--------------------+----------+---------+--------------------+--------+
|  registration_dttm|  id|first_name|last_name|               email|gender|     ip_address|              cc|             country| birthdate|   salary|               title|comments|
+-------------------+----+----------+---------+--------------------+------+---------------+----------------+--------------------+----------+---------+--------------------+--------+
|2016-02-03 13:36:39|   1|    Donald|    Lewis|dlewis0@clickbank...|  Male|  102.22.124.20|                |           Indonesia|  7/9/1972|140249.37|Senior Financial ...|        |
|2016-02-03 00:22:28|   2|    Walter|  Collins|wcollins1@bloglov...|  Male|   247.28.26.93|3587726269478025|               China|          |     null|                    |        |
|2016-02-03 18:29:04|   3|  Michelle|Henderson|mhenderson2@geoci...|Female| 193.68.146.150|    

In [None]:
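# registerTempTable() has been deprecated since Spark 2.0; createOrReplaceTempView() replaces it
users_in_parquet.createOrReplaceTempView("users_2")
spark.sql("SELECT * FROM users_2 WHERE country = 'China'").show()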



+-------------------+---+----------+----------+--------------------+------+---------------+------------------+-------+----------+---------+--------------------+--------------------+
|  registration_dttm| id|first_name| last_name|               email|gender|     ip_address|                cc|country| birthdate|   salary|               title|            comments|
+-------------------+---+----------+----------+--------------------+------+---------------+------------------+-------+----------+---------+--------------------+--------------------+
|2016-02-03 00:22:28|  2|    Walter|   Collins|wcollins1@bloglov...|  Male|   247.28.26.93|  3587726269478025|  China|          |     null|                    |                    |
|2016-02-03 08:11:34|  8|     Louis|   Simmons|   lsimmons7@icio.us|  Male|    18.69.80.15|                  |  China|  6/1/1992| 90744.86|    Product Engineer|                    |
|2016-02-03 23:36:58| 13|    Evelyn|    Harvey|   eharveyc@time.com|      |  254.174.154.7

## Select multiple columns with PySpark vs a Spark SQL query

In [None]:
# Select columns using PySpark
users_in_parquet.select("first_name","last_name").show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|    Donald|    Lewis|
|    Walter|  Collins|
|  Michelle|Henderson|
|      Lori|   Hudson|
|    Howard|   Miller|
|   Frances|    Adams|
|    Steven|   Hanson|
|     Louis|  Simmons|
|     Keith|   Parker|
|     Wanda|   Walker|
|   Kathryn|   Weaver|
|    Philip|     Ward|
|    Evelyn|   Harvey|
|    Andrea|     Lane|
|     Bobby|  Vasquez|
|   Kenneth|   Gibson|
|     Emily|     Hill|
|     Kelly|   Fowler|
|     Diana|   Howell|
|    Johnny|  Collins|
+----------+---------+
only showing top 20 rows



In [None]:
# Select columns using Spark SQL
spark.sql("SELECT first_name,last_name FROM users_2 LIMIT 20").show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|    Donald|    Lewis|
|    Walter|  Collins|
|  Michelle|Henderson|
|      Lori|   Hudson|
|    Howard|   Miller|
|   Frances|    Adams|
|    Steven|   Hanson|
|     Louis|  Simmons|
|     Keith|   Parker|
|     Wanda|   Walker|
|   Kathryn|   Weaver|
|    Philip|     Ward|
|    Evelyn|   Harvey|
|    Andrea|     Lane|
|     Bobby|  Vasquez|
|   Kenneth|   Gibson|
|     Emily|     Hill|
|     Kelly|   Fowler|
|     Diana|   Howell|
|    Johnny|  Collins|
+----------+---------+



In [None]:
# Select columns using PySpark
subscribers.select("start_station_name","end_station_name","bike_id","duration").show()

+--------------------+--------------------+-------+--------+
|  start_station_name|    end_station_name|bike_id|duration|
+--------------------+--------------------+-------+--------+
|South Van Ness at...|South Van Ness at...|    520|      63|
|  San Jose City Hall|  San Jose City Hall|    661|      70|
|Mountain View Cit...|Mountain View Cit...|     48|      71|
|  San Jose City Hall|  San Jose City Hall|     26|      77|
|South Van Ness at...|      Market at 10th|    319|      83|
| Golden Gate at Polk| Golden Gate at Polk|    527|     103|
|Santa Clara at Al...|    Adobe on Almaden|    679|     109|
| San Salvador at 1st| San Salvador at 1st|    687|     111|
|South Van Ness at...|South Van Ness at...|    553|     113|
|  San Jose City Hall|         MLK Library|    107|     114|
|     Spear at Folsom|Embarcadero at Br...|    368|     125|
|    San Pedro Square|Santa Clara at Al...|     26|     126|
|Mountain View Cal...|Mountain View Cal...|    140|     129|
|   2nd at South Park|  

In [None]:
# Select columns using Spark SQL
spark.sql("SELECT start_station_name,end_station_name,bike_id,duration FROM subscribers LIMIT 20").show()

+--------------------+--------------------+-------+--------+
|  start_station_name|    end_station_name|bike_id|duration|
+--------------------+--------------------+-------+--------+
|South Van Ness at...|South Van Ness at...|    520|      63|
|  San Jose City Hall|  San Jose City Hall|    661|      70|
|Mountain View Cit...|Mountain View Cit...|     48|      71|
|  San Jose City Hall|  San Jose City Hall|     26|      77|
|South Van Ness at...|      Market at 10th|    319|      83|
| Golden Gate at Polk| Golden Gate at Polk|    527|     103|
|Santa Clara at Al...|    Adobe on Almaden|    679|     109|
| San Salvador at 1st| San Salvador at 1st|    687|     111|
|South Van Ness at...|South Van Ness at...|    553|     113|
|  San Jose City Hall|         MLK Library|    107|     114|
|     Spear at Folsom|Embarcadero at Br...|    368|     125|
|    San Pedro Square|Santa Clara at Al...|     26|     126|
|Mountain View Cal...|Mountain View Cal...|    140|     129|
|   2nd at South Park|  

## Filtering and grouping

In [None]:
# Filter data using PySpark
subscribers.filter((subscribers.duration>60) & (subscribers.duration<=100)).show()

+-----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|   id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+-----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
| 4576|      63|8/29/2013 14:13|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
| 4607|      70|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
| 4130|      71|8/29/2013 10:16|Mountain View Cit...|              27|8/29/2013 10:17|Mountain View Cit...|            27|     48|       Subscriber|   97214|
| 4251|      77|8/29/2013 11:29|  San Jose City Hall

In [None]:
# Filter data using Spark SQL
spark.sql("SELECT * FROM subscribers WHERE duration > 60 AND duration <= 100").show()

+-----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|   id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+-----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
| 4576|      63|8/29/2013 14:13|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
| 4607|      70|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
| 4130|      71|8/29/2013 10:16|Mountain View Cit...|              27|8/29/2013 10:17|Mountain View Cit...|            27|     48|       Subscriber|   97214|
| 4251|      77|8/29/2013 11:29|  San Jose City Hall

In [None]:
# Group by using PySpark
# GroupedData.sum() only accepts numeric columns and duration is stored as a string, so cast it to double first
from pyspark.sql.types import DoubleType
subscribers = subscribers.withColumn("duration", subscribers["duration"].cast(DoubleType()))
subscribers.groupBy("bike_id").sum("duration").show()


+-------+-------------+
|bike_id|sum(duration)|
+-------+-------------+
|    675|     101382.0|
|    467|     476556.0|
|    296|     156101.0|
|    691|      75062.0|
|    125|      39139.0|
|    451|     815995.0|
|    666|      54649.0|
|    124|      88313.0|
|    447|     923226.0|
|    591|     830507.0|
|     51|      88612.0|
|    574|    1034693.0|
|    613|     914909.0|
|    307|      82875.0|
|    475|     684313.0|
|    544|     797837.0|
|    334|     934758.0|
|    577|     784963.0|
|    581|     906101.0|
|    205|      86722.0|
+-------+-------------+
only showing top 20 rows



In [None]:
# Group by using Spark SQL
spark.sql("SELECT bike_id, SUM(duration) AS duration FROM subscribers GROUP BY bike_id").show()

+-------+---------+
|bike_id| duration|
+-------+---------+
|    675| 101382.0|
|    467| 476556.0|
|    296| 156101.0|
|    691|  75062.0|
|    125|  39139.0|
|    451| 815995.0|
|    666|  54649.0|
|    124|  88313.0|
|    447| 923226.0|
|    591| 830507.0|
|     51|  88612.0|
|    574|1034693.0|
|    613| 914909.0|
|    307|  82875.0|
|    475| 684313.0|
|    544| 797837.0|
|    334| 934758.0|
|    577| 784963.0|
|    581| 906101.0|
|    205|  86722.0|
+-------+---------+
only showing top 20 rows



## Joining two tables

In [None]:
# First, create the two DataFrames to be joined
bike_and_duration=spark.sql("SELECT bike_id,duration FROM subscribers WHERE bike_id <5000")
bike_and_duration.show()

+-------+--------+
|bike_id|duration|
+-------+--------+
|    520|      63|
|    661|      70|
|     48|      71|
|     26|      77|
|    319|      83|
|    527|     103|
|    679|     109|
|    687|     111|
|    553|     113|
|    107|     114|
|    368|     125|
|     26|     126|
|    140|     129|
|    371|     130|
|    503|     134|
|    408|     138|
|     26|     141|
|    319|     142|
|    564|     142|
|    574|     144|
+-------+--------+
only showing top 20 rows



In [None]:
bike_and_stations=spark.sql("SELECT bike_id,start_station_name,end_station_name FROM subscribers WHERE bike_id <5000")
bike_and_stations.show()

+-------+--------------------+--------------------+
|bike_id|  start_station_name|    end_station_name|
+-------+--------------------+--------------------+
|    520|South Van Ness at...|South Van Ness at...|
|    661|  San Jose City Hall|  San Jose City Hall|
|     48|Mountain View Cit...|Mountain View Cit...|
|     26|  San Jose City Hall|  San Jose City Hall|
|    319|South Van Ness at...|      Market at 10th|
|    527| Golden Gate at Polk| Golden Gate at Polk|
|    679|Santa Clara at Al...|    Adobe on Almaden|
|    687| San Salvador at 1st| San Salvador at 1st|
|    553|South Van Ness at...|South Van Ness at...|
|    107|  San Jose City Hall|         MLK Library|
|    368|     Spear at Folsom|Embarcadero at Br...|
|     26|    San Pedro Square|Santa Clara at Al...|
|    140|Mountain View Cal...|Mountain View Cal...|
|    371|   2nd at South Park|   2nd at South Park|
|    503|     Clay at Battery|     Beale at Market|
|    408|     Post at Kearney|     Post at Kearney|
|     26|  S

In [None]:
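# Join the two tables using PySpark
# bike_id is not unique in either table, so each row is paired with every row that shares its bike_id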
bike_and_duration.join(bike_and_stations,on='bike_id',how='left').show()


+-------+--------+--------------------+--------------------+
|bike_id|duration|  start_station_name|    end_station_name|
+-------+--------+--------------------+--------------------+
|    107|     114|  San Jose City Hall|         MLK Library|
|    107|     114|         MLK Library|San Jose Diridon ...|
|    107|     114|  San Jose City Hall|San Jose Diridon ...|
|    107|     114|San Jose Diridon ...| San Salvador at 1st|
|    107|     114| San Salvador at 1st|Paseo de San Antonio|
|    107|     114|Paseo de San Antonio| San Salvador at 1st|
|    107|     114|San Jose Diridon ...|  San Jose City Hall|
|    107|     114|  San Jose City Hall| San Salvador at 1st|
|    107|     114| San Salvador at 1st|         MLK Library|
|    107|     114|       St James Park|    San Pedro Square|
|    107|     114|    San Pedro Square|SJSU - San Salvad...|
|    107|     114|San Jose Diridon ...|Arena Green / SAP...|
|    107|     114|Arena Green / SAP...|SJSU - San Salvad...|
|    107|     114|SJSU -

In [None]:
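# Join the two tables using Spark SQL
# Create a temporary view for each DataFrame so that SQL can refer to it by name
# (unlike createTempView, createOrReplaceTempView does not fail when the cell is re-run)
bike_and_duration.createOrReplaceTempView("bike_and_duration")
bike_and_stations.createOrReplaceTempView("bike_and_stations")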



In [None]:
spark.sql("""
    SELECT d.bike_id, d.duration, s.start_station_name, s.end_station_name
    FROM bike_and_duration AS d
    LEFT JOIN bike_and_stations AS s
      ON d.bike_id = s.bike_id
""").show()

+-------+--------+--------------------+--------------------+
|bike_id|duration|  start_station_name|    end_station_name|
+-------+--------+--------------------+--------------------+
|    107|     114|  San Jose City Hall|         MLK Library|
|    107|     114|         MLK Library|San Jose Diridon ...|
|    107|     114|  San Jose City Hall|San Jose Diridon ...|
|    107|     114|San Jose Diridon ...| San Salvador at 1st|
|    107|     114| San Salvador at 1st|Paseo de San Antonio|
|    107|     114|Paseo de San Antonio| San Salvador at 1st|
|    107|     114|San Jose Diridon ...|  San Jose City Hall|
|    107|     114|  San Jose City Hall| San Salvador at 1st|
|    107|     114| San Salvador at 1st|         MLK Library|
|    107|     114|       St James Park|    San Pedro Square|
|    107|     114|    San Pedro Square|SJSU - San Salvad...|
|    107|     114|San Jose Diridon ...|Arena Green / SAP...|
|    107|     114|Arena Green / SAP...|SJSU - San Salvad...|
|    107|     114|SJSU -

## Save to persistent tables

DataFrames can also be saved as persistent tables in the Hive metastore using the `saveAsTable` command.

Unlike `createOrReplaceTempView`, `saveAsTable` materializes the contents of the DataFrame and creates a pointer to the data in the Hive metastore.

Persistent tables still exist after your Spark program has restarted, as long as you maintain your connection to the same metastore. A DataFrame for a persistent table can be created by calling the `table` method on a SparkSession with the name of the table, e.g. `spark.table("bike_station_duration_table")`.

Source: https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#saving-to-persistent-tables

In [None]:
joined_table = spark.sql("""
    SELECT d.bike_id, d.duration, s.start_station_name, s.end_station_name
    FROM bike_and_duration AS d
    LEFT JOIN bike_and_stations AS s
      ON d.bike_id = s.bike_id
    LIMIT 10
""")

In [None]:
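# Alternative with an explicit file format. Note that write.parquet(...) returns None,
# so it cannot be chained with .saveAsTable(); use format("parquet") instead:
#joined_table.write.format("parquet").saveAsTable("bike_station_duration_joined_final")

# mode("append") adds the rows again every time this cell is re-run; mode("overwrite") replaces the table
joined_table.write.mode("append").saveAsTable("bike_station_duration_table")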

In [None]:
spark.sql("SELECT * FROM bike_station_duration_table LIMIT 10").show()

+-------+--------+--------------------+--------------------+
|bike_id|duration|  start_station_name|    end_station_name|
+-------+--------+--------------------+--------------------+
|    107|     114|  San Jose City Hall|         MLK Library|
|    107|     114|         MLK Library|San Jose Diridon ...|
|    107|     114|  San Jose City Hall|San Jose Diridon ...|
|    107|     114|San Jose Diridon ...| San Salvador at 1st|
|    107|     114| San Salvador at 1st|Paseo de San Antonio|
|    107|     114|Paseo de San Antonio| San Salvador at 1st|
|    107|     114|San Jose Diridon ...|  San Jose City Hall|
|    107|     114|  San Jose City Hall| San Salvador at 1st|
|    107|     114| San Salvador at 1st|         MLK Library|
|    107|     114|       St James Park|    San Pedro Square|
+-------+--------+--------------------+--------------------+



## Caching data in memory

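For more details, please read the documentation: https://spark.apache.org/docs/latest/sql-performance-tuning.html

Note that `cache()` is lazy: it only marks the DataFrame for caching, and the data is stored in memory when an action (such as `show()`) runs on it. The first action after `cache()` therefore also pays the cost of filling the cache, which helps explain why the cached timing below is slower than the uncached one; it is the later actions on the same DataFrame that benefit.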

In [None]:
from time import time

start_time=time()
subscribers.show()
end_time=time()

+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|4576|    63.0|8/29/2013 14:13|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
|4607|    70.0|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4130|    71.0|8/29/2013 10:16|Mountain View Cit...|              27|8/29/2013 10:17|Mountain View Cit...|            27|     48|       Subscriber|   97214|
|4251|    77.0|8/29/2013 11:29|  San Jose City Hall|      

In [None]:
print("Processing without cache time difference",end_time - start_time)

Processing without cache time difference 0.9082796573638916


In [None]:
subscribers.cache()

DataFrame[id: string, duration: double, start_date: string, start_station_name: string, start_station_id: string, end_date: string, end_station_name: string, end_station_id: string, bike_id: string, subscription_type: string, zip_code: string]

In [None]:
cache_start_time=time()
subscribers.show()
cache_end_time=time()


+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|4576|    63.0|8/29/2013 14:13|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
|4607|    70.0|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4130|    71.0|8/29/2013 10:16|Mountain View Cit...|              27|8/29/2013 10:17|Mountain View Cit...|            27|     48|       Subscriber|   97214|
|4251|    77.0|8/29/2013 11:29|  San Jose City Hall|      

In [None]:
print("Processing with cache time difference",cache_end_time - cache_start_time)

Processing with cache time difference 4.388555288314819


## Specify the schema in a programmatic way

For more details, read the Spark documentation: https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#programmatically-specifying-the-schema

In [None]:
# Take the rows of an existing table as an RDD, to which we will apply our own schema.
from pyspark.sql.types import StructField, StructType, StringType

table_without_schema = spark.sql("SELECT * FROM bike_station_duration_table LIMIT 10").rdd

# The schema is encoded in a string.
schemaString = "bike_id duration start_station_name end_station_name"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schematable = spark.createDataFrame(table_without_schema, schema)

# Register the DataFrame as a temporary view (registerTempTable() is deprecated).
schematable.createOrReplaceTempView("table_with_schema")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT bike_id FROM table_with_schema")




In [None]:
results.show()

+-------+
|bike_id|
+-------+
|    107|
|    107|
|    107|
|    107|
|    107|
|    107|
|    107|
|    107|
|    107|
|    107|
+-------+



## Spark SQL vs querying Pandas DataFrames

In [None]:
import pandas as pd
from time import time

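start_time=time()

# Reading s3:// paths with pandas requires the s3fs package
pandas_df = pd.read_csv("s3://msin0166-spark-workshop/data/sf_bike_sharing_data/station.csv")

end_time=time()
print("Pandas read time (s):", end_time - start_time)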

In [None]:
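# spark.read.csv is lazy: with header=True and no inferSchema it only needs enough of the file to find
# the header row, and the full file is read later, when an action such as show() runs.
# This timing is therefore not directly comparable with pd.read_csv, which loads the whole file.
start_time_spark=time()
station_df = spark.read.csv("s3a://msin0166-spark-workshop/data/sf_bike_sharing_data/station.csv",header=True)
end_time_spark=time()
print("Spark read time (s):", end_time_spark - start_time_spark)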

<div class="alert alert-block alert-info">

You may have noticed that we use `s3a://` for Spark and `s3://` for Pandas. Spark reads S3 through Hadoop's S3A file system connector (`s3a://`), while Pandas passes `s3://` URLs to the `s3fs` library.

</div>

In [None]:
station_df.show()

+---+--------------------+------------------+-------------------+----------+------------+-----------------+
| id|                name|               lat|               long|dock_count|        city|installation_date|
+---+--------------------+------------------+-------------------+----------+------------+-----------------+
|  2|San Jose Diridon ...|         37.329732|-121.90178200000001|        27|    San Jose|         8/6/2013|
|  3|San Jose Civic Ce...|         37.330698|        -121.888979|        15|    San Jose|         8/5/2013|
|  4|Santa Clara at Al...|         37.333988|        -121.894902|        11|    San Jose|         8/6/2013|
|  5|    Adobe on Almaden|         37.331415|          -121.8932|        19|    San Jose|         8/5/2013|
|  6|    San Pedro Square|37.336721000000004|        -121.894074|        15|    San Jose|         8/7/2013|
|  7|Paseo de San Antonio|         37.333798|-121.88694299999999|        15|    San Jose|         8/7/2013|
|  8| San Salvador at 1st|  

In [None]:
# Query the Spark DataFrame with Spark SQL
station_df.createOrReplaceTempView("station")
spark.sql("SELECT * FROM station WHERE city = 'San Jose'").show()

+---+--------------------+------------------+-------------------+----------+--------+-----------------+
| id|                name|               lat|               long|dock_count|    city|installation_date|
+---+--------------------+------------------+-------------------+----------+--------+-----------------+
|  2|San Jose Diridon ...|         37.329732|-121.90178200000001|        27|San Jose|         8/6/2013|
|  3|San Jose Civic Ce...|         37.330698|        -121.888979|        15|San Jose|         8/5/2013|
|  4|Santa Clara at Al...|         37.333988|        -121.894902|        11|San Jose|         8/6/2013|
|  5|    Adobe on Almaden|         37.331415|          -121.8932|        19|San Jose|         8/5/2013|
|  6|    San Pedro Square|37.336721000000004|        -121.894074|        15|San Jose|         8/7/2013|
|  7|Paseo de San Antonio|         37.333798|-121.88694299999999|        15|San Jose|         8/7/2013|
|  8| San Salvador at 1st|         37.330165|-121.88583100000001

In [None]:
# Query Pandas DataFrame
pandas_df.loc[pandas_df['city']== 'San Jose']

|   | id | name | lat | long | dock_count | city | installation_date |
|---|----|------|-----|------|------------|------|-------------------|
| 0 | 2 | San Jose Diridon Caltrain Station | 37.329732 | -121.901782 | 27 | San Jose | 8/6/2013 |
| 1 | 3 | San Jose Civic Center | 37.330698 | -121.888979 | 15 | San Jose | 8/5/2013 |
| 2 | 4 | Santa Clara at Almaden | 37.333988 | -121.894902 | 11 | San Jose | 8/6/2013 |
| 3 | 5 | Adobe on Almaden | 37.331415 | -121.8932 | 19 | San Jose | 8/5/2013 |
| 4 | 6 | San Pedro Square | 37.336721 | -121.894074 | 15 | San Jose | 8/7/2013 |
| 5 | 7 | Paseo de San Antonio | 37.333798 | -121.886943 | 15 | San Jose | 8/7/2013 |
| 6 | 8 | San Salvador at 1st | 37.330165 | -121.885831 | 15 | San Jose | 8/5/2013 |
| 7 | 9 | Japantown | 37.348742 | -121.894715 | 15 | San Jose | 8/5/2013 |
| 8 | 10 | San Jose City Hall | 37.337391 | -121.886995 | 15 | San Jose | 8/6/2013 |
| 9 | 11 | MLK Library | 37.335885 | -121.88566 | 19 | San Jose | 8/6/2013 |


# Practice!
**Note**: Read the Spark documentation and check Stack Overflow when you get stuck.

<img src="https://media.giphy.com/media/8vkEKXvnXkyCZx8w6b/giphy.gif"/>

## Exercise 1:
Read a file of your choice in JSON format and extract the first 10 rows.
<br/>
**Note**: You will have to load the file into your data folder first.


Here, you can use the Iris flower dataset provided in JSON format:

`s3a://msin0166-spark-workshop/data/iris_flowers_data/iris.json`

## Exercise 2:
Create a new dataset by filtering your existing dataset. Use both the PySpark and Spark SQL functions shown in the **Practical examples** section.

Example: Using the Yelp dataset, create a DataFrame of top businesses (rated 4 stars or higher) listed on Yelp.


## Exercise 3:
Use Spark SQL to apply a GROUP BY clause to your existing dataset.

## Exercise 4:
Create a sample dataset and programmatically specify its schema.
<br/>
**Note**: Follow the code in the **Practical examples** section

## Exercise 5:

Count the number of records in your dataset by using a SQL query.

# Extra exercise

## Exercise 6:

Convert a CSV file of your choice to Parquet format by using PySpark.
<br/>
Read data from the Parquet file and select 50 rows of data.

Note: Here, we used the weather.csv dataset from [s3://msin0166-spark-workshop/data/sf_bike_sharing_data/weather.csv](s3://msin0166-spark-workshop/data/sf_bike_sharing_data/weather.csv), which you can also download from [https://www.kaggle.com/datasets/benhamner/sf-bay-area-bike-share](https://www.kaggle.com/datasets/benhamner/sf-bay-area-bike-share).