# Impact of including vs. omitting filler bars on strategy profitability

In [None]:
import numpy as np
import warnings
import pandas as pd
import pyperclip as clip
import ta
warnings.filterwarnings('ignore', category=FutureWarning)

In [None]:
# Destination CSV for the scanned range-bar frame (used by the to_csv cell below).
output_path = '../tmp-data/ss_range_bars.final.csv'

In [None]:
# Load the OHLCV candles, parse the timestamp once, keep the calendar date as
# a plain column (it is the grouping key for the daily-range calculation) and
# index the frame chronologically with capitalised OHLC column names.
df = pd.read_csv(
    '../data/btcusdt-1year-1m-merged.csv',
    usecols=['timestamp', 'open', 'high', 'low', 'close', 'volume'],
)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df['date'] = df['timestamp'].dt.date
df = (
    df.set_index('timestamp')
    .sort_index()
    .rename(columns={'close': 'Close', 'open': 'Open', 'high': 'High', 'low': 'Low'})
)
df

In [None]:
def adr(df: pd.DataFrame) -> float:
    """Return the average daily range of *df*.

    Rows are grouped by the 'date' column; each day's range is its highest
    'High' minus its lowest 'Low', and the mean of those ranges is returned.
    Values that cannot be interpreted as numbers are coerced to NaN.
    """
    per_day = df.groupby('date')  # type: ignore
    day_top: pd.Series = pd.to_numeric(per_day['High'].max(), errors='coerce')
    day_bottom: pd.Series = pd.to_numeric(per_day['Low'].min(), errors='coerce')
    day_span = day_top - day_bottom  # type: ignore
    return np.mean(day_span).item()  # type: ignore

def relative_adr_range_size(df_in: pd.DataFrame, resample_arg: str = 'W'):
    """Tag every row with the average daily range of its resample period.

    The frame is split with ``df_in.resample(resample_arg)`` (weekly by
    default, so it needs a datetime-like index). For each period the
    average daily range is computed with ``adr`` and written to a new
    'average_adr' column on a copy of that period's rows.

    Args:
        df_in: Frame with 'date', 'High' and 'Low' columns (required by
            ``adr``).
        resample_arg: Pandas offset alias that defines the period length.

    Returns:
        A new frame holding all rows of ``df_in`` plus 'average_adr'.
        An empty, column-less frame is returned when there are no rows.
    """
    segments = []
    for _, group in df_in.resample(resample_arg):
        # Periods without data contribute no rows; skipping them avoids
        # concatenating empty frames.
        if group.empty:
            continue
        segment = group.copy()
        segment['average_adr'] = adr(segment)
        segments.append(segment)

    if not segments:
        return pd.DataFrame()
    # One concat at the end instead of growing a frame inside the loop,
    # which re-copied all accumulated rows on every iteration.
    return pd.concat(segments)

In [None]:
def adv(df: pd.DataFrame, window=14) -> pd.Series:
    """Return the rolling mean of the 'volume' column.

    The mean is taken over ``window`` consecutive rows; positions that do
    not yet have a full window are reported as 0 instead of NaN.
    """
    rolling_mean = df['volume'].rolling(window=window).mean()
    return rolling_mean.fillna(0)

In [None]:
# Copy of the loaded frame used as input for both derived columns.
cp = df.copy()
# Adds 'average_adr': the average daily range of each row's resample period
# (weekly with the default argument).
df2 = relative_adr_range_size(cp)
# Adds 'adv': rolling mean volume; the assignment aligns on the index.
df2['adv'] = adv(cp)
df2

