In [41]:
import pandas as pd
import numpy as np
import random
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Conv1D, Flatten, LeakyReLU, Dropout, Input, BatchNormalization
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, explained_variance_score
import random

In [4]:
# Genetic-algorithm settings for the scratch cells below.
# NOTE(review): within the visible cells only `pop` and `n_factors` are read
# as globals (by the initial-population cell). `p_crossover`, `p_mutation` and
# `gen` are not read by any visible function: mutation() and GA() carry their
# own defaults.
p_crossover = 1
p_mutation = 0.3
pop = 100
gen = 20
# Chromosome length, i.e. number of candidate features.
# TODO: derive this from the dataset's column count instead of hard-coding it.
n_factors = 84 #retrieve from size of dataset

In [3]:
# Load the dataset with a two-level (ticker, date) row index.
DF = pd.read_csv("final_data.csv", index_col = ["ticker", "date"])

In [5]:
# Work on a 10% random subsample to keep model fitting fast.
# A fixed random_state makes the subsample reproducible across kernel restarts
# (101 matches the seed used for the train/test split in data_fixer).
# NOTE: this cell overwrites DF, so re-running it without reloading the CSV
# subsamples the already-subsampled frame.
DF = DF.sample(frac = 0.1, random_state = 101)

In [6]:
# Initial population: `pop` random binary chromosomes of length `n_factors`.
# Each gene is 1 (feature kept) with probability 2/3 and 0 with probability 1/3.
# Built with a comprehension so that no loop variable is overwritten by a
# chromosome array or left behind in the notebook namespace.
parents = np.array([
    np.random.choice([0, 1], size=(n_factors,), p=[1./3, 2./3])
    for _ in range(pop)
])

In [7]:
def crossover(p1, p2):
    """Single-point crossover of two chromosomes.

    A cut point is drawn uniformly from ``1 .. len(p1) - 1`` and the genes
    from the cut point to the end are exchanged between the two parents.
    Keeping the cut strictly inside the chromosome guarantees that each child
    receives genes from both parents. A cut at ``0`` or at ``len(p1)`` would
    simply return the parents (swapped or unchanged).

    Parameters
    ----------
    p1, p2 : sequence
        Parent chromosomes (e.g. 1-D arrays of 0/1). They are not modified.
        ``p2`` must be at least as long as ``p1``.

    Returns
    -------
    tuple of numpy.ndarray
        The two children. For chromosomes shorter than two genes there is no
        interior cut point, so copies of the parents are returned.
    """
    # Work on list copies so the parents are left untouched.
    first = list(p1)
    second = list(p2)
    n_genes = len(first)

    if n_genes >= 2:
        # random.randint is inclusive on both ends.
        cut = random.randint(1, n_genes - 1)
        # Exchange the tails; the right-hand side is evaluated before assignment.
        first[cut:n_genes], second[cut:n_genes] = second[cut:n_genes], first[cut:n_genes]

    return np.array(first), np.array(second)

In [9]:
def mutation(c1, p_mutation = 0.3):
    """Flip each gene of a binary chromosome independently with probability ``p_mutation``.

    A selected gene equal to 0 becomes 1; any other selected value becomes 0.

    Parameters
    ----------
    c1 : array-like
        Chromosome (any shape). It is not modified; a mutated copy is returned.
    p_mutation : float, default 0.3
        Per-gene flip probability. ``0`` never flips a gene, ``1`` flips all of them.

    Returns
    -------
    numpy.ndarray
        Mutated copy of ``c1`` with the same shape and dtype.
    """
    genes = np.array(c1)  # np.array copies, so the caller's chromosome stays intact
    # rand() draws from [0, 1), so a strict "<" yields exactly p_mutation per gene.
    flip = np.random.rand(*genes.shape) < p_mutation
    # Vectorised flip: 0 -> 1, anything else -> 0, only where `flip` is set.
    genes[flip] = (genes[flip] == 0).astype(genes.dtype)
    return genes

In [11]:
def roulette_wheel_selection(p):
    """Pick an index with probability proportional to its score.

    Parameters
    ----------
    p : array-like of float
        Fitness scores. Negative, NaN and infinite scores are treated as zero
        weight, because a roulette wheel is only defined for non-negative
        weights. If no score is positive, every index is equally likely.

    Returns
    -------
    int
        Selected index into ``p``.

    Raises
    ------
    ValueError
        If ``p`` is empty.
    """
    weights = np.asarray(p, dtype=float).ravel()
    if weights.size == 0:
        raise ValueError("roulette_wheel_selection requires at least one score")

    # Sanitise: only finite, strictly positive scores get a slice of the wheel.
    weights = np.where(np.isfinite(weights) & (weights > 0), weights, 0.0)
    if weights.sum() <= 0:
        weights = np.ones_like(weights)  # fall back to a uniform draw

    cumulative = np.cumsum(weights)
    # Use the cumulative total itself so the threshold can never exceed the
    # last bucket through summation-order rounding differences.
    threshold = cumulative[-1] * np.random.rand()
    # side="right" returns the first bucket whose upper edge is strictly above
    # the threshold, so zero-weight entries are never selected.
    index = np.searchsorted(cumulative, threshold, side="right")
    return int(min(index, weights.size - 1))

