# Milestone 3 - Pre-processing and analysis with PySpark

## Deadline - Sunday, 10th of December @11.59 pm 

The goal of this milestone is to preprocess the dataset 'New York yellow taxis' by performing basic data preparation and basic analysis to gain a better understanding of the data using PySpark.

Use the same month and year you used for the green taxis in milestone 1. [Datasets](https://drive.google.com/drive/folders/1t8nBgbHVaA5roZY4z3RcAG1_JMYlSTqu?usp=sharing) (download the yellow taxis dataset).

Important Notes:
- You MUST use this notebook template/structure. not doing so will result in marks deduction.
- You MUST have the cells run and output shown similar to milestone 1. I will NOT RUN YOUR NOTEBOOK.

Submission guidelines: same as milestone 1.

Notebook name must be same format as the file you named in miletsone 1. Just M3 instead of M1.

IMPORTANT: You are only allowed to use PySpark unless explicitly told otherwise(i.e last task).

Useful resource/documentation (highly recommended) - [PySpark examples](https://sparkbyexamples.com/pyspark-tutorial/)


## Weight dist.
- Loading the dataset : 5%
- Basic cleaning: 30%
	- column renaming: 10%
	- detect missing: 35%
	- Handle missing: 35%
	- Check missing : 20%
- Analyses: 30%
- Encoding: 20%
- Lookup table: 10%
- Writing the cleaned and lookup table back as parquet and csv files: 5%.

# Tasks:

## Load the dataset.

In [1]:
## start the session
from pyspark.sql import SparkSession
from datetime import date
from pyspark.sql import functions as fn
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType, IntegerType



# Initialize Spark session
spark = SparkSession.builder.appName("M2").config("spark.jars", "./postgresql-42.2.5.jar").getOrCreate()
# spark context to interact with the driver
sc = spark.sparkContext


# Define the schema with corrected column names and data types
# schema = StructType([
#     StructField("vendor", StringType(), True),
#     StructField("tpep_pickup_datetime", TimestampType(), True),
#     StructField("tpep_dropoff_datetime", TimestampType(), True),
#     StructField("passenger_count", DoubleType(), True),
#     StructField("trip_distance", DoubleType(), True),
#     StructField("rate_type", StringType(), True),
#     StructField("store_and_fwd_flag", StringType(), True),
#     StructField("pu_Location", StringType(), True),
#     StructField("do_Location", StringType(), True),
#     StructField("payment_type", StringType(), True),
#     StructField("fare_amount", DoubleType(), True),
#     StructField("extra", DoubleType(), True),
#     StructField("mta_tax", DoubleType(), True),
#     StructField("tip_amount", DoubleType(), True),
#     StructField("tolls_amount", DoubleType(), True),
#     StructField("improvement_surcharge", DoubleType(), True),
#     StructField("total_amount", DoubleType(), True),
#     StructField("congestion_surcharge", IntegerType(), True),
#     StructField("airport_fee", IntegerType(), True)
# ])


In [2]:
# df = spark.read.options(headers='true').schema(schema).parquet('./yellow_tripdata_2016-10.parquet')
df = spark.read.parquet('./yellow_tripdata_2016-10.parquet')

### Preview first 20 rows.

In [3]:
df.printSchema()

root
 |-- Vendor: string (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- Rate_type: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PU_Location: string (nullable = true)
 |-- DO_Location: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: integer (nullable = true)
 |-- airport_fee: integer (nullable = true)



In [4]:
df.show(20,vertical=True)

-RECORD 0-------------------------------------
 Vendor                | Creative Mobile T... 
 tpep_pickup_datetime  | 2016-10-01 02:34:38  
 tpep_dropoff_datetime | 2016-10-01 02:35:07  
 passenger_count       | 1.0                  
 trip_distance         | 0.0                  
 Rate_type             | Standard rate        
 store_and_fwd_flag    | N                    
 PU_Location           | Unknown,NV           
 DO_Location           | Unknown,NV           
 payment_type          | null                 
 fare_amount           | 2.5                  
 extra                 | 0.5                  
 mta_tax               | 0.5                  
 tip_amount            | 0.0                  
 tolls_amount          | 0.0                  
 improvement_surcharge | 0.3                  
 total_amount          | 3.8                  
 congestion_surcharge  | null                 
 airport_fee           | null                 
-RECORD 1-------------------------------------
 Vendor      

### How many partitions is this dataframe split into?

In [5]:
def get_num_partitions(df):
    num_partitions = df.rdd.getNumPartitions()
    print("Number of partitions:", num_partitions)

In [6]:
get_num_partitions(df)

Number of partitions: 8


## Basic cleaning

### rename all columns (replacing a space with an underscore, and making it lowercase)

In [7]:
from pyspark.sql.functions import col

import re

def remove_invalid_chars(text):
    # Define the pattern for characters to remove
    pattern = r"[ ,;{}()\n\t=]"
    
    # Substitute the pattern with an empty string
    cleaned_text = re.sub(pattern, "", text)
    
    return cleaned_text

def rename_columns_with_underscore(df):
    """
    rename all columns in a PySpark DataFrame
    by replacing spaces with underscores and converting 
    column names to lowercase.
    """
    # Get the current column names
    current_columns = df.columns

    # Create a dictionary to store old and new column names
    rename_dict = {col_name: remove_invalid_chars(col_name.replace(' ', '_').replace( '(', '')
        .replace( ')', '')
        .replace( ',', '')
        .replace( ';', '')
        .replace( '{', '')
        .replace( '}', '')
        .replace( '\n', '')
        .replace( '\t', '')
        .replace( ' ', '_').lower()) for col_name in current_columns}
    # Use withColumnRenamed to rename columns
    for old_name, new_name in rename_dict.items():
        df = df.withColumnRenamed(old_name, new_name)

    return df



df = rename_columns_with_underscore(df)

In [8]:
df.printSchema()

root
 |-- vendor: string (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- rate_type: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pu_location: string (nullable = true)
 |-- do_location: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: integer (nullable = true)
 |-- airport_fee: integer (nullable = true)



### Detect and remove duplicates
- Duplicates are trips with same pickup time,pickup location, dropoff time,drop off location and trip distance

In [9]:
def drop_dups(df, duplicate_columns):
    """
    Remove duplicates in a PySpark DataFrame based on specified columns
    and print the count of removed duplicates.
    """
    # Drop duplicates based on specified columns
    df_no_duplicates = df.dropDuplicates(duplicate_columns)

    # Count the removed duplicates
    count = df.count() - df_no_duplicates.count()

    # Print the count of removed duplicates
    print("Number of duplicates removed: " + str(count) + " based on " + str(duplicate_columns))

    return df_no_duplicates


In [10]:
duplicate_columns = ['tpep_pickup_datetime', 'pu_location', 'tpep_dropoff_datetime', 'do_location', 'trip_distance']
df = drop_dups(df,duplicate_columns)

Number of duplicates removed: 4870 based on ['tpep_pickup_datetime', 'pu_location', 'tpep_dropoff_datetime', 'do_location', 'trip_distance']


### check that there is are no duplicates

In [11]:
_ = drop_dups(df,duplicate_columns)

Number of duplicates removed: 0 based on ['tpep_pickup_datetime', 'pu_location', 'tpep_dropoff_datetime', 'do_location', 'trip_distance']


In [12]:
# another way 
def check_dups(df, duplicate_columns):
    # Group by all columns and count occurrences
    duplicate_count = df.groupBy(duplicate_columns).count().filter(col('count') > 1)
    duplicate_count.show()
    
check_dups(df,duplicate_columns)

+--------------------+-----------+---------------------+-----------+-------------+-----+
|tpep_pickup_datetime|pu_location|tpep_dropoff_datetime|do_location|trip_distance|count|
+--------------------+-----------+---------------------+-----------+-------------+-----+
+--------------------+-----------+---------------------+-----------+-------------+-----+



### Detect missing
- Create a function that takes in the df and returns any data structrue of your choice(df/dict,list,tuple,etc) which has the name of the column and percentage of missing entries from the whole dataset.
- Tip : storing the missing info as dict where the key is the column name and value is the percentage would be the easiest.  

In [13]:
def calculate_missing_percentage(df):
    """
    Calculate the percentage of missing entries in each column of a PySpark DataFrame.
    """
    # Calculate the total number of rows in the DataFrame
    total_rows = df.count()

    # Calculate missing percentage for each column
    missing_info = {}
    for col in df.columns:
        missing_count = df.filter(df[col].isNull() | (df[col] == '')).count()
        missing_percentage = (missing_count / total_rows) * 100
        missing_info[col] = round(missing_percentage, 2)

    return missing_info


### Prinout the missing info

In [14]:
print(calculate_missing_percentage(df))

{'vendor': 0.0, 'tpep_pickup_datetime': 0.0, 'tpep_dropoff_datetime': 0.0, 'passenger_count': 0.0, 'trip_distance': 0.0, 'rate_type': 0.0, 'store_and_fwd_flag': 0.0, 'pu_location': 0.0, 'do_location': 0.0, 'payment_type': 7.96, 'fare_amount': 0.0, 'extra': 52.87, 'mta_tax': 0.0, 'tip_amount': 0.0, 'tolls_amount': 0.0, 'improvement_surcharge': 0.0, 'total_amount': 0.0, 'congestion_surcharge': 100.0, 'airport_fee': 100.0}


### Handle missing
- For numerical features replace with 0.
- For categorical/strings replace with 'Unknown'


In [15]:
from pyspark.sql.functions import when, lit, to_timestamp
from datetime import datetime

def handle_missing_values(df):
    """
    Handle missing values in a PySpark DataFrame:
    - Replace missing values in numerical columns with 0.
    - Replace missing values in categorical/string columns with 'Unknown'.
    """
    # Get column data types
    column_types = {col: dtype for col, dtype in df.dtypes}

    # Replace missing values based on data types
    for col_name, dtype in column_types.items():
        if dtype == 'string':
            df = df.withColumn(col_name, when(col(col_name).rlike('Unknown'), lit('Unknown')).otherwise(col(col_name)))

            df = df.fillna(value = 'Unknown',subset=[col_name])
        else:  # Assuming numerical types are int or float
            df = df.fillna(value = 0,subset=[col_name])


    return df


In [16]:
df = handle_missing_values(df)

### check that there are no missing values

In [17]:
from pyspark.sql.functions import col, count

def check_missing_values(df):
    """
    Check for missing values in a PySpark DataFrame.
    """
    # Dictionary to store missing value counts for each column
    missing_values_dict = {}

    # Calculate the count of missing values for each column
    for column in df.columns:
        missing_count = df.filter(col(column).isNull() | (col(column) == '')).count()
        missing_values_dict[column] = missing_count


    # Print the count of missing values for each column
    for column, count in missing_values_dict.items():
        print(f"Column '{column}' has {count} missing values.")

In [18]:
check_missing_values(df)

Column 'vendor' has 0 missing values.
Column 'tpep_pickup_datetime' has 0 missing values.
Column 'tpep_dropoff_datetime' has 0 missing values.
Column 'passenger_count' has 0 missing values.
Column 'trip_distance' has 0 missing values.
Column 'rate_type' has 0 missing values.
Column 'store_and_fwd_flag' has 0 missing values.
Column 'pu_location' has 0 missing values.
Column 'do_location' has 0 missing values.
Column 'payment_type' has 0 missing values.
Column 'fare_amount' has 0 missing values.
Column 'extra' has 0 missing values.
Column 'mta_tax' has 0 missing values.
Column 'tip_amount' has 0 missing values.
Column 'tolls_amount' has 0 missing values.
Column 'improvement_surcharge' has 0 missing values.
Column 'total_amount' has 0 missing values.
Column 'congestion_surcharge' has 0 missing values.
Column 'airport_fee' has 0 missing values.


In [19]:
df.show(vertical=True)

-RECORD 0-------------------------------------
 vendor                | Creative Mobile T... 
 tpep_pickup_datetime  | 2016-10-01 02:02:04  
 tpep_dropoff_datetime | 2016-10-01 02:06:40  
 passenger_count       | 1.0                  
 trip_distance         | 1.3                  
 rate_type             | Standard rate        
 store_and_fwd_flag    | N                    
 pu_location           | Manhattan,Clinton... 
 do_location           | Manhattan,Lincoln... 
 payment_type          | Credit card          
 fare_amount           | 5.5                  
 extra                 | 0.5                  
 mta_tax               | 0.5                  
 tip_amount            | 1.35                 
 tolls_amount          | 0.0                  
 improvement_surcharge | 0.3                  
 total_amount          | 8.15                 
 congestion_surcharge  | 0                    
 airport_fee           | 0                    
-RECORD 1-------------------------------------
 vendor      

## Feature engineering - 
Write a function that adds the 3 following features. Use built in fucntions in PySpark (from the functions library) check lab 8, Avoid writing UDFs from scratch.
- trip duration (the format/unit is up to you)
- is_weekend. whether the trip occurred on Saturday or Sunday.
- week number (relevant to the month and not year, i.e 1,2,3,4 nto 31,32,33...) 

In [20]:
def add_features_duration_minutes(df):
    """Calculate trip duration (in minutes)"""
    df = df.withColumn("trip_duration_minutes", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long")) / 60)
    
    return df


df = add_features_duration_minutes(df)


In [21]:
def add_features_is_weekend(df):
    """Check if the trip occurred on a weekend (Saturday or Sunday)"""
    df = df.withColumn("is_weekend", fn.when(fn.dayofweek(fn.col("tpep_pickup_datetime")).isin([1, 7]), 1).otherwise(0))
    return df
  
df = add_features_is_weekend(df)

In [22]:

def add_features_week_number(df):
    """
    Calculate the week number(1-based) within the month 
    """
    # Extract day of the month
    day_of_month = fn.dayofmonth(fn.col("tpep_pickup_datetime"))
    
    # Subtract 1 to start counting from 0
    day_of_month_adjusted = day_of_month - 1
    
    # Calculate week number within the month
    week_number_within_month = (day_of_month_adjusted / 7).cast("int") + 1
    
    # Add 'week_number' column relative to the month
    df = df.withColumn("week_number", week_number_within_month)
    
    return df



df =add_features_week_number(df)

### Preview the first 20 rows (only select the following features: pickup and droptime, and the 3 features you added). 

In [23]:
df.select(
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "trip_duration_minutes",
    "is_weekend",
    "week_number"
).show(20)


+--------------------+---------------------+---------------------+----------+-----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_duration_minutes|is_weekend|week_number|
+--------------------+---------------------+---------------------+----------+-----------+
| 2016-10-01 02:02:04|  2016-10-01 02:06:40|                  4.6|         1|          1|
| 2016-10-01 02:01:13|  2016-10-01 02:06:50|    5.616666666666666|         1|          1|
| 2016-10-01 02:01:33|  2016-10-01 02:07:40|    6.116666666666666|         1|          1|
| 2016-10-01 02:06:38|  2016-10-01 02:10:41|                 4.05|         1|          1|
| 2016-10-01 02:07:03|  2016-10-01 02:14:58|    7.916666666666667|         1|          1|
| 2016-10-01 02:12:40|  2016-10-01 02:17:51|    5.183333333333334|         1|          1|
| 2016-10-01 02:07:04|  2016-10-01 02:18:12|   11.133333333333333|         1|          1|
| 2016-10-01 02:13:28|  2016-10-01 02:19:37|                 6.15|         1|          1|
| 2016-10-

## Analyses - Answer the following 5 questions (by showing the output and and a short 1-2 sentences regarding your observation/answer) 

MUST Use the PySpark SQL API.

DO NOT explicitly write SQL queries. Doing so will result in 50% deduction (for the question). Check lab 7.

You are free to add columns if it will help in answering a question and add useful info to the dataset.

### 1- What is the average fare amount per payment type 

In [24]:
def average_fare_amount_per_payment_type(df):
    average_fare_per_payment = df.groupby('payment_type').avg('fare_amount')
    average_fare_per_payment.show()



In [25]:
average_fare_amount_per_payment_type(df)

+------------+------------------+
|payment_type|  avg(fare_amount)|
+------------+------------------+
|     Unknown|13.150406300825248|
|        Cash|12.451659597353546|
|     Dispute|24.500057116746664|
|   No charge|26.076995833938884|
| Credit card|13.813877136933257|
+------------+------------------+



- 'No charge' entries could involve scenarios where passengers were not required to pay for the trip, possibly due to a promotional offer where customers use their offers in expensive trips mostly. 

-  'Dispute' may involve fares that are unusually high due to  agreements between the passenger and the service provider regarding the fare or trip details 

- 'Cash' , Unknown , Unknown are relatively similar in cheap average fare amount because it is much better for passenger to save  promotional offer for expensive trips or dispute on expensive trips

### 2- Do people tend to go on a longer trips during the weekend or weekdays?

In [26]:
def average_trip_duration_weekends_weekdays(df):
    """Calculate average trip duration for weekends and weekdays"""
    average_trip_duration = df.groupBy("is_weekend").agg(fn.avg("trip_duration_minutes").alias("avg_trip_duration"))
    # Collect the results to perform a comparison
    results = average_trip_duration.collect()

    # Compare average trip duration for weekends (is_weekend = 1) and weekdays (is_weekend = 0)
    if results[0]["avg_trip_duration"] > results[1]["avg_trip_duration"]:
        result = "People tend to go on longer(duration) trips during weekends."
    elif results[0]["avg_trip_duration"] < results[1]["avg_trip_duration"]:
        result = "People tend to go on longer(duration) trips during weekdays."
    else:
        result = "The average trip duration is similar between weekends and weekdays."

    print(result)

    average_trip_duration.show()
    
    

    
def average_trip_distance_weekends_weekdays(df):
    """Calculate average trip duration for weekends and weekdays"""
    average_trip_duration = df.groupBy("is_weekend").agg(fn.avg("trip_distance").alias("avg_trip_distance"))
    # Collect the results to perform a comparison
    results = average_trip_duration.collect()

    # Compare average trip duration for weekends (is_weekend = 1) and weekdays (is_weekend = 0)
    if results[0]["avg_trip_distance"] > results[1]["avg_trip_distance"]:
        result = "People tend to go on longer (distance) trips during weekends."
    elif results[0]["avg_trip_distance"] < results[1]["avg_trip_distance"]:
        result = "People tend to go on longer(distance) trips during weekdays."
    else:
        result = "The average trip distance is similar between weekends and weekdays."

    print(result)

    average_trip_duration.show()
    

    


In [27]:
average_trip_duration_weekends_weekdays(df)
average_trip_distance_weekends_weekdays(df)

People tend to go on longer(duration) trips during weekdays.
+----------+------------------+
|is_weekend| avg_trip_duration|
+----------+------------------+
|         1|16.803053472013822|
|         0| 17.16444411115435|
+----------+------------------+

People tend to go on longer (distance) trips during weekends.
+----------+------------------+
|is_weekend| avg_trip_distance|
+----------+------------------+
|         1|6.1228252218269885|
|         0|5.1778848185681365|
+----------+------------------+



- people tend to go on slightly longer trips during weekdays (non-weekends) compared to weekends. The difference in average trip duration between weekends and weekdays is not that big

### 3 - which day recorded the most trips?

In [28]:

def most_trip_day(df):
    """Find the day of the month with the maximum number of trips since our dataset is 1 month"""
    df = df.withColumn("day_of_month", fn.dayofmonth("tpep_pickup_datetime"))

    trips_per_day = df.groupBy("day_of_month").count()

    day_with_most_trips_desc = trips_per_day.orderBy(fn.col("count").desc())
                                        
    day_with_most_trips = day_with_most_trips_desc.first()

    # Get the day number and count of trips for the day with the most trips
    most_trips_day_number = day_with_most_trips["day_of_month"]
    most_trips_count = day_with_most_trips["count"]
    
    print(f"The day with the most trips is day {most_trips_day_number} with {most_trips_count} trips.")
    day_with_most_trips_desc.withColumn("day_name", fn.when(fn.col("day_of_month") == 1, "1st")
                                        .when(fn.col("day_of_month") == 2, "2nd")
                                        .when(fn.col("day_of_month") == 3, "3rd")
                                        .otherwise(fn.concat(fn.col("day_of_month"), fn.lit("th")))).show(31)




In [29]:
most_trip_day(df)

The day with the most trips is day 22 with 403350 trips.
+------------+------+--------+
|day_of_month| count|day_name|
+------------+------+--------+
|          22|403350|    22th|
|          29|393283|    29th|
|          15|385781|    15th|
|           8|381020|     8th|
|          30|378983|    30th|
|          28|378733|    28th|
|          14|378009|    14th|
|          21|373140|    21th|
|           6|367276|     6th|
|           7|364723|     7th|
|          27|364171|    27th|
|          23|364087|    23th|
|          26|364019|    26th|
|          20|360320|    20th|
|          13|355396|    13th|
|          16|354882|    16th|
|          25|352433|    25th|
|          19|351447|    19th|
|           5|350362|     5th|
|           9|346096|     9th|
|           2|342678|     2nd|
|           1|338085|     1st|
|          18|332685|    18th|
|          24|319667|    24th|
|          11|319267|    11th|
|          12|316771|    12th|
|           4|314069|     4th|
|          17

- most trips in Saturday which is 22th in the month (because it is first in the weekend  ) then Sunday which indicates people tend to have more taxi trips in the weekends than the weekdays

### 4- What is the average "total amount" of trips with more than 2 passengers?

In [30]:
def avg_total_amount(df):
    # Filter the DataFrame for trips with more than 2 passengers
    filtered_df = df.filter(df.passenger_count > 2)

    # Calculate the average total amount for trips with more than 2 passengers
    average_total_amount = filtered_df.agg(fn.avg("total_amount").alias("avg_total_amount"))

    # Show the calculated average total amount
    average_total_amount.show()
    
avg_total_amount(df)

+------------------+
|  avg_total_amount|
+------------------+
|16.763949534481192|
+------------------+



- the average total amount seems moderate with more than 2 passengers 

### 5- On average, when is it more likely that the tip is higher, when there are multiple passengers or just 1.?

In [31]:
def multiple_or_single_passengers(df):
    # Calculate the average tip amount for trips with multiple passengers
    avg_tip_multiple_passengers = df.filter(df.passenger_count > 1).agg(fn.avg("tip_amount").alias("avg_tip_multiple_passengers"))

    # Calculate the average tip amount for trips with just one passenger
    avg_tip_single_passenger = df.filter(df.passenger_count == 1).agg(fn.avg("tip_amount").alias("avg_tip_single_passenger"))

    # Show the calculated average tip amounts
    avg_tip_multiple_passengers.show()
    avg_tip_single_passenger.show()
    multiple = avg_tip_multiple_passengers.collect()[0]["avg_tip_multiple_passengers"]
    single =  avg_tip_single_passenger.collect()[0]["avg_tip_single_passenger"]
    
    if single < multiple:
        print(f"Average tip amount for trips with multiple passengers {multiple} are higher than just one passenger {single}")
    else:
        print(f"Average tip amount for trips with just one passenger {single} are higher than with multiple passengers {multiple}")

        



In [32]:
multiple_or_single_passengers(df)

+---------------------------+
|avg_tip_multiple_passengers|
+---------------------------+
|         1.7944686843935194|
+---------------------------+

+------------------------+
|avg_tip_single_passenger|
+------------------------+
|      1.8325730955320854|
+------------------------+

Average tip amount for trips with just one passenger 1.8325730955320854 are higher than with multiple passengers 1.7944686843935194


- single passenger tend to tip on average more a little bit than multiple passengers

### 6- What is the most frequent route on the weekend. 

In [33]:
def most_frequent_route_on_weekends(df):
    """
    Find and display the most frequent route on weekends in the DataFrame.
    """
    # Filter DataFrame to include only trips that occurred on weekends
    weekend_trips = df.filter((df.is_weekend == 1)  &(df.pu_location != 'Unknown') &(df.do_location != 'Unknown'))

    # Create a new column 'route' by concatenating 'pu_location' and 'do_location'
    weekend_trips = weekend_trips.withColumn('route', fn.concat(fn.col('pu_location'), fn.lit('-'), fn.col('do_location')))

    # Find the most frequent route on weekends and display the result
    most_frequent_route_weekend = weekend_trips.groupBy('route').count().orderBy(fn.col('count').desc())
    most_frequent_route_weekend.show(1,vertical=True)
    most_frequent_route = most_frequent_route_weekend.first()['route']
    print(f"Most frequent route on weekends: {most_frequent_route}")    
    

In [34]:
most_frequent_route_on_weekends(df)

-RECORD 0---------------------
 route | Manhattan,Upper E... 
 count | 14133                
only showing top 1 row

Most frequent route on weekends: Manhattan,Upper East Side South-Manhattan,Upper East Side North


- Manhattan,Upper East Side South-Manhattan,Upper East Side North was the most frequent route , at first the routes where unknown but I filtered them out to get reasonable results

## Encoding
- Label encode all categorical fetaures.
- Create a lookup table for these label encoded features. You can use the same format/example as the lookup table in Milestone 1 description.

(You are allowed to store and manipulate the lookup table as a pandas dataframe, it does not have to be a PySpark df).
- Remove the original unencoded categorical features from the df after encoding.

In [35]:
from pyspark.sql.functions import array, lit, col, struct
from pyspark.ml.feature import StringIndexer

def label_encode_categorical_features(df):
    """
    Label encode categorical features, create a lookup table, and remove original categorical features.
    """
    # Get column data types
    categorical_columns = [col_name for col_name, dtype in df.dtypes if dtype == 'string']

    # Store label encoded columns and original columns mapping
    label_encoded_columns = []

    for col_name in categorical_columns:
        # Create StringIndexer for each categorical column
        indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_encoded").fit(df)
        
        # Transform and add the encoded column to the DataFrame
        df = indexer.transform(df)
        
        # Add the encoded column name to the list
        label_encoded_columns.append(f"{col_name}_encoded")

    # Create a lookup table for label encoded features
    lookup_table_rows = []

    for encoded_col, original_col in zip(label_encoded_columns, categorical_columns):
        # Extract distinct values and their encoded values for each categorical column
        distinct_values = df.select(encoded_col, original_col).distinct().collect()
        for row in distinct_values:
            lookup_table_rows.append((original_col, row[original_col], row[encoded_col]))

    # Create a DataFrame for the lookup table
    lookup_table = spark.createDataFrame(lookup_table_rows, ['column_name', 'original_value', 'encoded_value'])

    # Remove original encoded categorical features from the DataFrame
    df = df.drop(*categorical_columns)
    
    return df, lookup_table, label_encoded_columns


In [36]:
encoded_df ,lookup_table,label_encoded_columns =  label_encode_categorical_features(df)

### Preview first 20 rows of the label encoded features
- is_weekend is already encoded into 1,0

In [37]:
encoded_df.select(*label_encoded_columns).show(20,vertical=True)

-RECORD 0---------------------------
 vendor_encoded             | 1.0   
 rate_type_encoded          | 0.0   
 store_and_fwd_flag_encoded | 0.0   
 pu_location_encoded        | 7.0   
 do_location_encoded        | 10.0  
 payment_type_encoded       | 0.0   
-RECORD 1---------------------------
 vendor_encoded             | 0.0   
 rate_type_encoded          | 0.0   
 store_and_fwd_flag_encoded | 0.0   
 pu_location_encoded        | 29.0  
 do_location_encoded        | 26.0  
 payment_type_encoded       | 1.0   
-RECORD 2---------------------------
 vendor_encoded             | 1.0   
 rate_type_encoded          | 0.0   
 store_and_fwd_flag_encoded | 0.0   
 pu_location_encoded        | 31.0  
 do_location_encoded        | 14.0  
 payment_type_encoded       | 0.0   
-RECORD 3---------------------------
 vendor_encoded             | 1.0   
 rate_type_encoded          | 0.0   
 store_and_fwd_flag_encoded | 0.0   
 pu_location_encoded        | 64.0  
 do_location_encoded        | 86.0  
 

In [38]:
encoded_df.printSchema()

root
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = false)
 |-- trip_distance: double (nullable = false)
 |-- fare_amount: double (nullable = false)
 |-- extra: double (nullable = false)
 |-- mta_tax: double (nullable = false)
 |-- tip_amount: double (nullable = false)
 |-- tolls_amount: double (nullable = false)
 |-- improvement_surcharge: double (nullable = false)
 |-- total_amount: double (nullable = false)
 |-- congestion_surcharge: integer (nullable = true)
 |-- airport_fee: integer (nullable = true)
 |-- trip_duration_minutes: double (nullable = true)
 |-- is_weekend: integer (nullable = false)
 |-- week_number: integer (nullable = true)
 |-- vendor_encoded: double (nullable = false)
 |-- rate_type_encoded: double (nullable = false)
 |-- store_and_fwd_flag_encoded: double (nullable = false)
 |-- pu_location_encoded: double (nullable = false)
 |-- do_location_encoded: double (n

### Preview first 20 rows of your lookup table

In [39]:
lookup_table.show(20,vertical=True)

-RECORD 0------------------------------
 column_name    | vendor               
 original_value | VeriFone Inc.        
 encoded_value  | 0.0                  
-RECORD 1------------------------------
 column_name    | vendor               
 original_value | Creative Mobile T... 
 encoded_value  | 1.0                  
-RECORD 2------------------------------
 column_name    | rate_type            
 original_value | Nassau or Westche... 
 encoded_value  | 4.0                  
-RECORD 3------------------------------
 column_name    | rate_type            
 original_value | Unknown              
 encoded_value  | 5.0                  
-RECORD 4------------------------------
 column_name    | rate_type            
 original_value | JFK                  
 encoded_value  | 1.0                  
-RECORD 5------------------------------
 column_name    | rate_type            
 original_value | Newark               
 encoded_value  | 3.0                  
-RECORD 6------------------------------


In [40]:
lookup_table.printSchema()

root
 |-- column_name: string (nullable = true)
 |-- original_value: string (nullable = true)
 |-- encoded_value: double (nullable = true)



### Load the cleaned PySpark df to a parquet file and the lookup table to a csv file.

In [41]:
FILE_PATHS = []
def save_df_and_lookup_table(df,lookup_table):
    year = 2016
    month = 10
    encode_file_path = f'./yellow_trip_data_{year}-{month}clean.parquet'
    # Save the cleaned PySpark DataFrame to a Parquet file
    encoded_df.write.mode('overwrite').parquet(encode_file_path)
    
    
    lookup_table_file_path = './lookup_table_yellow_taxis.csv'
    
    FILE_PATHS.append(encode_file_path)
    FILE_PATHS.append(lookup_table_file_path)
    
    # Save the lookup table to a CSV file
    lookup_table.write.mode('overwrite').csv(lookup_table_file_path, header=True)


In [42]:
save_df_and_lookup_table(df,lookup_table)

## Bonus - Load the cleaned parquet file and lookup table into a Postgres database. 

Note that if you decide to do the bonus, you must include not only your notebook but the docker-compose.yaml file aswell.

In [43]:
!pip install psycopg2-binary

You should consider upgrading via the '/usr/bin/python3 -m pip install --upgrade pip' command.[0m


In [44]:
from sqlalchemy import create_engine

def ingest(file_paths,file_names):
    """
    ingest data from dataframes to postgresql
    """
    engine = create_engine('postgresql://root:root@pgdatabase:5432/yellow_taxi')

    
    if(engine.connect()):
        print('connected to postgres successfully')
    else:
        print('failed to connect')
    
    print('ingestion started')
    
    
    mode = 'replace' # 'replace' or 'fail'
    for file_path,file_name in zip(file_paths,file_names):
        try: 
            if file_path.endswith('.parquet'):
                # Read Parquet file into a PySpark DataFrame
                df = spark.read.parquet(file_path)
            elif file_path.endswith('.csv'):
                # Read CSV file into a Pandas DataFrame
                df = spark.read.csv(file_path, header=True,inferSchema=True)

            print(f'reading {file_name} table')
            print(f'ingesting {file_name} table')        
            df.to_sql(name = file_name,con = engine,if_exists=mode,index=False)

            print(f'{file_name} table created successfully')
        except Exception as e:
            print(f'creating table failed because it already exists: {e}')
       
    print('ingestion completed')


In [45]:
ingest(FILE_PATHS, ["yellow_taxi_10_2016", "lookup_yellow_taxi_10_2016"])

connected to postgres successfully
ingestion started
reading yellow_taxi_10_2016 table
ingesting yellow_taxi_10_2016 table
creating table failed because it already exists: 'DataFrame' object has no attribute 'to_sql'
reading lookup_yellow_taxi_10_2016 table
ingesting lookup_yellow_taxi_10_2016 table
creating table failed because it already exists: 'DataFrame' object has no attribute 'to_sql'
ingestion completed


### Screenshot of the table existing in the database and a simple query such as `select count(*) from table_name` or `select * from table_name limit 10`

(You can just copy paste the screenshots in the markdown cells below)