In [None]:
import yfinance as yf
import statsmodels.api as sm
from statsmodels.tsa.vector_ar.vecm import coint_johansen
from statsmodels.tsa.stattools import adfuller
from datetime import datetime, timedelta
import numpy as np
from itertools import combinations
import MetaTrader5 as mt5
import pandas as pd
from pandas.tseries.offsets import BDay
from tqdm import tqdm
import matplotlib.pyplot as plt
from random import randint

In [None]:
def get_pairs(location):
    mt5.initialize()

    symbol_list = []
    for symbol in mt5.symbols_get():
        if location in symbol.path:
            symbol_list.append(symbol)
            
    return list(combinations(symbol_list, 2))

In [None]:
def get_bars(symbol, start_date, end_date, timeframe=mt5.TIMEFRAME_M1):
    mt5.initialize()
    bars = mt5.copy_rates_range(symbol, timeframe, start_date, end_date)
    df_bars = pd.DataFrame(bars)
    df_bars["time"] = pd.to_datetime(df_bars["time"], unit="s")
    return df_bars

In [None]:
def get_pairs_close(symbol_pair, start_date, end_date):
    mt5.initialize()
    
    close0 = get_bars(symbol_pair[0].name, start_date, end_date)['close']
    close1 = get_bars(symbol_pair[1].name, start_date, end_date)['close']
    
    if close0.empty or close1.empty:
        return pd.Series([], dtype='float64')
    else:
        pairs_close_data = pd.concat([close0, close1], axis=1, keys = ['close0', 'close1']).dropna()
        return pairs_close_data

In [None]:
def first_cointegration_test(pair_close_data):
    COINTEGRATION_CONFIDENCE_LEVEL = 99
    result = coint_johansen(pair_close_data, 0, 1)
    
    confidence_level_cols = {
        90: 0,
        95: 1,
        99: 2
    }
    confidence_level_col = confidence_level_cols[COINTEGRATION_CONFIDENCE_LEVEL]
    
    trace_crit_value = result.cvt[:, confidence_level_col]
    eigen_crit_value = result.cvm[:, confidence_level_col]
    
    if np.all(result.lr1 >= trace_crit_value) and np.all(result.lr2 >= eigen_crit_value):
        return True
    else:
        return False

In [None]:
# Engle-Granger test
def second_cointegration_test(pair_close_data):
    # Step 1: Perform OLS regression of one time series on the other
    y = pair_close_data.iloc[:, 0]
    x = sm.add_constant(pair_close_data.iloc[:, 1])
    model = sm.OLS(y, x)
    results = model.fit()
    
    # Step 2: Test residuals for stationarity using ADF test
    residuals = y - results.predict(x)
    adf_result = adfuller(residuals)
    
    # Step 3: If residuals are stationary, the two time series are considered cointegrated
    if adf_result[1] < 0.01:
        return results.params[1]
    else:
        return None


In [None]:
def compute_Hc(time_series, max_lag=20):
        """Returns the Hurst Exponent of the time series"""
    
        time_series = time_series.to_numpy()
        lags = range(2, max_lag)

        # variances of the lagged differences
        tau = [np.std(np.subtract(time_series[lag:], time_series[:-lag])) for lag in lags]
        
        # calculate the slope of the log plot -> the Hurst Exponent
        reg = np.polyfit(np.log(lags), np.log(tau), 1)

        return reg[0]

In [None]:
def first_sieve(all_pairs, start_date, end_date):
    candidate_stationary_pairs = []
    for pair in tqdm(all_pairs):
        pairs_close_data = get_pairs_close(pair, start_date, end_date)
        if pairs_close_data.empty:
            continue
        else:
            if first_cointegration_test(pairs_close_data):
                candidate_stationary_pairs.append(pair)
                
    return candidate_stationary_pairs

