In [None]:
import os
import sys
import traceback
import re
import enum
import json
import tzlocal
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo
import time
import logging
import uuid
import arrow
from typing import Dict, List, Tuple, Any, Union
import pandas as pd
from statsmodels.tsa.stattools import coint
from statsmodels.tsa.vector_ar.vecm import coint_johansen
import statsmodels.api as sm # in-compatible with pypy
from hurst import compute_Hc # compatible with pypy
import numpy as np
import math
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

In [None]:
TIMEZONE : str = "Asia/Hong_Kong"

trade_file = 'orderhist_cache.csv'
export_file = f"{trade_file.replace('.csv','')}.flattenned.csv"

ACC_INITIAL_EQUITY : float = 100

pd_orderhist = pd.read_csv(trade_file)
pd_orderhist.sort_values(by=['timestamp_ms'], ascending=[True])
pd_orderhist['entry_datetime'] = pd_orderhist['datetime'].shift(1)
pd_orderhist['entry_timestamp_ms'] = pd_orderhist['timestamp_ms'].shift(1)
pd_orderhist['entry_datetime'] = pd.to_datetime(pd_orderhist['entry_timestamp_ms'], unit='ms', utc=True).dt.tz_convert(TIMEZONE)
pd_orderhist['datetime'] = pd.to_datetime(pd_orderhist['timestamp_ms'], unit='ms', utc=True).dt.tz_convert(TIMEZONE)
pd_orderhist['amount'] = pd_orderhist['amount'].shift(1)
pd_orderhist['entry_side'] = pd_orderhist['side'].shift(1)
pd_orderhist['entry_px'] = pd_orderhist['avg_price'].shift(1)
pd_orderhist['entry_fees'] = pd_orderhist['fees'].shift(1)
pd_orderhist['total_fees'] = pd_orderhist['entry_fees'] + pd_orderhist['fees']
pd_orderhist['entry_slippage_bps'] = pd_orderhist['slippage_bps'].shift(1)
pd_orderhist['total_slippage_bps'] = pd_orderhist['slippage_bps'] + pd_orderhist['entry_slippage_bps']
pd_orderhist = pd_orderhist[pd_orderhist.reason!='entry']
pd_orderhist['pnl_less_comm'] = pd_orderhist['pnl'] - pd_orderhist['total_fees']
pd_orderhist['pnl_bps'] = round(pd_orderhist['pnl_bps'], 3)
pd_orderhist['pnl_bps_less_comm'] = round(pd_orderhist['pnl_less_comm'] / pd_orderhist['amount'] * 10000, 3)
pd_orderhist['total_fees_bps'] = round(pd_orderhist['total_fees'] / pd_orderhist['amount'] * 10000, 3)
pd_orderhist['max_pain_bps'] = round(pd_orderhist['max_pain'] / pd_orderhist['amount'] * 10000, 3)
pd_orderhist['duration_hr'] = (pd_orderhist['timestamp_ms'] - pd_orderhist['entry_timestamp_ms'])/(1000*60*60)
pd_orderhist['cumulative_pnl'] = pd_orderhist['pnl_less_comm'].cumsum()
pd_orderhist['cumulative_pnl_bps'] = pd_orderhist['pnl_bps_less_comm'].cumsum()
pd_orderhist['total_equity'] = pd_orderhist['cumulative_pnl'] + ACC_INITIAL_EQUITY
pd_orderhist['interval_return'] = np.log(pd_orderhist['total_equity'] / pd_orderhist['total_equity'].shift(1))

mask = pd_orderhist['reason2'].str.contains('Trailing stop fired', na=False)
extracted = pd_orderhist.loc[mask, 'reason2'].str.extract(
    r'loss_trailing:\s*([-\d.]+).*effective_tp_trailing_percent:\s*([-\d.]+)',
    expand=True
)
pd_orderhist['loss_trailing'] = np.nan
pd_orderhist['effective_tp_trailing_percent'] = np.nan
if not extracted.empty:
    pd_orderhist.loc[mask, 'loss_trailing'] = extracted[0].astype(float)
    pd_orderhist.loc[mask, 'effective_tp_trailing_percent'] = extracted[1].astype(float)
