In [None]:
# To get the project root dir to import custom modules
from pathlib import Path
import sys

project_root = Path().resolve().parents[2]
sys.path.append(str(project_root))

In [None]:
from src.citibike.citibike_utils import get_trip_duration_mins
from src.utils.datetime_utils import timestamp_to_date_col
from pyspark.sql.functions import create_map, lit
from databricks.connect import DatabricksSession

# Not needed at job runtime, where a SparkSession is already available as `spark`.
# Uncomment the line below to create a session through Databricks Connect instead.
#spark = DatabricksSession.builder.serverless().profile("fe_kfs").getOrCreate()

In [None]:
# Job-run parameters arrive as notebook widgets. pipeline_id, run_id, task_id and
# processed_timestamp are stored in the `metadata` column further down; `catalog`
# is used to build the source and target table names.
read_widget = dbutils.widgets.get

pipeline_id = read_widget("pipeline_id")
run_id = read_widget("run_id")
task_id = read_widget("task_id")
processed_timestamp = read_widget("processed_timestamp")
catalog = read_widget("catalog")

In [None]:
# Load the bronze-layer source table from the catalog selected at run time.
bronze_table = f"{catalog}.01_bronze.jc_citibike"
df = spark.read.table(bronze_table)

In [None]:
# Presumably adds a trip-duration (minutes) column derived from `started_at` and
# `ended_at`, named by the last argument — confirm against
# src.citibike.citibike_utils.
# NOTE(review): the output column is called "get_trip_duration_mins", i.e. it
# carries the helper's name; the later select relies on this exact name, so a
# rename has to be done in both places (and in any downstream consumers).
df = get_trip_duration_mins(
    spark,
    df,
    "started_at",
    "ended_at",
    "get_trip_duration_mins",
)

In [None]:
# Presumably derives a date column `trip_start_date` from the `started_at`
# timestamp — confirm against src.utils.datetime_utils.
df = timestamp_to_date_col(
    spark,
    df,
    "started_at",
    "trip_start_date",
)

In [None]:
# Attach the job-run parameters to every row as a single map column.
metadata_fields = {
    "pipeline_id": pipeline_id,
    "run_id": run_id,
    "task_id": task_id,
    "processed_timestamp": processed_timestamp,
}

# create_map expects alternating key/value columns: key1, value1, key2, value2, ...
# Dict insertion order keeps the entries in the order listed above.
metadata_entries = [
    lit(part)
    for key_value in metadata_fields.items()
    for part in key_value
]

df = df.withColumn("metadata", create_map(*metadata_entries))

In [None]:
# Columns kept for the silver table, in output order.
silver_columns = [
    "ride_id",
    "trip_start_date",
    "started_at",
    "ended_at",
    "start_station_name",
    "end_station_name",
    "get_trip_duration_mins",
    "metadata",
]

df = df.select(*silver_columns)

In [None]:
# Replace the silver table with the freshly built DataFrame. The overwriteSchema
# option is passed alongside the overwrite mode; presumably the target is a Delta
# table, where this allows the table schema to be replaced too — TODO confirm.
silver_table = f"{catalog}.02_silver.jc_citibike"
(
    df.write
    .mode("overwrite")
    .options(overwriteSchema=True)
    .saveAsTable(silver_table)
)