In [0]:
from pyspark.sql.functions import col

# Bronze yellow-taxi trips: the input to every transformation below.
df_bronze = spark.read.table("taxi_dev.bronze.yellow_taxi_trips")


In [0]:
# Print the distinct (year, month) combinations present in bronze.
(
    df_bronze
    .select(col("year"), col("month"))
    .distinct()
    .show()
)


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Rows sharing vendor, pickup/dropoff timestamps and pickup/dropoff locations
# form one group; inside a group the most recently ingested row ranks first.
dedup_window = (
    Window
    .partitionBy([
        "VendorID",
        "tpep_pickup_datetime",
        "tpep_dropoff_datetime",
        "PULocationID",
        "DOLocationID",
    ])
    .orderBy(col("ingestion_ts").desc())
)

# Keep the top-ranked row of every group and discard the helper rank column.
df_dedup = (
    df_bronze
    .withColumn("rn", row_number().over(dedup_window))
    .where(col("rn") == 1)
    .drop("rn")
)


In [0]:
# Quality rules a trip must satisfy: pickup strictly before dropoff, positive
# distance, positive fare, and at least one passenger. The expression is NULL
# when no rule is violated but at least one rule compares against a NULL value.
is_valid_trip = (
    (col("tpep_pickup_datetime") < col("tpep_dropoff_datetime")) &
    (col("trip_distance") > 0) &
    (col("fare_amount") > 0) &
    (col("passenger_count") >= 1)
)

# filter() keeps only rows where the predicate is TRUE.
df_valid = df_dedup.filter(is_valid_trip)

# Exact complement of df_valid: rows where the predicate is FALSE or NULL.
# This replaces df_dedup.subtract(df_valid), which is a full-row EXCEPT DISTINCT
# over every column; df_dedup holds one row per trip key, so the predicate
# complement selects the same rows without the set-difference comparison.
df_invalid = df_dedup.filter(is_valid_trip.isNull() | ~is_valid_trip)


In [0]:
%sql
-- Make sure the silver schema exists, plus a volume named nyc_taxi inside it.
-- Both statements are no-ops when the object is already present.
CREATE SCHEMA IF NOT EXISTS taxi_dev.silver;
CREATE VOLUME IF NOT EXISTS taxi_dev.silver.nyc_taxi;


In [0]:
# Replace the silver trips table with the validated rows, stored as Delta and
# partitioned by year and month.
df_valid.write.saveAsTable(
    "taxi_dev.silver.yellow_taxi_trips",
    format="delta",
    mode="overwrite",
    partitionBy=["year", "month"],
)

In [0]:
# Replace the rejected-trips table with the rows that failed validation,
# stored as Delta at an explicit path.
# NOTE(review): the path lies inside the taxi_dev.silver.nyc_taxi volume —
# confirm that registering a table at a volume path is supported in this
# workspace; otherwise use a managed table or a non-volume external location.
df_invalid.write.saveAsTable(
    "taxi_dev.silver.yellow_taxi_trips_invalid",
    format="delta",
    mode="overwrite",
    path="/Volumes/taxi_dev/silver/nyc_taxi/yellow_taxi_invalid",
)


In [0]:
%sql
-- Total number of rows in the bronze trips table.
SELECT
  COUNT(*) AS bronze_count
FROM
  taxi_dev.bronze.yellow_taxi_trips;


In [0]:
%sql
-- Number of rows in the silver table of validated trips.
SELECT
  COUNT(*) AS silver_valid_count
FROM
  taxi_dev.silver.yellow_taxi_trips;


In [0]:
%sql
-- Number of rows in the silver table of rejected trips.
SELECT
  COUNT(*) AS silver_invalid_count
FROM
  taxi_dev.silver.yellow_taxi_trips_invalid;

In [0]:
%sql
-- Number of surplus bronze rows: for every trip key, each row beyond the first
-- counts as one duplicate.
-- GROUP BY is used instead of COUNT(DISTINCT k1, k2, ...) because the latter
-- skips any row in which one of the key columns is NULL, which would report
-- all such rows as duplicates. GROUP BY places NULL keys in a shared group,
-- so NULL keys are treated as equal to each other.
-- COALESCE keeps the result at 0 (not NULL) when the table is empty.
SELECT COALESCE(SUM(group_rows - 1), 0) AS duplicate_count
FROM (
  SELECT COUNT(*) AS group_rows
  FROM taxi_dev.bronze.yellow_taxi_trips
  GROUP BY
    VendorID,
    tpep_pickup_datetime,
    tpep_dropoff_datetime,
    PULocationID,
    DOLocationID
) AS key_groups;


