## Test intervals, training_periods and z-score_windows, and see the win rate, average returns and trading time for the first trading opportunities in the next 50 intervals

In [93]:
# Parameter grid explored by this notebook.
# Kline intervals to test, and the same intervals expressed in minutes.
SET_INTERVALS = ["1m", "3m", "5m", "15m", "30m", "1h", "2h"]
SET_INTERVALS_INT_MINS = [1, 3, 5, 15, 30, 60, 120]
# Training-period lengths in price bars: 100, 150, ..., 900.
SET_TRAINNING_PERIODS = list(range(100, 901, 50))
# Rolling window sizes in price bars: 20, 40, ..., 260.
SET_Z_SCORE_WINDOW = list(range(20, 261, 20))
# Absolute z-score levels that trigger an entry.
SET_TRIGGER_Z_SCORE_THRESHOD = [0.4, 0.8, 1.2, 1.6, 2.0, 2.4, 2.8]
# Maximum number of entries that may be accumulated within a single trade.
TRADING_TIMES_THRESHOD = 50

#### Get tradeable symbols in tradeable_symbols

In [None]:
from binance.um_futures import UMFutures
import datetime
import time

# Binance UM futures client, constructed without any credentials.
session_public = UMFutures()
# Symbols whose onboard date is later than this are rejected by the symbol filter.
ONBOARD_TIME_THRESHOD = datetime.datetime(2023, 1, 6)
# Minimum 24h quote volume, expressed as a fraction of BTCUSDT's 24h quote volume.
TRADING_VOLUME_THRESHOD_RATE = 1 / 150

In [None]:

def binance_get_24h_trading_volume_usdt(symbol: str) -> float:
    """Return the 24h quote-asset trading volume reported for a symbol.

    Args:
        symbol (str): symbol name

    Returns:
        float: the ``quoteVolume`` field of the 24h ticker, converted to float
    """
    ticker = session_public.ticker_24hr_price_change(symbol)
    quote_volume = ticker["quoteVolume"]
    return float(quote_volume)


def binance_get_exchange_symbols():
    """Fetch the per-symbol section of the Binance exchange information.

    Returns:
        _type_: the value stored under the "symbols" key of the exchange-info
        response (one entry per symbol)

    See: https://binance-docs.github.io/apidocs/futures/en/#exchange-information
    """
    exchange_info = session_public.exchange_info()
    return exchange_info["symbols"]

def transform_timestamp_to_datetime(timestamp: int):
    """Convert a millisecond epoch timestamp into a naive local ``datetime``.

    Args:
        timestamp (int): epoch time in milliseconds (anything ``int()`` accepts)

    Returns:
        datetime.datetime: the corresponding naive datetime in local time
    """
    milliseconds = int(timestamp)
    seconds = milliseconds / 1000
    return datetime.datetime.fromtimestamp(seconds)

def get_tradeable_symbols_dynamic() -> list:
    """Return the Binance symbols that pass the tradeability filters.

    A symbol is kept when all of the following hold:
      - it is quoted in USDT and its status is "TRADING";
      - its onboard date is not later than ONBOARD_TIME_THRESHOD;
      - its 24h quote volume is at least TRADING_VOLUME_THRESHOD_RATE times
        the 24h quote volume of BTCUSDT.

    Args:
       None

    Returns:
        sym_list(list): the names of all symbols that passed every filter.
        The number of symbols found is printed, not returned.
    """
    sym_list = []
    BTCUSDT_trading_volume = binance_get_24h_trading_volume_usdt("BTCUSDT")

    for symbol in binance_get_exchange_symbols():
        # Cheap, local checks first so no ticker request is spent on symbols
        # that are rejected anyway.
        if symbol["quoteAsset"] != "USDT" or symbol["status"] != "TRADING":
            continue
        # coins onboard should not be later than this time
        if transform_timestamp_to_datetime(symbol["onboardDate"]) > ONBOARD_TIME_THRESHOD:
            continue

        trading_volume = binance_get_24h_trading_volume_usdt(symbol["symbol"])
        # Throttle after every ticker request, not only after accepted
        # symbols, so a long run of rejected symbols cannot burst the API.
        time.sleep(0.1)

        if trading_volume >= TRADING_VOLUME_THRESHOD_RATE * BTCUSDT_trading_volume:
            sym_list.append(symbol["symbol"])

    print(f"{len(sym_list)} pairs found")

    # return all the tradeable symbols
    return sym_list

# Build the symbol universe once; the price-download loop below reuses it.
tradeable_symbols = get_tradeable_symbols_dynamic()

