# Stock Potential Identifier System Demo

This notebook demonstrates the core functionality of the Stock Potential Identifier System.

In [None]:
# Add parent directory to path for imports
import os
import sys
sys.path.insert(0, os.path.abspath('..'))

# Standard imports
import asyncio
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta

# Import system components
from src.data_acquisition import DataAcquisition
from src.feature_engineering import FeatureEngineering
from src.model_hierarchy import ModelHierarchy
from src.fusion_layer import FusionLayer
from src.signal_generator import SignalGenerator
from src.decision_engine import DecisionEngine
from src.utils import load_config, plot_feature_importance, plot_equity_curve

# Configure plotting
plt.style.use('ggplot')
sns.set_style('darkgrid')
plt.rcParams['figure.figsize'] = (12, 8)
%matplotlib inline

## 1. Initialize System Components

In [None]:
# Every component reads the same YAML file, so name the path once
CONFIG_PATH = '../config/config.yaml'

# Parsed configuration
config = load_config(CONFIG_PATH)

# Pipeline components; the model and fusion layers are bound to the medium-term horizon
data_engine = DataAcquisition(CONFIG_PATH)
feature_engine = FeatureEngineering(CONFIG_PATH)
model_engine = ModelHierarchy('medium_term', CONFIG_PATH)
fusion_engine = FusionLayer('medium_term', CONFIG_PATH)
signal_engine = SignalGenerator(CONFIG_PATH)
decision_engine = DecisionEngine(CONFIG_PATH)

## 2. Data Acquisition

In [None]:
# Start data acquisition service
await data_engine.start()

# Get stock universe
symbols = await data_engine.get_stock_universe()
print(f"Analyzing {len(symbols)} stocks: {', '.join(symbols[:5])}...")

In [None]:
# Get market data for a sample stock
symbol = symbols[0]
market_data = await data_engine.get_market_data(symbol, period="2y", interval="1d")

# Display market data
print(f"Market data for {symbol}:")
market_data.tail()

In [None]:
# Two stacked panels: closing price on top, traded volume underneath
plt.figure(figsize=(14, 7))
dates = market_data.index

# Upper panel: closing price
price_ax = plt.subplot(2, 1, 1)
price_ax.plot(dates, market_data['Close'], label='Close Price')
price_ax.set_title(f"{symbol} Price History")
price_ax.set_ylabel('Price ($)')
price_ax.legend()

# Lower panel: volume bars sharing the date axis with the price panel
volume_ax = plt.subplot(2, 1, 2, sharex=price_ax)
volume_ax.bar(dates, market_data['Volume'], color='green', alpha=0.6, label='Volume')
volume_ax.set_ylabel('Volume')
volume_ax.set_xlabel('Date')
volume_ax.legend()

plt.tight_layout()
plt.show()

## 3. Feature Engineering

In [None]:
# Generate technical features
technical_features = feature_engine.generate_technical_features(market_data)

# Get fundamental data
fundamental_data = await data_engine.get_fundamentals(symbol)
fundamental_features = None

if not fundamental_data.empty:
    fundamental_features = feature_engine.generate_fundamental_features(fundamental_data)
    print("Fundamental features:")
    display(fundamental_features)
else:
    print("No fundamental data available.")

# Get alternative data
alt_data = await data_engine.get_alternative_data(symbol)
alt_features = None

if not alt_data.empty:
    alt_features = feature_engine.generate_alternative_features(alt_data)
    print("Alternative features:")
    display(alt_features)
else:
    print("No alternative data available.")

In [None]:
# Report how many technical feature columns were produced
available_columns = technical_features.columns
print(f"Generated {len(available_columns)} technical features")

# Keep only the headline indicators that are actually present in the frame
key_indicators = [
    name
    for name in ('Close', 'rsi_14', 'macd_crossover', 'bb_width')
    if name in available_columns
]

technical_features[key_indicators].tail()

In [None]:
# Up to four stacked panels: price, RSI, MACD and Bollinger Bands.
# A panel is drawn only when all of its columns are present.
plt.figure(figsize=(14, 12))
dates = technical_features.index
available = technical_features.columns

# Panel 1: closing price (always drawn; the other panels share its x-axis)
price_ax = plt.subplot(4, 1, 1)
price_ax.plot(dates, technical_features['Close'], label='Close')
price_ax.set_title(f"{symbol} - Price and Technical Indicators")
price_ax.set_ylabel('Price ($)')
price_ax.legend()

