##Defining Functions for Data cleaning

Aim:
In this phase of the project, our main goal is to tidy up the vast dataset that includes records of both yellow and green taxi cabs. Given the sheer size of this dataset, we're breaking it down year by year This approach not only makes it easier to handle the data but also lets our Spark cluster work its way when it comes to cleaning up the unnecessary rows.



Function to filter out of range dates.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, datediff
from pyspark.sql.functions import col
from pyspark.sql.types import TimestampType

def filter_and_clean_data(df_cleaned, year):
    # Filter rows with filenames containing the specified year
    trips_year = df_cleaned.filter(df_cleaned['filename'].contains(str(year)))
    # Create a temporary view for SQL querying
    trips_year.createOrReplaceTempView("trips")

    # Define conditions for valid trips within the given year and boundaries
    year_condition = f"(year(tpep_pickup_datetime) = {year} AND year(tpep_dropoff_datetime) = {year})"
    boundary_condition = f"((year(tpep_pickup_datetime) = {year - 1} AND year(tpep_dropoff_datetime) = {year})"
    boundary_condition += f" OR (year(tpep_pickup_datetime) = {year} AND year(tpep_dropoff_datetime) = {year + 1}))"
    
    # Define condition for short-duration trips
    duration_condition = "datediff(tpep_dropoff_datetime, tpep_pickup_datetime) <= 1"

    # Construct the SQL query to filter the data
    year_query = f"""
        SELECT *
        FROM trips
        WHERE ({year_condition} OR {boundary_condition}) AND {duration_condition}
    """
    
    # Execute the SQL query
    trips_year = spark.sql(year_query)
    
    return trips_year


Function to Check for Negative Speeds and Speeds=0

In [0]:
from pyspark.sql.functions import col, unix_timestamp
from pyspark.sql.types import TimestampType

