#Bronze To Silver

The Bronze to Silver code performs data cleansing, validation, and formatting in a medallion architecture pipeline. It starts by loading raw data from the Bronze layer, specified by partition year and month, into a Spark DataFrame. The pipeline then processes this data in several steps:

- **Data Integrity Checks:** Removes rows with critical null values (e.g., `passenger_count`, `tpep_pickup_datetime`), filling other nullable fields with default values to ensure completeness.

- **Data Type Validation:** Casts columns to their appropriate data types, ensuring consistency and correctness for downstream processes.

- **Date and Range Checks:** Ensures `tpep_pickup_datetime` is earlier than `tpep_dropoff_datetime` and that dates fall within the specified partition month. Validates numeric fields to remove or flag invalid records (e.g., negative distances or fare amounts).

- **Categorical Data Mapping:** Restricts `RatecodeID` and `payment_type` to known ranges, assigning out-of-range values to 7 (unknown), and filters valid NYC location IDs, mapping unknown values as 264.

- **Deduplication:** Removes potential duplicate records based on key fields (e.g., VendorID, pickup and dropoff times, total_amount) to improve data quality.

- **Metadata and Partitioning:** Adds audit columns (`PipelineRunID`, `PipelineRunDate`, `SourceFile`) for traceability and partitions data by year and month.

Finally, the cleaned and enriched data is saved in Delta format to the Silver layer, optimized for analytics and further processing in subsequent steps.

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
from calendar import monthrange

##1. Data Reading

Setting up widgets to parameterize the data by year and month. This allows dynamic selection of the partition year and month for reading data, improving modularity and reusability.

In [0]:
dbutils.widgets.text("Year", "")
dbutils.widgets.text("Month", "")

partition_year = dbutils.widgets.get("Year")
partition_month = dbutils.widgets.get("Month")

Reading raw data from the Bronze delta lake container with the specified partition. The data is read in delta format from the specified year and month, ensuring we only process data relevant to the selected time frame.


In [0]:
silver_df = spark.read.format("delta").load(f"/mnt/bronze/delta_data/PartitionYear={partition_year}/PartitionMonth={partition_month}")

##2. Data Integrity

####Null Checks

Dropping rows where certain critical fields are null. Ensures data quality by filtering out rows that lack required values, particularly those needed for analysis or further processing.

In [0]:
silver_df = silver_df.na.drop(subset=[
        "passenger_count",
        "tpep_pickup_datetime",
        "tpep_dropoff_datetime"
])

Filling null values in certain columns with defaults. This approach ensures that calculations don't break due to missing values, and provides a meaningful fallback for specific columns.

In [0]:
silver_df = silver_df.fillna(
    {
        "trip_distance": 0,
        "fare_amount": 0,
        "extra": 0,
        "mta_tax": 0,
        "tip_amount": 0,
        "tolls_amount": 0,
        "improvement_surcharge": 0,
        "total_amount": 0,
        "congestion_surcharge": 0,
        "Airport_fee": 0
    }
)

silver_df = silver_df.fillna(

    {
        "RateCodeID": 7,
        "payment_type": 5,
        "PULocationID": 264,
        "DOLocationID": 264,
        "store_and_fwd_flag": "Unknown"
    }
)

####Data Type Validation

Casting columns to enforce correct data types. Ensures that columns are in the expected format, which is critical for consistency and downstream processing.

In [0]:

silver_df = (silver_df
  .withColumn("VendorID", col("VendorID").cast(IntegerType()))
  .withColumn("tpep_pickup_datetime", col("tpep_pickup_datetime").cast(TimestampType()))
  .withColumn("tpep_dropoff_datetime", col("tpep_dropoff_datetime").cast(TimestampType()))
  .withColumn("passenger_count", col("passenger_count").cast(IntegerType()))
  .withColumn("trip_distance", col("trip_distance").cast(DoubleType()))
  .withColumn("RatecodeID", col("RatecodeID").cast(IntegerType()))
  .withColumn("store_and_fwd_flag", col("store_and_fwd_flag").cast(StringType()))
  .withColumn("PULocationID", col("PULocationID").cast(IntegerType()))
  .withColumn("DOLocationID", col("DOLocationID").cast(IntegerType())) 
  .withColumn("payment_type", col("payment_type").cast(IntegerType()))
  .withColumn("fare_amount", col("fare_amount").cast(DoubleType()))
  .withColumn("extra", col("extra").cast(DoubleType()))
  .withColumn("mta_tax", col("mta_tax").cast(DoubleType()))
  .withColumn("tip_amount", col("tip_amount").cast(DoubleType()))
  .withColumn("tolls_amount", col("tolls_amount").cast(DoubleType())) 
  .withColumn("improvement_surcharge", col("improvement_surcharge").cast(DoubleType())) 
  .withColumn("total_amount", col("total_amount").cast(DoubleType())) 
  .withColumn("congestion_surcharge", col("congestion_surcharge").cast(DoubleType())) 
  .withColumn("Airport_fee", col("Airport_fee").cast(DoubleType()))
)

