In [0]:
# Example blob URL of one raw 2022 Parquet part file:
# https://shilsdemostorage.blob.core.windows.net/nycyellowtaxidata-raw/nycyellowtaxidata-raw/2022/part-00000-tid-5193234821261796341-6165aa51-56ca-4363-a07d-c0bd865f916c-14-1-c000.snappy.parquet

# List the contents of the 2022 folder through the /mnt path.
# NOTE(review): presumably /mnt/nycyellowtaxidata-raw is a mount of the container in the URL above - confirm.
dbutils.fs.ls("/mnt/nycyellowtaxidata-raw/2022")

In [0]:

import os

from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession for this notebook
spark = SparkSession.builder \
    .appName("nyc_taxi_trip_data_clean") \
    .getOrCreate()

# Define the location of your Parquet files
container_name = "nycyellowtaxidata-raw"
storage_account_name = "shilsdemostorage"
folder_path = "nycyellowtaxidata-raw/2022"

# The SAS token is a credential and must never be committed with the notebook.
# It is read from the environment here; on Databricks a secret scope
# (dbutils.secrets.get) is an equivalent source.
sas_token = os.environ.get("AZURE_STORAGE_SAS_TOKEN")
if not sas_token:
    raise RuntimeError(
        "Set the AZURE_STORAGE_SAS_TOKEN environment variable to a valid SAS token "
        "for the storage account before running this cell."
    )

# Register the token for this container/account so wasbs:// reads are authorized.
# The key is built from the same variables as the path so the two cannot drift apart.
spark.conf.set(
    f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net",
    sas_token,
)

