In [0]:
# Notebook parameter selecting the run mode: "True" requests a full rebuild of
# the silver table, "False" (the default) an incremental update.
dbutils.widgets.dropdown("rebuild", "False", ["True", "False"])

# Widget values come back as strings, so compare explicitly to obtain a real bool.
# The earlier fallback value was the *string* "False", which is truthy and made
# every run behave as a rebuild.
rebuild = dbutils.widgets.get("rebuild") == "True"

In [0]:
# Credentials are read from the Databricks secret scope; nothing sensitive is
# written in the notebook itself.
secret_scope_name = "fmdp-secrets"
client_id, client_secret, tenant_id, alpha_vantage_api_key = [
    dbutils.secrets.get(scope=secret_scope_name, key=secret_key)
    for secret_key in (
        "fmdp-databricks-sp-client-id",
        "fmdp-databricks-sp-client-secret",
        "tenant-id",
        "fmdp-alpha-vantage-api-key",
    )
]

# Layer locations inside the "financial-data" container of the storage account.
storage_account_name = "fmdpstg2"
_lake_root = f"abfss://financial-data@{storage_account_name}.dfs.core.windows.net"
bronze_path = f"{_lake_root}/bronze"
silver_path = f"{_lake_root}/silver"
gold_path = f"{_lake_root}/gold"

In [0]:
# Spark settings that let ABFS authenticate against the storage account with
# the OAuth client-credentials provider, using the client id/secret and tenant
# loaded from the secret scope.
_account_host = f"{storage_account_name}.dfs.core.windows.net"

configs = {
    f"fs.azure.account.auth.type.{_account_host}": "OAuth",
    f"fs.azure.account.oauth.provider.type.{_account_host}": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    f"fs.azure.account.oauth2.client.id.{_account_host}": client_id,
    f"fs.azure.account.oauth2.client.secret.{_account_host}": client_secret,
    f"fs.azure.account.oauth2.client.endpoint.{_account_host}": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}

# Apply every entry to the active Spark session.
for conf_key, conf_value in configs.items():
    spark.conf.set(conf_key, conf_value)

In [0]:
from pyspark.sql.functions import current_timestamp, current_date, col, lit, to_date, row_number, from_json, schema_of_json, explode, map_keys, min as min_func, max as max_func, when
from pyspark.sql.types import StructType, StructField, StringType, DateType, DoubleType, LongType, BooleanType, TimestampType
from pyspark.sql.window import Window
import requests
import json
import time
from datetime import datetime
import hashlib
from delta.tables import DeltaTable

In [0]:
def _av_parse_daily_rows(symbol, batch_id, load_type, time_series_data, processed_at):
    """
    Flatten one symbol's "Time Series (Daily)" mapping into silver-layer tuples.

    Args:
        symbol (str): Stock symbol the payload belongs to.
        batch_id (str): Bronze batch id, stored as source_batch_id.
        load_type (str): Bronze load type, stored as source_load_type.
        time_series_data (dict): Mapping of "YYYY-MM-DD" -> dict holding the
            "1. open" .. "5. volume" entries.
        processed_at (datetime): Timestamp stamped on every produced row.

    Returns:
        list[tuple]: One tuple per date, in silver schema column order. A date
        whose values cannot be parsed is reported via print and skipped.
    """
    def _parse(value, cast):
        # Missing or empty values become None (and make the row invalid).
        return cast(value) if value else None

    rows = []
    for date_str, daily_data in time_series_data.items():
        try:
            open_price = _parse(daily_data.get("1. open"), float)
            high_price = _parse(daily_data.get("2. high"), float)
            low_price = _parse(daily_data.get("3. low"), float)
            close_price = _parse(daily_data.get("4. close"), float)
            volume = _parse(daily_data.get("5. volume"), int)

            # A row is valid only when every measure is present.
            is_valid = all(
                value is not None
                for value in (open_price, high_price, low_price, close_price, volume)
            )

            rows.append((
                symbol,                                          # symbol
                datetime.strptime(date_str, "%Y-%m-%d").date(),  # date
                open_price,                                      # open
                high_price,                                      # high
                low_price,                                       # low
                close_price,                                     # close
                volume,                                          # volume
                batch_id,                                        # source_batch_id
                load_type,                                       # source_load_type
                is_valid,                                        # is_valid
                processed_at.date(),                             # processing_date
                processed_at,                                    # processing_timestamp
            ))
        except Exception as e:
            print(f"Error processing {symbol} for date {date_str}: {str(e)}")
    return rows


