In [1]:
from polygon import RESTClient
import pandas as pd
from datetime import datetime, timedelta
import time

In [3]:
df = pd.read_csv("QQQ_3min_2021_2025.csv")
data = df.copy()
data['timestamp'] = pd.to_datetime(data['timestamp'])
data['date'] = data['timestamp'].dt.date
data['time'] = data['timestamp'].dt.time

# Filter for regular market hours
data = data[(data['time'] >= pd.to_datetime("09:30").time()) & 
            (data['time'] <= pd.to_datetime("16:00").time())]

# Only keep days with exactly 130 rows
valid_dates = data['date'].value_counts()
valid_dates = valid_dates[valid_dates <= 135].index
data = data[data['date'].isin(valid_dates)]


In [4]:

# Ensure datetime
data['timestamp'] = pd.to_datetime(data['timestamp'])

# Filter market hours: 09:30 to 16:00

data = data[(data['time'] >= pd.to_datetime("09:30").time()) & (data['time'] <= pd.to_datetime("16:00").time())]

# Ensure each day has all expected intervals (26 x 15min = 6.5 hours)
full_df = data.groupby(data['timestamp'].dt.date).filter(lambda x: len(x) > 20)

# Drop temporary column
full_df.drop(columns='time', inplace=True)

# Result stored in `full_days`
full_df['date'] = full_df['timestamp'].dt.date

In [35]:
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from dtaidistance import dtw
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from fastdtw import fastdtw
from scipy.spatial.distance import euclidean


class fractal_analysis:
    def __init__(self, df):
        self.df = df.copy() 
        self.df['date'] = self.df['timestamp'].dt.date # Creates a date time stamp feature column
        self.ref_df_scaled = None # the reference days list
        self.ref_dates = [] # Holds list of dates used for the refernce pattern. 
        self.ref_len = 0 # length of reference dataframe
        self.result_df = {} # holds the results from the CV test

    def _prepare_day(self, date):
        # Ensures data is in datetime.date format.
        d = pd.to_datetime(date).date()

        #  Filters rows for the given date and within market hours
        day_df = self.df[
            (self.df['timestamp'].dt.date == d) &
            (self.df['timestamp'].dt.time >= pd.to_datetime("09:30").time()) &
            (self.df['timestamp'].dt.time < pd.to_datetime("16:00").time())
        ]

        # Raise error if day is missing
        if day_df.empty:
            raise ValueError(f"No data found for {date}")
            
        return day_df['close'].values.reshape(-1, 1) # returns a 2D array

    def set_reference_days(self, dates: list):

        # Convert input string dates to datetime.date objects. 
        self.reference_dates = [pd.to_datetime(d).date() for d in dates]

        # Collect all data for each date and flatten them into one series
        y_segments = [self._prepare_day(d) for d in self.reference_dates]
        y_combined = np.concatenate(y_segments).flatten()

        # Check if there's enough data. 
        if len(y_combined) < 2:
            raise ValueError("Not enough data in reference days.")

        # normalize so first value is 100.
        self.ref_df_scaled = (y_combined / y_combined[0]) * 100
        self.ref_len = len(self.ref_df_scaled)

    
    def shape_loss(self, start_date: str, chunk_size: int) -> dict:
        all_dates = sorted(self.df['date'].unique())
        d0 = pd.to_datetime(start_date).date()
        idx = all_dates.index(d0)
        test_dates = all_dates[idx:idx + chunk_size]
    
        y_segments = [self._prepare_day(d) for d in test_dates]
        y_combined = np.concatenate(y_segments).flatten()  # Ensure 1-D
    
        if len(y_combined) < 2:
            raise ValueError("Not enough data in test days.")
    
        y_scaled = (y_combined / y_combined[0]) * 100  # Normalize to start=100
    
        ref = self.ref_df_scaled[:len(y_scaled)].flatten()  # Ensure 1-D
        test = y_scaled[:len(ref)].flatten()  # Ensure 1-D
    
        dtw_loss, _ = fastdtw(ref, test, dist=euclidean)
        ols_loss = mean_squared_error(ref, test)
    
        return {'dtw_loss': dtw_loss}
      
    def CV_test(self, chunk_size: int = 1):
        print(f"Testing Started for chunk size {chunk_size}")
        all_dates = sorted(self.df['date'].unique())
        results = []
    
        for i in range(len(all_dates) - chunk_size):
            start_date = all_dates[i]
            if start_date in self.ref_dates:
                continue
            try:
                loss = self.shape_loss(start_date, chunk_size=chunk_size)
                results.append({'start_date': start_date, 'loss': loss})
            except Exception as e:
                print(f"Error on {start_date}: {e}")
                continue
    
        self.result_df = pd.DataFrame(results).sort_values(
            by=lambda x: (x['loss']['dtw_loss'], x['loss']['ols_loss'])
        )
        print("Testing Done")

    def run_multiple_chunks(self, chunk_sizes=[1, 5, 30]):
        for chunk in chunk_sizes:
            print(f"\n--- Chunk Size: {chunk} ---")
            self.CV_test(chunk_size=chunk)


