In [0]:
import dlt
import pyspark.sql.functions as F

In [0]:
# Both pipeline paths live under this chapter's scratch directory.
_CHAPTER_ROOT = "/tmp/chp_02"
# Auto Loader persists the inferred schema (and its evolution) under this path.
SCHEMA_LOCATION = f"{_CHAPTER_ROOT}/taxi_data_chkpnt"
# Landing directory holding the raw, unprocessed trip files.
RAW_DATA_LOCATION = f"{_CHAPTER_ROOT}/taxi_data/"
@dlt.table(
    name="raw_taxi_trip_data",
    comment="Raw taxi trip data generated by the data generator notebook"
)
def raw_taxi_trip_data():
    """Incrementally ingest raw JSON trip files with Auto Loader.

    Returns:
        A streaming DataFrame over every file landing in RAW_DATA_LOCATION.
        The inferred schema is tracked under SCHEMA_LOCATION.
    """
    autoloader_options = {
        "cloudFiles.format": "json",
        # Where Auto Loader records the inferred schema between runs.
        "cloudFiles.schemaLocation": SCHEMA_LOCATION,
    }
    trip_stream_reader = spark.readStream.format("cloudFiles").options(**autoloader_options)
    return trip_stream_reader.load(RAW_DATA_LOCATION)

In [0]:
# Declare the empty streaming target that the SCD Type 2 CDC flow below writes into.
dlt.create_streaming_table(name="taxi_trip_data_merged")

In [0]:
# Merge the raw change feed into the target table, keeping full history (SCD Type 2).
dlt.apply_changes(
    source="raw_taxi_trip_data",
    target="taxi_trip_data_merged",
    stored_as_scd_type=2,
    # One logical record per trip; changes are ordered by sequence_num.
    keys=["trip_id"],
    sequence_by=F.col("sequence_num"),
    # Rows flagged with op_type 'D' are applied as deletes.
    apply_as_deletes=F.expr("op_type = 'D'"),
    # CDC bookkeeping columns are not copied into the target table.
    except_column_list=["op_type", "op_date", "sequence_num"],
)


In [0]:
@dlt.table(
    name="raw_driver_data",
    comment="Dataset containing info about the taxi drivers"
)
def raw_driver_data():
    """Load the taxi driver table from Postgres over JDBC as a batch read.

    Returns:
        A (non-streaming) DataFrame with the rows of POSTGRES_TABLENAME.
    """
    # NOTE(review): the POSTGRES_* names are not defined in this notebook section;
    # presumably they come from an earlier cell or pipeline config -- confirm, and
    # prefer sourcing the password from a secret store rather than plain text.
    jdbc_url = f"jdbc:postgresql://{POSTGRES_HOSTNAME}:{POSTGRES_PORT}/{POSTGRES_DB}"
    jdbc_properties = dict(
        user=POSTGRES_USERNAME,
        password=POSTGRES_PW,
        driver="org.postgresql.Driver",
        # Rows fetched per round trip from the database.
        fetchsize="1000",
    )
    return spark.read.jdbc(
        url=jdbc_url,
        table=POSTGRES_TABLENAME,
        properties=jdbc_properties,
    )
@dlt.table(
    name="taxi_trip_silver",
    comment="Taxi trip data with transformed columns"
)
def taxi_trip_silver():
    """Add rounded fare, tax and distance columns, then attach driver info.

    Returns:
        The merged trip data with four derived columns, left-joined on
        ``taxi_number`` to ``raw_driver_data``. Trips without a matching
        driver are kept.
    """
    tax_rate = 0.05       # flat 5% tax applied to trip_amount
    km_per_mile = 1.60934  # 1 mile = 1.60934 km

    amount = F.col("trip_amount")
    distance = F.col("trip_distance")

    # NOTE(review): taxi_trip_data_merged is written as SCD type 2, so it may hold
    # several versions per trip_id; every version is carried into this table -- confirm.
    trips_with_metrics = (
        dlt.read("taxi_trip_data_merged")
        .withColumn("fare_amount_usd", F.round(amount, 2))
        .withColumn("taxes_amount_usd", F.round(amount * tax_rate, 2))
        .withColumn("trip_distance_miles", F.round(distance, 2))
        .withColumn("trip_distance_km", F.round(distance * km_per_mile, 2))
    )
    drivers = dlt.read("raw_driver_data")
    return trips_with_metrics.join(drivers, on="taxi_number", how="left")