# Daily Macro Signal Research Workflow
Complete pipeline from data ingestion to model training and backtesting.

## 1. Setup and Environment

In [None]:
# Install required packages (Kaggle environment)
# The leading `!` runs the line in the system shell from the notebook.
# Only nixtla is pinned (0.2.8); every other package resolves to whatever
# version pip picks at install time, so runs are not fully reproducible.
# NOTE(review): confirm `prisma-client-python` is the intended PyPI distribution
# name - the Prisma Python client is commonly installed as `prisma`.
!pip install prisma-client-python torch transformers optuna nixtla==0.2.8 fredapi yfinance pandas_datareader psycopg2-binary ta quantlib google-generativeai

In [None]:
import os
import asyncio
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings

# Silence every warning category for the rest of the session to keep the
# notebook output readable.
warnings.filterwarnings('ignore')

# Set random seeds for reproducibility.
# Only NumPy's global RNG is seeded here; the model code receives its own
# seed / random_state arguments further down.
np.random.seed(42)

# Record when the workflow began. The final report cell subtracts this from
# its end time to compute the total runtime; without it that cell fails with
# a NameError.
start_time = datetime.now()

# Import project modules
# These all come from the repository's own `src` package, so the repository
# root must be importable for this cell to succeed.
from src.db.client import get_db, health_check
from src.data_ingest.fred import ingest_fred_data
from src.data_ingest.yahoo import ingest_yahoo_data
from src.timejoin.spine import create_feature_spine
from src.features.tech import create_feature_matrix
from src.models.patchtst import PatchTSTWrapper, finetune_patchtst

# Reached only if every import above succeeded.
print("✅ Environment setup complete")

## 2. Database Connection

In [None]:
# Set database connection
# In Kaggle, add these as secrets:
# DATABASE_URL, FRED_API_KEY, GEMINI_API_KEY

os.environ['DATABASE_URL'] = 'YOUR_POSTGRES_CONNECTION_STRING'
os.environ['FRED_API_KEY'] = 'YOUR_FRED_API_KEY'
os.environ['GEMINI_API_KEY'] = 'YOUR_GEMINI_API_KEY'

# Test database connection
health = await health_check()
print(f"Database health: {health['status']}")

## 3. Data Ingestion

In [None]:
# Ingest macro economic data from FRED
print("📊 Ingesting macro data...")
fred_count = await ingest_fred_data(os.getenv('FRED_API_KEY'))
print(f"Ingested {fred_count} macro release records")

# Ingest market data from Yahoo Finance
print("📈 Ingesting market data...")
market_count = await ingest_yahoo_data(
    tickers=['SPY', 'EURUSD=X', 'GLD', 'TLT'],
    start_date='2020-01-01',
    end_date='2024-12-31'
)
print(f"Ingested {market_count} market price records")

## 4. Feature Engineering

In [None]:
# Create feature spine with as-of joined macro data
print("🔧 Building feature spine...")
ticker = 'SPY'
feature_spine = await create_feature_spine(
    ticker=ticker,
    start_date='2021-01-01',
    end_date='2024-12-31',
    include_macro=True
)

print(f"Feature spine shape: {feature_spine.shape}")
print(f"Date range: {feature_spine['ts'].min()} to {feature_spine['ts'].max()}")

In [None]:
# Expand the spine into the full feature matrix (technical + macro features),
# asking the builder to drop rows containing NaNs.
print("🎯 Generating technical features...")
feature_matrix = create_feature_matrix(
    feature_spine, include_technical=True, include_macro=True, dropna=True
)

print(f"Feature matrix shape: {feature_matrix.shape}")

# Preview the first ten columns, skipping the identifier and raw OHLCV columns
base_columns = ['ts', 'ticker', 'open', 'high', 'low', 'close', 'volume']
engineered_columns = [
    name for name in feature_matrix.columns if name not in base_columns
]
print(f"Features: {engineered_columns[:10]}...")

## 5. Model Training - PatchTST

In [None]:
# Configure the PatchTST wrapper: 256-step context, 30-step prediction
model = PatchTSTWrapper(context_length=256, prediction_length=30, random_state=42)

# Build model inputs and targets, with the close price as the target column
X, y = model.prepare_data(feature_matrix, target_col='close')
print(f"Prepared data: X={X.shape}, y={y.shape}")

# 80/20 split without shuffling: the first 80% of samples train the model,
# the remainder is held out for validation.
split_idx = int(0.8 * len(X))
X_train, y_train = X[:split_idx], y[:split_idx]
X_val, y_val = X[split_idx:], y[split_idx:]

print(f"Train: X={X_train.shape}, y={y_train.shape}")
print(f"Validation: X={X_val.shape}, y={y_val.shape}")