In [62]:
ref_date = np.unique(full_df['date'])[-4:-1]
print(ref_date)

short_df =  full_df[(full_df['timestamp'] >= '2023-01-01') ]

fa = FractalAnalysis(short_df)
fa.set_reference_days(ref_date)
fa.run_cv(chunk_sizes= [1])


[datetime.date(2025, 5, 27) datetime.date(2025, 5, 28)
 datetime.date(2025, 5, 29)]
Running CV for chunk size 1
Error on 2023-01-03: Input vector should be 1-D.
Error on 2023-01-04: Input vector should be 1-D.
Error on 2023-01-05: Input vector should be 1-D.
Error on 2023-01-06: Input vector should be 1-D.
Error on 2023-01-09: Input vector should be 1-D.
Error on 2023-01-10: Input vector should be 1-D.
Error on 2023-01-11: Input vector should be 1-D.
Error on 2023-01-12: Input vector should be 1-D.
Error on 2023-01-13: Input vector should be 1-D.
Error on 2023-01-17: Input vector should be 1-D.
Error on 2023-01-18: Input vector should be 1-D.
Error on 2023-01-19: Input vector should be 1-D.
Error on 2023-01-20: Input vector should be 1-D.
Error on 2023-01-23: Input vector should be 1-D.
Error on 2023-01-24: Input vector should be 1-D.
Error on 2023-01-25: Input vector should be 1-D.
Error on 2023-01-26: Input vector should be 1-D.
Error on 2023-01-27: Input vector should be 1-D.
Error 

KeyboardInterrupt: 

