In [0]:
# Authenticate Spark to the "blockpulsecrypto" ADLS Gen2 account with an Azure AD
# service principal (OAuth client-credentials token provider). Only the client
# secret is read from the Databricks secret scope; the client (application) id and
# the tenant id embedded in the token endpoint are inline literals.
# NOTE(review): consider reading the client id / tenant id from the secret scope too.
service_credential = dbutils.secrets.get(scope="blockpulse_keyvault_scope3", key="blockpulsedatabrickssecret")

# Applied in insertion order, one spark.conf.set() call per entry.
adls_oauth_settings = {
    "fs.azure.account.auth.type.blockpulsecrypto.dfs.core.windows.net": "OAuth",
    "fs.azure.account.oauth.provider.type.blockpulsecrypto.dfs.core.windows.net": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id.blockpulsecrypto.dfs.core.windows.net": "39057514-1421-4818-9f98-8bcc69c384d4",
    "fs.azure.account.oauth2.client.secret.blockpulsecrypto.dfs.core.windows.net": service_credential,
    "fs.azure.account.oauth2.client.endpoint.blockpulsecrypto.dfs.core.windows.net": "https://login.microsoftonline.com/0dba45aa-3354-4d4c-8c45-eea7b490aebe/oauth2/token",
}
for conf_key, conf_value in adls_oauth_settings.items():
    spark.conf.set(conf_key, conf_value)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, current_timestamp, lit, coalesce, year, month, dayofmonth, dayofweek
)
from pyspark.sql.types import (
    StructType, StructField, StringType, DateType,
    DoubleType, LongType
)
from delta.tables import DeltaTable
from datetime import datetime
import traceback

# Obtain the Spark session with Delta Lake's SQL extension and catalog registered.
# NOTE(review): a session already exists in this notebook (it is used in the cell
# above), so getOrCreate() presumably returns that one; static settings such as
# spark.sql.extensions are then assumed to be configured on the cluster — confirm.
delta_session_options = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}
session_builder = SparkSession.builder.appName("Blockpulse Gold Layer")
for option_key, option_value in delta_session_options.items():
    session_builder = session_builder.config(option_key, option_value)
spark = session_builder.getOrCreate()

# Configuration: lakehouse location on ADLS Gen2 and today's silver partition.
storage_account = "blockpulsecrypto"
container = "blockpulsedatacontainer"
lake_root = f"abfss://{container}@{storage_account}.dfs.core.windows.net"

# Partition date taken from the driver's local clock.
# NOTE(review): assumes the silver job partitions by the same clock/timezone — confirm.
today = datetime.now()
current_year = today.year
current_month = today.month
current_day = today.day

# Silver input: year=YYYY/month=MM/day=DD partition (month and day zero-padded).
silver_path = f"{lake_root}/silver/year={current_year}/month={current_month:02d}/day={current_day:02d}/"
# Gold output root; each table lives in a sub-folder named after the table.
gold_path = f"{lake_root}/gold/"

# Define Gold Table Schemas
# Keyed by table name, which is also the folder name under gold_path.
def _schema(*columns):
    """Build a StructType from (column_name, spark_type) pairs, in the given order.

    Every field uses StructField's defaults (nullable, no metadata).
    """
    return StructType([StructField(column_name, spark_type) for column_name, spark_type in columns])


gold_tables = {
    "crypto_market_snapshot_fact": _schema(
        ("crypto_id", StringType()),
        ("snapshot_date", DateType()),
        ("current_price_usd", DoubleType()),
        ("market_cap_usd", DoubleType()),
        ("total_volume_usd", DoubleType()),
        ("volume_change_pct_24h", DoubleType()),
        ("price_change_24h_usd", DoubleType()),
        ("price_change_pct_24h", DoubleType()),
        ("market_cap_rank", LongType()),
        ("circulating_supply", DoubleType()),
        ("total_supply", DoubleType()),
        ("max_supply", DoubleType()),
        ("all_time_high", DoubleType()),
        ("ath_change_pct", DoubleType()),
        ("all_time_low", DoubleType()),
        ("atl_change_pct", DoubleType()),
        ("high_24h_usd", DoubleType()),
        ("low_24h_usd", DoubleType()),
    ),
    "crypto_asset_dim": _schema(
        ("crypto_id", StringType()),
        ("symbol", StringType()),
        ("name", StringType()),
        ("image_url", StringType()),
    ),
    "date_dim": _schema(
        ("date", DateType()),
        ("year", LongType()),
        ("month", LongType()),
        ("day", LongType()),
        ("day_of_week", LongType()),
    ),
}