In [0]:
%sql
-- Share of 655111 out of 3475226, as a percentage rounded to two decimals.
-- NOTE(review): both figures are hard-coded literals; presumably they were
-- copied from the count cells above (invalid rows vs. total rows) — confirm,
-- and consider deriving them from the tables so the value tracks the data.
SELECT
  ROUND(100.0 * 655111 / 3475226, 2) AS invalid_pct;


In [0]:
from pyspark.sql.functions import when, lit

# Tag every deduplicated trip with the first quality rule it breaks;
# failure_reason stays NULL only when all four rules pass.
# Each rule also fires when a value it inspects is NULL: a plain comparison
# against NULL yields NULL, which when() treats as "no match", so such rows
# would otherwise be left without a reason even though the validity filter
# (which requires every comparison to be TRUE) rejects them.
df_with_reason = df_dedup.withColumn(
    "failure_reason",
    when(
        col("tpep_pickup_datetime").isNull()
        | col("tpep_dropoff_datetime").isNull()
        | (col("tpep_pickup_datetime") >= col("tpep_dropoff_datetime")),
        lit("INVALID_TIME"),
    )
    .when(
        col("trip_distance").isNull() | (col("trip_distance") <= 0),
        lit("INVALID_DISTANCE"),
    )
    .when(
        col("fare_amount").isNull() | (col("fare_amount") <= 0),
        lit("INVALID_FARE"),
    )
    .when(
        col("passenger_count").isNull() | (col("passenger_count") < 1),
        lit("INVALID_PASSENGER_COUNT"),
    )
)


In [0]:
%sql
-- Control schema holding pipeline bookkeeping tables.
CREATE SCHEMA IF NOT EXISTS taxi_dev.control;

-- Watermark table: one row per tracked table_name with a (max_year, max_month)
-- pair and the time the row was written. The incremental load further down
-- reads this pair and selects bronze rows whose (year, month) is later.
CREATE TABLE IF NOT EXISTS taxi_dev.control.silver_watermark (
  table_name STRING,
  max_year INT,
  max_month INT,
  updated_ts TIMESTAMP
)
USING DELTA;


In [0]:
%sql
-- Seed the watermark for 'yellow_taxi' at (0, 0) so that the first incremental
-- run picks up every bronze partition.
-- Written as MERGE ... WHEN NOT MATCHED so the cell is idempotent: a plain
-- INSERT would add another seed row on every run of the notebook, leaving the
-- watermark lookup with several candidate rows for the same table_name.
MERGE INTO taxi_dev.control.silver_watermark AS tgt
USING (SELECT 'yellow_taxi' AS table_name) AS src
ON tgt.table_name = src.table_name
WHEN NOT MATCHED THEN
  INSERT (table_name, max_year, max_month, updated_ts)
  VALUES (src.table_name, 0, 0, current_timestamp());


In [0]:
from pyspark.sql.functions import col
# Look up the watermark row for the yellow-taxi table.
# If several rows exist for the same table_name (e.g. the seed was inserted
# more than once), take the one that is furthest ahead so a stray seed row
# cannot rewind the watermark; first() returns None when nothing matches.
wm = (
    spark.table("taxi_dev.control.silver_watermark")
    .filter(col("table_name") == "yellow_taxi")
    .orderBy(col("max_year").desc(), col("max_month").desc())
    .first()
)

if wm is None:
    # Fail with an actionable message instead of a bare IndexError.
    raise ValueError(
        "No watermark row for 'yellow_taxi' in taxi_dev.control.silver_watermark; "
        "run the seed cell first."
    )

# Last (year, month) recorded in the watermark; later partitions are loaded.
last_year = wm["max_year"]
last_month = wm["max_month"]


In [0]:
# Bronze rows strictly after the watermark: either a later year, or the same
# year with a later month.
df_bronze_inc = spark.table("taxi_dev.bronze.yellow_taxi_trips").where(
    (col("year") > last_year)
    | ((col("year") == last_year) & (col("month") > last_month))
)


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Same trip key as the full load: vendor, pickup/dropoff timestamps and
# pickup/dropoff locations, ranked newest-ingested first within each key.
dedup_window_inc = (
    Window
    .partitionBy([
        "VendorID",
        "tpep_pickup_datetime",
        "tpep_dropoff_datetime",
        "PULocationID",
        "DOLocationID",
    ])
    .orderBy(col("ingestion_ts").desc())
)

