In [None]:
!pip install yfinance

In [None]:
import math

import numpy as np
import pandas as pd
import tensorflow as tf
import yfinance as yf
from scipy.stats import kurtosis
from scipy.stats import levy_stable
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.layers import GRU, Dense, Dropout
from tensorflow.keras.models import Sequential

In [None]:
# Download window and bar size passed to yfinance.
start = '2012-01-01'
end = '2023-1-31'
interval = '1d'

# Yahoo Finance writes share classes with a dash, so Berkshire Hathaway class B is 'BRK-B'
# ('BRK.B' returns no data).
symbols = ['AAPL', 'NVDA', 'MSFT', 'AMZN', 'META', 'GOOGL', 'BRK-B', 'GOOG', 'AVGO', 'TSLA', 'LLY', 'JPM', 'XOM', 'UNH', 'V', 'MA', 'HD', 'PG', 'COST', 'JNJ', 'WMT', 'ABBV', 'NFLX', 'BAC', 'CRM']

# Collect one frame per symbol and concatenate once at the end instead of
# re-copying the growing frame on every iteration.
frames = []
for x in symbols:
    # auto_adjust=False keeps the separate 'Adj Close' column that the feature code reads.
    current_data = yf.download(x, start=start, end=end, interval=interval, auto_adjust=False)
    if current_data.empty:
        # A failed download yields an empty frame; skip it rather than failing on a missing 'Date'.
        print(f"No data returned for {x}; skipping")
        continue
    # Keep only the first column level (a no-op when the columns are already flat).
    current_data.columns = current_data.columns.get_level_values(0)
    current_data = current_data.reset_index()
    current_data['Date'] = current_data['Date'].dt.date
    current_data['Symbol'] = x
    frames.append(current_data)

data = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

# Every feature is computed inside each symbol's own price history. Computing them on the
# concatenated frame would let the first rows of one symbol be derived from the last rows
# of the previous symbol (bogus returns and mixed rolling windows at every boundary).
_symbol_key = data['Symbol']


def _by_symbol(series, fn):
    """Apply ``fn`` to each symbol's slice of ``series``; the result keeps the original index."""
    return series.groupby(_symbol_key).transform(fn)


def _rolling_sum_26(series):
    """26-row rolling sum of ``series`` computed separately for each symbol."""
    return _by_symbol(series, lambda s: s.rolling(window=26).sum())


# Row-over-row percentage change of the adjusted close (column name kept as 'AnnReturn').
data["AnnReturn"] = data.groupby('Symbol')['Adj Close'].pct_change()
data['v20'] = _by_symbol(data['AnnReturn'], lambda s: s.rolling(window=20).var()) * 252
data['k20'] = _by_symbol(data['AnnReturn'], lambda s: s.rolling(window=20).apply(kurtosis, raw=True))
data['vol10'] = _by_symbol(data['Volume'], lambda s: s.rolling(window=10).mean())
data['vema12'] = _by_symbol(data['Volume'], lambda s: s.ewm(span=12, adjust=False).mean())
data['vstd20'] = _by_symbol(data['Volume'], lambda s: s.rolling(window=20).std())

_high_sum = _rolling_sum_26(data['High'])
_open_sum = _rolling_sum_26(data['Open'])
_low_sum = _rolling_sum_26(data['Low'])
# Previous row's close within the same symbol.
_prev_close_sum = _rolling_sum_26(data.groupby('Symbol')['Close'].shift(1))
data['ar'] = (_high_sum - _open_sum) / (_open_sum - _low_sum) * 100
data['br'] = (_high_sum - _prev_close_sum) / (_prev_close_sum - _low_sum) * 100

# Computed before dropna so that the first retained row of each symbol has a real
# value instead of a NaN (a NaN would later be labelled 0).
data['close_change_pct'] = data.groupby('Symbol')['Close'].pct_change()

data = data.dropna().reset_index(drop=True)
data.columns.name = None