# Ensure Gold Tables Exist
def ensure_delta_table(path, schema=None):
    """Create an empty Delta table at ``path`` unless a Delta table is already there.

    Args:
        path: Storage URI of the table (an ``abfss://`` path in this notebook).
        schema: StructType used for the empty table. It is only needed when the
            table must be created. Delta refuses to create a table with no
            columns, so a missing or empty schema is reported instead of
            attempting a write that cannot succeed.

    Errors are printed with a traceback rather than raised, so the caller's loop
    continues with the remaining tables.
    """
    try:
        if DeltaTable.isDeltaTable(spark, path):
            print(f"Delta table already exists at {path}")
            return
        # StructType is falsy when it has no fields, so this covers None and StructType([]).
        if not schema:
            print(f"Cannot create Delta table at {path}: no schema (or an empty schema) was provided")
            return
        print(f"Creating Delta table at {path}")
        spark.createDataFrame([], schema).write.format("delta").mode("overwrite").save(path)
    except Exception as e:
        print(f"Error ensuring table at {path}: {e}")
        traceback.print_exc()

# Create any gold table that does not exist yet, using the schemas defined above.
for table_name, schema in gold_tables.items():
    table_path = gold_path + table_name
    ensure_delta_table(table_path, schema)

# Read Silver Tables
# Load today's silver partition of each table as a Delta DataFrame.
# On failure, log the error and re-raise it so the notebook/job fails with the
# original exception. Previously this called spark.stop() and exit(1): that
# stopped the Spark session the runtime provides, and exit() is not a dependable
# way to end a notebook run. Later cells would then hit NameErrors instead.
try:
    crypto_market_snapshot_fact_df = spark.read.format("delta").load(f"{silver_path}crypto_market_snapshot_fact/")
    crypto_asset_dim_df = spark.read.format("delta").load(f"{silver_path}crypto_asset_dim/")
    date_dim_df = spark.read.format("delta").load(f"{silver_path}date_dim/")
except Exception as e:
    print(f"Error reading from silver path: {e}")
    raise

# Add processing metadata (optional)
# Stamp every outgoing row with the time this run processed it. withColumn()
# returns a NEW DataFrame, so the result must be assigned back; the previous loop
# discarded it, which made this step a no-op. Re-running the cell is safe because
# withColumn() replaces an existing "processing_time" column of the same name.
crypto_market_snapshot_fact_df, crypto_asset_dim_df, date_dim_df = [
    df.withColumn("processing_time", current_timestamp())
    for df in (crypto_market_snapshot_fact_df, crypto_asset_dim_df, date_dim_df)
]


# Write Gold Tables
def write_to_gold(df, table_name, merge_schema=True):
    """Overwrite the gold Delta table ``table_name`` (under ``gold_path``) with ``df``.

    Args:
        df: DataFrame to write.
        table_name: Table folder name, appended to ``gold_path``.
        merge_schema: When True (default), set Delta's ``mergeSchema`` write option.
            Columns in ``df`` that the existing table lacks (such as
            ``processing_time``) are then added instead of failing schema validation.

    Returns:
        True if the write succeeded, False otherwise. Errors are printed with a
        traceback rather than propagated, so one failing table does not stop the
        remaining tables from being written.
    """
    try:
        writer = df.write.format("delta").mode("overwrite")
        if merge_schema:
            writer = writer.option("mergeSchema", "true")
        writer.save(f"{gold_path}{table_name}")
        print(f"{table_name} successfully written to gold path.")
        return True
    except Exception as e:
        print(f"Error writing {table_name} to gold layer:", e)
        traceback.print_exc()
        return False


# Written in this order; the dict preserves insertion order.
gold_outputs = {
    "crypto_market_snapshot_fact": crypto_market_snapshot_fact_df,
    "crypto_asset_dim": crypto_asset_dim_df,
    "date_dim": date_dim_df,
}
# Attempt every table first. Then fail the run if any write failed, so a partial
# failure is not reported as a successful notebook/job run.
failed_tables = [name for name, frame in gold_outputs.items() if not write_to_gold(frame, name)]
if failed_tables:
    raise RuntimeError(f"Failed to write gold table(s): {', '.join(failed_tables)}")


Delta table already exists at abfss://blockpulsedatacontainer@blockpulsecrypto.dfs.core.windows.net/gold/crypto_market_snapshot_fact
Delta table already exists at abfss://blockpulsedatacontainer@blockpulsecrypto.dfs.core.windows.net/gold/crypto_asset_dim
Delta table already exists at abfss://blockpulsedatacontainer@blockpulsecrypto.dfs.core.windows.net/gold/date_dim
crypto_market_snapshot_fact successfully written to gold path.
crypto_asset_dim successfully written to gold path.
date_dim successfully written to gold path.
