## Weight distribution
- Loading the dataset: 5%
- Basic cleaning: 30%
	- Column renaming: 10%
	- Detect missing: 35%
	- Handle missing: 35%
	- Check missing: 20%
- Analyses: 30%
- Encoding: 20%
- Lookup table: 10%
- Writing the cleaned dataset and lookup table back as parquet and csv files: 5%

# Tasks:

In [171]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, unix_timestamp, date_format, ceil, dayofmonth, avg, count, lit
from pyspark.sql.types import DoubleType, FloatType, IntegerType, LongType, ShortType, StringType
from pyspark.ml.feature import StringIndexer
import os

# Create (or reuse) the Spark session used throughout the notebook
spark = SparkSession.builder.appName("Milestone3").getOrCreate()
sc = spark.sparkContext

# Milestone 3 - Pre-processing and analysis with PySpark

## Deadline - Sunday, 10th of December @11.59 pm 

The goal of this milestone is to preprocess the dataset 'New York yellow taxis' by performing basic data preparation and basic analysis to gain a better understanding of the data using PySpark.

Use the same month and year you used for the green taxis in milestone 1. [Datasets](https://drive.google.com/drive/folders/1t8nBgbHVaA5roZY4z3RcAG1_JMYlSTqu?usp=sharing) (download the yellow taxis dataset).

Important Notes:
- You MUST use this notebook template/structure. Not doing so will result in a marks deduction.
- You MUST have the cells run and the output shown, similar to milestone 1. I will NOT RUN YOUR NOTEBOOK.

Submission guidelines: same as milestone 1.

The notebook name must follow the same format as the file you named in milestone 1, just with M3 instead of M1.

IMPORTANT: You are only allowed to use PySpark unless explicitly told otherwise (i.e. the last task).

Useful resource/documentation (highly recommended) - [PySpark examples](https://sparkbyexamples.com/pyspark-tutorial/)


## Load the dataset.

In [172]:
df_trip_original = spark.read.parquet("datasets/yellow_tripdata_2018-11.parquet")

In [173]:
df_trip = df_trip_original

In [174]:
df_same_location = df_trip_original.where(df_trip_original.PU_Location == df_trip_original.DO_Location)
df_same_location.show(1)

+--------------------+--------------------+---------------------+---------------+-------------+-------------+------------------+--------------------+--------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|              Vendor|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|    Rate_type|store_and_fwd_flag|         PU_Location|         DO_Location|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------------------+--------------------+---------------------+---------------+-------------+-------------+------------------+--------------------+--------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|Creative Mobile T...| 2018-11-01 01:51:36|  2018-11-01 01:52:36|            1.0|          0.0

### Preview first 20 rows.

In [175]:
def show_first_20(df):
    df.show(20, vertical=True)
    print("Total number of rows: ", df.count())
show_first_20(df_trip)

-RECORD 0-------------------------------------
 Vendor                | Creative Mobile T... 
 tpep_pickup_datetime  | 2018-11-01 01:51:36  
 tpep_dropoff_datetime | 2018-11-01 01:52:36  
 passenger_count       | 1.0                  
 trip_distance         | 0.0                  
 Rate_type             | Standard rate        
 store_and_fwd_flag    | N                    
 PU_Location           | Queens,Long Islan... 
 DO_Location           | Queens,Long Islan... 
 payment_type          | Cash                 
 fare_amount           | 2.5                  
 extra                 | 0.5                  
 mta_tax               | 0.5                  
 tip_amount            | 0.0                  
 tolls_amount          | 0.0                  
 improvement_surcharge | 0.3                  
 total_amount          | 3.8                  
 congestion_surcharge  | null                 
 airport_fee           | null                 
-RECORD 1-------------------------------------
 Vendor      

Total number of rows:  8155459


### How many partitions is this dataframe split into?

In [176]:
def get_num_partitions(df):
    return df.rdd.getNumPartitions()
num_partitions = get_num_partitions(df_trip)
print("Number of partitions: ", num_partitions)

Number of partitions:  12


## Basic cleaning

### rename all columns (replacing a space with an underscore, and making it lowercase)

In [177]:
def rename_columns(df):
    for col_name in df.columns:
        df = df.withColumnRenamed(col_name, col_name.lower().replace(' ', '_'))
    return df
df_trip = rename_columns(df_trip)
show_first_20(df_trip)   

-RECORD 0-------------------------------------
 vendor                | Creative Mobile T... 
 tpep_pickup_datetime  | 2018-11-01 01:51:36  
 tpep_dropoff_datetime | 2018-11-01 01:52:36  
 passenger_count       | 1.0                  
 trip_distance         | 0.0                  
 rate_type             | Standard rate        
 store_and_fwd_flag    | N                    
 pu_location           | Queens,Long Islan... 
 do_location           | Queens,Long Islan... 
 payment_type          | Cash                 
 fare_amount           | 2.5                  
 extra                 | 0.5                  
 mta_tax               | 0.5                  
 tip_amount            | 0.0                  
 tolls_amount          | 0.0                  
 improvement_surcharge | 0.3                  
 total_amount          | 3.8                  
 congestion_surcharge  | null                 
 airport_fee           | null                 
-RECORD 1-------------------------------------
 vendor      

### Detect and remove duplicates
- Duplicates are trips with the same pickup time, pickup location, drop-off time, drop-off location, and trip distance.
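
As a plain-Python illustration of this definition (not PySpark, and using made-up rows), two trips count as duplicates when all five key fields match, even if other fields such as the fare differ. The `trips` list and `KEY` tuple are hypothetical names used only for this sketch.

```python
# Only these five fields decide whether two rows are duplicates.
KEY = ("tpep_pickup_datetime", "pu_location",
       "tpep_dropoff_datetime", "do_location", "trip_distance")

# Hypothetical sample rows (not taken from the dataset).
trips = [
    {"tpep_pickup_datetime": "2018-11-01 01:00:00", "pu_location": "A",
     "tpep_dropoff_datetime": "2018-11-01 01:10:00", "do_location": "B",
     "trip_distance": 1.2, "fare_amount": 7.0},
    # Same key as the first row but a different fare: still a duplicate.
    {"tpep_pickup_datetime": "2018-11-01 01:00:00", "pu_location": "A",
     "tpep_dropoff_datetime": "2018-11-01 01:10:00", "do_location": "B",
     "trip_distance": 1.2, "fare_amount": 7.5},
    # Different drop-off location: a distinct trip.
    {"tpep_pickup_datetime": "2018-11-01 01:00:00", "pu_location": "A",
     "tpep_dropoff_datetime": "2018-11-01 01:10:00", "do_location": "C",
     "trip_distance": 1.2, "fare_amount": 7.0},
]

seen = set()
unique_trips = []
for trip in trips:
    key = tuple(trip[k] for k in KEY)
    if key not in seen:  # keep only the first row for each key
        seen.add(key)
        unique_trips.append(trip)

num_duplicates = len(trips) - len(unique_trips)
print("Number of duplicate rows: ", num_duplicates)
```

The same count-before-minus-count-after idea applies when the key columns are passed as a subset to a dataframe de-duplication call.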

In [178]:
def detect_duplicates(df):
    df_temp = df
    unique_rows = df_temp.dropDuplicates(['tpep_pickup_datetime', 'pu_location', 'tpep_dropoff_datetime', 'do_location', 'trip_distance'])
    print("Number of duplicate rows: ", df.count() - unique_rows.count())
detect_duplicates(df_trip)

Number of duplicate rows:  7329


In [179]:
def drop_duplicates(df):
    df = df.drop_duplicates(['tpep_pickup_datetime', 'pu_location', 'tpep_dropoff_datetime', 'do_location', 'trip_distance'])           
    return df
df_trip = drop_duplicates(df_trip)

### Check that there are no duplicates

In [180]:
detect_duplicates(df_trip)
detect_duplicates(df_trip_original)

Number of duplicate rows:  0
Number of duplicate rows:  7329


### Detect missing
- Create a function that takes in the df and returns any data structure of your choice (df, dict, list, tuple, etc.) which has the name of each column and its percentage of missing entries out of the whole dataset.
- Tip: storing the missing info as a dict where the key is the column name and the value is the percentage would be the easiest.
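
To make the target data structure concrete, here is a minimal plain-Python sketch (not PySpark) of the dict described in the tip. The sample `rows` and the helper name `missing_percentage_by_column` are assumptions for illustration, with `None` standing in for a null entry.

```python
# Hypothetical rows; None stands in for a missing (null) entry.
rows = [
    {"passenger_count": 1.0, "payment_type": "Cash"},
    {"passenger_count": None, "payment_type": "Credit card"},
    {"passenger_count": 2.0, "payment_type": None},
    {"passenger_count": None, "payment_type": "Cash"},
]

def missing_percentage_by_column(rows):
    """Return {column name: percentage of rows where that column is missing}."""
    total = len(rows)
    columns = rows[0].keys()
    return {
        column: 100 * sum(row[column] is None for row in rows) / total
        for column in columns
    }

missing_info = missing_percentage_by_column(rows)
print(missing_info)
```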

In [181]:
def calculate_missing_percentage(df):
    total_entries = df.count()
    missing_percentage = {}

    for column in df.columns:
        missing_entries = df.where(col(column).isNull()).count()
        percentage = (missing_entries / total_entries) * 100
        missing_percentage[column] = percentage

    return missing_percentage

### Print out the missing info

In [182]:
missing_percentage = calculate_missing_percentage(df_trip)
print(missing_percentage)

{'vendor': 0.0, 'tpep_pickup_datetime': 0.0, 'tpep_dropoff_datetime': 0.0, 'passenger_count': 1.438415931017301, 'trip_distance': 0.0, 'rate_type': 0.0, 'store_and_fwd_flag': 0.10814751360128029, 'pu_location': 0.0, 'do_location': 0.0, 'payment_type': 6.559185972732394, 'fare_amount': 0.0, 'extra': 55.41693615590325, 'mta_tax': 0.0, 'tip_amount': 0.0, 'tolls_amount': 0.0, 'improvement_surcharge': 0.0, 'total_amount': 0.0, 'congestion_surcharge': 99.99997545449078, 'airport_fee': 100.0}


### Handle missing
- For numerical features replace with 0.
- For categorical/strings replace with 'Unknown'
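
The rule above depends only on the column type. A minimal plain-Python sketch (not PySpark) follows; the set `NUMERIC_COLUMNS` and the sample rows are hypothetical stand-ins for a real schema.

```python
# Hypothetical schema information: which columns are numeric.
NUMERIC_COLUMNS = {"extra"}

# Hypothetical rows; None stands in for a missing entry.
rows = [
    {"extra": 0.5, "payment_type": "Cash"},
    {"extra": None, "payment_type": None},
]

def impute(row):
    """Fill missing numeric values with 0 and missing strings with 'Unknown'."""
    filled = {}
    for column, value in row.items():
        if value is None:
            filled[column] = 0 if column in NUMERIC_COLUMNS else "Unknown"
        else:
            filled[column] = value
    return filled

imputed_rows = [impute(row) for row in rows]
print(imputed_rows)
```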


In [183]:
def impute_numeric(df):
    numeric_types = [DoubleType, FloatType, IntegerType, LongType, ShortType]
    for column in df.columns:
        if type(df.schema[column].dataType) in numeric_types:
            df = df.fillna(value = 0, subset = [column])
    return df
df_trip = impute_numeric(df_trip)

In [184]:
def impute_categorical(df):
    for column in df.columns:
        if isinstance(df.schema[column].dataType, StringType):
            # Collapse any value containing 'Unknown' into the single label 'Unknown'
            df = df.withColumn(column, when(col(column).rlike('Unknown'), lit('Unknown')).otherwise(col(column)))
            # Replace the remaining nulls with 'Unknown'
            df = df.fillna(value = 'Unknown', subset = [column])
    return df
df_trip = impute_categorical(df_trip)

### check that there are no missing values

In [185]:
missing_percentage = calculate_missing_percentage(df_trip)
print(missing_percentage)

{'vendor': 0.0, 'tpep_pickup_datetime': 0.0, 'tpep_dropoff_datetime': 0.0, 'passenger_count': 0.0, 'trip_distance': 0.0, 'rate_type': 0.0, 'store_and_fwd_flag': 0.0, 'pu_location': 0.0, 'do_location': 0.0, 'payment_type': 0.0, 'fare_amount': 0.0, 'extra': 0.0, 'mta_tax': 0.0, 'tip_amount': 0.0, 'tolls_amount': 0.0, 'improvement_surcharge': 0.0, 'total_amount': 0.0, 'congestion_surcharge': 0.0, 'airport_fee': 0.0}


## Feature engineering
Write a function that adds the following 3 features. Use built-in functions in PySpark (from the functions library); check lab 8. Avoid writing UDFs from scratch.
- trip duration (the format/unit is up to you)
- is_weekend: whether the trip occurred on Saturday or Sunday.
- week number (relative to the month and not the year, i.e. 1,2,3,4 not 31,32,33...)
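
As a reference for what the three features mean for a single trip, here is a plain-Python sketch using only the standard library (it is not the PySpark solution). The helper name `trip_features` and the sample timestamps are assumptions. The week number uses `ceil(day / 7)`, which gives weeks starting at 1; days 29 and later fall into a fifth, partial week.

```python
from datetime import datetime
from math import ceil

def trip_features(pickup, dropoff):
    """Return (duration in minutes, is_weekend flag, week of month) for one trip."""
    duration_m = (dropoff - pickup).total_seconds() / 60
    # weekday(): Monday is 0, so Saturday and Sunday are 5 and 6
    is_weekend = pickup.weekday() >= 5
    # Days 1-7 -> week 1, days 8-14 -> week 2, and so on
    week_number = ceil(pickup.day / 7)
    return duration_m, is_weekend, week_number

# Hypothetical trip that starts late on Saturday, 3 November 2018
pickup = datetime(2018, 11, 3, 23, 50, 0)
dropoff = datetime(2018, 11, 4, 0, 5, 30)
features = trip_features(pickup, dropoff)
print(features)
```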

In [186]:
def add_trip_duration(df, pickup_time_col, dropoff_time_col):
    # Both columns are timestamps, so unix_timestamp needs no format string
    duration_s = unix_timestamp(dropoff_time_col) - unix_timestamp(pickup_time_col)
    # Convert seconds to minutes
    df = df.withColumn("trip_duration_m", duration_s / 60)
    return df
df_trip = add_trip_duration(df_trip, df_trip['tpep_pickup_datetime'], df_trip['tpep_dropoff_datetime'])


In [187]:
def is_weekend(df, date_column):
    df = df.withColumn('is_weekend', when(date_format(col(date_column), 'E').isin(['Sat', 'Sun']), 'Weekend').otherwise('Weekday'))
    return df

df_trip = is_weekend(df_trip, 'tpep_pickup_datetime')

In [188]:
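def add_week_number(df, date_column):
    # Note: this formula maps day 1 to week 0 and days 2-8 to week 1;
    # ceil(dayofmonth(...) / 7) would give weeks starting at 1 (days 1-7 -> week 1).
    df = df.withColumn('week_number', ceil((dayofmonth(col(date_column)) - 1) / 7))
    return df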

df_trip = add_week_number(df_trip, 'tpep_pickup_datetime')

### Preview the first 20 rows (only select the following features: pickup and drop-off times, and the 3 features you added).

In [189]:
columns_to_show = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_duration_m', 'is_weekend', 'week_number']
df_select = df_trip.select(columns_to_show)
df_select.show(20, vertical=True)

-RECORD 0------------------------------------
 tpep_pickup_datetime  | 2018-11-01 01:04:26 
 tpep_dropoff_datetime | 2018-11-01 01:10:25 
 trip_duration_m       | 5.983333333333333   
 is_weekend            | Weekday             
 week_number           | 0                   
-RECORD 1------------------------------------
 tpep_pickup_datetime  | 2018-11-01 01:05:43 
 tpep_dropoff_datetime | 2018-11-01 01:13:45 
 trip_duration_m       | 8.033333333333333   
 is_weekend            | Weekday             
 week_number           | 0                   
-RECORD 2------------------------------------
 tpep_pickup_datetime  | 2018-11-01 01:04:22 
 tpep_dropoff_datetime | 2018-11-01 01:15:05 
 trip_duration_m       | 10.716666666666667  
 is_weekend            | Weekday             
 week_number           | 0                   
-RECORD 3------------------------------------
 tpep_pickup_datetime  | 2018-11-01 01:02:28 
 tpep_dropoff_datetime | 2018-11-01 01:16:04 
 trip_duration_m       | 13.6     

## Analyses - Answer the following 5 questions (by showing the output and a short 1-2 sentence observation/answer)

MUST Use the PySpark SQL API.

DO NOT explicitly write SQL queries. Doing so will result in 50% deduction (for the question). Check lab 7.

You are free to add columns if it will help in answering a question and add useful info to the dataset.

### 1- What is the average fare amount per payment type?

In [190]:
def average_fare_per_payment_type(df, fare_column, payment_type_column):
    return df.groupBy(payment_type_column).agg(avg(fare_column))

average_fare = average_fare_per_payment_type(df_trip, 'fare_amount', 'payment_type')
average_fare.show()

+------------+------------------+
|payment_type|  avg(fare_amount)|
+------------+------------------+
|     Unknown|13.860610648661185|
|        Cash|12.262388189782758|
|     Dispute| 9.084013611784446|
|   No charge| 11.09265405199895|
| Credit card|13.878475601016547|
+------------+------------------+



- From the returned dataframe, we can see that the 'Credit card' payment type has the highest average fare amount (about 13.88), closely followed by 'Unknown' (about 13.86). The 'Dispute' payment type has the lowest average fare amount (about 9.08), possibly because a dispute indicates an error or a problem rather than a normal trip.

### 2- Do people tend to go on longer trips during the weekend or on weekdays?

In [191]:
def average_trip_distance(df, trip_distance_column):
    return df.groupBy('is_weekend').agg(avg(trip_distance_column))

average_distance = average_trip_distance(df_trip, 'trip_distance')
average_distance.show()

+----------+------------------+
|is_weekend|avg(trip_distance)|
+----------+------------------+
|   Weekday| 2.915725545876754|
|   Weekend|2.9960711354081933|
+----------+------------------+



- From the returned dataframe, we can see that trips are slightly longer on the weekend on average (about 3.00 vs 2.92). This might be because people travel farther on weekends, for example to visit family or for leisure.

### 3- Which day recorded the most trips?

In [192]:
def day_with_most_trips(df, date_column):
    # Group by day of month using the column passed in, most frequent day first
    return df.groupBy(dayofmonth(col(date_column))).count().orderBy(col("count").desc())

day_most_trips = day_with_most_trips(df_trip, 'tpep_pickup_datetime')
day_most_trips.show(31)

+--------------------------------+------+
|dayofmonth(tpep_pickup_datetime)| count|
+--------------------------------+------+
|                               1|317424|
|                              10|310571|
|                              17|305864|
|                              30|304631|
|                               2|304128|
|                               3|298181|
|                               8|298057|
|                              14|297835|
|                               9|296721|
|                              29|295487|
|                              16|293873|
|                               7|288293|
|                              13|285771|
|                              28|283042|
|                              20|277449|
|                              27|277188|
|                               6|275989|
|                               5|268468|
|                              11|268139|
|                              18|266259|
|                               4|

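- From the returned dataframe, we can see that day 1 of the month has the most trips (317,424), while day 31 has only 80 trips. Since November has 30 days, those day-31 rows most likely carry pickup timestamps from outside the month.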

### 4- What is the average "total amount" of trips with more than 2 passengers?

In [193]:
def average_total_amount(df, total_amount_column, passenger_count_column):
    return df.filter(col(passenger_count_column) > 2).agg(avg(total_amount_column))

average_amount = average_total_amount(df_trip, 'total_amount', 'passenger_count')
average_amount.show()

+-----------------+
|avg(total_amount)|
+-----------------+
|16.84797117923915|
+-----------------+



- From the returned dataframe, we can see that the average total amount for trips with more than 2 passengers is about 16.85. Split evenly, that is about 5.62 per person for 3 passengers, and less for larger groups.
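
The per-person figure is simple division of the reported average by the group size. The sketch below repeats that arithmetic for a few group sizes, rounded to two decimals; the equal split between passengers is an assumption made only for illustration.

```python
# Average total amount for trips with more than 2 passengers (from the output above)
average_total_amount = 16.84797117923915

# Per-person share if the total is split equally (an illustrative assumption)
shares = {
    passengers: round(average_total_amount / passengers, 2)
    for passengers in (3, 4, 5, 6)
}

for passengers, share in shares.items():
    print(f"{passengers} passengers: {share:.2f} each")
```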

### 5- On average, when is the tip more likely to be higher: when there are multiple passengers or just 1?

In [194]:
def average_tip(df, tip_column, passenger_count_column):
    df_single = df.filter(col(passenger_count_column) == 1).agg(avg(tip_column).alias('average_tip'))
    df_single = df_single.withColumn('passenger_type', lit('Single'))

    df_multiple = df.filter(col(passenger_count_column) > 1).agg(avg(tip_column).alias('average_tip'))
    df_multiple = df_multiple.withColumn('passenger_type', lit('Multiple'))

    return df_single.union(df_multiple)

# Use a separate name so the function average_tip is not overwritten when the cell is re-run
average_tip_df = average_tip(df_trip, 'tip_amount', 'passenger_count')
average_tip_df.show()

+------------------+--------------+
|       average_tip|passenger_type|
+------------------+--------------+
|1.9386926330778256|        Single|
|1.9126559300393196|      Multiple|
+------------------+--------------+



- From the returned dataframe, we can see that the average tip is almost the same for single and multiple passengers (about 1.94 vs 1.91), with single-passenger trips tipping slightly more.
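
To quantify how close the two averages are, the sketch below computes the absolute and relative difference from the two values in the output above. It is only arithmetic on the reported averages and says nothing about statistical significance.

```python
# Average tips copied from the output above
single_tip = 1.9386926330778256
multiple_tip = 1.9126559300393196

difference = single_tip - multiple_tip
relative_difference_pct = 100 * difference / multiple_tip

print(f"Single passengers tip {difference:.3f} more on average "
      f"({relative_difference_pct:.2f}% higher)")
```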

### 6- What is the most frequent route on the weekend?

In [195]:
def most_frequent_route(df, pickup_column, dropoff_column, date_column):
    # Ignore trips whose pickup or drop-off location is unknown
    df = df.filter((col(pickup_column) != 'Unknown') & (col(dropoff_column) != 'Unknown'))
    return df.filter(date_format(col(date_column), 'E').isin(['Sat', 'Sun'])) \
             .groupBy(pickup_column, dropoff_column) \
             .agg(count('*').alias('trip_count')) \
             .orderBy('trip_count', ascending=False)

# Use a separate name so the function most_frequent_route is not overwritten when the cell is re-run
weekend_routes = most_frequent_route(df_trip, 'pu_location', 'do_location', 'tpep_pickup_datetime')
weekend_routes.show(1)

+--------------------+--------------------+----------+
|         pu_location|         do_location|trip_count|
+--------------------+--------------------+----------+
|Manhattan,Upper E...|Manhattan,Upper E...|     10868|
+--------------------+--------------------+----------+
only showing top 1 row



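- From the returned dataframe, we can see that the most frequent weekend route has 10,868 trips, and both its pickup and drop-off locations are in Manhattan (the full location names are truncated in the output).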

## Encoding
- Label encode all categorical features.
- Create a lookup table for these label encoded features. You can use the same format/example as the lookup table in Milestone 1 description.

(You are allowed to store and manipulate the lookup table as a pandas dataframe, it does not have to be a PySpark df).
- Remove the original unencoded categorical features from the df after encoding.
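
The idea behind label encoding and its lookup table can be sketched in plain Python (not PySpark). The example assigns indices by descending frequency, which mirrors the default ordering of `StringIndexer`; the alphabetical tie-break, the helper name `label_encode_values`, and the sample values are assumptions for illustration.

```python
from collections import Counter

def label_encode_values(values):
    """Map each distinct value to an index, most frequent value first."""
    counts = Counter(values)
    # Sort by descending frequency, then alphabetically to break ties (assumed rule)
    ordered = sorted(counts, key=lambda value: (-counts[value], value))
    mapping = {value: float(index) for index, value in enumerate(ordered)}
    return [mapping[value] for value in values], mapping

# Hypothetical column values
payment_type = ["Credit card", "Cash", "Credit card", "Unknown", "Cash", "Credit card"]
encoded, mapping = label_encode_values(payment_type)

# Lookup table rows: (column_name, original_value, label_after_encoding)
lookup_rows = [("payment_type", original, label) for original, label in mapping.items()]

print(encoded)
print(lookup_rows)
```

The lookup rows make the encoding reversible: each label can be mapped back to its original value per column.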

In [196]:
categorical_features = [field.name for field in df_trip.schema.fields if field.dataType == StringType()]
df_trip.select(categorical_features).show(20, vertical=True)

-RECORD 0----------------------------------
 vendor             | Creative Mobile T... 
 rate_type          | Standard rate        
 store_and_fwd_flag | N                    
 pu_location        | Manhattan,East Ha... 
 do_location        | Manhattan,Central... 
 payment_type       | Unknown              
 is_weekend         | Weekday              
-RECORD 1----------------------------------
 vendor             | Creative Mobile T... 
 rate_type          | Standard rate        
 store_and_fwd_flag | N                    
 pu_location        | Manhattan,Gramercy   
 do_location        | Manhattan,Gramercy   
 payment_type       | Credit card          
 is_weekend         | Weekday              
-RECORD 2----------------------------------
 vendor             | VeriFone Inc.        
 rate_type          | Standard rate        
 store_and_fwd_flag | N                    
 pu_location        | Manhattan,Times S... 
 do_location        | Manhattan,Manhatt... 
 payment_type       | Credit car

In [197]:
lookup_map = []
def label_encode(df):
    categorical_features = [field.name for field in df.schema.fields if field.dataType == StringType()]
    # The loop variable is named `column` to avoid shadowing pyspark.sql.functions.col
    for column in categorical_features:
        indexer = StringIndexer(inputCol=column, outputCol=column + "_labeled")
        df = indexer.fit(df).transform(df)
        # Record every (original value, label) pair for the lookup table
        pairs = df.select(column, column + "_labeled").distinct().collect()
        lookup_map.extend([(column, original, encoded) for original, encoded in pairs])
        # Replace the original string column with its encoded version
        df = df.drop(column)
        df = df.withColumnRenamed(column + "_labeled", column)
    return df

df_trip = label_encode(df_trip)
lookup_table = spark.createDataFrame(lookup_map, ['column_name', 'original_value', 'label_after_encoding'])

### Preview first 20 rows of the label encoded features

In [198]:
df_trip.select(categorical_features).show(20, vertical=True)

-RECORD 0------------------
 vendor             | 1.0  
 rate_type          | 0.0  
 store_and_fwd_flag | 0.0  
 pu_location        | 43.0 
 do_location        | 47.0 
 payment_type       | 2.0  
 is_weekend         | 0.0  
-RECORD 1------------------
 vendor             | 1.0  
 rate_type          | 0.0  
 store_and_fwd_flag | 0.0  
 pu_location        | 16.0 
 do_location        | 17.0 
 payment_type       | 0.0  
 is_weekend         | 0.0  
-RECORD 2------------------
 vendor             | 0.0  
 rate_type          | 0.0  
 store_and_fwd_flag | 0.0  
 pu_location        | 4.0  
 do_location        | 39.0 
 payment_type       | 0.0  
 is_weekend         | 0.0  
-RECORD 3------------------
 vendor             | 1.0  
 rate_type          | 0.0  
 store_and_fwd_flag | 0.0  
 pu_location        | 16.0 
 do_location        | 4.0  
 payment_type       | 0.0  
 is_weekend         | 0.0  
-RECORD 4------------------
 vendor             | 0.0  
 rate_type          | 0.0  
 store_and_fwd_flag 

### Preview first 20 rows of your lookup table

In [199]:
lookup_table.show(20, vertical=True)

-RECORD 0------------------------------------
 column_name          | vendor               
 original_value       | Creative Mobile T... 
 label_after_encoding | 1.0                  
-RECORD 1------------------------------------
 column_name          | vendor               
 original_value       | VeriFone Inc.        
 label_after_encoding | 0.0                  
-RECORD 2------------------------------------
 column_name          | vendor               
 original_value       | Unknown              
 label_after_encoding | 2.0                  
-RECORD 3------------------------------------
 column_name          | rate_type            
 original_value       | JFK                  
 label_after_encoding | 1.0                  
-RECORD 4------------------------------------
 column_name          | rate_type            
 original_value       | Standard rate        
 label_after_encoding | 0.0                  
-RECORD 5------------------------------------
 column_name          | rate_type 

### Load the cleaned PySpark df to a parquet file and the lookup table to a csv file.

In [200]:
if not os.path.isdir("yellow_trip_data_2018-11clean.parquet"):
    df_trip.write.parquet("yellow_trip_data_2018-11clean.parquet")

In [201]:
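if not os.path.isdir("lookup_table_yellow_taxis.csv"):
    # Write the lookup table (not the trips dataframe) and keep the column names as a header row
    lookup_table.write.csv("lookup_table_yellow_taxis.csv", header=True)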

## Bonus - Load the cleaned parquet file and lookup table into a Postgres database. 

Note that if you decide to do the bonus, you must include not only your notebook but the docker-compose.yaml file as well.

### Screenshot of the table existing in the database and a simple query such as `select count(*) from table_name` or `select * from table_name limit 10`

(You can just copy paste the screenshots in the markdown cells below)