In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, sum as fsum, to_date, lit

# getOrCreate returns the already-active Spark session when one exists and
# only builds a new one otherwise, so re-running this cell is harmless.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [0]:
# Notebook parameters, exposed as Databricks text widgets (defaults shown).
dbutils.widgets.text("catalog", "new_york_taxi")
dbutils.widgets.text("volume", "raw_files")
dbutils.widgets.text("bronze_schema", "bronze")
# Silver and gold get their own schemas; previously both were (by copy-paste)
# built from the bronze schema, collapsing all three layers into one schema.
dbutils.widgets.text("silver_schema", "silver")
dbutils.widgets.text("gold_schema", "gold")

CATALOG       = dbutils.widgets.get("catalog")
VOLUME        = dbutils.widgets.get("volume")
BRONZE_SCHEMA = dbutils.widgets.get("bronze_schema")
SILVER_SCHEMA = dbutils.widgets.get("silver_schema")
GOLD_SCHEMA   = dbutils.widgets.get("gold_schema")

# Fully qualified "<catalog>.<schema>" names, one per medallion layer.
BRONZE = f"{CATALOG}.{BRONZE_SCHEMA}"
SILVER = f"{CATALOG}.{SILVER_SCHEMA}"
GOLD   = f"{CATALOG}.{GOLD_SCHEMA}"

# /Volumes/<catalog>/<schema>/<volume> path of the raw landing files;
# the volume is expected to live in the bronze schema.
RAW_BASE_PATH = f"/Volumes/{CATALOG}/{BRONZE_SCHEMA}/{VOLUME}"


In [0]:
# 1. Ensure the catalog and every layer's schema exist (idempotent DDL).
spark.sql(f"CREATE CATALOG IF NOT EXISTS {CATALOG}")

_layer_schemas = (BRONZE, SILVER, GOLD)
for _schema in _layer_schemas:
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {_schema}")

# Report only after all DDL has succeeded.
print("Catalog and schemas ensured:")
print("\n".join(f"- {_schema}" for _schema in _layer_schemas))

In [0]:
# 2. Read the raw files from the landing volume.
# Reuse the path from the config cell rather than rebuilding the same f-string,
# so the two definitions cannot drift apart.
base_path = RAW_BASE_PATH

# Month of trip data to ingest (files are named <color>_tripdata_<YYYY-MM>.parquet).
TRIP_MONTH = "2024-01"

green_path  = f"{base_path}/green_tripdata_{TRIP_MONTH}.parquet"
yellow_path = f"{base_path}/yellow_tripdata_{TRIP_MONTH}.parquet"
zones_path  = f"{base_path}/taxi_zone_lookup.csv"

green_raw  = spark.read.parquet(green_path)
yellow_raw = spark.read.parquet(yellow_path)
# header=true takes column names from the first row; without inferSchema every
# column is read as a string, which is kept as-is for the raw bronze copy.
zones_raw  = spark.read.option("header", "true").csv(zones_path)

print("Files read from Volume:")
print("-", green_path)
print("-", yellow_path)
print("-", zones_path)

In [0]:
# 3. Write Bronze managed Delta tables.
# Single mapping of target table -> source DataFrame, so the write loop and the
# report below can never list different tables.
bronze_tables = {
    f"{BRONZE}.green_tripdata": green_raw,
    f"{BRONZE}.yellow_tripdata": yellow_raw,
    f"{BRONZE}.taxi_zone_lookup": zones_raw,
}

for table_name, source_df in bronze_tables.items():
    # Plain overwrite keeps the existing Delta table's schema and rejects data
    # whose schema differs (e.g. a different month's file); overwriteSchema makes
    # the refresh a full replace of both data and schema.
    (
        source_df.write
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(table_name)
    )

print("Bronze tables written:")
for table_name in bronze_tables:
    print("-", table_name)