In [39]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import random
import datetime as dt
import mplfinance 
import fx

from collections import deque
from renkodf import Renko
from scipy.signal import lfilter
from deap import base, creator, tools
from scipy.stats import zscore
from tqdm.notebook import tqdm

In [40]:
# collect the actual data from csv file
filename = "C:/Users/WilliamFetzner/Documents/Trading/EURUSD1.csv"
tickstory_filename = "C:/Users/WilliamFetzner/Documents/Trading/3mo_EURUSD.csv"
df_tick = pd.read_csv(tickstory_filename)
df_tick['datetime'] = pd.to_datetime(df_tick['Timestamp'], format='%Y%m%d %H:%M:%S:%f')
# rename bid price to close
df_tick.rename(columns={'Bid price':'close'}, inplace=True)
df_tick.set_index('datetime', inplace=True)
# adjust the datetime 7 hrs ahead to match market time
df_tick.index = df_tick.index + pd.Timedelta(hours=7)
df_tick_ready = df_tick[['close']]
df_tick_ready_2024 = df_tick_ready['2024-01-01':'2024-02-13']
# df_full = pd.read_csv(filename, header=None, names=['date', 'time', 'open', 'high', 'low', 'close', 'volume'])
df_tick_ready_2024.head(10)

Unnamed: 0_level_0,close
datetime,Unnamed: 1_level_1
2024-01-02 00:00:12.108,1.10427
2024-01-02 00:00:14.513,1.10425
2024-01-02 00:00:18.925,1.10425
2024-01-02 00:00:25.498,1.10425
2024-01-02 00:00:36.129,1.10429
2024-01-02 00:00:44.956,1.10425
2024-01-02 00:00:54.534,1.10429
2024-01-02 00:01:04.364,1.10429
2024-01-02 00:01:08.324,1.10429
2024-01-02 00:01:08.626,1.10429


In [41]:
# convert date to datetime
# df_full['datetime'] = pd.to_datetime(df_full['date'] + ' ' + df_full['time'], format='%Y.%m.%d %H:%M')
# filter the data to just 2023

# df_2022 = fx.prep_data(df_full, 2022)
# df_2023 = fx.prep_data(df_full, 2023)
# df_2024 = fx.prep_data(df_full, 2024)

# Parameters

In [42]:
# Renko variable
initial_brick_size = 0.0001
# create a list of possible brick sizes a max at 0.001 and min at 0.00001 and each step is 0.00001
brick_size_list = np.arange(0.00007, 0.00101, 0.00001)
# get one random brick size
# brick_size = np.random.choice(brick_size_list)

# psar variables
start = 0.02
increment = 0.02
increment_list = np.arange(0.001, 0.2001, 0.0001)
maximum = 0.2
max_list = np.arange(0.01, 0.501, 0.001)

# impulse variables
lengthMA = 34
ma_list = np.arange(10, 100, 1)
lengthSignal = 9
signal_list = np.arange(2, 51, 1)


In [43]:
# Lot Size
initial_lot_size = 5
lot_sizes_list = np.arange(1, 5, 1)
per_lot = 100000

# Commissions
nova_commission = -3
msolutions_commission = -5

# starting balance
balance = 200000

# base currency rate
base_currency_rate = 0.045 # EUR
quote_currency_rate = 0.055 # USD

# Optimization

In [44]:
def renko_ready(df, brick_size):
    # create a renko chart from the df dataframe
    r_full = Renko(df, brick_size=brick_size)
    # create a new dataframe from the renko features
    renko_data = r_full.renko_df()
    return renko_data


