## Genetic Programming for trading strategies
In this work we are going to find a novel and unique profitable trading strategy using a genetic programming algorithm.

## Imports and Setup

In [79]:
import random
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from deap import base, creator, tools, algorithms
import operator
import requests
import time
import pandas_ta as ta
from deap import gp
import plot_utils as plot_utils
import math

def get_historical_klines(symbol="BTCUSDT", interval="1h", start_time=None, end_time=None):
    base_url = "https://api.binance.com"
    endpoint = "/api/v3/klines"
    params = {
        "symbol": symbol,
        "interval": interval,
        "limit": 1500
    }
    if start_time:
        params["startTime"] = int(pd.to_datetime(start_time).timestamp() * 1000)
    if end_time:
        params["endTime"] = int(pd.to_datetime(end_time).timestamp() * 1000)

    all_klines = []
    while True:
        response = requests.get(base_url + endpoint, params=params)
        if response.status_code != 200:
            print(f"Error {response.status_code}: {response.text}")
            break
        data = response.json()
        if not data:
            break
        all_klines.extend(data)
        params["startTime"] = data[-1][0] + 1
        time.sleep(0.5)
        if end_time and params["startTime"] >= params["endTime"]:
            break

    columns = ["timestamp", "open", "high", "low", "close", "volume", "close_time", "quote_asset_volume", 
               "number_of_trades", "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"]
    df = pd.DataFrame(all_klines, columns=columns)
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
    numeric_cols = ["open", "high", "low", "close", "volume"]
    df[numeric_cols] = df[numeric_cols].astype(float)
    df = df[["timestamp"] + numeric_cols]

    df['RSI'] = ta.rsi(df['close'], length=14)

    macd_train = ta.macd(df['close'], fast=12, slow=26, signal=9)
    df['MACD'] = macd_train['MACD_12_26_9']
    df['MACD_signal'] = macd_train['MACDs_12_26_9']
    df['MACD_hist'] = macd_train['MACDh_12_26_9']

    df['EMA_7'] = ta.ema(df['close'], length=7)
    df['EMA_20'] = ta.ema(df['close'], length=20)
    df['EMA_50'] = ta.ema(df['close'], length=50)

    bbands_train = ta.bbands(df['close'], length=20, std=2)
    df['BB_upper'] = bbands_train['BBU_20_2.0']
    df['BB_lower'] = bbands_train['BBL_20_2.0']
    df['BB_middle'] = bbands_train['BBM_20_2.0']

    df['ATR'] = ta.atr(df['high'], df['low'], df['close'], length=14)

    df.dropna(inplace=True)
    df.reset_index(drop=True, inplace=True)
    
    return df

symbol = "BTCUSDT"
interval = "1h"

df_train_1 = get_historical_klines(symbol, interval, start_time="2023-12-01", end_time="2024-06-01") #up +77%
df_train_2 = get_historical_klines(symbol, interval, start_time="2021-11-01", end_time="2022-06-01") # down -51%
df_train_3 = get_historical_klines(symbol, interval, start_time="2022-07-01", end_time="2023-02-01") # lateral -3.77% con andamento laterale con picchi di -/+ 14%
df_train_4 = get_historical_klines(symbol, interval, start_time="2021-01-01", end_time="2021-06-01") # general case

GP_POP_SIZE = 100               # population size for GP
GP_NGEN = 20                 # number of generations for GP
GP_CXPB, GP_MUTPB = 0.7, 0.05    # crossover and mutation probability for GP
GP_TRNMT_SIZE = 3               # tournament size for GP
GP_HOF_SIZE = 3                 # size of the Hall-of-Fame for GP
seed = 0


## Genetic Programming Setup

In [None]:
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax)

pset = gp.PrimitiveSetTyped("MAIN", [dict], float) 

pset.addPrimitive(operator.add, [float, float], float)  
pset.addPrimitive(operator.sub, [float, float], float)  
pset.addPrimitive(operator.mul, [float, float], float) 
pset.addPrimitive(operator.gt, [float, float], bool)  
pset.addPrimitive(operator.lt, [float, float], bool)  
pset.addPrimitive(operator.and_, [bool, bool], bool) 
pset.addPrimitive(operator.or_, [bool, bool], bool)  
pset.addPrimitive(operator.inv, [bool], bool) 
def protected_div(x, y):
    return x / y if y != 0 else 1

pset.addPrimitive(protected_div, [float, float], float) 
def if_then_else(condition, out1, out2):
    return out1 if condition else out2