# Panel 2: RSI with reference lines at 70 (red) and 30 (green)
if 'rsi_14' in available:
    rsi_ax = plt.subplot(4, 1, 2, sharex=price_ax)
    rsi_ax.plot(dates, technical_features['rsi_14'], color='purple')
    for level, shade in ((70, 'r'), (30, 'g')):
        rsi_ax.axhline(y=level, color=shade, linestyle='--', alpha=0.5)
    rsi_ax.set_ylabel('RSI (14)')
    rsi_ax.set_ylim(0, 100)

# Panel 3: MACD line, signal line and (when available) the histogram
if {'MACD_12_26_9', 'MACDs_12_26_9'}.issubset(available):
    macd_ax = plt.subplot(4, 1, 3, sharex=price_ax)
    macd_ax.plot(dates, technical_features['MACD_12_26_9'], label='MACD')
    macd_ax.plot(dates, technical_features['MACDs_12_26_9'], label='Signal')
    if 'MACDh_12_26_9' in available:
        histogram = technical_features['MACDh_12_26_9']
        # Green bars for positive histogram values, red otherwise
        macd_ax.bar(dates, histogram,
                    color=np.where(histogram > 0, 'g', 'r'), alpha=0.5)
    macd_ax.set_ylabel('MACD')
    macd_ax.legend()

# Panel 4: closing price with the Bollinger Bands around it
if {'BBU_20_2.0', 'BBM_20_2.0', 'BBL_20_2.0'}.issubset(available):
    band_ax = plt.subplot(4, 1, 4, sharex=price_ax)
    band_ax.plot(dates, technical_features['Close'], label='Close')
    band_specs = (
        ('BBU_20_2.0', 'r--', 'Upper BB'),
        ('BBM_20_2.0', 'b--', 'Middle BB'),
        ('BBL_20_2.0', 'g--', 'Lower BB'),
    )
    for column, line_fmt, legend_name in band_specs:
        band_ax.plot(dates, technical_features[column], line_fmt, label=legend_name)
    band_ax.set_ylabel('Price ($)')
    band_ax.set_xlabel('Date')
    band_ax.legend()

plt.tight_layout()
plt.show()

In [None]:
# Merge the technical, fundamental and alternative feature sets (normalised)
combined_features = feature_engine.combine_feature_sets(
    technical_features,
    fundamental_features,
    alt_features,
    normalize=True,
)

print(f"Combined features shape: {combined_features.shape}")
# Preview: last 5 rows, first 10 columns
combined_features.tail(5).iloc[:, :10]

## 4. Generate Target Labels

In [None]:
# One label frame per prediction horizon, built from the same market data
horizons = ['short_term', 'medium_term', 'long_term']
target_labels = {
    name: feature_engine.generate_target_labels(market_data, name)
    for name in horizons
}

# Share of positive labels per horizon (mean of the 'target' column, in percent)
for horizon, labels in target_labels.items():
    positive_pct = 100 * labels['target'].mean()
    print(f"{horizon}: {len(labels)} labels, {positive_pct:.1f}% positive")

In [None]:
# One histogram (with KDE) of future returns per horizon, side by side
plt.figure(figsize=(14, 5))

for panel, (horizon, labels) in enumerate(target_labels.items(), start=1):
    plt.subplot(1, 3, panel)
    sns.histplot(labels['future_return'], kde=True)
    plt.axvline(x=0, color='r', linestyle='--')  # zero-return reference line
    plt.title(f"{horizon} Returns Distribution")
    plt.xlabel('Future Return')
    plt.ylabel('Frequency')

plt.tight_layout()
plt.show()

## 5. Model Training

In [None]:
# Instantiate the underlying models before any training
model_engine.build_models()

# Single-stock training set for the medium-term horizon (simplified example)
horizon = 'medium_term'
features = combined_features
targets = target_labels[horizon]

# Keep only the dates present in both the feature and the label frames
common_index = features.index.intersection(targets.index)
X_train, y_train = features.loc[common_index], targets.loc[common_index, 'target']

n_samples, n_features = X_train.shape[0], X_train.shape[1]
print(f"Training data: {n_samples} samples, {n_features} features")

In [None]:
# Fit every model in the hierarchy (this may take some time)
train_results = model_engine.train_models(X_train, y_train)