In [45]:
def adding_cols(df, inc, max, ma, signal, stop_loss):
    # adding psar to the dataframe
    r_w_psar = fx.psar_from_data(df, inc, max)
    # adding impulse to the dataframe
    r_w_impulse = fx.calc_impulse_macd(r_w_psar, ma, int(signal))
    # add the brick color to the dataframe
    r_w_impulse['brick_color'] = np.where(r_w_impulse['open'] > r_w_impulse['close'], 'red', 'green')
    # add the day of the week to the dataframe
    r_w_impulse['day_of_week'] = r_w_impulse.index.day_name()
    # place a 1 in day_of_week_transition, if it is the last bar on Friday and the next bar is Sunday
    r_w_impulse['day_of_week_transition'] = np.where((r_w_impulse['day_of_week'] == 'Friday') & 
                                                     ((r_w_impulse['day_of_week'].shift(-1) == 'Sunday') | (r_w_impulse['day_of_week'].shift(-1) == 'Monday')), 1, 0)

    #### entry conditions ####
    # add a column that will be the entry signal for the strategy to be when both impulse_signal and psar_signal are both 'buy' or 'sell'
    r_w_impulse['entry_signal'] = np.where((r_w_impulse['psar_signal'] == 'buy') & (r_w_impulse['impulse_signal'] == 'buy') 
                                            & (r_w_impulse['brick_color'] == 'green') & (r_w_impulse['day_of_week_transition'] != 1), 'long', 
                                            np.where((r_w_impulse['psar_signal'] == 'sell') & (r_w_impulse['impulse_signal'] == 'sell') & 
                                                    (r_w_impulse['brick_color'] == 'red') & (r_w_impulse['day_of_week_transition'] != 1), 'short', 'none'))

    # if there was a change from 'none' to 'buy' or 'sell' then that is an entry signal and replace the 'buy' or 'sell' with 'entry + long' or 'entry + sell'
    r_w_impulse['entry_signal'] = np.where((r_w_impulse['entry_signal'] != 'none') & (r_w_impulse['entry_signal'].shift(1) == 'none'),
                                                'entry + ' + r_w_impulse['entry_signal'], r_w_impulse['entry_signal'])

    #### Exit conditions ####
    # add a stop loss column that will be the entry price +/- the brick size for when the entry signal is 'entry + long' or 'entry + sell'
    r_w_impulse['stop_loss'] = np.where(r_w_impulse['entry_signal'] == 'entry + long', r_w_impulse['open'] - stop_loss,
                                            np.where(r_w_impulse['entry_signal'] == 'entry + short', r_w_impulse['open'] + stop_loss, np.nan))
    
    # if the 'entry_signal' colummn goes from 'entry + short' or 'short' to 'none' then 'none' should be replaced with 'exit' in the entry_signal column
    r_w_impulse['entry_signal'] = np.where((r_w_impulse['entry_signal'].shift(1) == 'entry + short') & (r_w_impulse['entry_signal'] == 'none'), 'exit', 
                                                np.where((r_w_impulse['entry_signal'].shift(1) == 'short') & (r_w_impulse['entry_signal'] == 'none'), 'exit', 
                                                        np.where((r_w_impulse['entry_signal'].shift(1) == 'entry + long') & (r_w_impulse['entry_signal'] == 'none'), 'exit', 
                                                                np.where((r_w_impulse['entry_signal'].shift(1) == 'long') & (r_w_impulse['entry_signal'] == 'none'), 'exit', 
                                                                        r_w_impulse['entry_signal']))))
    # position_count will be a cumulative count used to filter the data to the timeframe between the entry 
    # and exit signals so anytime there is an "entry + long" or "entry + short" the count should increase by 1
    r_w_impulse['position_count'] = np.where(r_w_impulse['entry_signal'] == 'entry + long', 1, np.where(r_w_impulse['entry_signal'] == 'entry + short', 1, 0))
    r_w_impulse['cum_position_count'] = r_w_impulse['position_count'].cumsum()
    # when 'entry_signal' is 'none' then the 'cum_position_count' should be null
    r_w_impulse['cum_position_count'] = np.where(r_w_impulse['entry_signal'] == 'none', np.nan, r_w_impulse['cum_position_count'])
    # group by cum_position_count and forward fill the value in the first index of the 'stop_loss' column
    r_w_impulse['stop_loss'] = r_w_impulse.groupby('cum_position_count')['stop_loss'].ffill()
    # determine whether the exit should be sooner because the stop_loss was hit before the exit signal (look at the high/low of the brick)
    r_w_impulse['exit_stop_loss'] = np.where((r_w_impulse['entry_signal'] == 'long') & (r_w_impulse['stop_loss'] > r_w_impulse['low']), 1,
                                            np.where((r_w_impulse['entry_signal'] == 'short') & (r_w_impulse['stop_loss'] < r_w_impulse['high']), 1, 0))
    
    return r_w_impulse