pd_orderhist['loss_trailing_slippage'] = pd_orderhist['loss_trailing'] - pd_orderhist['effective_tp_trailing_percent'] # in percent
pd_orderhist['loss_trailing_slippage_bps'] = round(pd_orderhist['pnl_bps'] * (pd_orderhist['loss_trailing_slippage'] / 100), 3)

min_entry_timestamp_ms = pd_orderhist['entry_timestamp_ms'].min()
max_exit_timestamp_ms = pd_orderhist['timestamp_ms'].max()
duration_ms = int(max_exit_timestamp_ms - min_entry_timestamp_ms)
duration_sec = duration_ms/1000
duration_days = round(duration_ms / (1000 *60*60 *24), 2)

pd_orderhist.rename(columns={ 'datetime' : 'exit_datetime', 'timestamp_ms' : 'exit_timestamp_ms', 'avg_price': 'exit_px', 'side' : 'exit_side', 'fees' : 'exit_fees', 'slippage_bps' : 'exit_slippage_bps' }, inplace=True)
columns = [ 'exit_datetime', 'exit_timestamp_ms', 'entry_datetime', 'entry_timestamp_ms', 'ticker', 'entry_side', 'amount', 'entry_px', 'exit_px', 'reason', 'reason2', 'pnl', 'pnl_bps', 'pnl_less_comm', 'pnl_bps_less_comm', 'total_fees_bps', 'loss_trailing', 'effective_tp_trailing_percent', 'loss_trailing_slippage', 'loss_trailing_slippage_bps', 'max_pain', 'max_pain_bps', 'entry_fees', 'exit_fees', 'total_fees', 'total_slippage_bps', 'duration_hr', 'cumulative_pnl', 'cumulative_pnl_bps', 'total_equity', 'interval_return', 'remarks']
pd_orderhist = pd_orderhist[columns]

pd_orderhist.set_index('exit_datetime', inplace=True)

total_pnl_bps = round(pd_orderhist['pnl_bps'].sum(), 2)
total_pnl_bps_less_comm = round(pd_orderhist['pnl_bps_less_comm'].sum(), 2)
num_sl = pd_orderhist[pd_orderhist.reason=='SL'].shape[0]
num_tp = pd_orderhist[pd_orderhist.reason=='CLOSED'].shape[0]
num_trades = num_tp + num_sl
hit_ratio = round(num_tp / num_trades *100, 2)
print(f"hit_ratio: {hit_ratio}% ({num_tp + num_sl} trades, {num_sl} stops), total_pnl_bps: {total_pnl_bps}, total_pnl_bps_less_comm: {total_pnl_bps_less_comm} over {duration_days} days.")

In [None]:
# Generally long/short pnl analysis useful only in specific ranges, not full history
analysis_end_timestamp_ms = datetime.now().timestamp()
analysis_start_timestamp_ms = (datetime.now() + timedelta(days=-7)).timestamp()
pd_orderhist_range = pd_orderhist[(pd_orderhist['entry_timestamp_ms']>=analysis_start_timestamp_ms) & (pd_orderhist['entry_timestamp_ms']>=analysis_end_timestamp_ms)]

long_short_summary = pd_orderhist_range.groupby('entry_side').agg(
    trade_count=('pnl_bps_less_comm', 'count'),
    total_pnl_bps_less_comm=('pnl_bps_less_comm', 'sum'),
).reset_index()

long_short_summary

In [None]:
from ccxt.okx import okx

exchange = okx()

TIMEFRAME : str = '1m'
MAX_NUM_CANDLES : int = 5
MAX_WHIPSAW_TOLERATED_BPS : int = 30

