# Read Initial Data

In [1]:
import pandas as pd
time = []
open = []
high = []
low = []
close = []
volume = []
size = -1

def read_df(file_name):

    df = pd.read_feather(file_name)
    df = df.dropna()    # Dropping NA values, so that talib works correctly
    global size
    size = df.shape[0]
    
    global time
    time = df['time'].to_numpy()

    global open
    open = df['open'].to_numpy()

    global high
    high = df['high'].to_numpy()

    global low
    low = df['low'].to_numpy()

    global close
    close = df['close'].to_numpy()

    global volume
    volume = df['volume'].to_numpy()

    return

read_df("data-BNBUSDT1h.feather")

# Create Indicators

In [2]:
import sys
import numpy as np
np.set_printoptions(threshold=sys.maxsize)
import talib as ta    # https://github.com/mrjbq7/ta-lib

ma_direction_period = 28
ma_direction = ta.EMA(close, timeperiod=ma_direction_period)
atr_direction_period = 14
atr_direction = ta.ATR(high, low, close, timeperiod=atr_direction_period)

# Generate Trades

In [3]:
in_trade = 0    # +1 = long_entry, -1 = shor_entry
entry_price = -1.0
trades_long = []
trades_short = []

for i in range(size):
    # Long Entry
    if (ma_direction[i] - close[i] > 4.0*atr_direction[i]
        and in_trade == 0.0):
        in_trade = +1.0
        entry_price = close[i]
    # Long Exit
    if (ma_direction[i] - close[i] < 0.0
        and in_trade == 1.0):
        in_trade = 0.0
        trades_long.append(close[i]/entry_price)

    # Short Entry
    if (ma_direction[i] - close[i] < -4.0*atr_direction[i]
        and in_trade == 0.0):
        in_trade = -1.0
        entry_price = close[i]
    # Short Exit
    if (ma_direction[i] - close[i] > 0.0
        and in_trade == -1.0):
        in_trade = 0.0
        trades_short.append(entry_price/close[i])


# Calculate Results

In [4]:
long_res = 1.0
for k in trades_long:
    long_res *= k

print("Longs result: ", long_res)
average_long_trades_res = np.sum(trades_long)/len(trades_long)
print("Average Long Trade: ", average_long_trades_res)
DD_long = 1.0 - np.min(trades_long)
print("Highest DD for Longs: ", DD_long)
print()

short_res = 1.0
for k in trades_short:
    short_res *= k

print("Shorts result: ", short_res)
average_short_trades_res = np.sum(trades_short)/len(trades_short)
print("Average Short Trade: ", average_short_trades_res)
DD_short = 1.0 - np.min(trades_short)
print("Highest DD for Shorts: ", DD_short)
print()


DD_highest = None
if DD_long > DD_short:
    DD_highest = DD_long
else: DD_highest = DD_short

score = ((average_long_trades_res + average_short_trades_res) / 2) / DD_highest
print("Score: ", score)

Longs result:  1.1932475343208822
Average Long Trade:  1.0058177152451184
Highest DD for Longs:  0.2489020352179997

Shorts result:  0.16694511027625009
Average Short Trade:  0.9839752741179365
Highest DD for Shorts:  0.4806757383373651

Score:  2.0697872085718907


# Optimize Results

In [5]:
# Strategy Function
def strategy_ma(ma_direction_period, atr_direction_period, atr_direction_coeff, show_money = False):
    # Generate Indicators
    ma_direction = ta.EMA(close, timeperiod=ma_direction_period)
    atr_direction = ta.ATR(high, low, close, timeperiod=atr_direction_period)

    # Calculate Trades
    in_trade = 0    # +1 = long_entry, -1 = shor_entry
    entry_price = -1.0
    trades_long = []
    trades_short = []
    # Parse dataframe
    for i in range(size):
        # Long Entry
        if (ma_direction[i] - close[i] > atr_direction_coeff*atr_direction[i]
            and in_trade == 0.0):
            in_trade = +1.0
            entry_price = close[i]
        # Long Exit
        if (ma_direction[i] - close[i] < 0.0
            and in_trade == 1.0):
            in_trade = 0.0
            trades_long.append(close[i]/entry_price)

        # Short Entry
        if (ma_direction[i] - close[i] < -atr_direction_coeff*atr_direction[i]
            and in_trade == 0.0):
            in_trade = -1.0
            entry_price = close[i]
        # Short Exit
        if (ma_direction[i] - close[i] > 0.0
            and in_trade == -1.0):
            in_trade = 0.0
            trades_short.append(entry_price/close[i])

    # Calculating Stats
    # Finding Average Trade Size
    if trades_long and trades_short: # If any trade happened
        average_long_trades_res = np.sum(trades_long)/len(trades_long)
        average_short_trades_res = np.sum(trades_short)/len(trades_short)
        if show_money:
            print("Average Long Trade: ", average_long_trades_res)
            print("Amount of Long Trades: ", len(trades_long))
            print("Average Short Trade: ", average_short_trades_res)
            print("Amount of Short Trades: ", len(trades_short))
        score = (average_long_trades_res + average_short_trades_res) / 2    # Gives same weight to long/short not overall trades
        return score
    else: return None


