In [0]:
# ===================================================================
# NOTEBOOK: 04_Silver_to_Gold
# PURPOSE: Create business-ready analytics from Silver layer
# AUTHOR: Jose Veliz - Space Cowboy 
# DATE: 2025-10-19
# ===================================================================

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Spark is already initialized
spark = spark

print(" Libraries imported")
print(" Space Cowboy's Gold Layer Analytics")
print(" Final layer - Business-ready insights!")
print("=" * 60)

In [0]:
# ===================================================================
# READ FROM SILVER LAYER
# ===================================================================

print(" Reading data from Silver layer...")

# Read from Silver table
silver_df = spark.table("silver_stock_prices")

print(f"\n SILVER DATA LOADED:")
print(f"   Total rows: {silver_df.count():,}")
print(f"   Symbols: {silver_df.select('symbol').distinct().count()}")
print(f"   Date range: {silver_df.agg(min('date'), max('date')).collect()[0]}")

print("\n" + "=" * 60)

In [0]:
# ===================================================================
# GOLD TABLE 1: DAILY STOCK SUMMARY
# ===================================================================

print(" Creating Gold Table 1: Daily Stock Summary")
print("=" * 60)

# Aggregate by date to get market-wide daily summary
gold_daily_summary = silver_df.groupBy("date").agg(
    # Trading activity
    count("symbol").alias("stocks_traded"),
    sum("volume").alias("total_volume"),
    
    # Price metrics (across all stocks)
    round(avg("close"), 2).alias("avg_closing_price"),
    round(avg("daily_change_pct"), 2).alias("avg_daily_change_pct"),
    round(avg("price_range_pct"), 2).alias("avg_volatility_pct"),
    
    # Market breadth (how many stocks up vs down)
    sum(when(col("daily_change_pct") > 0, 1).otherwise(0)).alias("stocks_up"),
    sum(when(col("daily_change_pct") < 0, 1).otherwise(0)).alias("stocks_down"),
    sum(when(col("daily_change_pct") == 0, 1).otherwise(0)).alias("stocks_flat"),
    
    # Extremes
    round(max("daily_change_pct"), 2).alias("biggest_gainer_pct"),
    round(min("daily_change_pct"), 2).alias("biggest_loser_pct")
).orderBy("date")

# Add derived metrics
gold_daily_summary = gold_daily_summary.withColumn(
    "market_sentiment",
    when(col("stocks_up") > col("stocks_down"), "BULLISH")
    .when(col("stocks_up") < col("stocks_down"), "BEARISH")
    .otherwise("NEUTRAL")
).withColumn(
    "gold_processing_timestamp",
    current_timestamp()
)

print(f"\n Daily Summary Table Created:")
print(f"   Rows: {gold_daily_summary.count()}")
print(f"   Columns: {len(gold_daily_summary.columns)}")

print("\n SAMPLE (Most Recent Days):")
gold_daily_summary.orderBy(desc("date")).show(10, truncate=False)

print("\n" + "=" * 60)

In [0]:
# ===================================================================
# GOLD TABLE 2: STOCK PERFORMANCE METRICS
# ===================================================================

print(" Creating Gold Table 2: Stock Performance Metrics")
print("=" * 60)

# Aggregate by symbol for overall performance metrics
gold_stock_performance = silver_df.groupBy("symbol").agg(
    # Basic stats
    count("*").alias("trading_days"),
    round(avg("close"), 2).alias("avg_closing_price"),
    round(min("close"), 2).alias("min_closing_price"),
    round(max("close"), 2).alias("max_closing_price"),
    
    # Performance
    round(avg("daily_change_pct"), 2).alias("avg_daily_change_pct"),
    round(stddev("daily_change_pct"), 2).alias("volatility_stddev"),
    round(sum("daily_change_pct"), 2).alias("cumulative_change_pct"),
    
    # Volume
    round(avg("volume"), 0).alias("avg_daily_volume"),
    sum("volume").alias("total_volume"),
    
    # Winning days vs losing days
    sum(when(col("daily_change_pct") > 0, 1).otherwise(0)).alias("winning_days"),
    sum(when(col("daily_change_pct") < 0, 1).otherwise(0)).alias("losing_days"),
    
    # Best and worst days
    round(max("daily_change_pct"), 2).alias("best_day_pct"),
    round(min("daily_change_pct"), 2).alias("worst_day_pct"),
    
    # Price range
    round(avg("price_range_pct"), 2).alias("avg_intraday_range_pct")
)

