In [1]:
import ta
from indicators import * # Import all our TA indicators

import numpy as np
import pandas as pd
import ccxt
import re
import random

import warnings
# Turn off all warnings
warnings.filterwarnings("ignore")

In [2]:
# Fetch OHLCV data from Kraken
def fetch_ohlcv_data(start_date, limit=None, symbol='BTC/AUD', timeframe='1d'):
    """Download OHLCV candles from Kraken and return them as a DataFrame.

    Args:
        start_date: ISO-8601 timestamp string (e.g. "2021-01-01T00:00:00Z")
            marking the earliest candle to request.
        limit: Optional maximum number of candles; passed straight through
            to ``exchange.fetch_ohlcv``.
        symbol: Market to query. Defaults to 'BTC/AUD'.
        timeframe: Candle width understood by the exchange. Defaults to '1d'.

    Returns:
        pandas.DataFrame with the columns ``timestamp``, ``o``, ``h``, ``l``,
        ``c`` and ``v``; ``timestamp`` is converted from epoch milliseconds
        to pandas datetimes.
    """
    exchange = ccxt.kraken()
    since = exchange.parse8601(start_date)
    candles = exchange.fetch_ohlcv(symbol, timeframe, since, limit=limit)

    # Short column names matter: the indicator/gene code addresses candles as
    # ohlcv_data.o / .h / .l / .c / .v.
    ohlcv_df = pd.DataFrame(candles, columns=['timestamp', 'o', 'h', 'l', 'c', 'v'])
    ohlcv_df['timestamp'] = pd.to_datetime(ohlcv_df['timestamp'], unit='ms')

    return ohlcv_df

# Download the candles once and keep that frame untouched in `origin_ohlcv_data`;
# the rest of the notebook works on the `ohlcv_data` copy so it can be reset later.
origin_ohlcv_data = fetch_ohlcv_data("2021-01-01T00:00:00Z")
ohlcv_data = origin_ohlcv_data.copy()
#print(origin_ohlcv_data[:3])

In [3]:
class Value():
    """A randomly parameterised indicator paired with the series it is compared against.

    Attributes:
        values: dict mapping an indicator name to a pair
            ``([indicator_fn, args], [reference_fn, args])``.
        indicator_name: key of the entry chosen for this instance.
        indicator: the chosen entry of ``values``.
        column_name_1, column_name_2: whatever the two functions returned when
            called on the module-level ``ohlcv_data``. They are spliced into
            ``ohlcv_data.<name>`` expressions, so they are presumably column
            names -- confirm against the ``indicators`` module.
        c: multiplier applied to the reference side, drawn from [0.94, 1.06].
    """
    def __init__(self):
        self.values = self._random_indicator_table()
        self.indicator_name = random.choice(list(self.values.keys()))
        self.indicator = self.values[self.indicator_name]
        self._compute_columns()
        self.c = random.uniform(0.94, 1.06)

    @staticmethod
    def _random_indicator_table():
        """Build a fresh indicator table with newly drawn random parameters."""
        # Parameter lists that must be shared by both sides of a comparison
        # (e.g. MACD line vs. MACD signal) are drawn once and reused.
        random_macd_choice = random.choice([[12, 26, 9], [24, 52, 18]])
        stochastic_oscillator_values = [random.randint(10, 20), random.randint(3, 5)]
        aroon_values = [random.randint(14, 28)]
        kst_values = [random.randint(10, 20), random.randint(15, 25), random.randint(20, 30), random.randint(30, 40)]
        vortex_values = [random.randint(10, 30)]
        random_stc_parameters = [random.randint(20, 40), random.randint(40, 60), random.randint(5, 15), random.randint(2, 5), random.randint(2, 5)]
        ppo_window = [random.choice([24, 26, 28, 30]), random.choice([10, 12, 14]), random.choice([7, 9, 11])]

        # Candle arguments are wrapped in lists so that `*args` unpacking does
        # not depend on the column name being a single character.
        return {
            # Trend Indicators
            "ADX": ([adx, [random.randint(10, 30)]], [constant, [25]]),
            "Aroon_Up": ([aroon_up, aroon_values], [aroon_down, aroon_values]),
            #"DPO": ([dpo, [random.randint(14, 28)]], [constant, [0]]),
            "KST": ([kst, kst_values], [kst_signal, kst_values]),
            "Mass_Index": ([mass, [random.randint(7, 15), random.randint(20, 40)]], [constant, [30]]),
            "STC": ([stc, random_stc_parameters], [constant, [50]]),
            #"TRIX": ([trix, [random.randint(14, 28)]], [constant, [0]]),
            "Vortex_Positive": ([vortex_pos, vortex_values], [vortex_neg, vortex_values]),
            "WMA": ([wma, [random.randint(20, 40)]], [candle, ['o']]),
            "EMA": ([ema, [random.randint(20, 40)]], [candle, ['o']]),
            "CCI": ([cci, [random.randint(20, 40)]], [constant, [100]]),
            "PSAR": ([psar, [round(random.uniform(0.01, 0.03), 2), round(random.uniform(0.1, 0.3), 2)]], [candle, ['o']]),
            "SMA": ([sma, [random.randint(20, 40)]], [candle, ['o']]),
            "MACD": ([macd, random_macd_choice], [macd_signal, random_macd_choice]),
            # Volatility Indicators
            "ATR": ([atr, [random.randint(10, 20)]], [candle, ['h']]),
            # Momentum Indicators
            "RSI": ([rsi, [random.randint(14, 28)]], [constant, random.choice([[30], [70]])]),
            "Stochastic_Oscillator_K": ([stochastic_oscillator_K, stochastic_oscillator_values], [constant, [20]]),
            "Stochastic_Oscillator_D": ([stochastic_oscillator_D, stochastic_oscillator_values], [constant, [80]]),
            "KAMA": ([kama, [random.choice([8, 10, 12, 14]), random.choice([2, 3, 4]), random.choice([25, 30, 35])]], [candle, ['c']]),
            "PPO_Line": ([ppo_line, ppo_window], [ppo_signal, ppo_window]),
            # Volume Indicators
            "MFI": ([mfi, [random.randint(10, 20)]], [constant, [100]]),
        }

    def _compute_columns(self):
        """Call both functions of ``self.indicator`` on the global ``ohlcv_data`` and store what they return."""
        (left_fn, left_args), (right_fn, right_args) = self.indicator
        self.column_name_1 = left_fn(ohlcv_data, *left_args)
        self.column_name_2 = right_fn(ohlcv_data, *right_args)

    def evaluate(self,time):
        """Return two strings of Python source that read both columns at row ``time``.

        The strings reference the module-level ``ohlcv_data`` by name and are
        meant to be embedded in a larger expression that is later ``eval``-ed.
        """
        return f'ohlcv_data.{self.column_name_1}.iloc[{time}]', f'ohlcv_data.{self.column_name_2}.iloc[{time}]'

    # Regenerates one of the indicators
    def mutate(self):
        """Re-draw the parameters of the current indicator, keeping its type and ``c``.

        Only the parameter table is rebuilt; no throw-away ``Value`` is
        constructed, so no unrelated indicator is computed on ``ohlcv_data``
        as a side effect.
        """
        self.indicator = self._random_indicator_table()[self.indicator_name]
        self._compute_columns()

    def __str__(self):
        return f'{self.column_name_1}, {self.column_name_2}'