def calc_post_trade_trailingstops_whatif_analysis(
    pd_orderhist : pd.DataFrame,
    timeframe : str = '1m',
    num_candles_fetched : int = MAX_NUM_CANDLES,
    max_whipsaw_tolerated_bps : int = MAX_WHIPSAW_TOLERATED_BPS
) -> List[Dict[str, Union[str, int, float]]]:
    results = []

    pnl_bps_more : float = 0
    for row in pd_orderhist.itertuples(index=False):
        ticker = row.ticker
        reason = row.reason
        reason2 = row.reason2
        entry_datetime = row.entry_datetime
        entry_timestamp_ms = row.entry_timestamp_ms
        exit_timestamp_ms = row.exit_timestamp_ms
        entry_px = row.entry_px
        exit_px = row.exit_px
        entry_side = row.entry_side
        pnl_bps_less_comm = row.pnl_bps_less_comm
        
        if reason=='CLOSED':
            # Did price ended up become more favorable? That we should have hang on to the trade longer?
            next_candles = exchange.fetch_ohlcv(ticker, timeframe=TIMEFRAME, since=exit_timestamp_ms, limit=MAX_NUM_CANDLES)

            # Format ts, o, h, l, c, v: [[1770856740000, 79.96, 79.98, 79.86, 79.88, 3376.74], [1770856800000, 79.88, 79.92, 79.88, 79.91, 1915.8], ... ]
            next_low = min([ x[3] for x in next_candles ] )
            next_high = max([ x[2] for x in next_candles ] )
            next_low_timestamp_ms = [ x[0] for x in next_candles if x[3]==next_low ][0]
            next_high_timestamp_ms = [ x[0] for x in next_candles if x[2]==next_high ][0]

            msg_prefix = f"{entry_datetime} {entry_side} trade, entry_px: {entry_px}, exit_px: {exit_px}, pnl_bps_less_comm: {pnl_bps_less_comm}: "
            if entry_side=='buy':
                if next_high>exit_px:
                    next_max_pain = max(next_high - next_low, 0)
                    next_max_pain_bps = round(next_max_pain/exit_px *10000, 2)
                    wait_sec_to_best_pnl = int((next_high_timestamp_ms-exit_timestamp_ms)/1000)
                    more_pnl_bps  = round((next_high/exit_px -1) *10000, 2)
                    if next_max_pain_bps<=MAX_WHIPSAW_TOLERATED_BPS:
                        pnl_bps_more += more_pnl_bps

                        results.append(
                            {
                                'ticker' : entry_datetime,
                                'entry_side' : entry_side,
                                'entry_datetime' : entry_datetime,
                                'entry_timestamp_ms' : entry_timestamp_ms,
                                'exit_timestamp_ms' : exit_timestamp_ms,
                                'reason' : reason,
                                'reason2' : reason2,
                                'entry_px' : entry_px,
                                'exit_px' : exit_px,
                                'pnl_bps_less_comm' : pnl_bps_less_comm,

                                'next_low' : next_low,
                                'next_high' : next_high,
                                'next_low_timestamp_ms' : next_low_timestamp_ms,
                                'next_high_timestamp_ms' : next_high_timestamp_ms,

                                'wait_sec_to_best_pnl' : wait_sec_to_best_pnl,
                                'next_max_pain' : next_max_pain,
                                'next_max_pain_bps' : next_max_pain_bps,
                                'more_pnl_bps' : more_pnl_bps
                            }
                        )
                        print(f"{msg_prefix}: Could have exited at {next_high} ({more_pnl_bps} bps) if waited {wait_sec_to_best_pnl} sec if you endured {next_max_pain_bps} bps whipsaw")
            elif entry_side=='sell':
                if next_low<exit_px:
                    next_max_pain = max(next_high - next_low, 0)
                    next_max_pain_bps = round(next_max_pain/exit_px *10000, 2)
                    wait_sec_to_best_pnl = int((next_low_timestamp_ms-exit_timestamp_ms)/1000)
                    more_pnl_bps  = round((exit_px/next_low -1) *10000, 2)
                    if next_max_pain_bps<=MAX_WHIPSAW_TOLERATED_BPS:
                        pnl_bps_more += more_pnl_bps

                        results.append(
                            {
                                'ticker' : entry_datetime,
                                'entry_side' : entry_side,
                                'entry_datetime' : entry_datetime,
                                'entry_timestamp_ms' : entry_timestamp_ms,
                                'exit_timestamp_ms' : exit_timestamp_ms,
                                'reason' : reason,
                                'reason2' : reason2,
                                'entry_px' : entry_px,
                                'exit_px' : exit_px,
                                'pnl_bps_less_comm' : pnl_bps_less_comm,

                                'next_low' : next_low,
                                'next_high' : next_high,
                                'next_low_timestamp_ms' : next_low_timestamp_ms,
                                'next_high_timestamp_ms' : next_high_timestamp_ms,

                                'wait_sec_to_best_pnl' : wait_sec_to_best_pnl,
                                'next_max_pain' : next_max_pain,
                                'next_max_pain_bps' : next_max_pain_bps,
                                'more_pnl_bps' : more_pnl_bps
                            }
                        )
                        print(f"{msg_prefix}: Could have exited at {next_low} ({more_pnl_bps} bps) if waited {wait_sec_to_best_pnl} sec if you endured {next_max_pain_bps} bps whipsaw")

    return pnl_bps_more, results

