In [34]:
import numpy as np
import pandas as pd
import logging
logging.basicConfig(level=logging.INFO)

## Import Data

In [41]:
# Load the data
data = pd.read_parquet("/Users/yiukitcheung/Documents/Projects/Stocks/train_data_repository/test_data.parquet")

# Set the data path
data_path = "/Users/yiukitcheung/Documents/Projects/Stocks/train_data_repository/test_data.parquet"
data.head()


Unnamed: 0,high,close_t-1,low,MACD_HIST,open,close_t-2,BodyDiff,volume,MACD,169EMA,...,Incremental_High_1.0,dual_channel_Alert_0,dual_channel_Alert_1,CandleStickType_red,Engulf_Alert_0,Engulf_Alert_1,MACD_Alert_0,MACD_Alert_1,timestamp,log_daily_return
0,85.98558,86.387512,81.241373,-0.006014,85.062738,87.742287,2.035662,559863000,-0.443547,65.357703,...,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0,
1,86.22254,83.027077,83.186047,0.101017,84.434838,86.387512,1.367774,377898000,-0.311262,65.598232,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1,3.288267
2,89.266031,85.802612,87.025409,0.356199,87.774284,83.027077,0.999832,398341000,0.032971,65.870889,...,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,2,3.404567
3,92.204538,88.774117,89.040068,0.714234,89.375011,85.802612,2.749539,376203000,0.569564,66.179756,...,1.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,3,3.704634
4,91.76561,92.12455,88.996078,0.801323,91.082726,88.774117,0.543908,437342000,0.856983,66.466333,...,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,4,-1.736277


## Trendline Automation

In [42]:
class TrendLineAutomation:
    def __init__(self, data_path: str, lookback: int = 30):
        self.data = pd.read_parquet(data_path)
        self.data = self.data.set_index('timestamp')
        features = ['open', 'high', 'low', 'close']
        self.data[features] = np.log(self.data[features])
        self.lookback = lookback
        self.support_slope = [np.nan] * len(self.data)
        self.resist_slope = [np.nan] * len(self.data)

    def check_trend_line(self, support: bool, pivot: int, slope: float, y: np.array):
        if pivot < 0 or pivot >= len(y):
            logging.error(f"Pivot {pivot} is out of bounds for array of length {len(y)}")
            return -1.0
        print(pivot)
        print(y)
        intercept = -slope * pivot + y[pivot]
        
        line_vals = slope * np.arange(len(y)) + intercept
        diffs = line_vals - y

        if support and diffs.max() > 1e-5:
            return -1.0
        elif not support and diffs.min() < -1e-5:
            return -1.0

        err = (diffs ** 2.0).sum()
        return err

    def optimize_slope(self, support: bool, pivot: int, init_slope: float, y: np.array):
        slope_unit = (y.max() - y.min()) / len(y)
        opt_step = 1.0
        min_step = 0.0001
        curr_step = opt_step
        best_slope = init_slope
        best_err = self.check_trend_line(support, pivot, init_slope, y)

        if best_err < 0.0:
            logging.error(f"Initial slope {init_slope} resulted in invalid trend line for pivot {pivot}.")
            return None, None

        get_derivative = True
        derivative = None
        while curr_step > min_step:
            if get_derivative:
                slope_change = best_slope + slope_unit * min_step
                test_err = self.check_trend_line(support, pivot, slope_change, y)
                derivative = test_err - best_err

                if test_err < 0.0:
                    slope_change = best_slope - slope_unit * min_step
                    test_err = self.check_trend_line(support, pivot, slope_change, y)
                    derivative = best_err - test_err

                if test_err < 0.0:
                    logging.error("Derivative failed. Check your data.")
                    return None, None

                get_derivative = False

            if derivative > 0.0:
                test_slope = best_slope - slope_unit * curr_step
            else:
                test_slope = best_slope + slope_unit * curr_step

            test_err = self.check_trend_line(support, pivot, test_slope, y)
            if test_err < 0 or test_err >= best_err:
                curr_step *= 0.5
            else:
                best_err = test_err
                best_slope = test_slope
                get_derivative = True

        return best_slope, -best_slope * pivot + y.iloc[pivot]

    def fit_trendlines_single(self, data: np.array):
        x = np.arange(len(data))
        coefs = np.polyfit(x, data, 1)
        line_points = coefs[0] * x + coefs[1]
        upper_pivot = (data - line_points).argmax()
        lower_pivot = (data - line_points).argmin()
        support_coefs = self.optimize_slope(True, lower_pivot, coefs[0], data)
        resist_coefs = self.optimize_slope(False, upper_pivot, coefs[0], data)
        return support_coefs, resist_coefs

    def fit_trendlines_high_low(self, high: np.array, low: np.array, close: np.array):
        x = np.arange(len(close))
        coefs = np.polyfit(x, close, 1)
        line_points = coefs[0] * x + coefs[1]
        upper_pivot = (high - line_points).argmax()
        lower_pivot = (low - line_points).argmin()
        support_coefs = self.optimize_slope(True, lower_pivot, coefs[0], low)
        resist_coefs = self.optimize_slope(False, upper_pivot, coefs[0], high)
        return support_coefs, resist_coefs

    def calculate_trendlines(self):
        required_columns = {'high', 'low', 'close'}
        if not required_columns.issubset(self.data.columns):
            logging.error(f"Data is missing required columns: {required_columns - set(self.data.columns)}")
            return

        for i in range(self.lookback - 1, len(self.data)):
            candles = self.data.iloc[i - self.lookback + 1: i + 1]
            support_coefs, resist_coefs = self.fit_trendlines_high_low(candles['high'], candles['low'], candles['close'])
            if support_coefs and resist_coefs:
                self.support_slope[i] = support_coefs[0]
                self.resist_slope[i] = resist_coefs[0]

# Usage example
if __name__ == "__main__":
    trendline_automation = TrendLineAutomation(data_path=data_path)
    trendline_automation.calculate_trendlines()
    print(trendline_automation.support_slope)
    print(trendline_automation.resist_slope)

ERROR:root:Initial slope 0.014411398003034924 resulted in invalid trend line for pivot 17.
ERROR:root:Initial slope 0.014701481614252726 resulted in invalid trend line for pivot 13.


15
timestamp
0     4.397425
1     4.421080
2     4.466200
3     4.489086
4     4.488592
5     4.493177
6     4.479791
7     4.491016
8     4.483163
9     4.487727
10    4.517253
11    4.544222
12    4.519510
13    4.537152
14    4.534365
15    4.535106
16    4.620088
17    4.634561
18    4.699249
19    4.708470
20    4.697244
21    4.672100
22    4.718358
23    4.736425
24    4.765998
25    4.773225
26    4.770703
27    4.762092
28    4.776851
29    4.808597
Name: low, dtype: float64
15
timestamp
0     4.397425
1     4.421080
2     4.466200
3     4.489086
4     4.488592
5     4.493177
6     4.479791
7     4.491016
8     4.483163
9     4.487727
10    4.517253
11    4.544222
12    4.519510
13    4.537152
14    4.534365
15    4.535106
16    4.620088
17    4.634561
18    4.699249
19    4.708470
20    4.697244
21    4.672100
22    4.718358
23    4.736425
24    4.765998
25    4.773225
26    4.770703
27    4.762092
28    4.776851
29    4.808597
Name: low, dtype: float64
15
timestamp
0     4.3

KeyError: 1