In [46]:
def calc_profit_loss(df, lots):
    profit_df = pd.DataFrame() #columns=['cum_position_count', 'direction', 'entry_price', 'exit_price', 'first_TP_hit', 'profit']
    # calculate the profit for each position by first grouping by each position and finding the entry price
    profit_df['entry_price'] = df.groupby('cum_position_count')['close'].first()
    # separate out the datetime column
    df['datetime'] = df.index
    # get the entry and exit times
    profit_df['entry_time'] = df.groupby('cum_position_count')['datetime'].first()
    profit_df['exit_time'] = df.groupby('cum_position_count')['datetime'].last()
    # # determine the exit price
    profit_df['exit_price'] = df.groupby('cum_position_count')['close'].last()
    # # what was the direction, long or short?
    profit_df['direction'] = df.groupby('cum_position_count')['entry_signal'].first()
    profit_df['direction'] = profit_df['direction'].str.split('+').str[1]
    profit_df['profit'] = np.where((profit_df['direction'].str.strip() == 'long'),
                                            (profit_df['exit_price'] - profit_df['entry_price'])*per_lot*(lots), 
                                            np.where((profit_df['direction'].str.strip() == 'short'),
                                                    (profit_df['entry_price'] - profit_df['exit_price'])*per_lot*(lots), np.nan))
    profit_df = fx.add_swap_rates(profit_df, base_currency_rate, quote_currency_rate, lots=lots)
    profit_df['nova_profit'] = profit_df['profit'] + (nova_commission*lots) + profit_df['swap_rate']
    profit_df['msolutions_profit'] = profit_df['profit'] + (msolutions_commission*lots) + profit_df['swap_rate']
    # use the entry time and resample to each day and find the sum of the profit
    profit_df['entry_time'] = pd.to_datetime(profit_df['entry_time'])
    profit_df_new_index = profit_df.set_index('entry_time')
    # find the sum of the nova and msolutions profit for each day
    profit_df_daily = profit_df_new_index.resample('D').agg({'nova_profit': 'sum'})
    # drop the weekends by first adding in a new day of the week column
    profit_df_daily['day_of_week'] = profit_df_daily.index.day_name()
    # drop any day that is Saturday or Sunday
    profit_df_daily = profit_df_daily.loc[(profit_df_daily['day_of_week'] != 'Saturday') & (profit_df_daily['day_of_week'] != 'Sunday')]
    # find the number of times that profit_df_daily is below zero
    profit_df_daily['nova_negative'] = np.where(profit_df_daily['nova_profit'] < 0, 1, 0)
    # find the sum of nova_negative and msolutions_negative
    nova_negative_sum = profit_df_daily['nova_negative'].sum()
    # find the sum of nova_profit
    nova_profit_sum = profit_df['nova_profit'].sum()

    return nova_profit_sum, nova_negative_sum

In [47]:
def making_calculations(df, brick_size, inc, max, ma, signal, stop_loss, lot_size):
    renko_data = renko_ready(df, brick_size)
    # add the columns necessary for the strategy
    renko_data_cols_added = adding_cols(renko_data, inc, max, ma, signal, stop_loss)
    # determine the profit/loss for the strategy
    total_profit, days_in_drawdown = calc_profit_loss(renko_data_cols_added, lot_size)

    return total_profit, days_in_drawdown

In [48]:
# find the max date of df_2024 
# df_2024.index.max()