# Build the full path to your files
full_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{folder_path}/*.parquet"

# Optional: list and print all the files in the Azure Blob container
# files = dbutils.fs.ls(full_path.replace("*.parquet", ""))
# print("Files in the container nycyellowtaxidata-raw: ")
# for file in files:
#     print(file.path)

# Read the Parquet files into a DataFrame
df = spark.read.parquet(full_path)

# Show the DataFrame schema and some rows
df.printSchema()
df.show(n=10)


root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+----

In [0]:
# Print the schema of the raw DataFrame again (the load cell already prints it once).
df.printSchema()

In [0]:
# =========================Step 2: Data Cleaning================================
from pyspark.sql.functions import col

# Keep only trips that carried at least one passenger and covered some distance
has_passengers = col("passenger_count") > 0
has_distance = col("trip_distance") > 0
dff = df.where(has_passengers & has_distance)

# Optional diagnostic: dff.filter(col("tip_amount") < 0) shows the rows rejected by the
# next step, and calling .count() on it gives how many there are.

# A tip of exactly 0 is legitimate (the passenger left no tip); only negative tips are discarded
cleaned_df = dff.where(col("tip_amount") >= 0)
cleaned_df.show()

# Collapse rows that are identical across every column
deduplicated_df = cleaned_df.dropDuplicates()

# Preview the cleaned and deduplicated result
print("Cleaned and deduplicated DataFrame:")
deduplicated_df.show()

# Report how many rows survive cleaning and deduplication
deduplicated_count = deduplicated_df.count()
print(f"Number of rows in the cleaned and deduplicated DataFrame: {deduplicated_count}")


In [0]:
from pyspark.sql.functions import col, unix_timestamp, expr

# Convert both trip endpoints to epoch seconds so they can be subtracted
pickup_epoch = unix_timestamp(col("tpep_pickup_datetime"))
dropoff_epoch = unix_timestamp(col("tpep_dropoff_datetime"))

df_calc = (
    deduplicated_df
    # Trip duration in seconds: dropoff minus pickup
    .withColumn("trip_duration_seconds", dropoff_epoch - pickup_epoch)
    # The same duration expressed in minutes
    .withColumn("trip_duration_minutes", col("trip_duration_seconds") / 60)
)

# Preview the DataFrame with both duration columns
print("DataFrame with trip_duration (in seconds and minutes):")
df_calc.show()

In [0]:
# Keep only trips whose duration is strictly positive, so zero-length trips are dropped
# along with negative ones (the original run noted 668 rows with negative durations).
positive_duration = col("trip_duration_seconds") > 0
df_calc = df_calc.where(positive_duration)
df_calc.count()

37036192

In [0]:
import os

# Build the full path to your files
full_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{folder_path}/*.parquet"

# The SAS token is a credential and must never be committed with the notebook.
# It is read from the environment here; on Databricks a secret scope
# (dbutils.secrets.get) is an equivalent source.
sas_token = os.environ.get("AZURE_STORAGE_SAS_TOKEN")
if not sas_token:
    raise RuntimeError(
        "Set the AZURE_STORAGE_SAS_TOKEN environment variable to a valid SAS token "
        "for the storage account before running this cell."
    )

# Register the token under the key for the same container/account used in full_path
spark.conf.set(
    f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net",
    sas_token,
)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp

class DataCleaner:
    """Read taxi-trip Parquet files from Azure Blob Storage, clean them, and write them back.

    Typical use: construct the cleaner, call ``set_azure_conf`` with a SAS token for the
    container, then call ``clean_data``.
    """

    def __init__(self, app_name="DataCleanerApp"):
        """Create (or reuse) a SparkSession.

        Args:
            app_name: Application name passed to the SparkSession builder.
        """
        self.spark = SparkSession.builder \
            .appName(app_name) \
            .getOrCreate()

    def set_azure_conf(self, container_name, storage_account_name, sas_token):
        """Register a SAS token for one container of one storage account.

        Args:
            container_name: Blob container the token applies to.
            storage_account_name: Storage account that owns the container.
            sas_token: SAS token string; obtain it from a secret store or the
                environment rather than hardcoding it.

        Raises:
            ValueError: If ``sas_token`` is empty or None.
        """
        # Fail here with a clear message instead of later with an opaque storage auth error
        if not sas_token:
            raise ValueError("sas_token must be a non-empty string")
        conf_key = f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net"
        self.spark.conf.set(conf_key, sas_token)

    @staticmethod
    def _apply_cleaning_rules(df):
        """Return ``df`` filtered, deduplicated, and extended with trip-duration columns."""
        # Keep rows with positive passenger_count and positive trip_distance
        df = df.filter((col("passenger_count") > 0) & (col("trip_distance") > 0))

        # Remove rows with negative tip_amount (a tip of 0 is kept)
        df = df.filter(col("tip_amount") >= 0)

        # Drop duplicate rows based on all columns
        df = df.dropDuplicates()

        # Trip duration in seconds: dropoff minus pickup, both as epoch seconds
        df = df.withColumn(
            "trip_duration_seconds",
            unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime"))
        )

        # The same duration expressed in minutes
        df = df.withColumn(
            "trip_duration_minutes",
            col("trip_duration_seconds") / 60
        )

        # Drop rows whose duration is zero or negative
        return df.filter(col("trip_duration_seconds") > 0)

    def clean_data(
        self,
        storage_account_name="shilsdemostorage",
        container_name="nycyellowtaxidata-raw",
        folder_path="nycyellowtaxidata-raw/2022",
        output_folder="nycyellowtaxidata-cleaned/2022",
    ):
        """Read the raw Parquet files, clean them, print a summary, and write the result.

        The defaults reproduce the locations this notebook was written for; pass other
        values to reuse the cleaner on a different account, container, or folder. The
        same container should have been authorized with ``set_azure_conf`` first.

        Args:
            storage_account_name: Storage account holding the data.
            container_name: Container that is both read from and written to.
            folder_path: Folder inside the container whose ``*.parquet`` files are read.
            output_folder: Folder inside the container that receives the cleaned
                Parquet output; existing content there is overwritten.

        Returns:
            The cleaned Spark DataFrame.
        """
        container_root = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net"
        full_path = f"{container_root}/{folder_path}/*.parquet"
        df = self._apply_cleaning_rules(self.spark.read.parquet(full_path))

        # Display the schema and a few rows of the DataFrame
        df.printSchema()
        df.show()

        # Count the number of rows in the cleaned DataFrame
        row_count = df.count()
        print(f"Number of rows in the cleaned DataFrame: {row_count}")

        # Write the cleaned DataFrame back to the same container as Parquet
        output_path = f"{container_root}/{output_folder}"
        df.write.mode("overwrite").parquet(output_path)

        print(f"Cleaned data written to {output_path}")
        return df

# Script entry point: authorize storage access, then run the DataCleaner job
if __name__ == "__main__":
    import os

    # Azure Blob Storage configuration
    storage_account_name = "shilsdemostorage"
    # Defined here so this cell does not depend on a variable left over from an earlier cell
    container_name = "nycyellowtaxidata-raw"

    # The SAS token is a credential and must never be committed with the notebook.
    # It is read from the environment here; on Databricks a secret scope
    # (dbutils.secrets.get) is an equivalent source.
    sas_token = os.environ.get("AZURE_STORAGE_SAS_TOKEN")
    if not sas_token:
        raise RuntimeError(
            "Set the AZURE_STORAGE_SAS_TOKEN environment variable to a valid SAS token "
            "for the storage account before running this cell."
        )

    # Initialize the DataCleaner class
    cleaner = DataCleaner()

    # Set Azure configuration
    cleaner.set_azure_conf(container_name, storage_account_name, sas_token)

    # Clean the DataFrame and show results
    cleaner.clean_data()


root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- trip_duration_seconds: long (nullable = true)
 |-- trip_duration_minutes: double (nullable = true)

+--------+----------------