<h1>UCL School of Management</h1>
<h2>MSIN0166 Data Engineering</h2>
<h4>SparkSQL workshop - Theory & Practice</h4>

<img src="https://databricks.com/wp-content/uploads/2015/03/Screen-Shot-2015-03-23-at-3.42.56-PM.png"></img>




# What is Spark SQL?

Many data scientists, analysts, and general business intelligence users rely on interactive SQL queries for exploring data. Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine. 

- Spark SQL enables unmodified Hadoop Hive queries to run up to 100x faster on existing deployments and data. It also provides powerful integration with the rest of the Spark ecosystem (e.g., integrating SQL query processing with machine learning).

- It brings native support for SQL to Spark and streamlines the process of querying data stored both in RDDs (Spark’s distributed datasets) and in external sources. It conveniently blurs the lines between RDDs and relational tables. Unifying these powerful abstractions makes it easy for developers to intermix SQL commands querying external data with complex analytics, all within a single application. 

- Spark SQL also includes a cost-based optimizer, columnar storage, and code generation to make queries fast. At the same time, it scales to thousands of nodes and multi-hour queries using the Spark engine, which provides full mid-query fault tolerance, without having to worry about using a different engine for historical data.

*Source: Databricks*

For more details, go through Spark documentation: https://spark.apache.org/docs/latest/

<br/>

# Why use Spark SQL
- Query data stored in various formats (e.g. Parquet, Hive tables) by using SQL
- Reduce the time spent processing queries
- Conduct data analysis at scale
- Reduce time spent on reading documentation (e.g. Pandas documentation) for data cleaning, processing or querying and use SQL instead.


## Spark SQL: definition & facts




**Spark SQL**: Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine. It also provides powerful integration with the rest of the Spark ecosystem (e.g., integrating SQL query processing with machine learning).

<br/>

 **Did you know?** 

1. SQL was invented by IBM researchers Raymond Boyce and Donald Chamberlin. The language, known then as SEQUEL, was created following the publication of Edgar Frank Codd's paper, "A Relational Model of Data for Large Shared Data Banks," in 1970.
<br/>
<br/>
2. SQL commands are commonly grouped into five categories:

    - DDL (Data Definition Language)
    - DML (Data Manipulation Language)
    - DQL (Data Query Language)
    - DCL (Data Control Language)
    - TCL (Transaction Control Language)
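
The categories above can be illustrated with a small sketch. It uses Python's built-in `sqlite3` module rather than Spark, purely as an assumption to keep the example lightweight; the table name `trip` and the three rows are taken from the bike sharing sample shown later in this notebook. SQLite has no `GRANT`/`REVOKE` statements, so DCL is not shown.

```python
import sqlite3

# In-memory database, nothing is written to disk
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# DDL: define the structure of a table
cur.execute("CREATE TABLE trip (id INTEGER PRIMARY KEY, duration INTEGER)")

# DML: change the rows stored in the table
cur.executemany(
    "INSERT INTO trip (id, duration) VALUES (?, ?)",
    [(4576, 63), (4607, 70), (4130, 71)],
)

# TCL: make the pending changes permanent
conn.commit()

# DML followed by TCL: an update that is rolled back leaves the data unchanged
cur.execute("UPDATE trip SET duration = 0")
conn.rollback()

# DQL: read data back
short_trips = cur.execute(
    "SELECT id FROM trip WHERE duration < 71 ORDER BY id"
).fetchall()
print(short_trips)

conn.close()
```
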


## Prerequisite code
**Note**: Please run the cells below before executing any other code cell.

In [None]:
!pip install pyspark


In [None]:
import pyspark

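# Get (or create) the SparkContext, the entry point to the core Spark API
sc = pyspark.SparkContext.getOrCreate()
print(sc)
# Get (or create) the SparkSession, the entry point to DataFrames and Spark SQL
spark = pyspark.sql.SparkSession.builder.getOrCreate()
print(spark)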

<SparkContext master=local[*] appName=pyspark-shell>
<pyspark.sql.session.SparkSession object at 0x7f2128e2a2b0>


## Running SQL queries programmatically
**Note**: Examples below will be based on the San Francisco Bike Sharing dataset where possible.



In [None]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [None]:
# Create a DataFrame from the SF Bike Sharing trip dataset.
# header=True uses the first row as column names; without a schema, every column is read as a string.
trip_df = sqlContext.read.csv("s3://msin0166-spark-workshop/data/sf_bike_sharing_data/trip.csv", header=True)

In [None]:
trip_df.show()

+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|4576|      63|8/29/2013 14:13|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
|4607|      70|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4130|      71|8/29/2013 10:16|Mountain View Cit...|              27|8/29/2013 10:17|Mountain View Cit...|            27|     48|       Subscriber|   97214|
|4251|      77|8/29/2013 11:29|  San Jose City Hall|      

In [None]:
# Create a SQL temporary view (more below)
trip_df.createOrReplaceTempView("trip")

In [None]:
#Query the temporary table
sqlDF = spark.sql("SELECT * FROM trip where duration < 70")
sqlDF.show()

+-----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|   id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+-----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
| 4576|      63|8/29/2013 14:13|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
| 6115|      69|8/30/2013 16:30|       2nd at Folsom|              62|8/30/2013 16:31|       2nd at Folsom|            62|    633|       Subscriber|   94107|
| 7416|      62|8/31/2013 20:52|Embarcadero at Sa...|              60|8/31/2013 20:53|Embarcadero at Sa...|            60|    511|         Customer|    null|
| 7250|      68|8/31/2013 16:53|University and Em...

In [None]:
# Keep only the trips made by subscribers
subscribers = spark.sql("SELECT * FROM trip WHERE subscription_type = 'Subscriber'")
subscribers.show()

+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|4576|      63|8/29/2013 14:13|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
|4607|      70|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4130|      71|8/29/2013 10:16|Mountain View Cit...|              27|8/29/2013 10:17|Mountain View Cit...|            27|     48|       Subscriber|   97214|
|4251|      77|8/29/2013 11:29|  San Jose City Hall|      

In [None]:
# Count the number of trips made by subscribers
subscribers.count()

566746

## Global temporary view 

Temporary views in Spark SQL are session-scoped and disappear when the session that created them terminates. If you want a temporary view that is shared among all sessions and kept alive until the Spark application terminates, you can create a global temporary view. A global temporary view is tied to the system-preserved database `global_temp`, and you must use the qualified name to refer to it, e.g. `SELECT * FROM global_temp.view1`.
<br/>
<br/>
Source: Apache Spark documentation
https://spark.apache.org/docs/latest/sql-getting-started.html#global-temporary-view



In [None]:
# The DataFrame variable `subscribers` has not been registered as a view yet, so querying it by name raises an AnalysisException.
spark.sql("SELECT * FROM subscribers where duration < 70")

AnalysisException: ignored

<img src="https://media.tenor.com/images/a1343b4e30eeca94b3edf232a767bd31/tenor.gif"/>

In [None]:
#Create a temporary view of subscribers that can be further queried
subscribers.createOrReplaceTempView("subscribers")

In [None]:
# We can now query the subscribers view
spark.sql("SELECT * FROM subscribers WHERE start_station_name LIKE 'San Jose%'").show()

+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|4607|      70|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4251|      77|8/29/2013 11:29|  San Jose City Hall|              10|8/29/2013 11:30|  San Jose City Hall|            10|     26|       Subscriber|   95060|
|4258|     114|8/29/2013 11:33|  San Jose City Hall|              10|8/29/2013 11:35|         MLK Library|            11|    107|       Subscriber|   95060|
|4242|     141|8/29/2013 11:25|  San Jose City Hall|      

## Reading multiple formats

Spark SQL supports reading the following formats:
- CSV
- JSON
- Avro (through the external `spark-avro` module)
- Parquet
- Hive tables
- ORC tables

It can also create a JDBC connection to existing databases.

For more details, please read the Spark documentation:
https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#data-sources






## Spark SQL architecture

<img src="https://d1jnx9ba8s6j9r.cloudfront.net/blog/wp-content/uploads/2016/12/Spark-SQL-Architecture-Spark-SQL-Edureka-1.png"/>

**Source** : Edureka

For a better understanding of Spark SQL, read this article: https://www.edureka.co/blog/spark-sql-tutorial/

# Inferring schema

Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the datatypes. Rows are constructed by passing a list of key/value pairs as kwargs to the Row class. The keys of this list define the column names of the table, and the types are inferred by sampling the whole dataset, similar to the inference that is performed on JSON files.

A practical example can be found here: https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#inferring-the-schema-using-reflection

## Bucketing, Sorting and Partitioning

For file-based data sources, it is also possible to bucket and sort or partition the output. Bucketing and sorting are applicable only to persistent tables.

`bucketBy` distributes the rows of a Spark SQL table into a fixed number of buckets based on a hash of one or more columns; `sortBy` then sorts the rows within each bucket. When two tables are bucketed on the join key, subsequent joins between them can be faster.

Bucketing is an optimization technique in Spark SQL that uses buckets and bucketing columns to determine data partitioning. When applied properly, bucketing can lead to join optimizations by avoiding shuffles (aka exchanges) of the tables participating in the join.

Source: https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#bucketing-sorting-and-partitioning

In [None]:
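# Bucketing and sorting: split the data into 42 buckets by start_station_name and sort each bucket by duration.
# duration is still a string column at this point, so the sort is lexicographic ("1000" comes before "101").
subscribers.write.bucketBy(42, "start_station_name").sortBy("duration").saveAsTable("bucketed_subscribers")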


In [None]:
# We can now query the bucketed table
from time import time
start_time=time()
spark.sql("SELECT * FROM bucketed_subscribers LIMIT 10").show()
end_time=time()

+------+--------+----------------+------------------+----------------+----------------+--------------------+--------------+-------+-----------------+--------+
|    id|duration|      start_date|start_station_name|start_station_id|        end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+------+--------+----------------+------------------+----------------+----------------+--------------------+--------------+-------+-----------------+--------+
| 90783|     100|11/10/2013 11:39|  Davis at Jackson|              42|11/10/2013 11:41|    Davis at Jackson|            42|    423|       Subscriber|   94111|
|101343|     100|11/19/2013 12:47|  Davis at Jackson|              42|11/19/2013 12:49|Embarcadero at Va...|            48|    586|       Subscriber|   94602|
|376936|     100| 7/23/2014 17:21|  Davis at Jackson|              42| 7/23/2014 17:22|    Davis at Jackson|            42|    541|       Subscriber|   94132|
|193065|    1000|  2/23/2014 9:11|  Davis at J

In [None]:
print("Time spent to query bucketed table: ",end_time - start_time)

Time spent to query bucketed table:  0.15280866622924805


In [None]:
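# Time the same query against the unbucketed temporary view for comparison.
# This is a single rough measurement: the view re-reads and filters the CSV, so the gap is not due to bucketing alone.
start_time_unbucketed = time()
spark.sql("SELECT * FROM subscribers LIMIT 10").show()
end_time_unbucketed = time()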

+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|4576|      63|8/29/2013 14:13|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
|4607|      70|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4130|      71|8/29/2013 10:16|Mountain View Cit...|              27|8/29/2013 10:17|Mountain View Cit...|            27|     48|       Subscriber|   97214|
|4251|      77|8/29/2013 11:29|  San Jose City Hall|      

In [None]:
print("Time spent to query unbucketed table: ", end_time_unbucketed - start_time_unbucketed)

Time spent to query unbucketed table:  0.24500536918640137


## Partitioning


In [None]:
# Partition the data by start_station_name and store it in Parquet format (more on Parquet files later).
subscribers.write.partitionBy("start_station_name").format("parquet").save("subscribers.parquet")

In [None]:
#We will now read the data
subscribers_in_parquet_format=spark.read.parquet("subscribers.parquet")
subscribers_in_parquet_format.show()

+----+--------+---------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+--------------------+
|  id|duration|     start_date|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|  start_station_name|
+----+--------+---------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+--------------------+
|4673|     239|8/29/2013 15:48|              70|8/29/2013 15:52|     Townsend at 7th|            65|    568|       Subscriber|   94110|San Francisco Cal...|
|4177|     278|8/29/2013 11:03|              70|8/29/2013 11:08|   2nd at South Park|            64|    371|       Subscriber|   94117|San Francisco Cal...|
|5067|     423|8/29/2013 21:38|              70|8/29/2013 21:45|San Francisco Cal...|            69|    535|       Subscriber|   94133|San Francisco Cal...|
|4833|     543|8/29/2013 17:45|              70|8/29/2013 

In [None]:
#Reading a partition from the parquet file
davis_at_jackson_subscribers=spark.read.parquet("subscribers.parquet/start_station_name=Davis at Jackson")
davis_at_jackson_subscribers.show()

+-----+--------+---------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|   id|duration|     start_date|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+-----+--------+---------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
| 4799|     267|8/29/2013 17:19|              42|8/29/2013 17:24|   Steuart at Market|            74|    399|       Subscriber|   94105|
| 4890|     347|8/29/2013 18:13|              42|8/29/2013 18:19|     Beale at Market|            56|    326|       Subscriber|   94123|
| 5004|     430|8/29/2013 20:05|              42|8/29/2013 20:12|   Market at Sansome|            77|    377|       Subscriber|   94110|
| 4462|     646|8/29/2013 13:07|              42|8/29/2013 13:18|Mechanics Plaza (...|            75|    543|       Subscriber|   94109|
| 4943|     970|8/29/2013 19:06|         

In [None]:
#Checking we selected the relevant data
spark.sql("SELECT * FROM subscribers WHERE start_station_name=\'Davis at Jackson\'").show()

+-----+--------+---------------+------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|   id|duration|     start_date|start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+-----+--------+---------------+------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
| 4799|     267|8/29/2013 17:19|  Davis at Jackson|              42|8/29/2013 17:24|   Steuart at Market|            74|    399|       Subscriber|   94105|
| 4890|     347|8/29/2013 18:13|  Davis at Jackson|              42|8/29/2013 18:19|     Beale at Market|            56|    326|       Subscriber|   94123|
| 5004|     430|8/29/2013 20:05|  Davis at Jackson|              42|8/29/2013 20:12|   Market at Sansome|            77|    377|       Subscriber|   94110|
| 4462|     646|8/29/2013 13:07|  Davis at Jackson|             

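Comparing the zip_code and start_date fields shows that the rows are the same. However, the DataFrame read directly from a partition directory does not contain the partition column (start_station_name), because its value is encoded in the directory name rather than stored in the data files.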

## Practical examples

- Read CSV
- Read JSON
- Read Parquet
- Select multiple columns with PySpark vs SparkSQL query
- Filter, group by
- Join two tables
- Print schema
- Save to persistent tables
- Cache data in memory
- Programmatically specifying the schema
- Partitioning example

## Read CSV


In [None]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [None]:
station_csv_df=sqlContext.read.csv("s3://msin0166-spark-workshop/data/sf_bike_sharing_data/station.csv",header=True)
station_csv_df.show()

+---+--------------------+------------------+-------------------+----------+------------+-----------------+
| id|                name|               lat|               long|dock_count|        city|installation_date|
+---+--------------------+------------------+-------------------+----------+------------+-----------------+
|  2|San Jose Diridon ...|         37.329732|-121.90178200000001|        27|    San Jose|         8/6/2013|
|  3|San Jose Civic Ce...|         37.330698|        -121.888979|        15|    San Jose|         8/5/2013|
|  4|Santa Clara at Al...|         37.333988|        -121.894902|        11|    San Jose|         8/6/2013|
|  5|    Adobe on Almaden|         37.331415|          -121.8932|        19|    San Jose|         8/5/2013|
|  6|    San Pedro Square|37.336721000000004|        -121.894074|        15|    San Jose|         8/7/2013|
|  7|Paseo de San Antonio|         37.333798|-121.88694299999999|        15|    San Jose|         8/7/2013|
|  8| San Salvador at 1st|  

In [None]:
#Check dataframe schema
station_csv_df.printSchema()


root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- dock_count: string (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: string (nullable = true)



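Every column is read as a string because no schema was supplied and schema inference was not enabled. You can then register this DataFrame as a temporary view and query it as shown in the previous examples.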

## Read JSON
**Note**: This example is based on the Yelp business dataset, provided on Moodle.

In [None]:
yelp_business= spark.read.json("s3://msin0166-spark-workshop/data/yelp_business_data/yelp_academic_dataset_business.json")
yelp_business.show()

+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|              city|               hours|is_open|     latitude|      longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+-------+-------------+---------------+--------------------+-----------+------------+-----+-----+
|2818 E Camino Ace...|[,,,,,,,,,,,,,,,,...|1SWheh84yJXfytovI...|   Golf, Active Life|           Phoenix|                null|      0|   33.5221425|   -112.0184807|Arizona Biltmore ...|      85016|           5|  3.0|   AZ|
|30 Eglinton Avenue W|[,, u'full_bar', ...|QXAEGFB4oINsVuTFx...|Specialty Food, R...|       Mississauga|[9:0-1:0

In [None]:
#Register Yelp business as temporary table and query it
yelp_business.createOrReplaceTempView("yelp_business")

In [None]:
# How many businesses from Las Vegas are listed in our JSON-based Yelp business dataset?
spark.sql("SELECT COUNT(*) FROM yelp_business WHERE city = 'Las Vegas'").show()

+--------+
|count(1)|
+--------+
|   29370|
+--------+



## Read Parquet
**Note**: This is based on the user data dataset provided on Moodle

In [None]:
users_in_parquet = spark.read.parquet("s3://msin0166-spark-workshop/data/userdata2.parquet")

In [None]:
users_in_parquet.show()

+-------------------+----+----------+---------+--------------------+------+---------------+----------------+--------------------+----------+---------+--------------------+--------+
|  registration_dttm|  id|first_name|last_name|               email|gender|     ip_address|              cc|             country| birthdate|   salary|               title|comments|
+-------------------+----+----------+---------+--------------------+------+---------------+----------------+--------------------+----------+---------+--------------------+--------+
|2016-02-03 13:36:39|   1|    Donald|    Lewis|dlewis0@clickbank...|  Male|  102.22.124.20|                |           Indonesia|  7/9/1972|140249.37|Senior Financial ...|        |
|2016-02-03 00:22:28|   2|    Walter|  Collins|wcollins1@bloglov...|  Male|   247.28.26.93|3587726269478025|               China|          |     null|                    |        |
|2016-02-03 18:29:04|   3|  Michelle|Henderson|mhenderson2@geoci...|Female| 193.68.146.150|    

In [None]:
# registerTempTable is deprecated; createOrReplaceTempView is the current equivalent
users_in_parquet.createOrReplaceTempView("users_2")
spark.sql("SELECT * FROM users_2 WHERE country = 'China'").show()

+-------------------+---+----------+----------+--------------------+------+---------------+------------------+-------+----------+---------+--------------------+--------------------+
|  registration_dttm| id|first_name| last_name|               email|gender|     ip_address|                cc|country| birthdate|   salary|               title|            comments|
+-------------------+---+----------+----------+--------------------+------+---------------+------------------+-------+----------+---------+--------------------+--------------------+
|2016-02-03 00:22:28|  2|    Walter|   Collins|wcollins1@bloglov...|  Male|   247.28.26.93|  3587726269478025|  China|          |     null|                    |                    |
|2016-02-03 08:11:34|  8|     Louis|   Simmons|   lsimmons7@icio.us|  Male|    18.69.80.15|                  |  China|  6/1/1992| 90744.86|    Product Engineer|                    |
|2016-02-03 23:36:58| 13|    Evelyn|    Harvey|   eharveyc@time.com|      |  254.174.154.7

## Select multiple columns with PySpark vs SparkSQL query

In [None]:
# Select columns using the PySpark DataFrame API
users_in_parquet.select("first_name", "last_name").show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|    Donald|    Lewis|
|    Walter|  Collins|
|  Michelle|Henderson|
|      Lori|   Hudson|
|    Howard|   Miller|
|   Frances|    Adams|
|    Steven|   Hanson|
|     Louis|  Simmons|
|     Keith|   Parker|
|     Wanda|   Walker|
|   Kathryn|   Weaver|
|    Philip|     Ward|
|    Evelyn|   Harvey|
|    Andrea|     Lane|
|     Bobby|  Vasquez|
|   Kenneth|   Gibson|
|     Emily|     Hill|
|     Kelly|   Fowler|
|     Diana|   Howell|
|    Johnny|  Collins|
+----------+---------+
only showing top 20 rows



In [None]:
# Select columns using Spark SQL
spark.sql("SELECT first_name,last_name FROM users_2 LIMIT 20").show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|    Donald|    Lewis|
|    Walter|  Collins|
|  Michelle|Henderson|
|      Lori|   Hudson|
|    Howard|   Miller|
|   Frances|    Adams|
|    Steven|   Hanson|
|     Louis|  Simmons|
|     Keith|   Parker|
|     Wanda|   Walker|
|   Kathryn|   Weaver|
|    Philip|     Ward|
|    Evelyn|   Harvey|
|    Andrea|     Lane|
|     Bobby|  Vasquez|
|   Kenneth|   Gibson|
|     Emily|     Hill|
|     Kelly|   Fowler|
|     Diana|   Howell|
|    Johnny|  Collins|
+----------+---------+



In [None]:
# Select columns using the PySpark DataFrame API
subscribers.select("start_station_name", "end_station_name", "bike_id", "duration").show()

+--------------------+--------------------+-------+--------+
|  start_station_name|    end_station_name|bike_id|duration|
+--------------------+--------------------+-------+--------+
|South Van Ness at...|South Van Ness at...|    520|      63|
|  San Jose City Hall|  San Jose City Hall|    661|      70|
|Mountain View Cit...|Mountain View Cit...|     48|      71|
|  San Jose City Hall|  San Jose City Hall|     26|      77|
|South Van Ness at...|      Market at 10th|    319|      83|
| Golden Gate at Polk| Golden Gate at Polk|    527|     103|
|Santa Clara at Al...|    Adobe on Almaden|    679|     109|
| San Salvador at 1st| San Salvador at 1st|    687|     111|
|South Van Ness at...|South Van Ness at...|    553|     113|
|  San Jose City Hall|         MLK Library|    107|     114|
|     Spear at Folsom|Embarcadero at Br...|    368|     125|
|    San Pedro Square|Santa Clara at Al...|     26|     126|
|Mountain View Cal...|Mountain View Cal...|    140|     129|
|   2nd at South Park|  

In [None]:
# Select columns using Spark SQL
spark.sql("SELECT start_station_name,end_station_name,bike_id,duration FROM subscribers LIMIT 20").show()

+--------------------+--------------------+-------+--------+
|  start_station_name|    end_station_name|bike_id|duration|
+--------------------+--------------------+-------+--------+
|South Van Ness at...|South Van Ness at...|    520|      63|
|  San Jose City Hall|  San Jose City Hall|    661|      70|
|Mountain View Cit...|Mountain View Cit...|     48|      71|
|  San Jose City Hall|  San Jose City Hall|     26|      77|
|South Van Ness at...|      Market at 10th|    319|      83|
| Golden Gate at Polk| Golden Gate at Polk|    527|     103|
|Santa Clara at Al...|    Adobe on Almaden|    679|     109|
| San Salvador at 1st| San Salvador at 1st|    687|     111|
|South Van Ness at...|South Van Ness at...|    553|     113|
|  San Jose City Hall|         MLK Library|    107|     114|
|     Spear at Folsom|Embarcadero at Br...|    368|     125|
|    San Pedro Square|Santa Clara at Al...|     26|     126|
|Mountain View Cal...|Mountain View Cal...|    140|     129|
|   2nd at South Park|  

## Filtering and grouping by

In [None]:
# Filter data using PySpark
subscribers.filter((subscribers.duration>60) & (subscribers.duration<=100)).show()

+-----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|   id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+-----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
| 4576|      63|8/29/2013 14:13|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
| 4607|      70|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
| 4130|      71|8/29/2013 10:16|Mountain View Cit...|              27|8/29/2013 10:17|Mountain View Cit...|            27|     48|       Subscriber|   97214|
| 4251|      77|8/29/2013 11:29|  San Jose City Hall

In [None]:
# Filter data using Spark SQL
spark.sql("SELECT * FROM subscribers WHERE duration >60 AND duration<=100").show()

+-----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|   id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+-----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
| 4576|      63|8/29/2013 14:13|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
| 4607|      70|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
| 4130|      71|8/29/2013 10:16|Mountain View Cit...|              27|8/29/2013 10:17|Mountain View Cit...|            27|     48|       Subscriber|   97214|
| 4251|      77|8/29/2013 11:29|  San Jose City Hall

In [None]:
# Group by using PySpark
#We have to first convert the duration to double type
from pyspark.sql.types import DoubleType
subscribers = subscribers.withColumn("duration", subscribers["duration"].cast(DoubleType()))
subscribers.groupBy("bike_id").sum("duration").show()


+-------+-------------+
|bike_id|sum(duration)|
+-------+-------------+
|    675|     101382.0|
|    467|     476556.0|
|    296|     156101.0|
|    691|      75062.0|
|    125|      39139.0|
|    451|     815995.0|
|    666|      54649.0|
|    124|      88313.0|
|    447|     923226.0|
|    591|     830507.0|
|     51|      88612.0|
|    574|    1034693.0|
|    613|     914909.0|
|    307|      82875.0|
|    475|     684313.0|
|    544|     797837.0|
|    334|     934758.0|
|    577|     784963.0|
|    581|     906101.0|
|    205|      86722.0|
+-------+-------------+
only showing top 20 rows



In [None]:
# Group by using Spark SQL
spark.sql("SELECT bike_id, SUM(duration) AS duration FROM subscribers GROUP BY bike_id").show()

+-------+---------+
|bike_id| duration|
+-------+---------+
|    675| 101382.0|
|    467| 476556.0|
|    296| 156101.0|
|    691|  75062.0|
|    125|  39139.0|
|    451| 815995.0|
|    666|  54649.0|
|    124|  88313.0|
|    447| 923226.0|
|    591| 830507.0|
|     51|  88612.0|
|    574|1034693.0|
|    613| 914909.0|
|    307|  82875.0|
|    475| 684313.0|
|    544| 797837.0|
|    334| 934758.0|
|    577| 784963.0|
|    581| 906101.0|
|    205|  86722.0|
+-------+---------+
only showing top 20 rows



## Joining two tables

In [None]:
#we will first create two tables to be joined 
bike_and_duration=spark.sql("SELECT bike_id,duration FROM subscribers WHERE bike_id <5000")
bike_and_duration.show()

+-------+--------+
|bike_id|duration|
+-------+--------+
|    520|      63|
|    661|      70|
|     48|      71|
|     26|      77|
|    319|      83|
|    527|     103|
|    679|     109|
|    687|     111|
|    553|     113|
|    107|     114|
|    368|     125|
|     26|     126|
|    140|     129|
|    371|     130|
|    503|     134|
|    408|     138|
|     26|     141|
|    319|     142|
|    564|     142|
|    574|     144|
+-------+--------+
only showing top 20 rows



In [None]:
bike_and_stations=spark.sql("SELECT bike_id,start_station_name,end_station_name FROM subscribers WHERE bike_id <5000")
bike_and_stations.show()

+-------+--------------------+--------------------+
|bike_id|  start_station_name|    end_station_name|
+-------+--------------------+--------------------+
|    520|South Van Ness at...|South Van Ness at...|
|    661|  San Jose City Hall|  San Jose City Hall|
|     48|Mountain View Cit...|Mountain View Cit...|
|     26|  San Jose City Hall|  San Jose City Hall|
|    319|South Van Ness at...|      Market at 10th|
|    527| Golden Gate at Polk| Golden Gate at Polk|
|    679|Santa Clara at Al...|    Adobe on Almaden|
|    687| San Salvador at 1st| San Salvador at 1st|
|    553|South Van Ness at...|South Van Ness at...|
|    107|  San Jose City Hall|         MLK Library|
|    368|     Spear at Folsom|Embarcadero at Br...|
|     26|    San Pedro Square|Santa Clara at Al...|
|    140|Mountain View Cal...|Mountain View Cal...|
|    371|   2nd at South Park|   2nd at South Park|
|    503|     Clay at Battery|     Beale at Market|
|    408|     Post at Kearney|     Post at Kearney|
|     26|  S

In [None]:
#Join two tables using PySpark
bike_and_duration.join(bike_and_stations,on='bike_id',how='left').show()


+-------+--------+--------------------+--------------------+
|bike_id|duration|  start_station_name|    end_station_name|
+-------+--------+--------------------+--------------------+
|    296|    1345|       Park at Olive|Palo Alto Caltrai...|
|    296|    1345|Palo Alto Caltrai...|Palo Alto Caltrai...|
|    296|    1345|Palo Alto Caltrai...|California Ave Ca...|
|    296|    1345|California Ave Ca...|California Ave Ca...|
|    296|    1345|Redwood City Medi...|San Mateo County ...|
|    296|    1345|San Mateo County ...|Redwood City Medi...|
|    296|    1345|   Franklin at Maple|   Franklin at Maple|
|    296|    1345|   Franklin at Maple|Redwood City Calt...|
|    296|    1345|Redwood City Calt...|Redwood City Publ...|
|    296|    1345|San Jose Diridon ...| San Salvador at 1st|
|    296|    1345|SJSU - San Salvad...|Paseo de San Antonio|
|    296|    1345|Paseo de San Antonio|           Japantown|
|    296|    1345|           Japantown|         MLK Library|
|    296|    1345|Cowper

In [None]:
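# Join two tables using Spark SQL
# Create a view for each DataFrame so it can be queried by name.
# createOrReplaceTempView lets this cell be rerun without a "view already exists" error.
bike_and_duration.createOrReplaceTempView("bike_and_duration")
bike_and_stations.createOrReplaceTempView("bike_and_stations")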



In [None]:
spark.sql("""
    SELECT d.bike_id, d.duration, s.start_station_name, s.end_station_name
    FROM bike_and_duration AS d
    LEFT JOIN bike_and_stations AS s
        ON d.bike_id = s.bike_id
""").show()

+-------+--------+--------------------+--------------------+
|bike_id|duration|  start_station_name|    end_station_name|
+-------+--------+--------------------+--------------------+
|    296|    1345|       Park at Olive|Palo Alto Caltrai...|
|    296|    1345|Palo Alto Caltrai...|Palo Alto Caltrai...|
|    296|    1345|Palo Alto Caltrai...|California Ave Ca...|
|    296|    1345|California Ave Ca...|California Ave Ca...|
|    296|    1345|Redwood City Medi...|San Mateo County ...|
|    296|    1345|San Mateo County ...|Redwood City Medi...|
|    296|    1345|   Franklin at Maple|   Franklin at Maple|
|    296|    1345|   Franklin at Maple|Redwood City Calt...|
|    296|    1345|Redwood City Calt...|Redwood City Publ...|
|    296|    1345|San Jose Diridon ...| San Salvador at 1st|
|    296|    1345|SJSU - San Salvad...|Paseo de San Antonio|
|    296|    1345|Paseo de San Antonio|           Japantown|
|    296|    1345|           Japantown|         MLK Library|
|    296|    1345|Cowper

## Save to persistent tables

DataFrames can also be saved as persistent tables in the Hive metastore using the `saveAsTable` command.

Unlike `createOrReplaceTempView`, `saveAsTable` materializes the contents of the DataFrame and creates a pointer to the data in the Hive metastore.

Persistent tables will still exist even after your Spark program has restarted, as long as you maintain your connection to the same metastore. A DataFrame for a persistent table can be created by calling the table method on a SparkSession with the name of the table.

Source: https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#saving-to-persistent-tables

In [None]:
joined_table = spark.sql("""
    SELECT d.bike_id, d.duration, s.start_station_name, s.end_station_name
    FROM bike_and_duration AS d
    LEFT JOIN bike_and_stations AS s
        ON d.bike_id = s.bike_id
    LIMIT 10
""")

In [None]:
# Append the rows to a persistent table. Rerunning this cell appends them again;
# use mode("overwrite") to replace the table contents instead.
joined_table.write.mode("append").saveAsTable("bike_station_duration_table")

In [None]:
spark.sql("SELECT * FROM bike_station_duration_table LIMIT 10").show()

+-------+--------+--------------------+--------------------+
|bike_id|duration|  start_station_name|    end_station_name|
+-------+--------+--------------------+--------------------+
|    296|    1345|       Park at Olive|Palo Alto Caltrai...|
|    296|    1345|Palo Alto Caltrai...|Palo Alto Caltrai...|
|    296|    1345|Palo Alto Caltrai...|California Ave Ca...|
|    296|    1345|California Ave Ca...|California Ave Ca...|
|    296|    1345|Redwood City Medi...|San Mateo County ...|
|    296|    1345|San Mateo County ...|Redwood City Medi...|
|    296|    1345|   Franklin at Maple|   Franklin at Maple|
|    296|    1345|   Franklin at Maple|Redwood City Calt...|
|    296|    1345|Redwood City Calt...|Redwood City Publ...|
|    296|    1345|San Jose Diridon ...| San Salvador at 1st|
+-------+--------+--------------------+--------------------+



## Caching data in memory

For more details, please read the documentation: https://spark.apache.org/docs/latest/sql-performance-tuning.html

In [None]:
from time import time

start_time=time()
subscribers.show()
end_time=time()

+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|4576|      63|8/29/2013 14:13|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
|4607|      70|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4130|      71|8/29/2013 10:16|Mountain View Cit...|              27|8/29/2013 10:17|Mountain View Cit...|            27|     48|       Subscriber|   97214|
|4251|      77|8/29/2013 11:29|  San Jose City Hall|      

In [None]:
print("Processing without cache time difference",end_time - start_time)

Processing without cache time difference 0.19474244117736816


In [None]:
subscribers.cache()

DataFrame[id: string, duration: string, start_date: string, start_station_name: string, start_station_id: string, end_date: string, end_station_name: string, end_station_id: string, bike_id: string, subscription_type: string, zip_code: string]

In [None]:
cache_start_time=time()
subscribers.show()
cache_end_time=time()


+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|4576|      63|8/29/2013 14:13|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
|4607|      70|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4130|      71|8/29/2013 10:16|Mountain View Cit...|              27|8/29/2013 10:17|Mountain View Cit...|            27|     48|       Subscriber|   97214|
|4251|      77|8/29/2013 11:29|  San Jose City Hall|      

In [None]:
print("Processing with cache time difference",cache_end_time - cache_start_time)

Processing with cache time difference 0.11307477951049805


## Specify the schema in a programmatic way

For more details, read the Spark documentation: https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#programmatically-specifying-the-schema

In [None]:
# Take an existing table as an RDD of rows, then re-apply a schema built in code.
from pyspark.sql.types import StructField, StructType, StringType
table_without_schema = spark.sql("SELECT * FROM bike_station_duration_table LIMIT 10").rdd
# The column names are encoded in a space-separated string.
schemaString = "bike_id duration start_station_name end_station_name"

# Every column is declared as a nullable string.
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schematable = spark.createDataFrame(table_without_schema, schema)

# Register the DataFrame as a temporary view (registerTempTable is deprecated).
schematable.createOrReplaceTempView("table_with_schema")

# SQL can be run over DataFrames that have been registered as a view.
results = spark.sql("SELECT bike_id FROM table_with_schema")


In [None]:
results.show()

+-------+
|bike_id|
+-------+
|    296|
|    296|
|    296|
|    296|
|    296|
|    296|
|    296|
|    296|
|    296|
|    296|
+-------+



## Spark SQL vs querying Pandas DataFrames

In [None]:
import pandas as pd
from time import time
start_time=time()
pandas_df=pd.read_csv("s3://msin0166-spark-workshop/data/sf_bike_sharing_data/station.csv")
end_time=time()

In [None]:
start_time_spark=time()
station_df = sqlContext.read.csv("s3://msin0166-spark-workshop/data/sf_bike_sharing_data/station.csv",header=True)
end_time_spark=time()

In [None]:
station_df.show()

+---+--------------------+------------------+-------------------+----------+------------+-----------------+
| id|                name|               lat|               long|dock_count|        city|installation_date|
+---+--------------------+------------------+-------------------+----------+------------+-----------------+
|  2|San Jose Diridon ...|         37.329732|-121.90178200000001|        27|    San Jose|         8/6/2013|
|  3|San Jose Civic Ce...|         37.330698|        -121.888979|        15|    San Jose|         8/5/2013|
|  4|Santa Clara at Al...|         37.333988|        -121.894902|        11|    San Jose|         8/6/2013|
|  5|    Adobe on Almaden|         37.331415|          -121.8932|        19|    San Jose|         8/5/2013|
|  6|    San Pedro Square|37.336721000000004|        -121.894074|        15|    San Jose|         8/7/2013|
|  7|Paseo de San Antonio|         37.333798|-121.88694299999999|        15|    San Jose|         8/7/2013|
|  8| San Salvador at 1st|  

In [None]:
# Query the Spark DataFrame with Spark SQL
station_df.createOrReplaceTempView("station")
spark.sql("SELECT * FROM station WHERE city = 'San Jose'").show()

+---+--------------------+------------------+-------------------+----------+--------+-----------------+
| id|                name|               lat|               long|dock_count|    city|installation_date|
+---+--------------------+------------------+-------------------+----------+--------+-----------------+
|  2|San Jose Diridon ...|         37.329732|-121.90178200000001|        27|San Jose|         8/6/2013|
|  3|San Jose Civic Ce...|         37.330698|        -121.888979|        15|San Jose|         8/5/2013|
|  4|Santa Clara at Al...|         37.333988|        -121.894902|        11|San Jose|         8/6/2013|
|  5|    Adobe on Almaden|         37.331415|          -121.8932|        19|San Jose|         8/5/2013|
|  6|    San Pedro Square|37.336721000000004|        -121.894074|        15|San Jose|         8/7/2013|
|  7|Paseo de San Antonio|         37.333798|-121.88694299999999|        15|San Jose|         8/7/2013|
|  8| San Salvador at 1st|         37.330165|-121.88583100000001

In [None]:
# Query Pandas DataFrame
pandas_df.loc[pandas_df['city']== 'San Jose']

   id                               name        lat         long  dock_count      city  installation_date
0   2  San Jose Diridon Caltrain Station  37.329732  -121.901782          27  San Jose           8/6/2013
1   3              San Jose Civic Center  37.330698  -121.888979          15  San Jose           8/5/2013
2   4             Santa Clara at Almaden  37.333988  -121.894902          11  San Jose           8/6/2013
3   5                   Adobe on Almaden  37.331415    -121.8932          19  San Jose           8/5/2013
4   6                   San Pedro Square  37.336721  -121.894074          15  San Jose           8/7/2013
5   7               Paseo de San Antonio  37.333798  -121.886943          15  San Jose           8/7/2013
6   8                San Salvador at 1st  37.330165  -121.885831          15  San Jose           8/5/2013
7   9                          Japantown  37.348742  -121.894715          15  San Jose           8/5/2013
8  10                 San Jose City Hall  37.337391  -121.886995          15  San Jose           8/6/2013
9  11                        MLK Library  37.335885   -121.88566          19  San Jose           8/6/2013


# Practice!
**Note**: Read the Spark documentation and check Stack Overflow when you get stuck.

<img src="https://media.giphy.com/media/8vkEKXvnXkyCZx8w6b/giphy.gif"/>

## Exercise 1:
Read a file of your choice in JSON format and extract the first 10 rows.
<br/>
**Note**: You will have to load the file into your data folder first.


Here, we used the Iris flower dataset provided in JSON format

In [None]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [None]:
iris_flowers_dataset=sqlContext.read.json('s3://msin0166-spark-workshop/data/iris_flowers_data/iris.json')


In [None]:
iris_flowers_dataset.head(10)

[Row(petalLength=None, petalWidth=None, sepalLength=None, sepalWidth=None, species=None),
 Row(petalLength=1.4, petalWidth=0.2, sepalLength=5.1, sepalWidth=3.5, species='setosa'),
 Row(petalLength=1.4, petalWidth=0.2, sepalLength=4.9, sepalWidth=3.0, species='setosa'),
 Row(petalLength=1.3, petalWidth=0.2, sepalLength=4.7, sepalWidth=3.2, species='setosa'),
 Row(petalLength=1.5, petalWidth=0.2, sepalLength=4.6, sepalWidth=3.1, species='setosa'),
 Row(petalLength=1.4, petalWidth=0.2, sepalLength=5.0, sepalWidth=3.6, species='setosa'),
 Row(petalLength=1.7, petalWidth=0.4, sepalLength=5.4, sepalWidth=3.9, species='setosa'),
 Row(petalLength=1.4, petalWidth=0.3, sepalLength=4.6, sepalWidth=3.4, species='setosa'),
 Row(petalLength=1.5, petalWidth=0.2, sepalLength=5.0, sepalWidth=3.4, species='setosa'),
 Row(petalLength=1.4, petalWidth=0.2, sepalLength=4.4, sepalWidth=2.9, species='setosa')]

## Exercise 2:
Create a new dataset by filtering your existing dataset. Use both the PySpark and Spark SQL functions shown in the **Practical examples** section.

Example: Using the Yelp dataset, create a DataFrame of top businesses (rated 4 stars or higher) listed on Yelp.


In [None]:
#PySpark
versicolor_flowers_pyspark= iris_flowers_dataset.filter(iris_flowers_dataset.species=='versicolor')
versicolor_flowers_pyspark.show()

+-----------+----------+-----------+----------+----------+
|petalLength|petalWidth|sepalLength|sepalWidth|   species|
+-----------+----------+-----------+----------+----------+
|        4.7|       1.4|        7.0|       3.2|versicolor|
|        4.5|       1.5|        6.4|       3.2|versicolor|
|        4.9|       1.5|        6.9|       3.1|versicolor|
|        4.0|       1.3|        5.5|       2.3|versicolor|
|        4.6|       1.5|        6.5|       2.8|versicolor|
|        4.5|       1.3|        5.7|       2.8|versicolor|
|        4.7|       1.6|        6.3|       3.3|versicolor|
|        3.3|       1.0|        4.9|       2.4|versicolor|
|        4.6|       1.3|        6.6|       2.9|versicolor|
|        3.9|       1.4|        5.2|       2.7|versicolor|
|        3.5|       1.0|        5.0|       2.0|versicolor|
|        4.2|       1.5|        5.9|       3.0|versicolor|
|        4.0|       1.0|        6.0|       2.2|versicolor|
|        4.7|       1.4|        6.1|       2.9|versicolo

In [None]:
# Spark SQL
iris_flowers_dataset.createOrReplaceTempView("iris_flowers")
versicolor_flowers_sql=sqlContext.sql("SELECT * FROM iris_flowers WHERE species = 'versicolor'")
versicolor_flowers_sql.show()

+-----------+----------+-----------+----------+----------+
|petalLength|petalWidth|sepalLength|sepalWidth|   species|
+-----------+----------+-----------+----------+----------+
|        4.7|       1.4|        7.0|       3.2|versicolor|
|        4.5|       1.5|        6.4|       3.2|versicolor|
|        4.9|       1.5|        6.9|       3.1|versicolor|
|        4.0|       1.3|        5.5|       2.3|versicolor|
|        4.6|       1.5|        6.5|       2.8|versicolor|
|        4.5|       1.3|        5.7|       2.8|versicolor|
|        4.7|       1.6|        6.3|       3.3|versicolor|
|        3.3|       1.0|        4.9|       2.4|versicolor|
|        4.6|       1.3|        6.6|       2.9|versicolor|
|        3.9|       1.4|        5.2|       2.7|versicolor|
|        3.5|       1.0|        5.0|       2.0|versicolor|
|        4.2|       1.5|        5.9|       3.0|versicolor|
|        4.0|       1.0|        6.0|       2.2|versicolor|
|        4.7|       1.4|        6.1|       2.9|versicolo

## Exercise 3:
Use Spark SQL and apply the GROUP BY statement to your existing dataset.

In [None]:
count_of_flowers=sqlContext.sql("SELECT species, COUNT(species) FROM iris_flowers GROUP BY species ")
count_of_flowers.show()

+----------+--------------+
|   species|count(species)|
+----------+--------------+
| virginica|            50|
|      null|             0|
|versicolor|            50|
|    setosa|            50|
+----------+--------------+



## Exercise 4
Create a sample dataset and programmatically specify its schema
<br/>
**Note**: Follow the code in the **Practical examples** section

In [None]:
import numpy as np
import pandas as pd
x = np.random.randn(10)
y = np.sin(x)
df = pd.DataFrame({'x':x, 'y':y})

In [None]:
df

          x         y
0  0.543465  0.517105
1  0.223337  0.221485
2 -0.350076 -0.342969
3  0.808326  0.723132
4 -1.116476 -0.898559
5  0.276936  0.273410
6 -2.635630 -0.484650
7 -1.373781 -0.980655
8 -0.323700 -0.318077
9  0.864112  0.760519


In [None]:
spark_df=sqlContext.createDataFrame(df)

In [None]:
# Take the rows of the Spark DataFrame as an RDD, then re-apply a schema built in code.
from pyspark.sql.types import StructField, StructType, StringType
table_without_schema = spark_df.rdd
# The column names are encoded in a space-separated string.
schemaString = "number sin(number)"

# Every column is declared as a nullable string.
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schematable = sqlContext.createDataFrame(table_without_schema, schema)

# Register the DataFrame as a temporary view (registerTempTable is deprecated).
schematable.createOrReplaceTempView("table_with_schema")

# SQL can be run over DataFrames that have been registered as a view.
results = sqlContext.sql("SELECT * FROM table_with_schema")

In [None]:
results.show()

+--------------------+--------------------+
|              number|         sin(number)|
+--------------------+--------------------+
|  0.5434650843713459|  0.5171049320892215|
|  0.2233374776312185| 0.22148543753958555|
| -0.3500758702916083| -0.3429690769501241|
|  0.8083262693520252|  0.7231321257504997|
|  -1.116475855179513| -0.8985594478818084|
|   0.276935960688025| 0.27340964426884523|
|  -2.635629970708692|-0.48464973149206214|
|   -1.37378121639646| -0.9806552169996551|
|-0.32369993208185205|-0.31807650605295223|
|  0.8641119509199137|  0.7605189393373316|
+--------------------+--------------------+



# Extra exercises

## Exercise 5

Convert a CSV file of choice to Parquet format by using PySpark.
<br/>
Read data from the Parquet file and select 50 rows of data.

Note: Here, we used the weather.csv dataset

In [None]:
weather_data= sqlContext.read.csv("weather.csv",header=True)

In [None]:
weather_data.show()

+---------+-----------------+------------------+-----------------+---------------+----------------+---------------+------------+-------------+------------+-----------------------------+------------------------------+-----------------------------+--------------------+---------------------+--------------------+------------------+-------------------+------------------+--------------------+-----------+------+----------------+--------+
|     date|max_temperature_f|mean_temperature_f|min_temperature_f|max_dew_point_f|mean_dew_point_f|min_dew_point_f|max_humidity|mean_humidity|min_humidity|max_sea_level_pressure_inches|mean_sea_level_pressure_inches|min_sea_level_pressure_inches|max_visibility_miles|mean_visibility_miles|min_visibility_miles|max_wind_Speed_mph|mean_wind_speed_mph|max_gust_speed_mph|precipitation_inches|cloud_cover|events|wind_dir_degrees|zip_code|
+---------+-----------------+------------------+-----------------+---------------+----------------+---------------+------------+--

In [None]:
weather_data.write.parquet("weather_data.parquet",compression=None)

In [None]:
#The parquet file is saved, as seen below.
%ls -l 

total 3058831
-rw------- 1 root root    3881605 Feb  3 22:32 berlin-listings.csv
-rw------- 1 root root     193083 Jan 21 18:13 hamlet.txt
-rw------- 1 root root      15802 Feb 19 11:39 iris.json
-rw------- 1 root root     192340 Feb  3 22:32 New_York_City_.png
-rw------- 1 root root    7077973 Feb  3 22:32 new-york-listings.csv
drwx------ 2 root root       4096 Jan 21 17:16 spark-2.3.3-bin-hadoop2.7/
-rw------- 1 root root  226027370 Feb  4  2019 spark-2.3.3-bin-hadoop2.7.tgz
-rw------- 1 root root  226027370 Feb  4  2019 spark-2.3.3-bin-hadoop2.7.tgz.1
drwx------ 2 root root       4096 May 23  2019 spark-2.4.3-bin-hadoop2.7/
-rw------- 1 root root  229988313 May  1  2019 spark-2.4.3-bin-hadoop2.7.tgz
-rw------- 1 root root  229988313 May  1  2019 spark-2.4.3-bin-hadoop2.7.tgz.1
-rw------- 1 root root       5647 Jan 21 18:15 station.csv
-rw------- 1 root root 1989696383 Jan 21 18:16 status.csv
-rw------- 1 root root   80208848 Jan 21 18:15 trip.csv
-rw-----

In [None]:
parquet_weather_data=sqlContext.read.parquet("weather_data.parquet")

In [None]:
parquet_weather_data.take(10)

[Row(date='8/29/2013', max_temperature_f='74.0', mean_temperature_f='68.0', min_temperature_f='61.0', max_dew_point_f='61.0', mean_dew_point_f='58.0', min_dew_point_f='56.0', max_humidity='93.0', mean_humidity='75.0', min_humidity='57.0', max_sea_level_pressure_inches='30.07', mean_sea_level_pressure_inches='30.02', min_sea_level_pressure_inches='29.97', max_visibility_miles='10.0', mean_visibility_miles='10.0', min_visibility_miles='10.0', max_wind_Speed_mph='23.0', mean_wind_speed_mph='11.0', max_gust_speed_mph='28.0', precipitation_inches='0', cloud_cover='4.0', events=None, wind_dir_degrees='286.0', zip_code='94107'),
 Row(date='8/30/2013', max_temperature_f='78.0', mean_temperature_f='69.0', min_temperature_f='60.0', max_dew_point_f='61.0', mean_dew_point_f='58.0', min_dew_point_f='56.0', max_humidity='90.0', mean_humidity='70.0', min_humidity='50.0', max_sea_level_pressure_inches='30.05', mean_sea_level_pressure_inches='30.0', min_sea_level_pressure_inches='29.93', max_visibility

In [None]:
parquet_weather_data.show()

+---------+-----------------+------------------+-----------------+---------------+----------------+---------------+------------+-------------+------------+-----------------------------+------------------------------+-----------------------------+--------------------+---------------------+--------------------+------------------+-------------------+------------------+--------------------+-----------+------+----------------+--------+
|     date|max_temperature_f|mean_temperature_f|min_temperature_f|max_dew_point_f|mean_dew_point_f|min_dew_point_f|max_humidity|mean_humidity|min_humidity|max_sea_level_pressure_inches|mean_sea_level_pressure_inches|min_sea_level_pressure_inches|max_visibility_miles|mean_visibility_miles|min_visibility_miles|max_wind_Speed_mph|mean_wind_speed_mph|max_gust_speed_mph|precipitation_inches|cloud_cover|events|wind_dir_degrees|zip_code|
+---------+-----------------+------------------+-----------------+---------------+----------------+---------------+------------+--

## Exercise 6 

Count the number of records in your dataset by using a SQL query

In [None]:
total_count=sqlContext.sql("SELECT COUNT(*) FROM iris_flowers")
total_count.show()

+--------+
|count(1)|
+--------+
|     152|
+--------+

