In [91]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings
import math
warnings.filterwarnings('ignore')
pd.set_option('display.max_rows', None)

## Prepare dataset

In [92]:
# Read a CSV file into a DataFrame
csv_file_path = "../Dataset/XRPUSDT/XRPUSDT_5min.csv"  # Replace with your CSV file path
xrp_df = pd.read_csv(csv_file_path)

xrp_df['timestamp'] = pd.to_datetime(xrp_df['timestamp'])

earliest_data = pd.to_datetime('2025-04-01')

xrp_df = xrp_df[xrp_df['timestamp'] >= earliest_data]

In [93]:
def getATR(df, period):
    # Calculate the True Range (TR)
    df['high_low'] = df['high'] - df['low']
    df['high_close'] = abs(df['high'] - df['close'].shift())
    df['low_close'] = abs(df['low'] - df['close'].shift())
    df['tr'] = df[['high_low', 'high_close', 'low_close']].max(axis=1)

    # Calculate the Average True Range (ATR) with Wilder's method 
    df['atr'] = df['tr'].ewm(alpha=1/period, adjust=False).mean()

    return df

In [94]:
def getADX(df, period):
    high = df['high']
    low  = df['low']

    up_move = high.diff()
    down_move = -low.diff()

    plus_dm = np.where((up_move > down_move) & (up_move > 0), up_move, 0.0)
    minus_dm = np.where((up_move < down_move) & (down_move > 0), down_move, 0.0)


    df = getATR(df.copy(), period)

    plus_dm_ewm = pd.Series(plus_dm, index=df.index).ewm(alpha=1/period, adjust=False).mean()
    minus_dm_ewm = pd.Series(minus_dm, index=df.index).ewm(alpha=1/period, adjust=False).mean()

    plus_di = 100 * (plus_dm_ewm / df['atr'])
    minus_di = 100 * (minus_dm_ewm / df['atr'])

    dx = 100 * abs(plus_di - minus_di) / (plus_di + minus_di)
    adx = dx.ewm(alpha=1/period, adjust=False).mean()

    return plus_di, minus_di, adx, df['atr']


In [95]:
tp = 2
sl = -1

trend_df = pd.DataFrame()

trend_df['timestamp'] = xrp_df['timestamp']

#SMA
trend_df['SMA_fast'] = xrp_df['close'].rolling(window=10).mean()
trend_df['SMA_slow'] = xrp_df['close'].rolling(window=30).mean()

#MACD
trend_df['EMA_fast'] = xrp_df['close'].ewm(span=12, adjust=False).mean()
trend_df['EMA_slow'] = xrp_df['close'].ewm(span=26, adjust=False).mean()
trend_df['MACD'] = trend_df['EMA_fast'] - trend_df['EMA_slow']
trend_df['Signal_Line'] = trend_df['MACD'].ewm(span=9, adjust=False).mean()

#ADX
adx = getADX(xrp_df, 14)
trend_df['plus_di'] = pd.DataFrame(adx[0]).rename(columns = {0:'plus_di'})
trend_df['minus_di'] = pd.DataFrame(adx[1]).rename(columns = {0:'minus_di'})
trend_df['adx'] = pd.DataFrame(adx[2]).rename(columns = {0:'adx'})
trend_df['take_profit'] = tp * getADX(xrp_df, 14)[3]
trend_df['stop_loss'] = sl * getADX(xrp_df, 14)[3]

#VOLUME
trend_df['avg_volume'] = xrp_df['volume'].rolling(window=20).mean()
trend_df['volume'] = xrp_df['volume']

trend_df.dropna(inplace = True)
trend_df.reset_index(drop=True, inplace=True)


Add Signals

In [96]:
def checkMACD(EMA_fast, EMA_slow, MACD, signal_line):
    if EMA_fast > EMA_slow and MACD > signal_line:
        return 1  # Buy signal
    elif EMA_fast < EMA_slow and MACD < signal_line:
        return -1  # Sell signal
    else:
        return 0  # No signal
    

In [97]:
def checkSMA(SMA_fast, SMA_slow):
    if SMA_fast > SMA_slow:
        return 1  # Buy signal
    elif SMA_fast < SMA_slow:
        return -1  # Sell signal
    else:
        return 0  # No signal

In [98]:
def checkADX(plus_di, minus_di, adx):
    if adx > 20 and plus_di > minus_di:
        return 1  # Buy signal
    elif adx > 20 and minus_di > plus_di:
        return -1  # Sell signal
    else:
        return 0  # No signal / Take Profit