pset.addPrimitive(lambda cond, true_value, false_value: true_value if cond else false_value, [float, float, float], float, name="condition")
pset.addPrimitive(lambda x: max(x, 0), [float], float, name="protected_max")  
pset.addPrimitive(lambda x: min(x, 0), [float], float, name="protected_min")  
pset.addPrimitive(lambda x: np.mean([x, 0]), [float], float, name="protected_mean") 
pset.addPrimitive(lambda x: np.log(x) if x > 0 else 0, [float], float, name="protected_log")

for column in ['close', 'open', 'high', 'low', 'volume', 'RSI', 'MACD', 'EMA_7', 'EMA_20', 'EMA_50', 'BB_upper', 'BB_lower', 'ATR']:
    def make_terminal(col):
        return lambda i: df_train_1[col][i]
    pset.addTerminal(make_terminal(column), float, name=column)

pset.addTerminal(0.2, float)      
pset.addTerminal(0.3, float)     
pset.addTerminal(0.7, float)     
pset.addTerminal(5, float)       
pset.addTerminal(10, float)       
pset.addTerminal(20, float)      
pset.addTerminal(50, float)  
pset.addTerminal(100, float)
pset.addTerminal(150, float)
pset.addEphemeralConstant("rand_float", lambda: random.uniform(-1, 1), float)

def create_individual():
    return creator.Individual(gp.genHalfAndHalf(pset, min_=1, max_=2))

expr = gp.genFull(pset=pset, min_=1, max_=2)
toolbox = base.Toolbox()
toolbox.register("individual", tools.initIterate, creator.Individual, create_individual)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("mate", gp.cxOnePoint) 
toolbox.register("mutate", gp.mutUniform, expr=lambda pset, type_: gp.genFull(pset=pset, min_=0, max_=2, type_=type_), pset=pset)
toolbox.register("select", tools.selTournament, tournsize=GP_TRNMT_SIZE) 
toolbox.register("compile", gp.compile, pset=pset)
toolbox.decorate("mate", gp.staticLimit(key=operator.attrgetter("height"), max_value=20))
toolbox.decorate("mutate", gp.staticLimit(key=operator.attrgetter("height"), max_value=20))



## Fitness Function

In [81]:

def penalize_negative_profit(total_profit_pct, penalize_factor=0.2):
    if total_profit_pct > 0:
        return 1
    else:
        penalty = math.exp(abs(total_profit_pct) * penalize_factor)
        return 1 + penalize_factor + min(penalty, 1)
    
def penalize_trade_pct(pos_trades_pct, penalize_factor=0.2,threshold=0.5):
    if pos_trades_pct > threshold:
        return 1
    else:
        return 1 - penalize_factor - (threshold-pos_trades_pct)
def penalize_trade_pct_neg(pos_trades_pct,penalize_factor=0.2,threshold=0.5):
    if pos_trades_pct > threshold:
        return 1
    else:
        return 1+penalize_factor+(threshold-pos_trades_pct)

    
def bonus_nt(num_trades,bonus=0.2):
    if num_trades > 1:
        return 1+bonus 
    else:
        return 1