# Print each model's scalar metrics; the feature list is plotted separately
for model_name, metrics in train_results.items():
    print(f"\n{model_name} training metrics:")
    for metric, value in metrics.items():
        if metric == 'top_features':
            continue
        print(f"  {metric}: {value:.4f}")

In [None]:
# Bar chart of the top features, only for tree-based models that report them
tree_models = ('xgboost', 'lightgbm', 'random_forest')

for model_name, metrics in train_results.items():
    if 'top_features' not in metrics or model_name not in tree_models:
        continue

    # Tabular form so seaborn can address the columns by name
    feature_imp = pd.DataFrame(metrics['top_features'])

    plt.figure(figsize=(10, 8))
    sns.barplot(x='importance', y='feature', data=feature_imp)
    plt.title(f"{model_name} Feature Importance")
    plt.tight_layout()
    plt.show()

## 6. Prediction and Consensus

In [None]:
# Predict on the most recent row only (double brackets keep it a DataFrame)
X_predict = X_train.iloc[[-1]]
predictions = model_engine.predict(X_predict)

# List each model's class and probability; models without a probability are skipped
print("Raw model predictions:")
for model_name, pred in predictions.items():
    if 'probability' not in pred or pred['probability'] is None:
        continue
    predicted_class = pred['class'][0]
    probability = pred['probability'][0]
    print(f"  {model_name}: Class {predicted_class}, Probability {probability:.4f}")

In [None]:
# Classify the current market regime from the price history
market_regime = fusion_engine.detect_market_regime(market_data)
print(f"Detected market regime: {market_regime}")

# Fuse the per-model predictions, taking the regime into account
consensus = fusion_engine.generate_consensus(predictions, market_regime)

# Consensus for the single predicted row (index 0)
print("\nConsensus prediction:")
print(f"  Class: {consensus['class'][0]}")
for label, key in (('Probability', 'probability'), ('Confidence', 'confidence')):
    print(f"  {label}: {consensus[key][0]:.4f}")

# Weight given to each model in the consensus
print("\nModel weights:")
for model, weight in consensus['model_weights'].items():
    print(f"  {model}: {weight:.4f}")

## 7. Signal Generation

In [None]:
# Turn the consensus into concrete trade signals for this symbol and horizon
signals = signal_engine.generate_signals(market_data, consensus, horizon, symbol)

# Print the first signal, or say that none was produced
if not signals:
    print("No signals generated")
else:
    signal = signals[0]
    print(f"Signal for {signal['symbol']}:")
    print(f"  Type: {signal['type']}")

    # Price levels are shown with a dollar sign
    for label, key in (('Entry Price', 'entry_price'),
                       ('Stop Loss', 'stop_loss'),
                       ('Target', 'target')):
        print(f"  {label}: ${signal[key]:.2f}")

    # Remaining metrics are plain two-decimal numbers
    for label, key in (('Risk-Reward', 'risk_reward'),
                       ('Probability', 'probability'),
                       ('Sharpe', 'sharpe'),
                       ('Recommended Position Size', 'position_size')):
        print(f"  {label}: {signal[key]:.2f}")

In [None]:
# Overlay the first signal's entry, stop-loss and target on the price chart
if signals:
    signal = signals[0]

    plt.figure(figsize=(12, 6))
    plt.plot(market_data.index, market_data['Close'], label='Close Price')

    # All markers are drawn at the most recent date
    latest_date = market_data.index[-1]

    # LONG: stop in red, target in green; any other type (SHORT) swaps them
    is_long = signal['type'] == 'LONG'
    stop_color, target_color = ('red', 'green') if is_long else ('green', 'red')

    levels = (
        ('entry_price', 'blue', 'Entry'),
        ('stop_loss', stop_color, 'Stop Loss'),
        ('target', target_color, 'Target'),
    )

    # Markers first ...
    for key, shade, legend_name in levels:
        plt.scatter(latest_date, signal[key], color=shade, s=100, label=legend_name)

    # ... then a faint horizontal guide line at each level
    for key, shade, _ in levels:
        plt.axhline(y=signal[key], color=shade, linestyle='--', alpha=0.3)

    plt.title(f"{symbol} - {signal['type']} Signal")
    plt.xlabel('Date')
    plt.ylabel('Price ($)')
    plt.legend()
    plt.tight_layout()
    plt.show()