In [None]:
def second_sieve(candidate_stationary_pairs, start_date, end_date):
    stationary_pairs = []
    for pair in tqdm(candidate_stationary_pairs):
        pairs_close_data = get_pairs_close(pair, start_date, end_date)
        if pairs_close_data.empty:
            continue
        else:
            slope = second_cointegration_test(pairs_close_data) 
            if slope:
                stationary_pairs.append({'coint_pair': pair, 'slope' : slope})
                
    return stationary_pairs

In [None]:
#1 Get all symbols and form a list of pairs
location = "Retail\\Forex"
all_pairs = get_pairs(location)  

end_date = datetime.now()
start_date = end_date - timedelta(days = 1)

#2 For all pairs check for stationarity and return a list of pairs that are stationary
#candidate_stationary_pairs = first_sieve(all_pairs, start_date, end_date)
candidate_stationary_pairs = all_pairs
stationary_pairs = second_sieve(candidate_stationary_pairs, start_date, end_date)

In [None]:
#print((len(candidate_stationary_pairs) * 100) / len(all_pairs))
#print(len(all_pairs))
#print(len(candidate_stationary_pairs))
#print(len(stationary_pairs))

for data in stationary_pairs:
    print(data['coint_pair'][0].name, data['coint_pair'][1].name)


In [None]:
def in_sample_backtester(artificial_asset, mean_, std_, account_balance=1000, percentage_of_portfolio_at_risk=0.05, risk_tolerance = 0.01):
    #print(f'ACCOUNT BALANCE: {account_balance}')
    #print(f'PERCENTAGE AT RISK: {percentage_of_portfolio_at_risk}')
    #print(f'RISK TOLERANCE: {risk_tolerance}')
    #print()
    
    win_counter = 0
    loss_counter = 0
    
    
    total_losses_allowed = account_balance * percentage_of_portfolio_at_risk
    position_size = account_balance * risk_tolerance

    entry_high = mean_ + std_
    entry_low = mean_ - std_
    stop_loss_high = mean_ + 2 * std_
    stop_loss_low = mean_ - 2 * std_
    take_profit = mean_

    in_a_position = False
    for i in range(1, len(artificial_asset)):
        if not in_a_position:
            if artificial_asset[i-1] < entry_high and artificial_asset[i] >= entry_high:
                #print('Buy High', equation[i])
                in_a_position = True
                continue
            elif artificial_asset[i-1] > entry_low and artificial_asset[i] <= entry_low:
                #print('Buy Low', equation[i])
                in_a_position = True
                continue
            else:
                continue
            
        if in_a_position:
            if artificial_asset[i-1] > take_profit and artificial_asset[i] <= take_profit:
                #print('TP High', equation[i])
                in_a_position = False
                win_counter += 1
                account_balance += position_size
                position_size = account_balance * risk_tolerance
                #print(account_balance)
                continue
            elif artificial_asset[i-1] < take_profit and artificial_asset[i] >= take_profit:
                #print('TP Low', equation[i])
                in_a_position = False
                win_counter += 1
                account_balance += position_size
                position_size = account_balance * risk_tolerance
                #print(account_balance)
                continue
            elif artificial_asset[i-1] < stop_loss_high and artificial_asset[i] >= stop_loss_high:
                #print('SL High', equation[i])
                in_a_position = False
                loss_counter += 1
                account_balance -= position_size
                position_size = account_balance * risk_tolerance
                if account_balance <= total_losses_allowed:
                    print("Account totalled")
                    return
                #print(account_balance)
                continue
            elif artificial_asset[i-1] > stop_loss_low and artificial_asset[i] <= stop_loss_low:
                #print('SL Low', equation[i])
                in_a_position = False
                loss_counter += 1
                account_balance -= position_size
                position_size = account_balance * risk_tolerance
                if account_balance <= total_losses_allowed:
                    print("Account totalled")
                    return
                #print(account_balance)
                continue
            else:
                #print("DOING NOTHING")
                continue
            
    print()
    try:
        #print(f'PROFIT/LOSS Ratio: {win_counter / loss_counter}')
        return f'PROFIT/LOSS Ratio: {(win_counter / loss_counter):.2f}, Final Balance: {account_balance:.2f}'
    except ZeroDivisionError:
        #print(f'NO LOSSES!')
        #print(f'WINS: {win_counter}')
        #print(f'LOSSES: {loss_counter}')
        return f'NO LOSSES! WINS: {win_counter:.2f}, LOSSES: {loss_counter:.2f}, Final Balance: {account_balance:.2f} '
        
    #print(f'Final Balance: {account_balance}')