pnl_bps_more, results = calc_post_trade_trailingstops_whatif_analysis(
    pd_orderhist = pd_orderhist,
    timeframe = TIMEFRAME,
    num_candles_fetched = MAX_NUM_CANDLES,
    max_whipsaw_tolerated_bps = MAX_WHIPSAW_TOLERATED_BPS
)

print(f"pnl_bps_more: {pnl_bps_more}")

In [None]:
pd_orderhist.drop(pd_orderhist.columns[pd_orderhist.columns.str.contains('unnamed',case = False)],axis = 1, inplace = True)
pd_orderhist.drop(pd_orderhist.columns[pd_orderhist.columns.str.contains('entry_timestamp_ms',case = False)],axis = 1, inplace = True)
pd_orderhist.drop(pd_orderhist.columns[pd_orderhist.columns.str.contains('exit_timestamp_ms',case = False)],axis = 1, inplace = True)
pd_orderhist.drop(pd_orderhist.columns[pd_orderhist.columns.str.contains('exchange',case = False)],axis = 1, inplace = True)
# pd_orderhist.drop(pd_orderhist.columns[pd_orderhist.columns.str.contains('reason2',case = False)],axis = 1, inplace = True)

pd_orderhist.to_csv(export_file)

In [None]:
plt.figure(figsize=(12, 6), facecolor='black')
ax = plt.gca()
ax.set_facecolor('black')

plt.hist(
    pd_orderhist[pd_orderhist.pnl_bps>=0]['pnl_bps'],
    
    bins=25,
    edgecolor='#444444',
    alpha=0.85,
    color='dodgerblue'
)

mean_pnl = round(pd_orderhist[pd_orderhist.pnl_bps>=0]['pnl_bps'].mean(), 2)
plt.axvline(mean_pnl, color='limegreen', linestyle='-', linewidth=1.3, alpha=0.85,
            label=f'Mean pnl_bps {mean_pnl}')

plt.title('pnl_bps distribution (Before fees)', 
          fontsize=14, fontweight='bold', color='white')
plt.xlabel('pnl_bps', fontsize=12, color='white')
plt.ylabel('# trades', fontsize=12, color='white')

plt.grid(True, alpha=0.15, linestyle='--', color='gray')
plt.legend(loc='upper right', frameon=False, fontsize=10.5, labelcolor='white')

ax.tick_params(colors='white', which='both')
ax.spines['bottom'].set_color('gray')
ax.spines['top'].set_color('gray')
ax.spines['left'].set_color('gray')
ax.spines['right'].set_color('gray')