In [12]:
def feature_select(X, gene):
    """Return the columns of ``X`` whose corresponding gene equals 1.

    Parameters
    ----------
    X : numpy.ndarray
        2-D array indexed as ``X[:, columns]``.
    gene : sequence
        Chromosome; position ``k`` set to 1 keeps column ``k`` of ``X``.

    Returns
    -------
    numpy.ndarray
        The selected columns, in their original order.
    """
    selected_columns = [position for position, bit in enumerate(gene) if bit == 1]
    return X[:, selected_columns]

In [13]:
def get_last(data, target):
    """Collect the last row of every ticker, without the target column.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame whose first index level identifies the ticker.
    target : str
        Column to drop from the returned rows.

    Returns
    -------
    pandas.DataFrame
        One row per ticker (sorted by ticker) and one column per remaining
        feature, holding the values of the ticker's last row in ``data``.
    """
    ordered_tickers = sorted(set(data.index.get_level_values(0)))

    # {ticker: {column: [value]}} - to_dict(orient="list") wraps each value in a list.
    last_rows = {
        ticker: data.loc[ticker][-1:].drop(target, axis = 1).to_dict(orient = "list")
        for ticker in ordered_tickers
    }

    frame = pd.DataFrame(last_rows).transpose()
    # Unwrap the one-element lists left over from to_dict(orient="list").
    for column in frame.columns:
        frame[column] = frame[column].str[0]
    return frame

In [14]:
def data_fixer(DF, target):
    """Clean the frame, split it into train/test sets and min-max scale the features.

    Steps: replace +/-inf with NaN, drop every row containing NaN, separate
    the ``target`` column, split 80/20 with a fixed seed, then fit a
    ``MinMaxScaler`` on the training features only and apply it to both splits.

    The previous per-ticker ``get_last`` call was removed: its result was never
    used or returned, and it scanned every ticker in the frame.

    Parameters
    ----------
    DF : pandas.DataFrame
        Input data containing the ``target`` column. It is not modified.
    target : str
        Name of the column to predict.

    Returns
    -------
    X_train, X_test
        Scaled feature matrices as returned by ``MinMaxScaler``.
    y_train, y_test : pandas.Series
        Target values for the corresponding rows.
    """
    cleaned = DF.replace([np.inf, -np.inf], np.nan).dropna()
    X = cleaned.drop([target], axis = 1)
    y = cleaned[target]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=101)
    # Fit on the training split only so that test-set ranges do not leak into the scaling.
    scaler = MinMaxScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)
    return X_train, X_test, y_train, y_test

In [15]:
def mean_absolute_percentage_error(y_true, y_pred):
    """Mean absolute percentage error, in percent.

    Computes ``mean(|y_true - y_pred| / |y_true|) * 100``. The denominator uses
    the absolute value so that negative targets contribute a positive error
    instead of cancelling out positive ones.

    Parameters
    ----------
    y_true : array-like
        Ground-truth values (NumPy array, pandas Series or list).
    y_pred : array-like
        Predictions; flattened, so ``(n,)`` and ``(n, 1)`` are both accepted.

    Returns
    -------
    float
        MAPE in percent. The result is ``inf`` or ``nan`` if ``y_true``
        contains zeros.

    Raises
    ------
    ValueError
        If the two inputs do not hold the same number of values.
    """
    actual = np.asarray(y_true, dtype=float).ravel()
    predicted = np.asarray(y_pred, dtype=float).ravel()
    if actual.shape != predicted.shape:
        raise ValueError(
            f"y_true and y_pred must have the same number of values, "
            f"got {actual.size} and {predicted.size}"
        )
    return np.mean(np.abs(actual - predicted) / np.abs(actual)) * 100

