# LSTM & Transformer for High-Frequency Trading Prediction

This notebook demonstrates how to use deep learning sequence models for predicting short-term price movements.

## Key Concepts
- **LSTM (Long Short-Term Memory)**: Recurrent network that captures temporal dependencies
- **Transformer**: Attention-based model that can look at all timesteps simultaneously
- **Features**: Tick prices, volume, bid-ask spread, rolling technical indicators, and order imbalance
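
To make "look at all timesteps simultaneously" concrete, here is a minimal NumPy sketch of single-head scaled dot-product self-attention. The function name `self_attention`, the projection matrices, and the toy shapes are illustrative assumptions and are not part of the models defined later in this notebook.

```python
import numpy as np

def self_attention(x, w_q, w_k, w_v):
    """Single-head scaled dot-product self-attention over one sequence.

    x: (seq_len, d_in); w_q, w_k, w_v: (d_in, d_k) projection matrices.
    Returns the attended sequence (seq_len, d_k) and the weights (seq_len, seq_len).
    """
    q, k, v = x @ w_q, x @ w_k, x @ w_v
    scores = q @ k.T / np.sqrt(k.shape[-1])         # every timestep scored against every other
    scores -= scores.max(axis=-1, keepdims=True)    # subtract row max for numerical stability
    weights = np.exp(scores)
    weights /= weights.sum(axis=-1, keepdims=True)  # softmax over the key axis
    return weights @ v, weights

rng = np.random.default_rng(0)
x = rng.normal(size=(5, 3))                         # toy sequence: 5 timesteps, 3 features
w_q, w_k, w_v = (rng.normal(size=(3, 4)) for _ in range(3))
out, weights = self_attention(x, w_q, w_k, w_v)
print(out.shape, weights.shape)
```

Each row of `weights` sums to 1 and says how much that timestep draws from every other timestep, which is the sense in which attention sees the whole window at once.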

## Requirements
```bash
pip install torch numpy pandas matplotlib scikit-learn yfinance ta
```

In [None]:
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, classification_report
import warnings
warnings.filterwarnings('ignore')

torch.manual_seed(42)
np.random.seed(42)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

## 1. Generate Synthetic High-Frequency Data

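For demonstration, we generate synthetic tick data with a few microstructure-style properties: regime-switching volatility, prices rounded to the tick size, volume and spread that grow with volatility, and a noisy order-imbalance signal.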
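
One side effect of rounding to the tick size is that many ticks show no price change at all. The sketch below estimates that probability for the three volatility regimes used in the generator. It assumes the raw change is Gaussian with standard deviation `vol * price`, that the price stays near 100, and it ignores the small mean-reversion term; `flat_tick_probability` is a hypothetical helper.

```python
import math

def flat_tick_probability(vol, price=100.0, tick_size=0.01):
    """P(rounded price is unchanged) when the raw change is N(0, (vol * price)^2).

    A change rounds back to the same price when |change| < tick_size / 2.
    """
    sigma = vol * price
    z = (tick_size / 2) / sigma
    return math.erf(z / math.sqrt(2))   # P(|Z| < z) for a standard normal Z

for vol in (0.0001, 0.0003, 0.0005):
    print(f"vol={vol}: P(flat tick) ~ {flat_tick_probability(vol):.3f}")
```

Under these assumptions, roughly 38% of ticks are flat in the low-volatility regime. Because the target in Section 2 uses a strict `>` comparison, flat ticks fall into class 0, so the two classes are not balanced.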

In [None]:
def generate_synthetic_hft_data(n_ticks=100000, tick_size=0.01):
    """Generate synthetic tick data with microstructure effects"""
    # Initialize
    prices = [100.0]
    volumes = []
    bid_ask_spreads = []
    order_imbalances = []
    
    # Regime switching volatility
    volatilities = [0.0001, 0.0003, 0.0005]  # Low, medium, high vol regimes
    regime = 0
    
    for i in range(n_ticks - 1):
        # Regime switching (Markov)
        if np.random.random() < 0.001:
            regime = np.random.choice([0, 1, 2], p=[0.5, 0.35, 0.15])
        
        vol = volatilities[regime]
        
        # Price change: weak mean reversion toward the 100-tick average, plus noise
        deviation = 0.1 * (prices[-1] - np.mean(prices[-100:])) if len(prices) > 100 else 0
        noise = np.random.randn() * vol * prices[-1]
        price_change = -0.0001 * deviation + noise
        
        # Round to tick size
        new_price = round((prices[-1] + price_change) / tick_size) * tick_size
        new_price = max(new_price, 1.0)  # Floor at $1
        prices.append(new_price)
        
        # Volume (correlated with volatility)
        base_volume = 100 + 50 * regime
        volume = int(np.random.exponential(base_volume))
        volumes.append(volume)
        
        # Bid-ask spread (wider in high vol)
        spread = tick_size * (1 + regime + np.random.exponential(0.5))
        bid_ask_spreads.append(spread)
        
        # Order imbalance: noisy signal planted to correlate with this tick's price move
        imbalance = np.clip(np.random.randn() * 0.3 + 0.2 * np.sign(price_change), -1, 1)
        order_imbalances.append(imbalance)
    
    # Repeat the last value so every column has n_ticks entries, matching prices
    volumes.append(volumes[-1])
    bid_ask_spreads.append(bid_ask_spreads[-1])
    order_imbalances.append(order_imbalances[-1])
    
    df = pd.DataFrame({
        'price': prices,
        'volume': volumes,
        'spread': bid_ask_spreads,
        'imbalance': order_imbalances
    })
    
    return df

# Generate data
df = generate_synthetic_hft_data(100000)
print(f"Generated {len(df)} ticks")
print(df.head(10))

## 2. Feature Engineering
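
A next-tick label built with `shift(-1)` needs care at the end of the series, because the last row has no next tick. The toy example below (made-up prices, not notebook output) shows that the comparison against `NaN` silently evaluates to `False`, so that row should be dropped rather than labeled.

```python
import pandas as pd

prices = pd.Series([100.00, 100.01, 100.01, 100.00])  # toy prices, not real data

next_price = prices.shift(-1)          # price one tick ahead; NaN on the last row
up = next_price > prices               # NaN > x evaluates to False
has_next = next_price.notna()          # False only where no next tick exists

labels = up[has_next].astype(int)
print(labels.tolist())                 # [1, 0, 0]
```

Note that the flat tick in the middle is labeled 0, the same as a down move.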

In [None]:
def create_features(df, lookback_windows=(5, 10, 20, 50)):
    """Create features for prediction"""
    features = pd.DataFrame(index=df.index)
    
    # Price returns at different scales
    features['return_1'] = df['price'].pct_change(1)
    features['return_5'] = df['price'].pct_change(5)
    features['return_10'] = df['price'].pct_change(10)
    
    # Moving averages ratio
    for w in lookback_windows:
        ma = df['price'].rolling(w).mean()
        features[f'ma_ratio_{w}'] = df['price'] / ma - 1
    
    # Volatility at different scales
    for w in lookback_windows:
        features[f'volatility_{w}'] = df['price'].pct_change().rolling(w).std()
    
    # Volume features
    features['volume_ma_ratio'] = df['volume'] / df['volume'].rolling(20).mean()
    features['volume_std'] = df['volume'].rolling(20).std() / df['volume'].rolling(20).mean()
    
    # Spread features
    features['spread'] = df['spread']
    features['spread_ma_ratio'] = df['spread'] / df['spread'].rolling(20).mean()
    
    # Order imbalance
    features['imbalance'] = df['imbalance']
    features['imbalance_ma'] = df['imbalance'].rolling(10).mean()
    
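    # Target: 1 if the next tick is strictly higher, else 0 (flat ticks count as 0)
    next_price = df['price'].shift(-1)
    features['target'] = (next_price > df['price']).astype(int)
    
    # Drop warm-up rows with NaN features and the last row, which has no next tick
    features = features[next_price.notna()].dropna()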
    
    return features

features = create_features(df)
print(f"Features shape: {features.shape}")
print(f"\nFeature columns: {list(features.columns)}")

## 3. Prepare Sequences for LSTM/Transformer
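
Sequence models expect input of shape `(n_samples, seq_length, n_features)`. As a sketch of the windowing step, the snippet below builds overlapping windows with NumPy's `sliding_window_view` instead of a Python loop. `make_windows` and the toy arrays are illustrative assumptions; this variant yields `n_rows - seq_length + 1` windows.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def make_windows(X, y, seq_length):
    """Return (n_windows, seq_length, n_features) windows and the label of each window's last row."""
    # sliding_window_view appends the window axis last: (n_windows, n_features, seq_length)
    windows = sliding_window_view(X, window_shape=seq_length, axis=0)
    windows = windows.transpose(0, 2, 1)
    labels = y[seq_length - 1:]         # label aligned with the final row of each window
    return windows, labels

X = np.arange(12, dtype=float).reshape(6, 2)   # toy data: 6 rows, 2 features
y = np.array([0, 1, 0, 1, 1, 0])
windows, labels = make_windows(X, y, seq_length=3)
print(windows.shape, labels.shape)
```

The result is a read-only view onto `X`, so it uses little extra memory; copy it (for example with `np.ascontiguousarray`) before modifying it in place.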

In [None]:
def prepare_sequences(features, seq_length=50, train_ratio=0.8):
    """Prepare sequences for sequence models"""
    # Separate features and target
    X = features.drop('target', axis=1).values
    y = features['target'].values
    
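    # Normalize features. Fit the scaler only on rows that feed training
    # sequences so test-period statistics do not leak into the inputs.
    n_train_rows = int((len(X) - seq_length) * train_ratio) + seq_length - 1
    scaler = StandardScaler()
    scaler.fit(X[:n_train_rows])
    X_scaled = scaler.transform(X)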
    
    # Create sequences
    X_seq, y_seq = [], []
    for i in range(len(X_scaled) - seq_length):
        X_seq.append(X_scaled[i:i+seq_length])
        y_seq.append(y[i+seq_length-1])  # Label of the window's last row: direction of the tick after the window
    
    X_seq = np.array(X_seq)
    y_seq = np.array(y_seq)
    
    # Train/test split (no shuffle to maintain temporal order)
    split_idx = int(len(X_seq) * train_ratio)
    
    X_train = torch.tensor(X_seq[:split_idx], dtype=torch.float32)
    y_train = torch.tensor(y_seq[:split_idx], dtype=torch.long)
    X_test = torch.tensor(X_seq[split_idx:], dtype=torch.float32)
    y_test = torch.tensor(y_seq[split_idx:], dtype=torch.long)
    
    return X_train, y_train, X_test, y_test, scaler

SEQ_LENGTH = 50
X_train, y_train, X_test, y_test, scaler = prepare_sequences(features, SEQ_LENGTH)

print(f"X_train shape: {X_train.shape}")
print(f"y_train shape: {y_train.shape}")
print(f"X_test shape: {X_test.shape}")
print(f"y_test shape: {y_test.shape}")
print(f"\nClass distribution (train): {np.bincount(y_train.numpy())}")

## 4. LSTM Model
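
The classifier in this section pools the LSTM outputs with a learned attention score per timestep rather than taking only the final hidden state. A NumPy sketch of that pooling step, assuming a single score vector `w` and scalar bias `b` (hypothetical names standing in for the `nn.Linear(hidden_dim * 2, 1)` layer):

```python
import numpy as np

def attention_pool(h, w, b=0.0):
    """Pool a sequence of hidden states into one vector per sample.

    h: (batch, seq_len, hidden); w: (hidden,) score vector; b: scalar bias.
    Returns context (batch, hidden) and weights (batch, seq_len).
    """
    scores = h @ w + b                                   # one score per timestep
    scores = scores - scores.max(axis=1, keepdims=True)  # numerical stability
    weights = np.exp(scores)
    weights /= weights.sum(axis=1, keepdims=True)        # softmax over the time axis
    context = (h * weights[..., None]).sum(axis=1)       # weighted sum of hidden states
    return context, weights

rng = np.random.default_rng(0)
h = rng.normal(size=(2, 4, 3))                           # toy: 2 samples, 4 timesteps, 3 hidden units
context, weights = attention_pool(h, w=rng.normal(size=3))
print(context.shape, weights.shape)
```

With an all-zero `w` the weights are uniform and the pooling reduces to a plain average over time; training moves the weights toward the timesteps that help the classification.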

In [None]:
class LSTMClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim=64, num_layers=2, dropout=0.2):
        super().__init__()
        self.lstm = nn.LSTM(
            input_dim, hidden_dim, num_layers,
            batch_first=True, dropout=dropout, bidirectional=True
        )
        self.attention = nn.Linear(hidden_dim * 2, 1)
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim * 2, 32),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(32, 2)
        )
    
    def forward(self, x):
        # LSTM encoding
        lstm_out, _ = self.lstm(x)  # (batch, seq, hidden*2)
        
        # Attention weights
        attn_weights = torch.softmax(self.attention(lstm_out), dim=1)
        
        # Weighted sum
        context = (lstm_out * attn_weights).sum(dim=1)  # (batch, hidden*2)
        
        # Classification
        return self.fc(context)

# Initialize model
input_dim = X_train.shape[2]
lstm_model = LSTMClassifier(input_dim, hidden_dim=64).to(device)
print(f"LSTM parameters: {sum(p.numel() for p in lstm_model.parameters()):,}")

## 5. Transformer Model
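
Self-attention by itself has no notion of order, so the model in this section adds sinusoidal positional encodings to the projected inputs. The NumPy sketch below computes the same kind of table so the values can be inspected directly. `sinusoidal_positions` is an illustrative helper and assumes an even `d_model`.

```python
import numpy as np

def sinusoidal_positions(seq_len, d_model):
    """Return a (seq_len, d_model) table of sine/cosine position encodings."""
    position = np.arange(seq_len)[:, None]                                   # (seq_len, 1)
    div_term = np.exp(np.arange(0, d_model, 2) * (-np.log(10000.0) / d_model))
    pe = np.zeros((seq_len, d_model))
    pe[:, 0::2] = np.sin(position * div_term)   # even columns: sine
    pe[:, 1::2] = np.cos(position * div_term)   # odd columns: cosine
    return pe

pe = sinusoidal_positions(seq_len=50, d_model=64)
print(pe.shape)
print(pe[0, :4])   # position 0: sin(0) = 0 and cos(0) = 1 alternate
```

Low-index columns oscillate quickly across positions and high-index columns slowly, which gives each timestep in the window a distinct pattern.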

In [None]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=100):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe.unsqueeze(0))
    
    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

class TransformerClassifier(nn.Module):
    def __init__(self, input_dim, d_model=64, nhead=4, num_layers=2, dropout=0.1):
        super().__init__()
        self.input_projection = nn.Linear(input_dim, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        
        encoder_layer = nn.TransformerEncoderLayer(
            d_model, nhead, dim_feedforward=128, dropout=dropout, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        
        self.fc = nn.Sequential(
            nn.Linear(d_model, 32),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(32, 2)
        )
    
    def forward(self, x):
        # Project to d_model dimensions
        x = self.input_projection(x)
        x = self.pos_encoder(x)
        
        # Transformer encoding
        x = self.transformer(x)
        
        # Use last timestep for classification
        x = x[:, -1, :]
        
        return self.fc(x)

# Initialize model
transformer_model = TransformerClassifier(input_dim, d_model=64).to(device)
print(f"Transformer parameters: {sum(p.numel() for p in transformer_model.parameters()):,}")

## 6. Training Loop
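
The loop in this section clips gradients with `torch.nn.utils.clip_grad_norm_` before each optimizer step. As a rough NumPy sketch of what global-norm clipping does, assuming the usual rule of rescaling all gradients by `max_norm / (total_norm + eps)` when the norm exceeds the limit (`clip_by_global_norm` is a hypothetical helper, not a PyTorch function):

```python
import numpy as np

def clip_by_global_norm(grads, max_norm, eps=1e-6):
    """Rescale a list of gradient arrays so their joint L2 norm is at most max_norm."""
    total_norm = np.sqrt(sum(float((g ** 2).sum()) for g in grads))
    scale = min(1.0, max_norm / (total_norm + eps))   # never scale gradients up
    return [g * scale for g in grads], total_norm

grads = [np.array([3.0, 0.0]), np.array([[0.0, 4.0]])]   # toy gradients, joint norm 5
clipped, norm = clip_by_global_norm(grads, max_norm=1.0)
print(norm, [g.tolist() for g in clipped])
```

All arrays are scaled by the same factor, so the gradient direction is preserved and only its length changes.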

In [None]:
def train_model(model, X_train, y_train, X_test, y_test, epochs=50, batch_size=256, lr=0.001):
    """Train a model and return training history"""
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, factor=0.5)
    
    X_train_dev = X_train.to(device)
    y_train_dev = y_train.to(device)
    X_test_dev = X_test.to(device)
    y_test_dev = y_test.to(device)
    
    train_losses = []
    test_accs = []
    
    for epoch in range(epochs):
        model.train()
        total_loss = 0
        n_batches = 0
        
        # Mini-batch training
        indices = torch.randperm(len(X_train_dev), device=device)
        for i in range(0, len(X_train_dev), batch_size):
            batch_idx = indices[i:i+batch_size]
            X_batch = X_train_dev[batch_idx]
            y_batch = y_train_dev[batch_idx]
            
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            
            total_loss += loss.item()
            n_batches += 1
        
        avg_loss = total_loss / n_batches
        train_losses.append(avg_loss)
        
        # Evaluate
        model.eval()
        with torch.no_grad():
            test_outputs = model(X_test_dev)
            test_preds = test_outputs.argmax(dim=1)
            test_acc = (test_preds == y_test_dev).float().mean().item()
            test_accs.append(test_acc)
        
        scheduler.step(avg_loss)
        
        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}: Loss = {avg_loss:.4f}, Test Acc = {test_acc:.4f}")
    
    return train_losses, test_accs

# Train LSTM
print("Training LSTM...")
lstm_losses, lstm_accs = train_model(lstm_model, X_train, y_train, X_test, y_test, epochs=30)

# Train Transformer
print("\nTraining Transformer...")
transformer_losses, transformer_accs = train_model(transformer_model, X_train, y_train, X_test, y_test, epochs=30)

## 7. Compare Models

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Training loss
axes[0].plot(lstm_losses, label='LSTM', linewidth=2)
axes[0].plot(transformer_losses, label='Transformer', linewidth=2)
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('Training Loss')
axes[0].set_title('Training Loss Comparison')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Test accuracy
axes[1].plot(lstm_accs, label='LSTM', linewidth=2)
axes[1].plot(transformer_accs, label='Transformer', linewidth=2)
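# Accuracy of always predicting the more frequent test class
majority_baseline = max(y_test.float().mean().item(), 1 - y_test.float().mean().item())
axes[1].axhline(y=majority_baseline, color='gray', linestyle='--', label='Majority-class baseline')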
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('Test Accuracy')
axes[1].set_title('Test Accuracy Comparison')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"\nFinal Test Accuracy:")
print(f"  LSTM: {lstm_accs[-1]:.4f}")
print(f"  Transformer: {transformer_accs[-1]:.4f}")

## 8. Detailed Evaluation
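
Beyond a single high-confidence cutoff, it can help to look at accuracy across several confidence buckets. The sketch below is one way to do that; `accuracy_by_confidence`, the bucket edges, and the toy inputs are all illustrative assumptions rather than results from the models in this notebook.

```python
import numpy as np

def accuracy_by_confidence(y_true, probs, inner_edges=(0.55, 0.6, 0.7)):
    """Map bucket index -> (n_samples, accuracy), bucketing by max(p, 1 - p).

    Bucket 0 is confidence < 0.55, bucket 1 is [0.55, 0.6), and so on.
    Empty buckets are omitted.
    """
    y_true = np.asarray(y_true)
    probs = np.asarray(probs, dtype=float)
    preds = (probs > 0.5).astype(int)
    confidence = np.maximum(probs, 1.0 - probs)   # in [0.5, 1]
    bucket = np.digitize(confidence, inner_edges)
    result = {}
    for b in range(len(inner_edges) + 1):
        mask = bucket == b
        if mask.any():
            result[b] = (int(mask.sum()), float((preds[mask] == y_true[mask]).mean()))
    return result

# Toy labels and probabilities, made up for illustration only
y_toy = [1, 0, 1, 0, 1, 1]
p_toy = [0.52, 0.48, 0.58, 0.35, 0.9, 0.2]
report = accuracy_by_confidence(y_toy, p_toy)
print(report)
```

If confidence is informative, accuracy should rise with the bucket index; a flat or falling profile suggests the probabilities are poorly calibrated.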

In [None]:
def evaluate_model(model, X_test, y_test, model_name):
    """Detailed evaluation of a model"""
    model.eval()
    with torch.no_grad():
        X_test_dev = X_test.to(device)
        outputs = model(X_test_dev)
        probs = torch.softmax(outputs, dim=1)[:, 1].cpu().numpy()
        preds = outputs.argmax(dim=1).cpu().numpy()
    
    y_true = y_test.numpy()
    
    print(f"\n{'='*50}")
    print(f"{model_name} Evaluation")
    print(f"{'='*50}")
    print(classification_report(y_true, preds, target_names=['Down', 'Up']))
    
    # Confidence analysis
    high_conf_mask = (probs > 0.6) | (probs < 0.4)
    if high_conf_mask.sum() > 0:
        high_conf_acc = accuracy_score(y_true[high_conf_mask], preds[high_conf_mask])
        print(f"High confidence predictions ({high_conf_mask.sum()} samples): {high_conf_acc:.4f} accuracy")
    
    return probs, preds

lstm_probs, lstm_preds = evaluate_model(lstm_model, X_test, y_test, "LSTM")
transformer_probs, transformer_preds = evaluate_model(transformer_model, X_test, y_test, "Transformer")

## 9. Simulated Trading Performance
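
The simulation in this section scores each trade as +1 when the direction is right and -1 when it is wrong, with no costs. A back-of-envelope sketch of how a fixed per-trade cost changes the picture, where `cost_per_trade` is a hypothetical parameter expressed in the same units as the ±1 payoff:

```python
def breakeven_win_rate(cost_per_trade):
    """Win rate at which expected P&L per trade is zero for a +1/-1 payoff.

    E[pnl] = p * (+1) + (1 - p) * (-1) - cost = 2p - 1 - cost
    """
    return (1.0 + cost_per_trade) / 2.0

def net_pnl(unit_pnl, cost_per_trade):
    """Subtract a fixed cost from every trade's +1/-1 outcome."""
    return [p - cost_per_trade for p in unit_pnl]

print(breakeven_win_rate(0.0))              # 0.5 with no costs
print(breakeven_win_rate(0.1))              # higher win rate needed once costs apply
print(sum(net_pnl([1, 1, -1, 1], 0.1)))     # toy trade outcomes, for illustration only
```

Under this simplified payoff, a cost of 10% of the unit payoff moves the required win rate from 50% to 55%, which is why small edges in directional accuracy may not survive realistic trading costs.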

In [None]:
def simulate_trading(y_true, y_pred, probs, threshold=0.55):
    """Simulate unit-stake trades on predictions whose confidence exceeds `threshold`.

    `y_pred` is accepted for call compatibility but unused; direction comes from `probs`.
    Costs, spread, and latency are not modeled.
    """
    pnl = []
    trades = []
    
    for i in range(len(y_true)):
        # Only trade on high confidence predictions
        if max(probs[i], 1 - probs[i]) > threshold:
            predicted_direction = 1 if probs[i] > 0.5 else -1
            actual_direction = 1 if y_true[i] == 1 else -1
            
            # P&L: +1 if correct, -1 if wrong (label 0 counts as a down move)
            trade_pnl = predicted_direction * actual_direction
            pnl.append(trade_pnl)
            trades.append(i)
    
    return np.array(pnl), trades

# Simulate trading
lstm_pnl, lstm_trades = simulate_trading(y_test.numpy(), lstm_preds, lstm_probs, threshold=0.55)
transformer_pnl, transformer_trades = simulate_trading(y_test.numpy(), transformer_preds, transformer_probs, threshold=0.55)

# Plot cumulative P&L
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

if len(lstm_pnl) > 0:
    axes[0].plot(np.cumsum(lstm_pnl), label='LSTM', linewidth=2)
if len(transformer_pnl) > 0:
    axes[0].plot(np.cumsum(transformer_pnl), label='Transformer', linewidth=2)
axes[0].axhline(y=0, color='gray', linestyle='--')
axes[0].set_xlabel('Trade #')
axes[0].set_ylabel('Cumulative P&L (units)')
axes[0].set_title('Simulated Trading P&L (High Confidence Only)')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Prediction confidence distribution
axes[1].hist(lstm_probs, bins=50, alpha=0.5, label='LSTM', density=True)
axes[1].hist(transformer_probs, bins=50, alpha=0.5, label='Transformer', density=True)
axes[1].axvline(x=0.5, color='gray', linestyle='--')
axes[1].set_xlabel('Predicted Probability (Up)')
axes[1].set_ylabel('Density')
axes[1].set_title('Prediction Confidence Distribution')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"\nTrading Summary:")
print(f"  LSTM: {len(lstm_trades)} trades, Win rate: {(lstm_pnl > 0).mean():.2%}, Total P&L: {lstm_pnl.sum():.0f}")
print(f"  Transformer: {len(transformer_trades)} trades, Win rate: {(transformer_pnl > 0).mean():.2%}, Total P&L: {transformer_pnl.sum():.0f}")

## Summary

This notebook demonstrated:

1. **LSTM with Attention**: Bidirectional LSTM with attention mechanism for sequence classification
2. **Transformer**: Self-attention-based model for capturing long-range dependencies
3. **Feature Engineering**: Technical indicators, volume, spread, and order imbalance features
4. **Trading Simulation**: Using model confidence to filter trades

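### Key Insights:
- On this synthetic data the order-imbalance feature is built to correlate with the next price move, so accuracy above the majority-class baseline reflects that planted signal, not evidence about real markets
- Section 8 reports accuracy separately for high-confidence predictions (probability above 0.6 or below 0.4), which shows whether confidence is informative for these models
- Transaction costs and latency are critical in real HFT (not modeled here)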

### Extensions to Try:
- Add limit order book features (L2 data)
- Use real tick data from Polygon.io or similar
- Implement execution simulation with realistic slippage
- Try temporal fusion transformers or other architectures