In [2]:
from pyspark.sql.functions import lit, create_map
from databricks.connect import DatabricksSession
from citibike.citibike_utils import get_trip_duration
from utils.datetime_utils import timestamp_to_date_col

In [None]:
# Pull the run parameters out of the notebook widgets. Each value is later
# either embedded in the "metadata" column or used to build table names.
read_widget = dbutils.widgets.get

pipeline_id = read_widget("pipeline_id")
run_id = read_widget("run_id")
task_id = read_widget("task_id")
processed_timestamp = read_widget("processed_timestamp")
catalog = read_widget("catalog")

In [None]:
# Obtain the Spark session through Databricks Connect (reuses an existing
# session when one is already available, otherwise creates it).
spark = (
    DatabricksSession.builder
    .getOrCreate()
)

In [None]:
# Load the bronze-layer table from the catalog selected via the widget.
bronze_table = f"{catalog}.bronze.jc_citibike"
df = spark.read.table(bronze_table)

In [None]:
# Delegate to the project helper with the start/end timestamp column names and
# the output column name.
# NOTE(review): presumably adds "trip_duration_mins" derived from the two
# timestamps - confirm against citibike.citibike_utils.get_trip_duration.
df = get_trip_duration(
    spark,
    df,
    "started_at",
    "ended_at",
    "trip_duration_mins",
)

In [None]:
# Delegate to the project helper with the source timestamp column and the
# output column name.
# NOTE(review): presumably derives a date-typed "trip_start_date" from
# "started_at" - confirm against utils.datetime_utils.timestamp_to_date_col.
df = timestamp_to_date_col(
    spark,
    df,
    "started_at",
    "trip_start_date",
)

In [None]:
# Attach a "metadata" map column that carries the run parameters read from
# the widgets, so every row records which run produced it.
metadata_fields = {
    "pipeline_id": pipeline_id,
    "run_id": run_id,
    "task_id": task_id,
    "processed_timestamp": processed_timestamp,
}

# create_map expects alternating key/value columns: key1, value1, key2, ...
metadata_entries = [
    lit(item) for pair in metadata_fields.items() for item in pair
]

df = df.withColumn("metadata", create_map(*metadata_entries))

In [None]:
# Keep only the columns that are written to the silver table, in this order.
silver_columns = [
    "ride_id",
    "trip_start_date",
    "started_at",
    "ended_at",
    "start_station_name",
    "end_station_name",
    "trip_duration_mins",
    "metadata",
]

df = df.select(*silver_columns)

In [None]:
# Replace the silver table with the current result. "overwriteSchema" is set
# so the write may also replace the existing table schema.
silver_table = f"{catalog}.silver.jc_citibike"
silver_writer = df.write.mode("overwrite").option("overwriteSchema", "true")
silver_writer.saveAsTable(silver_table)