In [39]:
class fractal_analysis:
    def __init__(self, df):
        self.df = df.copy()
        self.df['date'] = self.df['timestamp'].dt.date
        self.reference_day = None
        self.reference_dates = []
        self.reference_day_len = 0
        self.result_df = None

    def _prepare_day(self, date):
        d = pd.to_datetime(date).date()
        day_df = self.df[
            (self.df['timestamp'].dt.date == d) &
            (self.df['timestamp'].dt.time >= pd.to_datetime("09:30").time()) &
            (self.df['timestamp'].dt.time < pd.to_datetime("16:00").time())
        ]
        if day_df.empty:
            raise ValueError(f"No data found for {date}")
        return day_df['close'].values.reshape(-1, 1)

    def HIDE_set_reference_days(self, dates: list):
        self.reference_dates = [pd.to_datetime(d).date() for d in dates]
        y_segments = [self._prepare_day(d) for d in self.reference_dates]
        y_combined = np.concatenate(y_segments)
        self.reference_day_len = len(y_combined)

        scaler = MinMaxScaler(feature_range=(0, 100))
        self.reference_day = scaler.fit_transform(y_combined).flatten()

    def set_reference_days(self, dates: list):
        self.reference_dates = [pd.to_datetime(d).date() for d in dates]
        y_segments = [self._prepare_day(d) for d in self.reference_dates]
        y_combined = np.concatenate(y_segments).flatten()
    
        if len(y_combined) < 2:
            raise ValueError("Not enough data in reference days.")

        self.reference_day = (y_combined / y_combined[0]) * 100

    def HIDE_shape_loss(self, start_date: str) -> float:
        all_dates = sorted(self.df['date'].unique())
        d0 = pd.to_datetime(start_date).date()

        if d0 not in all_dates:
            raise ValueError(f"{d0} not found in data.")

        idx = all_dates.index(d0)
        if idx + len(self.reference_dates) > len(all_dates):
            raise ValueError(f"Not enough following dates after {d0}.")

        test_dates = all_dates[idx:idx + len(self.reference_dates)]
        y_segments = [self._prepare_day(d) for d in test_dates]
        y_combined = np.concatenate(y_segments)

        scaler = MinMaxScaler(feature_range=(0, 100))
        y_scaled = scaler.fit_transform(y_combined).flatten()

        min_len = min(len(self.reference_day), len(y_scaled))
        ref = self.reference_day[:min_len]
        test = y_scaled[:min_len]

        return mean_squared_error(ref, test)

    def shape_loss(self, start_date: str) -> float:
        all_dates = sorted(self.df['date'].unique())
        d0 = pd.to_datetime(start_date).date()
        idx = all_dates.index(d0)
        test_dates = all_dates[idx:idx + len(self.reference_dates)]
    
        y_segments = [self._prepare_day(d) for d in test_dates]
        y_combined = np.concatenate(y_segments).flatten()
    
        if len(y_combined) < 2:
            raise ValueError("Not enough data in test days.")
    
        y_scaled = (y_combined / y_combined[0]) * 100
        ref = self.reference_day[:len(y_scaled)]
        test = y_scaled[:len(ref)]
    
        return mean_squared_error(ref, test)

    
    def CV_test(self):
        print("Testing Started")
        all_dates = sorted(self.df['date'].unique())
        results = []

        for i in range(len(all_dates) - len(self.reference_dates)):
            start_date = all_dates[i]
            if start_date in self.reference_dates:
                continue
            try:
                loss = self.shape_loss(start_date)
                results.append({'start_date': start_date, 'loss': loss})
            except Exception as e:
                print(f"Error on {start_date}: {e}")
                continue

        self.result_df = pd.DataFrame(results).sort_values(by='loss')
        print("Testing Done")

    def HIDE_plot(self, start_date: str, lookahead_days: int = 1):
        if self.reference_day is None:
            raise ValueError("Reference day not set. Call set_reference_days() first.")
    
        all_dates = sorted(self.df['date'].unique())
        d0 = pd.to_datetime(start_date).date()
        if d0 not in all_dates:
            raise ValueError(f"Start date {d0} not found.")
    
        idx = all_dates.index(d0)
        test_span = len(self.reference_dates)
        total_needed = test_span + lookahead_days
    
        if idx + total_needed > len(all_dates):
            raise ValueError(f"Not enough days after {start_date} to include {lookahead_days} future day(s).")
    
        test_dates = all_dates[idx:idx + total_needed]
        y_segments = [self._prepare_day(d) for d in test_dates]
        y_combined = np.concatenate(y_segments)
    
        scaler = MinMaxScaler(feature_range=(0, 100))
        y_scaled = scaler.fit_transform(y_combined).flatten()
    
        match_len = sum(len(s) for s in y_segments[:test_span])
        ref_len = min(len(self.reference_day), match_len)
        X_ref = np.arange(ref_len)
        X_full = np.arange(len(y_scaled))
    
        # Plot
        plt.figure(figsize=(12, 4))
        plt.plot(X_ref, self.reference_day[:ref_len], label=f'Reference: {" + ".join(str(d) for d in self.reference_dates)}', linewidth=1)
        plt.plot(X_full, y_scaled, '--', label=f'Test: {test_dates[0]} + {test_span}d + {lookahead_days}d future', linewidth=2)
        plt.axvline(ref_len, color='red', linestyle=':', label='Future start')
        plt.title(f'Fractal Shape Comparison\nRef: {self.reference_dates} vs Test: {test_dates[:test_span]} + {lookahead_days} future day(s)')
        plt.xlabel("3-min intervals")
        plt.ylabel("Scaled Price")
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()

    def plot_individual(self, lookahead_days: int = 1, top_n: int = 20):
        if self.reference_day is None or not self.reference_dates:
            raise ValueError("Reference day not set.")
        if self.result_df is None or self.result_df.empty:
            raise ValueError("No test results found. Run CV_test() first.")
    
        all_dates = sorted(self.df['date'].unique())
        ref_last_date = max(self.reference_dates)
        ref_end_idx = all_dates.index(ref_last_date)
    
        # Plot reference future if available
        ref_future = []
        ref_future_dates = all_dates[ref_end_idx + 1 : ref_end_idx + 1 + lookahead_days] \
            if ref_end_idx + lookahead_days < len(all_dates) else []
    
        if ref_future_dates:
            ref_future_segments = [self._prepare_day(d).flatten() for d in ref_future_dates]
            ref_future = np.concatenate(ref_future_segments)
            ref_future = (ref_future / ref_future[0]) * 100
    
        for i, row in self.result_df.head(top_n).iterrows():
            start_date = pd.to_datetime(row['start_date']).date()
            test_start_idx = all_dates.index(start_date)
            test_total_days = len(self.reference_dates) + lookahead_days
    
            if test_start_idx + test_total_days > len(all_dates):
                continue
    
            test_dates = all_dates[test_start_idx : test_start_idx + test_total_days]
            y_segments = [self._prepare_day(d).flatten() for d in test_dates]
            y_combined = np.concatenate(y_segments)
            y_scaled = (y_combined / y_combined[0]) * 100
    
            # Plot
            plt.figure(figsize=(12, 5))
            X_ref = np.arange(len(self.reference_day))
            plt.plot(X_ref, self.reference_day, label='Reference', linewidth=2, color='black')
    
            if len(ref_future):
                X_ref_future = np.arange(len(self.reference_day), len(self.reference_day) + len(ref_future))
                plt.plot(X_ref_future, ref_future, '--', label=f'Ref Future ({ref_future_dates[0]})', color='gray')
    
            X_test = np.arange(len(y_scaled))
            plt.plot(X_test, y_scaled, label=f'Test: {start_date} (+{lookahead_days}d)', alpha=0.8, linewidth=2)
    
            plt.axvline(len(self.reference_day), color='red', linestyle=':', label='Future starts')
            plt.title(f'Match {i+1}: Ref vs Test {start_date}')
            plt.xlabel('3-min intervals')
            plt.ylabel('Indexed Price (Start = 100)')
            plt.legend()
            plt.grid(True)
            plt.tight_layout()
            plt.show()
             
            