class Literal():
    """A single, optionally negated comparison ``indicator > c * reference``."""
    def __init__(self):
        self.negated = random.choice([True, False])
        self.value = Value()

    def evaluate(self,time):
        """Return the comparison at row ``time`` as Python source (``c`` rounded to 2 decimals)."""
        lhs, rhs = self.value.evaluate(time)
        comparison = f'({lhs} > {self.value.c:.2f} * {rhs})'
        return f'not{comparison}' if self.negated else comparison

    def mutate(self, depth):
        """Apply a mutation that has ``depth`` levels left to descend.

        With exactly one level left the literal is regenerated (new negation
        flag and new ``Value``); otherwise the mutation is delegated to the
        existing value.
        """
        if depth - 1 != 0:
            self.value.mutate()
            return
        self.negated = random.choice([True, False])
        self.value = Value()

    def get_node(self, depth, path):
        """Append 0 to ``path`` and return ``(self.value, path)`` when one level is left.

        For any other ``depth`` nothing is returned (implicit ``None``).
        """
        path.append(0)
        if depth - 1 == 0:
            return self.value, path

    def replace(self, node, path):
        """Install ``node`` as the value when ``path`` has exactly one step left; otherwise do nothing."""
        if len(path) != 1:
            return
        self.value = node

    def __str__(self):
        operand = self.value
        body = f'({operand.column_name_1} > {operand.c:.2f} * {operand.column_name_2})'
        return f'¬{body}' if self.negated else body

class Conjugate():
    """A conjunction (logical AND) of one or more literals."""
    def __init__(self):
        self.literals = []
        self.generate_lit()

    def generate_lit(self):
        """Append at least one ``Literal``; each draw below 0.2 adds one more."""
        while True:
            add_another = random.random() < 0.2
            self.literals.append(Literal())
            if not add_another:
                return

    def evaluate(self,time):
        """Return the literals' source expressions for row ``time`` joined by ``and``."""
        return " and ".join(literal.evaluate(time) for literal in self.literals)

    def mutate(self, depth):
        """Pick a random literal and either replace it (one level left) or pass the mutation down."""
        target = random.randint(0, len(self.literals) - 1)
        remaining = depth - 1
        if remaining == 0:
            self.literals[target] = Literal()
        else:
            self.literals[target].mutate(remaining)

    def get_node(self, depth, path):
        """Pick a random literal, record its index in ``path`` and return it or descend into it."""
        remaining = depth - 1
        index = random.randint(0, len(self.literals) - 1)
        path.append(index)
        chosen = self.literals[index]
        if remaining == 0:
            return chosen, path
        node, path = chosen.get_node(remaining, path)
        return node, path

    def replace(self, node, path):
        """Follow ``path`` (consuming its first index) and install ``node`` at the end of it."""
        is_last_step = len(path) == 1
        index = path.pop(0)
        if is_last_step:
            self.literals[index] = node
        else:
            self.literals[index].replace(node, path)

    def __str__(self):
        return " ∧ ".join(str(literal) for literal in self.literals)
        
class Gene:
    """A trading rule: OR-ed buy conjugates and OR-ed sell conjugates."""
    def __init__(self) -> None:
        self.sell_conjugates = []
        self.buy_conjugates = []
        self.generate_buy_conjugates()
        self.generate_sell_conjugates()

    def generate_buy_conjugates(self):
        """Append at least one buy ``Conjugate``; each draw below 0.15 adds one more."""
        while True:
            add_another = random.random() < 0.15
            self.buy_conjugates.append(Conjugate())
            if not add_another:
                return

    def generate_sell_conjugates(self):
        """Append at least one sell ``Conjugate``; each draw below 0.15 adds one more."""
        while True:
            add_another = random.random() < 0.15
            self.sell_conjugates.append(Conjugate())
            if not add_another:
                return

    def evaluate_buy(self,time):
        """Build the buy expression for row ``time`` as source text and ``eval`` it."""
        expression = " or ".join(f"({conjugate.evaluate(time)})" for conjugate in self.buy_conjugates)
        return eval(expression)

    def evaluate_sell(self,time):
        """Build the sell expression for row ``time`` as source text and ``eval`` it."""
        expression = " or ".join(f"({conjugate.evaluate(time)})" for conjugate in self.sell_conjugates)
        return eval(expression)

    def count_literals(self):
        """Return the total number of literals over all buy and sell conjugates."""
        return sum(len(conjugate.literals) for conjugate in self.buy_conjugates + self.sell_conjugates)

    def update_weights(self, weights):
        """Assign ``weights`` to the literals' ``value.c``, buy side first, in order.

        Raises:
            ValueError: if ``len(weights)`` differs from ``count_literals()``.
        """
        if len(weights) != self.count_literals():
            raise ValueError("The number of weights provided does not match the number of literals in the gene.")
        ordered_literals = [
            literal
            for conjugate in self.buy_conjugates + self.sell_conjugates
            for literal in conjugate.literals
        ]
        for literal, weight in zip(ordered_literals, weights):
            literal.value.c = weight

    # Mutates the gene
    def mutate(self):
        """Mutate one random conjugate on a randomly chosen side, 1 to 4 levels deep."""
        depth = random.randint(1, 4) # Randomly chooses how deep into the gene we will mutate
        pool = self.buy_conjugates if random.random() < 0.5 else self.sell_conjugates
        index = random.randint(0, len(pool) - 1)
        remaining = depth - 1
        if remaining == 0:
            # Depth exhausted at this level: swap in a brand-new conjugate.
            pool[index] = Conjugate()
        else:
            pool[index].mutate(remaining)

    # Returns a sub-expression from the gene
    def get_node(self, depth, buy_or_sell):
        """Return ``(node, path)`` for a random sub-expression ``depth`` levels down.

        ``buy_or_sell`` must be 'buy' or 'sell'; any other value returns None.
        """
        if buy_or_sell == 'buy':
            pool = self.buy_conjugates
        elif buy_or_sell == 'sell':
            pool = self.sell_conjugates
        else:
            return None

        path = []
        remaining = depth - 1
        index = random.randint(0, len(pool) - 1)
        path.append(index)
        if remaining == 0:
            return pool[index], path
        node, path = pool[index].get_node(remaining, path)
        return node, path

    # Replaces the sub-expression described by path with node
    def replace(self, node, path, buy_or_sell):
        """Install ``node`` at ``path`` on the given side; ``path`` is consumed while descending."""
        if buy_or_sell == 'buy':
            pool = self.buy_conjugates
        elif buy_or_sell == 'sell':
            pool = self.sell_conjugates
        else:
            return

        is_last_step = len(path) == 1
        index = path.pop(0)
        if is_last_step:
            pool[index] = node
        else:
            # Asks the next sub-expression to replace one of its sub-expressions
            pool[index].replace(node, path)

    def __str__(self):
        # Each section is header + "(conj) ∨ " per conjugate, minus the trailing 3 characters.
        buy_text = ("BUY CONDITIONS: " + "".join(f"({str(conjugate)}) ∨ " for conjugate in self.buy_conjugates))[:-3]
        sell_text = ("\nSELL CONDITIONS: " + "".join(f"({str(conjugate)}) ∨ " for conjugate in self.sell_conjugates))[:-3]
        return buy_text + sell_text
    