In [None]:
# Display the selected symbols.
tradeable_symbols

#### Store the price of different Intervals

In [None]:
import json


def binance_get_recent_close_price(symbol: str, interval: str, limit: int) -> list:
    """Fetch the most recent close prices of a symbol from Binance.

    Args:
        symbol (str): the name of the symbol
        interval (str): kline interval passed through to the API
        limit (int): number of klines requested

    Returns:
        list: the close prices as floats when exactly ``limit`` klines came
        back; ``None`` otherwise
    """
    klines = session_public.klines(symbol=symbol, interval=interval, limit=limit)
    # Element 4 of each kline is the close price; the API delivers it as str.
    closes = [float(kline[4]) for kline in klines]
    if len(closes) != limit:
        return None
    return closes

# Store price history for all available pairs
def store_price_history_static(symbols: list, interval) -> str:
    """
    Download the recent close prices of the given symbols and save them as JSON.

    Only symbols for which the full 1500-bar history is available are stored,
    so every stored symbol has the same amount of data. The data is written to
    "<interval>_price_list.json". The function sleeps 5 seconds before
    returning.

    Args:
        symbols (list): List of symbols for which price history needs to be stored.
        interval: Kline interval passed to binance_get_recent_close_price and
            used as the filename prefix.

    Returns:
        str: Filename of the stored data, or "" when no symbol had a complete
        history and therefore nothing was written.
    """
    required_length = 1500

    # Get prices and collect them per symbol
    counts = 0
    price_history_dict = {}
    for sym in symbols:
        price_history = binance_get_recent_close_price(sym, interval=interval, limit=required_length)
        # The helper returns None when fewer bars than requested came back;
        # such symbols are skipped instead of crashing on len(None).
        if price_history is not None and len(price_history) == required_length:
            price_history_dict[sym] = price_history
            counts += 1
    print(f"{counts} items stored, {len(symbols)-counts}items not stored")

    # Output prices to JSON
    filename = ""
    if len(price_history_dict) > 0:
        filename = f"{interval}_price_list.json"
        with open(filename, "w") as fp:
            json.dump(price_history_dict, fp, indent=4)
        print("Prices saved successfully.")
    time.sleep(5)
    return filename

# Download and save one price-history file per configured interval.
for interval in SET_INTERVALS:
    store_price_history_static(tradeable_symbols, interval)

#### Get cointegrated pairs and store in csv

In [94]:
# Capital committed per entry; used for position sizing and for the fee estimate.
# Presumably denominated in USDT -- TODO confirm.
INVESTIBLE_CAPITAL_EACH_TIME = 200

In [95]:
def binance_get_latest_price(symbol: str) -> float:
    """
    Look up the latest price of a symbol through the Binance price ticker.

    Args:
        symbol (str): The symbol whose latest price is requested.

    Returns:
        float: The "price" field of the ticker response, converted to float.
    """
    ticker = session_public.ticker_price(symbol)
    latest_price = ticker["price"]
    return float(latest_price)

def get_trade_qty_each_time(symbol_1: str, symbol_2: str, hedge_ratio):
    """
    Estimate the order quantities of both legs for one entry at current prices.

    The quantity of symbol_1 is INVESTIBLE_CAPITAL_EACH_TIME divided by
    (price_1 + hedge_ratio * price_2); the quantity of symbol_2 is that
    quantity multiplied by hedge_ratio.

    Args:
        symbol_1 (str): Name of the first symbol.
        symbol_2 (str): Name of the second symbol.
        hedge_ratio: Units of symbol_2 traded per unit of symbol_1.

    Returns:
        tuple: (quantity of symbol_1, quantity of symbol_2).
    """
    price_1 = binance_get_latest_price(symbol_1)
    price_2 = binance_get_latest_price(symbol_2)
    cost_per_unit = price_1 + hedge_ratio * price_2

    qty_symbol_1 = INVESTIBLE_CAPITAL_EACH_TIME / cost_per_unit
    qty_symbol_2 = qty_symbol_1 * hedge_ratio
    return qty_symbol_1, qty_symbol_2

In [96]:
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint
import scipy.stats as stats
import pandas as pd
import numpy as np