# Label each row against its own symbol's distribution of 'close_change_pct':
#   2 -> change >= mean + std
#   1 -> 0 < change < mean + std
#   0 -> everything else (non-positive or NaN change, or a symbol not listed in `symbols`)
# Vectorised replacement for a row-by-row loop; the rules are unchanged.
_change = data['close_change_pct']
_per_symbol_change = _change.groupby(data['Symbol'])
_upper = _per_symbol_change.transform('mean') + _per_symbol_change.transform('std')
_tracked = data['Symbol'].isin(symbols)

data['Label'] = np.select(
    [
        _tracked & (_change >= _upper),
        _tracked & (_change > 0) & (_change < _upper),
    ],
    [2, 1],
    default=0,
)


In [None]:
# Keep only the rows labelled 0 or 1, then order them by date.
# NOTE(review): the sort key is 'Date' alone, so rows of different symbols are interleaved;
# confirm this is the intended input for the 30-row windows built from `train` below.
_is_binary = data['Label'] != 2
data = data.loc[_is_binary].reset_index(drop=True)
data = data.sort_values(by=['Date']).reset_index(drop=True)

# Targets and symbol names as arrays; `train` keeps every remaining column as a feature.
_non_feature_columns = ['Label', 'close_change_pct', 'Symbol', 'Date']
y = data['Label'].values
sym = data['Symbol'].values
train = data.drop(columns=_non_feature_columns)

In [None]:
# Slide a 30-row window over all feature columns. Window i covers rows i..i+29 and is
# paired with the label of row i+30, so the last window (which has no following row) is dropped.
_all_windows = np.lib.stride_tricks.sliding_window_view(
    train.values,
    window_shape=(30, train.shape[1]),
)
X_raw = _all_windows[:-1, 0, :, :]
y_seq = y[30:]

# Standardise every window on its own statistics (one scaler fitted per window).
X_scaled = np.array([StandardScaler().fit_transform(window) for window in X_raw])

# Ordered split (no shuffling): the first 80% of windows train, the last 20% test.
x_train, x_test, y_train, y_test = train_test_split(
    X_scaled,
    y_seq,
    test_size=0.2,
    shuffle=False,
    random_state=42,
)
x_train.shape

In [None]:
# Per-sample shape of the training windows: axes 1 and 2 of x_train.
input_shape = x_train.shape[1:3]

In [None]:
# Cuckoo Search settings: the probability Pa that a host bird discovers a foreign egg in its nest
# is 0.25, the step-scaling factor α of the Lévy flight is 0.1, the β coefficient is 1.5, and the
# population size is 18.

# In the CS-GRU stock selection model, the Cuckoo Search optimization algorithm
# tunes the number of neurons in the GRU layer and in the fully connected layers
# of the GRU network. During the search, the paper sets the maximum number of
# iterations to 100 and the value range of the four tuned parameters to [0, 200]
# (the call below passes parameter_range=(1, 200)).

# Default layer widths in the order read by gru(): [GRU units, dense 1, dense 2, dense 3].
h_neurons = [10, 30, 20, 15]
# Single sigmoid output unit for the 0/1 labels.
label_count = 1
# Per-sample input shape: axes 1 and 2 of x_train.
shape = x_train.shape[1:3]

def gru(h_neurons, shape, label_count=1):
    """Build and compile the GRU classifier.

    Args:
        h_neurons: indexable of four widths; element 0 sizes the GRU layer and
            elements 1-3 size the three hidden dense layers (each cast to int).
        shape: input shape handed to the GRU layer.
        label_count: number of units in the sigmoid output layer.

    Returns:
        A compiled ``Sequential`` model (Adam, learning rate 0.0005,
        binary cross-entropy loss, accuracy metric).
    """
    # Recurrent front end: GRU -> ReLU -> dropout.
    stack = [
        GRU(int(h_neurons[0]), input_shape=shape, return_sequences=False),
        tf.keras.layers.ReLU(),
        Dropout(0.5),
    ]
    # Three ReLU dense layers, each followed by dropout.
    for position in (1, 2, 3):
        stack.append(Dense(int(h_neurons[position]), activation='relu'))
        stack.append(Dropout(0.5))
    # Sigmoid (rather than softmax) output because the labels are binary (1/0).
    stack.append(Dense(label_count, activation='sigmoid'))

    gru_model = Sequential(stack)
    gru_model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005),
        loss='binary_crossentropy',
        metrics=['accuracy'],
    )
    return gru_model