# Smoke test: start from a clean copy of the raw candles, then build and print one random gene.
ohlcv_data = origin_ohlcv_data.copy()
a = Gene()
print(a)
#print(a.evaluate_buy(0))
#print(a.evaluate_sell(0))

BUY CONDITIONS: ((ppo_line_28_10 > 0.97 * ppo_signal_28_10_7))
SELL CONDITIONS: ((atr_13 > 0.96 * h) ∧ ¬(aroon_up_16 > 1.01 * aroon_down_16)) ∨ (¬(kama_10_3_25 > 1.01 * c))


In [4]:
def bot(t, gene):
    """Decide whether day ``t`` opens a fresh buy or sell signal for ``gene``.

    A buy trigger fires when the buy rule is true on day ``t``, the sell rule
    is false on day ``t``, and neither rule was true on day ``t - 1``. The
    sell trigger is the mirror image. The two triggers are therefore never
    both true.

    Args:
        t: Row index of the day being evaluated.
        gene: Object exposing ``evaluate_buy(day)`` and ``evaluate_sell(day)``.

    Returns:
        ``(buy_trigger, sell_trigger)`` as plain bools. For ``t < 1`` there is
        no previous day to compare against, so ``(False, False)`` is returned;
        evaluating ``t - 1`` there would index row -1, i.e. the *last* row of
        the data.
    """
    if t < 1:
        return False, False

    buy_now = bool(gene.evaluate_buy(t))
    sell_now = bool(gene.evaluate_sell(t))
    # Both rules on, or both off, can never produce a trigger; skip the
    # previous-day evaluations entirely in that (most common) case.
    if buy_now == sell_now:
        return False, False

    quiet_yesterday = not gene.evaluate_buy(t - 1) and not gene.evaluate_sell(t - 1)
    return buy_now and quiet_yesterday, sell_now and quiet_yesterday

def trading_bot(gene, ohlcv_data, from_day=0, to_day=720, fee=0.02, starting_fiat=100):
    """Simulate all-in/all-out trading of ``gene`` over days ``[from_day, to_day)``.

    Args:
        gene: Trading rule passed to ``bot`` for every day.
        ohlcv_data: DataFrame whose ``'c'`` column (looked up with ``.loc[t, 'c']``)
            supplies the close price used for every fill.
            NOTE(review): the gene's own signals are evaluated against the
            module-level ``ohlcv_data`` (its expressions reference that name),
            not against this argument -- keep the two in sync.
        from_day: First day simulated (inclusive).
        to_day: Last day simulated (exclusive).
        fee: Fraction of the traded amount lost on each executed buy or sell.
        starting_fiat: Fiat balance at the start of the simulation.

    Returns:
        ``(positions, fiat_money)``: ``positions`` is a list of
        ``('buy' | 'sell', day)`` tuples, one per trigger (recorded even when
        there was nothing to trade); ``fiat_money`` is the final fiat balance.
    """
    positions = []
    fiat_money = starting_fiat
    btc_money = 0

    for t in range(from_day, to_day):
        close_price = ohlcv_data.loc[t, 'c']

        buy_trigger, sell_trigger = bot(t, gene)

        if buy_trigger:
            positions.append(('buy', t))
            if fiat_money > 0:
                # Convert the whole fiat balance to BTC at the close, minus the fee.
                btc_money = (fiat_money / close_price) * (1 - fee)
                fiat_money = 0

        if sell_trigger:
            positions.append(('sell', t))
            if btc_money > 0:
                # Convert the whole BTC balance back to fiat at the close, minus the fee.
                fiat_money = (btc_money * close_price) * (1 - fee)
                btc_money = 0

    # If the last signal was a buy, liquidate at the final simulated close so
    # the returned fiat balance reflects the open position.
    if positions and positions[-1][0] == "buy":
        positions.append(('sell', t))
        if btc_money > 0:
            fiat_money = (btc_money * close_price) * (1 - fee)
            btc_money = 0

    return positions, fiat_money
    
# Demo: back-test one freshly generated random gene with the default day range.
#ohlcv_data = origin_ohlcv_data.copy()
a = Gene()
print(a)
pos, fitness = trading_bot(a, ohlcv_data)
print(pos)
# `fitness` is the final fiat balance; the simulation starts with 100.
if fitness == 0:
    print("Bust")
elif fitness == 100: # balance unchanged: no trade was executed (signals may still be listed in `pos`)
    print("No trades")
else:
    print(fitness)    

BUY CONDITIONS: ((rsi_27 > 1.02 * constant_70))
SELL CONDITIONS: ((kst_18_23_29_37_10_10_10_15 > 0.97 * kst_signal_18_23_29_37_10_10_10_15_9))
[('sell', 14), ('sell', 22), ('sell', 44), ('sell', 62), ('sell', 134), ('sell', 215), ('sell', 242), ('sell', 255), ('sell', 284), ('sell', 301), ('sell', 372), ('sell', 409), ('sell', 476), ('sell', 488), ('sell', 521), ('sell', 558), ('sell', 598), ('sell', 662), ('sell', 687)]
No trades


In [36]:
import copy
import numpy as np

class Genome:
    """A candidate weight vector together with its most recently measured fitness."""
    def __init__(self, fitness, genes):
        self.fitness = fitness  # score; overwritten by Optimization.evaluate_population
        self.genes = genes  # weights, one per literal (Optimization creates them with np.random.uniform)