In [49]:
# results for standard values
# print('2022 data: ', making_calculations(df_2022, initial_brick_size, start, maximum, lengthMA, lengthSignal, initial_brick_size*2, initial_lot_size))
# print('2023 data: ', making_calculations(df_2023, initial_brick_size, start, maximum, lengthMA, lengthSignal, initial_brick_size*2, initial_lot_size))
# print('2024 data: ', making_calculations(df_2024, initial_brick_size, start, maximum, lengthMA, lengthSignal, initial_brick_size*2, initial_lot_size))
# print('last 3 months data: ', making_calculations(df_tick_ready, initial_brick_size, start, maximum, lengthMA, lengthSignal, initial_brick_size*2, initial_lot_size))
# 2022 data:  ($1,136,669.99, 53)
# 2023 data:  ($447,824.99, 82)
# 2024 data:  ($20,509.99, 15)
# last 3 months data:  (23,329.99, 33)

In [50]:
lol = [brick_size_list, increment_list, max_list, ma_list, signal_list] # , lot_sizes_list,  
def constraint_handler(individual):
    counter = 0
    for i, lst in zip(individual, lol):
        # Ensure parameter is greater than   0
        if (i <= 0) | ((counter == len(individual) - 1) & (i < 2)):
            individual[counter] = np.random.choice(lst)  # Reset to a valid value
        counter += 1
    return individual

In [51]:
# Define the fitness function
import sys
# Check if FitnessMax already exists in the __main__ namespace
if 'FitnessMax' not in sys.modules['__main__'].__dict__:
    # If not, create it
    creator.create("FitnessMax", base.Fitness, weights=(1.0,))

if 'Individual' not in sys.modules['__main__'].__dict__:
    creator.create("Individual", list, fitness=creator.FitnessMax)
# creator.create("FitnessMax", base.Fitness, weights=(1.0,))
# creator.create("Individual", list, fitness=creator.FitnessMax)

population_size = 100
global_total_profits = []  # A list to collect total profits from all individuals
global_days_in_drawdowns = []  # A list to collect days in drawdown from all individuals
# Global variables to keep track of the number of evaluations
num_evaluations = 0
discard_threshold = 20  # Discard the first 20 evaluations

toolbox = base.Toolbox()

# Register an attribute generator for each parameter with its own range
toolbox.register("brick_size", np.random.choice, brick_size_list)
toolbox.register("increment", np.random.choice, increment_list)
toolbox.register("maximum", np.random.choice, max_list)
toolbox.register("ma", np.random.choice, ma_list)
toolbox.register("signal", np.random.choice, signal_list)
# toolbox.register("lot_size", np.random.choice, lot_sizes_list)

# Combine the attribute generators to create an individual
toolbox.register("individual", tools.initCycle, creator.Individual,
                  (toolbox.brick_size, toolbox.increment, toolbox.maximum,
                   toolbox.ma, toolbox.signal))# , toolbox.lot_size, 
toolbox.register("population", tools.initRepeat, list, toolbox.individual)


def eval_func(individual):
    # Assuming these are global variables accessible within the scope of eval_func
    global global_total_profits  # A list to collect total profits from all individuals
    global global_days_in_drawdowns  # A list to collect days in drawdown from all individuals
    global num_evaluations  # A global variable to keep track of the number of evaluations

    # Unpack individual parameters
    brick_size, inc, max, ma, signal = individual #, lot_size, brick_size
    print(brick_size, inc, max, ma, signal) # , lot_size, brick_size
    for i in individual:
        if i <= 0:
            individual = constraint_handler(individual)
            brick_size,inc, max, ma, signal = individual #, lot_size,
            print(brick_size, inc, max, ma, signal) #, lot_size, 

    # Perform calculations using these parameters
    total_profit, days_in_drawdown = making_calculations(df_tick_ready_2024, brick_size, inc, max, ma, signal, brick_size*2, initial_lot_size)
    print(total_profit, days_in_drawdown)

    # Update the global lists with the new values
    global_total_profits.append(total_profit)
    # print(global_total_profits)
    global_days_in_drawdowns.append(days_in_drawdown)

    if len(global_total_profits) > discard_threshold:
        # Normalize total_profit and days_in_drawdown independently
        norm_total_profit = zscore(global_total_profits)[-1]
        norm_days_in_drawdown = zscore(global_days_in_drawdowns)[-1]
        # Combine the scores
        score = norm_total_profit - norm_days_in_drawdown
        if norm_total_profit == 0:
            score = 0
        print(score)
        return (score,)
    else:
        num_evaluations +=  1
        print(num_evaluations)
        return (0,)

    
    