# Add derived metrics
gold_stock_performance = gold_stock_performance.withColumn(
    "win_rate_pct",
    round((col("winning_days") / col("trading_days")) * 100, 1)
).withColumn(
    "risk_reward_ratio",
    round(abs(col("avg_daily_change_pct")) / col("volatility_stddev"), 2)
).withColumn(
    "performance_rating",
    when(col("cumulative_change_pct") > 15, "STRONG")
    .when(col("cumulative_change_pct") > 5, "GOOD")
    .when(col("cumulative_change_pct") > -5, "NEUTRAL")
    .when(col("cumulative_change_pct") > -15, "WEAK")
    .otherwise("POOR")
).withColumn(
    "gold_processing_timestamp",
    current_timestamp()
).orderBy(desc("cumulative_change_pct"))

print(f"\n Stock Performance Table Created:")
print(f"   Rows: {gold_stock_performance.count()}")
print(f"   Columns: {len(gold_stock_performance.columns)}")

print("\n STOCK PERFORMANCE RANKINGS:")
gold_stock_performance.select(
    "symbol",
    "cumulative_change_pct",
    "avg_daily_change_pct",
    "volatility_stddev",
    "win_rate_pct",
    "performance_rating"
).show(truncate=False)

print("\n" + "=" * 60)

In [0]:
# ===================================================================
# GOLD TABLE 3: TOP PERFORMERS (DAILY GAINERS/LOSERS)
# ===================================================================

print(" Creating Gold Table 3: Top Performers Analysis")
print("=" * 60)

# Get top 3 gainers and losers for each date
window_spec = Window.partitionBy("date").orderBy(desc("daily_change_pct"))

gold_top_performers = silver_df.select(
    "date",
    "symbol",
    "open",
    "close",
    "daily_change",
    "daily_change_pct",
    "volume",
    row_number().over(window_spec).alias("rank")
).filter(col("rank") <= 3)  # Top 3 gainers each day

# Add category
gold_top_performers = gold_top_performers.withColumn(
    "category", lit("TOP_GAINER")
).withColumn(
    "gold_processing_timestamp", current_timestamp()
)

# Get top 3 losers
window_spec_losers = Window.partitionBy("date").orderBy("daily_change_pct")

gold_top_losers = silver_df.select(
    "date",
    "symbol",
    "open",
    "close",
    "daily_change",
    "daily_change_pct",
    "volume",
    row_number().over(window_spec_losers).alias("rank")
).filter(col("rank") <= 3)  # Top 3 losers each day

gold_top_losers = gold_top_losers.withColumn(
    "category", lit("TOP_LOSER")
).withColumn(
    "gold_processing_timestamp", current_timestamp()
)

# Combine gainers and losers
gold_top_performers_combined = gold_top_performers.union(gold_top_losers)

print(f"\n Top Performers Table Created:")
print(f"   Rows: {gold_top_performers_combined.count()}")
print(f"   (Top 3 gainers + top 3 losers per day)")

print("\n MOST RECENT DAY - TOP GAINERS:")
gold_top_performers.filter(col("rank") <= 3).orderBy(desc("date"), "rank").limit(3).show(truncate=False)

print("\n MOST RECENT DAY - TOP LOSERS:")
gold_top_losers.filter(col("rank") <= 3).orderBy(desc("date"), "rank").limit(3).show(truncate=False)