def fitness_function(neurons, shape, X_train, X_test, y_train, y_test):
    """Train a candidate network briefly and return its accuracy as the fitness.

    Args:
        neurons: array-like of four layer widths; values are truncated to int.
        shape: input shape forwarded to ``gru``.
        X_train, y_train: data the candidate is fitted on (3 epochs, batch size 32).
        X_test, y_test: data the candidate is scored on.

    Returns:
        The accuracy reported by ``model.evaluate`` on ``X_test``/``y_test``.

    Note:
        The output width comes from the module-level ``label_count``.
    """
    h_neurons = np.asarray(neurons).astype(int)
    # The search builds thousands of short-lived models; release the Keras global
    # state left by the previous candidate so memory does not grow with every call.
    tf.keras.backend.clear_session()
    model = gru(h_neurons, shape, label_count)
    model.fit(X_train, y_train, epochs=3, batch_size=32, verbose=0)
    _, accuracy = model.evaluate(X_test, y_test, verbose=0)
    return accuracy

def levy_flight(beta, size=4):
    """Draw a Lévy-flight step vector using Mantegna's algorithm.

    Args:
        beta: stability exponent of the Lévy distribution.
        size: number of components to draw; defaults to 4, one per tuned layer width.

    Returns:
        ``numpy.ndarray`` of shape ``(size,)`` containing the unscaled steps.
    """
    # Standard deviation of the numerator so that u / |v|**(1/beta) is Lévy-like.
    sigma = (math.gamma(1 + beta) * np.sin(np.pi * beta / 2) /
             (math.gamma((1 + beta) / 2) * beta * 2**((beta - 1) / 2)))**(1 / beta)
    u = np.random.normal(0, sigma, size)
    v = np.random.normal(0, 1, size)
    return u / (np.abs(v) ** (1 / beta))

def cuckoo_search(input_shape, X_train, y_train, X_test, y_test,
                  population_size, max_iterations, pa, alpha, beta,
                  parameter_range):
    """Search for four layer widths that maximise ``fitness_function``.

    Args:
        input_shape: input shape forwarded to ``fitness_function``.
        X_train, y_train: data each candidate is trained on.
        X_test, y_test: data each candidate is scored on.
            NOTE(review): these same arrays are used for the final evaluation later
            in the notebook, so the reported test metrics are optimistically biased;
            consider passing a separate validation split here.
        population_size: number of nests (candidate solutions).
        max_iterations: number of search generations; 0 returns the best initial nest.
        pa: fraction of nests abandoned and re-drawn at random each generation.
        alpha: scale applied to the Lévy step.
            NOTE(review): the step is applied directly on the integer width scale,
            so with a small alpha most steps round to zero — confirm the intended scaling.
        beta: exponent forwarded to ``levy_flight``.
        parameter_range: ``(low, high)`` bounds for every width, both inclusive.

    Returns:
        Integer ``numpy.ndarray`` of length 4 holding the best widths found.
    """
    low, high = parameter_range

    def evaluate(nest):
        return fitness_function(nest, input_shape, X_train, X_test, y_train, y_test)

    # randint excludes its upper bound; +1 makes `high` reachable, consistent with np.clip below.
    population = np.random.randint(low, high + 1, (population_size, 4))
    fitness = np.array([evaluate(nest) for nest in population], dtype=float)

    # Never abandon every nest: at least the current best always survives.
    num_discovered = max(0, min(int(pa * population_size), population_size - 1))

    for iteration in range(max_iterations):
        for i in range(population_size):
            step = alpha * levy_flight(beta)
            # Round to the nearest integer; plain truncation toward zero would turn any
            # small negative step into -1 while ignoring small positive ones.
            candidate = np.rint(np.clip(population[i] + step, low, high)).astype(int)
            if np.array_equal(candidate, population[i]):
                # Same architecture: retraining would only re-sample training noise.
                continue
            candidate_fitness = evaluate(candidate)
            if candidate_fitness > fitness[i]:
                population[i] = candidate
                fitness[i] = candidate_fitness

        # Abandon the worst nests (not random ones) so good solutions are never discarded.
        for idx in np.argsort(fitness)[:num_discovered]:
            population[idx] = np.random.randint(low, high + 1, 4)
            fitness[idx] = evaluate(population[idx])

        best_idx = np.argmax(fitness)
        print(f"fitness: {fitness[best_idx]}, nueron setup: {population[best_idx]}")

    # Computed after the loop so the result is defined even when max_iterations == 0.
    best_idx = np.argmax(fitness)
    return population[best_idx].copy()

