In [2]:
# Welcome to your new notebook
# Type here in the cell editor to add code!

from pyspark.sql.functions import col, current_timestamp, to_timestamp

# ==============================================================================
# 1. CONFIGURATION
# ==============================================================================
# PASTE YOUR PATH HERE AGAIN ðŸ‘‡
LAKEHOUSE_ROOT_PATH = "abfss://DE_Project_Portfolio@onelake.dfs.fabric.microsoft.com/Lh_Medallion.Lakehouse/Files"

# Define where the raw data lives
BRONZE_PATH = f"{LAKEHOUSE_ROOT_PATH}/bronze/crypto_raw/*.json"

# ==============================================================================
# 2. READ BRONZE (RAW)
# ==============================================================================
print(f"Reading data from: {BRONZE_PATH}")
df_raw = spark.read.json(BRONZE_PATH)

# ==============================================================================
# 3. TRANSFORM TO SILVER (CLEANING)
# ==============================================================================
df_silver = df_raw.select(
    # Rename columns for clarity
    col("id").alias("coin_id"),
    col("symbol"),
    col("name"),
    
    # CASTING: Enforce Data Types (The "Schema" part of Medallion)
    col("current_price").cast("double"),
    col("market_cap").cast("long"),
    col("total_volume").cast("long"),
    col("high_24h").cast("double"),
    col("low_24h").cast("double"),
    
    # Handle Time: Convert string to Timestamp
    to_timestamp(col("last_updated")).alias("last_updated_time")
)

# --- DEDUPLICATION ---
# If you ran the ingest script multiple times, you might have duplicates.
# We keep only unique combinations of Coin ID and Time.
df_silver = df_silver.dropDuplicates(["coin_id", "last_updated_time"])

# --- HANDLE NULLS ---
# Fill missing market caps with 0
df_silver = df_silver.fillna(0, subset=["market_cap", "total_volume"])

# --- AUDIT ---
# Add a timestamp so we know when this row was processed
df_silver = df_silver.withColumn("processed_at", current_timestamp())

# ==============================================================================
# 4. WRITE TO SILVER (DIRECT PATH METHOD)
# ==============================================================================
# Instead of relying on the "Default Lakehouse" setting, we write directly 
# to the "Tables" folder using your secure ABFSS path.

table_name = "silver_crypto"
# Construct the full path to the Tables folder
TABLE_PATH = f"{LAKEHOUSE_ROOT_PATH}/Tables/{table_name}"

print(f"Writing Delta Table to: {TABLE_PATH}...")

# CHANGE: Use .save(path) instead of .saveAsTable(name)
df_silver.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(TABLE_PATH)

print("âœ… SUCCESS: Silver table created in storage.")

# ==============================================================================
# 5. VERIFY DATA
# ==============================================================================
print("\n--- DATA PREVIEW (Reading back from storage) ---")
# We read from the path to verify
display(spark.read.format("delta").load(TABLE_PATH).limit(5))

StatementMeta(, 1b50aae1-a76f-44c8-b359-81ebc83b23ef, 4, Finished, Available, Finished)

Reading data from: abfss://DE_Project_Portfolio@onelake.dfs.fabric.microsoft.com/Lh_Medallion.Lakehouse/Files/bronze/crypto_raw/*.json
Writing Delta Table to: abfss://DE_Project_Portfolio@onelake.dfs.fabric.microsoft.com/Lh_Medallion.Lakehouse/Files/Tables/silver_crypto...
âœ… SUCCESS: Silver table created in storage.

--- DATA PREVIEW (Reading back from storage) ---


SynapseWidget(Synapse.DataFrame, 26e9254b-27e4-497f-899b-6999bb340db0)