In [None]:
import numpy as np
import pandas as pd
from keras.utils import to_categorical
from keras.models import Sequential
from keras.layers import Input, Dense, Dropout, Flatten, Conv2D, MaxPooling2D
import os

# Symbols iterated over by the training loop; one model is trained per ticker and year.
tickers = ['SPY', 'IWM', 'DIA']
# Experiment tag: names the results directory that the prediction CSVs are written into.
VERSION_NAME = 'widerATR_20day'
# Create the output directory up front; exist_ok=True makes re-running this cell safe.
os.makedirs(f'./results_{VERSION_NAME}', exist_ok=True)

In [90]:
def form_image(input_df):
    """Arrange the per-row indicator columns into one 15x15 "image" per row.

    Image row ``i`` corresponds to the i-th indicator in ``INDICATOR_ORDER`` and
    image column ``j`` to the look-back period ``6 + j``; each cell is read from
    the input column named ``f'{indicator}_{period}'``.

    Parameters
    ----------
    input_df : pandas.DataFrame
        Must contain ``Date``, ``Close``, ``Label`` and one column for every
        indicator/period pair (15 indicators x periods 6..20). It is only read,
        never modified.

    Returns
    -------
    pandas.DataFrame
        A new frame with a fresh default index and the columns ``Date``,
        ``Close``, ``Label`` and ``Image``. Every ``Image`` cell holds a
        ``float32`` array of shape ``(15, 15)``.

    Raises
    ------
    KeyError
        If any required column is absent; the message lists all missing names.
    """
    # Row order of the image: one indicator per row.
    INDICATOR_ORDER = [
        'SMA', 'EMA', 'WMA', 'HMA', 'TEMA',
        'PSAR', 'DMI', 'CMFI',
        'RSI', 'Williams_%R', 'CMO', 'ROC',
        'MACD', 'PPO', 'CCI'
    ]
    # Column order of the image: look-back periods 6..20 inclusive.
    PERIODS = range(6, 21)

    # Indicator-major / period-minor ordering, so a C-order reshape puts
    # indicator i on image row i and period 6 + j on image column j.
    feature_cols = [f'{ind}_{n}' for ind in INDICATOR_ORDER for n in PERIODS]

    # Validate once so a malformed CSV reports every missing column at once
    # instead of failing on the first lookup.
    required = ['Date', 'Close', 'Label'] + feature_cols
    missing = [col for col in required if col not in input_df.columns]
    if missing:
        raise KeyError(f'form_image: missing required columns: {missing}')

    # Single vectorized extraction. copy=True guarantees the images never alias
    # the caller's data, so no defensive copy of the whole input frame is needed.
    images = (
        input_df[feature_cols]
        .to_numpy(dtype=np.float32, copy=True)
        .reshape(len(input_df), len(INDICATOR_ORDER), len(PERIODS))
    )

    # list(images) yields one (15, 15) array per row, stored as object cells.
    result_df = pd.DataFrame({
        'Date': input_df['Date'].values,
        'Close': input_df['Close'].values,
        'Label': input_df['Label'].values,
        'Image': list(images)
    })

    return result_df

In [91]:
def train_val_test_split(df, year):
    """Split a frame chronologically by calendar year of its ``Date`` column.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a ``Date`` column parseable by ``pandas.to_datetime``.
        The caller's frame is not modified.
    year : int
        First calendar year of the test period.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(train, val, test)`` where ``train`` holds rows dated before
        ``year - 1``, ``val`` holds rows dated in ``year - 1`` and ``test``
        holds rows dated in ``year`` or later. ``Date`` is datetime-typed in
        all three.
    """
    # assign() builds a new frame, so the datetime conversion does not leak
    # back into the DataFrame the caller passed in.
    dated = df.assign(Date=pd.to_datetime(df['Date']))
    years = dated['Date'].dt.year

    train = dated[years < year - 1]
    val = dated[years == year - 1]
    test = dated[years >= year]
    return train, val, test