plt.tight_layout()
plt.show()

In [None]:
plt.figure(figsize=(15, 6), facecolor='black')
ax = plt.gca()
ax.set_facecolor('black')

plt.hist(
    pd_orderhist[pd_orderhist.pnl_bps>=0]['loss_trailing_slippage_bps'],
    
    bins=25,
    edgecolor='#444444',
    alpha=0.85,
    color='dodgerblue'
)

mean_loss_trailing_slippage = round(pd_orderhist[pd_orderhist.pnl_bps>=0]['loss_trailing_slippage'].mean(), 2)
sum_loss_trailing_slippage = round(pd_orderhist[pd_orderhist.pnl_bps>=0]['loss_trailing_slippage'].sum(), 2)
mean_loss_trailing_slippage_bps = round(pd_orderhist[pd_orderhist.pnl_bps>=0]['loss_trailing_slippage_bps'].mean(), 2)
sum_loss_trailing_slippage_bps = round(pd_orderhist[pd_orderhist.pnl_bps>=0]['loss_trailing_slippage_bps'].sum(), 2)
plt.axvline(
        mean_loss_trailing_slippage_bps, 
        color='limegreen', linestyle='-', linewidth=1.3, alpha=0.85,
        label=f'mean_loss_trailing_slippage {mean_loss_trailing_slippage}, sum_loss_trailing_slippage: {sum_loss_trailing_slippage}, mean_loss_trailing_slippage_bps: {mean_loss_trailing_slippage_bps}, sum_loss_trailing_slippage_bps: {sum_loss_trailing_slippage_bps}'
    )

plt.title('loss_trailing_slippage_bps distribution', 
          fontsize=14, fontweight='bold', color='white')
plt.xlabel('loss_trailing_slippage_bps', fontsize=12, color='white')
plt.ylabel('# trades', fontsize=12, color='white')

plt.grid(True, alpha=0.15, linestyle='--', color='gray')
plt.legend(loc='upper right', frameon=False, fontsize=10.5, labelcolor='white')

ax.tick_params(colors='white', which='both')
ax.spines['bottom'].set_color('gray')
ax.spines['top'].set_color('gray')
ax.spines['left'].set_color('gray')
ax.spines['right'].set_color('gray')

plt.tight_layout()
plt.show()

In [None]:
plt.figure(figsize=(12, 6), facecolor='black')
ax = plt.gca()
ax.set_facecolor('black')

plt.hist(
    pd_orderhist['total_fees_bps'],
    bins=25,
    edgecolor='#444444',
    alpha=0.85,
    color='dodgerblue'
)

mean_total_fees_bps = round(pd_orderhist['total_fees_bps'].mean(), 2)
plt.axvline(mean_total_fees_bps, color='limegreen', linestyle='-', linewidth=1.3, alpha=0.85,
            label=f'Mean total_fees_bps {mean_total_fees_bps}')

plt.title('total_fees_bps distribution', 
          fontsize=14, fontweight='bold', color='white')
plt.xlabel('total_fees_bps', fontsize=12, color='white')
plt.ylabel('# trades', fontsize=12, color='white')

plt.grid(True, alpha=0.15, linestyle='--', color='gray')
plt.legend(loc='upper right', frameon=False, fontsize=10.5, labelcolor='white')

ax.tick_params(colors='white', which='both')
ax.spines['bottom'].set_color('gray')
ax.spines['top'].set_color('gray')
ax.spines['left'].set_color('gray')
ax.spines['right'].set_color('gray')

plt.tight_layout()
plt.show()

In [None]:
plt.figure(figsize=(12, 6), facecolor='black')
ax = plt.gca()
ax.set_facecolor('black')

plt.hist(
    pd_orderhist[pd_orderhist.pnl_bps>=0]['max_pain_bps'],
    bins=25,
    edgecolor='#444444',
    alpha=0.85,
    color='dodgerblue'
)