def calculate_cointegration_static(series_1, series_2):
    """
    Test two series for cointegration and fit a static hedge ratio.

    Args:
        series_1 (array like): First series (regressand of the OLS fit).
        series_2 (array like): Second series (regressor of the OLS fit).

    Returns:
        tuple: (coint_flag, p_value, hedge_ratio, initial_intercept)
            - coint_flag: 1 when the p-value is below 0.03 AND the test
              statistic is below the critical value at index 1 of the values
              returned by ``coint``; 0 otherwise.
            - p_value: p-value reported by ``coint``.
            - hedge_ratio: slope of the OLS fit of series_1 on series_2.
            - initial_intercept: intercept of that OLS fit.

    Notes:
        - The series should have the same length.
        - The OLS fit is done for every pair, cointegrated or not.
    """
    coint_t, p_value, critical_values = coint(series_1, series_2)
    critical_value = critical_values[1]

    # Regress series_1 on [const, series_2]: params[0] is the intercept,
    # params[1] the hedge ratio.
    exog = sm.add_constant(series_2)
    fitted = sm.OLS(series_1, exog).fit()
    initial_intercept = fitted.params[0]
    hedge_ratio = fitted.params[1]

    passes_test = (p_value < 0.03) and (coint_t < critical_value)
    coint_flag = 1 if passes_test else 0
    return coint_flag, p_value, hedge_ratio, initial_intercept

In [97]:
from statsmodels.regression.rolling import RollingOLS
# series_2 = [1,2,3,4,5,6,7,8,9]
# series_1 = [1,3,5,7,9,11,13,15,17]

def calculate_spread_hedge_ratio_window(series_1: list, series_2: list, window: int):
    """
    Compute a rolling hedge ratio and the resulting spread of two series.

    A rolling OLS of series_1 on [const, series_2] is fitted over `window`
    observations. The slope at each position is the hedge ratio, and the
    spread is series_1 - hedge_ratio * series_2 (the rolling intercept is not
    subtracted). Positions without a full window get hedge ratio 0 and
    spread 0.

    Args:
        series_1 (list): A list of values representing the first series.
        series_2 (list): A list of values representing the second series.
        window (int): Number of observations in each rolling regression.

    Returns:
        tuple: (spread as a list, hedge ratios as a list), both as long as
        the input series.
    """
    endog = pd.DataFrame(series_1)
    exog = sm.add_constant(pd.DataFrame(series_2))
    rolling_fit = RollingOLS(endog, exog, window=window).fit()

    # NaN coefficients (incomplete windows) are replaced by 0.
    coefficients = rolling_fit.params.replace(np.nan, 0)
    # Column 0 holds the constant added above, column 1 the slope.
    hedge_ratio = coefficients.iloc[:, 1].tolist()

    leg_1 = pd.Series(series_1)
    leg_2 = pd.Series(series_2)
    spread = leg_1 - (leg_2 * hedge_ratio)
    spread[:window-1] = 0
    return spread.tolist(), hedge_ratio

# spread, hedge_ratio = calculate_spread_hedge_ratio_window(series_1, series_2, 3)
# spread, hedge_ratio

In [98]:
# Fee rate charged on the capital of every entry when a trade's profit is computed.
TRADING_FEE_RATE = 0.0004
def calculate_z_score_window(spread: list, window: int) -> list:
    """
    Calculates the rolling Z-Score of a given spread.

    Each value is (spread - rolling mean) / rolling std, both taken over the
    trailing `window` observations. The first `window - 1` positions, for
    which no full window exists, are set to 0.

    Args:
        spread (list): A list of values representing the spread.
        window (int): Number of observations in the rolling window.

    Returns:
        list: A list containing the Z-Score values, same length as `spread`.
    """
    data = pd.Series(spread, dtype=float)
    rolling = data.rolling(window=window)
    z_score = (data - rolling.mean()) / rolling.std()

    # Zero the warm-up positions with a single positional assignment.
    # The previous chained form (frame[0][:n] = 0) writes to a temporary under
    # pandas Copy-on-Write and leaves NaN in the result, and NaN is treated as
    # a sign flip by the backtest.
    z_score.iloc[:window - 1] = 0

    return z_score.tolist()



def calculate_std_spread(spread: list):
    """
    Return the standard deviation of a spread, as computed by pandas ``std()``.

    Args:
        spread (list): A list of values representing the spread.

    Returns:
        std: float
    """
    frame = pd.DataFrame(spread)
    column_std = frame.std()
    return column_std.values[0]

def check_differnet_signal(a,b):
    """Return True when a and b have opposite signs.

    The check compares |a + b| with |a| + |b|; the two are equal exactly when
    the values do not pull in opposite directions (a zero counts as "same").
    """
    magnitude_of_sum = abs(a + b)
    sum_of_magnitudes = abs(a) + abs(b)
    return magnitude_of_sum != sum_of_magnitudes

