In [0]:
# FRED Monthly Pipeline: 02_fred_monthly_pipeline

import requests
import pandas as pd
from datetime import datetime
import time
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType

start_time = time.time()

# === Config ===
API_KEY = "0edfc0525246965667057e6f44062902"

def get_series(series_id, label):
    url = "https://api.stlouisfed.org/fred/series/observations"
    params = {
        "series_id": series_id,
        "api_key": API_KEY,
        "file_type": "json"
    }
    response = requests.get(url, params=params)
    data = response.json()

    if "observations" not in data:
        print(f"⚠️ No observations found for series_id: {series_id}")
        print("Response:", data)
        return pd.DataFrame(columns=["date", label])

    df = pd.DataFrame(data['observations'])[['date', 'value']]
    df['date'] = pd.to_datetime(df['date'])
    df[label] = pd.to_numeric(df['value'], errors='coerce')
    return df[['date', label]]

# === Indicators (Monthly) ===
monthly_indicators = {
    "UnemploymentRate": "UNRATE",
    "CPI": "CPIAUCSL",
    "RetailSales": "RSAFS",
    "PersonalIncome": "PI",
    "HomePriceIndex": "CSUSHPINSA"
}

# === Build df_monthly ===
df_monthly = None
for label, series_id in monthly_indicators.items():
    df = get_series(series_id, label)
    df_monthly = df if df_monthly is None else df_monthly.merge(df, on="date", how="outer")

# === Clean ===
def clean_fred_df(df: pd.DataFrame, value_columns: list) -> pd.DataFrame:
    df = df.copy()
    df['date'] = pd.to_datetime(df['date'], errors='coerce')
    for col in value_columns:
        df[col] = pd.to_numeric(df[col], errors='coerce')
    df = df.dropna(subset=value_columns, how='all')
    df = df.sort_values("date").reset_index(drop=True)
    return df

monthly_cols = list(monthly_indicators.keys())
df_monthly = clean_fred_df(df_monthly, monthly_cols)

# === Enrich ===
df_monthly['CPI_PctChange'] = df_monthly['CPI'].pct_change()
df_monthly['Retail_RealGrowth'] = df_monthly['RetailSales'].pct_change() - df_monthly['CPI_PctChange']
df_monthly['CPI_MA_3'] = df_monthly['CPI'].rolling(3).mean()
df_monthly['Income_vs_Inflation'] = df_monthly['PersonalIncome'].pct_change() - df_monthly['CPI_PctChange']
df_monthly['Year'] = df_monthly['date'].dt.year
df_monthly['Month'] = df_monthly['date'].dt.month

# === Convert to Spark & Save with Schema Merge ===
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
spark_df_monthly = spark.createDataFrame(df_monthly)
spark_df_monthly.write \
    .format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .save("/mnt/datalake/fred/monthly")

# === Metadata Logging ===
end_time = time.time()
duration = int(end_time - start_time)
row_count = spark_df_monthly.count()

log_data = [( 
    "fred_monthly_pipeline",
    "fact_macro_monthly",
    datetime.utcnow(),
    row_count,
    "success",
    duration,
    "monthly" 
)]

log_schema = StructType([
    StructField("job_name", StringType(), True),
    StructField("table_name", StringType(), True),
    StructField("run_date", TimestampType(), True),
    StructField("row_count", LongType(), True),
    StructField("status", StringType(), True),
    StructField("duration_sec", LongType(), True),
    StructField("frequency", StringType(), True)
])

log_df = spark.createDataFrame(log_data, schema=log_schema)

log_df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("frequency") \
    .save("/mnt/datalake/fred/logs/job_metadata")
    