def av_time_series_daily_brz_to_sil(symbols=None, start_date=None, end_date=None, rebuild=False):
    """
    Transform TIME_SERIES_DAILY data from bronze to silver layer.

    Reads the bronze Delta table (columns used: symbol, ingestion_date,
    ingestion_timestamp, load_type, batch_id, raw_data), keeps one bronze row
    per symbol, explodes its JSON payload into one row per trading date and
    writes the result to the silver Delta table.

    Args:
        symbols (list, optional): List of stock symbols to process. If None, process all symbols.
        start_date (str, optional): Lower bound on bronze ingestion_date, YYYY-MM-DD format
        end_date (str, optional): Upper bound on bronze ingestion_date, YYYY-MM-DD format
        rebuild (bool): Whether to rebuild the entire silver layer (overwrite) or perform
            an incremental update (merge on symbol + date). The strings "True"/"False"
            (e.g. straight from a widget) are accepted and converted.

    Returns:
        bool: True if transformation successful (including "nothing to process"),
        False if any exception was raised; the error and traceback are printed.
    """
    # A non-empty string such as "False" is truthy; convert string flags explicitly.
    if isinstance(rebuild, str):
        rebuild = rebuild.strip().lower() == "true"

    try:
        # Define table paths
        bronze_table = f"{bronze_path}/brz_av_time_series_daily"
        silver_table = f"{silver_path}/sil_av_time_series_daily"

        # Create silver directory if it doesn't exist
        dbutils.fs.mkdirs(silver_path)

        # Start time for performance tracking
        start_time = datetime.now()
        print(f"Starting silver transformation at {start_time}")

        # Define explicit schema for silver layer
        silver_schema = StructType([
            StructField("symbol", StringType(), False),
            StructField("date", DateType(), False),
            StructField("open", DoubleType(), True),
            StructField("high", DoubleType(), True),
            StructField("low", DoubleType(), True),
            StructField("close", DoubleType(), True),
            StructField("volume", LongType(), True),
            StructField("source_batch_id", StringType(), True),
            StructField("source_load_type", StringType(), True),
            StructField("is_valid", BooleanType(), False),
            StructField("processing_date", DateType(), False),
            StructField("processing_timestamp", TimestampType(), False)
        ])

        def _first_per_symbol(df, *ordering):
            # Keep only the top-ranked bronze row of each symbol under `ordering`.
            window_spec = Window.partitionBy("symbol").orderBy(*ordering)
            return (
                df.withColumn("row_num", row_number().over(window_spec))
                  .filter(col("row_num") == 1)
                  .drop("row_num")
            )

        # Read from bronze layer
        print("Reading from bronze layer...")
        bronze_df = spark.read.format("delta").load(bronze_table)

        # Filter by symbols if provided
        if symbols:
            bronze_df = bronze_df.filter(col("symbol").isin(symbols))

        # Filter by ingestion date if provided
        if start_date:
            bronze_df = bronze_df.filter(col("ingestion_date") >= start_date)
        if end_date:
            bronze_df = bronze_df.filter(col("ingestion_date") <= end_date)

        newest_first = col("ingestion_timestamp").desc()
        if not rebuild:
            # For incremental updates, only process the latest batch for each symbol
            print("Performing incremental update - selecting latest batches...")
            bronze_df = _first_per_symbol(bronze_df, newest_first)
        else:
            # For full rebuild, check if we have any "full" load records
            has_full = bronze_df.filter(col("load_type") == "full").count() > 0

            if has_full:
                print("Rebuild mode: Found full load records, prioritizing those...")
                # Per symbol: prefer a full load if one exists, then the newest batch
                bronze_df = _first_per_symbol(
                    bronze_df,
                    when(col("load_type") == "full", 0).otherwise(1),
                    newest_first,
                )
            else:
                print("Rebuild mode: No initial load records found, using all available data...")
                # No full load available: fall back to the latest batch per symbol
                bronze_df = _first_per_symbol(bronze_df, newest_first)

        # Count symbols being processed
        symbol_count = bronze_df.select("symbol").distinct().count()
        if symbol_count == 0:
            print("No data to process. Exiting.")
            return True

        print(f"Processing data for {symbol_count} symbols")

        # One timestamp for the whole run so processing_date and
        # processing_timestamp agree on every row.
        processed_at = datetime.now()

        # Gather the rows of all symbols first and build a single DataFrame
        # afterwards; a union per symbol makes the query plan grow with the
        # number of symbols.
        all_rows = []
        for symbol_row in bronze_df.collect():
            raw_data = json.loads(symbol_row.raw_data)

            print(f"Processing symbol: {symbol_row.symbol} (load type: {symbol_row.load_type})")

            # A payload without the key (or with a null value) yields no rows.
            time_series_data = raw_data.get("Time Series (Daily)") or {}

            all_rows.extend(
                _av_parse_daily_rows(
                    symbol_row.symbol,
                    symbol_row.batch_id,
                    symbol_row.load_type,
                    time_series_data,
                    processed_at,
                )
            )

        # Count the records before writing
        total_records = len(all_rows)
        print(f"Processed {total_records} records for {symbol_count} symbols")

        if total_records == 0:
            print("No records to write. Exiting.")
            return True

        silver_df = spark.createDataFrame(all_rows, silver_schema)

        # Write to silver layer
        print("Writing to silver layer...")

        # Ask Delta directly whether the target is a table. A read probe wrapped
        # in a bare except would treat any failure (auth, network, ...) as
        # "table missing" and fall through to the overwrite branch below.
        silver_table_exists = DeltaTable.isDeltaTable(spark, silver_table)

        if rebuild or not silver_table_exists:
            # For rebuilds or new tables, overwrite completely.
            # NOTE(review): this replaces the whole table, so a rebuild restricted
            # via `symbols` leaves only those symbols in silver - confirm intended.
            print("Using overwrite mode for silver layer...")
            silver_df.write \
                .format("delta") \
                .mode("overwrite") \
                .option("overwriteSchema", "true") \
                .partitionBy("symbol") \
                .save(silver_table)
        else:
            # For incremental updates, upsert on the (symbol, date) key
            print("Using merge operation for incremental update...")
            delta_table = DeltaTable.forPath(spark, silver_table)

            delta_table.alias("target") \
                .merge(
                    silver_df.alias("source"),
                    "target.symbol = source.symbol AND target.date = source.date"
                ) \
                .whenMatchedUpdateAll() \
                .whenNotMatchedInsertAll() \
                .execute()

        # Calculate statistics for reporting
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()

        # Read back the silver table after write to get accurate counts
        updated_silver_df = spark.read.format("delta").load(silver_table)

        # Get record counts
        total_silver_records = updated_silver_df.count()
        valid_records = updated_silver_df.filter(col("is_valid")).count()
        invalid_records = total_silver_records - valid_records

        # Date range of the whole silver table, computed in a single aggregation
        date_min, date_max = updated_silver_df.agg(
            min_func("date"), max_func("date")
        ).collect()[0]

        # Print summary
        print("\n--- Silver Transformation Summary ---")
        print(f"Operation: {'Rebuild' if rebuild else 'Incremental update'}")
        print(f"Completed at: {end_time}")
        print(f"Duration: {duration:.2f} seconds")
        print(f"Symbols processed: {symbol_count}")
        print(f"Records processed in this run: {total_records}")
        print(f"Total records in silver layer: {total_silver_records}")
        print(f"Valid records: {valid_records}")
        print(f"Invalid records: {invalid_records}")
        print(f"Date range: {date_min} to {date_max}")

        return True

    except Exception as e:
        # Report and signal failure through the return value instead of raising.
        print(f"Error in silver transformation: {str(e)}")
        import traceback
        traceback.print_exc()
        return False