##3. Format and Range

####Date and Time Consistency

Ensuring pickup time is before dropoff time. Validates temporal consistency by ensuring that each trip has a valid pickup and dropoff order.

In [0]:
silver_df = silver_df.filter(col("tpep_pickup_datetime") < col("tpep_dropoff_datetime"))

Filtering trips to fall within the specified month range. Adds another layer of temporal validation by filtering out records outside the expected monthly boundary.

In [0]:
start_date = datetime(int(partition_year), int(partition_month), 1)
end_date = datetime(int(partition_year), int(partition_month), monthrange(int(partition_year), int(partition_month))[1])

silver_df = silver_df.filter(
    (col("tpep_pickup_datetime") >= lit(start_date)) &
    (col("tpep_pickup_datetime") <= lit(end_date))
)

####Range Checks for Numeric Columns

Ensuring values fall within acceptable ranges for numeric columns. This is critical for data quality, ensuring only realistic and valid numeric entries proceed to the Silver layer.

In [0]:
silver_df = silver_df.filter(
    (col("passenger_count") >= 1) &
    (col("trip_distance") >= 0) &
    (col("fare_amount") >= 0) &
    (col("extra") >= 0) &
    (col("mta_tax") >= 0) &
    (col("tip_amount") >= 0) &
    (col("tolls_amount") >= 0) &
    (col("improvement_surcharge") >= 0) &
    (col("total_amount") >= 0) &
    (col("congestion_surcharge") >= 0) &
    (col("Airport_fee") >= 0)
)

Map `RatecodeID` values to 1-6 range, assigning 7 (unknown) to any out-of-range values.

In [0]:
silver_df = silver_df.withColumn(
    "RatecodeID",
    when((col("RatecodeID") >= 1) & (col("RatecodeID") <= 6), col("RatecodeID"))
    .otherwise(7)
)

Map `payment_type` values to 1-6 range, assigning 7 (unknown) to any out-of-range values.

In [0]:
silver_df = silver_df.withColumn(
    "payment_type",
    when((col("payment_type") >= 1) & (col("payment_type") <= 6), col("payment_type"))
    .otherwise(7)
)

Map `PULocationID` and `DOLocationID` values to 1-265 range, assigning 264 (unknown) to any out-of-range values.

In [0]:
silver_df = silver_df.withColumn(
    "PULocationID",
    when((col("PULocationID") >= 1) & (col("PULocationID") <= 265), col("PULocationID"))
    .otherwise(264)
).withColumn(
    "DOLocationID",
    when((col("DOLocationID") >= 1) & (col("DOLocationID") <= 265), col("DOLocationID"))
    .otherwise(264)
)

##4. Consistency and Cross-Field Validations

####Cross-Field validation for `total_amount`

Validating total amount consistency with calculated total. This adds a calculated column to check if the `total_amount` matches the sum of individual charges, improving data accuracy.

In [0]:
silver_df = silver_df.withColumn(
    "calculated_total_amount",
    col("fare_amount") + 
    col("extra") + 
    col("mta_tax") + 
    col("tip_amount") +
    col("tolls_amount") + 
    col("improvement_surcharge") + 
    col("congestion_surcharge") + 
    col("Airport_fee")
)

silver_df = silver_df.withColumn(
    "total_amount",
    when(col("total_amount") != col("calculated_total_amount"), col("calculated_total_amount"))
    .otherwise(col("total_amount"))
).drop("calculated_total_amount")

##5. Deduplication 

Removes potential duplicate trips based on key columns that uniquely represent a trip, enhancing data quality.

In [0]:
silver_df = silver_df.dropDuplicates(subset=["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "total_amount"])

##6. Extra Columns

####Partitions

Adding partition columns for year and month. Extracts and adds year and month from pickup datetime to support partitioning and optimized querying in the delta lake.

In [0]:
silver_df = silver_df.withColumn("PartitionYear", year(col("tpep_pickup_datetime")))
silver_df = silver_df.withColumn("PartitionMonth", month(col("tpep_pickup_datetime")))

####Audit Columns

Adding audit columns for tracking data lineage and load metadata. These columns help track data origin and load specifics, useful for audits and debugging.

In [0]:
dbutils.widgets.text("PipelineRunID", "")
pipeline_id = dbutils.widgets.get("PipelineRunID")

silver_df = (silver_df
    .withColumn("PipelineRunID", lit(pipeline_id))
    .withColumn("PipelineRunDate",now())
    .withColumn("SourceFile", input_file_name())
)

##6. Data Loading

Writing the transformed data to the Silver container. Saves data in Delta format, partitioned by year and month, facilitating optimized storage and query performance.

In [0]:
silver_df.write.mode("append").format("delta").partitionBy("PartitionYear", "PartitionMonth").save("/mnt/silver")