In [None]:
def out_sample_backtester(y_stationary_pairs, y_start_date, y_end_date, t_start_date, t_end_date):
    #1 Get data from previous day and data for today
    
    #2 test cointegration for previous day and place cointegrated pairs into a new list called previous_stationary_pairs 
    
    wins = 0
    losses = 0
    break_even = 0
    outcomes = []
    for index in range(len(y_stationary_pairs)):
        # Get the mean and std of the yesterday-stationary pair
        y_stationary_pair = get_pairs_close(y_stationary_pairs[index]['coint_pair'], y_start_date, y_end_date)
        beta = y_stationary_pairs[index]['slope']
        y_artificial_asset = y_stationary_pair['close0'] - beta * y_stationary_pair['close1']
        
        #t_stationary_pair = get_pairs_close(y_stationary_pairs[index]['coint_pair'], t_start_date, t_end_date)
        #t_artificial_asset = t_stationary_pair['close0'] - beta * t_stationary_pair['close1']
        
        print(y_stationary_pairs[index]['coint_pair'][0].name,y_stationary_pairs[index]['coint_pair'][1].name)
        
        #print(y_artificial_asset)
        #print(t_artificial_asset)
        
            
        
        
        if compute_Hc(y_artificial_asset) < 0.5:
            
            x_1 = y_artificial_asset.size
            plt.plot(range(x_1), y_artificial_asset)
            plt.axhline(y_artificial_asset.mean(), color='g', linestyle='-')
            plt.axhline(y_artificial_asset.mean() + y_artificial_asset.std(), color='y', linestyle='-')
            plt.axhline(y_artificial_asset.mean() - y_artificial_asset.std(), color='y', linestyle='-')
            plt.axhline(y_artificial_asset.mean() + 2 * y_artificial_asset.std(), color='r', linestyle='-')
            plt.axhline(y_artificial_asset.mean() - 2 * y_artificial_asset.std(), color='r', linestyle='-')
            
            # run an out of sample backtest by running todays prices with yesterdays mean and std
            t_stationary_pair = get_pairs_close(y_stationary_pairs[index]['coint_pair'], t_start_date, t_end_date)
            t_artificial_asset = t_stationary_pair['close0'] - beta * t_stationary_pair['close1']   
            
            x_2 = t_artificial_asset.size
            plt.plot(range(x_1, x_1 + x_2), t_artificial_asset)
            plt.axhline(y_artificial_asset.mean(), color='g', linestyle='-')
            plt.axhline(y_artificial_asset.mean() + y_artificial_asset.std(), color='y', linestyle='-')
            plt.axhline(y_artificial_asset.mean() - y_artificial_asset.std(), color='y', linestyle='-')
            plt.axhline(y_artificial_asset.mean() + 2 * y_artificial_asset.std(), color='r', linestyle='-')
            plt.axhline(y_artificial_asset.mean() - 2 * y_artificial_asset.std(), color='r', linestyle='-')
        
        
            total_artificial_asset = y_artificial_asset.append(t_artificial_asset)
            rolling_mean = total_artificial_asset.rolling(1000).mean()
            plt.plot(range(x_1 + x_2), rolling_mean, color='k')
            
            plt.show()
            
            print('y stats: ')
            print(f'\t\t{in_sample_backtester(y_artificial_asset, y_artificial_asset.mean(), y_artificial_asset.std())}')
            print("t stats: ")
            asset_stats = in_sample_backtester(t_artificial_asset, y_artificial_asset.mean(), y_artificial_asset.std())
    
            if asset_stats == "Account totalled":
                print("################## FAILED ##################")
                break
            else:
                print(f'\t\t {asset_stats}')
                total_return = asset_stats[-8:].replace(' ', '')
                total_return = total_return.replace(':', '')
                total_return = float(total_return)
                print(f'\t\t {total_return}')
        
                if total_return > 1000:
                    wins += 1
                elif total_return < 1000:
                    losses += 1
                else:
                    break_even += 1
                
            outcomes.append(total_return)
        else:
            continue
        
        
        
            
    
    
    print(f'WINS: {wins}, LOSSES:{losses}, BREAK-EVENS: {break_even}')
    print(f'AVERAGE OUTCOME: {sum(outcomes) / len(outcomes)}')


    

