In [15]:
import httpx
import zipfile
from datetime import datetime, timezone, timedelta
import pandas as pd
from io import BytesIO
import time
import pytz
import os

from typing import List

from backtesting import Backtest, Strategy
from backtesting.lib import crossover
import talib as ta

import duckdb
from collections import deque
import numpy as np
from enum import Enum
import math

In [2]:
DOWNLOAD_DATA : bool = True # switch to true to download data
KLINE_INTERVAL = "1m"
BASE_URL_TEMPLATE = "https://data.binance.vision/data/futures/um/daily/klines/{symbol}/{interval}/{symbol}-{interval}-{date_str}.zip"
SYMBOLS = [
    "BTCUSDT"
]
LOCAL_TZ = pytz.timezone("Asia/Singapore")

In [11]:
def download_historical(symbols : List[str], interval : List[str], days_ago=30) :
    columns = [
        "open_time",
        "open",
        "high",
        "low",
        "close",
        "volume",
        "close_time",
        "quote_volume",
        "count",
        "taker_buy_volume",
        "taker_buy_quote_volume",
        "ignore"
    ]

    daterange_end = datetime.now(LOCAL_TZ).astimezone(timezone.utc) - timedelta(minutes=60 * 30) # Data delivered around 3-4am UTC
    daterange_start = daterange_end - timedelta(days=days_ago)
    
    extract_dates = [date.date().strftime("%Y-%m-%d") for date in pd.date_range(start=daterange_start, end=daterange_end)]

    for symbol in symbols :
        symbol_dir = f"./historical/futures_klines_{interval}/{symbol}/"
        os.makedirs(symbol_dir, exist_ok=True)
        for dt in extract_dates :
            url = BASE_URL_TEMPLATE.format(symbol=symbol, date_str=dt, interval=interval)
            target_file = symbol_dir + f"{dt.replace('-','')}.parquet"
            if os.path.exists(target_file) :
                continue
            response = httpx.get(url)
            response.raise_for_status()

            with zipfile.ZipFile(BytesIO(response.content)) as z:
                # There should be only one file inside the zip
                csv_filename = z.namelist()[0]

                with z.open(csv_filename) as f:
                    df = pd.read_csv(f, names=columns, skiprows=1)
                    df = df[df["ignore"] != 1]
                    df.drop("ignore", axis=1, inplace=True)
                    df["open_time"] = pd.to_datetime(df["open_time"], unit="ms")
                    df["close_time"] = pd.to_datetime(df["close_time"], unit="ms")
                    df["date"] = pd.to_datetime(dt)
                    df.to_parquet(target_file, index=False)

            time.sleep(0.2)

In [12]:
if DOWNLOAD_DATA : 
    download_historical(
        symbols=SYMBOLS,
        interval=KLINE_INTERVAL
    )

In [13]:
symbol = "BTCUSDT"
symbol_dir = f"./historical/futures_klines_1m/{symbol}/*.parquet"


dd_query = f"""
    SELECT 
        close_time AS timestamp, 
        open/1000 AS Open, 
        high/1000 as High, 
        low/1000 as Low, 
        close/1000 AS Close, 
        volume * 1000 AS Volume  
    FROM read_parquet("{symbol_dir}")
"""


df = duckdb.query(dd_query).to_df()
df.set_index("timestamp", inplace=True)

In [None]:
class Action(Enum) : 
    BUY = 1
    HOLD = 0
    SELL = -1

