In [2]:
!pip install pandas numpy matplotlib yfinance ta scikit-learn xgboost scipy

Defaulting to user installation because normal site-packages is not writeable
Collecting yfinance
  Downloading yfinance-0.2.65-py2.py3-none-any.whl.metadata (5.8 kB)
Collecting ta
  Downloading ta-0.11.0.tar.gz (25 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting xgboost
  Downloading xgboost-3.0.4-py3-none-win_amd64.whl.metadata (2.1 kB)
Collecting multitasking>=0.0.7 (from yfinance)
  Downloading multitasking-0.0.12.tar.gz (19 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting peewee>=3.16.2 (from yfinance)
  Downloading peewee-3.18.2.tar.gz (949 kB)
     ---------------------------------------- 0.0/949.2 kB ? eta -:--:--
     ----------- ---------------------------- 262.1/949.2 kB ? eta -:--:--
     -------------------------------------- 949.2/949.2 kB 4.4 MB/s eta 0:00:00
  Installing build dependencies: started
  Installing build dependencies: fini



In [4]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import yfinance as yf
from ta import add_all_ta_features
from ta.momentum import RSIIndicator
from ta.trend import MACD, EMAIndicator
from ta.volatility import BollingerBands
from sklearn.ensemble import IsolationForest
from xgboost import XGBClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, f1_score
from scipy.stats import zscore
import warnings
warnings.filterwarnings('ignore')

### Configuration

In [5]:
# Symbols to analyse; later cells loop over this list one ticker at a time.
tickers = ['GME', 'AMC', 'BB', 'NOK', 'TSLA']  # Example meme stocks
# Date window passed to yf.download as its start/end arguments.
start_date = '2020-03-01'
end_date = '2025-03-01'
threshold = 2.5  # For Z-score anomalies: |z| above this value is flagged
residual_threshold = 15  # For residual anomalies (NOTE(review): not referenced in the visible cells)
test_size = 0.2  # For XGBoost train-test split (NOTE(review): no split is performed in the visible cells)
random_state = 42  # Presumably the seed for stochastic models; confirm it is actually passed to them

### Download Data Function

In [6]:
def download_data(ticker, start=None, end=None, interval='1d'):
    """Download close/volume history for one ticker via ``yf.download``.

    Args:
        ticker: Symbol to download.
        start: Start date passed to ``yf.download``; defaults to the
            module-level ``start_date``.
        end: End date passed to ``yf.download``; defaults to the module-level
            ``end_date``.
        interval: Bar interval passed to ``yf.download`` (default ``'1d'``).

    Returns:
        A new DataFrame indexed by ``date`` with flat columns ``price``
        (from ``Close``), ``volume`` (from ``Volume``) and ``ticker``.
        The frame is empty when nothing was downloaded.
    """
    raw = yf.download(
        ticker,
        start=start_date if start is None else start,
        end=end_date if end is None else end,
        interval=interval,
    )

    if raw.empty:
        # Nothing came back: return an empty frame with the expected schema
        # instead of failing on the column selection below.
        return pd.DataFrame(
            columns=['price', 'volume', 'ticker'],
            index=pd.Index([], name='date'),
        )

    # Recent yfinance releases return (field, ticker) MultiIndex columns even
    # for a single symbol. Keep only the field level so that df['price'] is a
    # Series rather than a one-column DataFrame.
    if isinstance(raw.columns, pd.MultiIndex):
        raw = raw.copy()
        raw.columns = raw.columns.get_level_values(0)

    # .copy() so the assignments below never write into a view of `raw`.
    df = (
        raw[['Close', 'Volume']]
        .rename(columns={'Close': 'price', 'Volume': 'volume'})
        .copy()
    )
    df.index.name = 'date'
    df['ticker'] = ticker
    return df

### Feature Engineering Function

In [13]:
def engineer_features(df):
    """Derive return, rolling, technical-indicator and label columns.

    The input must contain ``price`` and ``volume`` columns. The function
    works on a copy, so the caller's frame is left untouched.

    Added columns:
        * ``returns`` and ``return_lag_{1,3,7}``
        * ``rolling_mean_/rolling_std_/volume_mean_/volume_std_{5,10,20}``
        * ``volatility_20`` (20-row rolling std of returns)
        * ``rsi``, ``macd``, ``macd_signal``, ``ema_10``, ``bb_high``, ``bb_low``
        * ``Z-Score`` of ``price`` and the 0/1 ``Anomaly`` label
          (``|Z-Score| > threshold``, using the module-level ``threshold``)

    Note that the z-score is standardised with the mean/std of the whole
    ``price`` column, so each label depends on every row of the input.

    Args:
        df: DataFrame with ``price`` and ``volume`` columns.

    Returns:
        A new DataFrame with the columns above; rows containing any NaN
        (e.g. the warm-up rows of the rolling windows) are dropped.
    """
    # Copy first: the original added ~25 columns to the caller's frame as a
    # side effect and then returned a different (row-filtered) object.
    df = df.copy()

    # Simple period-over-period returns.
    df['returns'] = df['price'].pct_change()

    # Lagged returns.
    for lag in [1, 3, 7]:
        df[f'return_lag_{lag}'] = df['returns'].shift(lag)

    # Rolling mean/std of price and volume over several window lengths.
    for window in [5, 10, 20]:
        price_roll = df['price'].rolling(window=window)
        volume_roll = df['volume'].rolling(window=window)
        df[f'rolling_mean_{window}'] = price_roll.mean()
        df[f'rolling_std_{window}'] = price_roll.std()
        df[f'volume_mean_{window}'] = volume_roll.mean()
        df[f'volume_std_{window}'] = volume_roll.std()

    # Rolling volatility (std of returns).
    df['volatility_20'] = df['returns'].rolling(window=20).std()

    # Technical indicators from the `ta` package (library default windows
    # except where a window is given explicitly).
    df['rsi'] = RSIIndicator(df['price']).rsi()
    macd = MACD(df['price'])
    df['macd'] = macd.macd()
    df['macd_signal'] = macd.macd_signal()
    df['ema_10'] = EMAIndicator(df['price'], window=10).ema_indicator()

    # Bollinger Bands.
    bb = BollingerBands(df['price'])
    df['bb_high'] = bb.bollinger_hband()
    df['bb_low'] = bb.bollinger_lband()

    # Target variable for supervised learning (XGBoost): z-score anomalies
    # serve as the initial labels. nan_policy='omit' keeps a single missing
    # price from turning the whole column into NaN (which would make the
    # dropna() below discard every row).
    df['Z-Score'] = zscore(df['price'].to_numpy(), nan_policy='omit')
    df['Anomaly'] = (df['Z-Score'].abs() > threshold).astype(int)

    # Drop rows with NA values created by the rolling/lagged calculations.
    return df.dropna()

### Anomaly Detection Function

In [38]:
import numpy as np
import pandas as pd
import yfinance as yf
from scipy.stats import zscore
from sklearn.ensemble import IsolationForest
from xgboost import XGBClassifier
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

# Configuration
# NOTE(review): these assignments rebind names already set in the earlier
# Configuration cell. In particular end_date was '2025-03-01' there and becomes
# '2023-03-01' here, so any cell reading these globals depends on which
# configuration cell was executed last.
tickers = ['GME', 'AMC', 'BB', 'NOK', 'TSLA']
threshold = 2.5
start_date = '2020-03-01'
end_date = '2023-03-01'

def safe_zscore(series, threshold):
    """Compute z-scores and an outlier mask without ever raising.

    Args:
        series: Object exposing ``to_numpy()`` (e.g. a pandas Series); its
            values are flattened to one dimension before scoring.
        threshold: Absolute z-score above which a point is flagged.

    Returns:
        Tuple ``(z, is_outlier)`` of NumPy arrays. ``z`` has NaN scores
        replaced by 0. When fewer than two values are present, every value is
        NaN, or any error occurs, ``z`` is all-NaN and ``is_outlier`` is
        all-False (both of length ``len(series)``).
    """
    def _fallback():
        # Length is looked up lazily, only when the fallback is needed.
        size = len(series)
        return np.full(size, np.nan), np.full(size, False)

    try:
        flat = series.to_numpy().flatten()
        if len(flat) < 2 or np.all(np.isnan(flat)):
            return _fallback()

        # NaNs are ignored for the mean/std and then mapped to a score of 0.
        scores = np.nan_to_num(zscore(flat, nan_policy='omit'))
        outlier_mask = np.abs(scores) > threshold
        return scores, outlier_mask
    except Exception as e:
        print(f"Z-score error: {str(e)}")
        return _fallback()

def detect_anomalies(df, ticker):
    """Flag anomalous rows of a one-column price frame with three detectors.

    Steps:
        1. Z-score of price via ``safe_zscore`` with the module-level
           ``threshold`` -> ``z_score`` / ``z_anomaly``.
        2. Daily returns are added and rows with NaN/infinite values dropped.
        3. Isolation Forest on ``[price, returns]`` -> ``iso_anomaly``.
        4. XGBoost classifier fitted on ``[price, returns]`` with the z-score
           flags as labels, then scored on the same rows ->
           ``xgb_anomaly`` / ``xgb_confidence``.

    A failure in step 3 or 4 is reported with ``print`` and the corresponding
    columns fall back to ``False`` / ``0.0``.

    Args:
        df: DataFrame with exactly one column holding the price; it is copied,
            not modified.
        ticker: Symbol, used only in printed messages.

    Returns:
        A new DataFrame with columns ``price``, ``z_score``, ``z_anomaly``,
        ``returns``, ``iso_anomaly``, ``xgb_anomaly`` (all three flags are
        boolean) and ``xgb_confidence``; or ``None`` when fewer than 10 rows
        remain after cleaning.
    """
    df = df.copy()
    # Normalise the single column's label (it may be 'Close' or a MultiIndex
    # entry depending on the downloader). Raises if there is not exactly one
    # column.
    df.columns = ['price']

    # 1. Calculate Z-Score anomalies
    df['z_score'], df['z_anomaly'] = safe_zscore(df['price'], threshold)

    # 2. Prepare features. A zero price makes the next return infinite, which
    # both estimators reject; treat it as missing so only that row is lost.
    df['returns'] = df['price'].pct_change().replace([np.inf, -np.inf], np.nan)
    df = df.dropna()

    if len(df) < 10:  # Minimum samples required
        print(f"Insufficient data for {ticker}")
        return None

    features = df[['price', 'returns']].values

    # 3. Isolation Forest (fit_predict returns -1 for outliers)
    try:
        iso = IsolationForest(contamination=0.01, random_state=42)
        df['iso_anomaly'] = iso.fit_predict(features) == -1
    except Exception as e:
        print(f"Isolation Forest error for {ticker}: {str(e)}")
        df['iso_anomaly'] = False

    # 4. XGBoost trained on the z-score labels
    try:
        labels = df['z_anomaly'].astype(int)
        if labels.sum() == 0:
            # No positive examples: there is nothing for the classifier to learn.
            df['xgb_anomaly'] = False
            df['xgb_confidence'] = 0.0
        else:
            xgb = XGBClassifier(
                random_state=42,
                base_score=0.5,  # Fixes the logistic loss error
                scale_pos_weight=10
            )
            xgb.fit(features, labels)
            # predict() yields integer 0/1 labels. Store them as booleans so
            # the column has the same dtype as the other flags and as the
            # fallback branches, and can be used directly as a row mask.
            df['xgb_anomaly'] = xgb.predict(features).astype(bool)
            df['xgb_confidence'] = xgb.predict_proba(features)[:, 1]
    except Exception as e:
        print(f"XGBoost error for {ticker}: {str(e)}")
        df['xgb_anomaly'] = False
        df['xgb_confidence'] = 0.0

    return df

# Main execution: run the detectors for every ticker and gather all flagged
# rows. Frames are collected in a list and concatenated once, instead of
# growing a DataFrame with pd.concat inside the loop.
flagged_frames = []

for ticker in tickers:
    print(f"\nProcessing {ticker}...")
    try:
        # Download data (only the closing price is used)
        data = yf.download(ticker, start=start_date, end=end_date)[['Close']]
        if data.empty:
            print(f"No data for {ticker}")
            continue

        # Detect anomalies; None means there were too few usable rows
        result = detect_anomalies(data, ticker)
        if result is None:
            continue
        result['ticker'] = ticker

        # Cast each flag to bool so the combined mask is boolean regardless of
        # the dtype a detector stored (e.g. 0/1 integers from a classifier).
        is_flagged = (
            result['z_anomaly'].astype(bool)
            | result['iso_anomaly'].astype(bool)
            | result['xgb_anomaly'].astype(bool)
        )
        anomalies = result[is_flagged]
        if not anomalies.empty:
            flagged_frames.append(anomalies)
    except Exception as e:
        # Best effort: report the failure and move on to the next ticker.
        print(f"Processing error for {ticker}: {str(e)}")
        continue

anomaly_log = pd.concat(flagged_frames) if flagged_frames else pd.DataFrame()

# Save results
if not anomaly_log.empty:
    anomaly_log.to_csv('anomalies.csv')
    print("\nAnomaly summary:")
    print(anomaly_log['ticker'].value_counts())
else:
    print("\nNo anomalies detected")

[*********************100%***********************]  1 of 1 completed


Processing GME...



[*********************100%***********************]  1 of 1 completed


Processing AMC...

Processing BB...



[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


Processing NOK...



[*********************100%***********************]  1 of 1 completed


Processing TSLA...

Anomaly summary:
ticker
AMC     26
BB      14
NOK     13
GME     10
TSLA     8
Name: count, dtype: int64





### Visualisation

In [44]:
import matplotlib
# NOTE(review): this selects a GUI backend for the whole kernel session. In a
# notebook it presumably replaces the inline renderer, and it fails where Tk
# is not available — confirm this is intended.
matplotlib.use('TkAgg')  # or 'Qt5Agg'
import matplotlib.pyplot as plt

def plot_anomalies(df, ticker):
    """Plot price with anomaly markers (top) and daily returns (bottom).

    Args:
        df: DataFrame with ``price``, ``returns``, ``z_anomaly``,
            ``iso_anomaly``, ``xgb_anomaly`` and ``xgb_confidence`` columns.
            The three flag columns may be boolean or 0/1 integers.
        ticker: Stock symbol for the title.

    Returns:
        None. The figure is shown and then closed.
    """
    # Best-effort switch to an interactive backend: try Tk, then Qt. If neither
    # can be loaded, keep whatever backend is already active instead of raising.
    for backend in ('TkAgg', 'Qt5Agg'):
        try:
            plt.switch_backend(backend)
            break
        except Exception:
            continue

    # The bare 'seaborn' style name was renamed to 'seaborn-v0_8' and later
    # removed from matplotlib, so pick whichever name this installation has.
    # The style is applied before the figure is created so that it takes effect.
    style_name = next(
        (name for name in ('seaborn-v0_8', 'seaborn') if name in plt.style.available),
        None,
    )
    if style_name is not None:
        plt.style.use(style_name)

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 12), sharex=True)

    # Main price plot
    ax1.plot(df.index, df['price'], color='navy', alpha=0.8, linewidth=1.5, label='Price')

    # Anomaly masks. Cast to bool: indexing a DataFrame with an integer Series
    # is interpreted as column selection, not as a row mask.
    z_mask = df['z_anomaly'].astype(bool)
    iso_mask = df['iso_anomaly'].astype(bool)
    xgb_mask = df['xgb_anomaly'].astype(bool)

    ax1.scatter(df.index[z_mask], df.loc[z_mask, 'price'],
                color='red', marker='o', s=80,
                label=f'Z-Score ({z_mask.sum()})')

    ax1.scatter(df.index[iso_mask], df.loc[iso_mask, 'price'],
                color='green', marker='^', s=80,
                label=f'Isolation Forest ({iso_mask.sum()})')

    xgb_label = f'XGBoost ({xgb_mask.sum()})'
    if xgb_mask.any():
        # Colour the XGBoost markers by the model's predicted probability.
        sc = ax1.scatter(df.index[xgb_mask], df.loc[xgb_mask, 'price'],
                         c=df.loc[xgb_mask, 'xgb_confidence'],
                         cmap='plasma', marker='*', s=120,
                         label=xgb_label)
        cbar = fig.colorbar(sc, ax=ax1)
        cbar.set_label('XGBoost Confidence', rotation=270, labelpad=15)
    else:
        # No points to colour-map: keep the legend entry and skip the colorbar.
        ax1.scatter([], [], marker='*', s=120, label=xgb_label)

    # Formatting
    ax1.set_title(f'{ticker} Anomaly Detection', fontsize=16, pad=10)
    ax1.set_ylabel('Price ($)', fontsize=12)
    ax1.legend(loc='upper left')
    ax1.grid(True, alpha=0.3)

    # Returns plot
    ax2.plot(df.index, df['returns'], color='purple', alpha=0.6, linewidth=0.8)
    ax2.set_ylabel('Daily Returns', fontsize=12)
    ax2.grid(True, alpha=0.2)

    # Display, then release the figure so repeated calls do not accumulate
    # open figures.
    plt.tight_layout()
    plt.show(block=True)
    plt.pause(0.1)  # Needed in some environments
    plt.close(fig)