# Create Merged Dataset

In this workbook, you will read in the `trip` and `fare` files. You may use the DataFrame API, the Spark SQL API, or both, as long as your code produces the expected results.

Instructions:

1. Join the two datasets into a single merged dataset with 21 unique fields. You need to determine how to join the datasets.
2. Once you have created the merged dataset, convert its fields to the following types, since all fields were read in as strings:
    * pickup_datetime and dropoff_datetime must be TIMESTAMP
    * passenger_count and rate_code must be INT
    * all other numeric fields must be FLOAT
    * the remaining fields stay as STRING
3. Save your merged and converted dataset to your own S3 bucket in parquet format.

You are welcome to add as many cells as you need below up until the next section. **You must include comments in your code.**

In [1]:
# Start SparkSession
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("HW-5").getOrCreate()

In [2]:
# Check that the SparkSession is running
spark

In [3]:
# Load the trip data from the S3 bucket
# (Parquet files carry their own schema, so the CSV-only 'header' and 'inferSchema' options are not needed)
trip = spark.read\
  .format('parquet')\
  .load('s3://bigdatateaching/nyctaxi-2013/parquet/trip/')

In [4]:
# Print the first 10 records for trip
trip.show(10)

+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|           medallion|        hack_license|vendor_id|rate_code|store_and_fwd_flag|    pickup_datetime|   dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|0F6CF8A85A039CDF3...|33BC9D03BE4E9B8E1...|      VTS|        1|              null|2013-04-20 20:22:00|2013-04-20 20:28:00|              1|              360|          .46|       -73.98571|      40.762817|       -73.978874|       40.762306|
|75148F5ED6DECC2B4...|FC93BBB797CBAA633...| 

In [5]:
# Load the fare data from the S3 bucket
# (Parquet files carry their own schema, so the CSV-only 'header' and 'inferSchema' options are not needed)
fare = spark.read\
  .format('parquet')\
  .load('s3://bigdatateaching/nyctaxi-2013/parquet/fare/')

In [6]:
# Print the first 10 records for fare 
fare.show(10)

+--------------------+--------------------+---------+-------------------+------------+-----------+---------+-------+----------+------------+------------+
|           medallion|        hack_license|vendor_id|    pickup_datetime|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|
+--------------------+--------------------+---------+-------------------+------------+-----------+---------+-------+----------+------------+------------+
|ABC075C8871353E9F...|F13B6FECD12C974E7...|      VTS|2013-04-09 18:11:00|         CRD|       11.5|        1|    0.5|      3.12|           0|       16.12|
|6F05C2D331B2178F3...|38A4C7A8805AF6713...|      VTS|2013-04-09 18:16:00|         CSH|        5.5|        1|    0.5|         0|           0|           7|
|A2B2E4DC64E3B3B9F...|902DEAA1A28E6022C...|      VTS|2013-04-09 18:08:00|         CRD|       15.5|        1|    0.5|         2|           0|          19|
|D5DB49A3CBC353FB7...|65324A9B1F91221CA...|      VTS|2013-04-09 18:17:00|   

In [7]:
# Print the schemas of the two datasets
trip.printSchema()
fare.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_time_in_secs: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- surcharge: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |--

In [8]:
# Create two tables called 'trip_table' and 'fare_table'
trip.createOrReplaceTempView("trip_table")
fare.createOrReplaceTempView("fare_table")

In [9]:
# Join the two tables on 'medallion', 'hack_license', and 'pickup_datetime'
# so that each fare record is matched to its own trip record
nyctaxi = spark.sql("""
SELECT
t.medallion              
, t.hack_license           
, t.vendor_id              
, t.pickup_datetime        
, f.payment_type           
, f.fare_amount            
, f.surcharge              
, f.mta_tax             
, f.tip_amount            
, f.tolls_amount           
, f.total_amount         
, t.rate_code              
, t.store_and_fwd_flag     
, t.dropoff_datetime       
, t.passenger_count     
, t.trip_time_in_secs      
, t.trip_distance          
, t.pickup_longitude       
, t.pickup_latitude        
, t.dropoff_longitude      
, t.dropoff_latitude      
FROM trip_table t
JOIN fare_table f 
ON
(t.medallion = f.medallion 
and t.hack_license = f.hack_license 
and t.pickup_datetime = f.pickup_datetime)
""").cache()

In [10]:
# See the Schema of the merged dataset
nyctaxi.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- surcharge: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_time_in_secs: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)



In [11]:
# Convert data types (every column was read in as a string)
from pyspark.sql.functions import col

# pickup_datetime and dropoff_datetime must be TIMESTAMP
timestamp_cols = ["pickup_datetime", "dropoff_datetime"]
# passenger_count and rate_code must be INT
int_cols = ["passenger_count", "rate_code"]
# all other numeric fields must be FLOAT
float_cols = ["trip_distance", "pickup_latitude", "pickup_longitude",
              "dropoff_latitude", "dropoff_longitude", "fare_amount",
              "surcharge", "mta_tax", "tip_amount", "tolls_amount",
              "total_amount", "trip_time_in_secs"]

# Cast each listed column in place; columns not listed stay as STRING
nyctaxi_converted = nyctaxi
for cols, dtype in [(timestamp_cols, "timestamp"), (int_cols, "int"), (float_cols, "float")]:
    for c in cols:
        nyctaxi_converted = nyctaxi_converted.withColumn(c, col(c).cast(dtype))
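
Because the Spark SQL API is also allowed, the conversion rules from step 2 can be written as a column-to-type mapping that generates `CAST` expressions. This is a sketch under assumptions: `TYPE_MAP` and `build_cast_exprs` are hypothetical names introduced here, and the column order is copied from the merged schema printed above.

```python
# Target SQL type for each non-string field, following the conversion rules in step 2.
TYPE_MAP = {
    "pickup_datetime": "TIMESTAMP",
    "dropoff_datetime": "TIMESTAMP",
    "passenger_count": "INT",
    "rate_code": "INT",
    "fare_amount": "FLOAT",
    "surcharge": "FLOAT",
    "mta_tax": "FLOAT",
    "tip_amount": "FLOAT",
    "tolls_amount": "FLOAT",
    "total_amount": "FLOAT",
    "trip_time_in_secs": "FLOAT",
    "trip_distance": "FLOAT",
    "pickup_longitude": "FLOAT",
    "pickup_latitude": "FLOAT",
    "dropoff_longitude": "FLOAT",
    "dropoff_latitude": "FLOAT",
}


def build_cast_exprs(columns, type_map):
    """Build one SQL expression per column, casting only the mapped ones."""
    exprs = []
    for name in columns:
        if name in type_map:
            # Cast and keep the original column name.
            exprs.append(f"CAST({name} AS {type_map[name]}) AS {name}")
        else:
            # Unmapped columns stay as STRING.
            exprs.append(name)
    return exprs


# Column order copied from the merged dataset's schema.
merged_columns = [
    "medallion", "hack_license", "vendor_id", "pickup_datetime", "payment_type",
    "fare_amount", "surcharge", "mta_tax", "tip_amount", "tolls_amount",
    "total_amount", "rate_code", "store_and_fwd_flag", "dropoff_datetime",
    "passenger_count", "trip_time_in_secs", "trip_distance", "pickup_longitude",
    "pickup_latitude", "dropoff_longitude", "dropoff_latitude",
]

cast_exprs = build_cast_exprs(merged_columns, TYPE_MAP)
for expr in cast_exprs[:4]:
    print(expr)
```

With a live session, `nyctaxi.selectExpr(*cast_exprs)` would apply all conversions in a single projection while preserving the column order.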

## In the following cells, please provide the requested code and output. Do not change the order and/or structure of the cells.

In the next cell, provide the code that saves your merged dataset to your S3 bucket.

In [13]:
# Save the merged and converted dataset to S3 in Parquet format, overwriting any previous output
nyctaxi_converted.write.parquet("s3://zzzzzzhy0607/merged_data",mode="overwrite")

In the next cell, print the schema of your merged dataset.

In [15]:
nyctaxi_converted.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- surcharge: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- total_amount: float (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: float (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)
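
The printed schema can also be checked programmatically rather than by eye. The sketch below assumes the `(name, type)` pairs returned by a PySpark DataFrame's `dtypes` attribute. `EXPECTED_TYPES` and `find_type_mismatches` are hypothetical names, and the sample input is hand-written for illustration, not taken from the dataset.

```python
# Expected Spark type names for the non-string fields (everything else is "string").
EXPECTED_TYPES = {
    "pickup_datetime": "timestamp",
    "dropoff_datetime": "timestamp",
    "passenger_count": "int",
    "rate_code": "int",
}
for name in [
    "fare_amount", "surcharge", "mta_tax", "tip_amount", "tolls_amount",
    "total_amount", "trip_time_in_secs", "trip_distance", "pickup_longitude",
    "pickup_latitude", "dropoff_longitude", "dropoff_latitude",
]:
    EXPECTED_TYPES[name] = "float"


def find_type_mismatches(dtypes, expected, default="string"):
    """Return (column, actual, expected) for every column with the wrong type.

    `dtypes` is a list of (name, type) pairs, as given by DataFrame.dtypes.
    """
    mismatches = []
    for name, actual in dtypes:
        wanted = expected.get(name, default)
        if actual != wanted:
            mismatches.append((name, actual, wanted))
    return mismatches


# Hand-written sample in which rate_code was left as a string by mistake.
sample_dtypes = [
    ("medallion", "string"),
    ("pickup_datetime", "timestamp"),
    ("rate_code", "string"),
    ("fare_amount", "float"),
]
print(find_type_mismatches(sample_dtypes, EXPECTED_TYPES))
```

With a live session, `find_type_mismatches(nyctaxi_converted.dtypes, EXPECTED_TYPES)` should return an empty list when every conversion has been applied.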



In the next cell, print the first 10 records of your merged dataset.

In [16]:
nyctaxi_converted.show(10)

+--------------------+--------------------+---------+-------------------+------------+-----------+---------+-------+----------+------------+------------+---------+------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|           medallion|        hack_license|vendor_id|    pickup_datetime|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|rate_code|store_and_fwd_flag|   dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+--------------------+--------------------+---------+-------------------+------------+-----------+---------+-------+----------+------------+------------+---------+------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|00005007A9F30E289...|0649DA10C83DE7C6A...|

In the next cell, print the row count of your merged dataset.

In [17]:
print("Number of records: " + str(nyctaxi_converted.count()))

Number of records: 173185091
