In [0]:
# Define a text widget named "ProcessMonth" (label "Month (YYYYMM)") with the
# default value "202412"; later cells read it to decide which month to process.
dbutils.widgets.text("ProcessMonth", "202412", "Month (YYYYMM)")

In [0]:
# Read the current value of the "ProcessMonth" widget (a string such as "202412").
processMonth = dbutils.widgets.get("ProcessMonth")

# Echo the value so the month being processed is visible in the cell output.
print(processMonth)

In [0]:
# Run only if not using Unity Catalog

# Replace the values: *** Data Lake Name ***, *** Data Lake Access Key ***

#storage_account_name = "*** Data Lake Name ***"
#storage_account_access_key = "*** Data Lake Access Key ***"

#spark.conf.set(f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", storage_account_access_key)

### A. Define File Path

In [0]:
# Replace the container name (taxidata), the data lake name (mstrainingdatalake)
# and the folder path below if your storage layout differs.
#
# Example of a fully resolved path for a single month:
#   abfss://taxidata@mstrainingdatalake.dfs.core.windows.net/Raw/YellowTaxis/YellowTaxis_202501.csv

# The month suffix of the file name comes from the processMonth value (YYYYMM).
yellowTaxisFilePath = (
    "abfss://taxidata@mstrainingdatalake.dfs.core.windows.net"
    "/Raw/YellowTaxis/"
    f"YellowTaxis_{processMonth}.csv"
)

print(yellowTaxisFilePath)

### B. Read File by Applying Schema

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Schema for the Yellow Taxi CSV file.
# Each (name, type) pair below becomes a nullable StructField, in file order.

yellowTaxiSchema = StructType([
    StructField(fieldName, fieldType, True)
    for fieldName, fieldType in [
        ("VendorId",              IntegerType()),
        ("lpep_pickup_datetime",  TimestampType()),
        ("lpep_dropoff_datetime", TimestampType()),
        ("passenger_count",       DoubleType()),
        ("trip_distance",         DoubleType()),
        ("RatecodeID",            DoubleType()),
        ("store_and_fwd_flag",    StringType()),
        ("PULocationID",          IntegerType()),
        ("DOLocationID",          IntegerType()),
        ("payment_type",          IntegerType()),
        ("fare_amount",           DoubleType()),
        ("extra",                 DoubleType()),
        ("mta_tax",               DoubleType()),
        ("tip_amount",            DoubleType()),
        ("tolls_amount",          DoubleType()),
        ("improvement_surcharge", DoubleType()),
        ("total_amount",          DoubleType()),
        ("congestion_surcharge",  DoubleType()),
        ("airport_fee",           DoubleType()),
    ]
])

In [0]:
# Load the month's CSV file into a DataFrame. The first line of the file is a
# header row, and column names/types come from yellowTaxiSchema rather than
# being inferred.
yellowTaxiDF = spark.read.csv(
    yellowTaxisFilePath,
    schema=yellowTaxiSchema,
    header=True,
)

# Show the column names and types of the loaded DataFrame.
yellowTaxiDF.printSchema()

### C. Clean Data

#### C.1. Accuracy Check: Filter inaccurate data

In [0]:
# Keep only rows that have at least one passenger and a positive trip distance.
# Rows where either value is null fail the comparison and are dropped as well.
yellowTaxiDF = yellowTaxiDF.filter(
    (col("passenger_count") > 0) & (col("trip_distance") > 0.0)
)

#### C.2. (a) Completeness Check: Drop rows where all columns are null

In [0]:
# Remove rows in which every column is null ("all"); rows with only some
# null values are kept.
yellowTaxiDF = yellowTaxiDF.dropna(how="all")

#### C.2. (b) Completeness Check: Replace nulls with default values

In [0]:
# Per-column replacement values for nulls: column name -> default value.
defaultValueMap = dict(payment_type=5, RateCodeID=1)

# Replace nulls in the listed columns; all other columns are left untouched.
yellowTaxiDF = yellowTaxiDF.fillna(defaultValueMap)

#### C.3. Uniqueness Check: Drop duplicate rows

In [0]:
# Remove exact duplicate rows (all columns are compared).
yellowTaxiDF = yellowTaxiDF.distinct()

#### C.4. Timeliness Check: Remove records outside the date bounds

In [0]:
# Timeliness check: keep only trips that were picked up on or after the first
# day of the month being processed and dropped off before the first day of the
# following month.
#
# The bounds are derived from processMonth (YYYYMM) so that the filter always
# matches the file that was loaded, instead of being fixed to a single month.
from datetime import datetime

# Raises ValueError if processMonth cannot be parsed as YYYYMM.
processMonthStart = datetime.strptime(processMonth, "%Y%m")