In [61]:
from sklearn.preprocessing import MinMaxScaler
from fastdtw import fastdtw
from scipy.spatial.distance import euclidean
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

class FractalAnalysis:
    def __init__(self, df):
        self.df = df.copy()
        self.df['date'] = self.df['timestamp'].dt.date
        self.ref_dates = []
        self.ref_data = None
        self.ref_len = 0
        self.results = {}

    def _get_day_data(self, date):
        d = pd.to_datetime(date).date()
        day_df = self.df[
            (self.df['timestamp'].dt.date == d) &
            (self.df['timestamp'].dt.time >= pd.to_datetime("09:30").time()) &
            (self.df['timestamp'].dt.time < pd.to_datetime("16:00").time())
        ]
        if day_df.empty:
            raise ValueError(f"No data found for {date}")
        return day_df['close'].values.flatten()  # ensure 1D array

    def set_reference_days(self, dates):
        self.ref_dates = [pd.to_datetime(d).date() for d in dates]
        segments = [self._get_day_data(d) for d in self.ref_dates]
        combined = np.concatenate(segments)
        self.ref_data = (combined / combined[0]) * 100
        self.ref_len = len(self.ref_data)

    def _dtw_loss(self, test_data):
        ref_resized = np.interp(np.linspace(0, len(self.ref_data), num=len(test_data)),
                                np.arange(len(self.ref_data)), self.ref_data)
        return fastdtw(ref_resized, test_data, dist=euclidean)[0]

    def run_cv(self, chunk_sizes):
        all_dates = sorted(self.df['date'].unique())
        ref_date_set = set(self.ref_dates)

        for size in chunk_sizes:
            print(f"Running CV for chunk size {size}")
            results = []
            for i in range(len(all_dates) - size):
                chunk = all_dates[i:i+size]
                if any(d in ref_date_set for d in chunk):
                    continue
                try:
                    segments = [self._get_day_data(d) for d in chunk]
                    combined = np.concatenate(segments)
                    scaled = (combined / combined[0]) * 100
                    scaled = scaled.flatten()  # ensure 1D array
                    loss = self._dtw_loss(scaled)
                    results.append({"start_date": chunk[0], "loss": loss})
                except Exception as e:
                    print(f"Error on {chunk[0]}: {e}")
                    continue
            self.results[size] = pd.DataFrame(results).sort_values(by="loss")

    def plot_top_matches(self, size, top_n=5):
        if size not in self.results:
            raise ValueError("Chunk size not found in results")

        df = self.results[size].head(top_n)
        all_dates = sorted(self.df['date'].unique())

        for _, row in df.iterrows():
            start_date = pd.to_datetime(row['start_date']).date()
            idx = all_dates.index(start_date)
            dates = all_dates[idx:idx+size]
            segments = [self._get_day_data(d) for d in dates]
            combined = np.concatenate(segments)
            scaled = (combined / combined[0]) * 100
            scaled = scaled.flatten()

            ref = self.ref_data
            test = np.interp(np.linspace(0, len(ref), num=len(scaled)), np.arange(len(ref)), ref)

            plt.figure(figsize=(12, 4))
            plt.plot(np.arange(len(ref)), ref, label='Reference', linewidth=2)
            plt.plot(np.arange(len(test)), scaled, '--', label=f'Test: {start_date}', linewidth=1.5)
            plt.title(f"Match: {start_date}, DTW Loss: {row['loss']:.2f}")
            plt.xlabel("3-min intervals")
            plt.ylabel("Scaled Price")
            plt.legend()
            plt.grid(True)
            plt.tight_layout()
            plt.show()