print("\n" + "=" * 60)

In [0]:
# ===================================================================
# SAVE ALL GOLD TABLES
# ===================================================================

print(" Saving Gold layer tables...")
print("=" * 60)

# Table 1: Daily Summary
print("\n Saving gold_daily_summary...")
gold_daily_summary.createOrReplaceTempView("gold_daily_summary_temp")
spark.sql("""
    CREATE OR REPLACE TABLE gold_daily_summary
    USING DELTA
    AS SELECT * FROM gold_daily_summary_temp
""")
print(f"   Saved: gold_daily_summary ({gold_daily_summary.count()} rows)")

# Table 2: Stock Performance
print("\n Saving gold_stock_performance...")
gold_stock_performance.createOrReplaceTempView("gold_stock_performance_temp")
spark.sql("""
    CREATE OR REPLACE TABLE gold_stock_performance
    USING DELTA
    AS SELECT * FROM gold_stock_performance_temp
""")
print(f"   Saved: gold_stock_performance ({gold_stock_performance.count()} rows)")

# Table 3: Top Performers
print("\n Saving gold_top_performers...")
gold_top_performers_combined.createOrReplaceTempView("gold_top_performers_temp")
spark.sql("""
    CREATE OR REPLACE TABLE gold_top_performers
    USING DELTA
    PARTITIONED BY (date)
    AS SELECT * FROM gold_top_performers_temp
""")
print(f"   Saved: gold_top_performers ({gold_top_performers_combined.count()} rows)")

print("\n ALL GOLD TABLES SAVED!")
print("=" * 60)

# Saving Gold layer tables...
# ============================================================

# 1️⃣ Saving gold_daily_summary...
#    ✅ Saved: gold_daily_summary (100 rows)
# 2️⃣ Saving gold_stock_performance...
#    ✅ Saved: gold_stock_performance (5 rows)
# 3️⃣ Saving gold_top_performers...
#    ✅ Saved: gold_top_performers (600 rows)
# ✅ ALL GOLD TABLES SAVED!

In [0]:
# ===================================================================
# FINAL VERIFICATION & PIPELINE SUMMARY
# ===================================================================

print(" Final verification of complete pipeline...")
print("=" * 60)

print("\n BRONZE LAYER:")
bronze_count = spark.table("bronze_stock_prices").count()
print(f"   Table: bronze_stock_prices")
print(f"   Rows: {bronze_count:,}")
print(f"   Purpose: Raw data from API")

print("\n SILVER LAYER:")
silver_count = spark.table("silver_stock_prices").count()
print(f"   Table: silver_stock_prices")
print(f"   Rows: {silver_count:,}")
print(f"   Purpose: Cleaned & transformed data")

print("\n GOLD LAYER:")
gold1_count = spark.table("gold_daily_summary").count()
gold2_count = spark.table("gold_stock_performance").count()
gold3_count = spark.table("gold_top_performers").count()
print(f"   Table 1: gold_daily_summary ({gold1_count} rows)")
print(f"   Table 2: gold_stock_performance ({gold2_count} rows)")
print(f"   Table 3: gold_top_performers ({gold3_count} rows)")
print(f"   Purpose: Business-ready analytics")

print("\n" + "=" * 60)
print(" COMPLETE DATA PIPELINE BUILT! ")
print("=" * 60)
print("\n🏆 MEDALLION ARCHITECTURE:")
print("   ✅ Bronze → Raw ingestion")
print("   ✅ Silver → Data quality & transformation")
print("   ✅ Gold → Business analytics")
print("\n💎 TECHNOLOGIES DEMONSTRATED:")
print("   ✅ Databricks")
print("   ✅ Delta Lake")
print("   ✅ PySpark")
print("   ✅ REST API integration")
print("   ✅ Medallion architecture")
print("   ✅ Data quality checks")
print("   ✅ Performance optimization (partitioning)")
print("\n Space Cowboy has landed on the moon! ")
print("=" * 60)
