Reference : [Stock price prediction with optimized deep LSTM network with artificial rabbits optimization algorithm](https://www.sciencedirect.com/science/article/pii/S0957417423008485)

In [None]:
# dependencies

# %pip install numpy
# %pip install pandas
# %pip install torch
# %pip install matplotlib
# %pip install tensorflow
# %pip install sklearn
# %pip install yfinance

import numpy as np
from torch import randperm
import matplotlib.pyplot as plt
from pylab import *
import tensorflow as tf
import keras
from keras import layers
import pandas as pd
import yfinance as yf
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import r2_score
import math
from typing import Callable
scaler = MinMaxScaler()

In [2]:
# parameters

initial_guess = np.array([10,1,5,0,15,8,0.7])
ARO_max_iters=20
ARO_npop=20
LSTM_epochs=50
stock_name="TSLA"
stock_timeframe="10y"

In [3]:
# stock prices

def create_sequences(data, seq_length):
    X = []
    y = []
    for i in range(seq_length, len(data)):
        X.append(data[i-seq_length:i, 0])
        y.append(data[i, 0])
    return np.array(X), np.array(y)

def get_stock_data(stock_symbol="AAPL"):
    ticker = yf.Ticker(stock_symbol)
    hist = ticker.history(period=stock_timeframe).drop(columns=['Dividends','Stock Splits'])
    
    # feature engineering
    hist['MA50'] = hist['Close'].rolling(50).mean()
    hist['MA200'] = hist['Close'].rolling(200).mean()
    hist['Previous day close price'] = hist['Close'].shift(1)
    hist['Change in price'] = hist['Close'] - hist['Previous day close price']
    hist['Percent change in price'] = hist['Close'].pct_change()
    hist.bfill(inplace=True)
    scaled_data = scaler.fit_transform(hist[['Close']])
    
    # test train split
    X, y = create_sequences(scaled_data, seq_length=60)
    X = X.reshape((X.shape[0], X.shape[1], 1))
    split_index=int(X.shape[0]*0.8)
    print(split_index)
    X_train, X_test = X[:split_index], X[split_index:]
    y_train, y_test = y[:split_index], y[split_index:]
    
    return X_train,y_train,X_test,y_test,scaled_data

In [None]:
# data init

X_train,y_train,X_test,y_test,scaled_data=get_stock_data(stock_name)

In [5]:
def plot_results(model,history):
    train_predict=model.predict(X_train)
    test_predict=model.predict(X_test)
    loss=history.history['loss']

    train_predict=scaler.inverse_transform(train_predict)
    test_predict=scaler.inverse_transform(test_predict)

    original_scaled_data = scaler.inverse_transform(scaled_data)
    look_back = 60
    trainPredictPlot = np.empty((len(original_scaled_data), 1))
    trainPredictPlot[:] = np.nan

    trainPredictPlot[look_back:look_back + len(train_predict)] = train_predict.reshape(-1, 1)

    testPredictPlot = np.empty((len(original_scaled_data), 1))
    testPredictPlot[:] = np.nan

    test_start = len(original_scaled_data) - len(test_predict)
    testPredictPlot[test_start:] = test_predict.reshape(-1, 1)


    plt.figure(figsize=(15, 6))

    if original_scaled_data.shape[1] == 1:
        plt.plot(original_scaled_data[:, 0], color='black', label="Actual price")
    else:
        plt.plot(original_scaled_data[:, 3], color='black', label="Actual price")

    plt.plot(trainPredictPlot, color='red', label="price (train set)")
    plt.plot(testPredictPlot, color='blue', label="price (test set)")
    plt.xlabel("Time")
    plt.ylabel("Share Price")
    plt.legend()
    plt.show()

In [6]:
# model

def build_model(x0,x1,x2,x3,x4,x5,x6):
    model = keras.models.Sequential()
    model.add(layers.LSTM(units=x0, return_sequences=True, input_shape=(X_train.shape[1],X_train.shape[2])))
    
    if x1>0 and x3>0:
        model.add(layers.LSTM(units=x2,return_sequences=True))
        model.add(layers.LSTM(units=x2))  
    elif x1>0:
        model.add(layers.LSTM(units=x2))
    elif x3>0:
        model.add(layers.LSTM(units=x4))
    else:
        model = keras.models.Sequential()
        model.add(layers.LSTM(units=x0, input_shape=(X_train.shape[1],X_train.shape[2])))
        
    
    model.add(layers.Dense(x5, activation="relu"))
    model.add(layers.Dropout(x6))
    model.add(layers.Dense(1, activation="linear"))
    
    model.compile(
        optimizer=keras.optimizers.Adam(),
        loss=keras.losses.mean_squared_error,
        metrics=[keras.metrics.MeanSquaredError(), keras.metrics.RootMeanSquaredError()]
    )
    return model

def run_model(x0,x1,x2,x3,x4,x5,x6):
    model=build_model(x0,x1,x2,x3,x4,x5,x6)
    history = model.fit(
        X_train,
        y_train,
        epochs=LSTM_epochs,
        batch_size=32,
        validation_data=(X_test, y_test),
        verbose=0
    )
    plot_results(model,history)
    test_predict=model.predict(X_test)
    return test_predict

def compute_fitness(x):
    print(f"current pop : {x}")
    test_predict=run_model(int(x[0]),int(x[1]),int(x[2]),int(x[3]),int(x[4]),int(x[5]),int(x[6]))
    fitness=1e9
    try:
        fitness=-r2_score(y_test,test_predict)
    except:
        fitness=1e9
    if (fitness<-1 or fitness>0):
        fitness=1e9
    print(f"fitness : {fitness}")
    return fitness

In [7]:
def discretize_values(x: np.ndarray) -> np.ndarray:
    """
    Discretize values according to the problem constraints:
    - indices 0,2,4,5: integers 1-20
    - indices 1,3: binary (0 or 1)
    - index 6: float between 0.3 and 0.7
    """
    result = x.copy()
    

    discrete_indices = [0, 2, 4, 5]
    for idx in discrete_indices:
        result[idx] = np.round(result[idx])
        result[idx] = int(np.clip(result[idx], 1, 20))
    
    

    binary_indices = [1, 3]
    for idx in binary_indices:
        result[idx] = np.round(result[idx])
        result[idx] = int(np.clip(result[idx], 0, 1))
    
    

    result[6] = np.clip(result[6], 0.3, 0.7)
    
    return result

def space_bound(X: np.ndarray, Up: np.ndarray, Low: np.ndarray) -> np.ndarray:
    """Ensures solutions stay within bounds and meet discrete constraints"""

    S = (X > Up) + (X < Low)
    res = (np.random.rand(len(X)) * (Up - Low) + Low) * S + X * (~S)
    

    res = discretize_values(res)
    return res

def ARO(fitness_func: Callable[[np.ndarray], float],
               initial_guess: np.ndarray = None,
               max_it: int = 1000,
               npop: int = 50) -> tuple[np.ndarray, float, np.ndarray]:
    """
    Custom ARO optimization for 7-dimensional problem with mixed discrete and continuous variables
    
    Parameters:
    -----------
    fitness_func : callable
        Your custom fitness function that takes a numpy array of 7 values and returns a float
    initial_guess : numpy array, optional
        Initial guess for the solution (7 values). If None, random initialization is used.
    max_it : int
        Maximum number of iterations
    npop : int
        Population size
    
    Returns:
    --------
    best_x : numpy array
        Best solution found
    best_f : float
        Best fitness value
    his_best_fit : numpy array
        History of best fitness values
    """
    dim = 7
    

    lb = np.array([1, 0, 1, 0, 1, 1, 0.3])
    ub = np.array([20, 1, 20, 1, 20, 20, 0.7])
    

    pop_pos = np.zeros((npop, dim))
    for i in range(npop):
    
        rand_pos = np.random.rand(dim) * (ub - lb) + lb
        pop_pos[i] = discretize_values(rand_pos)
    

    if initial_guess is not None:
        pop_pos[0] = discretize_values(initial_guess)
    
    iter_num=1
    print(f"iteration : {iter_num}")
    
    pop_fit = np.array([fitness_func(pos) for pos in pop_pos])
    

    best_idx = np.argmin(pop_fit)
    best_f = pop_fit[best_idx]
    best_x = pop_pos[best_idx].copy()
    his_best_fit = np.zeros(max_it)
    

    for it in range(max_it):
        direct1 = np.zeros((npop, dim))
        direct2 = np.zeros((npop, dim))
        theta = 2 * (1 - (it + 1) / max_it)
        iter_num+=1
        print(f"iteration : {iter_num}")
        for i in range(npop):
            L = (np.e - np.exp((((it + 1) - 1) / max_it) ** 2)) * (np.sin(2 * np.pi * np.random.rand()))
            rd = np.floor(np.random.rand() * dim)
            rand_dim = np.random.permutation(dim)
            direct1[i, rand_dim[:int(rd)]] = 1
            c = direct1[i, :]
            R = L * c
            
            A = 2 * np.log(1 / np.random.rand()) * theta
            
            if A > 1: 
                available_indices = np.concatenate([np.arange(i), np.arange(i + 1, npop)])
                rand_idx = np.random.choice(available_indices)
                newPopPos = (pop_pos[rand_idx] + 
                           R * (pop_pos[i] - pop_pos[rand_idx]) +
                           round(0.5 * (0.05 + np.random.rand())) * np.random.randn())
            else: 
                ttt = int(np.floor(np.random.rand() * dim))
                direct2[i, ttt] = 1
                gr = direct2[i, :]
                H = ((max_it - (it + 1) + 1) / max_it) * np.random.randn()
                b = pop_pos[i] + H * gr * pop_pos[i]
                newPopPos = pop_pos[i] + R * (np.random.rand() * b - pop_pos[i])
            
        
            newPopPos = space_bound(newPopPos, ub, lb)
            
            newPopFit = fitness_func(newPopPos)
            
        
            if newPopFit < pop_fit[i]:
                pop_fit[i] = newPopFit
                pop_pos[i] = newPopPos
                
            
                if newPopFit < best_f:
                    best_f = newPopFit
                    best_x = newPopPos.copy()
        
        his_best_fit[it] = best_f
        
    return best_x, best_f, his_best_fit

In [None]:
best_solution, best_fitness, history = ARO(
    fitness_func=compute_fitness,
    initial_guess=initial_guess,
    max_it=ARO_max_iters,
    npop=ARO_npop
)

print("\nOptimization Results:")
print(f"Best solution found: {best_solution}")
print(f"Best fitness value: {best_fitness}")

print("\nConstraint Verification:")
print(f"Integers (1-20): x[0]={best_solution[0]}, x[2]={best_solution[2]}, "
        f"x[4]={best_solution[4]}, x[5]={best_solution[5]}")
print(f"Binary (0-1): x[1]={best_solution[1]}, x[3]={best_solution[3]}")
print(f"Float (0.3-0.7): x[6]={best_solution[6]}")

plt.figure(figsize=(10, 6))
plt.plot(history)
plt.xlabel('Iteration')
plt.ylabel('Best Fitness Value')
plt.title('Convergence History')
# plt.yscale('log')
plt.grid(True)
plt.show()

In [None]:
compute_fitness(best_solution)