In [6]:
# Random Initialization
def get_rand_candidate_solution_ma():
    params = []
    params.append(np.random.randint(128, 144))
    params.append(np.random.randint(24, 48))
    params.append(np.random.uniform(1.0, 3.0))
    return params

# Small pertrubation from found solution
def get_candidate_solution_ma(params):
    params[0] += np.random.randint(-4, 5)
    if params[0] < 8: params[0] = 8
    params[1] += np.random.randint(-4, 5)
    if params[1] < 4: params[1] = 4
    params[2] += np.random.uniform(-0.2, 0.3)
    return params

# Simulated Annealing

In [7]:
import copy

# Cooling Schedules
temp_max = 0.1
temp_min = 0.0
outer_loop = 200
inner_loop = 10

def cool_lin_a(step):
    return temp_min + (temp_max - temp_min) * ((outer_loop - step)/outer_loop)
def cool_quad_a(step):
    return temp_min + (temp_max - temp_min) * ((outer_loop - step)/outer_loop)**2
def cool_exp_a(step):
    return temp_min + (temp_max - temp_min) * ((outer_loop - step)/outer_loop)**np.sqrt(step)
def cool_cos_a(step):
    return temp_min + 0.5*(temp_max - temp_min) * (1 + np.cos(step*np.pi/outer_loop))

# Simulated Annealing
def annealing_solution():

    step = 0
    temp = temp_max

    # Generate Init Candidate Solution
    candidate_params = get_rand_candidate_solution_ma()
    print("Initial Guess: ", candidate_params)
    best_params = copy.deepcopy(candidate_params)
    current_energy = strategy_ma(candidate_params[0], candidate_params[1], candidate_params[2])
    best_solution = current_energy

    # Annealing Schedule
    while step < outer_loop:
        
        step += 1
        mc_step = 0

        while mc_step < inner_loop:

            mc_step += 1

            # Generate Candidate Solution
            candidate_params = get_candidate_solution_ma(candidate_params)

            candidate_energy = strategy_ma(candidate_params[0], candidate_params[1], candidate_params[2])
            if candidate_energy:    # If any solution found
                delta_energy = candidate_energy - current_energy    # Positive if solution is better, and is always accepted
                if candidate_energy < best_solution:
                    best_solution = candidate_energy
                    best_params = copy.deepcopy(candidate_params)

                if np.random.random() < np.exp(-delta_energy / temp):   # Value is >1 if solution is better anyway, otherwise Metropolis is used
                    current_energy = candidate_energy
            
        # Cooling Function
        temp = cool_quad_a(step)
        
    return best_params

found_sol = annealing_solution()
print("Found Params: ", found_sol)
print(strategy_ma(found_sol[0], found_sol[1], found_sol[2], show_money=True))

Initial Guess:  [139, 39, 1.4596221707607222]
Found Params:  [169, 36, 12.861957702512225]
Average Long Trade:  1.043950272221074
Amount of Long Trades:  5
Average Short Trade:  0.7062618454407253
Amount of Short Trades:  2
0.8751060588308996


# Pyswarm

In [8]:
# Import modules
import numpy as np

# Import PySwarms
import pyswarms as ps
from pyswarms.utils.functions import single_obj as fx

# Set-up hyperparameters
options = {'c1': 0.5, 'c2': 0.3, 'w':0.9}

# Call instance of PSO
optimizer = ps.single.GlobalBestPSO(n_particles=10, dimensions=2, options=options)

# Perform optimization
cost, pos = optimizer.optimize(fx.sphere, iters=1000)

2022-12-24 23:28:57,584 - pyswarms.single.global_best - INFO - Optimize for 1000 iters with {'c1': 0.5, 'c2': 0.3, 'w': 0.9}
pyswarms.single.global_best: 100%|██████████|1000/1000, best_cost=1.05e-41
2022-12-24 23:29:00,605 - pyswarms.single.global_best - INFO - Optimization finished | best cost: 1.0479898019839176e-41, best pos: [-3.05678166e-21  1.06582547e-21]