# Search hyper-parameters listed in the comments at the top of this cell.
search_settings = dict(
    population_size=18,
    max_iterations=100,
    pa=0.25,
    alpha=0.1,
    beta=1.5,
    parameter_range=(1, 200),
)

# Positional order matches cuckoo_search(input_shape, X_train, y_train, X_test, y_test, ...).
nueron_setup = cuckoo_search(shape, x_train, y_train, x_test, y_test, **search_settings)

print(nueron_setup)
# A previous run reported: h_neurons = [31, 163, 53, 66]

In [None]:
# Single sigmoid output unit for the 0/1 labels.
label_count = 1
# Per-sample input shape: axes 1 and 2 of x_train.
shape = x_train.shape[1:3]

def gru(h_neurons, shape, label_count=1):
    """Build and compile the final GRU classifier.

    Only ``h_neurons[0]`` is read (GRU width); the three hidden dense layers are
    fixed at 163, 53 and 66 units regardless of the remaining entries.

    Args:
        h_neurons: indexable whose first element sizes the GRU layer (cast to int).
        shape: input shape handed to the GRU layer.
        label_count: number of units in the sigmoid output layer.

    Returns:
        A compiled ``Sequential`` model (Adam, learning rate 0.0005,
        binary cross-entropy loss, accuracy metric).
    """
    # Recurrent front end: GRU -> ReLU -> dropout.
    stack = [
        GRU(int(h_neurons[0]), input_shape=shape, return_sequences=False),
        tf.keras.layers.ReLU(),
        Dropout(0.5),
    ]
    # Fixed-width ReLU dense layers, each followed by dropout.
    for width in (163, 53, 66):
        stack.append(Dense(width, activation='relu'))
        stack.append(Dropout(0.5))
    stack.append(Dense(label_count, activation='sigmoid'))

    gru_model = Sequential(stack)
    gru_model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005),
        loss='binary_crossentropy',
        metrics=['accuracy'],
    )
    return gru_model


def evaluate_gru(x_train, y_train, x_test, y_test, h_neurons, epochs=50, batch_size=32):
    """Train the GRU model and report binary classification metrics on the test data.

    Args:
        x_train, y_train: training windows and their 0/1 labels.
        x_test, y_test: held-out windows and labels used for the metrics.
        h_neurons: layer widths forwarded to ``gru``.
        epochs: number of training epochs (default 50).
        batch_size: training batch size (default 32).

    Returns:
        Tuple ``(accuracy, precision, recall, f1)`` computed on the test data.
    """
    input_shape = x_train.shape[1:]
    model = gru(h_neurons, input_shape, label_count=1)
    model.fit(x_train, y_train, epochs=epochs, batch_size=batch_size, verbose=1)

    predictions = model.predict(x_test)
    # Threshold the sigmoid output at 0.5 and flatten (N, 1) -> (N,) to match y_test.
    predicted_labels = (predictions > 0.5).astype(int).ravel()

    accuracy = accuracy_score(y_test, predicted_labels)
    precision = precision_score(y_test, predicted_labels, average='binary')
    recall = recall_score(y_test, predicted_labels, average='binary')
    f1 = f1_score(y_test, predicted_labels, average='binary')

    print(f"Accuracy: {accuracy}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")
    print(f"F1 Score: {f1}")

    return accuracy, precision, recall, f1

# Layer widths reported by the cuckoo search run. Passing the earlier default
# h_neurons ([10, 30, 20, 15]) would train the GRU layer with 10 units instead of 31.
best_h_neurons = [31, 163, 53, 66]

# The result is assigned so the returned metrics tuple is not echoed as cell output.
dummy_var = evaluate_gru(x_train, y_train, x_test, y_test, best_h_neurons)