In [99]:
def checkVolume(volume, avg_volume):
    if volume > avg_volume:
        return 1  # High volume
    else:
        return 0  # Low volume

In [100]:
def checkSignal(SMA, MACD, ADX, vol):
    return SMA * MACD * ADX * vol

In [101]:
def addSignals(xrp_df, trend_df):
    xrp_df['SMA_Signal'] = trend_df.apply(lambda row: checkSMA(row['SMA_fast'], row['SMA_slow']), axis=1)
    xrp_df['MACD_Signal'] = trend_df.apply(lambda row: checkMACD(row['EMA_fast'], row['EMA_slow'], row['MACD'], row['Signal_Line']), axis=1)
    xrp_df['ADX_Signal'] = trend_df.apply(lambda row: checkADX(row['plus_di'], row['minus_di'], row['adx']), axis=1)
    xrp_df['Volume_Signal'] = trend_df.apply(lambda row: checkVolume(row['volume'], row['avg_volume']), axis=1)

    # Combine the signals
    xrp_df['Combined_Signal'] = xrp_df.apply(lambda row: checkSignal(row['SMA_Signal'], row['MACD_Signal'], row['ADX_Signal'], row['Volume_Signal']), axis=1)
    return xrp_df

In [102]:
trend_df = trend_df.reset_index(drop=True)
xrp_df = xrp_df.reset_index(drop=True)

xrp_df['take_profit'] = trend_df['take_profit']
xrp_df['stop_loss'] = trend_df['stop_loss']

xrp_df = addSignals(xrp_df, trend_df)

## Python Code Implementation

In [103]:
xrp_df.head()

Unnamed: 0,timestamp,open,high,low,close,volume,take_profit,stop_loss,SMA_Signal,MACD_Signal,ADX_Signal,Volume_Signal,Combined_Signal
0,2025-04-01 00:00:00,2.0899,2.1015,2.0852,2.1004,1206826.0,0.015197,-0.007598,1.0,1.0,1.0,1.0,1.0
1,2025-04-01 00:05:00,2.1005,2.107,2.0965,2.1045,1058005.0,0.014768,-0.007384,1.0,1.0,1.0,0.0,0.0
2,2025-04-01 00:10:00,2.1045,2.1045,2.0928,2.0929,468495.0,0.014571,-0.007285,1.0,1.0,1.0,0.0,0.0
3,2025-04-01 00:15:00,2.093,2.0952,2.0866,2.0951,656157.0,0.014458,-0.007229,1.0,1.0,1.0,1.0,1.0
4,2025-04-01 00:20:00,2.0952,2.0978,2.0932,2.0955,745775.0,0.014154,-0.007077,1.0,1.0,1.0,1.0,1.0


In [104]:
#dataset
first_day = xrp_df['timestamp'].dt.date.min()
xrp_first_day_rows = xrp_df[xrp_df['timestamp'].dt.date == first_day]

