In [0]:
# Silver-layer configuration: the catalog and schema that hold the curated tables.
silver_catalog_name = "trubrixai_dev_ws_cin_01"
silver_schema_name = "silver_data"

# Idempotent thanks to IF NOT EXISTS: re-running this cell is a no-op once the
# schema is present.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {silver_catalog_name}.{silver_schema_name}")

# (Re)bind the table identifiers used by the cells below.
#   silver_table_name - three-level name: <catalog>.<schema>.shipments
#   silver_table_path - abfss:// URI for the shipments data.
#                       NOTE(review): the write cell visible in this notebook
#                       does not pass this path to the writer - confirm it is
#                       still needed.
silver_table_name = ".".join((silver_catalog_name, silver_schema_name, "shipments"))
silver_table_path = "abfss://silver@trubrixaidevst.dfs.core.windows.net/shipments"

# Echo the effective values so the run log records what the later cells will use.
print(f"Silver table name has been RESET to: {silver_table_name}")
print(f"Silver table path has been RESET to: {silver_table_path}")

In [0]:
# COMMAND ----------

# Cell 3: Load the Bronze landing zone into a single DataFrame
# `bronze_folder_path` is not defined in this part of the notebook; it must
# already be bound by an earlier cell. The CSV data at that location is read
# with the first line treated as a header and column types inferred from the
# data.
raw_shipments_df = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load(bronze_folder_path)
)

# Visual sanity check of what was ingested.
print("Displaying combined raw data from Bronze layer:")
display(raw_shipments_df)

In [0]:
# COMMAND ----------

# Cell 4: Clean and Transform Data
from pyspark.sql.functions import col, current_date, to_date

# Pipeline, applied in order:
#   1. distinct()      - drop rows that are exact duplicates across all columns
#   2. ingestion_date  - new column stamped with the current date
#   3. shipment_date   - replaced in place by its to_date() conversion; no
#                        explicit format is supplied, so Spark's default
#                        parsing applies
cleaned_shipments_df = (
    raw_shipments_df
    .distinct()
    .withColumn("ingestion_date", current_date())
    .withColumn("shipment_date", to_date(col("shipment_date")))
)

# Visual sanity check of the transformed rows.
print("Displaying cleaned and transformed data:")
display(cleaned_shipments_df)

In [0]:
# COMMAND ----------

# Cell 5: Save as a Delta Table in the Silver Layer
# Full refresh: mode="overwrite" replaces the table's existing contents, and
# overwriteSchema="true" lets the table schema be replaced by the DataFrame's
# schema as well.
# No explicit `path` option is passed to the writer, so `silver_table_path` is
# not used here and the storage location is left to the catalog.

print(f"Overwriting Silver table '{silver_table_name}'...")
cleaned_shipments_df.write.saveAsTable(
    silver_table_name,
    format="delta",
    mode="overwrite",
    overwriteSchema="true",
)
print(f"Successfully overwrote Silver Delta table '{silver_table_name}'.")

# COMMAND ----------

In [0]:
# Cell 6: Verify the Silver Table by Reading It
# Round-trip check: load the table by name and count its rows. A failure to
# read or count raises before the success message is printed.
print("\n--- Reading data back from the Silver Delta table to verify ---")
silver_df = spark.read.table(silver_table_name)
silver_row_count = silver_df.count()

print(f"Verification successful. The Silver table now contains {silver_row_count} rows.")
display(silver_df)