## 8. Decision Engine

In [None]:
# For demonstration, let's create multiple signals.
# In a real system, these would come from analyzing multiple stocks.
all_signals = signals.copy() if signals else []

# Pad the list with mock signals when fewer than three real ones exist
if len(all_signals) < 3:
    # symbols[1:4] already yields three symbols once the universe holds four,
    # so the fallback tickers are only needed for smaller universes.
    candidate_symbols = symbols[1:4] if len(symbols) >= 4 else ['MSFT', 'GOOGL', 'AMZN']

    # Skip symbols that already have a signal so no symbol ends up with two
    # (possibly contradictory) entries.
    taken_symbols = {existing['symbol'] for existing in all_signals}
    additional_symbols = [sym for sym in candidate_symbols if sym not in taken_symbols]

    for i, sym in enumerate(additional_symbols):
        # Even positions are LONG, odd positions are SHORT
        is_long = i % 2 == 0
        entry_price = 100 + i * 10

        mock_signal = {
            'symbol': sym,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'type': 'LONG' if is_long else 'SHORT',
            'entry_price': entry_price,
            # Stop 5% against the position, target 10% in its favour
            'stop_loss': entry_price * (0.95 if is_long else 1.05),
            'target': entry_price * (1.1 if is_long else 0.9),
            'probability': 0.6 + i * 0.05,
            'confidence': 0.7 + i * 0.03,
            'risk_reward': 2.0 + i * 0.2,
            'expected_value': 5 + i,
            'sharpe': 1.0 + i * 0.1,
            'horizon': horizon,
            'volatility': 0.01 + i * 0.005,
            'position_size': 0.5 + i * 0.1,
            'market_regime': market_regime,
        }
        all_signals.append(mock_signal)

print(f"Total signals: {len(all_signals)}")

In [None]:
# Create a basic (random) correlation matrix for the symbols that have signals
symbols_used = [signal['symbol'] for signal in all_signals]
n_symbols = len(symbols_used)

# Build the matrix as a plain NumPy array first. Mutating a DataFrame through
# `.values` is unreliable: under pandas Copy-on-Write the array is read-only
# and np.fill_diagonal raises.
corr_values = np.random.uniform(-0.2, 0.8, (n_symbols, n_symbols))

# Make it symmetric, then force a unit diagonal (self-correlation)
corr_values = (corr_values + corr_values.T) / 2
np.fill_diagonal(corr_values, 1.0)

mock_corr = pd.DataFrame(corr_values, index=symbols_used, columns=symbols_used)

# Set the correlation matrix
decision_engine.set_correlation_matrix(mock_corr)

# Rank opportunities
ranked_opportunities = decision_engine.rank_opportunities(all_signals)

print(f"Ranked opportunities: {len(ranked_opportunities)}")

In [None]:
# Market context handed to the report: detected regime plus a fixed volatility label
market_context = dict(regime=market_regime, volatility='normal')

decision_report = decision_engine.generate_decision_report(
    ranked_opportunities,
    market_context,
)

# Print the report's summary section
print("Decision Report Summary:")
print(decision_report['summary'])

In [None]:
# Print the three best-ranked opportunities, numbered from 1
print("Top Opportunities:")
top_three = ranked_opportunities[:3]
for rank, op in enumerate(top_three, start=1):
    print(f"\n{rank}. {op['symbol']} - {op['type']}")
    print(f"   Entry: ${op['entry_price']:.2f}, Target: ${op['target']:.2f}, Stop: ${op['stop_loss']:.2f}")
    print(f"   Probability: {op['probability']:.2f}, Risk-Reward: {op['risk_reward']:.2f}")
    print(f"   Sharpe: {op['sharpe']:.2f}, Position Size: {op['position_size']:.2f}")

In [None]:
# Persist the report; the engine returns a mapping of format -> file path
saved_files = decision_engine.save_decision_report(decision_report)

print("Saved decision report to:")
for output_format, location in saved_files.items():
    print(f"  {output_format}: {location}")

## 9. Backtesting a Strategy (Simplified Example)

This is a simplified example of how you might backtest a strategy based on the model's predictions.