def get_backtesting_properties(series_1: list, series_2: list, hedge_ratio_list: list, zscore_series: list, TRIGGER_Z_SCORE_THRESHOD: float):
    """
    Replay a z-score entry/exit rule over two price series and summarise the trades.

    For every bar:
      1. Entry: when |z| reaches the threshold and z has not flipped sign
         relative to the previous bar, a position is opened or added to
         (at most TRADING_TIMES_THRESHOD entries per trade).
      2. While a position is open, its unrealised revenue is computed and the
         lowest value seen so far is tracked as peak_loss.
      3. Exit: when z flips sign relative to the previous bar, the trade is
         closed, fees are deducted and all per-trade state is reset.

    A position that is still open when the series ends is never closed, so it
    is not included in cumulative_return or the win rate (it does affect
    peak_loss).

    Args:
        series_1 (list): prices of the first symbol.
        series_2 (list): prices of the second symbol.
        hedge_ratio_list (list): hedge ratio per bar, indexed like the prices.
        zscore_series (list): z-score per bar, indexed like the prices.
        TRIGGER_Z_SCORE_THRESHOD (float): absolute z-score level that triggers an entry.

    Returns:
        tuple: (trade_oppotunities, cumulative_return, win_rate,
        recent_trade_qty, peak_loss) where trade_oppotunities is the number of
        closed trades, win_rate is the share of closed trades with positive
        profit after fees (0 when there were none) and recent_trade_qty is the
        per-entry quantity implied by the last bar.
    """
    trade_oppotunities = 0
    last_value = 0.00
    enter_market_signal = False
    
    cumulative_return = 0
    cumulative_trading_qty = 0
    count_entering_time = 0
    
    open_long_price_list = []
    open_short_price_list = []
    
    win_times = 0
    peak_loss = 0
    
    
    for index, value in enumerate(zscore_series):
        # Entry bar: threshold reached and no sign flip versus the previous z-score.
        if abs(value) >= abs(TRIGGER_Z_SCORE_THRESHOD) and not check_differnet_signal(value, last_value):
            
            enter_market_signal = True
            
            # High z-score -> "sell" (short series_1, long series_2); low z-score -> "buy".
            # NOTE(review): direction is only assigned in these two branches; with a
            # negative threshold neither may match -- confirm thresholds are always >= 0.
            if value >= TRIGGER_Z_SCORE_THRESHOD:
                direction = "sell"
            elif value <= -TRIGGER_Z_SCORE_THRESHOD:
                direction = "buy"
            
            # Add another entry of fixed capital while under the per-trade cap.
            if count_entering_time < TRADING_TIMES_THRESHOD:
                cumulative_trading_qty += (INVESTIBLE_CAPITAL_EACH_TIME / (series_1[index] + hedge_ratio_list[index] * series_2[index]))  # qty for each symbol
                if direction == "buy":
                    open_long_price_list.append(series_1[index])
                    open_short_price_list.append(series_2[index])
                elif direction == "sell":
                    open_short_price_list.append(series_1[index])
                    open_long_price_list.append(series_2[index])
                    
                count_entering_time += 1

        # Calculate the peak loss during the trade
        # Revenue is measured against the average entry price of each leg; the
        # series_2 leg is scaled by the current bar's hedge ratio.
        if enter_market_signal:
            if direction == "buy":
                long_profit = (series_1[index] - sum(open_long_price_list)/len(open_long_price_list)) * cumulative_trading_qty
                short_profit = (sum(open_short_price_list)/len(open_short_price_list) - series_2[index]) * cumulative_trading_qty * hedge_ratio_list[index]
            elif direction == "sell":
                long_profit = (series_2[index] - sum(open_long_price_list)/len(open_long_price_list)) * cumulative_trading_qty * hedge_ratio_list[index]
                short_profit = (sum(open_short_price_list)/len(open_short_price_list) - series_1[index]) * cumulative_trading_qty
            current_revenue = long_profit + short_profit
            peak_loss = min(peak_loss, current_revenue)
        
        # Calculate the returns when exiting the market
        # Exit bar: the z-score changed sign relative to the previous bar.
        if enter_market_signal and check_differnet_signal(value, last_value):
            trade_oppotunities += 1
            exiting_profit = current_revenue - INVESTIBLE_CAPITAL_EACH_TIME * count_entering_time * TRADING_FEE_RATE # revenue for all symbols
            
            # calculate the win rate
            if exiting_profit > 0:
                win_times += 1

            # Cumulate the return
            cumulative_return += exiting_profit
            
            # Reset
            enter_market_signal = False
            cumulative_trading_qty = 0
            count_entering_time = 0
            direction = ""
            open_long_price_list = []
            open_short_price_list = []
        
        last_value = value
    
    if trade_oppotunities > 0:
        win_rate = win_times / trade_oppotunities
    else:
        win_rate = 0
    
    # Calculate the recent trade qty
    recent_trade_qty = (INVESTIBLE_CAPITAL_EACH_TIME / (series_1[-1] + hedge_ratio_list[-1] * series_2[-1]))
    
    return trade_oppotunities, cumulative_return, win_rate, recent_trade_qty, peak_loss