def filter_by_distance_and_speed(df, min_distance=0, min_speed=0):
    # Filter trips based on minimum distance and minimum speed criteria
    return df.filter(
        (col("trip_distance") > min_distance) &
        ((col("trip_distance") / (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime"))) >= min_speed)
    )


Function to Filter Unusual Speeds


In [0]:
from pyspark.sql.functions import col, unix_timestamp
from pyspark.sql import DataFrame

def filter_and_find_highest_speed(df: DataFrame, max_allowable_speed_kmph: float) -> DataFrame:
    # Calculate trip speed (in km/h)
    df = df.withColumn("trip_speed", (col("trip_distance") / (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime"))) * 3600 / 1.609344)

    # Filter out trips with very high speed
    df = df.filter(df["trip_speed"] <= max_allowable_speed_kmph)

    return df


Function to Filter Out Trips with Very Short or Very Long Durations


In [0]:
from pyspark.sql.functions import col, unix_timestamp
from pyspark.sql import DataFrame

def filter_by_trip_duration(df: DataFrame, min_duration: int, max_duration: int) -> DataFrame:
    # Calculate trip duration (in seconds)
    df = df.withColumn("trip_duration", unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime"))

    # Filter out trips with very short or very long durations
    df = df.filter((df["trip_duration"] >= min_duration) & (df["trip_duration"] <= max_duration))

    return df


Filter out trips with very short or very long distances

In [0]:
from pyspark.sql.functions import col
from pyspark.sql import DataFrame

def filter_by_trip_distance(df: DataFrame, min_distance: float, max_distance: float) -> DataFrame:
    # Filter out trips with very short or very long distances
    df = df.filter((df["trip_distance"] >= min_distance) & (df["trip_distance"] <= max_distance))

    return df


Function to Filter Out Trips with Unrealistic Passenger Counts

In [0]:
from pyspark.sql.functions import col
from pyspark.sql import DataFrame

def filter_by_passenger_count(df: DataFrame, min_count: int, max_count: int) -> DataFrame:
    # Filter out trips with unrealistic passenger counts
    df = df.filter((col("Passenger_count") >= min_count) & (col("Passenger_count") <= max_count))

    return df


Function to Filter for Trips with Invalid Rate Codes

In [0]:
from pyspark.sql.functions import col
from pyspark.sql import DataFrame

def filter_by_invalid_rate_codes(df: DataFrame, valid_rate_codes: list) -> DataFrame:
    # Filter for trips with invalid rate codes
    df = df.filter(~col("RateCodeID").isin(valid_rate_codes))

    return df


Function to Filter for Trips with Valid Rate Codes

In [0]:
from pyspark.sql.functions import col
from pyspark.sql import DataFrame

def filter_by_valid_rate_codes(df: DataFrame, valid_rate_codes: list) -> DataFrame:
    # Filter for trips with valid rate codes
    df = df.filter(col("RateCodeID").isin(valid_rate_codes))

    return df


Function to Filter for Trips with Extreme Tip Amounts

In [0]:
from pyspark.sql.functions import col
from pyspark.sql import DataFrame

def filter_extreme_tip_amounts(df: DataFrame, threshold: float) -> DataFrame:
    # Filter for trips with extreme tip amounts
    df = df.filter((col("Tip_amount") > threshold) | (col("Tip_amount") < 0))

    return df

# Example usage:
# threshold = 150  # Adjust the threshold as needed
# extreme_tip_trips = filter_extreme_tip_amounts(trips_2015, threshold)
# extreme_tip_trips.show()


Filter out trips with extreme tip amounts (above the threshold) and negative tips

In [0]:
from pyspark.sql.functions import col
from pyspark.sql import DataFrame

def filter_tip_amounts(df: DataFrame, threshold: float) -> DataFrame:
    # Filter out trips with extreme tip amounts (above the threshold) and negative tips
    df = df.filter((col("Tip_amount") <= threshold) & (col("Tip_amount") >= 0))

    return df

# Example usage:
# threshold = 150  # Adjust the threshold as needed
# trips_2015 = filter_tip_amounts(trips_2015, threshold)
# trips_2015.show()


Check for Missing values in columns 

In [0]:
from pyspark.sql.functions import col
from pyspark.sql import DataFrame

def check_missing_values(df: DataFrame, columns: list):
    for column in columns:
        # Count missing values in each specified column
        missing_count = df.filter(col(column).isNull()).count()
        print(f"Missing values in column '{column}': {missing_count}")


Function to: 
Compute the average toll amount
& Compute the highest toll amount

In [0]:
from pyspark.sql.functions import col, avg, max
from pyspark.sql import DataFrame

def compute_average_and_highest_toll(df: DataFrame):
    # Compute the average toll amount
    average_toll = df.select(avg(col("tolls_amount"))).collect()[0][0]

    # Compute the highest toll amount
    highest_toll = df.select(max(col("tolls_amount"))).collect()[0][0]

    print("Average Toll Amount:", average_toll)
    print("Highest Toll Amount:", highest_toll)


Filter for trips with high toll amounts & Show the count of trips with high toll amounts

In [0]:
from pyspark.sql.functions import col
from pyspark.sql import DataFrame

def find_high_toll_outliers(df: DataFrame, threshold: float):
    # Filter for trips with toll amounts greater than the specified threshold
    toll_outliers = df.filter(col("tolls_amount") > threshold)
    
    # Show the count of trips with high toll amounts
    count = toll_outliers.count()
    print(f"Trips with toll amounts greater than {threshold}: {count}")


Show the DataFrame with rows having unexpected values in the 'store_and_fwd_flag' column

In [0]:
from pyspark.sql.functions import col
from pyspark.sql import DataFrame

def filter_unexpected_store_and_fwd_flag(df: DataFrame):
    # Filter for rows with values other than 'Y' and 'N' in the 'store_and_fwd_flag' column
    unexpected_values = df.filter(~col("store_and_fwd_flag").isin('Y', 'N'))

    # Show the DataFrame with rows having unexpected values in the 'store_and_fwd_flag' column
    unexpected_values.show()


Compute the average surcharge
Compute the highest surcharge

In [0]:
from pyspark.sql.functions import col
from pyspark.sql import DataFrame

def compute_average_and_highest_surcharge(df: DataFrame):
    # Compute the average surcharge
    average_surcharge = df.select(avg(col("congestion_surcharge"))).collect()[0][0]

    # Compute the highest surcharge
    highest_surcharge = df.select(max(col("congestion_surcharge"))).collect()[0][0]

    print("Average Surcharge:", average_surcharge)
    print("Highest Surcharge:", highest_surcharge)


## Yellow Taxi Dataset Cleaning Preparation 

We are importing the original parquet files of the Yellow Data into DBFS from the shared Google Drive. 

In [0]:
df_yellow = spark.read.parquet("/FileStore/yellow")
df_yellow.count()

Out[16]: 663055251

There are approximately 663 million rows. As stated earlier we will need to segment the rows year-wise. 

In [0]:
# Filter DataFrame df_yellow to keep only rows where drop-off datetime is greater than or equal to pickup datetime.

df_cleaned = df_yellow.filter(df_yellow['tpep_dropoff_datetime'] >= df_yellow['tpep_pickup_datetime'])


In [0]:
# Import the necessary function for capturing input file names.
from pyspark.sql.functions import input_file_name

# Create a new DataFrame df_cleaned that includes a new column 'filename' containing the input file name.
df_cleaned = df_cleaned.withColumn("filename", input_file_name())


In [0]:
df_cleaned.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|            filename|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+
|       1| 2015-01-01 00:11:33|  2015-01-01 00:16:48|            1.0|          1.0|       1.0|                 N|          4

Processing Year 2015

In [0]:
# Set the target year to 2015.
year = 2015

# Create a new DataFrame trips_2015 by calling the filter_and_clean_data function with df_cleaned and the specified year as arguments.
trips_2015 = filter_and_clean_data(df_cleaned, year)


In [0]:
# Apply a filter to the trips_2015 DataFrame to keep only trips with a minimum distance of 0 and a minimum speed of 0.
trips_2015 = filter_by_distance_and_speed(trips_2015, min_distance=0, min_speed=0)


In [0]:
# Apply a filter to the trips_2015 DataFrame to keep only trips with a maximum allowable speed of 40 km/h.
trips_2015 = filter_and_find_highest_speed(trips_2015, max_allowable_speed_kmph=40)


In [0]:
# Apply a filter to the trips_2015 DataFrame to keep only trips with a minimum duration of 60 seconds and a maximum duration of 86,400 seconds (24 hours).
trips_2015 = filter_by_trip_duration(trips_2015, min_duration=60, max_duration=86400)


In [0]:
# Apply a filter to the trips_2015 DataFrame to keep only trips with a minimum distance of 0.1 units and a maximum distance of 100 units.
trips_2015 = filter_by_trip_distance(trips_2015, min_distance=0.1, max_distance=100)


In [0]:
# Apply a filter to the trips_2015 DataFrame to keep only trips with a minimum passenger count of 1 and a maximum passenger count of 7.
trips_2015 = filter_by_passenger_count(trips_2015, min_count=1, max_count=7)


The below code was used to show any invalid rate codes in the dataset. Indeed, there were some invalid codes once this code as executed.. But code itself didn't apply any transformation on the dataset. So we have commented it out to save some computation and time and kept it for reference. And the same holds for the code that checks for extreme tip amounts 

In [0]:
#valid_rate_codes = [1, 2, 3, 4, 5, 6]  # the list of valid rate codes we want to check against
#invalid_rate_code_trips = filter_by_invalid_rate_codes(trips_2015, valid_rate_codes)
#invalid_rate_code_trips.show()

In [0]:
# Define a list of valid rate codes.
valid_rate_codes = [1, 2, 3, 4, 5, 6]

# Apply a filter to the trips_2015 DataFrame to keep only trips with rate codes in the list of valid rate codes.
trips_2015 = filter_by_valid_rate_codes(trips_2015, valid_rate_codes)


In [0]:
#threshold = 150  # Adjusted the threshold per business decision
#extreme_tip_trips = filter_extreme_tip_amounts(trips_2015, threshold)
#extreme_tip_trips.show()

In [0]:
# Define a threshold value for filtering tip amounts.
threshold = 150  # Adjust the threshold as needed

# Apply a filter to the trips_2015 DataFrame to keep only trips with tip amounts less than or equal to the threshold.
trips_2015 = filter_tip_amounts(trips_2015, threshold)


In [0]:
# Call the filter_unexpected_store_and_fwd_flag function to filter and display rows with unexpected values in the 'store_and_fwd_flag' column of the trips_2015 DataFrame.
filter_unexpected_store_and_fwd_flag(trips_2015)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------+----------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|filename|trip_speed|trip_duration|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------+----------+-------------+
+--------+--------------------+---------------------+---------------+-------------+--

This suggests that all rows in the trips_2015 DataFrame have valid values in the 'store_and_fwd_flag' column, and there are no unexpected values to filter. This is a good indication that the data in this specific column is clean and conforms to the expected format.

In [0]:
# Define the output folder and filename for saving the DataFrame as a Parquet file.
output_folder = "/CleanedData/yellow/"
output_filename = output_folder + "cleaned_2015.parquet"

# Save the trips_2015 DataFrame as a Parquet file with the specified filename, overwriting if it already exists.
trips_2015.write.parquet(output_filename, mode="overwrite")


Saved the Parquet file to a cleaned_data folder. And the data cleaning process continues for the rest of the years. 

Processing Year 2016

In [0]:
# Set the target year to 2016.
year = 2016

# Create a new DataFrame trips_2016 by calling the filter_and_clean_data function with df_cleaned and the specified year as arguments.
trips_2016 = filter_and_clean_data(df_cleaned, year)


In [0]:
# Apply data cleaning and filtering operations to the trips_2016 DataFrame for the year 2016.

# Filter trips with minimum distance and speed requirements.
trips_2016 = filter_by_distance_and_speed(trips_2016, min_distance=0, min_speed=0)

# Filter trips with a maximum allowable speed.
trips_2016 = filter_and_find_highest_speed(trips_2016, max_allowable_speed_kmph=40)

# Filter trips by trip duration.
trips_2016 = filter_by_trip_duration(trips_2016, min_duration=60, max_duration=86400)

# Filter trips by trip distance.
trips_2016 = filter_by_trip_distance(trips_2016, min_distance=0.1, max_distance=100)

# Filter trips by passenger count.
trips_2016 = filter_by_passenger_count(trips_2016, min_count=1, max_count=7)


In [0]:
# Define a list of valid rate codes.
valid_rate_codes = [1, 2, 3, 4, 5, 6]

# Apply a filter to the trips_2016 DataFrame to keep only trips with rate codes in the list of valid rate codes.
trips_2016 = filter_by_valid_rate_codes(trips_2016, valid_rate_codes)


In [0]:
# Define a threshold value for filtering tip amounts.
threshold = 150  # Adjusted the threshold per business decision. 

# Apply a filter to the trips_2016 DataFrame to keep only trips with tip amounts less than or equal to the threshold.
trips_2016 = filter_tip_amounts(trips_2016, threshold)

# Display the filtered DataFrame.
trips_2016.show()


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|            filename|        trip_speed|trip_duration|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+------------------+-------------+
|       1| 2016-01-01 00:

Saving the parquet file of 2016 after applying the data cleaning steps. 

In [0]:
# Save the DataFrame as a Parquet file with the specified filename
output_folder = "/CleanedData/yellow/"

output_filename = output_folder + "cleaned_2016.parquet"
trips_2016.write.parquet(output_filename, mode="overwrite")

Processing Year 2017

In [0]:
# Set the target year to 2017.
year = 2017

# Create a new DataFrame trips_2017 by calling the filter_and_clean_data function with df_cleaned and the specified year as arguments.
trips_2017 = filter_and_clean_data(df_cleaned, year)


In [0]:
# Apply data cleaning and filtering operations to the trips_2017 DataFrame for the year 2017.

# Filter trips with minimum distance and speed requirements.
trips_2017 = filter_by_distance_and_speed(trips_2017, min_distance=0, min_speed=0)

# Filter trips with a maximum allowable speed.
trips_2017 = filter_and_find_highest_speed(trips_2017, max_allowable_speed_kmph=40)

# Filter trips by trip duration.
trips_2017 = filter_by_trip_duration(trips_2017, min_duration=60, max_duration=86400)

# Filter trips by trip distance.
trips_2017 = filter_by_trip_distance(trips_2017, min_distance=0.1, max_distance=100)

# Filter trips by passenger count.
trips_2017 = filter_by_passenger_count(trips_2017, min_count=1, max_count=7)


In [0]:
# Define a list of valid rate codes.
valid_rate_codes = [1, 2, 3, 4, 5, 6]

# Apply a filter to the trips_2017 DataFrame to keep only trips with rate codes in the list of valid rate codes.
trips_2017 = filter_by_valid_rate_codes(trips_2017, valid_rate_codes)


In [0]:
# Define a threshold value for filtering tip amounts.
threshold = 150

# Apply a filter to the trips_2017 DataFrame to keep only trips with tip amounts less than or equal to the threshold.
trips_2017 = filter_tip_amounts(trips_2017, threshold)

# Display the filtered DataFrame.
trips_2017.show()


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|            filename|        trip_speed|trip_duration|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+------------------+-------------+
|       1| 2017-01-01 00:

In [0]:
# Save the DataFrame as a Parquet file with the specified filename
output_folder = "/CleanedData/yellow/"

output_filename = output_folder + "cleaned_2017.parquet"
trips_2017.write.parquet(output_filename, mode="overwrite")

Processing Year 2018

In [0]:
# Set the target year to 2018.
year = 2018

# Create a new DataFrame trips_2018 by calling the filter_and_clean_data function with df_cleaned and the specified year as arguments.
trips_2018 = filter_and_clean_data(df_cleaned, year)


In [0]:
# Apply data cleaning and filtering operations to the trips_2018 DataFrame for the year 2018.

# Filter trips with minimum distance and speed requirements.
trips_2018 = filter_by_distance_and_speed(trips_2018, min_distance=0, min_speed=0)

# Filter trips with a maximum allowable speed.
trips_2018 = filter_and_find_highest_speed(trips_2018, max_allowable_speed_kmph=40)

# Filter trips by trip duration.
trips_2018 = filter_by_trip_duration(trips_2018, min_duration=60, max_duration=86400)

# Filter trips by trip distance.
trips_2018 = filter_by_trip_distance(trips_2018, min_distance=0.1, max_distance=100)

# Filter trips by passenger count.
trips_2018 = filter_by_passenger_count(trips_2018, min_count=1, max_count=7)


In [0]:
# Define a list of valid rate codes.
valid_rate_codes = [1, 2, 3, 4, 5, 6]

# Apply a filter to the trips_2018 DataFrame to keep only trips with rate codes in the list of valid rate codes.
trips_2018 = filter_by_valid_rate_codes(trips_2018, valid_rate_codes)


In [0]:
# Define a threshold value for filtering tip amounts.
threshold = 150  # Adjust the threshold as needed

# Apply a filter to the trips_2018 DataFrame to keep only trips with tip amounts less than or equal to the threshold.
trips_2018 = filter_tip_amounts(trips_2018, threshold)

# Display the filtered DataFrame.
trips_2018.show()


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|            filename|        trip_speed|trip_duration|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+------------------+-------------+
|       1| 2018-01-01 00:

In [0]:
# Define the output folder where the Parquet file will be saved.
output_folder = "/CleanedData/yellow/"

# Define the output filename.
output_filename = output_folder + "cleaned_2018.parquet"

# Write the trips_2018 DataFrame to a Parquet file with the specified filename.
trips_2018.write.parquet(output_filename, mode="overwrite")


Processing Year 2019

In [0]:
# Set the target year to 2019.
year = 2019

# Create a new DataFrame trips_2019 by calling the filter_and_clean_data function with df_cleaned and the specified year as arguments.
trips_2019 = filter_and_clean_data(df_cleaned, year)


In [0]:
# Apply data cleaning and filtering operations to the trips_2019 DataFrame for the year 2019.

# Filter trips with minimum distance and speed requirements.
trips_2019 = filter_by_distance_and_speed(trips_2019, min_distance=0, min_speed=0)

# Filter trips with a maximum allowable speed.
trips_2019 = filter_and_find_highest_speed(trips_2019, max_allowable_speed_kmph=40)

# Filter trips by trip duration.
trips_2019 = filter_by_trip_duration(trips_2019, min_duration=60, max_duration=86400)

# Filter trips by trip distance.
trips_2019 = filter_by_trip_distance(trips_2019, min_distance=0.1, max_distance=100)

# Filter trips by passenger count.
trips_2019 = filter_by_passenger_count(trips_2019, min_count=1, max_count=7)


In [0]:
# Define a list of valid rate codes.
valid_rate_codes = [1, 2, 3, 4, 5, 6]

# Apply a filter to the trips_2019 DataFrame to keep only trips with rate codes in the list of valid rate codes.
trips_2019 = filter_by_valid_rate_codes(trips_2019, valid_rate_codes)


In [0]:
# Define a threshold value for filtering tip amounts.
threshold = 150  # Adjust the threshold as needed

# Apply a filter to the trips_2019 DataFrame to keep only trips with tip amounts less than or equal to the threshold.
trips_2019 = filter_tip_amounts(trips_2019, threshold)

# Display the filtered DataFrame.
trips_2019.show()


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|            filename|        trip_speed|trip_duration|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+------------------+-------------+
|       1| 2019-03-01 00:

In [0]:
# Define the output folder where the Parquet file will be saved.
output_folder = "/CleanedData/yellow/"

# Define the output filename.
output_filename = output_folder + "cleaned_2019.parquet"

# Write the trips_2019 DataFrame to a Parquet file with the specified filename.
trips_2019.write.parquet(output_filename, mode="overwrite")


In [0]:
trips_2019 = spark.read.parquet("/CleanedData/yellow/cleaned_2019.parquet")
 
trips_2019.count()


Out[32]: 81520120

Skipped the below step as it only checks for unexected Store and Forward Flags and doesn't apply any tranformation. There were no unexpected values. 

In [0]:
# # columns_to_check = ["tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_distance", "fare_amount"]
# check_missing_values(trips_2019, columns_to_check)

# filter_unexpected_store_and_fwd_flag(trips_2019)

Processing Year 2020

In [0]:
# Set the target year to 2020.
year = 2020

# Create a new DataFrame trips_2020 by calling the filter_and_clean_data function with df_cleaned and the specified year as arguments.
trips_2020 = filter_and_clean_data(df_cleaned, year)


In [0]:
# Apply data cleaning and filtering operations to the trips_2020 DataFrame for the year 2020.

# Filter trips with minimum distance and speed requirements.
trips_2020 = filter_by_distance_and_speed(trips_2020, min_distance=0, min_speed=0)

# Filter trips with a maximum allowable speed.
trips_2020 = filter_and_find_highest_speed(trips_2020, max_allowable_speed_kmph=40)

# Filter trips by trip duration.
trips_2020 = filter_by_trip_duration(trips_2020, min_duration=60, max_duration=86400)

# Filter trips by trip distance.
trips_2020 = filter_by_trip_distance(trips_2020, min_distance=0.1, max_distance=100)

# Filter trips by passenger count.
trips_2020 = filter_by_passenger_count(trips_2020, min_count=1, max_count=7)


In [0]:
# Define a list of valid rate codes.
valid_rate_codes = [1, 2, 3, 4, 5, 6]

# Apply a filter to the trips_2020 DataFrame to keep only trips with rate codes in the list of valid rate codes.
trips_2020 = filter_by_valid_rate_codes(trips_2020, valid_rate_codes)


In [0]:
# Define a threshold value for filtering tip amounts.
threshold = 150  # Adjust the threshold as needed

# Apply a filter to the trips_2020 DataFrame to keep only trips with tip amounts less than or equal to the threshold.
trips_2020 = filter_tip_amounts(trips_2020, threshold)

# Display the filtered DataFrame.
trips_2020.show()


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|            filename|        trip_speed|trip_duration|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+------------------+-------------+
|       1| 2020-01-01 00:

In [0]:
# Define the output folder where the Parquet file will be saved.
output_folder = "/CleanedData/yellow/"

# Define the output filename.
output_filename = output_folder + "cleaned_2020.parquet"

# Write the trips_2020 DataFrame to a Parquet file with the specified filename.
trips_2020.write.parquet(output_filename, mode="overwrite")


Processing Year 2021

In [0]:
# Set the target year to 2021.
year = 2021

# Create a new DataFrame trips_2021 by calling the filter_and_clean_data function with df_cleaned and the specified year as arguments.
trips_2021 = filter_and_clean_data(df_cleaned, year)


In [0]:
# Apply data cleaning and filtering operations to the trips_2021 DataFrame for the year 2021.

# Filter trips with minimum distance and speed requirements.
trips_2021 = filter_by_distance_and_speed(trips_2021, min_distance=0, min_speed=0)

# Filter trips with a maximum allowable speed.
trips_2021 = filter_and_find_highest_speed(trips_2021, max_allowable_speed_kmph=40)

# Filter trips by trip duration.
trips_2021 = filter_by_trip_duration(trips_2021, min_duration=60, max_duration=86400)

# Filter trips by trip distance.
trips_2021 = filter_by_trip_distance(trips_2021, min_distance=0.1, max_distance=100)

# Filter trips by passenger count.
trips_2021 = filter_by_passenger_count(trips_2021, min_count=1, max_count=7)


In [0]:
# Define a list of valid rate codes.
valid_rate_codes = [1, 2, 3, 4, 5, 6]

# Apply a filter to the trips_2021 DataFrame to keep only trips with rate codes in the list of valid rate codes.
trips_2021 = filter_by_valid_rate_codes(trips_2021, valid_rate_codes)


In [0]:
# Define a threshold value for filtering tip amounts.
threshold = 150  # Adjust the threshold as needed

# Apply a filter to the trips_2021 DataFrame to keep only trips with tip amounts less than or equal to the threshold.
trips_2021 = filter_tip_amounts(trips_2021, threshold)

# Display the filtered DataFrame.
trips_2021.show()


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|            filename|        trip_speed|trip_duration|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+------------------+-------------+
|       1| 2021-10-01 00:

In [0]:
# Define the output folder where the Parquet file will be saved.
output_folder = "/CleanedData/yellow/"

# Define the output filename.
output_filename = output_folder + "cleaned_2021.parquet"

# Write the trips_2021 DataFrame to a Parquet file with the specified filename.
trips_2021.write.parquet(output_filename, mode="overwrite")


Processing Year 2022

In [0]:
# Set the target year to 2022.
year = 2022

# Create a new DataFrame trips_2022 by calling the filter_and_clean_data function with df_cleaned and the specified year as arguments.
trips_2022 = filter_and_clean_data(df_cleaned, year)


In [0]:
# Apply data cleaning and filtering operations to the trips_2022 DataFrame for the year 2022.

# Filter trips with minimum distance and speed requirements.
trips_2022 = filter_by_distance_and_speed(trips_2022, min_distance=0, min_speed=0)

# Filter trips with a maximum allowable speed.
trips_2022 = filter_and_find_highest_speed(trips_2022, max_allowable_speed_kmph=40)

# Filter trips by trip duration.
trips_2022 = filter_by_trip_duration(trips_2022, min_duration=60, max_duration=86400)

# Filter trips by trip distance.
trips_2022 = filter_by_trip_distance(trips_2022, min_distance=0.1, max_distance=100)

# Filter trips by passenger count.
trips_2022 = filter_by_passenger_count(trips_2022, min_count=1, max_count=7)


In [0]:
# Define a list of valid rate codes.
valid_rate_codes = [1, 2, 3, 4, 5, 6]

# Filter the trips_2022 DataFrame to keep only trips with rate codes in the list of valid rate codes.
trips_2022 = filter_by_valid_rate_codes(trips_2022, valid_rate_codes)


In [0]:
# Define a threshold value for filtering tip amounts.
threshold = 150  # Adjust the threshold as needed

# Apply a filter to the trips_2022 DataFrame to keep only trips with tip amounts less than or equal to the threshold.
trips_2022 = filter_tip_amounts(trips_2022, threshold)

# Display the filtered DataFrame.
trips_2022.show()


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|            filename|        trip_speed|trip_duration|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+------------------+-------------+
|       1| 2022-10-01 00:

In [0]:
# Define the output folder where the Parquet file will be saved.
output_folder = "/CleanedData/yellow/"

# Define the output filename.
output_filename = output_folder + "cleaned_2022.parquet"

# Write the trips_2022 DataFrame to a Parquet file with the specified filename.
trips_2022.write.parquet(output_filename, mode="overwrite")


In [0]:
# Use dbutils.fs.ls to display the contents of a directory in DBFS.

# Define the directory path you want to list.
directory_path = "dbfs:/CleanedData/yellow/"

# Call dbutils.fs.ls to list the contents of the specified directory.
directory_contents = dbutils.fs.ls(directory_path)

# Display the list of files and subdirectories in the directory.
display(directory_contents)


path,name,size,modificationTime
dbfs:/CleanedData/yellow/cleaned_2015.parquet/,cleaned_2015.parquet/,0,0
dbfs:/CleanedData/yellow/cleaned_2016.parquet/,cleaned_2016.parquet/,0,0
dbfs:/CleanedData/yellow/cleaned_2017.parquet/,cleaned_2017.parquet/,0,0
dbfs:/CleanedData/yellow/cleaned_2018.parquet/,cleaned_2018.parquet/,0,0
dbfs:/CleanedData/yellow/cleaned_2019.parquet/,cleaned_2019.parquet/,0,0
dbfs:/CleanedData/yellow/cleaned_2020.parquet/,cleaned_2020.parquet/,0,0
dbfs:/CleanedData/yellow/cleaned_2021.parquet/,cleaned_2021.parquet/,0,0
dbfs:/CleanedData/yellow/cleaned_2022.parquet/,cleaned_2022.parquet/,0,0


In [0]:
clean_yellow = spark.read.parquet("/CleanedData/yellow/cleaned_*")
clean_yellow.count()

Out[69]: 646503433

The count of rows in the resulting DataFrame, clean_yellow, is reported as 646,503,433.

This count represents the total number of cleaned and filtered records across all the years specified by the wildcard pattern in the Parquet files. It indicates the combined dataset size for the cleaned yellow taxi data from the years 2015 to 2022.

In [0]:
clean_yellow.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- filename: string (nullable = true)
 |-- trip_speed: double (nullable = true)
 |-- trip_duration: long (nullable = true)



## Green Taxi Dataset Cleaning Preparation 

We are importing the original parquet files of the Green Data into DBFS from the shared Google Drive. 

In [0]:
df_green = spark.read.parquet("/FileStore/green")
df_green.count()

Out[16]: 66200401

In [0]:

# Import the necessary functions
from pyspark.sql.functions import col

# Rename the columns in the DataFrame
df_cleaned = df_green \
    .withColumnRenamed('lpep_pickup_datetime', 'tpep_pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'tpep_dropoff_datetime')
    
# So that I can use df_cleaned with the updated column names


In [0]:
# Filter the DataFrame to keep only valid trips where drop-off is after or at the same time as pick-up
df_cleaned = df_cleaned.filter(df_cleaned['tpep_dropoff_datetime'] >= df_cleaned['tpep_pickup_datetime'])


In [0]:
# Import the necessary function
from pyspark.sql.functions import input_file_name

# Add a new column "filename" to the DataFrame containing the input file name
df_cleaned = df_cleaned.withColumn("filename", input_file_name())


Processing Year 2015

In [0]:
# Define the year for which you want to filter and clean the data
year = 2015

# Call the filter_and_clean_data function to filter and clean the data for the specified year
trips_2015 = filter_and_clean_data(df_cleaned, year)


In [0]:
# Pseudo code for Data Filtering and Cleaning for Year 2015

# Filtering trips by minimum distance and minimum speed
trips_2015 = filter_by_distance_and_speed(trips_2015, min_distance=0, min_speed=0)

# Filtering trips to find the highest allowable speed
trips_2015 = filter_and_find_highest_speed(trips_2015, max_allowable_speed_kmph=40)

# Filtering trips by trip duration
trips_2015 = filter_by_trip_duration(trips_2015, min_duration=60, max_duration=86400)

# Filtering trips by trip distance
trips_2015 = filter_by_trip_distance(trips_2015, min_distance=0.1, max_distance=100)

# Filtering trips by passenger count
trips_2015 = filter_by_passenger_count(trips_2015, min_count=1, max_count=7)


In [0]:
# Pseudo code for filtering trips by invalid rate codes

# Define the list of valid rate codes
valid_rate_codes = [1, 2, 3, 4, 5, 6]

# Use a filter function to select trips with rate codes not in the valid_rate_codes list
invalid_rate_code_trips = filter_by_invalid_rate_codes(trips_2015, valid_rate_codes)

# Show the DataFrame containing trips with invalid rate codes
invalid_rate_code_trips.show()


+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------+----------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|filename|trip_speed|trip_duration|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------+----------+-------------+
+--------+--------------------+---------------------+--------

There are no invalid rate codes. 

In [0]:
# Pseudo code for filtering trips by extreme tip amounts

# Define the threshold for extreme tip amounts
threshold = 150  # Adjusted the threshold per business decision

# Use a filter function to select trips with tip amounts exceeding the threshold
extreme_tip_trips = filter_extreme_tip_amounts(trips_2015, threshold)

# Show the DataFrame containing trips with extreme tip amounts
extreme_tip_trips.show()


+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|            filename|        trip_speed|trip_duration|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+------------------+-------------+
|

In [0]:
# Pseudo code for filtering trips by tip amount threshold

# Define the tip amount threshold
threshold = 150  # Adjust the threshold as needed

# Use a filter function to select trips with tip amounts above the threshold
trips_2015 = filter_tip_amounts(trips_2015, threshold)

# Display the resulting DataFrame
trips_2015.show()


+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|            filename|        trip_speed|trip_duration|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+------------------+-------------+
|

In [0]:
#dbutils.fs.rm("/CleanedData/green/", recurse=True)


In [0]:
# Pseudo code for saving a DataFrame as a Parquet file

# Define the output folder and filename
output_folder = "/CleanedData/green/"
output_filename = output_folder + "cleaned_2015.parquet"

# Save the DataFrame as a Parquet file with the specified filename and overwrite if it already exists
trips_2015.write.parquet(output_filename, mode="overwrite")


In [0]:
# Read the Parquet file
trips_2015 = spark.read.parquet("/CleanedData/green/cleaned_2015.parquet")

# Define the list of columns to check for missing values
columns_to_check = ["tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_distance", "fare_amount"]

# Check for missing values in the specified columns
check_missing_values(trips_2015, columns_to_check)

# Filter rows with unexpected values in the 'store_and_fwd_flag' column
filter_unexpected_store_and_fwd_flag(trips_2015)


Missing values in column 'tpep_pickup_datetime': 0
Missing values in column 'tpep_dropoff_datetime': 0
Missing values in column 'trip_distance': 0
Missing values in column 'fare_amount': 0
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------+----------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|filename|trip_speed|trip_duration|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+---------

In the dataset for the year 2015, there don't appear to be any missing values in the columns 'tpep_pickup_datetime,' 'tpep_dropoff_datetime,' 'trip_distance,' and 'fare_amount.' Additionally, upon filtering rows with unexpected values in the 'store_and_fwd_flag' column, no such rows were found, indicating that the data seems to be clean and consistent for this year.

Processing Year 2016

In [0]:
# Filter and clean the data for the year 2016
year = 2016
trips_2016 = filter_and_clean_data(df_cleaned, year)


In [0]:
# Filter and clean data for the year 2016
year = 2016
trips_2016 = filter_and_clean_data(df_cleaned, year)

# Filter by distance and speed
trips_2016 = filter_by_distance_and_speed(trips_2016, min_distance=0, min_speed=0)

# Filter and find highest speed trips
trips_2016 = filter_and_find_highest_speed(trips_2016, max_allowable_speed_kmph=40)

# Filter by trip duration
trips_2016 = filter_by_trip_duration(trips_2016, min_duration=60, max_duration=86400)

# Filter by trip distance
trips_2016 = filter_by_trip_distance(trips_2016, min_distance=0.1, max_distance=100)

# Filter by passenger count
trips_2016 = filter_by_passenger_count(trips_2016, min_count=1, max_count=7)


In [0]:
# Define the list of valid rate codes
valid_rate_codes = [1, 2, 3, 4, 5, 6]

# Filter trips with invalid rate codes
invalid_rate_code_trips = filter_by_invalid_rate_codes(trips_2016, valid_rate_codes)

# Display the resulting DataFrame
invalid_rate_code_trips.show()


+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------+----------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|filename|trip_speed|trip_duration|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------+----------+-------------+
+--------+--------------------+---------------------+--------

In [0]:
# Set the threshold for extreme tip amounts
threshold = 150  # Adjusted the threshold per business decision

# Filter trips with tip amounts exceeding the threshold
extreme_tip_trips = filter_extreme_tip_amounts(trips_2016, threshold)

# Display the resulting DataFrame
extreme_tip_trips.show()


+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|            filename|        trip_speed|trip_duration|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+------------------+-------------+
|

In [0]:
# Set the threshold for tip amounts
threshold = 150  # Adjust the threshold as needed

# Filter trips with tip amounts exceeding the threshold
trips_2016 = filter_tip_amounts(trips_2016, threshold)

# Display the resulting DataFrame
trips_2016.show()


+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+--------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|            filename|          trip_speed|trip_duration|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+--------------------+----------

In [0]:
# Define the output folder path
output_folder = "/CleanedData/green/"

# Define the output filename
output_filename = output_folder + "cleaned_2016.parquet"

# Save the DataFrame as a Parquet file with the specified filename
trips_2016.write.parquet(output_filename, mode="overwrite")


In [0]:
# Read the Parquet file into the trips_2016 DataFrame
trips_2016 = spark.read.parquet("/CleanedData/green/cleaned_2016.parquet")

# Define a list of columns to check for missing values
columns_to_check = ["tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_distance", "fare_amount"]

# Check for missing values in the specified columns
check_missing_values(trips_2016, columns_to_check)

# Filter and display rows with unexpected values in the 'store_and_fwd_flag' column
filter_unexpected_store_and_fwd_flag(trips_2016)


Missing values in column 'tpep_pickup_datetime': 0
Missing values in column 'tpep_dropoff_datetime': 0
Missing values in column 'trip_distance': 0
Missing values in column 'fare_amount': 0


The code indicates that there are no missing values in critical columns such as 'tpep_pickup_datetime,' 'tpep_dropoff_datetime,' 'trip_distance,' and 'fare_amount' within the trips_2016 DataFrame. 

Processing Year 2017

In [0]:
year=2017
trips_2017 = filter_and_clean_data(df_cleaned, year)


In [0]:
# data filtering and cleaning operations
trips_2017 = filter_by_distance_and_speed(trips_2017, min_distance=0, min_speed=0)
trips_2017 = filter_and_find_highest_speed(trips_2017, max_allowable_speed_kmph=40)
trips_2017 = filter_by_trip_duration(trips_2017, min_duration=60, max_duration=86400)
trips_2017 = filter_by_trip_distance(trips_2017, min_distance=0.1, max_distance=100)
trips_2017 = filter_by_passenger_count(trips_2017, min_count=1, max_count=7)

In [0]:
valid_rate_codes = [1, 2, 3, 4, 5, 6]  # the list of valid rate codes we want to check against
invalid_rate_code_trips = filter_by_invalid_rate_codes(trips_2017, valid_rate_codes)
invalid_rate_code_trips.show()

In [0]:
# List of valid rate codes to check against
valid_rate_codes = [1, 2, 3, 4, 5, 6]

# Filter trips in the year 2017 by valid rate codes
trips_2017 = filter_by_valid_rate_codes(trips_2017, valid_rate_codes)


In [0]:
# Set the threshold for extreme tip amounts
threshold = 150

# Filter trips in the year 2017 with extreme tip amounts
extreme_tip_trips = filter_extreme_tip_amounts(trips_2017, threshold)

# Display the resulting DataFrame
extreme_tip_trips.show()


+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+-------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|            filename|         trip_speed|trip_duration|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+-------------------+-------------

In [0]:
# Set the threshold for tip amounts
threshold = 150

# Filter trips in the year 2017 based on the tip amount threshold
trips_2017 = filter_tip_amounts(trips_2017, threshold)

# Display the resulting DataFrame
trips_2017.show()


+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+--------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|            filename|          trip_speed|trip_duration|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+--------------------+----------

In [0]:
# Define the output folder path
output_folder = "/CleanedData/green/"

# Define the output filename
output_filename = output_folder + "cleaned_2017.parquet"

# Write the trips_2017 DataFrame to a Parquet file with the specified filename
trips_2017.write.parquet(output_filename, mode="overwrite")


In [0]:
# Define the path to the Parquet file
parquet_file_path = "/CleanedData/green/cleaned_2017.parquet"

# Read the Parquet file into a DataFrame
trips_2017 = spark.read.parquet(parquet_file_path)

# Define a list of column names to check for missing values
columns_to_check = ["tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_distance", "fare_amount"]

# Call a function to check for missing values in the specified columns
check_missing_values(trips_2017, columns_to_check)

# Call a function to filter rows with unexpected values in the 'store_and_fwd_flag' column
filter_unexpected_store_and_fwd_flag(trips_2017)


Missing values in column 'tpep_pickup_datetime': 0
Missing values in column 'tpep_dropoff_datetime': 0
Missing values in column 'trip_distance': 0
Missing values in column 'fare_amount': 0
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------+----------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|filename|trip_speed|trip_duration|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+---------

Processing Year 2018

In [0]:
# Pseudocode for loading and cleaning taxi trip data for the year 2018

# Specify the year for which data needs to be processed
year = 2018

# Call a function to filter and clean the data for the specified year
trips_2018 = filter_and_clean_data(df_cleaned, year)


In [0]:
# Pseudocode for filtering and cleaning taxi trip data for the year 2018

# Filter trips based on minimum distance and speed
trips_2018 = filter_by_distance_and_speed(trips_2018, min_distance=0, min_speed=0)

# Filter trips to find the highest allowable speed
trips_2018 = filter_and_find_highest_speed(trips_2018, max_allowable_speed_kmph=40)

# Filter trips by trip duration within a specified range
trips_2018 = filter_by_trip_duration(trips_2018, min_duration=60, max_duration=86400)

# Filter trips by trip distance within a specified range
trips_2018 = filter_by_trip_distance(trips_2018, min_distance=0.1, max_distance=100)

# Filter trips by passenger count within a specified range
trips_2018 = filter_by_passenger_count(trips_2018, min_count=1, max_count=7)


In [0]:
# Pseudocode for filtering trips with invalid rate codes for the year 2018

# Define a list of valid rate codes
valid_rate_codes = [1, 2, 3, 4, 5, 6]

# Filter trips to keep only those with valid rate codes
invalid_rate_code_trips = filter_by_invalid_rate_codes(trips_2018, valid_rate_codes)

# Display the resulting DataFrame
invalid_rate_code_trips.show()


In [0]:
# Define a list of valid rate codes
valid_rate_codes = [1, 2, 3, 4, 5, 6]

# Call the function to filter trips by valid rate codes
trips_2018 = filter_by_valid_rate_codes(trips_2018, valid_rate_codes)


In [0]:
# Define the threshold for extreme tip amounts
threshold = 150

# Call the function to filter trips by extreme tip amounts
extreme_tip_trips = filter_extreme_tip_amounts(trips_2018, threshold)

# Display the resulting DataFrame
extreme_tip_trips.show()


+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|            filename|        trip_speed|trip_duration|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+------------------+-------------+
|

In [0]:
# Define the threshold for tip amounts
threshold = 150

# Call the function to filter trips by tip amounts
trips_2018 = filter_tip_amounts(trips_2018, threshold)

# Display the resulting DataFrame
trips_2018.show()


+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|            filename|        trip_speed|trip_duration|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+------------------+-------------+
|

In [0]:
# Save the DataFrame as a Parquet file with the specified filename
output_folder = "/CleanedData/green/"

output_filename = output_folder + "cleaned_2018.parquet"
trips_2018.write.parquet(output_filename, mode="overwrite")

In [0]:
# Read the Parquet file into a DataFrame
trips_2018 = spark.read.parquet("/CleanedData/green/cleaned_2018.parquet")

# Define a list of columns to check for missing values
columns_to_check = ["tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_distance", "fare_amount"]

# Call a function to check for missing values in the specified columns
check_missing_values(trips_2018, columns_to_check)

# Call a function to filter rows with unexpected values in the 'store_and_fwd_flag' column
filter_unexpected_store_and_fwd_flag(trips_2018)


Missing values in column 'tpep_pickup_datetime': 0
Missing values in column 'tpep_dropoff_datetime': 0
Missing values in column 'trip_distance': 0
Missing values in column 'fare_amount': 0
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------+----------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|filename|trip_speed|trip_duration|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+---------

Processing Year 2019

In [0]:
# Filter and clean data for the year 2019
year = 2019
trips_2019 = filter_and_clean_data(df_cleaned, year)


In [0]:
# Filter and clean trips data for the year 2019
year = 2019
trips_2019 = filter_and_clean_data(df_cleaned, year)

# Filter trips by distance and speed
trips_2019 = filter_by_distance_and_speed(trips_2019, min_distance=0, min_speed=0)

# Filter trips by finding the highest allowable speed
trips_2019 = filter_and_find_highest_speed(trips_2019, max_allowable_speed_kmph=40)

# Filter trips by trip duration
trips_2019 = filter_by_trip_duration(trips_2019, min_duration=60, max_duration=86400)

# Filter trips by trip distance
trips_2019 = filter_by_trip_distance(trips_2019, min_distance=0.1, max_distance=100)

# Filter trips by passenger count
trips_2019 = filter_by_passenger_count(trips_2019, min_count=1, max_count=7)


In [0]:
# Define the list of valid rate codes
valid_rate_codes = [1, 2, 3, 4, 5, 6]

# Filter trips by invalid rate codes
invalid_rate_code_trips = filter_by_invalid_rate_codes(trips_2019, valid_rate_codes)

# Display the DataFrame with invalid rate code trips
invalid_rate_code_trips.show()


In [0]:
# Define a list of valid rate codes
valid_rate_codes = [1, 2, 3, 4, 5, 6]

# Filter trips_2019 DataFrame to keep rows with rate codes in the valid_rate_codes list
trips_2019 = filter_by_valid_rate_codes(trips_2019, valid_rate_codes)


In [0]:
# Set the threshold value for extreme tip amounts
threshold = 150

# Filter trips_2019 DataFrame to keep rows where tip amount is greater than or equal to the threshold
extreme_tip_trips = filter_extreme_tip_amounts(trips_2019, threshold)

# Display the resulting DataFrame
extreme_tip_trips.show()


In [0]:
# Set the threshold value for tip amounts
threshold = 150

# Filter trips_2019 DataFrame to keep rows where tip amount is greater than or equal to the threshold
trips_2019 = filter_tip_amounts(trips_2019, threshold)

# Display the resulting DataFrame
trips_2019.show()


+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|            filename|        trip_speed|trip_duration|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+------------------+-------------+
|

In [0]:
# Define the output folder path
output_folder = "/CleanedData/green/"

# Define the output filename
output_filename = output_folder + "cleaned_2019.parquet"

# Write the trips_2019 DataFrame to the specified Parquet file location
trips_2019.write.parquet(output_filename, mode="overwrite")


In [0]:
# Load the 'trips_2019' DataFrame from a Parquet file
trips_2019 = spark.read.parquet("/CleanedData/green/cleaned_2019.parquet")

# Define a list of columns to check for missing values
columns_to_check = ["tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_distance", "fare_amount"]

# Call a function to check for missing values in the specified columns
check_missing_values(trips_2019, columns_to_check)

# Call a function to filter and display rows with unexpected values in the 'store_and_fwd_flag' column
filter_unexpected_store_and_fwd_flag(trips_2019)


Processing Year 2020

In [0]:
year=2020
trips_2020 = filter_and_clean_data(df_cleaned, year)


In [0]:
# Apply filters one by one
trips_2020 = filter_by_distance_and_speed(trips_2020, min_distance=0, min_speed=0)

trips_2020 = filter_and_find_highest_speed(trips_2020, max_allowable_speed_kmph=40)

trips_2020 = filter_by_trip_duration(trips_2020, min_duration=60, max_duration=86400)


trips_2020 = filter_by_trip_distance(trips_2020, min_distance=0.1, max_distance=100)

trips_2020 = filter_by_passenger_count(trips_2020, min_count=1, max_count=7)

In [0]:

valid_rate_codes = [1, 2, 3, 4, 5, 6]  # the list of valid rate codes we want to check against
invalid_rate_code_trips = filter_by_invalid_rate_codes(trips_2020, valid_rate_codes)
invalid_rate_code_trips.show()

In [0]:
# Define a list of valid rate codes
valid_rate_codes = [1, 2, 3, 4, 5, 6]

# Apply the filter
trips_2020 = filter_by_valid_rate_codes(trips_2020, valid_rate_codes)

In [0]:
threshold = 150  # Adjusted the threshold per business decision
extreme_tip_trips = filter_extreme_tip_amounts(trips_2020, threshold)
extreme_tip_trips.show()

In [0]:

threshold = 150  # Adjust the threshold as needed
trips_2020 = filter_tip_amounts(trips_2020, threshold)
trips_2020.show()


+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|            filename|        trip_speed|trip_duration|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+------------------+-------------+
|

In [0]:
# Save the DataFrame as a Parquet file with the specified filename
output_folder = "/CleanedData/green/"

output_filename = output_folder + "cleaned_2020.parquet"
trips_2020.write.parquet(output_filename, mode="overwrite")

Processing Year 2021

In [0]:
# Define the year for filtering
year = 2021

# Call the filter_and_clean_data function
trips_2021 = filter_and_clean_data(df_cleaned, year)

In [0]:
# Apply the filters and cleaning operations to the trips_2021 DataFrame
trips_2021 = filter_by_distance_and_speed(trips_2021, min_distance=0, min_speed=0)
trips_2021 = filter_and_find_highest_speed(trips_2021, max_allowable_speed_kmph=40)
trips_2021 = filter_by_trip_duration(trips_2021, min_duration=60, max_duration=86400)
trips_2021 = filter_by_trip_distance(trips_2021, min_distance=0.1, max_distance=100)
trips_2021 = filter_by_passenger_count(trips_2021, min_count=1, max_count=7)

In [0]:
valid_rate_codes = [1, 2, 3, 4, 5, 6]  # the list of valid rate codes we want to check against
invalid_rate_code_trips = filter_by_invalid_rate_codes(trips_2021, valid_rate_codes)
invalid_rate_code_trips.show()

In [0]:
valid_rate_codes = [1, 2, 3, 4, 5, 6]
trips_2021 = filter_by_valid_rate_codes(trips_2021, valid_rate_codes)

In [0]:

threshold = 150  # Adjusted the threshold per business decision
extreme_tip_trips = filter_extreme_tip_amounts(trips_2021, threshold)
extreme_tip_trips.show()


In [0]:
# Save the DataFrame as a Parquet file with the specified filename
output_folder = "/CleanedData/green/"

output_filename = output_folder + "cleaned_2021.parquet"
trips_2021.write.parquet(output_filename, mode="overwrite")

In [0]:
threshold = 150  # Adjust the threshold as needed
trips_2021 = filter_tip_amounts(trips_2021, threshold)
trips_2021.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|            filename|        trip_speed|trip_duration|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+------------------+-------------+
|

Processing Year 2022

In [0]:
# Apply the filter and clean data operation for the year 2022
year = 2022
trips_2022 = filter_and_clean_data(df_cleaned, year)

In [0]:
# Filter trips_2022 by distance and speed
trips_2022 = filter_by_distance_and_speed(trips_2022, min_distance=0, min_speed=0)

# Find and filter out trips with the highest speed exceeding a threshold
trips_2022 = filter_and_find_highest_speed(trips_2022, max_allowable_speed_kmph=40)

# Filter trips by trip duration (minimum and maximum)
trips_2022 = filter_by_trip_duration(trips_2022, min_duration=60, max_duration=86400)

# Filter trips by trip distance (minimum and maximum)
trips_2022 = filter_by_trip_distance(trips_2022, min_distance=0.1, max_distance=100)

# Filter trips by passenger count (minimum and maximum)
trips_2022 = filter_by_passenger_count(trips_2022, min_count=1, max_count=7)


In [0]:
valid_rate_codes = [1, 2, 3, 4, 5, 6]  # the list of valid rate codes we want to check against
invalid_rate_code_trips = filter_by_invalid_rate_codes(trips_2022, valid_rate_codes)
invalid_rate_code_trips.show()

In [0]:

valid_rate_codes = [1, 2, 3, 4, 5, 6]
trips_2022 = filter_by_valid_rate_codes(trips_2022, valid_rate_codes)

In [0]:
threshold = 150  # Adjusted the threshold per business decision
extreme_tip_trips = filter_extreme_tip_amounts(trips_2022, threshold)
extreme_tip_trips.show()

In [0]:
threshold = 150  # Adjust the threshold as needed
trips_2022 = filter_tip_amounts(trips_2022, threshold)
trips_2022.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+-------------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|            filename|         trip_speed|trip_duration|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+-------------------+-------------

In [0]:
# Save the DataFrame as a Parquet file with the specified filename
output_folder = "/CleanedData/green/"

output_filename = output_folder + "cleaned_2022.parquet"
trips_2022.write.parquet(output_filename, mode="overwrite")

In [0]:
clean_green = spark.read.parquet("/CleanedData/green/cleaned_*")
clean_green.count()

Out[67]: 62758794

In [0]:
clean_green.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- filename: string (nullable = true)
 |-- trip_speed: double (nullable = true)
 |-- trip_durati

We have successfully cleaned and processed both the green and yellow taxi datasets, applying a series of data filtering and cleaning operations. The cleaned data has been saved into separate chunks based on each year, organized within a folder named 'CleanedData.' These cleaned datasets are now ready for further analysis and insights.