toolbox.register("evaluate", eval_func)

# Define the feasibility function
def feasible(individual):
    for i, lst in zip(individual, lol):
        # find the min of the list
        min = lst.min()
        # find the max of the list
        max = lst.max()
        if i < min or i > max:
            return False
    return True

# Define a distance function to the feasibility region
def distance(individual):
    distances = 0
    for i, lst in zip(individual, lol):
        # find the min of the list
        min = lst.min()
        # find the max of the list
        max = lst.max()
        if i < min:
            # find out how far away from the min the parameter is and then normalize it
            dist = abs(i - min) / (max - min)
        elif i > max:
            # find out how far away from the max the parameter is and then normalize it
            dist = abs(i - max) / (max - min)
        else:
            dist = 0
        distances += dist
    return distances  # Distance from the feasibility region

# Decorate the evaluation function with a DeltaPenalty decorator
toolbox.decorate("evaluate", tools.DeltaPenalty(feasible, 1.0, distance))
toolbox.register("mate", tools.cxUniform, indpb=0.5)
# toolbox.register("mutate", tools.mutPolynomialBounded, low=0.00001, up=0.00101, eta=20.0, indpb=0.1)
toolbox.register("select", tools.selTournament, tournsize=3)
# Register the population and other operators as before
# toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", tools.mutGaussian, mu=0, sigma=1, indpb=0.1)

# Create initial population
population = toolbox.population(n=population_size)
for ind in population:
    ind.fitness.values = (0,)  # Temporary fitness value

# Run the genetic algorithm
NGEN =  50  # Number of generations
CXPB =  0.7  # Crossover probability
MUTPB =  0.2  # Mutation probability

for gen in tqdm(range(NGEN)):
    # Select the next generation individuals
    offspring = toolbox.select(population, len(population))
    # Clone the selected individuals
    offspring = list(map(toolbox.clone, offspring))

    # Apply crossover and mutation on the offspring
    for child1, child2 in zip(offspring[::2], offspring[1::2]):
        if random.random() < CXPB:
            toolbox.mate(child1, child2)
            del child1.fitness.values
            del child2.fitness.values

    for mutant in offspring:
        if random.random() < MUTPB:
            toolbox.mutate(mutant)
            del mutant.fitness.values

    # Evaluate the individuals with an invalid fitness
    invalid_ind = [ind for ind in offspring if ind.fitness.values == (0,)]
    # print(invalid_ind)
    invalid_ind.extend([ind for ind in offspring if not ind.fitness.valid])
    fitnesses = map(toolbox.evaluate, invalid_ind)
    for ind, fit in zip(invalid_ind, fitnesses):
        # print(type(ind.fitness.values[0]), ind.fitness.values, type(fit), fit)
        ind.fitness.values = fit

    # Replace population with the offspring
    population[:] = offspring

# Extract the best individual
best_ind = tools.selBest(population,  1)[0]
best_score = best_ind.fitness.values[0]



  0%|          | 0/50 [00:00<?, ?it/s]

0.00037 0.1894000000000001 0.30599999999999977 56 9


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["datetime"] = df.index


