# Create Merged Dataset

In this workbook, you will read in the `trip` and `fare` files. You are welcome to use the DataFrame and/or Spark SQL APIs as long as your code produces the expected results.

Instructions:

1. Join both datasets so that the merged dataset has 21 unique fields. You need to determine how to join the datasets.
2. Once you create the merged dataset, convert the fields to the following types, since all fields were read in as strings:
    * pickup_datetime and dropoff_datetime must be TIMESTAMP
    * passenger_count and rate_code must be INT
    * all other numeric fields must be FLOAT
    * the remaining fields stay as STRING
3. Save your merged and converted dataset to your own S3 bucket in parquet format.
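The conversion rules in step 2 can be captured as a column-to-type mapping and turned into a Spark SQL SELECT list. This is a minimal sketch, assuming the 21 field names that appear in the schema printed later in this notebook and that the join-key columns are taken from the trip table; the helper `build_select_list` is hypothetical, not part of PySpark.

```python
# Field names as they appear in the trip and fare tables of this notebook.
TRIP_FIELDS = [
    "medallion", "hack_license", "vendor_id", "rate_code", "store_and_fwd_flag",
    "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_time_in_secs",
    "trip_distance", "pickup_longitude", "pickup_latitude",
    "dropoff_longitude", "dropoff_latitude",
]
FARE_FIELDS = [
    "payment_type", "fare_amount", "surcharge", "mta_tax",
    "tip_amount", "tolls_amount", "total_amount",
]

# Type rules from step 2.
TIMESTAMP_FIELDS = {"pickup_datetime", "dropoff_datetime"}
INT_FIELDS = {"passenger_count", "rate_code"}
STRING_FIELDS = {"medallion", "hack_license", "vendor_id",
                 "store_and_fwd_flag", "payment_type"}


def target_type(field):
    """Return the Spark SQL type a field should be cast to."""
    if field in TIMESTAMP_FIELDS:
        return "TIMESTAMP"
    if field in INT_FIELDS:
        return "INT"
    if field in STRING_FIELDS:
        return "STRING"
    # Every remaining field is numeric and becomes FLOAT.
    return "FLOAT"


def build_select_list(trip_alias="tr", fare_alias="fa"):
    """Hypothetical helper: build the SELECT expressions for the merged dataset."""
    exprs = []
    for alias, fields in ((trip_alias, TRIP_FIELDS), (fare_alias, FARE_FIELDS)):
        for field in fields:
            col = f"{alias}.{field}"
            t = target_type(field)
            if t == "STRING":
                exprs.append(col)  # already a string, so no cast is needed
            else:
                # Explicit alias keeps the output column name equal to the field name.
                exprs.append(f"CAST({col} AS {t}) AS {field}")
    return exprs


select_list = build_select_list()
print(len(select_list))
print(",\n".join(select_list))
```

Joining `select_list` with commas gives a SELECT clause that could be dropped into a `spark.sql(...)` query; generating it from one mapping keeps the 21 casts consistent with the rules instead of relying on hand-typed expressions.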

You are welcome to add as many cells as you need below up until the next section. **You must include comments in your code.**

In [1]:
# Import findspark
import findspark
findspark.init()

# Import libraries
from pyspark import SparkContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("merge-data").getOrCreate()
sc    = spark.sparkContext      # get the context

In [3]:
# Check spark and sc
spark
sc

In [9]:
# Load both datasets through the SparkSession using read.parquet
trip = spark.read.parquet("s3://bigdatateaching/nyctaxi-2013/parquet/trip/")
fare = spark.read.parquet("s3://bigdatateaching/nyctaxi-2013/parquet/fare/")

In [7]:
# Show the first 3 rows of each dataset
trip.show(3)
fare.show(3)

+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|           medallion|        hack_license|vendor_id|rate_code|store_and_fwd_flag|    pickup_datetime|   dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|0F6CF8A85A039CDF3...|33BC9D03BE4E9B8E1...|      VTS|        1|              null|2013-04-20 20:22:00|2013-04-20 20:28:00|              1|              360|          .46|       -73.98571|      40.762817|       -73.978874|       40.762306|
|75148F5ED6DECC2B4...|FC93BBB797CBAA633...| 
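Before choosing the join, it helps to confirm that the candidate key (`medallion`, `hack_license`, `vendor_id`, `pickup_datetime`) identifies at most one fare record; otherwise the join would duplicate trips. The sketch below shows the idea in plain Python on a few made-up rows (the sample values and the helper `duplicate_keys` are hypothetical). On the real data, the same check could be expressed as `fare.groupBy("medallion", "hack_license", "vendor_id", "pickup_datetime").count().filter("count > 1")`.

```python
from collections import Counter

# Columns assumed to identify a single ride in both tables.
KEY_COLUMNS = ("medallion", "hack_license", "vendor_id", "pickup_datetime")

# Hypothetical sample rows from the fare table (values are made up).
fare_rows = [
    {"medallion": "M1", "hack_license": "H1", "vendor_id": "VTS",
     "pickup_datetime": "2013-04-20 20:22:00", "fare_amount": "6.5"},
    {"medallion": "M1", "hack_license": "H1", "vendor_id": "VTS",
     "pickup_datetime": "2013-04-20 21:05:00", "fare_amount": "9.0"},
    {"medallion": "M2", "hack_license": "H2", "vendor_id": "CMT",
     "pickup_datetime": "2013-04-20 20:22:00", "fare_amount": "12.0"},
]


def duplicate_keys(rows, key_columns=KEY_COLUMNS):
    """Return the composite keys that appear more than once."""
    counts = Counter(tuple(row[c] for c in key_columns) for row in rows)
    return [key for key, n in counts.items() if n > 1]


dups = duplicate_keys(fare_rows)
print("duplicate keys:", dups)  # an empty list means the key is unique
```

Checking a shorter key such as `("medallion",)` on the same rows does report duplicates, which is why the sketch combines driver, vehicle, vendor and pickup time.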

In [41]:
# Register the datasets as temporary SQL views
trip.createOrReplaceTempView("tr")
fare.createOrReplaceTempView("fa")

In [43]:
# Join the trip and fare tables on the four columns that identify a ride
# (medallion, hack_license, vendor_id, pickup_datetime). The LEFT JOIN keeps
# every trip, and each field is cast to its required type in the SELECT list.
merge_data = spark.sql("""
    SELECT tr.medallion, tr.hack_license, tr.vendor_id, CAST(tr.rate_code AS INT), tr.store_and_fwd_flag,
           CAST(tr.pickup_datetime AS TIMESTAMP), CAST(tr.dropoff_datetime AS TIMESTAMP), CAST(tr.passenger_count AS INT),
           CAST(tr.trip_time_in_secs AS FLOAT), CAST(tr.trip_distance AS FLOAT), CAST(tr.pickup_longitude AS FLOAT),
           CAST(tr.pickup_latitude AS FLOAT), CAST(tr.dropoff_longitude AS FLOAT), CAST(tr.dropoff_latitude AS FLOAT),
           fa.payment_type, CAST(fa.fare_amount AS FLOAT), CAST(fa.surcharge AS FLOAT), CAST(fa.mta_tax AS FLOAT),
           CAST(fa.tip_amount AS FLOAT), CAST(fa.tolls_amount AS FLOAT), CAST(fa.total_amount AS FLOAT)
    FROM tr LEFT JOIN fa
      ON tr.medallion = fa.medallion
     AND tr.hack_license = fa.hack_license
     AND tr.vendor_id = fa.vendor_id
     AND tr.pickup_datetime = fa.pickup_datetime
""").cache()  # cache the merged data because later cells reuse it
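The LEFT JOIN keeps every trip record, and a trip with no matching fare record gets NULL in the fare columns. A plain-Python sketch of that composite-key left join on toy rows makes the behavior concrete; the rows and the helper `left_join` are hypothetical, and only two fare columns are carried for brevity.

```python
# Join keys used in the Spark SQL query above.
KEYS = ("medallion", "hack_license", "vendor_id", "pickup_datetime")
FARE_COLUMNS = ("payment_type", "fare_amount")  # subset of fare fields, for brevity

# Hypothetical toy rows; values are illustrative only.
trips = [
    {"medallion": "M1", "hack_license": "H1", "vendor_id": "VTS",
     "pickup_datetime": "2013-04-20 20:22:00", "trip_distance": ".46"},
    {"medallion": "M2", "hack_license": "H2", "vendor_id": "CMT",
     "pickup_datetime": "2013-04-20 20:30:00", "trip_distance": "1.2"},
]
fares = [
    {"medallion": "M1", "hack_license": "H1", "vendor_id": "VTS",
     "pickup_datetime": "2013-04-20 20:22:00",
     "payment_type": "CSH", "fare_amount": "5"},
]


def left_join(left, right, keys=KEYS, right_cols=FARE_COLUMNS):
    """Keep every left row; fill right-side columns with None when no key matches."""
    # Index the right table by its composite key (assumes the key is unique there).
    index = {tuple(r[k] for k in keys): r for r in right}
    merged = []
    for row in left:
        match = index.get(tuple(row[k] for k in keys))
        out = dict(row)
        for col in right_cols:
            out[col] = match[col] if match else None
        merged.append(out)
    return merged


result = left_join(trips, fares)
for row in result:
    print(row["medallion"], row["payment_type"], row["fare_amount"])
```

Because a left join never drops trip rows, comparing `merge_data.count()` with `trip.count()` is a quick sanity check: a larger merged count would point to duplicate keys on the fare side.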


In [44]:
# Print out the merged data
merge_data.show(3)

+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|           medallion|        hack_license|vendor_id|rate_code|store_and_fwd_flag|    pickup_datetime|   dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|
+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|00005007A9F30E289...|02015F5B7D1884620...|
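Besides reading the `printSchema()` output, the column types can be checked programmatically from `merge_data.dtypes`, which PySpark returns as a list of `(name, type)` string pairs such as `("rate_code", "int")`. A minimal sketch, assuming the type rules from step 2; the helper `type_mismatches` and the sample list are hypothetical.

```python
# Expected Spark simple type names after the casts in the merge query.
EXPECTED = {
    "pickup_datetime": "timestamp", "dropoff_datetime": "timestamp",
    "passenger_count": "int", "rate_code": "int",
    "medallion": "string", "hack_license": "string", "vendor_id": "string",
    "store_and_fwd_flag": "string", "payment_type": "string",
}
DEFAULT_TYPE = "float"  # every other field should be FLOAT


def type_mismatches(dtypes):
    """Return (column, actual, expected) for each column with the wrong type."""
    problems = []
    for name, actual in dtypes:
        expected = EXPECTED.get(name, DEFAULT_TYPE)
        if actual != expected:
            problems.append((name, actual, expected))
    return problems


# Hypothetical excerpt of merge_data.dtypes, with one deliberate mistake.
sample = [("medallion", "string"), ("rate_code", "int"),
          ("pickup_datetime", "timestamp"), ("fare_amount", "string")]
print(type_mismatches(sample))
```

On the cluster, `type_mismatches(merge_data.dtypes)` would be expected to return an empty list once every cast is in place.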

## In the following cells, please provide the requested code and output. Do not change the order and/or structure of the cells.

In the next cell, provide the code that saves your merged dataset to your S3 bucket.

In [47]:
# Save the merged, type-converted dataset to my S3 bucket in Parquet format
merge_data.write.parquet("s3://anly502s3/a5_mergeData/")

In the next cell, print the schema of your merged dataset.

In [48]:
# Print the schema of the merged dataset
merge_data.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: float (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- surcharge: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- total_amount: float (nullable = true)



In the next cell, print the first 10 records of your merged dataset.

In [49]:
# Show the first 10 records of the merged dataset
merge_data.show(10)

+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|           medallion|        hack_license|vendor_id|rate_code|store_and_fwd_flag|    pickup_datetime|   dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|
+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|00005007A9F30E289...|02015F5B7D1884620...|

In the next cell, print the row count of your merged dataset.

In [50]:
# Count the rows in the merged dataset
merge_data.count()

173185091

In [51]:
# Stop the Spark session to release cluster resources
spark.stop()