# One row per trip key from the increment; the helper rank column is dropped.
df_dedup_inc = (
    df_bronze_inc
    .withColumn("rn", row_number().over(dedup_window_inc))
    .where(col("rn") == 1)
    .drop("rn")
)

In [0]:
# Incremental rows passing all four quality rules: pickup before dropoff,
# positive distance, positive fare, at least one passenger.
df_valid_inc = df_dedup_inc.where(
    (col("tpep_pickup_datetime") < col("tpep_dropoff_datetime"))
    & (col("trip_distance") > 0)
    & (col("fare_amount") > 0)
    & (col("passenger_count") >= 1)
)

In [0]:
# Expose the validated increment to SQL under a temporary view name.
df_valid_inc.createOrReplaceTempView("temp_valid_inc")

# Upsert the increment into silver on the trip key: matching trips are
# overwritten with the incoming row, new trips are inserted.
# The key columns are compared with the null-safe operator <=> so that a NULL
# key matches a NULL key, the same way the dedup window groups them. With a
# plain "=" such rows never match and are inserted again on every run.
spark.sql("""
MERGE INTO taxi_dev.silver.yellow_taxi_trips AS tgt
USING temp_valid_inc AS src
ON
  tgt.VendorID <=> src.VendorID
  AND tgt.tpep_pickup_datetime <=> src.tpep_pickup_datetime
  AND tgt.tpep_dropoff_datetime <=> src.tpep_dropoff_datetime
  AND tgt.PULocationID <=> src.PULocationID
  AND tgt.DOLocationID <=> src.DOLocationID
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

In [0]:
# Rejected increment = deduplicated rows that are not in the validated set.
# subtract() is a set difference over whole rows.
df_invalid_inc = df_dedup_inc.subtract(df_valid_inc)

# Add the rejected rows to the existing invalid-trips Delta table.
df_invalid_inc.write.saveAsTable(
    "taxi_dev.silver.yellow_taxi_trips_invalid",
    format="delta",
    mode="append",
)


In [0]:
from pyspark.sql.functions import when, lit

# Tag every deduplicated incremental trip with the first quality rule it
# breaks; failure_reason stays NULL only when all four rules pass.
# Each rule also fires when a value it inspects is NULL. A plain comparison
# against NULL yields NULL, which when() treats as "no match"; without the
# explicit isNull() checks such rows would get no reason and be classified as
# valid below, although the validity filter on df_dedup_inc rejects them.
df_with_reason_inc = (
    df_dedup_inc
    .withColumn(
        "failure_reason",
        when(
            col("tpep_pickup_datetime").isNull()
            | col("tpep_dropoff_datetime").isNull()
            | (col("tpep_pickup_datetime") >= col("tpep_dropoff_datetime")),
            lit("INVALID_TIME"),
        )
        .when(
            col("trip_distance").isNull() | (col("trip_distance") <= 0),
            lit("INVALID_DISTANCE"),
        )
        .when(
            col("fare_amount").isNull() | (col("fare_amount") <= 0),
            lit("INVALID_FARE"),
        )
        .when(
            col("passenger_count").isNull() | (col("passenger_count") < 1),
            lit("INVALID_PASSENGER_COUNT"),
        )
    )
)

# Split on the tag: no reason means valid, any reason means rejected.
# Both frames carry the extra failure_reason column.
df_valid_inc = df_with_reason_inc.filter(col("failure_reason").isNull())
df_invalid_inc = df_with_reason_inc.filter(col("failure_reason").isNotNull())


In [0]:
# Add the rejected incremental rows (tagged with failure_reason) to the
# invalid-trips Delta table.
# NOTE(review): an earlier cell already appends the rejected rows of this same
# increment to this table, so running both looks like it stores them twice —
# confirm which of the two appends is intended.
# NOTE(review): df_invalid_inc carries a failure_reason column here, while the
# table was first written from a frame without it; presumably this append
# needs schema evolution (e.g. mergeSchema) to succeed — confirm.
df_invalid_inc.write.saveAsTable(
    "taxi_dev.silver.yellow_taxi_trips_invalid",
    format="delta",
    mode="append",
)