class MACDStrategy(Strategy):
    fastperiod = 12
    slowperiod = 26
    signalperiod = 9
    MAX_RISK_PER_TRADE_PCT: float = 0.01 # 1% risk per trade
    MAX_ABSOLUTE_DRAWDOWN: float = 0.10 # 10% equity
    MAX_RELATIVE_DRAWDOWN: float = 0.05 # 5% daily
    MAX_SPREAD_PCT: float = 0.003
    MAX_EXPOSURE_PCT: float = 0.05 # 5%
    SMOOTHING_FACTOR: float = 2.0
    ATR_PERIOD_DAYS : float = 14

    def init(self):

        self.KLINE_PERIODS = 60 * 24 * self.ATR_PERIOD_DAYS

        self.warmup_time = pd.Timedelta(minutes=self.ATR_PERIOD_DAYS)

        self.alpha_fast = self.SMOOTHING_FACTOR / (self.fastperiod + 1)  # Fast EMA (12-period)
        self.alpha_slow = self.SMOOTHING_FACTOR / (self.slowperiod + 1)  # Slow EMA (26-period)
        self.alpha_signal = self.SMOOTHING_FACTOR / (self.signalperiod + 1)
        self.last_action = Action.HOLD
        self.EMA_FAST = []
        self.EMA_SLOW = []
        self.SIGNAL = []
        self.current_position = 0.0

    def next(self):

        high = self.data.High
        low = self.data.Low
        close = self.data.Close

        elapsed = self.data.index[-1] - self.data.index[0]

        if elapsed < self.warmup_time :
            return
        high_arr = high[-self.KLINE_PERIODS:]
        low_arr = low[-self.KLINE_PERIODS:]
        close_arr = close[-self.KLINE_PERIODS:]

        last_price = self._broker.last_price

        signal =  self.calculate_signal(last_price)
        current_atr = self.calculate_atr(high_arr, low_arr, close_arr)

        
        total_value = self.equity
        total_cash = self._broker._cash
        max_exposure = total_value * self.MAX_EXPOSURE_PCT
        position_size = self.position.size
        position_value = position_size * last_price
        if total_value > 0 : 
            current_exposure = position_value/total_value
        else :
            current_exposure = 0.0

        result = self.manage_position(current_atr)
        size = self.calculate_position_size()

        if signal == Action.BUY :
            if math.isclose(self.current_position, 0.0)  : 
                print(f"Size: {size}!!!!!!!!!!!!!!!!!!!!!!!!")
                if size:
                    self.entry_position(size, signal)
            elif self.current_position > 0.0 :
                if result.get("TP/SL hit", True):
                    self.position.close()
                # decide whether to scale position
                if current_exposure >= max_exposure:
                    pass
                elif current_exposure < max_exposure:
                    if result.get("TP/SL hit", True):
                        pass
                    else :
                        size = self.calculate_position_size()
                        if size:
                            self.entry_position(size, signal)
            elif self.current_position < 0.0 :
                self.position.close()

        elif signal == Action.SELL : 
            if math.isclose(self.current_position, 0.0)  : 
                size = self.calculate_position_size()
                print(f"Size: {size}!!!!!!!!!!!!!!!!!!!!!!!!")
                if size:
                    self.entry_position(symbol, size, signal)
            elif self.current_position > 0.0 :
                if result.get("TP/SL hit", True):
                    self.position.close()
                # decide whether to scale position
                if current_exposure >= max_exposure:
                    pass
                elif current_exposure < max_exposure:
                    if result.get("TP/SL hit", True):
                        pass
                    else :
                        size = self.calculate_position_size()
                        if size:
                            self.entry_position(size, signal)
            elif self.current_position < 0.0 :
                self.position.close()


    def entry_position(self, size, action : Action) :
        if action == Action.BUY :
            self.buy(size=size)
        elif action == Action.SELL : 
            self.sell(size=size)


    def calculate_signal(self, close) :
        if len(self.EMA_FAST) == 0 :
            curr_fast_ema = close
            self.EMA_FAST.append(close)
        else : 
            prev_fast_ema = self.EMA_FAST[-1]
            curr_fast_ema = self.alpha_fast * close + (1-self.alpha_fast) * prev_fast_ema
            self.EMA_FAST.append(curr_fast_ema)

        
        if len(self.EMA_SLOW) == 0 :
            curr_slow_ema = close
            self.EMA_SLOW.append(close)
        else : 
            prev_slow_ema = self.EMA_SLOW[-1]
            curr_slow_ema = self.alpha_slow * close + (1-self.alpha_slow) * prev_slow_ema
            self.EMA_SLOW.append(curr_slow_ema)


        latest_macd = curr_fast_ema - curr_slow_ema

        if len(self.SIGNAL) == 0 :
            latest_signal = latest_macd
            self.SIGNAL.append(latest_macd) 
        else : 
            prev_signal = self.SIGNAL[-1]
            latest_signal = self.alpha_signal * latest_macd + (1 - self.alpha_signal) * prev_signal
            self.SIGNAL.append(latest_signal)


        if latest_macd > latest_signal and self.last_action != Action.BUY : 
            return Action.BUY
        
        if latest_macd < latest_signal and self.last_action != Action.HOLD : 
            return Action.SELL
    
        return Action.HOLD


    def calculate_position_size(self, atr, close) :
        risk_amount = close * self.MAX_RISK_PER_TRADE_PCT
        position_size = (risk_amount / atr)/1000 
        self.current_position_size = position_size
        return position_size

        
    def calculate_atr(self, high_arr, low_arr, close_arr) -> float : 
        prev_close_arr = np.roll(close_arr, 1)
        prev_close_arr[0] = np.nan

        high_low = high_arr - low_arr
        high_close = np.abs(high_arr - prev_close_arr)
        low_close = np.abs(low_arr - prev_close_arr)

        true_range = np.max(np.stack([high_low, high_close, low_close], axis=0), axis=0)
        atr = true_range.mean()
        return float(atr)
    
    def manage_position(self, atr, atr_multiplier :float = 1.0) :
        position_size = self.position.size
        last_price = self._broker.last_price
        position_value = position_size * last_price
        
        no_position = math.isclose(position_size, 0.0)

        if no_position : 
            return{
                "TP/SL hit": False,
                "new_sl": None,
                "new_tp": None,
                'r_multiple': 0.0,
                'pnl_pct': 0.0,
            }

        
        risk = atr * atr_multiplier
        pnl_pct = self.position.pl_pct if not(no_position) else 0.0
        entry_price = (position_value - self.position.pl)/position_size if not(no_position) else 0.
        
        tp_sl_hit = False

        if self.position.is_long : 
            r_multiple = (last_price - entry_price) / risk 
            if pnl_pct >= 0.02 and r_multiple >= 2.0:
                new_sl = entry_price + (0.5 * risk)  # tighter stop loss
                new_tp = last_price + (1.5 * risk) # higher take profit
            elif pnl_pct >= 0.01 or r_multiple >= 1.5:
                new_sl = entry_price + risk
                new_tp = last_price  + (2 * risk)
            else:
                new_sl = entry_price - risk
                new_tp = last_price + (2 * risk)

            if last_price >= new_tp or last_price <= new_sl : 
                tp_sl_hit = True
        elif self.position.is_short :
            r_multiple = (entry_price - last_price) / risk
            if pnl_pct >= 0.02 and r_multiple >= 2.0:
                new_sl = entry_price - (0.5 * risk)
                new_tp = last_price - (1.5 * risk)
            elif pnl_pct >= 0.01 or r_multiple >= 1.5:
                new_sl = entry_price - risk
                new_tp = last_price - (2 * risk)
            else:
                new_sl = entry_price + risk
                new_tp = last_price - (2 * risk)
            if last_price <= new_tp or last_price >= new_sl : 
                tp_sl_hit = True

        return {
            "TP/SL hit": tp_sl_hit,
            "new_sl": new_sl,
            "new_tp": new_tp,
            'r_multiple': r_multiple,
            'pnl_pct': pnl_pct,
        }





In [29]:
bt = Backtest(df, MACDStrategy, cash=10_000, commission=.002)
stats = bt.run()
stats

TypeError: isclose() takes exactly 2 positional arguments (3 given)