In [None]:

# Monte Carlo simulation
#not_a_weekday = True
while True:
    t_end_date = datetime.now() - timedelta(days = randint(1, 252))
    if t_end_date.weekday() < 5 and t_end_date.weekday() > 0:
        break


print(t_end_date.strftime("%A"))

t_start_date = t_end_date - BDay(1)   
y_end_date = t_start_date
y_start_date = y_end_date - BDay(4)



print(t_end_date)
print(t_start_date)
print(y_end_date)
print(y_start_date)

#print(t_end_date)
#print(t_start_date)


y_candidate_stationary_pairs = first_sieve(all_pairs, y_start_date, y_end_date)
y_stationary_pairs = second_sieve(y_candidate_stationary_pairs, y_start_date, y_end_date)

In [None]:
#for pairs in y_stationary_pairs:
#    print(pairs['coint_pair'][0].name, pairs['coint_pair'][1].name)

out_sample_backtester(y_stationary_pairs, y_start_date, y_end_date, t_start_date, t_end_date)


In [None]:
#print (in_sample_backtester(equation))

best_return = {'pair': [], 'account balance' : 0}
for index in range(len(stationary_pairs)):
    stationary_pair = get_pairs_close(stationary_pairs[index]['coint_pair'], start_date, end_date)
    beta = stationary_pairs[index]['slope']
    artificial_asset = np.log(stationary_pair['close0']) - beta * np.log(stationary_pair['close1'])
    
    if compute_Hc(artificial_asset) < 0.5:
        
        print(stationary_pairs[index]['coint_pair'][0].name, stationary_pairs[index]['coint_pair'][1].name)
    
        asset_stats = in_sample_backtester(artificial_asset, artificial_asset.mean(), artificial_asset.std())
            
        x_1 = artificial_asset.size
        plt.plot(range(x_1), artificial_asset)
        plt.axhline(artificial_asset.mean(), color='g', linestyle='-')
        plt.axhline(artificial_asset.mean() + artificial_asset.std(), color='y', linestyle='-')
        plt.axhline(artificial_asset.mean() - artificial_asset.std(), color='y', linestyle='-')
        plt.axhline(artificial_asset.mean() + 2 * artificial_asset.std(), color='r', linestyle='-')
        plt.axhline(artificial_asset.mean() - 2 * artificial_asset.std(), color='r', linestyle='-')
        
        plt.show()
    
    
        if asset_stats == "Account totalled":
            print("################## FAILED ##################")
            break
        else:
            print(f'\t\t {asset_stats}')
            total_return = asset_stats[-8:].replace(' ', '')
            total_return = total_return.replace(':', '')
            total_return = float(total_return)
            print(f'\t\t {total_return}')
        
            if total_return > best_return['account balance']:
                best_return['pair'] = [stationary_pairs[index]['coint_pair'][0].name, stationary_pairs[index]['coint_pair'][1].name]
                best_return['account balance'] = total_return
    
    print(f'BEST PAIR: {best_return["pair"]}, RETURN:{best_return["account balance"]}')

In [None]:
# TODO Generalize pairs trading to trade 4 pairs concurrently
def portfolio_contructor():
    #1 Sample several (all if able) combination of 4-tuple of all stationary_pairs
    # Test the four pairs for Granger causality and Cross-correlation and reject if there exist relationships, resample
    #2 Backtest all four and return the four pairs that performed the best