# Milestone 3 - Pre-processing and analysis with PySpark

## Deadline - Sunday, 10th of December @11.59 pm 

The goal of this milestone is to preprocess the 'New York yellow taxis' dataset with PySpark, performing basic data preparation and basic analysis to gain a better understanding of the data.

Use the same month and year you used for the green taxis in milestone 1. [Datasets](https://drive.google.com/drive/folders/1t8nBgbHVaA5roZY4z3RcAG1_JMYlSTqu?usp=sharing) (download the yellow taxis dataset).

Important Notes:
- You MUST use this notebook template/structure. Not doing so will result in a mark deduction.
- All cells MUST be run with their output shown, as in milestone 1. I will NOT RUN YOUR NOTEBOOK.

Submission guidelines: same as milestone 1.

The notebook name must follow the same format as the file you named in milestone 1, with M3 instead of M1.

IMPORTANT: You are only allowed to use PySpark unless explicitly told otherwise (i.e. the last task).

Useful resource/documentation (highly recommended) - [PySpark examples](https://sparkbyexamples.com/pyspark-tutorial/)


## Weight dist.
- Loading the dataset: 5%
- Basic cleaning: 30%
	- Column renaming: 10%
	- Detect missing: 35%
	- Handle missing: 35%
	- Check missing: 20%
- Analyses: 30%
- Encoding: 20%
- Lookup table: 10%
- Writing the cleaned data and lookup table back as parquet and csv files: 5%

# Tasks:

## Starting the spark session

In [None]:
from pyspark.sql import SparkSession
from datetime import date
from pyspark.sql import functions as fn
from pyspark.ml.feature import StringIndexer
import pandas as pd

In [4]:
# Initialize Spark session
spark = SparkSession.builder.appName("milestone3").getOrCreate()
# spark context to interact with the driver
sc = spark.sparkContext

## Load the dataset.

In [5]:
df = spark.read.parquet('./data/yellow_tripdata_2019-08.parquet')

### Counting the number of records

In [6]:
df.count()

6073366

### Preview first 20 rows.

In [7]:
df.show(20,vertical=True)

-RECORD 0-------------------------------------
 Vendor                | Creative Mobile T... 
 tpep_pickup_datetime  | 2019-08-01 02:09:46  
 tpep_dropoff_datetime | 2019-08-01 02:10:36  
 passenger_count       | 1.0                  
 trip_distance         | 0.0                  
 Rate_type             | Standard rate        
 store_and_fwd_flag    | N                    
 PU_Location           | Queens,Long Islan... 
 DO_Location           | Queens,Long Islan... 
 payment_type          | Cash                 
 fare_amount           | 2.5                  
 extra                 | 0.5                  
 mta_tax               | 0.5                  
 tip_amount            | 0.0                  
 tolls_amount          | 0.0                  
 improvement_surcharge | 0.3                  
 total_amount          | 3.8                  
 congestion_surcharge  | 0.0                  
 airport_fee           | null                 
-RECORD 1-------------------------------------
 Vendor      

### How many partitions is this dataframe split into?

In [8]:
df.rdd.getNumPartitions()

16

## Basic cleaning

### Rename all columns (replace spaces with underscores and make the names lowercase)

In [47]:
def renameColumns(colList):
    renameMap = {}
    for col in colList:
        newName = col.replace(" ","_").lower()
        renameMap[col] = newName
    return renameMap

In [10]:
cols = df.columns
renameMap = renameColumns(cols)
df_clean_columns = df
for col in renameMap:
    df_clean_columns = df_clean_columns.withColumnRenamed(col, renameMap[col])
df_clean_columns.columns

['vendor',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'rate_type',
 'store_and_fwd_flag',
 'pu_location',
 'do_location',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee']

### Detect and remove duplicates
- Duplicates are trips with the same pickup time, pickup location, dropoff time, dropoff location, and trip distance.

In [11]:
df_without_duplicates = df_clean_columns.dropDuplicates(subset=["tpep_pickup_datetime", "pu_location", "tpep_dropoff_datetime", "do_location", "trip_distance"])

### Check that there are no duplicates

In [12]:
df_without_duplicates.select("tpep_pickup_datetime", "pu_location", "tpep_dropoff_datetime", "do_location", "trip_distance").count() - df_without_duplicates.select("tpep_pickup_datetime", "pu_location", "tpep_dropoff_datetime", "do_location", "trip_distance").distinct().count() 

0

### Detect missing
- Create a function that takes in the df and returns any data structure of your choice (df/dict, list, tuple, etc.) containing the name of each column and the percentage of missing entries in the whole dataset.
- Tip: storing the missing info as a dict where the key is the column name and the value is the percentage would be the easiest.

In [13]:
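def detectDuplicates(df):
    """Return {column name: percentage of null entries} for every column of df.

    Despite its name, this function detects missing (null) values, not duplicates.
    """
    percs = {}
    cols = df.columns
    df_size = df.count()  # total number of rows
    for col in cols:
        # one Spark job per column: count the rows where this column is null
        colNullCount = df.filter(df[col].isNull()).count()
        percs[col] = colNullCount/df_size*100
    return percs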

In [14]:
null_count = detectDuplicates(df_without_duplicates)

### Print out the missing info

In [15]:
null_count

{'vendor': 0.0,
 'tpep_pickup_datetime': 0.0,
 'tpep_dropoff_datetime': 0.0,
 'passenger_count': 2.3703069264530052,
 'trip_distance': 0.0,
 'rate_type': 0.0,
 'store_and_fwd_flag': 0.5498882351048214,
 'pu_location': 0.0,
 'do_location': 0.0,
 'payment_type': 5.8679334179924565,
 'fare_amount': 0.0,
 'extra': 35.78422320303585,
 'mta_tax': 0.0,
 'tip_amount': 0.0,
 'tolls_amount': 0.0,
 'improvement_surcharge': 0.0,
 'total_amount': 0.0,
 'congestion_surcharge': 0.5498882351048214,
 'airport_fee': 100.0}

### Handle missing
- For numerical features, replace with 0.
- For categorical/string features, replace with 'Unknown'.


In [16]:
df_imputed = df_without_duplicates

In [17]:
df_imputed = df_imputed.fillna(value=0, subset=["passenger_count","extra","congestion_surcharge","airport_fee"])

In [18]:
df_imputed = df_imputed.fillna(value="Unknown", subset=["payment_type","store_and_fwd_flag"])

### Check that there are no missing values

In [20]:
null_count = detectDuplicates(df_imputed)

In [21]:
null_count

{'vendor': 0.0,
 'tpep_pickup_datetime': 0.0,
 'tpep_dropoff_datetime': 0.0,
 'passenger_count': 0.0,
 'trip_distance': 0.0,
 'rate_type': 0.0,
 'store_and_fwd_flag': 0.0,
 'pu_location': 0.0,
 'do_location': 0.0,
 'payment_type': 0.0,
 'fare_amount': 0.0,
 'extra': 0.0,
 'mta_tax': 0.0,
 'tip_amount': 0.0,
 'tolls_amount': 0.0,
 'improvement_surcharge': 0.0,
 'total_amount': 0.0,
 'congestion_surcharge': 0.0,
 'airport_fee': 0.0}

## Feature engineering
Write a function that adds the following 3 features. Use built-in functions from PySpark's functions library (see lab 8); avoid writing UDFs from scratch.
- trip duration (the format/unit is up to you)
- is_weekend: whether the trip occurred on a Saturday or Sunday.
- week number (relative to the month, not the year, i.e. 1, 2, 3, 4, not 31, 32, 33, ...)

In [22]:
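# "W" = week of the month; this pattern needs the LEGACY time parser policy (set before the preview below)
week_number = fn.date_format(df_imputed.tpep_pickup_datetime, "W")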

In [23]:
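# Casting a timestamp to long gives Unix epoch seconds, so the difference is the trip duration in seconds
trip_durations = df_imputed.tpep_dropoff_datetime.cast("long") - df_imputed.tpep_pickup_datetime.cast("long")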

In [24]:
# dayofweek returns 1 = Sunday ... 7 = Saturday, so the weekend is {1, 7}
is_weekend = fn.when(fn.dayofweek(df_imputed.tpep_pickup_datetime).isin(1, 7), 1).otherwise(0)

In [25]:
df_engineered = df_imputed.withColumn("trip_duration", trip_durations).withColumn("is_weekend", is_weekend).withColumn("week_number", week_number)

In [49]:
df_engineered.columns

['vendor',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'rate_type',
 'store_and_fwd_flag',
 'pu_location',
 'do_location',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee',
 'trip_duration',
 'is_weekend',
 'week_number']

In [50]:
df_engineered.show(20,vertical=True)

-RECORD 0-------------------------------------
 vendor                | VeriFone Inc.        
 tpep_pickup_datetime  | 2019-07-31 19:32:07  
 tpep_dropoff_datetime | 2019-07-31 19:50:01  
 passenger_count       | 1.0                  
 trip_distance         | 12.43                
 rate_type             | Standard rate        
 store_and_fwd_flag    | N                    
 pu_location           | Queens,LaGuardia ... 
 do_location           | Bronx,Belmont        
 payment_type          | Cash                 
 fare_amount           | 34.0                 
 extra                 | 0.5                  
 mta_tax               | 0.5                  
 tip_amount            | 0.0                  
 tolls_amount          | 6.12                 
 improvement_surcharge | 0.3                  
 total_amount          | 41.42                
 congestion_surcharge  | 0.0                  
 airport_fee           | 0                    
 trip_duration         | 1074                 
 is_weekend  

### Preview the first 20 rows (only select the following features: pickup and dropoff time, and the 3 features you added).

In [26]:
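# Spark 3.x rejects the "W" (week-of-month) pattern used for week_number under its default
# time parser policy; LEGACY restores the pre-3.0 SimpleDateFormat behaviour.
spark.conf.set('spark.sql.legacy.timeParserPolicy', 'LEGACY')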

In [27]:
df_engineered.select(["tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_duration", "is_weekend", "week_number"])\
.show(20,vertical=True)

-RECORD 0------------------------------------
 tpep_pickup_datetime  | 2019-07-31 19:32:07 
 tpep_dropoff_datetime | 2019-07-31 19:50:01 
 trip_duration         | 1074                
 is_weekend            | 0                   
 week_number           | 5                   
-RECORD 1------------------------------------
 tpep_pickup_datetime  | 2019-08-01 02:00:01 
 tpep_dropoff_datetime | 2019-08-01 02:00:04 
 trip_duration         | 3                   
 is_weekend            | 0                   
 week_number           | 1                   
-RECORD 2------------------------------------
 tpep_pickup_datetime  | 2019-08-01 02:05:09 
 tpep_dropoff_datetime | 2019-08-01 02:08:19 
 trip_duration         | 190                 
 is_weekend            | 0                   
 week_number           | 1                   
-RECORD 3------------------------------------
 tpep_pickup_datetime  | 2019-08-01 02:08:09 
 tpep_dropoff_datetime | 2019-08-01 02:12:03 
 trip_duration         | 234      

## Analyses - Answer the following 5 questions (by showing the output and 1-2 short sentences on your observation/answer)

MUST Use the PySpark SQL API.

DO NOT explicitly write SQL queries. Doing so will result in a 50% deduction for that question. Check lab 7.

You are free to add columns if it will help in answering a question and add useful info to the dataset.

### 1- What is the average fare amount per payment type?

In [28]:
avg_fare_per_payment = df_engineered.groupBy("payment_type").avg("fare_amount")

In [29]:
avg_fare_per_payment.show()

+------------+------------------+
|payment_type|  avg(fare_amount)|
+------------+------------------+
|     Unknown|15.165384089237683|
|        Cash|13.150370747938217|
|     Dispute| 2.211285923039453|
|   No charge|22.161083981237574|
| Credit card|13.747046488650074|
+------------+------------------+



### 2- Do people tend to go on longer trips during the weekend or on weekdays?

In [30]:
avg_trip_duration = df_engineered.select(["trip_duration","is_weekend"]).groupBy("is_weekend").avg("trip_duration")

In [31]:
avg_trip_duration.show()

+----------+------------------+
|is_weekend|avg(trip_duration)|
+----------+------------------+
|         1| 1059.941610491406|
|         0|1087.2400725817163|
+----------+------------------+



### 3- Which day recorded the most trips?

In [32]:
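# to_date on a timestamp column keeps just the calendar date; a format string only matters for string input
df_engineered_with_day = df_engineered.withColumn("pickup_date", fn.to_date(df_engineered.tpep_pickup_datetime))

In [33]:
df_engineered_with_day.select("tpep_pickup_datetime","pickup_date").groupBy("pickup_date").count().orderBy("count", ascending=False).first()[0]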

datetime.date(2019, 8, 8)

### 4- What is the average "total amount" of trips with more than 2 passengers?

In [34]:
df_engineered.select("total_amount","passenger_count").filter(df_engineered.passenger_count>2).select(fn.avg("total_amount")).show()

+------------------+
| avg(total_amount)|
+------------------+
|20.057509194414752|
+------------------+



### 5- On average, when is the tip likely to be higher: when there are multiple passengers or just 1?

In [35]:
avg_tip_1 = df_engineered.select(["passenger_count","tip_amount"]).filter(df_engineered.passenger_count==1).select(fn.avg("tip_amount"))

In [36]:
avg_tip_2 = df_engineered.select(["passenger_count","tip_amount"]).filter(df_engineered.passenger_count>1).select(fn.avg("tip_amount"))

In [37]:
avg_tip_1.show()
avg_tip_2.show()

+------------------+
|   avg(tip_amount)|
+------------------+
|2.1884534254318124|
+------------------+

+------------------+
|   avg(tip_amount)|
+------------------+
|2.1767675083289855|
+------------------+


### 6- What is the most frequent route on the weekend?

In [38]:
most_frequent =  df_engineered.select(["pu_location", "do_location", "is_weekend"]).filter(df_engineered.is_weekend==1).groupBy(["pu_location", "do_location"]).count().orderBy("count", ascending=False)

In [39]:
most_frequent.show(2)

+--------------------+--------------------+-----+
|         pu_location|         do_location|count|
+--------------------+--------------------+-----+
|          Unknown,NV|          Unknown,NV| 9862|
|Manhattan,Upper E...|Manhattan,Upper E...| 4232|
+--------------------+--------------------+-----+
only showing top 2 rows



## Encoding
- Label encode all categorical features.
- Create a lookup table for these label encoded features. You can use the same format/example as the lookup table in the Milestone 1 description. (You are allowed to store and manipulate the lookup table as a pandas dataframe; it does not have to be a PySpark df.)
- Remove the original unencoded categorical features from the df after encoding.

### Label encoding 

In [40]:
lookup = pd.DataFrame(columns=["column_name","original_value","encoded_value"])

In [41]:
df_encoded = df_engineered
categoricalFeatures = ["vendor","rate_type","store_and_fwd_flag","payment_type","pu_location","do_location"]
# One StringIndexer per categorical column; by default the most frequent value is encoded as 0
indexers = [StringIndexer(inputCol=col, outputCol=col+"_enc") for col in categoricalFeatures]
for i in indexers:
    transformer = i.fit(df_encoded)                          # learn the value -> index mapping
    df_encoded = transformer.transform(df_encoded)           # add <column>_enc
    df_encoded = df_encoded.drop(transformer.getInputCol())  # drop the unencoded column
    # Record every (column, original value, index) triple in the lookup table
    for idx in range(len(transformer.labels)):
        entry = {"column_name":transformer.getInputCol(), "original_value":transformer.labels[idx], "encoded_value":idx}
        lookup.loc[len(lookup)] = entry

### Preview first 20 rows of the label encoded features

In [42]:
df_encoded.show(20, vertical=True)

-RECORD 0-------------------------------------
 tpep_pickup_datetime   | 2019-07-31 19:32:07 
 tpep_dropoff_datetime  | 2019-07-31 19:50:01 
 passenger_count        | 1.0                 
 trip_distance          | 12.43               
 fare_amount            | 34.0                
 extra                  | 0.5                 
 mta_tax                | 0.5                 
 tip_amount             | 0.0                 
 tolls_amount           | 6.12                
 improvement_surcharge  | 0.3                 
 total_amount           | 41.42               
 congestion_surcharge   | 0.0                 
 airport_fee            | 0                   
 trip_duration          | 1074                
 is_weekend             | 0                   
 week_number            | 5                   
 vendor_enc             | 0.0                 
 rate_type_enc          | 0.0                 
 store_and_fwd_flag_enc | 0.0                 
 payment_type_enc       | 1.0                 
 pu_location_

### Preview first 20 rows of your lookup table

In [43]:
lookup.head(20)

|   | column_name | original_value | encoded_value |
|---|---|---|---|
| 0 | vendor | VeriFone Inc. | 0 |
| 1 | vendor | Creative Mobile Technologies, LLC | 1 |
| 2 | vendor | Unknown | 2 |
| 3 | rate_type | Standard rate | 0 |
| 4 | rate_type | JFK | 1 |
| 5 | rate_type | Unknown | 2 |
| 6 | rate_type | Negotiated fare | 3 |
| 7 | rate_type | Newark | 4 |
| 8 | rate_type | Nassau or Westchester | 5 |
| 9 | rate_type | Group ride | 6 |


### Write the cleaned PySpark df to a parquet file and the lookup table to a csv file.

In [44]:
df_encoded.write.parquet("./data/yellow_trip_data_2019-8_clean.parquet", mode="overwrite") 

In [45]:
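# index=False keeps the pandas row index out of the file (otherwise it is read back as an "Unnamed: 0" column)
lookup.to_csv("./data/yellow_taxi_lookup_8_19.csv", index=False)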

## Bonus - Load the cleaned parquet file and lookup table into a Postgres database. 

Note that if you decide to do the bonus, you must include not only your notebook but also the docker-compose.yaml file.

### Screenshot of the table existing in the database and a simple query such as `select count(*) from table_name` or `select * from table_name limit 10`

(You can just copy paste the screenshots in the markdown cells below)