def calculate_pairs_trading_result(series_1, series_2, num_window: int, z_score_threshod: float) -> tuple:
    """
    Backtest one pair: rolling spread -> rolling z-score -> trade replay.

    Args:
        series_1: prices of the first symbol.
        series_2: prices of the second symbol.
        num_window (int): window used for both the rolling hedge ratio and the z-score.
        z_score_threshod (float): absolute z-score level that triggers an entry.

    Returns:
        tuple: (trade_oppotunities, cumulative_return, win_rate,
        recent_trade_qty, recent_z_score, peak_loss, std) where recent_z_score
        is the last z-score and std is the standard deviation of the spread.
    """
    spread, hedge_ratio_list = calculate_spread_hedge_ratio_window(series_1, series_2, window=num_window)
    zscore_series = calculate_z_score_window(spread, window=num_window)
    std = calculate_std_spread(spread)

    # Most recent z-score of the series
    recent_z_score = zscore_series[-1]

    backtest = get_backtesting_properties(series_1, series_2, hedge_ratio_list, zscore_series, z_score_threshod)
    trade_oppotunities, cumulative_return, win_rate, recent_trade_qty, peak_loss = backtest

    return (
        trade_oppotunities,
        cumulative_return,
        win_rate,
        recent_trade_qty,
        recent_z_score,
        peak_loss,
        std,
    )

In [99]:
def calculate_pairs_one_time_trading_result(series_1_real_test, series_2_real_test, z_score_window, z_score_threshod):
    """
    Replay the z-score rule on a test window and stop at the first closed trade.

    The spread and z-score are recomputed from the given series with
    `z_score_window`. Entries, the running revenue and peak_loss follow the
    same rules as get_backtesting_properties, but the function returns as soon
    as the first trade is closed.

    Args:
        series_1_real_test: prices of the first symbol in the test window.
        series_2_real_test: prices of the second symbol in the test window.
        z_score_window: window used for both the rolling hedge ratio and the z-score.
        z_score_threshod: absolute z-score level that triggers an entry.

    Returns:
        tuple: (trade_oppotunities, cumulative_return, peak_loss). When a trade
        was closed this is (1, profit after fees, peak_loss); when no trade was
        closed it is (0, 0, peak_loss), where peak_loss may still be negative
        because of a position that stayed open.
    """
    spread, hedge_ratio_list = calculate_spread_hedge_ratio_window(series_1_real_test, series_2_real_test, window=z_score_window)
    zscore_series = calculate_z_score_window(spread, window=z_score_window)
    
    trade_oppotunities = 0
    last_value = 0.00
    enter_market_signal = False
    
    cumulative_return = 0
    cumulative_trading_qty = 0
    count_entering_time = 0
    
    open_long_price_list = []
    open_short_price_list = []
    
    peak_loss = 0
    
    
    for index, value in enumerate(zscore_series):
        # Entry bar: threshold reached and no sign flip versus the previous z-score.
        if abs(value) >= abs(z_score_threshod) and not check_differnet_signal(value, last_value):
            
            enter_market_signal = True
            
            # High z-score -> "sell" (short series_1, long series_2); low z-score -> "buy".
            if value >= z_score_threshod:
                direction = "sell"
            elif value <= -z_score_threshod:
                direction = "buy"
            
            # Add another entry of fixed capital while under the per-trade cap.
            if count_entering_time < TRADING_TIMES_THRESHOD:
                cumulative_trading_qty += (INVESTIBLE_CAPITAL_EACH_TIME / (series_1_real_test[index] + hedge_ratio_list[index] * series_2_real_test[index]))  # qty for each symbol
                if direction == "buy":
                    open_long_price_list.append(series_1_real_test[index])
                    open_short_price_list.append(series_2_real_test[index])
                elif direction == "sell":
                    open_short_price_list.append(series_1_real_test[index])
                    open_long_price_list.append(series_2_real_test[index])
                    
                count_entering_time += 1

        # Calculate the peak loss during the trade
        # Revenue is measured against the average entry price of each leg.
        if enter_market_signal:
            if direction == "buy":
                long_profit = (series_1_real_test[index] - sum(open_long_price_list)/len(open_long_price_list)) * cumulative_trading_qty
                short_profit = (sum(open_short_price_list)/len(open_short_price_list) - series_2_real_test[index]) * cumulative_trading_qty * hedge_ratio_list[index]
            elif direction == "sell":
                long_profit = (series_2_real_test[index] - sum(open_long_price_list)/len(open_long_price_list)) * cumulative_trading_qty * hedge_ratio_list[index]
                short_profit = (sum(open_short_price_list)/len(open_short_price_list) - series_1_real_test[index]) * cumulative_trading_qty
            current_revenue = long_profit + short_profit
            peak_loss = min(peak_loss, current_revenue)
        
        # Calculate the returns when exiting the market
        # Exit bar: the z-score changed sign; only this first closed trade is reported.
        if enter_market_signal and check_differnet_signal(value, last_value):
            trade_oppotunities += 1
            exiting_profit = current_revenue - INVESTIBLE_CAPITAL_EACH_TIME * count_entering_time * TRADING_FEE_RATE # revenue for all symbols
            
            # Cumulate the return
            cumulative_return += exiting_profit
            return trade_oppotunities, cumulative_return, peak_loss
        
        last_value = value
    
    # No trade was closed inside the test window.
    return trade_oppotunities, cumulative_return, peak_loss
    
    