# First day of the following month; December rolls over to January of the
# next year (month // 12 is 1 only for December).
nextMonthStart = datetime(
    processMonthStart.year + processMonthStart.month // 12,
    processMonthStart.month % 12 + 1,
    1,
)

yellowTaxiDF = (
    yellowTaxiDF
        .where(
            f"lpep_pickup_datetime >= '{processMonthStart:%Y-%m-%d}' "
            f"AND lpep_dropoff_datetime < '{nextMonthStart:%Y-%m-%d}'"
        )
)

### D. Transform Data

#### D.1. Select Limited Columns

In [0]:
# Keep only the columns needed downstream. passenger_count is converted to an
# integer and trip_distance is exposed as TripDistance; every other column is
# passed through unchanged.
yellowTaxiDF = yellowTaxiDF.select(
    col("VendorID"),
    col("passenger_count").cast("int"),
    col("trip_distance").alias("TripDistance"),
    col("lpep_pickup_datetime"),
    col("lpep_dropoff_datetime"),
    col("PUlocationID"),
    col("DOlocationID"),
    col("RatecodeID"),
    col("total_amount"),
    col("payment_type"),
)

#### D.2. Rename Columns

In [0]:
# Rename the remaining source-style columns to PascalCase names used by the
# rest of the notebook. Each call renames exactly one column and leaves the
# others as they are.
yellowTaxiDF = (
                   yellowTaxiDF                        
                        
                        .withColumnRenamed("passenger_count", "PassengerCount")
    
                        # Pickup / drop-off timestamps
                        .withColumnRenamed("lpep_pickup_datetime", "PickupTime")
                        .withColumnRenamed("lpep_dropoff_datetime", "DropTime")
                        # Pickup / drop-off location ids
                        .withColumnRenamed("PUlocationID", "PickupLocationId")
                        .withColumnRenamed("DOlocationID", "DropLocationId")
                        .withColumnRenamed("total_amount", "TotalAmount")
                        .withColumnRenamed("payment_type", "PaymentType")    
               )

#### D.3. (a) Create Derived Columns - TripYear, TripMonth, TripDay

In [0]:
# Add the calendar parts of the pickup timestamp as separate columns, appended
# in the order TripYear, TripMonth, TripDay.
yellowTaxiDF = (
    yellowTaxiDF
        .withColumn("TripYear", year(col("PickupTime")))
        .withColumn("TripMonth", month(col("PickupTime")))
        .withColumn("TripDay", dayofmonth(col("PickupTime")))
)

#### D.3. (b) Create Derived Column - TripTimeInMinutes

In [0]:
# Trip duration in seconds: drop-off time minus pickup time, both converted to
# Unix timestamps.
tripTimeInSecondsExpr = unix_timestamp("DropTime") - unix_timestamp("PickupTime")

# Duration in whole minutes. `round` here is the Spark SQL function brought in
# by the wildcard import, not Python's built-in.
tripTimeInMinutesExpr = round(tripTimeInSecondsExpr / 60, 0)

yellowTaxiDF = yellowTaxiDF.withColumn("TripTimeInMinutes", tripTimeInMinutesExpr)

#### D.3. (c) Create Derived Column - TripType

In [0]:
# Label a trip "SharedTrip" when its rate code is 6; every other trip
# (including those with a null rate code) is labelled "SoloTrip".
isSharedRateCode = col("RatecodeID") == 6
tripTypeColumn = when(isSharedRateCode, lit("SharedTrip")).otherwise(lit("SoloTrip"))

yellowTaxiDF = yellowTaxiDF.withColumn("TripType", tripTypeColumn)

### E. Save Data to Data Lake as Spark (Delta) Table

#### E.1. Create Catalog and Schema in Unity Catalog

In [0]:
%sql

-- Create the catalog that will hold the taxi tables, if it is not already there.
CREATE CATALOG IF NOT EXISTS taxicatalog;

-- Create the "rides" schema inside that catalog, if it is not already there.
CREATE SCHEMA IF NOT EXISTS taxicatalog.rides;

-- For Hive Metastore (no Unity Catalog), create the schema there instead:
-- CREATE SCHEMA IF NOT EXISTS hive_metastore.rides;

#### E.2. Save DataFrame as Delta Table

In [0]:
# Append the processed rows to the Delta table taxicatalog.rides.yellowtaxis,
# partitioned by VendorID. Because the mode is "append", running this cell
# again for the same data adds the rows a second time.
yellowTaxiDF.write.saveAsTable(
    "taxicatalog.rides.yellowtaxis",
    format="delta",
    mode="append",
    partitionBy="VendorID",
)

In [0]:
# End the notebook run and hand the string "success" back to whatever invoked
# this notebook.
dbutils.notebook.exit("success")