In [None]:
# 1. Prepare Data Loaders
def prepare_data(df, num_classes=3):
    """Convert a frame with ``Image`` and ``Label`` columns into CNN-ready arrays.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs an ``Image`` column of equally shaped arrays (15x15 when produced
        by ``form_image``) and an integer ``Label`` column.
    num_classes : int, default 3
        Width of the one-hot encoding. It is fixed rather than inferred so a
        split that happens to lack the highest label still yields targets that
        match the 3-unit softmax output of the model.

    Returns
    -------
    tuple
        ``(X, y)``: ``X`` is the stacked images with a trailing channel axis of
        size 1; ``y`` is the one-hot label matrix with ``num_classes`` columns.
    """
    X = np.stack(df['Image'].values)  # (n_samples, H, W)
    X = np.expand_dims(X, -1)  # add channel axis -> (n_samples, H, W, 1)
    # Without num_classes, to_categorical sizes the output from max(label) + 1,
    # which varies between splits.
    y = to_categorical(df['Label'], num_classes=num_classes)
    return X, y

# 2. Define Model
def create_model(input_shape=(15, 15, 1), num_classes=3):
    """Build and compile the small CNN classifier for indicator images.

    Parameters
    ----------
    input_shape : tuple, default (15, 15, 1)
        Shape of one input sample (height, width, channels).
    num_classes : int, default 3
        Number of units in the softmax output layer.

    Returns
    -------
    keras.models.Sequential
        Model compiled with categorical cross-entropy loss, the Adam optimizer
        and the accuracy metric.

    Notes
    -----
    The shape comments below are for the default 15x15x1 input; the Conv2D
    layers use their default stride of 1 and 'valid' (no) padding.
    """
    model = Sequential([
        Input(shape=input_shape),                    # 15x15x1 indicator grid
        Conv2D(32, (3,3), activation='relu'),        # 32 filters, 3x3 kernels -> 13x13x32
        Conv2D(64, (3,3), activation='relu'),        # 64 filters, 3x3 kernels -> 11x11x64
        MaxPooling2D((2,2)),                         # 2x2 max pooling -> 5x5x64
        Dropout(0.25),                               # Element-wise dropout on the feature maps
        Flatten(),                                   # Vectorize to 5*5*64 = 1600 elements
        Dense(128, activation='relu'),               # Fully-connected layer
        Dropout(0.5),                                # Dropout on the dense features
        Dense(num_classes, activation='softmax')     # Class probabilities
    ])

    model.compile(loss='categorical_crossentropy',
                 optimizer='adam',
                 metrics=['accuracy'])
    return model

# 3. Training Loop
# Training hyper-parameters, named so they can be tuned in one place.
EPOCHS = 150
BATCH_SIZE = 32

# One summary dict per (ticker, year) run: ticker, year, test_loss, test_acc.
results = []
for ticker in tickers:
    for year in range(2019, 2024):
        print(f"\nProcessing {ticker} {year}-{year+1}")

        # Load the per-ticker, per-window CSV and turn indicator columns into images
        df = pd.read_csv(f'./data/normalised/{ticker}_{year}_{year+1}.csv')
        df = form_image(df)
        # train: before year-1, val: year-1, test: year onwards
        train, val, test = train_val_test_split(df, year)

        # Prepare datasets
        X_train, y_train = prepare_data(train)
        X_val, y_val = prepare_data(val)
        X_test, y_test = prepare_data(test)

        # A fresh model per run so no weights carry over between tickers/years
        model = create_model()
        history = model.fit(
            X_train, y_train,
            validation_data=(X_val, y_val),
            epochs=EPOCHS,
            batch_size=BATCH_SIZE,
            verbose=2
        )

        # Evaluate on the held-out period and keep the metrics instead of discarding them
        test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
        results.append({
            'ticker': ticker,
            'year': year,
            'test_loss': test_loss,
            'test_acc': test_acc,
        })

        # Run inference once and derive both the class and its confidence from it
        y_pred = model.predict(X_test, verbose=0)

        test = test.copy()  # Avoid SettingWithCopyWarning
        test['Prediction'] = np.argmax(y_pred, axis=1)    # predicted class index (0, 1 or 2)
        test['Prediction_Prob'] = np.max(y_pred, axis=1)  # softmax probability of that class

        # Drop Image: array cells do not serialise usefully to CSV
        test = test.drop(columns=['Image'])

        test.to_csv(f'./results_{VERSION_NAME}/{ticker}_{year}_{year+1}.csv', index=False)