In [100]:
def get_cointegrated_pairs(prices, interval, trainning_period, z_score_window, z_score_threshod) -> pd.DataFrame:
    """
    Screen every symbol pair for cointegration, backtest the hits and save them to CSV.

    Each unordered pair of symbols in `prices` is tested once on the training
    slice. Pairs that are flagged as cointegrated and whose static hedge ratio
    lies strictly between 0.01 and 100 are backtested on the training slice
    (extended by a 2 * z_score_window warm-up) and on the slice at the end of
    the data. The results are sorted by "estimated_returns" (descending) and
    written to
    "<interval>_<trainning_period>_<z_score_window>_<z_score_threshod>_cointegrated_pairs.csv".

    Args:
        prices (dict): symbol -> list of close prices. The slicing below is
            hard-coded for lists of 1500 prices.
        interval: interval label, used only in the filename.
        trainning_period (int): number of bars in the cointegration test slice.
        z_score_window (int): rolling window for hedge ratio and z-score.
        z_score_threshod (float): absolute z-score level that triggers an entry.

    Returns:
        pd.DataFrame: one row per accepted pair. When no pair qualifies, an
        empty frame with the usual columns is returned (and still written).
    """
    last_index = 1499    # index of the newest bar in a 1500-bar history
    holdout_bars = 50    # newest bars kept out of the training slices
    train_end = last_index - holdout_bars
    warmup_bars = 2 * z_score_window

    coint_slice = slice(train_end - trainning_period, train_end)
    train_slice = slice(train_end - (trainning_period + warmup_bars), train_end)
    # NOTE(review): unlike the training slices this start is not shifted by
    # holdout_bars, so the warm-up consumes most of this slice. Confirm whether
    # `train_end - warmup_bars` was intended.
    real_slice = slice(last_index - warmup_bars, None)

    result_columns = [
        "sym_1",
        "sym_2",
        "std",
        "p_value",
        "hedge_ratio",
        "initial_intercept",
        "trading_oppotunities",
        "estimated_returns",
        "win_rate",
        "recent_trade_qty",
        "peak_loss",
        "recent_z_score",
        "one_time_trade_oppotunities",
        "one_time_returns",
        "one_time_peak_loss",
    ]

    # Loop through coins and check for co-integration
    coint_pair_list = []
    found_pair_list = list(prices.keys())
    for position, sym_1 in enumerate(found_pair_list):
        series_1_coint_test = prices[sym_1][coint_slice]

        # Check each later coin against sym_1 so every pair is visited once
        for sym_2 in found_pair_list[position + 1:]:
            series_2_coint_test = prices[sym_2][coint_slice]

            # Check for cointegration and add cointegrated pair
            coint_flag, p_value, hedge_ratio, initial_intercept = calculate_cointegration_static(series_1_coint_test, series_2_coint_test)
            if not ((coint_flag == 1) and (hedge_ratio > 0.01) and (hedge_ratio < 100)):
                continue

            trade_oppotunities, cumulative_returns, win_rate, recent_trade_qty, recent_z_score, peak_loss, std = calculate_pairs_trading_result(
                prices[sym_1][train_slice],
                prices[sym_2][train_slice],
                z_score_window,
                z_score_threshod,
            )
            one_time_trade_oppotunities, one_time_returns, one_time_peak_loss = calculate_pairs_one_time_trading_result(
                prices[sym_1][real_slice],
                prices[sym_2][real_slice],
                z_score_window,
                z_score_threshod,
            )

            coint_pair_list.append({
                "sym_1": sym_1,
                "sym_2": sym_2,
                "std": std,
                "p_value": p_value,
                "hedge_ratio": hedge_ratio,
                "initial_intercept": initial_intercept,
                "trading_oppotunities": trade_oppotunities,
                "estimated_returns": cumulative_returns,
                "win_rate": win_rate,
                "recent_trade_qty": recent_trade_qty,
                "peak_loss": peak_loss,
                "recent_z_score": recent_z_score,
                "one_time_trade_oppotunities": one_time_trade_oppotunities,
                "one_time_returns": one_time_returns,
                "one_time_peak_loss": one_time_peak_loss,
            })

    # Output results and rank all the trading pairs.
    # Passing the column list keeps the frame well-formed when no pair
    # qualified; without it sort_values raises KeyError on an empty frame.
    df_coint = pd.DataFrame(coint_pair_list, columns=result_columns)
    df_coint = df_coint.sort_values("estimated_returns", ascending=False)
    # choose positive hedge ratio
    df_coint = df_coint[df_coint["hedge_ratio"] > 0]

    filename = f"{interval}_{trainning_period}_{z_score_window}_{z_score_threshod}_cointegrated_pairs.csv"
    df_coint.to_csv(filename)

    # Report the file that was actually written (the threshold is part of its name).
    print(f"{filename} has been completed")
    return df_coint