class Optimization():
    """Tunes the per-literal weights (``c``) of one gene with a small evolutionary search.

    Each candidate is a ``Genome`` whose ``genes`` array holds one weight per
    literal of ``gene``. A candidate is scored by writing its weights into the
    gene and back-testing it with ``trading_bot`` on days ``[0, to_day)``.
    Note that scoring mutates ``gene`` in place.
    """
    def __init__(self, population_size, generations, ohlcv_data, gene, to_day):
        self.population_size = population_size
        self.generations = generations
        self.ohlcv_data = ohlcv_data
        self.gene = gene
        self.num_literals = gene.count_literals()
        self.to_day = to_day # How many days the data is trained on

    def train(self):
        """Run the search and return the best ``Genome`` found.

        Returns immediately with the best initial genome when even that one
        does not beat a fitness of 100 (the back-test's starting balance).
        Stops early after 4 consecutive generations without improvement.
        """
        no_improvement = 0 # Used to record generations of no fitness improvement
        mutation_rate = 0.05
        population = self.initialize_population(self.population_size, self.num_literals)
        population = self.evaluate_population(population)
        population.sort(key=lambda x: x.fitness, reverse=True) # Sort the population by fitness
        best_genome = population[0] # population[0] will have the highest fitness
        if best_genome.fitness <= 100: # The DNF expression is unable to return a positive yield
            return best_genome

        for i in range(0, self.generations):
            # Do tournament selection for next generation
            population = self.tournament_selection(population, 0.05, self.population_size // 10, mutation_rate)

            # Evaluate the population
            population = self.evaluate_population(population)
            population.sort(key=lambda x: x.fitness, reverse=True) # Sort the population by fitness
            leader = population[0]

            # Track if our fitness is improving, if not increment no_improvement.
            # The first generation never counts as a stall.
            stalled = i > 0 and leader.fitness <= best_genome.fitness
            no_improvement = no_improvement + 1 if stalled else 0
            if no_improvement == 4:
                return best_genome

            # Update the mutation rate. Increase the rate if fitness is not increasing.
            mutation_rate = mutation_rate * 1.6 if no_improvement >= 2 else 0.05

            # Keep the best genome seen so far. With small populations the elite
            # slice can be empty, so a generation's leader may be worse than an
            # earlier one and must not replace it.
            if leader.fitness >= best_genome.fitness:
                best_genome = leader
        return best_genome

    def initialize_population(self, population_size, num_literals):
        """Return ``population_size`` genomes with weights drawn uniformly from [0.95, 1.05)."""
        return [
            Genome(0, np.random.uniform(0.95, 1.05, num_literals))
            for _ in range(population_size)
        ]

    def evaluate_population(self, population):
        """Score every genome in place and return the same list."""
        for genome in population:
            genome.fitness = self.fitness(genome)
        return population

    def fitness(self, weights):
        """Back-test ``self.gene`` with the weights of genome ``weights`` and return the final balance."""
        self.gene.update_weights(weights.genes)
        # Use the data handed to this instance rather than whatever the
        # module-level `ohlcv_data` happens to be bound to.
        positions, fitness = trading_bot(self.gene, self.ohlcv_data, to_day=self.to_day)
        return fitness

    def tournament_selection(self, population, elitism, tournament_size, mutation_rate):
        """Build the next generation from ``population`` (assumed sorted best-first).

        Args:
            population: list of genomes, best first.
            elitism: fraction of the population carried over unchanged.
            tournament_size: genomes sampled (with replacement) per tournament;
                values below 1 are treated as 1.
            mutation_rate: each weight of a winner is shifted by a uniform
                draw from [-mutation_rate, mutation_rate) and clipped at 0.

        Returns:
            New list of the same length: mutated tournament winners (deep
            copies) followed by the untouched elite.
        """
        elite = population[:int(elitism * len(population))] # Select elite
        # population_size // 10 is 0 for populations under 10; an empty
        # tournament has no winner, so always sample at least one genome.
        tournament_size = max(1, tournament_size)
        parents = []
        for _ in range(len(population) - len(elite)):
            tournament = np.random.choice(population, size=tournament_size)
            winner = copy.deepcopy(max(tournament, key=lambda x: x.fitness))
            shift = np.random.uniform(-mutation_rate, mutation_rate, winner.genes.shape)
            winner.genes = np.maximum(0, winner.genes + shift)
            parents.append(winner)
        parents.extend(elite) # Add the elite to the parents list
        return parents

In [154]:
## Hard Reset (removes all indicator data generated by our trading bot)
## by rebinding `ohlcv_data` to a fresh copy of the raw candles.
## NOTE(review): genes created before a reset refer to indicator columns by name, so they
## presumably cannot be evaluated afterwards unless those columns are recomputed -- confirm.
ohlcv_data = origin_ohlcv_data.copy()

In [46]:
# Performs tournament selection using reproduction and mutation on the logical expressions
class GeneticAlgorithm:
    """Evolves a population of ``(fitness, gene)`` tuples.

    Args:
        gene_list: initial population as ``(fitness, gene)`` tuples.
        to_day: exclusive end of the training window passed to ``trading_bot``.
        iterations: maximum number of iterations.
        data: DataFrame handed to ``trading_bot``; defaults to the module-level
            ``ohlcv_data`` as it is bound when the instance is created.
        function: selection step run at the start of each iteration:
            0 = ``reproduce_and_cull``, 1 = ``tournament_selection``,
            2 = only sort by fitness.
        method: operator codes drawn from at random for each gene when no
            pattern is given (1 = mutate, 2 = crossover, 3 = weight optimization).
        pattern: optional sequence of operator codes; entry ``i % len(pattern)``
            is applied to every gene in iteration ``i`` and ``method`` is ignored.
    """
    def __init__(self, gene_list, to_day, iterations=10, data=None,
                 function=0, method=(1, 2, 3), pattern=None):
        # Resolved here rather than in the signature: a signature default would
        # freeze whichever DataFrame existed when the class cell was executed.
        self.data = ohlcv_data if data is None else data
        self.to_day = to_day
        self.gene_list = gene_list
        self.crossover_list = []
        self.iterations=iterations
        self.function=function
        self.method=method
        self.pattern=pattern

    def train(self):
        """Run the evolutionary loop, write a CSV log and return the sorted population.

        Returns:
            ``self.gene_list`` as ``(fitness, gene)`` tuples sorted by
            descending fitness.
        """
        print(f"Gene List Length: {len(self.gene_list)}")

        # Initialise the data collection variables
        records = []
        which_name = ["Mutate", "Crossover", "Optimization"]

        for i in range(0, self.iterations):
            if self.function == 0:
                print(f"Iterations Remaining: {self.iterations-i}")
                self.reproduce_and_cull()
            if len(self.gene_list) == 1:
                print(f"Genes Remaining: {len(self.gene_list)}")
                break
            if self.function == 2:
                print(f"Iterations Remaining: {self.iterations-i}")
                self.gene_list.sort(key=lambda x: x[0], reverse=True)
            if self.function == 1:
                if len(self.gene_list) % 5 == 0 or len(self.gene_list) < 3:
                    print(f"Genes Remaining: {len(self.gene_list)}")
                self.tournament_selection()

            scheduled = self.pattern[i % len(self.pattern)] if self.pattern else None
            last_used = self._apply_operators(scheduled)

            # Collect iteration data. Without a pattern the operator differs per
            # gene, so the label is only the operator applied last.
            label = which_name[last_used - 1] if last_used in (1, 2, 3) else str(last_used)
            records.extend(
                (i, label, fitness, str(gene).replace('\n', ','))
                for fitness, gene in self.gene_list
            )

        # Save the DataFrame to a CSV file
        filename_pattern = ''.join(map(str, self.pattern)) if self.pattern is not None else ""
        filename_length = len(self.gene_list)
        df = pd.DataFrame(records, columns=['i', 'Method', 'Fitness', 'Gene'])
        df.to_csv(f'{filename_pattern}_{filename_length}_out.csv', index=False)

        # The 5 top genes and their fitness
        self.gene_list = sorted(self.gene_list, key=lambda x: x[0], reverse=True)
        for fitness, gene in self.gene_list[:5]:
            print(f"Fitness: {fitness}")
            print(gene)
        return self.gene_list

    def _apply_operators(self, scheduled=None):
        """Apply one operator to every gene and rebuild ``self.gene_list`` from the survivors.

        Works on a snapshot of the population so that every gene is processed
        exactly once (removing from and appending to the list while iterating
        it skips some genes and revisits others).

        Args:
            scheduled: operator code for all genes, or None to draw one from
                ``self.method`` per gene.

        Returns:
            The operator code applied to the last gene (None for an empty population).
        """
        survivors = []
        which = scheduled
        for gene_tuple in list(self.gene_list):
            which = scheduled if scheduled is not None else random.choice(self.method)
            if which == 1: ## Gene Mutation
                survivors.append(self._mutation_survivor(gene_tuple))
            elif which == 2: ## Gene Crossover
                child1, child2 = self.crossover(gene_tuple)
                if child1 is not False:
                    # crossover_list holds the two parents in the same order as
                    # the children; each child must match or beat its own parent.
                    for child in (child1, child2):
                        parent = self.crossover_list.pop(0)
                        survivors.append(child if child[0] >= parent[0] else parent)
            elif which == 3: ## Weight Optimization
                survivors.append(self._optimization_survivor(gene_tuple))
            else:
                # Unknown operator code: keep the gene instead of dropping it.
                survivors.append(gene_tuple)

        # A gene still waiting for a crossover partner goes back unchanged.
        if self.crossover_list:
            survivors.append(self.crossover_list.pop(0))

        self.gene_list = survivors
        return which

    def _mutation_survivor(self, gene_tuple):
        """Mutate a copy of the gene and return whichever of the two scores at least as well."""
        gene_fitness, gene = gene_tuple[0], gene_tuple[1]
        m_gene = self.mutate(copy.deepcopy(gene))
        mutated_fitness = self.fitness(m_gene)
        if mutated_fitness >= gene_fitness:
            return (mutated_fitness, m_gene)
        return (gene_fitness, gene)

    def _optimization_survivor(self, gene_tuple):
        """Tune the weights of a copy of the gene; return it only if fitness strictly improves."""
        print(f"Before: {gene_tuple[1]}")
        improved_gene = copy.deepcopy(gene_tuple[1])
        optimization = Optimization(population_size=20, generations=3, ohlcv_data=self.data, gene=improved_gene, to_day=self.to_day)
        best_weight_genome = optimization.train()
        # train() leaves the gene with the weights it evaluated last, so the
        # best weights are written back explicitly.
        improved_gene.update_weights(best_weight_genome.genes)
        print(f"After: {improved_gene}")
        improved_fitness = self.fitness(improved_gene)
        if improved_fitness > gene_tuple[0]: # If the fitness doesn't change we'll keep our original weights
            print(f"Old Fitness: {gene_tuple[0]}, New Fitness: {improved_fitness}")
            return (improved_fitness, improved_gene)
        return (gene_tuple[0], gene_tuple[1])

    # Pits each gene against one other gene,
    # whichever has a better fitness is propagated to the next generation
    def tournament_selection(self):
        """Roughly halve the population by pairwise knock-out on fitness.

        Pairs are drawn at random without replacement. With an odd population
        the winner of the final pair additionally faces the left-over gene.
        A population of fewer than two genes is left unchanged.
        """
        if len(self.gene_list) < 2:
            return

        next_generation = []
        for _ in range(len(self.gene_list) // 2):
            # Pick out two random genes and remove them from the pool
            first_gene = self.gene_list.pop(random.randrange(len(self.gene_list)))
            second_gene = self.gene_list.pop(random.randrange(len(self.gene_list)))

            # Only occurs if the current gene population has an odd number of genes,
            # if so then one of the surviving genes has to face the odd gene
            if len(self.gene_list) == 1:
                if first_gene[0] < second_gene[0]:
                    first_gene = second_gene
                second_gene = self.gene_list[0]

            # Determine which gene moves on
            next_generation.append(first_gene if first_gene[0] >= second_gene[0] else second_gene)

        self.gene_list = next_generation
        return

    # Duplicates the top genes and prunes the bottom genes.
    def reproduce_and_cull(self):
        """Drop the worst 10% of genes and append deep copies of the best 10%.

        The same count is used for both steps, so the population size is
        preserved (populations under 10 genes are only sorted).
        """
        self.gene_list.sort(key=lambda x: x[0], reverse=True)
        n_genes = len(self.gene_list)
        n_swap = n_genes // 10
        survivors = self.gene_list[:n_genes - n_swap]
        clones = [(fitness, copy.deepcopy(gene)) for fitness, gene in survivors[:n_swap]]
        self.gene_list = survivors + clones
        return

    # Determines fitness of a gene
    def fitness(self, gene):
        """Back-test ``gene`` on days ``[0, self.to_day)`` and return the final balance."""
        positions, fitness = trading_bot(gene, self.data, to_day=self.to_day)
        return fitness

    # Nothing happens to the gene
    def reproduce(self, gene):
        """Append ``gene`` to the population unchanged and return it."""
        self.gene_list.append(gene)
        return gene

    # Gene is mutated, check the mutation method in the Gene class
    def mutate(self, gene):
        """Mutate ``gene`` in place and return it."""
        gene.mutate()
        return gene

    # Two genes exchange a sub-expression
    def crossover(self, gene_tuple):
        """Pair ``gene_tuple`` with a waiting gene and swap one sub-expression between copies.

        Returns:
            ``(False, False)`` when no gene was waiting (``gene_tuple`` is then
            queued in ``self.crossover_list``). Otherwise two ``(fitness, gene)``
            child tuples; both parents are left in ``self.crossover_list``, in
            the same order, for the caller to compare against and pop.
        """
        # Checks if there's already a gene waiting to perform crossover
        if len(self.crossover_list) == 0:
            self.crossover_list.append(gene_tuple)
            return False, False

        # Perform the crossover if there's another gene waiting
        self.crossover_list.append(gene_tuple)
        gene1 = copy.deepcopy(self.crossover_list[0][1])
        gene2 = copy.deepcopy(self.crossover_list[1][1])
        depth = random.randint(1, 3) # Determines how deep into the expression we exchange sub-expressions
        side = 'buy' if random.random() < 0.5 else 'sell'

        # Fetching the sub-expressions to be exchanged and recording how to get back to their position.
        # Both genes use the same depth, so the exchanged nodes are of the same kind.
        node1, path1 = gene1.get_node(depth, side)
        node2, path2 = gene2.get_node(depth, side)

        # Exchanges the sub-expressions by following the path back to the node to be changed
        gene1.replace(node2, path1, side)
        gene2.replace(node1, path2, side)

        return (self.fitness(gene1), gene1), (self.fitness(gene2), gene2)

`GeneticAlgorithm.train()` returns the final population as a list of `(fitness, gene)` tuples, sorted from highest to lowest fitness.

In [8]:
# Randomly Generate Genes outside of GeneticAlgorithm so we can compare them to the optimized genes
genome_size = 5000 # <---- How many genes are initially generated
gene_list = []
to_day=200 # Training window: genes are scored on days [0, to_day)

def funcfitness(gene, ohlcv_data, to_day):
    """Return the final balance trading_bot reports for `gene` over days [0, to_day)."""
    positions, fitness = trading_bot(gene, ohlcv_data, to_day=to_day)
    return fitness


for i in range(genome_size):
    new_gene = Gene()
    fitness = funcfitness(new_gene, ohlcv_data, to_day)
    if(fitness > 100): # keep only genes that finish above the starting balance of 100
        gene_list.append((fitness, new_gene))

original_gene_list = copy.deepcopy(gene_list) # Untouched baseline, used to compare before/after optimization

In [47]:
# Restore the working population from the randomly generated baseline before running an experiment.
gene_list = copy.deepcopy(original_gene_list)

In [21]:
# Disabled snapshot. An assignment whose right-hand side is commented out is a SyntaxError,
# so the whole statement is commented instead. Uncomment after a run to keep a copy of its results.
# MUTATION = copy.deepcopy(optimized_genes)

In [27]:
# Disabled snapshot. A bare `CROSSOVERGL` raises NameError on a fresh kernel,
# so the whole statement is commented instead. Uncomment after a run to keep a copy of its results.
# CROSSOVERGL = copy.deepcopy(optimized_genes)

In [33]:
# Disabled snapshot. An assignment whose right-hand side is commented out is a SyntaxError,
# so the whole statement is commented instead. Uncomment after a run to keep a copy of its results.
# OPTIMIZED = copy.deepcopy(optimized_genes)

In [37]:
# Preview the operator schedule: four (1, 2) pairs followed by one (1, 3) pair, repeated five times (50 entries).
print(([1,2] * 4 + [1,3]) * 5)

[1, 2, 1, 2, 1, 2, 1, 2, 1, 3, 1, 2, 1, 2, 1, 2, 1, 2, 1, 3, 1, 2, 1, 2, 1, 2, 1, 2, 1, 3, 1, 2, 1, 2, 1, 2, 1, 2, 1, 3, 1, 2, 1, 2, 1, 2, 1, 2, 1, 3]


In [49]:
## Run this one

# Simple sorting - no elitism bias - does not duplicate genes - probably the best for data analysis
# (the fifth positional argument, function=2, makes each iteration only re-sort the population by fitness)
method = [1, 2, 3] # 1 = Mutate, #2 = Crossover, #3 = Weight Optimization (WO)
pattern = ([1,2] * 4 + [1,3]) * 5 # one operator per iteration; 50 entries for the 50 iterations requested below
#pattern = [1,2] * 24 + [1,3] # Swap Between Mutate and Cross Over for 48 generations and then Mutate, Optimize.
optimized_genes = GeneticAlgorithm(gene_list, to_day, 50, ohlcv_data, 2, method, pattern=pattern).train()

Gene List Length: 2351
Iterations Remaining: 50
Iterations Remaining: 49
Iterations Remaining: 48
Iterations Remaining: 47
Iterations Remaining: 46
Iterations Remaining: 45
Iterations Remaining: 44
Iterations Remaining: 43
Iterations Remaining: 42
Iterations Remaining: 41
Before: BUY CONDITIONS: ((ema_31 > 1.04 * o) ∧ ¬(aroon_up_25 > 1.06 * aroon_down_25))
SELL CONDITIONS: ((ppo_line_30_12 > 1.03 * ppo_signal_30_12_7))
After: BUY CONDITIONS: ((ema_31 > 1.02 * o) ∧ ¬(aroon_up_25 > 1.01 * aroon_down_25))
SELL CONDITIONS: ((ppo_line_30_12 > 1.02 * ppo_signal_30_12_7))
Before: BUY CONDITIONS: ((kst_19_16_27_30_10_10_10_15 > 0.98 * kst_signal_19_16_27_30_10_10_10_15_9)) ∨ (¬(aroon_up_28 > 1.02 * aroon_down_28))
SELL CONDITIONS: ((stc_34_55_13_3_3 > 1.03 * constant_50))
After: BUY CONDITIONS: ((kst_19_16_27_30_10_10_10_15 > 1.04 * kst_signal_19_16_27_30_10_10_10_15_9)) ∨ (¬(aroon_up_28 > 0.99 * aroon_down_28))
SELL CONDITIONS: ((stc_34_55_13_3_3 > 0.90 * constant_50))
Old Fitness: 222.139111

In [None]:
# Reproduce and Cull - Best at manipulating best genes due to replication.
# Method determines (randomly) which alteration method is chosen for each gene
method = [1, 2, 3] # 1 = Mutate, #2 = Crossover, #3 = Weight Optimization (WO)
# You can bias Mutation by adding 1's i.e. [1, 1, 2, 3] 50% mutation 25% crossover 25% WO

# If a pattern is present, the code will perform the pattern number (1,2,3) on all genes for that iteration.
# This probably doesn't work well for tournament as tournament kicks genes out and ends abruptly.
pattern = [1,1,1,1,1,1] # Mutate on every iteration; e.g. [2,2,3] would mean Crossover, Crossover, Optimization.
# method does nothing if pattern is defined
optimized_genes = GeneticAlgorithm(gene_list, to_day, 3, ohlcv_data, 0, method, pattern=pattern).train()
print(len(optimized_genes))

In [None]:
# Tournament Selection - Fast
method = [1, 2, 3] # 1 = Mutate, #2 = Crossover, #3 = Weight Optimization (WO)
# You can bias Mutation by adding 1's i.e. [1, 1, 2, 3] 50% mutation 25% crossover 25% WO
optimized_genes = GeneticAlgorithm(gene_list, to_day, 3, ohlcv_data, 1, method).train()

In [50]:
"""
For each gene chosen by any above algorithm, we can iterate through them and see if they generalize for the rest of the dataset

"""
def doesItGeneralize(temp=None, end_day=720, min_fitness=100):
    """
    Re-score genes on the days after `to_day` to see whether they generalize.

    Each gene is run through `trading_bot` on `ohlcv_data` from the
    notebook-level `to_day` up to `end_day`, and one comparison line is
    printed per gene.

    Parameters
    ----------
    temp : list of (fitness, gene) tuples, optional
        Genes to check, paired with the fitness that is printed as the
        "First {to_day} days" value. When omitted, the notebook-level
        `optimized_genes` is looked up at call time, so re-running a genetic
        algorithm cell is picked up without re-defining this function.
    end_day : int, optional
        Value passed to `trading_bot` as `to_day`. Defaults to 720.
    min_fitness : number, optional
        A gene counts as generalizing when its new fitness is strictly
        greater than this. Defaults to 100.

    Returns
    -------
    (genes_that_generalize_tuple, update_days_genes_tuple)
        Two lists of (new_fitness, gene) tuples: the genes that beat
        `min_fitness`, and every gene that was checked.
    """
    if temp is None:
        # Resolved here rather than in the signature: a default argument is
        # evaluated once, when the `def` runs, and would go stale afterwards.
        temp = optimized_genes

    genes_that_generalize_tuple = []
    update_days_genes_tuple = []
    for earlier_fitness, gene in temp:
        _, later_fitness = trading_bot(gene, ohlcv_data, from_day=to_day, to_day=end_day)
        print(f"First {to_day} days: {earlier_fitness}, Days {to_day}-{end_day}: {later_fitness}")

        scored_gene = (later_fitness, gene)
        if later_fitness > min_fitness:
            genes_that_generalize_tuple.append(scored_gene)
        update_days_genes_tuple.append(scored_gene)
    return genes_that_generalize_tuple, update_days_genes_tuple

# Size of the optimized population before it is re-scored on the later days.
print(len(optimized_genes))
# Both results are lists of (fitness, gene) tuples and are reused by later cells.
genes_that_generalize_tuple, update_days_genes_tuple = doesItGeneralize(optimized_genes)
# Number of genes that cleared the fitness threshold inside doesItGeneralize.
print(len(genes_that_generalize_tuple))

2351
First 200 days: 293.4580043646804, Days 200-720: 57.62142320251012
First 200 days: 272.6316240126374, Days 200-720: 70.72643619526849
First 200 days: 267.6157863803278, Days 200-720: 70.27586332855962
First 200 days: 262.46633605177004, Days 200-720: 62.83113475364027
First 200 days: 262.0769721756708, Days 200-720: 87.82169389782511
First 200 days: 259.65212393892915, Days 200-720: 112.50207313614705
First 200 days: 257.2688617026681, Days 200-720: 115.426377736185
First 200 days: 256.8469617243953, Days 200-720: 71.76961309104257
First 200 days: 255.85839478768952, Days 200-720: 104.27628818899494
First 200 days: 254.30372237370776, Days 200-720: 41.4997103851774
First 200 days: 251.70778808413874, Days 200-720: 49.19271264835603
First 200 days: 250.3437572821332, Days 200-720: 66.22485530125263
First 200 days: 250.23232172184, Days 200-720: 69.48468132369972
First 200 days: 250.23232172184, Days 200-720: 69.48468132369972
First 200 days: 250.23232172184, Days 200-720: 69.484681

In [52]:
def _average_fitness(scored_genes):
    """Return the mean fitness of a list of (fitness, gene) tuples, or NaN if the list is empty."""
    if len(scored_genes) == 0:
        # An empty list (e.g. no gene generalized) used to raise ZeroDivisionError.
        return float("nan")
    return sum(fitness for fitness, _ in scored_genes) / len(scored_genes)


unoptimized_average = _average_fitness(original_gene_list)
optimized_average = _average_fitness(optimized_genes)
generalized_average = _average_fitness(update_days_genes_tuple)
successful_generalized_average = _average_fitness(genes_that_generalize_tuple)


print(f"unoptimized_average: {unoptimized_average}")
print(f"optimized_average: {optimized_average}")
print(f"generalized_average: {generalized_average}")
print(f"successful_generalized_average: {successful_generalized_average}")

# Highest fitness first, so index 0 is the best gene among those that generalized.
genes_generalized_sorted = sorted(genes_that_generalize_tuple, key=lambda x: x[0], reverse=True)
if genes_generalized_sorted:
    print(genes_generalized_sorted[0][0])
    print(genes_generalized_sorted[0][1])
else:
    # Indexing [0] on an empty list used to raise IndexError here.
    print("No genes generalized.")

unoptimized_average: 132.78246417257594
optimized_average: 184.2716970693421
generalized_average: 68.4146201013693
successful_generalized_average: 118.05364404446388
175.67091054274448
BUY CONDITIONS: ((ppo_line_28_12 > 1.01 * ppo_signal_28_12_11))
SELL CONDITIONS: (¬(macd_12_26_9 > 0.97 * macd_signal_12_26_9))


In [32]:
"""
Successful indicator counter:

For each gene in the list `temp` we count how many of each indicator is present.

Worth testing with `genes_that_generalize_tuple` if you can get it large enough.

"""
def indicatorCounter(temp=None): ## TEMP IS A LIST OF (fitness, gene) TUPLES.
    """
    Print how often each indicator name appears across a list of genes.

    For every gene, each literal of each entry in `gene.buy_conjugates` and
    `gene.sell_conjugates` is visited and its `value.indicator_name` is
    tallied, separately for the buy side and the sell side.

    Parameters
    ----------
    temp : list of (fitness, gene) tuples, optional
        Genes to inspect; the fitness element is ignored. When omitted, the
        notebook-level `optimized_genes` is looked up at call time, so
        re-running a genetic algorithm cell is picked up without re-defining
        this function.

    Returns
    -------
    None
        The two tallies are printed as "Buy counts:" and "Sell counts:".
    """
    from collections import Counter

    if temp is None:
        # Resolved here rather than in the signature: a default argument is
        # evaluated once, when the `def` runs, and would go stale afterwards.
        temp = optimized_genes

    buy_counts = Counter()
    sell_counts = Counter()
    for _, gene in temp:
        for conjugate in gene.buy_conjugates:
            buy_counts.update(literal.value.indicator_name for literal in conjugate.literals)
        for conjugate in gene.sell_conjugates:
            sell_counts.update(literal.value.indicator_name for literal in conjugate.literals)

    print("Buy counts:", buy_counts)
    print("Sell counts:", sell_counts)

# Print buy/sell indicator tallies for three populations so they can be compared side by side.
# NOTE(review): in the saved output the first two calls print the same buy counts - check whether
# original_gene_list and optimized_genes are expected to contain the same indicators.
indicatorCounter(original_gene_list) # Original Genes
indicatorCounter(optimized_genes) # Optimized Genes
indicatorCounter(genes_that_generalize_tuple) # Genes that generalize from Optimized

Buy counts: Counter({'KST': 247, 'Aroon_Up': 229, 'Vortex_Positive': 222, 'PSAR': 214, 'KAMA': 210, 'Stochastic_Oscillator_K': 207, 'Stochastic_Oscillator_D': 199, 'EMA': 197, 'WMA': 197, 'SMA': 196, 'CCI': 187, 'PPO_Line': 183, 'MACD': 182, 'STC': 161, 'RSI': 134, 'ADX': 133, 'Mass_Index': 124, 'ATR': 63, 'MFI': 60})
Sell counts: Counter({'ADX': 209, 'KST': 198, 'PPO_Line': 194, 'MACD': 193, 'RSI': 188, 'Stochastic_Oscillator_D': 187, 'Aroon_Up': 184, 'Mass_Index': 180, 'STC': 179, 'ATR': 178, 'Vortex_Positive': 177, 'SMA': 173, 'PSAR': 171, 'KAMA': 171, 'EMA': 170, 'Stochastic_Oscillator_K': 161, 'MFI': 153, 'WMA': 150, 'CCI': 143})
Buy counts: Counter({'KST': 247, 'Aroon_Up': 229, 'Vortex_Positive': 222, 'PSAR': 214, 'KAMA': 210, 'Stochastic_Oscillator_K': 207, 'Stochastic_Oscillator_D': 199, 'EMA': 197, 'WMA': 197, 'SMA': 196, 'CCI': 187, 'PPO_Line': 183, 'MACD': 182, 'STC': 161, 'RSI': 134, 'ADX': 133, 'Mass_Index': 124, 'ATR': 63, 'MFI': 60})
Sell counts: Counter({'ADX': 209, 'KS

In [None]:
"""
Part of Test Strategy 1: Generate random genes for the first 200 days and optimize them.

This code generates 100 genes; for each gene, 50 different weight variations are generated and the best one is chosen.

The Optimization class is only being used to generate random weights, so `generations` is set to 0.

"""

# Settings for this cell, named once so the code and the progress message cannot drift apart.
NUM_RANDOM_GENES = 100            # how many random genes to generate
WEIGHT_VARIATIONS_PER_GENE = 50   # population_size handed to Optimization for each gene
TRAIN_END_DAY = 200               # weights are chosen on days [0, TRAIN_END_DAY)
VALIDATION_END_DAY = 720          # genes are re-checked on days TRAIN_END_DAY..VALIDATION_END_DAY
MIN_FITNESS = 100                 # a gene passes a check when its fitness is strictly above this

geneList = []       # genes that pass the check on the first TRAIN_END_DAY days
goodGeneList = []   # genes that pass the check on the later days
best_fitness = 0
best_gene = None
for x in range(NUM_RANDOM_GENES):
    newGene = Gene()
    # generations=0: Optimization is only used to try random weight sets and keep the best one.
    optimization = Optimization(
        population_size=WEIGHT_VARIATIONS_PER_GENE,
        generations=0,
        ohlcv_data=ohlcv_data,
        gene=newGene,
        to_day=TRAIN_END_DAY,
    )
    best_genome = optimization.train()
    newGene.update_weights(best_genome.genes)

    positions, fitness = trading_bot(newGene, ohlcv_data, to_day=TRAIN_END_DAY)
    if fitness > MIN_FITNESS:  # Validation on the first days
        geneList.append(newGene)

    positions, fitness = trading_bot(newGene, ohlcv_data, from_day=TRAIN_END_DAY, to_day=VALIDATION_END_DAY)
    if fitness > MIN_FITNESS:  # Validation on the later days
        goodGeneList.append(newGene)
        print(f"Genes Generated: {x}: Genes >${MIN_FITNESS} for first {TRAIN_END_DAY} days: {len(geneList)}, Genes >${MIN_FITNESS} for days {TRAIN_END_DAY} to {VALIDATION_END_DAY}: {len(goodGeneList)}")
        # The best gene is only tracked among genes that passed the later-days check,
        # and is ranked by the optimizer's own fitness (best_genome.fitness).
        if best_genome.fitness > best_fitness:
            best_fitness = best_genome.fitness
            best_gene = newGene


In [None]:
# Take the genes that passed validation and optimize them more
# This probably makes the genes overfit for the first 200 days
# Just a thought.

import copy  # cell-local import so this cell does not depend on another cell having imported `copy`

# Work on deep copies so the genes in goodGeneList keep their current weights for later comparison.
toBeOptimizedFurtherGenes = copy.deepcopy(goodGeneList)

for x in toBeOptimizedFurtherGenes:
    optimization = Optimization(population_size=200, generations=20, ohlcv_data=ohlcv_data, gene=x, to_day=200)
    best_genome = optimization.train()
    x.update_weights(best_genome.genes)
    # Re-score on the later days with the new weights.
    positions, fitness = trading_bot(x, ohlcv_data, from_day=200, to_day=719)
    print("Final DNF Expression: ")
    print(optimization.gene)
    print("Best genome:", best_genome.genes)
    print("Best genome fitness:", best_genome.fitness)
    # This value used to be computed and then discarded; report it next to the optimizer's fitness.
    print("Fitness on days 200-719:", fitness)
       

In [None]:
# Compare each validated gene with its further-optimized copy on days 200-719
# and count which version ends with the higher fitness.
goodgenecount = 0
optimizedcount = 0
equal = 0
# The two lists are paired by position: toBeOptimizedFurtherGenes is built as a deep copy
# of goodGeneList. zip stops at the shorter list if they ever differ in length.
for i, (good_gene, optimized_gene) in enumerate(zip(goodGeneList, toBeOptimizedFurtherGenes)):
    positions, good_fitness = trading_bot(good_gene, ohlcv_data, from_day=200, to_day=719)
    positions, optimized_fitness = trading_bot(optimized_gene, ohlcv_data, from_day=200, to_day=719)

    if good_fitness > optimized_fitness:
        print(f"goodGeneList[{i}] has the better performing gene with a fiat_money value of {good_fitness} > {optimized_fitness}")
        goodgenecount += 1
    elif good_fitness == optimized_fitness:
        # Show both sides of the comparison (the message used to print good_fitness twice).
        print(f"Equal: {good_fitness} == {optimized_fitness}")
        equal += 1
    else:
        print(f"toBeOptimizedFurtherGenes[{i}] has the better performing gene with a fiat_money value of {optimized_fitness} > {good_fitness}")
        optimizedcount += 1

print(f"goodGeneList has {goodgenecount} better performing genes.")
print(f"The number of equal genes: {equal}")
print(f"toBeOptimizedFurtherGenes has {optimizedcount} better performing genes.")


In [None]:
## Does further optimization work? spoiler:no
# Score every gene of both populations over days 0-719 and print the fitness
# followed by the gene itself. The first group has no heading; the second is
# introduced by a blank line and its heading.
gene_groups = (
    (None, goodGeneList),
    ("Further Optimized:", toBeOptimizedFurtherGenes),
)
for heading, genes in gene_groups:
    if heading is not None:
        print()
        print(heading)
    for i, x in enumerate(genes):
        positions, fitness = trading_bot(x, ohlcv_data, from_day=0, to_day=719)
        print(f"{i}: {fitness}")
        print(x)


In [None]:
def numberOfEvents(positions):
    """
    Collapse a list of trade entries into alternating events that do not start with a sell.

    Leading entries whose action is 'sell' are dropped, then every run of
    consecutive entries with the same action is reduced to its first entry.

    Parameters
    ----------
    positions : sequence of entries
        Only element 0 of each entry (the action, compared against 'sell')
        is inspected here. The input is not modified.

    Returns
    -------
    list
        A new list with the retained entries in their original order; empty
        when `positions` is empty or contains only sells.
    """
    # Skip the leading sells by index instead of pop(0)-ing the caller's list,
    # which mutated the argument and cost O(n) per removed entry.
    start = 0
    while start < len(positions) and positions[start][0] == 'sell':
        start += 1

    trimmed = []
    previous_action = None
    for event in positions[start:]:
        action = event[0]
        # Keep an entry only when it begins a new run of identical actions.
        if not trimmed or action != previous_action:
            trimmed.append(event)
        previous_action = action
    return trimmed

import matplotlib.pyplot as plt  # imported once for the cell instead of on every loop iteration

# The price series is the same for every gene, so look it up once.
timestamps = ohlcv_data.index
close_prices = ohlcv_data['c']

# For each further-optimized gene: trade days 200-719, reduce the trade list to
# alternating buy/sell events, and draw them on top of the close price.
for gene in toBeOptimizedFurtherGenes:
    positions, fitness = trading_bot(gene, ohlcv_data, from_day=200, to_day=719)
    positions = numberOfEvents(positions)
    print(fitness)

    # Each entry unpacks as (action, t); t is used below to index both series.
    buy_signals = [t for action, t in positions if action == 'buy']
    sell_signals = [t for action, t in positions if action == 'sell']

    # Plot the price data
    plt.plot(timestamps, close_prices, label='Close Price', color='blue', alpha=0.7)

    # Plot the buy signals
    plt.scatter([timestamps[t] for t in buy_signals],
                [close_prices[t] for t in buy_signals],
                color='g', s=25, marker="^", label='Buy Signal')

    # Plot the sell signals
    plt.scatter([timestamps[t] for t in sell_signals],
                [close_prices[t] for t in sell_signals],
                color='r', s=25, marker="v", label='Sell Signal')

    # Customize the plot appearance
    plt.xlabel('Time')
    plt.ylabel('Close Price')
    # Use the fitness returned for this gene; `fiat_money` is not defined in this cell.
    plt.title(f"Fitness: {fitness}")
    plt.legend()
    plt.grid()

    # Show the plot
    plt.show()