# 03 - Gold Business Analytics

**Purpose:** Generate portfolio risk metrics and business analytics from Silver layer data

**Author:** Jonah A.  
**Created:** 2025-07-30

**Architecture Layer:** Gold (Business Analytics)

**Input:** Silver layer enhanced view `silver_daily_prices_enhanced`  
**Output:** Portfolio risk metrics, VaR calculations, and business insights

**Business Value:** Investment risk analysis and portfolio monitoring for AI stock portfolio

In [0]:
# Recreate complete pipeline with fixed column references
%pip install yfinance

import yfinance as yf
from datetime import datetime
from pyspark.sql import Row, functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

print("🔄 Recreating Bronze → Silver pipeline (fixed)...")

# Step 1: Download AAPL data
test_data = yf.download("AAPL", period="5d")

# Step 2: Convert to Bronze format
bronze_rows = []
ingestion_time = datetime.now()

for date_idx, row in test_data.iterrows():
    bronze_row = Row(
        symbol="AAPL",
        trading_date=date_idx.date(),  # Use proper date from start
        open=float(row[('Open', 'AAPL')]),
        high=float(row[('High', 'AAPL')]),
        low=float(row[('Low', 'AAPL')]),
        close=float(row[('Close', 'AAPL')]),
        volume=int(row[('Volume', 'AAPL')]),
        data_source="yahoo_finance"
    )
    bronze_rows.append(bronze_row)

# Step 3: Create Silver with proper schema
bronze_schema = StructType([
    StructField("symbol", StringType(), True),
    StructField("trading_date", DateType(), True),  # Proper date type
    StructField("open", DoubleType(), True),
    StructField("high", DoubleType(), True),
    StructField("low", DoubleType(), True),
    StructField("close", DoubleType(), True),
    StructField("volume", LongType(), True),
    StructField("data_source", StringType(), True)
])

bronze_df = spark.createDataFrame(bronze_rows, bronze_schema)

# Silver transformation with correct column references
window_spec = Window.partitionBy("symbol").orderBy("trading_date")

silver_df = bronze_df.select(
    F.col("symbol"),
    F.col("trading_date"),
    F.round(F.col("close"), 2).alias("close_price"),
    F.col("volume").alias("trading_volume")
).withColumn(
    "daily_return_pct", 
    F.round(
        ((F.col("close_price") - F.lag("close_price").over(window_spec)) 
         / F.lag("close_price").over(window_spec) * 100), 4
    )
).orderBy("trading_date")

silver_df.createOrReplaceTempView("silver_for_gold")

print("✅ Fixed Silver data ready:")
silver_df.show()

In [0]:
# Gold layer risk analytics
silver_df = spark.table("silver_for_gold")

# Calculate key risk metrics (excluding NULL values)
risk_metrics = silver_df.filter(F.col("daily_return_pct").isNotNull()).agg(
    F.avg("daily_return_pct").alias("avg_daily_return"),
    F.stddev("daily_return_pct").alias("daily_volatility"),
    F.min("daily_return_pct").alias("worst_daily_return"),
    F.max("daily_return_pct").alias("best_daily_return"),
    F.count("daily_return_pct").alias("trading_days")
).collect()[0]

print("📊 AAPL Risk Profile (4-day sample):")
print(f"Average Daily Return: {risk_metrics['avg_daily_return']:.4f}%")
print(f"Daily Volatility: {risk_metrics['daily_volatility']:.4f}%")
print(f"Best Day: +{risk_metrics['best_daily_return']:.4f}%")
print(f"Worst Day: {risk_metrics['worst_daily_return']:.4f}%")
print(f"Trading Days Analyzed: {risk_metrics['trading_days']}")

# Annualized metrics (standard financial industry calculation)
annual_return = risk_metrics['avg_daily_return'] * 252  # 252 trading days/year
annual_volatility = risk_metrics['daily_volatility'] * (252 ** 0.5)  # Square root of time scaling

print(f"\n📈 Annualized Risk Estimates:")
print(f"Expected Annual Return: {annual_return:.2f}%")
print(f"Annual Volatility: {annual_volatility:.2f}%")

# Risk-adjusted metrics
if annual_volatility > 0:
    sharpe_estimate = annual_return / annual_volatility  # Simplified Sharpe ratio (assuming 0% risk-free rate)
    print(f"Risk-Adjusted Return Ratio: {sharpe_estimate:.4f}")

In [0]:
# Create Gold layer business summary
portfolio_summary = spark.sql("""
    SELECT 
        symbol,
        MIN(trading_date) as period_start,
        MAX(trading_date) as period_end,
        ROUND(MIN(close_price), 2) as period_low,
        ROUND(MAX(close_price), 2) as period_high,
        ROUND(FIRST(close_price), 2) as period_open,
        ROUND(LAST(close_price), 2) as period_close,
        ROUND(((LAST(close_price) - FIRST(close_price)) / FIRST(close_price) * 100), 2) as total_return_pct,
        ROUND(AVG(trading_volume), 0) as avg_daily_volume
    FROM silver_for_gold 
    WHERE trading_date IS NOT NULL
    GROUP BY symbol
    ORDER BY symbol
""")

print("📈 Gold Layer Portfolio Summary:")
portfolio_summary.show()

# Create final Gold table for business users
portfolio_summary.createOrReplaceTempView("gold_portfolio_summary")

print("✅ Gold layer complete - Business analytics ready for executives")
print("\n🎯 Business Insight:")
total_return = portfolio_summary.collect()[0]['total_return_pct']
if total_return > 0:
    print(f"✅ AAPL gained {total_return}% over the analysis period")
else:
    print(f"⚠️  AAPL declined {abs(total_return)}% over the analysis period")