124.99999999979491 18
1
0.0006399999999999999 0.08940000000000003 0.2519999999999998 88 38
-15064.999999999442 22
2
0.00041 0.029500000000000012 0.24199999999999983 17 22
-1794.9999999999136 19
3
0.00025 0.05450000000000003 0.1299999999999999 53 8
-9689.999999929056 22
4
0.0008999999999999999 0.18130000000000007 0.29099999999999976 88 20
-14610.000000003238 19
5
0.0009599999999999999 0.008500000000000004 0.09499999999999992 32 41
-14474.999999997048 18
6
0.00057 0.008700000000000003 0.34499999999999975 39 34
2594.9999999997153 16
7
0.00013 0.003200000000000001 0.1299999999999999 69 6
-6210.000000007292 20
8
0.0005899999999999999 0.09240000000000004 0.4509999999999996 87 40
-21824.99999999257 24
9
8.999999999999999e-05 0.029000000000000012 0.04599999999999997 44 17
3794.9999994780183 16
10
0.0007099999999999999 0.16610000000000008 0.29799999999999977 42 15
-5724.999999999923 20
11
0.00019999999999999998 0.11480000000000005 0.040999999999999974 30 41
-12685.000000000151 22
12
0.00041 0.0

In [52]:
print(best_ind, best_score)

[7e-05, 0.13986205735881893, 0.4509999999999996, 10, 8.395708263802485] 1.4443776064557534


In [53]:
best_brick, best_inc, best_max, best_ma, best_signal = best_ind #, best_lot_size,  

In [54]:
# results_2023 = making_calculations(df_2023, initial_brick_size, best_inc, best_max, best_ma, best_signal, initial_brick_size*2, initial_lot_size)
# results_2024 = making_calculations(df_2024, initial_brick_size, best_inc, best_max, best_ma, best_signal, initial_brick_size*2, initial_lot_size)

In [55]:
best_parameters = pd.read_excel('C:/Users/WilliamFetzner/Documents/Trading/best_parameters_IP.xlsx')
bp_colnames = best_parameters.columns
results_3mo = making_calculations(df_tick_ready_2024, best_brick, best_inc, best_max, best_ma, best_signal, best_brick*2, initial_lot_size)
new_row_added = pd.concat([best_parameters, pd.DataFrame({bp_colnames[0]: [best_brick], bp_colnames[1]: [best_inc], bp_colnames[2]: [best_max], bp_colnames[3]: [best_ma], bp_colnames[4]: [best_signal],
           bp_colnames[5]: [initial_lot_size], bp_colnames[6]: 'set to 5', bp_colnames[7]: [0], bp_colnames[8]: [0], 
           bp_colnames[9]: [0], bp_colnames[10]: results_3mo[0]})], ignore_index=True)
# save the best_parameters dataframe to the best_parameters_IP.xlsx file
new_row_added.to_excel('C:/Users/WilliamFetzner/Documents/Trading/best_parameters_IP.xlsx', index=False)
new_row_added

Unnamed: 0,best_brick,best_inc,best_max,best_ma,best_signal,lot_size,lot size options,2024,2023,2022,3 months
0,1e-05,0.1156,0.3089,65.0,5.0,20,1-21,2930645.56,33552970.14,53863825.47,
1,1e-05,0.1907,0.324,10.0,7.5,4,1-5,403040.0,4934702.0,9124250.0,
2,3e-05,0.1788,0.277,12.0,3.5,5,set to 5,451259.99,5443904.99,9662459.99,
3,7e-05,0.1502,273.0,12.66,3.166,5,set to 5,214665.0,2628385.0,5008870.0,
4,7.9e-05,0.1467,0.3669,12.85,3.86,5,set to 5,187190.0,2287365.0,4206735.0,
5,0.0001,0.1729,0.231887,9.954992,3.996463,5,set to 5,150629.99,1831444.99,3566419.99,
6,0.0001,0.1099,0.244,10.790947,3.737069,5,set to 5,150419.99,1813079.99,3535944.99,
7,0.0003,0.00089,0.213,64.0,1.0,5,set to 5,0.0,0.0,0.0,0.0
8,7e-05,0.139862,0.451,10.0,8.395708,5,set to 5,0.0,0.0,0.0,20000.0


In [56]:
results_3mo[1]

11