In [None]:
# Fine-tune PatchTST on the training split, validating on the held-out split.
# `model` is rebound to the object returned by the fine-tuning call.
print("🚀 Training PatchTST model...")
training_options = {'gpu': True, 'epochs': 25, 'seed': 42}
model, metrics = finetune_patchtst(
    X_train, y_train, X_val, y_val, **training_options
)

print("Training completed!")
loss_summary = (
    ("Final train loss", 'final_train_loss'),
    ("Final validation loss", 'final_val_loss'),
    ("Best validation loss", 'best_val_loss'),
)
for loss_label, metric_key in loss_summary:
    print(f"{loss_label}: {metrics[metric_key]:.6f}")

## 6. Model Evaluation

In [None]:
# Run the trained model over the validation inputs
predictions = model.predict(X_val)
print(f"Generated {len(predictions)} predictions")

# One figure, two side-by-side panels: loss curves and a sample forecast
fig, (loss_ax, sample_ax) = plt.subplots(1, 2, figsize=(12, 4))

# Left panel: per-epoch train and validation loss
loss_ax.plot(metrics['train_losses'], label='Train Loss')
loss_ax.plot(metrics['val_losses'], label='Validation Loss')
loss_ax.set_title('Training Progress')
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('MSE Loss')
loss_ax.legend()
loss_ax.grid(True)

# Right panel: the first validation sample, predicted vs actual
sample_idx = 0
sample_ax.plot(y_val[sample_idx], label='Actual', alpha=0.7)
sample_ax.plot(predictions[sample_idx], label='Predicted', alpha=0.7)
sample_ax.set_title('Sample Prediction vs Actual')
sample_ax.set_xlabel('Time Steps')
sample_ax.set_ylabel('Price')
sample_ax.legend()
sample_ax.grid(True)

fig.tight_layout()
plt.show()

## 7. Store Model Artifacts

In [None]:
# Store trained model in database
from src.models.patchtst import torch_save_bytes

db = await get_db()
strategy_id = f'patchtst_{ticker}_{datetime.now().strftime("%Y%m%d_%H%M%S")}'

model_blob = torch_save_bytes(model)

await db.modelartifact.create({
    'strategy_id': strategy_id,
    'model_type': 'patchtst',
    'blob': model_blob,
    'metrics_json': metrics,
    'config_yaml': f'ticker: {ticker}\ncontext_length: 256\nprediction_length: 30'
})

print(f"✅ Model stored with strategy_id: {strategy_id}")

## 8. TimeGPT Comparison (Optional)

In [None]:
# Optional benchmark against the TimeGPT API. Any failure (missing package,
# missing key, API error) is reported and the notebook carries on.
try:
    from nixtla import TimeGPT

    # The API key is read from the TIMEGPT_API_KEY environment variable
    timegpt_key = os.getenv('TIMEGPT_API_KEY')
    timegpt = TimeGPT(api_key=timegpt_key)

    # TimeGPT input: timestamp column named `ds`, target column named `y`;
    # only the most recent 500 rows are sent.
    timegpt_df = (
        feature_matrix[['ts', 'close']]
        .rename(columns={'ts': 'ds', 'close': 'y'})
        .tail(500)
    )

    # Forecast 30 steps ahead, matching the PatchTST prediction length
    timegpt_forecast = timegpt.forecast(timegpt_df, h=30)

    print("📈 TimeGPT forecast generated")
    print(timegpt_forecast.head())

except ImportError:
    print("⚠️ TimeGPT not available - skipping comparison")
except Exception as e:
    print(f"⚠️ TimeGPT error: {e}")

## 9. Export Results

In [None]:
# Write the feature matrix to CSV in the working directory
features_path = f'feature_matrix_{ticker}.csv'
feature_matrix.to_csv(features_path, index=False)
print(f"✅ Feature matrix exported: {features_path}")

# Write predictions next to their actual values, one JSON record per sample
predictions_path = f'predictions_{ticker}.json'
pred_df = pd.DataFrame({
    'prediction_id': range(len(predictions)),
    'predictions': [forecast.tolist() for forecast in predictions],
    'actual': [target.tolist() for target in y_val],
})
pred_df.to_json(predictions_path, orient='records')
print(f"✅ Predictions exported: {predictions_path}")

# Closing summary for the training part of the workflow
best_val_loss = metrics['best_val_loss']
print("\n🎉 Daily workflow completed successfully!")
print(f"Strategy ID: {strategy_id}")
print(f"Model performance: {best_val_loss:.6f} MSE")

## 10. Monitoring and Alert Generation 📊

Monitor system health, data quality, and model performance. Generate alerts for significant deviations or issues that require attention.

In [None]:
# Data Quality Monitoring
import logging
from datetime import datetime, timedelta

# Configure monitoring logger
monitoring_logger = logging.getLogger('monitoring')
monitoring_logger.setLevel(logging.INFO)

print("🔍 Running Data Quality Checks...")

# Check recent data ingestion
recent_cutoff = datetime.now() - timedelta(days=7)