def fitness_function(individual):
    df_train_list = [df_train_1, df_train_2, df_train_3, df_train_4]
    fitness_list=[]

    for i, df in enumerate(df_train_list):
        df_train=df
        initial_balance = 1000
        balance = initial_balance
        position = 0
        trade_log = []
        profit_log = []
        positive_trades=0

        for i in range(1, len(df_train)):

            price_data = {
                'close': df_train['close'][i],
                'open': df_train['open'][i],
                'high': df_train['high'][i],
                'low': df_train['low'][i],
                'volume': df_train['volume'][i],
                'RSI': df_train['RSI'][i],
                'MACD': df_train['MACD'][i],
                'EMA_7': df_train['EMA_7'][i],
                'EMA_20': df_train['EMA_20'][i],
                'EMA_50': df_train['EMA_50'][i],
                'BB_upper': df_train['BB_upper'][i],
                'BB_lower': df_train['BB_lower'][i],
                'ATR': df_train['ATR'][i]
            }

            strategy_output = evaluate_gp_tree(individual, price_data, pset)
            decision = interpret_strategy_output(strategy_output)

            if decision == "Buy" and position == 0:
                buy_price = df_train['close'][i]
                trade_log.append((i, "Buy", buy_price))
                position = 1

            elif decision == "Sell" and position == 0:
                sell_price = df_train['close'][i]
                trade_log.append((i, "Sell", sell_price))
                position = -1

            elif decision == "Sell" and position == 1:
                sell_price = df_train['close'][i]
                trade_log.append((i, "Sell", sell_price))
                profit_pct = ((sell_price - buy_price) / buy_price) * 100
                profit_log.append(profit_pct)
                balance += (profit_pct / 100) * balance
                position = 0
                if profit_pct>0: 
                    positive_trades+=1

            elif decision == "Buy" and position == -1:
                buy_price = df_train['close'][i]
                trade_log.append((i, "Buy", buy_price))
                profit_pct = ((sell_price - buy_price) / buy_price) * 100
                profit_log.append(profit_pct)
                balance += (profit_pct / 100) * balance
                position = 0
                if profit_pct>0: 
                    positive_trades+=1

        if position != 0:
            close_price = df_train['close'].iloc[-1]
            if position == 1: 
                profit_pct = ((close_price - buy_price) / buy_price) * 100
                profit_log.append(profit_pct)
                balance += (profit_pct / 100) * balance
                if profit_pct>0: 
                    positive_trades+=1
            elif position == -1:
                profit_pct = ((sell_price - close_price) / sell_price) * 100
                profit_log.append(profit_pct)
                balance += (profit_pct / 100) * balance
                if profit_pct>0: 
                    positive_trades+=1

        final_balance = balance
        net_pnl = final_balance - initial_balance
        total_profit_pct = ((final_balance - initial_balance) / initial_balance) * 100

        num_trades = len(trade_log)
        if num_trades:
            pos_trades_pct=(positive_trades)/num_trades
        else:
            pos_trades_pct=0

        fitness= math.exp(total_profit_pct/10)*num_trades*pos_trades_pct
        #fitness = (0.5*penalize_negative_profit(total_profit_pct)) + (0.1 * sharpe_ratio) - (0.1 * max_drawdown) - (0.1 * volatility) + (0.1* num_trades) + (0.1*penalize_trade_pct(pos_trades_pct))
        #fitness = normalize_total_prift(penalize_negative_profit(total_profit_pct)) + penalize_trade_pct(pos_trades_pct)
        #fitness = (0.8*total_profit_pct/10*penalize_negative_profit(total_profit_pct))+(0.2*pos_trades_pct*penalize_trade_pct(pos_trades_pct))
        #fitness=total_profit_pct*penalize_negative_profit(total_profit_pct)*penalize_trade_pct(pos_trades_pct)*bonus_nt(num_trades)

        """if(total_profit_pct>0):
            fitness=total_profit_pct*penalize_trade_pct(pos_trades_pct)*bonus_nt(num_trades)
        else:
            fitness=total_profit_pct*penalize_negative_profit(total_profit_pct)*penalize_trade_pct_neg(pos_trades_pct,0.2)*bonus_nt(num_trades,0.2)"""

        fitness_list.append(fitness)

    fitness=sum(fitness_list)
    
    return fitness,

def evaluate_gp_tree(individual, price_data, pset):

    transformed_individual = toolbox.clone(individual)
    #print(individual)
    #print(transformed_individual)

    def replace_terminal_values(node, price_data):

        if isinstance(node, gp.Terminal):
            terminal_name = node.name
            if terminal_name in price_data:
                node.value = price_data[terminal_name]
        return node

    def apply_terminal_replacement(individual):
        for node in individual:
            replace_terminal_values(node, price_data)
        return individual

    individual = apply_terminal_replacement(transformed_individual)
    compiled_individual = toolbox.compile(expr=transformed_individual)
    fitness_value = compiled_individual(transformed_individual)
    #print(f"fitness value: {fitness_value}")
    return fitness_value

def interpret_strategy_output(output):
    if output > 0.5:
        return "Buy"
    elif output < -0.5:
        return "Sell"
    else:
        return "Hold"
    

## Backtesting

In [144]:


def backtesting(individual, df_test):
    initial_balance = 1000
    balance = initial_balance
    position = 0
    trade_log = []
    profit_log = [0]
    times = []

    initial_market_price = df_test['close'][0]
    market_returns = ((df_test['close'] - initial_market_price) / initial_market_price) * 100

    for i in range(1, len(df_test)):
        price_data = {
            'close': df_test['close'][i],
            'open': df_test['open'][i],
            'high': df_test['high'][i],
            'low': df_test['low'][i],
            'volume': df_test['volume'][i],
            'RSI': df_test['RSI'][i],
            'MACD': df_test['MACD'][i],
            'EMA_7': df_test['EMA_7'][i],
            'EMA_20': df_test['EMA_20'][i],
            'EMA_50': df_test['EMA_50'][i],
            'BB_upper': df_test['BB_upper'][i],
            'BB_lower': df_test['BB_lower'][i],
            'ATR': df_test['ATR'][i]
        }

        strategy_output = evaluate_gp_tree(individual, price_data, pset)
        decision = interpret_strategy_output(strategy_output)

        if decision == "Buy" and position == 0:
            buy_price = df_test['close'][i]
            trade_log.append((i, "Buy", buy_price, None)) 
            position = 1
            profit_log.append(((balance - initial_balance) / initial_balance) * 100)

        elif decision == "Sell" and position == 0:
            sell_price = df_test['close'][i]
            trade_log.append((i, "Sell", sell_price, None))
            position = -1
            profit_log.append(((balance - initial_balance) / initial_balance) * 100)

        elif decision == "Sell" and position == 1:
            sell_price = df_test['close'][i]
            profit_pct = ((sell_price - buy_price) / buy_price) * 100
            trade_log.append((i, "Sell", sell_price, profit_pct))
            balance += (profit_pct / 100) * balance
            profit_log.append(((balance - initial_balance) / initial_balance) * 100)
            position = 0
            times.append(i)

        elif decision == "Buy" and position == -1:
            buy_price = df_test['close'][i]
            profit_pct = ((sell_price - buy_price) / buy_price) * 100
            trade_log.append((i, "Buy", buy_price, profit_pct))
            balance += (profit_pct / 100) * balance
            profit_log.append(((balance - initial_balance) / initial_balance) * 100)
            position = 0
            times.append(i)
        else:
            profit_log.append(((balance - initial_balance) / initial_balance) * 100) 

    if position != 0:
        close_price = df_test['close'].iloc[-1]
        if position == 1:
            profit_pct = ((close_price - buy_price) / buy_price) * 100
            trade_log.append((len(df_test)-1, "Sell", close_price, profit_pct))
            balance += (profit_pct / 100) * balance
            profit_log.append(((balance - initial_balance) / initial_balance) * 100)
        elif position == -1:
            profit_pct = ((sell_price - close_price) / close_price) * 100
            trade_log.append((len(df_test)-1, "Buy", close_price, profit_pct))
            balance += (profit_pct / 100) * balance
            profit_log.append(((balance - initial_balance) / initial_balance) * 100)

    # Plot
    fig, ax1 = plt.subplots(figsize=(12, 6))
    ax1.plot(market_returns, label="Market Returns (%)", color="blue", alpha=0.7)
    ax1.plot(profit_log, label="Strategy Profit (%)", color="purple", alpha=0.8)

    ax1.set_xlabel("Time")
    ax1.set_ylabel("Percentage Returns (%)")

    ax1.set_title("Market Return vs Strategy Profit")
    ax1.legend(loc='upper left')

    plt.grid(True)
    plt.show()


    final_balance = balance
    net_pnl = final_balance - initial_balance
    total_profit_pct = ((final_balance - initial_balance) / initial_balance) * 100

    return total_profit_pct, len(trade_log)



def evaluate_gp_tree(individual, price_data, pset):

    transformed_individual = toolbox.clone(individual)
    #print(individual)
    #print(transformed_individual)

    def replace_terminal_values(node, price_data):
        if isinstance(node, gp.Terminal):
            terminal_name = node.name
            if terminal_name in price_data:
                node.value = price_data[terminal_name]
        return node

    def apply_terminal_replacement(individual):
        for node in individual:
            replace_terminal_values(node, price_data)
        return individual

    individual = apply_terminal_replacement(transformed_individual)
    compiled_individual = toolbox.compile(expr=transformed_individual)
    fitness_value = compiled_individual(transformed_individual) 

    return fitness_value




def interpret_strategy_output(output):

    if output > 0.5:
        return "Buy"
    elif output < -0.5:
        return "Sell"
    else:
        return "Hold"

## Evolutionary Code

In [None]:

def print_tree_with_names(individual):
    node_names = []
    def recursive_collect(node):
        if isinstance(node, gp.Terminal):
            node_names.append(f"Terminal: {node.name}")
        elif isinstance(node, gp.Primitive):
            node_names.append(f"Function: {node.name}")
        if isinstance(node, gp.Primitive) and node.arity > 0: 
            for child in node.args:
                recursive_collect(child)
    for tree in individual:
        recursive_collect(tree)
    print(" -> ".join(node_names))


def main():

    df_test = get_historical_klines(symbol, interval, start_time="2017-01-01", end_time="2017-12-31")

    population = toolbox.population(n=GP_POP_SIZE)
    hof = tools.HallOfFame(GP_HOF_SIZE)

    stats_fit = tools.Statistics(lambda ind: ind.fitness.values)
    stats_size = tools.Statistics(len)
    mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size)
    mstats.register("avg", np.mean)
    mstats.register("std", np.std)
    mstats.register("min", np.min)
    mstats.register("max", np.max)
    
    toolbox.register("evaluate", fitness_function)

    final_pop,logbook=algorithms.eaSimple(population, toolbox, GP_CXPB, GP_MUTPB, GP_NGEN, stats=mstats, halloffame=hof, verbose=True)

    def calculate_price_difference(df):
        first_price = df['close'].iloc[0]
        last_price = df['close'].iloc[-1]
        absolute_diff = last_price - first_price
        percentage_diff = (absolute_diff / first_price) * 100
        
        return percentage_diff
        

    best_individual = tools.selBest(population, 1)[0]
    print(f"Best strategy: {best_individual.fitness.values[0]}", best_individual)
    print_tree_with_names(best_individual)

    print(f"Profit of best individual on test data 1: {backtesting(best_individual, df_test)}, market: {calculate_price_difference(df_test), len(df_test)}")

    def plot():
        gen = logbook.select("gen")
        fit_mins = logbook.chapters["fitness"].select("min")
        fit_maxs = logbook.chapters["fitness"].select("max")
        fit_avgs = logbook.chapters["fitness"].select("avg")
        size_avgs = logbook.chapters["size"].select("avg")

        fig, ax1 = plt.subplots(figsize=(10, 6))
        
        line1 = ax1.plot(gen, fit_mins, "b-", label="Worst Fitness (Min)")
        line2 = ax1.plot(gen, fit_maxs, "r-", label="Best Fitness (Max)") 
        line3 = ax1.plot(gen, fit_avgs, "g-", label="Average Fitness")
        ax1.set_xlabel("Generation")
        ax1.set_ylabel("Fitness", color="k")
        ax1.tick_params(axis="y", labelcolor="k")
        
        ax2 = ax1.twinx()
        line4 = ax2.plot(gen, size_avgs, "purple", label="Average Size") 
        ax2.set_ylabel("Size", color="purple")
        ax2.tick_params(axis="y", labelcolor="purple")
        
        lns = line1 + line2 + line3 + line4
        labs = [l.get_label() for l in lns]
        ax1.legend(lns, labs, loc="upper center", bbox_to_anchor=(0.5, 1.1), ncol=4)

        ax1.grid()
        plt.title("GP: Fitness and Tree Size Trends Across Generations")
        plt.tight_layout()
        plt.show()

        nodes, edges, labels = gp.graph(best_individual)
        for i in nodes:
                labels[i] = best_individual[i]
        plot_utils.plotTree(nodes,edges,labels,"parity",'results')


    plot()




if __name__ == "__main__":
    main()


## Backtesting environment

In [None]:
symbol = "BTCUSDT"
interval = "1h"
start_time = "2021-11-01"
end_time = "2024-12-01"
if(1):
        df_test_1 = get_historical_klines(symbol, interval, start_time="2017-01-01", end_time="2018-12-31")
        def calculate_price_difference(df):
                first_price = df['close'].iloc[0]
                last_price = df['close'].iloc[-1]
                absolute_diff = last_price - first_price
                percentage_diff = (absolute_diff / first_price) * 100        
                return percentage_diff

        individual = gp.PrimitiveTree.from_string("add(protected_div(EMA_50, MACD), protected_div(BB_upper,sub(EMA_50,BB_lower)))", pset)

        print(f"Profit of best individual on test data 1: {backtesting(individual, df_test_1)}, market: {calculate_price_difference(df_test_1), len(df_test_1)}")

        for i in range(len(individual)):
                print(individual[i].name)

        nodes, edges, labels = gp.graph(individual)
        for i in nodes:
                labels[i] = individual[i]
                
        plot_utils.plotTree(nodes,edges,labels,"tree","results")