In [0]:
# Symbols to transform; None would mean every symbol present in bronze.
symbols = ["AAPL", "MSFT", "AMZN", "META", "NVDA", "TSLA", "GOOGL", "QQQ", "SPY"]

# Optional date bounds passed to the transformation; None disables a bound.
start_date = None  # Format: "2023-01-01"
end_date = None    # Format: "2023-12-31"

# Run mode is taken from `rebuild`:
#   - first run after the initial data load -> rebuild=True
#   - daily updates after that              -> rebuild=False
print(f"Running {'SILVER REBUILD' if rebuild else 'SILVER UPDATE'} at {datetime.now()}")

# Reference point for the total duration reported below.
run_start_time = datetime.now()
print(f"Starting silver transformation at {run_start_time}")

try:
    # The function reports its own errors and returns False on failure.
    success = av_time_series_daily_brz_to_sil(
        symbols=symbols,
        start_date=start_date,
        end_date=end_date,
        rebuild=rebuild,
    )

    run_end_time = datetime.now()
    duration = (run_end_time - run_start_time).total_seconds()

    # Emit the run summary as one block of lines.
    summary_lines = [
        f"\n--- Transformation Summary ---",
        f"Operation: {'Rebuild' if rebuild else 'Incremental update'}",
        f"Status: {'Successful' if success else 'Failed'}",
        f"Run completed at: {run_end_time}",
        f"Total duration: {duration:.2f} seconds",
    ]
    print("\n".join(summary_lines))

except Exception as e:
    # Anything escaping the call is printed with its traceback.
    print(f"Error in transformation process: {str(e)}")
    import traceback
    traceback.print_exc()