In [0]:
from pyspark.errors import AnalysisException
from pyspark.sql.functions import current_timestamp, col, lit
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

In [0]:
# ------------------------------------------------------------
# Configuration: GCS storage locations and Unity Catalog names
# ------------------------------------------------------------
# Every lakehouse location shares this bucket prefix; the layer paths
# below are derived from it so the prefix is written only once here.
LAKEHOUSE_ROOT = "gs://databricks-glk-dbx-ext-storage/nyc_taxi_lakehouse"

raw_path = f"{LAKEHOUSE_ROOT}/raw/"                                  # source .parquet files are listed from here
bronze_table_path = f"{LAKEHOUSE_ROOT}/bronze/bronze_fhv_tripdata"  # Delta table the raw rows are appended to
tracking_table_path = f"{LAKEHOUSE_ROOT}/bronze/ingested_files"     # Delta table of already-ingested file names

# Unity Catalog identifiers (catalog -> schema -> table)
catalog_name, schema_name = "nyc_taxi_catalog", "bronze"
tracking_table_name = "ingested_files"
tracking_table_fq = ".".join((catalog_name, schema_name, tracking_table_name))

In [0]:
# Make sure the Unity Catalog catalog and schema exist, then select them as
# the session defaults for any unqualified names used afterwards.
# The catalog's managed location is the same bucket prefix used for the GCS paths.
for statement in (
    f"""
CREATE CATALOG IF NOT EXISTS {catalog_name}
MANAGED LOCATION 'gs://databricks-glk-dbx-ext-storage/nyc_taxi_lakehouse/'
""",
    f"USE CATALOG {catalog_name}",
    f"CREATE SCHEMA IF NOT EXISTS {schema_name}",
    f"USE SCHEMA {schema_name}",
):
    spark.sql(statement)

In [0]:
# ------------------------------------------------------------
# 3️⃣ Identify already-ingested files
# ------------------------------------------------------------
# Before the very first run the tracking table does not exist yet, so a
# missing / non-Delta path is treated as "nothing ingested so far".
# Only AnalysisException (what Spark raises when the path is absent or is
# not a Delta table) is caught. Any other failure (permissions, network,
# corrupt log, ...) must stop the notebook: otherwise an unreadable tracking
# table would look empty and already-ingested files would be appended to
# Bronze a second time.
try:
    tracked_files_df = spark.read.format("delta").load(tracking_table_path)
    # De-duplicate in case a file was ever recorded more than once.
    tracked_files = [
        row.file_name
        for row in tracked_files_df.select("file_name").distinct().collect()
    ]
except AnalysisException:
    tracked_files = []

print(f"Already tracked files: {tracked_files}")


In [0]:
# ------------------------------------------------------------
# 4️⃣ List available raw files & pick the next one to ingest
# ------------------------------------------------------------
# Full paths are sorted lexicographically. This matches chronological order
# only if the file names embed a sortable date (e.g. zero-padded YYYY-MM);
# that is an assumption about the raw folder and is not checked here.
all_files = sorted(
    info.path
    for info in dbutils.fs.ls(raw_path)
    if info.path.endswith(".parquet")
)

# Keep only files whose base name is not yet recorded in the tracking table.
new_files = [
    path for path in all_files
    if path.split("/")[-1] not in tracked_files
]

# Ingest at most one file per run: the first pending file in sort order.
file_to_process = new_files[:1]

if file_to_process:
    print(f"File to process in this run: {file_to_process}")
else:
    print("✅ No new files to process. All up to date.")

In [0]:
# ------------------------------------------------------------
# 5️⃣ Ingest the selected file into the Bronze Delta table
# ------------------------------------------------------------
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")

for file in file_to_process:
    # Read raw parquet
    df = spark.read.parquet(file)

    # Cast every timestamp-typed column (dtype strings starting with
    # "timestamp", e.g. timestamp / timestamp_ntz) to string to avoid the
    # timestampNtz issue when appending. The temporal type is lost in Bronze,
    # so downstream layers need to re-parse these columns.
    for cname, dtype in df.dtypes:
        if dtype.startswith("timestamp"):
            df = df.withColumn(cname, col(cname).cast("string"))

    # Append to Bronze; mergeSchema lets new columns from later files
    # evolve the table schema instead of failing the write.
    df.write.format("delta") \
        .option("mergeSchema", "true") \
        .mode("append") \
        .save(bronze_table_path)

    # ------------------------------------------------------------
    # 6️⃣ Update tracking Delta table
    # ------------------------------------------------------------
    # NOTE: the Bronze append above and the tracking append below are two
    # separate Delta commits. If this one fails, the next run will pick the
    # same file again and Bronze will receive its rows twice.
    filename = file.split("/")[-1]

    # One row: the file's base name plus the time it was recorded.
    tracking_df = (
        spark.createDataFrame([(filename,)], ["file_name"])
        .withColumn("ingestion_date", current_timestamp())
    )

    # Always append, never overwrite: a Delta append creates the table on the
    # first write, while an overwrite would discard the record of every
    # previously ingested file (e.g. if `tracked_files` came back empty) and
    # cause them all to be re-ingested.
    tracking_df.write.format("delta").mode("append").save(tracking_table_path)

    print(f"✅ File {filename} ingested successfully")


In [0]:
# ------------------------------------------------------------
# 7️⃣ Validate tracking entries
# ------------------------------------------------------------
# Best-effort inspection: any error while reading or displaying a table is
# reported as a warning instead of failing the notebook. Note that the
# except clauses catch every Exception, not only "table missing".

print("📋 Tracking Table Contents:")
try:
    tracking_view = spark.read.format("delta").load(tracking_table_path)
    tracking_view.show()
except Exception:
    print("⚠️ Tracking table not found yet.")

# For Bronze only the row count is displayed, not the rows themselves.
print("📋 Bronze Table Contents:")
try:
    bronze_row_count = spark.read.format("delta").load(bronze_table_path).count()
    display(bronze_row_count)
except Exception:
    print("⚠️ Bronze table not found yet.")

In [0]:
# Inspect the Bronze table's resulting column names and types
# (timestamp columns appear as strings after the cast during ingestion).
bronze_reader = spark.read.format("delta")
df = bronze_reader.load(bronze_table_path)
df.printSchema()