In [16]:
def evaluate(X_train, X_test, y_train, y_test, gene, dropout = 0.2, kernel_size = 2, batch_size = 512, epochs = 2, verbose = 0):
    """Fitness of one chromosome: train a small 1-D CNN on the selected features.

    The columns switched on in ``gene`` are kept, a trailing channel axis is
    added, and a three-layer dilated causal Conv1D network followed by a dense
    head is trained with MSE loss and the Adam optimizer.

    Parameters
    ----------
    X_train, X_test : numpy.ndarray
        2-D feature matrices (rows are samples).
    y_train, y_test : array-like
        Targets for the corresponding rows.
    gene : sequence
        Binary chromosome; a 1 keeps the matching feature column.
    dropout : float
        Dropout rate before the output layer.
    kernel_size : int
        Kernel size shared by all convolution layers.
    batch_size, epochs, verbose
        Forwarded to ``model.fit``.

    Returns
    -------
    float
        ``(100 - MAPE) / 100`` on the test split: 1.0 means a perfect fit, and
        the value is negative when the MAPE exceeds 100%.

    Notes
    -----
    The test split is also passed to ``fit`` as ``validation_data``.
    """
    # Select the active features, then add a channel axis: (samples, features, 1).
    train_inputs = np.expand_dims(feature_select(X_train, gene), axis=2)
    test_inputs = np.expand_dims(feature_select(X_test, gene), axis=2)

    network = Sequential([
        # Convolution block 1
        Conv1D(32, kernel_size, padding = "causal", input_shape = train_inputs.shape[1:]),
        LeakyReLU(alpha = 0.1),
        BatchNormalization(),
        # Convolution block 2 (dilated)
        Conv1D(64, kernel_size, padding = "causal",  dilation_rate = 2),
        LeakyReLU(alpha = 0.01),
        BatchNormalization(),
        # Convolution block 3 (dilated, built-in ReLU)
        Conv1D(128, kernel_size, padding = "causal", activation = "relu", dilation_rate = 3),
        BatchNormalization(),
        # Dense regression head
        Flatten(),
        Dense(128, activation = "relu"),
        Dropout(dropout),
        Dense(1, activation = "relu"),
    ])
    network.compile(loss= "mean_squared_error", optimizer= "adam")

    network.fit(
        x=train_inputs,
        y=y_train,
        batch_size = batch_size,
        epochs=epochs,
        validation_data=(test_inputs, y_test),
        verbose=verbose,
    )

    predictions = network.predict(test_inputs)
    # Turn the percentage error into a "higher is better" score.
    return (100 - mean_absolute_percentage_error(y_test, predictions))/100

In [21]:
# TODO: select one parent with roulette-wheel selection and the other uniformly at random
# TODO: add switching of chromosomes in crossover

In [17]:
# Clean, split and scale the data, using the "next" column as the prediction target.
X_train, X_test, y_train, y_test = data_fixer(DF, "next")

In [18]:
# Fitness of the first chromosome of the initial population (verbose training log).
score_1 = evaluate(X_train, X_test, y_train, y_test, parents[0], verbose = 1)

Train on 464640 samples, validate on 116161 samples
Epoch 1/2
Epoch 2/2


In [22]:
# Display the fitness of the second chromosome.
# NOTE(review): score_2 is assigned in a cell further down (execution count 20),
# so this cell raises NameError under Restart & Run All.
score_2

0.7590517130415668

In [29]:
# Map chromosome index -> fitness for the first two parents.
# NOTE(review): `dic` is not read by any other visible cell, and score_2 is
# assigned in the cell below this one.
dic = {0:score_1, 1: score_2}

In [20]:
# Fitness of the second chromosome of the initial population (silent training).
score_2 = evaluate(X_train, X_test, y_train, y_test, parents[1])

In [23]:
# Fitness scores of the first two parents, used as roulette-wheel weights below.
p = [score_1, score_2]

In [60]:
# Breed one pair of children from the first two parents as a quick check.
# NOTE(review): reproduction() is defined in a cell further down, so this cell
# raises NameError under Restart & Run All.
c = reproduction([parents[0], parents[1]], p)

In [24]:
# Draw 100 roulette-wheel selections to eyeball how often each parent is picked.
# A comprehension keeps the throwaway loop variable out of the notebook
# namespace; a top-level `for _ in ...` binds a global `_`, which IPython also
# uses for the last cell output.
s = [roulette_wheel_selection(p) for _ in range(100)]

In [54]:
def generation_eval(pop, X_train, X_test, y_train, y_test, dropout = 0.2, kernel_size = 2, batch_size = 512, epochs = 2, verbose = 0):
    """Score every chromosome of a generation and report the best one.

    Parameters
    ----------
    pop : sequence of chromosomes
        Population to evaluate (e.g. a 2-D array with one chromosome per row).
    X_train, X_test, y_train, y_test
        Data splits forwarded to ``evaluate``.
    dropout, kernel_size, batch_size, epochs, verbose
        Model and training settings forwarded to ``evaluate``.

    Returns
    -------
    scores : numpy.ndarray
        Fitness of each chromosome, in population order.
    best_score : float
        Highest fitness found, or ``0`` if no chromosome scored above zero.
    best_set
        The chromosome from ``pop`` that achieved ``best_score``, or ``[]`` if
        no chromosome scored above zero.
    """
    scores = []
    best_score = 0
    best_set = []
    for chromosome in pop:
        score = evaluate(X_train, X_test, y_train, y_test, chromosome, dropout = dropout, kernel_size = kernel_size, batch_size = batch_size, epochs = epochs, verbose = verbose)
        scores.append(score)
        if score > best_score:
            best_score = score
            # Take the winner from the population being evaluated, not from a
            # global `parents` array that may hold a different generation.
            best_set = chromosome
    return np.array(scores), best_score, best_set