mean_max_pain_bps = round(pd_orderhist[pd_orderhist.pnl_bps>=0]['max_pain_bps'].mean(), 2)
plt.axvline(mean_max_pain_bps, color='limegreen', linestyle='-', linewidth=1.3, alpha=0.85,
            label=f'Mean max_pain_bps {mean_max_pain_bps}')

plt.title('max_pain_bps distribution', 
          fontsize=14, fontweight='bold', color='white')
plt.xlabel('max_pain_bps', fontsize=12, color='white')
plt.ylabel('# trades', fontsize=12, color='white')

plt.grid(True, alpha=0.15, linestyle='--', color='gray')
plt.legend(loc='upper right', frameon=False, fontsize=10.5, labelcolor='white')

ax.tick_params(colors='white', which='both')
ax.spines['bottom'].set_color('gray')
ax.spines['top'].set_color('gray')
ax.spines['left'].set_color('gray')
ax.spines['right'].set_color('gray')

plt.tight_layout()
plt.show()

In [None]:
plt.figure(figsize=(12, 5), facecolor='black')

# Plot pnl curve
ax = plt.gca()
ax.set_facecolor('black')
ax.grid(False)
ax.set_ylabel('cumulative_pnl_bps', color='lightgray')
plt.plot(
    pd_orderhist.index,
    pd_orderhist['cumulative_pnl_bps'],
    color='lime',
    linewidth=1.6,
    label='Cumulative Pnl bps'
)

# Plot entry prices
ax2 = ax.twinx()
ax2.grid(False)
ax2.set_ylabel('Entry price', color='lightgray')
ax2.plot(
    pd_orderhist.index,
    pd_orderhist['entry_px'],
    color='dimgray',
    linewidth=1,
    label='Entry price'
)

plt.title(f'Cumulative Pnl bps (after fees) total_pnl_bps_less_comm: {total_pnl_bps_less_comm}, num_trades: {num_trades}, num_sl: {num_sl}, hit_ratio: {hit_ratio} ({duration_days} days)', 
          fontsize=14, fontweight='bold', color='white')
plt.xlabel('date/time', fontsize=12, color='white')

ax.tick_params(colors='white', which='both')
ax.spines['bottom'].set_color('gray')
ax.spines['top'].set_color('gray')
ax.spines['left'].set_color('gray')
ax.spines['right'].set_color('gray')

# Highlight weekends
weekends = pd_orderhist.index[
    (pd_orderhist.index.dayofweek == 5) | (pd_orderhist.index.dayofweek == 6)
].normalize().unique()

for t in weekends:
    ax.axvspan(t, t + pd.Timedelta(days=1),
               facecolor='lightgray', alpha=0.22, zorder=0, linewidth=0)

# Plot remarks
mask = pd_orderhist['remarks'].notna() & (pd_orderhist['remarks'].astype(str).str.strip() != '')
events = pd_orderhist.loc[mask].copy()

for idx, row in events.iterrows():
    ts = idx
    remark = str(row['remarks']).strip().lower()
    if 'manual rebook' not in remark:
        ax.axvline(
            x=ts,
            color='dimgray',
            linewidth=1.1,
            linestyle='--',
            alpha=0.7,
            zorder=5
        )
        
        ax.text(
            x=ts,
            y=0.03,
            s=remark,
            color='dimgray',
            fontsize=9,
            fontweight='normal',
            ha='right',
            va='bottom',
            rotation=90,
            bbox=dict(
                facecolor='black',
                alpha=0.65,
                edgecolor='none',
                pad=1.6,
                boxstyle='round,pad=0.4'
            ),
            zorder=6
        )

plt.tight_layout()
plt.show()

In [None]:
# https://github.com/ranaroussi/quantstats
%matplotlib inline
import quantstats as qs

RISK_FREE_RATE = 0.035

qs.reports.basic(
    returns=pd_orderhist['interval_return'], 
    rf=RISK_FREE_RATE,
    title="Tear Sheet"
    )