In [None]:
# Train and backtest on historical data for simplified example
async def backtest_symbol(symbol, horizon='medium_term'):
    """Train on the first 70% of a symbol's history and backtest on the rest.

    The function is a coroutine because the market data is fetched with
    ``await``; call it as ``await backtest_symbol(...)``.

    Args:
        symbol: Ticker passed to ``data_engine.get_market_data``.
        horizon: Horizon name forwarded to the label generator, the model
            hierarchy and the fusion layer.

    Returns:
        A DataFrame indexed by the test dates with the columns ``actual``,
        ``predicted``, ``probability``, ``price``, ``position``,
        ``market_return``, ``strategy_return``, ``cum_market_return`` and
        ``cum_strategy_return``; or ``None`` when there is no market data or
        no date shared by the features and the labels.
    """
    # Same configuration file the other components in this notebook are built with.
    # NOTE(review): the constructors were previously called without it here and
    # relied on their default path - confirm the default is not preferred.
    config_path = '../config/config.yaml'

    print(f"Backtesting {symbol} for {horizon} horizon...")

    # Get data
    data = await data_engine.get_market_data(symbol, period="5y", interval="1d")
    if data.empty:
        print(f"No data for {symbol}")
        return None

    # Generate features and target labels from the same price history
    features = feature_engine.generate_technical_features(data)
    targets = feature_engine.generate_target_labels(data, horizon)

    # Align features and targets on the dates they share
    common_index = features.index.intersection(targets.index)
    if common_index.empty:
        # Nothing to train on; follow the same "no result" contract as above
        print(f"No overlapping feature/label rows for {symbol}")
        return None
    features = features.loc[common_index]
    targets = targets.loc[common_index]

    # Positional split keeps row order: first 70% train, last 30% test
    split_idx = int(len(features) * 0.7)

    X_train = features.iloc[:split_idx]
    y_train = targets.iloc[:split_idx]['target']

    X_test = features.iloc[split_idx:]
    y_test = targets.iloc[split_idx:]['target']

    # Train a fresh model hierarchy so the notebook-level models are untouched
    model = ModelHierarchy(horizon, config_path)
    model.build_models()
    model.train_models(X_train, y_train)

    # Generate predictions
    predictions = model.predict(X_test)

    # Create consensus predictions (no market regime is supplied here)
    fusion = FusionLayer(horizon, config_path)
    consensus = fusion.generate_consensus(predictions)

    # Price rows matching the test dates
    test_data = data.loc[X_test.index]

    # Create a simple trading strategy
    signals = pd.DataFrame(index=X_test.index)
    signals['actual'] = y_test
    signals['predicted'] = consensus['class']
    signals['probability'] = consensus['probability']
    signals['price'] = test_data['Close']
    signals['position'] = signals['predicted'].map({0: -1, 1: 1})  # 1 for long, -1 for short

    # Daily returns; the position is shifted one row so each day's return is
    # earned with the position decided on the previous row
    signals['market_return'] = test_data['Close'].pct_change()
    signals['strategy_return'] = signals['position'].shift(1) * signals['market_return']

    # Calculate cumulative returns
    signals['cum_market_return'] = (1 + signals['market_return']).cumprod() - 1
    signals['cum_strategy_return'] = (1 + signals['strategy_return']).cumprod() - 1

    return signals

In [None]:
# Run backtest for a symbol
backtest_results = await backtest_symbol(symbols[0])

if backtest_results is not None:
    # Display final results
    market_return = backtest_results['cum_market_return'].iloc[-1] * 100
    strategy_return = backtest_results['cum_strategy_return'].iloc[-1] * 100
    hit_rate = (backtest_results['predicted'] == backtest_results['actual']).mean() * 100
    
    print(f"Backtest Results for {symbols[0]}:")
    print(f"Market Return: {market_return:.2f}%")
    print(f"Strategy Return: {strategy_return:.2f}%")
    print(f"Hit Rate: {hit_rate:.2f}%")
    
    # Plot equity curves
    plt.figure(figsize=(12, 6))
    plt.plot(backtest_results.index, backtest_results['cum_market_return'] * 100, label='Buy & Hold')
    plt.plot(backtest_results.index, backtest_results['cum_strategy_return'] * 100, label='Strategy')
    plt.title(f"{symbols[0]} - Backtest Performance")
    plt.xlabel('Date')
    plt.ylabel('Return (%)')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

## 10. Clean Up

In [None]:
# Stop the data acquisition service that was started with data_engine.start()
# at the beginning of the notebook, then confirm on stdout.
await data_engine.stop()
print("System stopped.")