In [None]:
def create_range_bar_df(df: pd.DataFrame, range_fraction: float = 0.1):
    """Convert timestamp-indexed OHLCV rows into range bars.

    For every row the range size is ``average_adr * range_fraction``.
    Rows are folded into the bar under construction until one of them
    breaks out:

    * up-break: the row's High is at least one range above the running low;
    * down-break (checked only if there was no up-break): the row's Low is
      at least one range below the running high.

    On a break the current bar is closed exactly one range away from the
    running extreme and emitted. If the move spans several ranges,
    synthetic "filler" bars are emitted one range apart, stamped 1, 2, ...
    seconds after the row's timestamp, and a new bar is started from the
    row.

    NOTE(review): the two break branches are not mirror images of each
    other (filler Open/Close values, and the carried volume is multiplied
    by ``num_bars`` on the way up but ``num_bars + 1`` on the way down).
    That behaviour is preserved as-is - confirm it is intended.

    Args:
        df: Frame with 'Open', 'High', 'Low', 'Close', 'volume',
            'average_adr' and 'adv' columns.
        range_fraction: Fraction of 'average_adr' used as the bar range.

    Returns:
        ``(bars, filler_bars)``: a DataFrame of the emitted bars (with a
        'timestamp' column) and the number of filler bars among them.
        The bar still under construction when the input ends is not
        emitted. Empty input yields an empty frame and 0.
    """

    def make_bar(row, timestamp, volume, open_, high, low, close):
        # Single place that fixes the key order, and therefore the column
        # order of the resulting frame.
        return {'adv': row['adv'], 'volume': volume,
                'average_adr': row['average_adr'], 'timestamp': timestamp,
                'Open': open_, 'High': high, 'Low': low, 'Close': close}

    range_bars = []
    filler_bars = 0
    if df.empty:
        return pd.DataFrame(range_bars), filler_bars

    first = df.iloc[0]
    # df.index[0] is an explicit positional lookup; integer indexing on
    # index.to_series() is label-based on a datetime index.
    current_bar = make_bar(first, df.index[0], first['volume'],
                           first['Open'], first['High'], first['Low'], first['Close'])
    current_high = current_bar['High']
    current_low = current_bar['Low']

    for index, row in df.iterrows():
        high = row['High']
        low = row['Low']
        range_size = row['average_adr'] * range_fraction

        if high - current_low >= range_size:
            # Up-break: close the running bar one range above its low.
            current_bar['Close'] = current_low + range_size
            range_bars.append(current_bar)

            # Whole ranges covered beyond the bar that was just closed.
            num_bars = int((high - current_low - range_size) // range_size)
            for i in range(num_bars):
                floor = current_low + range_size * i
                ceiling = current_low + range_size * (i + 1)
                stamp = pd.Timestamp(index) + pd.Timedelta(seconds=(i + 1))
                range_bars.append(
                    make_bar(row, stamp, row['volume'], floor, ceiling, floor, ceiling))
                filler_bars += 1

            base = current_low + range_size * num_bars
            current_bar = make_bar(row, index, row['volume'] * num_bars,
                                   base, high, base, row['Close'])
            current_high = high
            current_low = current_bar['Low']

        elif current_high - low >= range_size:
            # Down-break: close the running bar one range below its high.
            current_bar['Close'] = current_high - range_size
            range_bars.append(current_bar)

            num_bars = int((current_high - low - range_size) // range_size)
            for i in range(num_bars):
                ceiling = current_high - range_size * i
                floor = current_high - range_size * (i + 1)
                stamp = pd.Timestamp(index) + pd.Timedelta(seconds=(i + 1))
                range_bars.append(
                    make_bar(row, stamp, row['volume'], floor, ceiling, floor, floor))
                filler_bars += 1

            current_bar = make_bar(row, index, row['volume'] * (num_bars + 1),
                                   current_high - range_size * (num_bars + 1),
                                   current_high - range_size * num_bars,
                                   low, row['Close'])
            current_high = current_bar['High']
            current_low = low
        else:
            # No break: widen the running extremes and refresh the bar
            # under construction with this row's values.
            current_high = max(current_high, high)
            current_low = min(current_low, low)
            current_bar['timestamp'] = index
            current_bar['High'] = current_high
            current_bar['Low'] = current_low
            current_bar['Close'] = row['Close']
            current_bar['average_adr'] = row['average_adr']
            current_bar['volume'] = row['volume']
            current_bar['adv'] = row['adv']

    return pd.DataFrame(range_bars), filler_bars


In [None]:
# Build the range bars from a copy of df2; the second return value is the
# number of synthetic filler bars that were inserted.
rb_df, filler_bars = create_range_bar_df(df2.copy())
print(f'filler bars: {filler_bars}')
# Count the number of rows containing NaN values
# NOTE(review): this inspects the originally loaded `df`, not the freshly
# built `rb_df` - confirm which frame was meant to be checked here.
num_nan_rows = df.isna().any(axis=1).sum()
print(f'num_nan_rows: {num_nan_rows}')


In [None]:
# Index the range bars by their timestamp and order them chronologically.
rb_df['timestamp'] = pd.to_datetime(rb_df['timestamp'])
rb_df.set_index('timestamp', inplace=True)
rb_df.sort_index(inplace=True)
# Snapshot of the bars taken before any indicator columns are added.
rb_df_copy = rb_df.copy()
rb_df

In [None]:
# MACD on the range-bar closes (slow window 26, fast window 12, signal window 9)
macd = ta.trend.MACD(rb_df['Close'], window_slow=26, window_fast=12, window_sign=9)
# add the MACD line, its signal line and their difference as new columns
rb_df['macd'] = macd.macd()
rb_df['macd_signal'] = macd.macd_signal()
rb_df['macd_histogram'] = macd.macd_diff()
# Drop every row that has a NaN in any column (presumably the indicator
# warm-up rows - verify).
rb_df.dropna(inplace=True)
rb_df


In [None]:
# Bollinger Bands on the range-bar closes: window of 12 bars, 2 deviations.
bb = ta.volatility.BollingerBands(rb_df['Close'], window=12, window_dev=2)

# add upper and lower Bollinger Bands to the dataframe as new columns
rb_df['bb_upper'] = bb.bollinger_hband()
rb_df['bb_lower'] = bb.bollinger_lband()
# Band width (upper minus lower); compared against the strategy's
# anti_squeeze_distance during the scan.
rb_df['bb_distance'] = bb.bollinger_hband() - bb.bollinger_lband()
# Drop every row that has a NaN in any column.
rb_df.dropna(inplace=True)
rb_df


In [None]:
# RSI on the range-bar closes with a window of 7 bars.
rsi = ta.momentum.RSIIndicator(rb_df['Close'], window=7).rsi()
rb_df['rsi'] = rsi
# Drop every row that has a NaN in any column.
rb_df.dropna(inplace=True)
rb_df.head(20)


In [None]:
# Initialise the output columns: 'signal' is overwritten by the strategy scan
# (1 = long, -1 = short, 0 = none); 'false_signal' starts at -1 and is not
# written by any code visible in this notebook.
rb_df['signal'] = 0
rb_df['false_signal'] = -1
rb_df

In [None]:
class RangeBarStrategy:
    """Scan indicator-enriched range bars and flag long/short signals.

    The frame passed in must provide the columns 'Close', 'rsi', 'macd',
    'macd_signal', 'bb_upper', 'bb_lower', 'bb_distance', 'volume', 'adv'
    and 'signal'. ``scan`` walks the frame row by row, writes 1 (long),
    -1 (short) or 0 (none) into 'signal' and records the index labels of
    the signals in ``long_indexes`` / ``short_indexes``.
    """

    # Minimum Bollinger band width ('bb_distance') a row needs before any
    # signal is emitted.
    anti_squeeze_distance = 10

    def __init__(self, df):
        """Keep a reference to *df*; ``scan`` modifies it in place."""
        self.df = df
        self.updated_df = pd.DataFrame()
        # Per-instance result lists. As mutable class attributes they were
        # shared by every instance, so results accumulated across instances
        # (and across re-runs of the notebook cell).
        self.long_indexes = []
        self.short_indexes = []

    def bb_upper_near(self, index, row):
        """Return True if 'bb_upper' was >= this row's Close on the
        current or the previous bar.

        ``index`` is the row's integer position; False is returned when
        fewer than two bars are available.
        """
        close = row['Close']
        return self._band_hit_recently('bb_upper', index, lambda band: band >= close)

    def bb_lower_near(self, index, row):
        """Return True if 'bb_lower' was <= this row's Close on the
        current or the previous bar.

        ``index`` is the row's integer position; False is returned when
        fewer than two bars are available.
        """
        close = row['Close']
        return self._band_hit_recently('bb_lower', index, lambda band: band <= close)

    def _band_hit_recently(self, column, index, condition):
        # Shared body of the two *_near checks: look back from position
        # `index` and require `condition` to hold within the last two bars.
        history = self.df.iloc[:index + 1][column]
        if len(history) < 2:
            return False
        return self.iterations_back_till_condition(history, condition) < 2

    def iterations_back_till_condition(self, series, condition):
        """Count how many trailing values of *series* fail *condition*.

        Walks from the last value backwards and stops at the first value
        for which ``condition(value)`` is true; returns ``len(series)``
        if no value satisfies it.
        """
        count = 0
        for value in series[::-1]:
            if condition(value):
                break
            count += 1
        return count

    def bb_upper_pointing_up(self, index):
        """Return True if 'bb_upper' has a positive regression slope over
        the four bars ending at position *index*."""
        slope = self._band_slope('bb_upper', index, 'bb_upper_pointing_up')
        return slope is not None and bool(slope > 0)

    def bb_lower_pointing_down(self, index):
        """Return True if 'bb_lower' has a negative regression slope over
        the four bars ending at position *index*."""
        slope = self._band_slope('bb_lower', index, 'bb_lower_pointing_down')
        return slope is not None and bool(slope < 0)

    def _band_slope(self, column, index, caller):
        # Linear-regression slope of `column` over positions index-3..index,
        # or None when the window is unavailable or the fit fails.
        from scipy.stats import linregress
        # A full four-bar window needs three earlier bars. Without this
        # guard `index - 3` goes negative and the slice wraps around to
        # the end of the frame.
        if index < 3:
            return None
        window = self.df.iloc[index - 3:index + 1][column]
        if len(window) == 0:
            return None
        try:
            slope, _, _, _, _ = linregress(range(len(window)), window)
        except Exception as e:
            # Best effort: a failed fit is reported and treated as "no slope".
            print(f'{caller}: exception: {str(e)}')
            return None
        return slope

    def scan(self):
        """Evaluate every row with ``next`` and write the result back.

        Rows that raise are reported and skipped; a progress line is
        printed every 10000 rows.
        """
        # enumerate gives the integer position directly instead of looking
        # it up from the label on every row.
        for numeric_index, (index, row) in enumerate(self.df.iterrows()):
            try:
                updated_row = self.next(row, numeric_index, index)
                self.df.loc[index] = updated_row
                if numeric_index % 10000 == 0:
                    print(f'progress index: {index}')
            except Exception as e:
                print(f'next: exception: {str(e)}')

    def next(self, row, numeric_index, index):
        """Decide the signal for one row and return the row with 'signal' set.

        Long: rsi > 70, macd > macd_signal > 0, upper band near and rising,
        band width above ``anti_squeeze_distance``.
        Short: rsi < 30, macd < macd_signal < 0, lower band near and
        falling, band width above ``anti_squeeze_distance`` and volume
        above its rolling average.

        NOTE(review): the volume filter is applied to shorts only -
        confirm that the asymmetry is intended.

        Args:
            row: The row being evaluated (mutated and returned).
            numeric_index: Integer position of the row in ``self.df``.
            index: Index label of the row, recorded when a signal fires.
        """
        is_long_rsi = row['rsi'] > 70
        is_long_macd = row['macd'] > row['macd_signal'] > 0
        is_short_rsi = row['rsi'] < 30
        is_short_macd = row['macd'] < row['macd_signal'] < 0
        is_volume_above_adv_limit = row['volume'] > 0 and row['volume'] > row['adv']
        is_bb_dist_above = row['bb_distance'] > self.anti_squeeze_distance

        long_setup = is_long_rsi and is_long_macd and is_bb_dist_above
        short_setup = (is_short_rsi and is_short_macd and is_bb_dist_above
                       and is_volume_above_adv_limit)

        # The band checks (history slice + regression) are the expensive
        # part, so they only run once the cheap scalar filters have passed.
        if (long_setup
                and self.bb_upper_near(numeric_index, row)
                and self.bb_upper_pointing_up(numeric_index)):
            self.long_indexes.append(index)
            row['signal'] = 1
        elif (short_setup
                and self.bb_lower_near(numeric_index, row)
                and self.bb_lower_pointing_down(numeric_index)):
            row['signal'] = -1
            self.short_indexes.append(index)
        else:
            row['signal'] = 0
        return row


In [None]:
# Count the number of rows containing NaN values
num_nan_rows = rb_df.isna().any(axis=1).sum()
print(f'num_nan_rows: {num_nan_rows}')

In [None]:
# df_sample is an alias of rb_df (not a copy), so the signals written by
# scan() end up in rb_df as well.
df_sample = rb_df
strategy = RangeBarStrategy(df_sample)
strategy.scan()
# Number of rows flagged long / short.
print(f'long_indexes: {len(strategy.long_indexes)}')
print(f'short_indexes: {len(strategy.short_indexes)}')

## With is_not_eq_last_close
### 7 day average adr
* long_indexes: 1872
* short_indexes: 2083

### using 3 on near
false_signals: 1697, true_signals: 8707, diff: 7010
false_signals: 1841, true_signals: 9182, diff: 7341
### using 2 on near
false_signals: 1549, true_signals: 7952, diff: 6403
false_signals: 1664, true_signals: 8266, diff: 6602

In [None]:
# Drop rows with any NaN and report the row count before and after.
na_len_before = len(df_sample)
df_sample.dropna(inplace=True)
na_len_after = len(df_sample)
print(f'na_len_before: {na_len_before}, na_len_after: {na_len_after}')

In [None]:
# Write the scanned frame (index included) to output_path.
df_sample.to_csv(output_path)

In [None]:
# Show the column names of the exported frame.
df_sample.columns