# Exploratory Data Analysis and Data Prep in PySpark

In this workbook, you will read in the `trip` and `fare` files. You may use the DataFrame API, the Spark SQL API, or both, as long as your code produces the expected results.

It is recommended to make small versions of the datasets for testing so that you can iterate faster, for example with `df_small = df.limit(10000)`.

Make sure you use your [PySparkSQL Cheat Sheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) to help you with the commands to complete the assignment.

Instructions:

1. Read in both datasets and conduct initial exploration. 
    - Determine the shape of the data, column names, data types
    - Check for the number of missing values on the critical fields of the datasets - pickup_datetime, dropoff_datetime, passenger_count, trip_distance, fare_amount, medallion, hack_license, etc. There is a `count` command to use with columns in a dataframe and several ways to check for missing values. Try Googling some options!
    - Check the counts for values of passenger_count and remove any outlier values (how many people fit into a taxi?). Justify your analytical decision. You can use the `.isin()` method to create boolean values from a column ([see here](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.Column.isin.html)).

2. Join both datasets such that you get a merged dataset with 21 unique fields. 
    - Which columns do you use to join the data? You need to determine how to match the schemas.
    - What type of join should you use? Try at least two types of joins and report how much data is matched and is lost based on the join type. Justify which join you selected.

3. Once you create the merged dataset, you need to convert fields to the following types, since all fields were read in as strings:
    * pickup_datetime and dropoff_datetime must be TIMESTAMP
    * passenger_count and rate_code must be INT
    * all other numeric fields must be FLOAT
    * the remaining fields stay as STRING

4. Add new variables to your dataset that will be important for understanding the data.
    * Dummy variable for if pickup_datetime appears on a weekend or not
    * Dummy variable for if pickup_datetime appears during weekday rush hour (6:00-9:59am, 2:00-5:59pm) or not
    * Dummy variable for if the tip_amount is greater than 10% of the fare_amount or not

5. Save this merged, converted, and transformed dataset to your own S3 bucket in parquet format.

6. Report the average, median, Q25, and Q75 fare amount in one table per dummy variable created in step 4. Each table should look like the example below, though your numbers might not match exactly.


|   | weekdayrush_dummy | mean_fare | Q25 | median | Q75  | num_rides |
|---|-------------------|-----------|-----|--------|------|-----------|
| 0 | True              | 12.462    | 6.5 | 9.0    | 14.0 | 44848941  |
| 1 | False             | 12.312    | 6.5 | 9.5    | 14.0 | 128336134 |
       
You are welcome to add as many cells as you need. Clearly indicate in your notebook each step so graders can confirm you have accomplished each task. **You must include comments in your code.**

**REQUIRED:** Start off by copying the taxi data from the public S3 bucket to your personal S3 bucket!

In [2]:
# Copy the 2013 NYC taxi Parquet data from the public bucket to a personal bucket
!hadoop distcp s3://bigdatateaching/nyctaxi-2013/parquet/ s3://mc2582/taxi

2022-10-13 19:28:51,860 INFO tools.DistCp: Input Options: DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, ignoreFailures=false, overwrite=false, append=false, useDiff=false, useRdiff=false, fromSnapshot=null, toSnapshot=null, skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, mapBandwidth=0.0, copyStrategy='uniformsize', preserveStatus=[BLOCKSIZE], atomicWorkPath=null, logPath=null, sourceFileListing=null, sourcePaths=[s3://bigdatateaching/nyctaxi-2013/parquet], targetPath=s3://mc2582/taxi, filtersFile='null', blocksPerChunk=0, copyBufferSize=8192, verboseLog=false, directWrite=false}, sourcePaths=[s3://bigdatateaching/nyctaxi-2013/parquet], targetPathExists=true, preserveRawXattrsfalse
2022-10-13 19:28:52,181 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-20-83.ec2.internal/172.31.20.83:8032
2022-10-13 19:28:52,400 INFO client.AHSProxy: Connecting to Application History server at ip-172-31-20-83.ec2.internal/172.31.20.83:10200
2

In [1]:
# Locate the Spark installation and make PySpark importable
import findspark
findspark.init()

In [2]:
# Start (or reuse) a Spark session for this notebook
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("a6-part2-app").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/10/14 03:38:05 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
22/10/14 03:38:16 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!


### Step 1: Read in data and explore

In [3]:
# Read the trip records from the personal S3 copy
trip = spark.read.parquet("s3://mc2582/taxi/trip/")

                                                                                

In [4]:
# Read the fare records from the personal S3 copy
fare = spark.read.parquet("s3://mc2582/taxi/fare/")

In [9]:
# Shape of trip: number of rows and number of columns
print(trip.count(), len(trip.columns))



173179759 14


                                                                                

In [8]:
# Shape of fare: number of rows and number of columns
print(fare.count(), len(fare.columns))



173179759 11


                                                                                

In [9]:
trip.columns

['medallion',
 'hack_license',
 'vendor_id',
 'rate_code',
 'store_and_fwd_flag',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_time_in_secs',
 'trip_distance',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude']

In [10]:
fare.columns

['medallion',
 'hack_license',
 'vendor_id',
 'pickup_datetime',
 'payment_type',
 'fare_amount',
 'surcharge',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'total_amount']

In [11]:
trip.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_time_in_secs: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)



In [12]:
fare.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- surcharge: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- total_amount: string (nullable = true)



In [5]:
from pyspark.sql.functions import col
# Count nulls in each critical field (one field per cell below)
trip.filter(col('pickup_datetime').isNull()).count()

                                                                                

0

In [14]:
trip.filter(col('dropoff_datetime').isNull()).count()

0

In [15]:
trip.filter(col('passenger_count').isNull()).count()

0

In [16]:
trip.filter(col('trip_distance').isNull()).count()

                                                                                

0

In [17]:
fare.filter(col('fare_amount').isNull()).count()

                                                                                

0

In [18]:
fare.filter(col('medallion').isNull()).count()

                                                                                

0

In [19]:
trip.filter(col('medallion').isNull()).count()

0

In [20]:
fare.filter(col('hack_license').isNull()).count()

                                                                                

0

In [21]:
trip.filter(col('hack_license').isNull()).count()

0
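The cells above launch one Spark job per field and only catch true nulls. Because every column was read in as a string, blank strings are another plausible form of missing value. In PySpark both checks can be done in a single pass with one `select` of `count(when(col(c).isNull() | (col(c) == ''), c))` per column `c`. The plain-Python sketch below shows the same one-pass idea over hypothetical row dictionaries; treating whitespace-only strings as missing is an assumption, not something the notebook tests.

```python
from typing import Dict, Iterable, List, Optional


def count_missing(rows: Iterable[Dict[str, Optional[str]]], fields: List[str]) -> Dict[str, int]:
    """Count missing values per field in a single pass over the rows.

    A value counts as missing if it is None or a string that is empty
    after stripping whitespace (the raw columns are all strings).
    """
    missing = {name: 0 for name in fields}
    for row in rows:
        for name in fields:
            value = row.get(name)
            if value is None or value.strip() == '':
                missing[name] += 1
    return missing


# Hypothetical rows standing in for trip records, not taken from the real data
sample = [
    {'passenger_count': '1', 'trip_distance': '2.5', 'pickup_datetime': '2013-01-01 15:11:48'},
    {'passenger_count': '', 'trip_distance': '0.9', 'pickup_datetime': '2013-01-06 00:18:35'},
    {'passenger_count': '2', 'trip_distance': None, 'pickup_datetime': '  '},
]
print(count_missing(sample, ['passenger_count', 'trip_distance', 'pickup_datetime']))
# {'passenger_count': 1, 'trip_distance': 1, 'pickup_datetime': 1}
```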

In [13]:
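# Keep only trips reporting 1 to 5 passengers; all other values are treated as outliers.
# passenger_count is still a string here, so the allowed values are strings too.
valid_trip = trip.filter(col('passenger_count').isin(['1','2','3','4','5']))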
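Step 1 also asks for the counts of each `passenger_count` value to justify the cutoff. In PySpark that table is typically produced with `trip.groupBy('passenger_count').count().orderBy('passenger_count').show()`. The plain-Python sketch below mirrors the filter above: it keeps the same allowed set and tallies what gets dropped. The input values are hypothetical. Since the comparison is on strings, a value written differently (for example `'05'`) would not match `'5'`.

```python
from collections import Counter
from typing import Iterable, List, Tuple

# Same allowed set as the isin() filter above (values are strings)
ALLOWED = {'1', '2', '3', '4', '5'}


def split_passenger_counts(values: Iterable[str]) -> Tuple[List[str], Counter]:
    """Return (kept values, Counter of dropped values) for a passenger_count column."""
    kept: List[str] = []
    dropped: Counter = Counter()
    for value in values:
        if value in ALLOWED:
            kept.append(value)
        else:
            dropped[value] += 1
    return kept, dropped


# Hypothetical passenger_count values, not taken from the real data
sample = ['1', '1', '2', '0', '6', '208', '5', '1']
kept, dropped = split_passenger_counts(sample)
print(len(kept), dict(dropped))  # 5 {'0': 1, '6': 1, '208': 1}
```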

### Step 2: Join the datasets

In [15]:
valid_trip.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_time_in_secs: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)



In [16]:
fare.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- surcharge: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- total_amount: string (nullable = true)



In [18]:
# Left join: keep every trip, matching fares on the four columns both tables share
valid_df_test = valid_trip.join(fare, ['medallion', 'hack_license', 'vendor_id', 'pickup_datetime'], 'left')
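The join keys are exactly the columns the two tables have in common, and joining on a list of names keeps a single copy of each key. The plain-Python sketch below derives the keys from the `trip.columns` and `fare.columns` lists printed earlier and checks that the merged result has the 21 unique fields Step 2 asks for. The key-first, then left, then right ordering it assumes matches the column order in the schema printed after the Step 3 casts.

```python
# Column lists copied from trip.columns and fare.columns above
trip_cols = ['medallion', 'hack_license', 'vendor_id', 'rate_code', 'store_and_fwd_flag',
             'pickup_datetime', 'dropoff_datetime', 'passenger_count', 'trip_time_in_secs',
             'trip_distance', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude',
             'dropoff_latitude']
fare_cols = ['medallion', 'hack_license', 'vendor_id', 'pickup_datetime', 'payment_type',
             'fare_amount', 'surcharge', 'mta_tax', 'tip_amount', 'tolls_amount',
             'total_amount']

# Join keys: the columns the two tables have in common
keys = [c for c in trip_cols if c in fare_cols]

# One copy of each key, then the remaining trip columns, then the remaining fare columns
merged = keys + [c for c in trip_cols if c not in keys] + [c for c in fare_cols if c not in keys]

print(keys)         # ['medallion', 'hack_license', 'vendor_id', 'pickup_datetime']
print(len(merged))  # 21
```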

In [27]:
valid_trip.count()

                                                                                

166409833

In [19]:
valid_df_test.count()

                                                                                

166414859

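The left join returns 5,026 more rows than `valid_trip` (166,414,859 vs. 166,409,833). A left join can only grow beyond its left table when some join keys match more than one row in `fare`, so the extra rows come from duplicate keys in `fare`, not from unmatched trips.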

In [20]:
# Inner join: keep only trips that have a matching fare record
valid_df = valid_trip.join(fare, ['medallion', 'hack_license', 'vendor_id', 'pickup_datetime'], 'inner')

In [21]:
valid_df.count()

                                                                                

166414859

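The inner join returns exactly as many rows as the left join (`valid_df_test`), so no trips are lost: every trip in `valid_trip` has at least one matching fare. The inner join is therefore used for the rest of the analysis. It yields 14 + 11 - 4 = 21 unique fields.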
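The matched and lost counts above follow from simple arithmetic on key frequencies: an inner join emits one row per matching pair, and a left join additionally keeps one row for each unmatched left row. The plain-Python sketch below computes both counts for hypothetical keys. On the real data, the two ingredients could be measured in PySpark with `fare.groupBy('medallion', 'hack_license', 'vendor_id', 'pickup_datetime').count().filter(col('count') > 1).count()` for duplicate fare keys and `valid_trip.join(fare, keys, 'left_anti').count()` for unmatched trips, where `keys` is the list of join columns.

```python
from collections import Counter
from typing import Iterable, Tuple


def join_row_counts(left_keys: Iterable[str], right_keys: Iterable[str]) -> Tuple[int, int, int]:
    """Row counts for an equi-join: (inner rows, left-join rows, unmatched left rows)."""
    right_freq = Counter(right_keys)
    left_list = list(left_keys)
    # Each left row produces one output row per matching right row
    inner = sum(right_freq[k] for k in left_list)
    # A left join also keeps each unmatched left row once, with nulls on the right
    unmatched = sum(1 for k in left_list if right_freq[k] == 0)
    return inner, inner + unmatched, unmatched


# Every trip key matches, but key 'b' appears twice on the fare side
print(join_row_counts(['a', 'b', 'c'], ['a', 'b', 'b', 'c']))  # (4, 4, 0)
# Trip key 'd' has no fare: the left join keeps it, the inner join drops it
print(join_row_counts(['a', 'b', 'd'], ['a', 'b', 'b']))  # (3, 4, 1)
```

The first case is the pattern seen in the notebook: both joins return the same count, and both exceed the number of trips.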

### Step 3: Change data types
This step can also be combined with the Step 2 join if you use Spark SQL.

In [27]:
# Cast the string columns to the types required in Step 3
valid_df = (valid_df
    # Date-time fields -> TIMESTAMP
    .withColumn('pickup_datetime', col('pickup_datetime').cast('timestamp'))
    .withColumn('dropoff_datetime', col('dropoff_datetime').cast('timestamp'))
    # Integer fields -> INT
    .withColumn('passenger_count', col('passenger_count').cast('int'))
    .withColumn('rate_code', col('rate_code').cast('int'))
    # All other numeric fields -> FLOAT
    .withColumn('trip_time_in_secs', col('trip_time_in_secs').cast('float'))
    .withColumn('trip_distance', col('trip_distance').cast('float'))
    .withColumn('pickup_longitude', col('pickup_longitude').cast('float'))
    .withColumn('pickup_latitude', col('pickup_latitude').cast('float'))
    .withColumn('dropoff_longitude', col('dropoff_longitude').cast('float'))
    .withColumn('dropoff_latitude', col('dropoff_latitude').cast('float'))
    .withColumn('fare_amount', col('fare_amount').cast('float'))
    .withColumn('surcharge', col('surcharge').cast('float'))
    .withColumn('mta_tax', col('mta_tax').cast('float'))
    .withColumn('tip_amount', col('tip_amount').cast('float'))
    .withColumn('tolls_amount', col('tolls_amount').cast('float'))
    .withColumn('total_amount', col('total_amount').cast('float')))
# medallion, hack_license, vendor_id, store_and_fwd_flag, and payment_type stay STRING
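With ANSI mode off (the default in Spark 3.x), a cast that cannot parse a string returns null instead of raising an error, so malformed values silently become nulls after Step 3. Re-running the Step 1 null checks on the cast columns is one way to catch this. The plain-Python sketch below mirrors the Step 3 type rules under that null-on-failure assumption. It also assumes timestamps are stored as `YYYY-MM-DD HH:MM:SS` strings, and its parsing may not match Spark's exactly for edge cases.

```python
from datetime import datetime
from typing import Any, Dict, Optional

# Type rules from Step 3
TIMESTAMP_COLS = {'pickup_datetime', 'dropoff_datetime'}
INT_COLS = {'passenger_count', 'rate_code'}
FLOAT_COLS = {'trip_time_in_secs', 'trip_distance', 'pickup_longitude', 'pickup_latitude',
              'dropoff_longitude', 'dropoff_latitude', 'fare_amount', 'surcharge',
              'mta_tax', 'tip_amount', 'tolls_amount', 'total_amount'}


def cast_value(name: str, raw: Optional[str]) -> Any:
    """Cast one raw string by column name; unparseable values become None (null)."""
    if raw is None:
        return None
    try:
        if name in TIMESTAMP_COLS:
            return datetime.strptime(raw, '%Y-%m-%d %H:%M:%S')
        if name in INT_COLS:
            return int(raw)
        if name in FLOAT_COLS:
            return float(raw)
    except ValueError:
        return None  # mimic a non-ANSI cast: bad input -> null
    return raw  # remaining columns stay strings


def cast_row(row: Dict[str, Optional[str]]) -> Dict[str, Any]:
    return {name: cast_value(name, raw) for name, raw in row.items()}


# Hypothetical raw row; 'x' is a deliberately malformed rate_code
print(cast_row({'medallion': 'ABC', 'pickup_datetime': '2013-01-01 15:11:48',
                'passenger_count': '4', 'fare_amount': '6.5', 'rate_code': 'x'}))
```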

In [28]:
valid_df.cache()

DataFrame[medallion: string, hack_license: string, vendor_id: string, pickup_datetime: timestamp, rate_code: int, store_and_fwd_flag: string, dropoff_datetime: timestamp, passenger_count: int, trip_time_in_secs: float, trip_distance: float, pickup_longitude: float, pickup_latitude: float, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, surcharge: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float]

### Step 4: Create dummy variables

- Dummy variable for if pickup_datetime appears on a weekend or not
- Dummy variable for if pickup_datetime appears during weekday rush hour (6:00-9:59am, 2:00-5:59pm) or not
- Dummy variable for if the tip_amount is greater than 10% of the fare_amount or not

Reminder that using a small dataframe for testing will help you develop your analytics faster.

In [29]:
# Small random sample for quick testing (with replacement, fraction 0.00001, seed 1)
test_df = valid_df.sample(True, 0.00001, 1)

In [30]:
from pyspark.sql import Row
import datetime

def InsertIntoRow(row, keys, values):
    """Return a copy of `row` with each key in `keys` set to the matching value."""
    temp_dict = row.asDict()
    for key, value in zip(keys, values):
        temp_dict[key] = value
    return Row(**temp_dict)

def DateOnWeekend(date_time):
    """True if the timestamp falls on Saturday or Sunday (weekday() is 5 or 6)."""
    return date_time.weekday() > 4

def TimeInRushHour(date_time):
    """True if the timestamp falls in weekday rush hour: 6:00-9:59am or 2:00-5:59pm."""
    # Rush hour is defined for weekdays only
    if date_time.weekday() > 4:
        return False
    tmp_time = date_time.time()
    # Half-open windows so that times such as 9:59:30 are still counted
    morning = datetime.time(6, 0) <= tmp_time < datetime.time(10, 0)
    afternoon = datetime.time(14, 0) <= tmp_time < datetime.time(18, 0)
    return morning or afternoon


In [34]:
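# Add the three dummy variables by mapping over the rows of valid_df.
# Note: toDF() infers LongType/DoubleType from Python ints/floats, so the INT and
# FLOAT columns come back as bigint and double (see the schema below); pass an
# explicit schema to toDF() to keep the Step 3 types.
final_df = valid_df.rdd.map(
    lambda row: InsertIntoRow(
        row,
        ['isOnWeekend', 'isInRushHour', 'isTipAmountGreat'],
        [DateOnWeekend(row.pickup_datetime),
         TimeInRushHour(row.pickup_datetime),
         # Tip above 10% of the fare (False if either amount is missing)
         row.tip_amount is not None and row.fare_amount is not None
         and row.tip_amount > row.fare_amount * 0.1])).toDF()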
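The RDD round trip above is what widens the INT and FLOAT columns to bigint and double in the schema printed after Step 5. An alternative that stays in the DataFrame API is to build the dummies from column functions, for example `f.dayofweek('pickup_datetime').isin(1, 7)` for weekends and `f.hour('pickup_datetime').between(6, 9) | f.hour('pickup_datetime').between(14, 17)` for the rush-hour windows (combined with a weekday check). Two details make that rewrite easy to get wrong: `dayofweek` numbers days 1 = Sunday to 7 = Saturday, while Python's `weekday()` uses 0 = Monday to 6 = Sunday, and whole-hour tests replace clock-time comparisons. The plain-Python sketch below checks that the hour-based and clock-time formulations flag exactly the same minutes over one week, assuming the 6:00-9:59am and 2:00-5:59pm windows from the Step 4 instructions.

```python
from datetime import datetime, time, timedelta


def spark_dayofweek(ts: datetime) -> int:
    """Day number in Spark's dayofweek() convention: 1 = Sunday ... 7 = Saturday."""
    # Python's weekday(): 0 = Monday ... 6 = Sunday
    return (ts.weekday() + 1) % 7 + 1


def rush_by_hour(ts: datetime) -> bool:
    """Weekday rush hour tested on whole hours, like hour(...).between(6, 9)."""
    is_weekday = spark_dayofweek(ts) not in (1, 7)
    return is_weekday and (6 <= ts.hour <= 9 or 14 <= ts.hour <= 17)


def rush_by_time(ts: datetime) -> bool:
    """Same rule with half-open clock windows [06:00, 10:00) and [14:00, 18:00)."""
    t = ts.time()
    return ts.weekday() < 5 and (time(6) <= t < time(10) or time(14) <= t < time(18))


# Compare both formulations at every minute of one week (2013-01-07 is a Monday)
start = datetime(2013, 1, 7)
minutes = [start + timedelta(minutes=m) for m in range(7 * 24 * 60)]
mismatches = [ts for ts in minutes if rush_by_hour(ts) != rush_by_time(ts)]
print(len(mismatches))                          # 0
print(sum(rush_by_hour(ts) for ts in minutes))  # 2400 (8 hours x 60 minutes x 5 weekdays)
```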


### Step 5: Save data to S3

In [35]:
# Save the merged, converted, and transformed dataset to S3 in Parquet format
final_df.write.parquet('s3://mc2582/cleaned_taxi/')

                                                                                

In [36]:
final_df.cache()

DataFrame[medallion: string, hack_license: string, vendor_id: string, pickup_datetime: timestamp, rate_code: bigint, store_and_fwd_flag: string, dropoff_datetime: timestamp, passenger_count: bigint, trip_time_in_secs: double, trip_distance: double, pickup_longitude: double, pickup_latitude: double, dropoff_longitude: double, dropoff_latitude: double, payment_type: string, fare_amount: double, surcharge: double, mta_tax: double, tip_amount: double, tolls_amount: double, total_amount: double, isOnWeekend: boolean, isInRushHour: boolean, isTipAmountGreat: boolean]

### Step 6: Analytics

In [37]:
final_df.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- rate_code: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_time_in_secs: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- isOnWeekend: boolean (nullable = true)
 |-- is

In [45]:
import pyspark.sql.functions as f
# Mean fare per weekend flag; the alias belongs on the aggregate column, not the DataFrame
temp_weekend_mean = final_df.select('isOnWeekend', 'fare_amount').groupby('isOnWeekend').agg(f.mean(col('fare_amount')).alias('mean_fare')).take(2)

                                                                                

In [55]:
# Exact 25th, 50th, and 75th percentiles of fare_amount per weekend flag
temp_weekend_quantiles = final_df.select('isOnWeekend', 'fare_amount').groupby('isOnWeekend')\
.agg(f.expr('percentile(fare_amount, array(0.25))')[0].alias('%25'),\
     f.expr('percentile(fare_amount, array(0.50))')[0].alias('%50'),\
     f.expr('percentile(fare_amount, array(0.75))')[0].alias('%75')).collect()

                                                                                

In [48]:
# Mean fare per rush-hour flag; the alias belongs on the aggregate column
temp_rush_mean = final_df.select('isInRushHour', 'fare_amount').groupby('isInRushHour').agg(f.mean(col('fare_amount')).alias('mean_fare')).take(2)

                                                                                

In [58]:
# Exact 25th, 50th, and 75th percentiles of fare_amount per rush-hour flag
temp_rush_quantiles = final_df.select('isInRushHour', 'fare_amount').groupby('isInRushHour')\
.agg(f.expr('percentile(fare_amount, array(0.25))')[0].alias('%25'),\
     f.expr('percentile(fare_amount, array(0.50))')[0].alias('%50'),\
     f.expr('percentile(fare_amount, array(0.75))')[0].alias('%75')).collect()

                                                                                

In [52]:
# Mean fare per tip flag; the alias belongs on the aggregate column
temp_greattip_mean = final_df.select('isTipAmountGreat', 'fare_amount').groupby('isTipAmountGreat').agg(f.mean(col('fare_amount')).alias('mean_fare')).take(2)

                                                                                

In [59]:
# Exact 25th, 50th, and 75th percentiles of fare_amount per tip flag
temp_greattip_quantiles = final_df.select('isTipAmountGreat', 'fare_amount').groupby('isTipAmountGreat')\
.agg(f.expr('percentile(fare_amount, array(0.25))')[0].alias('%25'),\
     f.expr('percentile(fare_amount, array(0.50))')[0].alias('%50'),\
     f.expr('percentile(fare_amount, array(0.75))')[0].alias('%75')).collect()
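Each dummy above is summarized by two separate jobs whose results come back as positional rows, and `groupBy` makes no promise about row order. The example table in the instructions also includes `num_rides`. One option in PySpark is a single `agg` per dummy with `f.mean('fare_amount').alias('mean_fare')`, `f.expr('percentile(fare_amount, array(0.25, 0.5, 0.75))')`, and `f.count('*').alias('num_rides')`; the SQL function `percentile_approx` is a cheaper choice if exact quantiles are not required. The plain-Python sketch below shows the shape of that summary, keyed by dummy value rather than row position. Its percentile uses linear interpolation between neighboring sorted values, which is how Spark's exact `percentile` behaves (Spark's documentation gives 3.0 for the 0.3 percentile of the values 0 and 10). The fares are hypothetical.

```python
from collections import defaultdict
from math import floor
from statistics import mean


def percentile(sorted_vals, p):
    """Exact percentile with linear interpolation between neighboring values."""
    pos = (len(sorted_vals) - 1) * p
    lo = floor(pos)
    hi = min(lo + 1, len(sorted_vals) - 1)
    return sorted_vals[lo] + (sorted_vals[hi] - sorted_vals[lo]) * (pos - lo)


def summarize(rows, flag):
    """Mean, Q25, median, Q75, and ride count of fare_amount, keyed by flag value."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[flag]].append(row['fare_amount'])
    table = {}
    for key, fares in groups.items():
        fares.sort()
        table[key] = {'mean_fare': mean(fares),
                      'Q25': percentile(fares, 0.25),
                      'median': percentile(fares, 0.5),
                      'Q75': percentile(fares, 0.75),
                      'num_rides': len(fares)}
    return table


# Hypothetical fares, not taken from the real data
rows = ([{'isOnWeekend': True, 'fare_amount': x} for x in (5.0, 7.0, 9.0, 11.0)]
        + [{'isOnWeekend': False, 'fare_amount': x} for x in (4.0, 6.0, 20.0)])
table = summarize(rows, 'isOnWeekend')
for key in (True, False):
    print(key, table[key])
```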

                                                                                

In [71]:
temp_weekend_quantiles

[Row(isOnWeekend=True, %25=6.5, %50=9.5, %75=14.0),
 Row(isOnWeekend=False, %25=6.5, %50=9.5, %75=14.0)]


In [72]:
import pandas as pd
# groupBy() does not guarantee row order, so look up each result by its dummy value
weekend_mean = {r[0]: r[1] for r in temp_weekend_mean}
weekend_q = {r[0]: r for r in temp_weekend_quantiles}
keys = [True, False]
df_weekend = pd.DataFrame({'isOnWeekend': keys,
                           'mean_fare': [weekend_mean[k] for k in keys],
                           'Q25': [weekend_q[k][1] for k in keys],
                           'median': [weekend_q[k][2] for k in keys],
                           'Q75': [weekend_q[k][3] for k in keys]})

In [75]:
# Look up each result by its dummy value rather than by row position
rush_mean = {r[0]: r[1] for r in temp_rush_mean}
rush_q = {r[0]: r for r in temp_rush_quantiles}
keys = [True, False]
df_rushhour = pd.DataFrame({'isInRushHour': keys,
                            'mean_fare': [rush_mean[k] for k in keys],
                            'Q25': [rush_q[k][1] for k in keys],
                            'median': [rush_q[k][2] for k in keys],
                            'Q75': [rush_q[k][3] for k in keys]})

In [77]:
# Look up each result by its dummy value rather than by row position
tip_mean = {r[0]: r[1] for r in temp_greattip_mean}
tip_q = {r[0]: r for r in temp_greattip_quantiles}
keys = [True, False]
df_tip = pd.DataFrame({'isTipAmountGreat': keys,
                       'mean_fare': [tip_mean[k] for k in keys],
                       'Q25': [tip_q[k][1] for k in keys],
                       'median': [tip_q[k][2] for k in keys],
                       'Q75': [tip_q[k][3] for k in keys]})

## **Save your analytics results to a JSON file, then add, commit, and push your notebook and JSON file to GitHub!**

In [79]:
import json
# Write the three summary tables to taxi-soln.json; the with block closes the file
with open('taxi-soln.json', 'w') as fp:
    json.dump({'weekend': df_weekend.to_dict('records'),
               'tip': df_tip.to_dict('records'),
               'rushhour': df_rushhour.to_dict('records')},
              fp)

# MAKE SURE YOU STOP YOUR EMR CLUSTER!

In [None]:
spark.stop()