In [102]:
import json
def test_parameters(interval, trainning_period, z_score_window, z_score_threshod):
    """
    Load the stored prices of `interval` and run the pair screen on them.

    Reads "<interval>_price_list.json" and forwards its content together with
    the given parameters to get_cointegrated_pairs.

    Returns:
        whatever get_cointegrated_pairs returns for these parameters.
    """
    price_file = f"{interval}_price_list.json"
    with open (price_file) as json_file:
        price_data = json.load(json_file)
    return get_cointegrated_pairs(price_data, interval, trainning_period, z_score_window, z_score_threshod)

def get_trainning_result(df_coint: pd.DataFrame, investible_capital=None):
    """
    Summarise the out-of-sample result of the best pairs of one parameter set.

    Selection: rows with positive "estimated_returns", the first 10 of those
    in the frame's order, then only rows whose absolute "peak_loss" is below
    20% of the capital per entry.

    Args:
        df_coint (pd.DataFrame): result of get_cointegrated_pairs; needs the
            columns "estimated_returns", "peak_loss" and "one_time_returns".
        investible_capital: capital per entry used for the peak-loss limit;
            defaults to INVESTIBLE_CAPITAL_EACH_TIME.

    Returns:
        tuple: (average_return, win_rate, average_loss)
            - average_return: mean "one_time_returns" of the selected rows
              (NaN when none are selected).
            - win_rate: share of selected rows with positive
              "one_time_returns" (0 when none are selected).
            - average_loss: mean "one_time_returns" of the selected rows that
              lost money (NaN when there are none).
    """
    if investible_capital is None:
        investible_capital = INVESTIBLE_CAPITAL_EACH_TIME

    # An empty frame has nothing to select.
    if df_coint.empty:
        return float("nan"), 0, float("nan")

    selected = df_coint[df_coint["estimated_returns"] > 0].head(10)
    selected = selected[selected["peak_loss"].abs() < 0.2 * investible_capital]

    one_time_returns = selected["one_time_returns"]
    average_return = one_time_returns.mean()
    # Average only the return column of the losing rows. Averaging the whole
    # frame yields a Series and fails on the symbol-name columns.
    average_loss = one_time_returns[one_time_returns < 0].mean()

    if len(selected) != 0:
        win_rate = int((one_time_returns > 0).sum()) / len(selected)
    else:
        win_rate = 0
    return average_return, win_rate, average_loss

