In [4]:
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, TimestampType
from pyspark.sql.functions import create_map, lit

In [None]:
# Job parameters arrive as notebook widgets; read them in one pass, in the
# same order the variables are declared on the left-hand side.
pipeline_id, run_id, task_id, processed_timestamp, catalog = (
    dbutils.widgets.get(widget_name)
    for widget_name in (
        "pipeline_id",
        "run_id",
        "task_id",
        "processed_timestamp",
        "catalog",
    )
)

KeyError: 'pipeline_id'

In [5]:
# Explicit schema for the Citi Bike trip CSV, so column types are fixed up
# front instead of being inferred from the file.
#
# Coordinates need an explicit precision/scale: a bare DecimalType() defaults
# to precision 10 and scale 0, which would keep only the whole-number part of
# every latitude/longitude. DecimalType(9, 6) leaves three integer digits
# (enough for +/-180) and six fractional digits.
# NOTE(review): scale 6 assumes six fractional digits are sufficient here;
# raise the scale if the source precision must be kept in full.
coordinate_type = DecimalType(9, 6)

schema = StructType([
    StructField("ride_id", StringType(), True),
    StructField("rideable_type", StringType(), True),
    StructField("started_at", TimestampType(), True),
    StructField("ended_at", TimestampType(), True),
    StructField("start_station_name", StringType(), True),
    StructField("start_station_id", StringType(), True),
    StructField("end_station_name", StringType(), True),
    StructField("end_station_id", StringType(), True),
    StructField("start_lat", coordinate_type, True),
    StructField("start_lng", coordinate_type, True),
    StructField("end_lat", coordinate_type, True),
    StructField("end_lng", coordinate_type, True),
    StructField("member_casual", StringType(), True),
])

In [None]:
# Load the landing-zone CSV for the catalog passed in as a parameter.
# The first line of the file is a header row; column types come from `schema`.
source_path = f"/Volumes/{catalog}/00_landing/source_citybike_data/JC-202503-citibike-tripdata.csv"
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv(source_path)
)

In [None]:
# Attach a "metadata" map column carrying the run parameters, so every row
# records which pipeline/run/task produced it and when it was processed.
metadata_entries = {
    "pipeline_id": pipeline_id,
    "run_id": run_id,
    "task_id": task_id,
    "processed_timestamp": processed_timestamp,
}

# create_map expects alternating key, value columns: flatten the pairs in order.
metadata_column = create_map(
    *(lit(part) for entry in metadata_entries.items() for part in entry)
)

df = df.withColumn("metadata", metadata_column)

In [None]:
# Persist the frame as the bronze table in the parameterised catalog.
# "overwrite" replaces whatever the table currently holds; the
# "overwriteSchema" option is set so the stored schema may be replaced too.
target_table = f"{catalog}.01_bronze.jc_citibike"

(
    df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(target_table)
)