In [0]:
raw_path = "s3a://crypto-databricks-1760666848-raw-trades/trades/crypto_trades.parquet"

raw_df = spark.read.parquet(raw_path)

print(f"Loaded {raw_df.count():,} raw tables")
print("\nSchema:")
raw_df.printSchema()
display(raw_df.limit(10))

In [0]:
from pyspark.sql.functions import current_timestamp, lit

bronze_df = raw_df \
    .withColumn("ingestion_timestamp", current_timestamp()) \
    .withColumn("source_file", lit("crypto_tades.parquet"))

print("Added metadata columns")
display(bronze_df.limit(10))

In [0]:
bronze_path = "s3a://crypto-databricks-1760666848-raw-trades/delta/bronze_trades"

bronze_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save(bronze_path)

print(f"✓ Written {bronze_df.count():,} records to Bronze Delta table")
print(f"✓ Location: {bronze_path}")

In [0]:
bronze_path = "s3a://crypto-databricks-1760666848-raw-trades/delta/bronze_trades"

bronze_df = spark.read.format("delta").load(bronze_path)

print(f" Bronze tables has {bronze_df.count():,} records")
print("\nColumns:")
for col in bronze_df.columns:
    print(f" - {col})")

display(bronze_df.limit(10))

In [0]:
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, bronze_path)

print("Table History")
display(delta_table.history())