# Grid search: evaluate every combination of interval, training period,
# z-score window and trigger threshold, collecting one summary row each.
result_list = []
for interval in SET_INTERVALS:
    for trainning_period in SET_TRAINNING_PERIODS:
        for z_score_window in SET_Z_SCORE_WINDOW:
            for z_score_threshod in SET_TRIGGER_Z_SCORE_THRESHOD:
                df_coint = test_parameters(interval, trainning_period, z_score_window, z_score_threshod)
                average_return, win_rate, average_loss = get_trainning_result(df_coint)
                temp_dict = {"interval":interval, "trainning_period": trainning_period, "z_score_window": z_score_window,
                             "z_score_threshod": z_score_threshod, "test_average_returns": average_return, "test_win_rate":win_rate,
                             "test_ave_loss": average_loss}
                result_list.append(temp_dict)
                # analysis.csv is rewritten after every combination, so the rows
                # collected so far are already on disk if the run is interrupted.
                df_result = pd.DataFrame(result_list)
                df_result.to_csv("analysis.csv")
    

1m_100_20_cointegrated_pairs.csv has been completed
1m_100_20_cointegrated_pairs.csv has been completed
1m_100_20_cointegrated_pairs.csv has been completed
1m_100_20_cointegrated_pairs.csv has been completed
1m_100_20_cointegrated_pairs.csv has been completed
1m_100_20_cointegrated_pairs.csv has been completed
1m_100_20_cointegrated_pairs.csv has been completed
1m_100_40_cointegrated_pairs.csv has been completed
1m_100_40_cointegrated_pairs.csv has been completed
1m_100_40_cointegrated_pairs.csv has been completed
1m_100_40_cointegrated_pairs.csv has been completed
1m_100_40_cointegrated_pairs.csv has been completed
1m_100_40_cointegrated_pairs.csv has been completed
1m_100_40_cointegrated_pairs.csv has been completed


KeyboardInterrupt: 

In [71]:
# Display df. It is not assigned in the cells shown here; presumably it is one
# of the saved cointegrated-pairs tables -- TODO confirm.
df

Unnamed: 0,sym_1,sym_2,std,p_value,hedge_ratio,initial_intercept,trading_oppotunities,estimated_returns,win_rate,recent_trade_qty,peak_loss,recent_z_score,one_time_trade_oppotunities,one_time_returns,one_time_peak_loss
14,BCHUSDT,TOMOUSDT,104.340193,0.010185,51.366389,182.865998,9,1887.543322,0.666667,1.253064,-79.806720,1.856927,1,80.603238,-4.961395
56,UNIUSDT,ZENUSDT,4.413268,0.018671,0.621808,0.924200,13,1018.273771,0.384615,213.444731,-172.818079,1.421342,0,0.000000,-42.001261
58,FTMUSDT,TOMOUSDT,0.100835,0.003658,0.014392,0.240290,13,351.084517,0.615385,1098.985883,-95.107838,1.726565,1,111.748745,-12.622201
63,FTMUSDT,STMXUSDT,0.110142,0.004504,0.089314,0.257195,9,259.974118,0.555556,839.457725,-84.344562,2.595934,0,0.000000,-13.631452
45,ATOMUSDT,UNIUSDT,3.909655,0.006055,0.706606,5.082743,10,230.346460,0.600000,9.018283,-30.655257,-0.776951,1,-2.481637,-2.081637
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9,BCHUSDT,DOTUSDT,141.891255,0.004130,4.897000,218.770642,14,-905.109945,0.571429,0.535716,-830.511636,-1.157035,1,4.647558,-1.693321
10,BCHUSDT,CRVUSDT,144.989465,0.002170,44.925334,209.724091,10,-1038.578465,0.500000,0.451044,-1095.048741,-0.237127,1,8.235363,-0.559301
35,TRXUSDT,WAVESUSDT,0.038730,0.000820,0.044902,-0.007130,12,-1221.128233,0.250000,520567.802487,-1122.239495,0.785689,1,102.121616,-2.073345
4,BCHUSDT,BNBUSDT,200.435066,0.003888,1.024481,-3.801153,13,-1459.640510,0.384615,0.410590,-1329.713600,-0.293669,0,0.000000,-19.639932


In [91]:
# Manual check of the selection used by get_trainning_result:
# keep rows with positive estimated returns, take the first 10 ...
df1 = df[df["estimated_returns"] > 0].head(10)
# ... then keep rows whose absolute peak loss is below 20% of the capital per entry.
df2 = df1[abs(df1["peak_loss"]) < 0.2 * INVESTIBLE_CAPITAL_EACH_TIME]
# This mean is computed but not shown; only the last expression of a cell is displayed.
df2["one_time_returns"].mean()
# Number of selected rows with a positive one-time return.
df2[df2["one_time_returns"] > 0].shape[0]

2