# Check market data freshness
recent_prices = await db.marketprice.count(
    where={'created_at': {'gte': recent_cutoff}}
)

# Check macro data freshness  
recent_macro = await db.macrorelease.count(
    where={'created_at': {'gte': recent_cutoff}}
)

# Check features freshness
recent_features = await db.feature.count(
    where={'created_at': {'gte': recent_cutoff}}
)

print(f"📈 Recent Price Records: {recent_prices:,}")
print(f"🏛️ Recent Macro Records: {recent_macro:,}")
print(f"⚙️ Recent Feature Records: {recent_features:,}")

# Quality thresholds
alerts = []
if recent_prices < 100:
    alerts.append("⚠️ LOW PRICE DATA: Fewer than 100 records in past week")
if recent_macro < 10:
    alerts.append("⚠️ LOW MACRO DATA: Fewer than 10 releases in past week")
if recent_features < 1000:
    alerts.append("⚠️ LOW FEATURE DATA: Fewer than 1000 features in past week")

if alerts:
    print("\n🚨 DATA QUALITY ALERTS:")
    for alert in alerts:
        print(f"  {alert}")
else:
    print("✅ All data quality checks passed!")

In [None]:
# Model Performance Monitoring
print("\n🤖 Checking Model Performance...")

# Get recent backtest results
recent_backtests = await db.backtestresult.find_many(
    where={'created_at': {'gte': recent_cutoff}},
    order_by={'created_at': 'desc'},
    take=10
)

if recent_backtests:
    # Calculate performance metrics
    sharpe_ratios = [bt.sharpe_ratio for bt in recent_backtests]
    total_returns = [bt.total_return for bt in recent_backtests]
    max_drawdowns = [bt.max_drawdown for bt in recent_backtests]
    
    avg_sharpe = np.mean(sharpe_ratios)
    avg_return = np.mean(total_returns)
    worst_drawdown = min(max_drawdowns)
    
    print(f"📊 Recent Performance (last {len(recent_backtests)} backtests):")
    print(f"  Average Sharpe Ratio: {avg_sharpe:.3f}")
    print(f"  Average Return: {avg_return:.2%}")
    print(f"  Worst Drawdown: {worst_drawdown:.2%}")
    
    # Performance alerts
    performance_alerts = []
    if avg_sharpe < 0.5:
        performance_alerts.append("⚠️ LOW SHARPE: Average Sharpe ratio below 0.5")
    if avg_return < -0.05:
        performance_alerts.append("⚠️ NEGATIVE RETURNS: Average return below -5%")
    if worst_drawdown < -0.20:
        performance_alerts.append("⚠️ HIGH DRAWDOWN: Drawdown exceeding -20%")
        
    if performance_alerts:
        print("\n🚨 PERFORMANCE ALERTS:")
        for alert in performance_alerts:
            print(f"  {alert}")
    else:
        print("✅ Model performance within acceptable ranges")
else:
    print("⚠️ No recent backtest results found")

In [None]:
# System Health and Final Report
print("\n🏥 System Health Check...")

# Check database connections and table sizes
try:
    table_counts = {
        'MarketPrice': await db.marketprice.count(),
        'MacroRelease': await db.macrorelease.count(),
        'Feature': await db.feature.count(),
        'Signal': await db.signal.count(),
        'Strategy': await db.strategy.count()
    }
    
    print("📊 Database Table Counts:")
    for table, count in table_counts.items():
        print(f"  {table}: {count:,} records")
    
    # Storage usage estimation (simplified)
    total_records = sum(table_counts.values())
    estimated_storage_mb = total_records * 0.001  # ~1KB per record
    
    print(f"\n💾 Estimated Storage: {estimated_storage_mb:.1f} MB")
    
    if estimated_storage_mb > 10000:  # 10GB threshold
        print("⚠️ Storage usage exceeding 10GB - consider archiving old data")
    
except Exception as e:
    print(f"❌ Database health check failed: {e}")

# Workflow Summary
end_time = datetime.now()
total_duration = end_time - start_time

print(f"\n🎯 DAILY WORKFLOW SUMMARY")
print(f"=" * 50)
print(f"⏱️ Total Runtime: {total_duration}")
print(f"📅 Completed: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"🎯 Strategy ID: {STRATEGY_ID}")
print(f"📊 Models Trained: {len(model_results) if 'model_results' in locals() else 0}")
print(f"🔄 Backtests Run: {len(recent_backtests) if recent_backtests else 0}")
print(f"🚨 Alerts Generated: {len(alerts + performance_alerts) if alerts else 0}")

if total_duration.total_seconds() < 3600:  # Under 1 hour
    print("✅ Workflow completed within performance target!")
else:
    print("⚠️ Workflow exceeded 1-hour target - consider optimization")

print(f"\n🚀 Daily workflow complete! Ready for Kaggle export.")