In [59]:
def reproduction(pop, scores, p_mutation = 0.3):
    """Breed the next generation from the current population.

    For every pair of children, one parent is chosen by roulette-wheel
    selection on ``scores`` and the other uniformly at random from ``pop``.
    The pair is recombined with ``crossover`` and both children are mutated.

    Parameters
    ----------
    pop : sequence of chromosomes
        Current population.
    scores : array-like
        Fitness of each chromosome in ``pop``, in the same order.
    p_mutation : float, default 0.3
        Per-gene mutation probability forwarded to ``mutation``.

    Returns
    -------
    numpy.ndarray
        ``len(pop)`` children. An odd-sized population keeps its size: the
        surplus child of the last pair is dropped.
    """
    n_children = len(pop)
    children = []
    while len(children) < n_children:
        p_1 = pop[roulette_wheel_selection(scores)]
        p_2 = random.choice(pop)
        c_1, c_2 = crossover(p_1, p_2)
        children.append(mutation(c_1, p_mutation))
        children.append(mutation(c_2, p_mutation))
    # Pairs may overshoot by one when the population size is odd.
    return np.array(children[:n_children])

In [31]:
# Scratch check of the "keep the best chromosome" logic on the first two parents.
# This performs the same bookkeeping as generation_eval, but at notebook level,
# so it overwrites the global best_score / best_set.
best_score = 0
best_set = []
for i in range(2):
    score = evaluate(X_train, X_test, y_train, y_test, parents[i], dropout = 0.2, kernel_size = 2, batch_size = 512, epochs = 2, verbose = 1)
    # Only a strictly better score replaces the current best.
    if score > best_score:
        best_score = score
        best_set = parents[i]

Train on 464640 samples, validate on 116161 samples
Epoch 1/2
Epoch 2/2
Train on 464640 samples, validate on 116161 samples
Epoch 1/2
Epoch 2/2


In [64]:
def GA(X_train, X_test, y_train, y_test, dropout = 0.2, kernel_size = 2, batch_size = 512, epochs = 2, verbose = 0, p_mutation = 0.3, pop = 100, gen = 20, n_factors = 84):
    """Run the genetic algorithm for feature selection.

    A random binary population is created (each gene is 1 with probability
    2/3), and for ``gen`` generations every chromosome is scored with
    ``generation_eval`` and a new population is bred with ``reproduction``.
    The best score seen across all generations is tracked and printed whenever
    it improves.

    Parameters
    ----------
    X_train, X_test, y_train, y_test
        Data splits forwarded to ``generation_eval``.
    dropout, kernel_size, batch_size, epochs, verbose
        Model and training settings forwarded to ``generation_eval``.
    p_mutation : float
        NOTE(review): accepted but currently not forwarded to
        ``reproduction``, so the mutation rate in effect is whatever
        ``reproduction``/``mutation`` default to.
    pop : int
        Population size.
    gen : int
        Number of generations to evaluate.
    n_factors : int
        Chromosome length (number of candidate features).

    Returns
    -------
    best_score : float
        Highest fitness found, or ``0`` if nothing scored above zero.
    best_set
        Chromosome reported for that score, or ``[]`` if nothing scored above zero.
    """
    # Random initial population; the comprehension avoids overwriting a loop index.
    parents = np.array([
        np.random.choice([0, 1], size=(n_factors,), p=[1./3, 2./3])
        for _ in range(pop)
    ])

    best_score = 0
    best_set = []
    for generation in range(gen):
        scores, gen_best_score, gen_best_set = generation_eval(parents, X_train, X_test, y_train, y_test, dropout = dropout, kernel_size = kernel_size,
                                                   batch_size = batch_size, epochs = epochs, verbose = verbose)
        if gen_best_score > best_score:
            best_score = gen_best_score
            best_set = gen_best_set
            print(f"Best score gen {generation+1}: {best_score}")

        # Children of the final generation would never be evaluated, so skip breeding them.
        if generation < gen - 1:
            parents = reproduction(parents, scores)

    return best_score, best_set

In [63]:
# Smoke run of the GA with a small population (10) over 2 generations.
# NOTE(review): the saved output below is labelled "gen 0" / "gen 1", which does
# not match the 1-based label printed by GA as defined in this notebook, so it
# was presumably produced by an earlier version of the function.
best_score, best_set = GA(X_train, X_test, y_train, y_test, pop = 10, gen = 2)

Best score gen 0: 0.7683118119827279
Best score gen 1: 0.8632192308268318