xrp_simulation_data = xrp_df[xrp_df['timestamp'].dt.date > first_day]
xrp_simulation_data = xrp_simulation_data[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
columns_to_add = ['SMA_fast', 'SMA_slow', 'EMA_fast', 'EMA_slow', 'MACD', 'Signal_Line', 
                  'plus_di', 'minus_di', 'adx', 'avg_volume']
xrp_first_day_rows = xrp_first_day_rows.join(trend_df[columns_to_add])


In [105]:
def editDataFrame(df, newData):

    df = df.iloc[1:]

    new_data_df = pd.DataFrame(newData)
    new_data_df['timestamp'] = pd.to_datetime(new_data_df['timestamp'])
    df = pd.concat([df, new_data_df], ignore_index=True)

    return df


In [106]:
def calculateATR(df, period):

    # Calculate the True Range (TR)
    df['high_low'] = df['high'] - df['low']
    df['high_close'] = abs(df['high'] - df['close'].shift())
    df['low_close'] = abs(df['low'] - df['close'].shift())
    df['tr'] = df[['high_low', 'high_close', 'low_close']].max(axis=1)

    # Calculate the Average True Range (ATR) with Wilder's method 
    df['atr'] = df['tr'].ewm(alpha=1/period, adjust=False).mean()

    return df

In [107]:
def calculateADX(df, period):
    up_move = df['high'].diff()
    down_move = -df['low'].diff()

    plus_dm = np.where((up_move > down_move) & (up_move > 0), up_move, 0.0)
    minus_dm = np.where((up_move < down_move) & (down_move > 0), down_move, 0.0)


    df = getATR(df.copy(), period)

    plus_dm_ewm = pd.Series(plus_dm, index=df.index).ewm(alpha=1/period, adjust=False).mean()
    minus_dm_ewm = pd.Series(minus_dm, index=df.index).ewm(alpha=1/period, adjust=False).mean()

    plus_di = 100 * (plus_dm_ewm / df['atr'])
    minus_di = 100 * (minus_dm_ewm / df['atr'])

    dx = 100 * abs(plus_di - minus_di) / (plus_di + minus_di)
    adx = dx.ewm(alpha=1/period, adjust=False).mean()

    return plus_di, minus_di, adx, df['atr']


In [108]:
def calculateSMA(df, fast, slow):

    df.loc[df.index[-1], 'SMA_fast'] = df['close'].iloc[-fast:].mean()
    df.loc[df.index[-1], 'SMA_slow'] = df['close'].iloc[-slow:].mean()

    return df

In [109]:
def calculateMACD(df, fast, slow, signal):

    df.loc[df.index[-1], 'EMA_fast'] = df['close'].iloc[-fast:].ewm(span=fast, adjust=False).mean().iloc[-1]
    df.loc[df.index[-1], 'EMA_slow'] = df['close'].iloc[-slow:].ewm(span=slow, adjust=False).mean().iloc[-1]
    df.loc[df.index[-1], 'MACD'] = df['EMA_fast'].iloc[-1] - df['EMA_slow'].iloc[-1]
    df.loc[df.index[-1], 'Signal_Line'] = df['MACD'].iloc[-signal:].ewm(span=signal, adjust=False).mean().iloc[-1]

    return df


In [110]:
def calculateVolume(df):

    df.loc[df.index[-1], 'avg_volume'] = df['volume'].rolling(window=20).mean().iloc[-1]

    return df


In [111]:
def calculateTPSL(df, adx, tp=2, sl=-1):
    
    df.loc[df.index[-1],'take_profit'] = tp * adx
    df.loc[df.index[-1],'stop_loss'] = sl * adx

    return df


In [112]:
def checkMACD(EMA_fast, EMA_slow, MACD, signal_line):
    if EMA_fast > EMA_slow and MACD > signal_line:
        return 1  # Buy signal
    elif EMA_fast < EMA_slow and MACD < signal_line:
        return -1  # Sell signal
    else:
        return 0  # No signal
    

In [113]:
def checkSMA(SMA_fast, SMA_slow):
    if SMA_fast > SMA_slow:
        return 1  # Buy signal
    elif SMA_fast < SMA_slow:
        return -1  # Sell signal
    else:
        return 0  # No signal

In [114]:
def checkADX(plus_di, minus_di, adx):
    if adx > 20 and plus_di > minus_di:
        return 1  # Buy signal
    elif adx > 20 and minus_di > plus_di:
        return -1  # Sell signal
    else:
        return 0  # No signal / Take Profit

In [115]:
def checkVolume(volume, avg_volume):
    if volume > avg_volume:
        return 1  # High volume
    else:
        return 0  # Low volume

In [116]:
def checkSignal(SMA, MACD, ADX):
    
    return SMA * MACD * ADX

In [117]:
def addSignals(df):

    sma = checkSMA(df.loc[df.index[-1], 'SMA_fast'], df.loc[df.index[-1], 'SMA_slow'])
    macd = checkMACD(df.loc[df.index[-1], 'EMA_fast'], df.loc[df.index[-1], 'EMA_slow'], df.loc[df.index[-1], 'MACD'], df.loc[df.index[-1], 'Signal_Line'])
    adx = checkADX(df.loc[df.index[-1], 'plus_di'], df.loc[df.index[-1], 'minus_di'], df.loc[df.index[-1], 'adx'])
    #vol = checkVolume(df.loc[df.index[-1], 'volume'], df.loc[df.index[-1], 'avg_volume'])


    df.loc[df.index[-1], 'SMA_Signal'] = sma
    df.loc[df.index[-1], 'MACD_Signal'] = macd
    df.loc[df.index[-1], 'ADX_Signal'] = adx
    #df.loc[df.index[-1], 'Volume_Signal'] = vol
    
    #df.loc[df.index[-1], 'Combined_Signal'] = checkSignal(sma, macd, adx, vol)
    df.loc[df.index[-1], 'Combined_Signal'] = checkSignal(sma, macd, adx)

    return df

In [118]:
def prepareDataset(cryptoDf, tp=2, sl=-1):

    cryptoDf = calculateSMA(cryptoDf, 10, 30)
    cryptoDf = calculateMACD(cryptoDf, 12, 26, 9)
    #cryptoDf = calculateVolume(cryptoDf)


    adx = calculateADX(cryptoDf, 14)
    cryptoDf.loc[cryptoDf.index[-1],'plus_di'] = adx[0].iloc[-1]
    cryptoDf.loc[cryptoDf.index[-1],'minus_di'] = adx[1].iloc[-1]
    cryptoDf.loc[cryptoDf.index[-1],'adx'] = adx[2].iloc[-1]

    cryptoDf = calculateTPSL(cryptoDf, adx[3].iloc[-1], tp, sl)
    cryptoDf = addSignals(cryptoDf)

    return cryptoDf


In [119]:
def manageProfit(entryPrice, currentPrice, signal, currentShares):
    
    trade_profit = 0
    
    if signal == "buy":
        trade_profit = (currentPrice - entryPrice) * currentShares 
    elif signal == "sell":
        trade_profit = (entryPrice - currentPrice) * currentShares

    return trade_profit

In [120]:
def main():

    initial_investment = 200
    signal = None
    entry_price = None
    hold = False
    trade_amount = 200
    total_investment_amount = 200.00
    total_trades = 0
    current_shares = 0 
    errors = 0
    tp = 2
    sl = -1
    tmp_df = xrp_first_day_rows.copy()
    profit = 0

    for i in range(len(xrp_simulation_data)):
        timestamp = xrp_simulation_data['timestamp'].iloc[i]
        open = xrp_simulation_data['open'].iloc[i]
        high = xrp_simulation_data['high'].iloc[i]
        low = xrp_simulation_data['low'].iloc[i]
        close = xrp_simulation_data['close'].iloc[i]
        volume = xrp_simulation_data['volume'].iloc[i]

        new_data = [{
            'timestamp': timestamp, 
            'open': open, 
            'high': high, 
            'low': low, 
            'close': close,
            'volume': volume
        }]

        tmp_df = editDataFrame(tmp_df, new_data)
        prepared_crypto_df = prepareDataset(tmp_df, tp, sl)
        latest_data = prepared_crypto_df.iloc[-1]

        # No position held
        if not hold:
            if latest_data['Combined_Signal'] == 1:
                signal = "buy"
                entry_price = close
                current_shares = trade_amount / entry_price
                hold = True
                total_trades += 1
            elif latest_data['Combined_Signal'] == -1:
                signal = "sell"
                entry_price = close
                current_shares = trade_amount / entry_price
                hold = True
                total_trades += 1

        # When holding a position
        else:
            profit = manageProfit(entry_price, close, signal, current_shares)

            # Check take profit and stop loss conditions or if signal says otherwise
            if (profit >= latest_data['take_profit'] or profit <= latest_data['stop_loss']) or latest_data['Combined_Signal'] == 0:
                total_investment_amount += profit  # Update investment amount with the profit or loss from this trade
                if profit < 0:
                    errors += 1
                hold = False  # Exit the position
                profit = 0  # Reset after trade is closed

        if total_investment_amount < trade_amount:
            trade_amount = total_investment_amount
        #else: trade_amount = total_investment_amount    
        
        if total_investment_amount <= 0:
            print(f"Investment depleted at iteration {i} on {latest_data['timestamp']}")
            break

    profit_percentage = math.floor((total_investment_amount/initial_investment)*100)
    error_percentage = 0
    if errors > 0:
        error_percentage = math.floor(errors/total_trades * 100)

    print("Profit from the strategy by investing P200 in XRP : " + str(round(total_investment_amount, 2)))
    print("Profit percentage of the strategy : " + str(profit_percentage-100) + "%")
    print("Total Trades: ", total_trades)
    print("Errors: ", errors)
    print("Error Percentage: ", error_percentage, "%")  

main()


Profit from the strategy by investing P200 in XRP : 218.99
Profit percentage of the strategy : 9%
Total Trades:  874
Errors:  430
Error Percentage:  49 %
