# Imports

In [None]:
%pip install tslearn
%pip install lightweight-charts
%pip install loguru
%pip install rich
import ipywidgets as widgets
from IPython.display import display
from loguru import logger
from rich import print
import numpy as np
import pandas as pd
from tslearn.metrics import dtw, ctw
from lightweight_charts import Chart, JupyterChart
from datetime import datetime, timedelta, time
import pytz
from collections import namedtuple
from typing import Callable
import traceback

# nytz = pytz.timezone('America/New_York')
# Timezone the price index is converted to and in which window boundaries are localized.
est = pytz.timezone('US/Eastern')
# A scoring strategy is called as strategy(target_window, candidate_window) and returns a float.
Strategy = Callable[[pd.DataFrame, pd.DataFrame], float]
# Bar size used both for resampling the input and for building the projection index.
freq='5min'

In [2]:
from dataclasses import dataclass

# Projection: a window plus the timestamps where the projection starts and the match ends.
# NOTE(review): not referenced anywhere else in the visible notebook — confirm it is still needed.
Projection = namedtuple('Projection', ['window', 'projection_start', 'match_end'])
# MatchModel: one scored candidate window (first timestamp, last timestamp, strategy score).
MatchModel = namedtuple('MatchModel', ['start', 'end', 'score'])

@dataclass
class WindowMatch:
  """A price window together with its match-end and projection-start timestamps.

  Positional construction follows the field order below: ``window``,
  ``match_end``, ``projection_start``. Note that this order differs from the
  ``Projection`` namedtuple, so prefer keyword arguments when building one.
  """
  # Price rows of the window.
  window: pd.DataFrame
  # Timestamp at which the matched part of the window ends.
  match_end: datetime
  # Timestamp from which rows are treated as the projection.
  projection_start: datetime
  # Score of the match; -1 is the default when no score was assigned.
  score: float = -1

# Data Prep

In [4]:
# Read dataframe
# Raw string so the backslashes of the Windows path are taken literally; a
# bare "\d" inside a normal string literal is an invalid escape sequence and
# triggers a warning on recent Python versions. The resulting path is unchanged.
input_file = r"C:\Users\jkosk\dev\data\qqq-20230101-20241004.ohlcv-1m.csv.zip"

df = pd.read_csv(
    input_file,
    compression='zip',
    parse_dates=['ts_event'],
    index_col='ts_event',
    date_format='ISO8601',
)
# Express all timestamps in the notebook's working timezone and name the index 'time'.
df.index = df.index.tz_convert(est)
df.index.rename('time', inplace=True)
rows = len(df)
print(f'Read {rows} rows')
# Keep an untouched copy; the resample cell starts again from this frame.
original = df.copy(deep=True)

In [5]:
# Report rows whose timestamp already occurred earlier in the index.
dups = df.loc[df.index.duplicated()]
print(f'Found {len(dups)} duplicates')

# df = df[~df.index.duplicated()]
# print(f'Read {rows} rows, dropped {rows-len(df)} duplicates.')

In [6]:
# Aggregate the bars read from disk into `freq`-sized OHLCV bars.
df = original.resample(freq).agg({
    'open': 'first',
    'high': 'max',
    'low': 'min',
    'close': 'last',
    'volume': 'sum',
})
# Drop rows with missing values (e.g. resample buckets that contained no bars).
df = df.dropna()
# Snapshot of the resampled data; the run cell restarts from this copy.
df_resampled_original = df.copy(deep=True)
df.head()

  df = original.resample(freq).agg({'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'})


Unnamed: 0_level_0,open,high,low,close,volume
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2023-01-03 04:00:00-05:00,269.01,269.4,268.78,269.38,15863
2023-01-03 04:05:00-05:00,269.38,269.39,268.84,269.05,14315
2023-01-03 04:10:00-05:00,269.07,269.41,269.07,269.34,5187
2023-01-03 04:15:00-05:00,269.37,269.64,269.15,269.15,5578
2023-01-03 04:20:00-05:00,269.26,269.58,269.26,269.58,1546


# Search Functions

In [None]:
# Maximum allowed gap between a requested window start and the nearest available bar (used by get_window).
window_boundary_tolerance=timedelta(minutes=5)

def hlc4(data: pd.DataFrame):
  """Return the per-row average of high, low and close, with close counted twice."""
  total = data.high + data.low + data.close + data.close
  return total / 4

# def normalize_window(data: pd.DataFrame, feature: pd.Series):
#     norm = (feature  - data.low.min()) / (data.high.max() - data.low.min())
#     if norm_percent:
#       norm = norm * 100/data.low.min()
#     return norm

def normalize_window(feature: pd.Series, base: float):
    """Express `feature` as its relative deviation from `base`, i.e. (feature - base) / base."""
    deviation = feature - base
    return deviation / base

def dtw_hlc4(target: pd.DataFrame, window: pd.DataFrame):
  """DTW distance between the hlc4 series of two windows.

  Each series is first normalized relative to the last close of its own
  window, so the comparison is based on relative rather than absolute prices.

  Args:
    target: Window to compare against; needs high, low and close columns.
    window: Candidate window with the same columns.

  Returns:
    The value returned by tslearn's ``dtw`` for the two normalized series.
  """
  # .iloc[-1] selects the last row by position. Plain [-1] on a Series with a
  # non-integer index relies on a deprecated positional fallback in pandas.
  norm_target = normalize_window(hlc4(target), target.close.iloc[-1])
  norm_window = normalize_window(hlc4(window), window.close.iloc[-1])
  return dtw(norm_target, norm_window)

def dtw_close(target: pd.DataFrame, window: pd.DataFrame):
  """DTW distance between the close series of two windows.

  Each close series is normalized relative to the last close of its own window.

  Args:
    target: Window to compare against; needs a close column.
    window: Candidate window with a close column.

  Returns:
    The value returned by tslearn's ``dtw`` for the two normalized series.
  """
  # .iloc[-1] selects the last row by position. Plain [-1] on a Series with a
  # non-integer index relies on a deprecated positional fallback in pandas.
  norm_target = normalize_window(target.close, target.close.iloc[-1])
  norm_window = normalize_window(window.close, window.close.iloc[-1])
  return dtw(norm_target, norm_window)

def dtw_high(target: pd.DataFrame, window: pd.DataFrame):
  """DTW distance between the high series of two windows.

  Each high series is normalized relative to the last close of its own window.

  Args:
    target: Window to compare against; needs high and close columns.
    window: Candidate window with the same columns.

  Returns:
    The value returned by tslearn's ``dtw`` for the two normalized series.
  """
  # .iloc[-1] selects the last row by position. Plain [-1] on a Series with a
  # non-integer index relies on a deprecated positional fallback in pandas.
  norm_target = normalize_window(target.high, target.close.iloc[-1])
  norm_window = normalize_window(window.high, window.close.iloc[-1])
  return dtw(norm_target, norm_window)

def dtw_low(target: pd.DataFrame, window: pd.DataFrame):
  """DTW distance between the low series of two windows.

  Each low series is normalized relative to the last close of its own window.

  Args:
    target: Window to compare against; needs low and close columns.
    window: Candidate window with the same columns.

  Returns:
    The value returned by tslearn's ``dtw`` for the two normalized series.
  """
  # .iloc[-1] selects the last row by position. Plain [-1] on a Series with a
  # non-integer index relies on a deprecated positional fallback in pandas.
  norm_target = normalize_window(target.low, target.close.iloc[-1])
  norm_window = normalize_window(window.low, window.close.iloc[-1])
  return dtw(norm_target, norm_window)

def get_window(data: pd.DataFrame, window_start_time: time, window_size_days: int, window_end: datetime):
    """Slice `data` from a start time `window_size_days` dates back up to `window_end`.

    The start date is found by stepping `window_size_days` entries back in the
    list of distinct dates present in the index, counted from the position of
    `window_end`'s date. The start timestamp is that date combined with
    `window_start_time`, localized to `est`, and snapped to the nearest bar
    within `window_boundary_tolerance`.

    Args:
        data: Frame with a timezone-aware datetime index.
        window_start_time: Time of day at which the window starts.
        window_size_days: Number of distinct dates to step back for the start.
        window_end: Last timestamp of the window (inclusive label slice).

    Returns:
        The sliced frame, or None (after printing the reason) when there are
        not enough earlier dates or no bar lies close enough to the start.
    """
    # Distinct calendar dates in the index; np.unique also returns them sorted.
    dates = np.unique(data.index.date)
    idx = np.searchsorted(dates, window_end.date())

    if idx < window_size_days:
        print(f"Can't load window ending at {window_end}. Date index for start was {idx}.")
        return None

    window_start = datetime.combine(dates[idx - window_size_days], window_start_time)
    window_start = est.localize(window_start)
    indexer = data.index.get_indexer([window_start], method='nearest', tolerance=window_boundary_tolerance)

    # get_indexer reports "no bar within tolerance" as -1. The test must not use
    # the truthiness of the array: a one-element array holding position 0 is
    # falsy, which would reject a window that starts at the very first row.
    if len(indexer) == 0 or indexer[0] == -1:
        nearest = data.index.get_indexer([window_start], method='nearest')
        print(f"Can't load window starting at {window_start}. indexer was {indexer}. nearest was {nearest}.")
        return None

    window_start = data.index[indexer[0]]
    return data.loc[window_start:window_end]

def find_similar_windows(data: pd.DataFrame, window_time_start: time, window_size_days: int, target_end: datetime, strategy: Strategy):
    """Score every window that ends at the target's time of day against the target window.

    The target window is loaded with `get_window`. Candidate windows are then
    built for every row of `data` whose time of day equals `target_end.time()`
    and scored with `strategy(target_window, candidate)`. If `target_end` itself
    is a row of `data`, the target's own window is one of the candidates.

    Args:
        data: Frame with a timezone-aware datetime index.
        window_time_start: Time of day at which each window starts.
        window_size_days: Window length in dates, as interpreted by `get_window`.
        target_end: Last timestamp of the target window.
        strategy: Scoring callable taking (target window, candidate window).

    Returns:
        A list of MatchModel(start, end, score) in index order, unsorted.

    Raises:
        Exception: If the target window cannot be loaded.
    """
    target_window: pd.DataFrame = get_window(data, window_time_start, window_size_days, target_end)
    if target_window is None:
        # Double-quoted literal: 'Can''t' is two adjacent literals and drops the apostrophe.
        raise Exception("Can't load target window")

    print(f'Using target window {target_window.index[0]}-{target_window.index[-1]}')
    print(f'Searching for windows of length {window_size_days} days ending at time {target_end.time()}')
    idxs = data.index.indexer_at_time(target_end.time())

    matches = []
    success = 0
    fail = 0
    for i in idxs:
        window_end = data.index[i]
        window = get_window(data, window_time_start, window_size_days, window_end)
        if window is None:
            fail += 1
            continue
        # Best effort: a candidate whose score cannot be computed is counted as
        # a failure and skipped instead of aborting the whole search.
        try:
            score = strategy(target_window, window)
        except Exception as e:
            print(f'Error calculating score: {e}')
            fail += 1
            continue
        # The score from the guarded call is reused; evaluating the strategy a
        # second time would only repeat the work.
        matches.append(MatchModel(window.index[0], window.index[-1], score))
        success += 1
    print(f'Successfully processed {success} matches, with {fail} failures.')
    return matches

def least_distance(matches: list[MatchModel], top: int = 0):
    """Return `matches` ordered by ascending score.

    When `top` is truthy only the first `top` entries are returned; a falsy
    `top` (0 or None) returns the full ordering.
    """
    ranked = sorted(matches, key=lambda match: match.score)
    return ranked[:top] if top else ranked

def highest_score(matches: list[MatchModel], top: int = 0):
    """Return `matches` ordered by descending score.

    When `top` is truthy only the first `top` entries are returned; a falsy
    `top` (0 or None) returns the full ordering.
    """
    ranked = sorted(matches, key=lambda match: match.score, reverse=True)
    return ranked[:top] if top else ranked

def find_similar_dtw_hlc4(data: pd.DataFrame, window_time_start: time, window_size_days: int, target_end: datetime, top: int = None):
    """Search with the `dtw_hlc4` strategy and return the matches by ascending distance.

    `top` limits the result to the best entries; None returns all of them.
    """
    return least_distance(
        find_similar_windows(data, window_time_start, window_size_days, target_end, dtw_hlc4),
        top,
    )

def find_similar_dtw_high_low_1(data: pd.DataFrame, window_time_start: time, window_size_days: int, target_end: datetime, top: int = None):
    """Combine the best high-based and low-based DTW matches that end on the same date.

    Both searches are run, each result is cut to its best fifth, and a combined
    match (mean of the two scores) is produced for every high/low pair whose
    end timestamps fall on the same calendar date.

    Args:
        data: Frame with a timezone-aware datetime index.
        window_time_start: Time of day at which each window starts.
        window_size_days: Window length in dates, as interpreted by `get_window`.
        target_end: Last timestamp of the target window.
        top: Maximum number of combined matches to return; None returns all.

    Returns:
        Combined MatchModel entries ordered by ascending score.
    """
    matches_high = find_similar_windows(data, window_time_start, window_size_days, target_end, dtw_high)
    matches_low = find_similar_windows(data, window_time_start, window_size_days, target_end, dtw_low)

    # Select the top from the intermediate results, only use results that are in both top results.
    # With fewer than five matches the size is 0, which least_distance treats as "no limit".
    intermediate_top_size = len(matches_high) // 5
    matches_high = least_distance(matches_high, intermediate_top_size)
    matches_low = least_distance(matches_low, intermediate_top_size)

    # Group the low matches by end date once so each high match is paired via a
    # dictionary lookup instead of a scan over every low match.
    lows_by_date = {}
    for match_low in matches_low:
        lows_by_date.setdefault(match_low.end.date(), []).append(match_low)

    matches = []
    for match_high in matches_high:
        for match_low in lows_by_date.get(match_high.end.date(), ()):
            score = (match_high.score + match_low.score)/2
            matches.append(MatchModel(match_high.start, match_high.end, score))
    return least_distance(matches, top)

def find_similar_dtw_high_low_2(data: pd.DataFrame, window_time_start: time, window_size_days: int, target_end: datetime, top: int = None):
    """Rank windows by the mean of their high-based and low-based DTW distances.

    Every high match is paired with the low match that has the same end
    timestamp; a high match without such a counterpart raises KeyError.
    `top` limits the result to the best entries; None returns all of them.
    """
    matches_high = find_similar_windows(data, window_time_start, window_size_days, target_end, dtw_high)
    matches_low = find_similar_windows(data, window_time_start, window_size_days, target_end, dtw_low)

    # Low-strategy score keyed by the window's end timestamp.
    low_score_by_end = {match.end: match.score for match in matches_low}
    combined = [
        MatchModel(
            match_high.start,
            match_high.end,
            (match_high.score + low_score_by_end[match_high.end])/2,
        )
        for match_high in matches_high
    ]
    return least_distance(combined, top)

def find_similar_dtw_high_low_4(data: pd.DataFrame, window_time_start: time, window_size_days: int, target_end: datetime, top: int = None):
    """Rank windows by a weighted mean of high, low and close DTW distances.

    The combined score is (high + low + 2 * close) / 4, looked up per end
    timestamp of the close matches. A close match whose end timestamp is
    missing from the high or low results raises KeyError.
    `top` limits the result to the best entries; None returns all of them.
    """
    matches_high = find_similar_windows(data, window_time_start, window_size_days, target_end, dtw_high)
    matches_low = find_similar_windows(data, window_time_start, window_size_days, target_end, dtw_low)
    matches_close = find_similar_windows(data, window_time_start, window_size_days, target_end, dtw_close)

    # Per-strategy scores keyed by the window's end timestamp.
    high_score_by_end = {match.end: match.score for match in matches_high}
    low_score_by_end = {match.end: match.score for match in matches_low}

    combined = [
        MatchModel(
            match_close.start,
            match_close.end,
            (high_score_by_end[match_close.end] + low_score_by_end[match_close.end] + match_close.score*2)/4,
        )
        for match_close in matches_close
    ]
    return least_distance(combined, top)

# Run

In [None]:
# Restart from the resampled snapshot and keep only bars between 04:00 and 17:00.
df = df_resampled_original.copy(deep=True).between_time('04:00', '17:00')

In [None]:
from market_pattern_search.plotting import get_window_matches

# Run similarity search
confirmed_matches = set()
top_n = 7

# End of the target window, localized to the working timezone.
end = est.localize(datetime(2024, 7, 15, hour=10))
# matches = find_similar_dtw_hlc4(df, time(9, 30), 1, end)

# One-day windows starting at 09:30, ranked by the combined high/low/close distance.
matches = find_similar_dtw_high_low_4(df, time(9, 30), 1, end)
top_matches = matches[:top_n]
window_matches = get_window_matches(df, top_matches)

# With auto_confirm enabled every top match counts as confirmed up front;
# the widget configuration cell clears the set again when it is disabled.
# auto_confirm=False
auto_confirm=True
confirmed_matches = {match.end for match in top_matches}

# Configure Plot Matches

In [None]:
show_project_default = False

# Without auto-confirmation the user starts from an empty confirmed set.
if not auto_confirm:
  confirmed_matches.clear()

# Configure dropdown
# The first option is an empty placeholder (no data, NaT timestamps) so that
# no real match is selected initially.
window_match_options = [WindowMatch(pd.DataFrame(), pd.NaT, pd.NaT)] + window_matches

# Plain assignments: the former `name = layout=widgets.Layout(...)` form was a
# chained assignment that also bound an unintended global called `layout`.
input_layout = widgets.Layout(display='inline-block', width='30%')
input_layout_none = widgets.Layout(display='none')
# container_layout = layout=widgets.Layout(flex_flow='row')

# Option label is the match end timestamp, option value is the position in window_match_options.
chart_selector = widgets.Dropdown(
    options=[(match.match_end, i) for i, match in enumerate(window_match_options)],
    value=0,
    description='Chart:',
    layout=input_layout
)

# The confirm toggle is hidden when every match is auto-confirmed.
confirm_button_layout = input_layout if not auto_confirm else input_layout_none
confirm_button = widgets.ToggleButton(value=False, description='Confirm Match', button_style='success', layout=confirm_button_layout)
show_project_button = widgets.ToggleButton(value=show_project_default, description='Show Projection', button_style='success', layout=input_layout)

In [None]:
from market_pattern_search.plotting import create_chart

# Configure chart callback

# match_end of the chart most recently drawn by chart_callback (None before the first draw).
current_chart = None
def chart_callback(chart_selected: int, confirm_match: bool, show_project: bool):
    """React to a change of the chart dropdown, the confirm toggle or the projection toggle.

    Args:
        chart_selected: Position of the selected entry in window_match_options.
        confirm_match: Current value of the confirm toggle.
        show_project: Current value of the projection toggle.

    Side effects: may draw a chart, updates the module-level current_chart and
    confirmed_matches, and rewrites the value/description of both toggle buttons.
    """
    global current_chart
    global confirmed_matches
    selected_match_end = window_match_options[chart_selected].match_end

    # print(f'selected_match_end: {selected_match_end}, confirm_match: {confirm_match}, show_project: {show_project}')
    # Redraw when the selection changed or the projection flag differs from the button state.
    # NOTE(review): for the placeholder option match_end is pd.NaT, and NaT != x is
    # always True, so the placeholder presumably takes this branch on every call — confirm.
    # NOTE(review): when invoked through widgets.interactive, show_project is
    # presumably the button's own value, so the second condition may never differ — verify.
    if selected_match_end != current_chart or show_project_button.value != show_project:
      chart = create_chart(window_match_options[chart_selected], show_project)
      if chart:
        chart.load()
      current_chart = selected_match_end
      show_project_button.value = show_project
    # No redraw on this call: apply the confirm toggle to the selected match instead.
    elif confirm_match and pd.notna(selected_match_end):
      confirmed_matches.add(selected_match_end)
    elif pd.notna(selected_match_end):
      confirmed_matches.discard(selected_match_end)

    show_project_button.description = 'Hide Projection' if show_project else 'Show Projection'

    # Make the confirm toggle reflect whether the selected match is in the confirmed set.
    if selected_match_end in confirmed_matches:
      confirm_button.value = True
      confirm_button.description = 'Confirmed'
    else:
      confirm_button.value = False
      confirm_button.description = 'Confirm Match'
    # logger.info(f'Confirmed matches: {confirmed_matches}')
    # logger.info(f'Confirmed matches: {confirmed_matches}')

# Plot Matches

In [None]:
# Bind the dropdown and both toggles to chart_callback, then show the resulting widget.
interact = widgets.interactive(
    chart_callback,
    chart_selected=chart_selector,
    confirm_match=confirm_button,
    show_project=show_project_button,
)
display(interact)

# Configure Plot Projections from Target

In [None]:
# Echo the set of confirmed match end timestamps as the cell output.
confirmed_matches

In [None]:
from market_pattern_search.plotting import get_window_match

plot_bands=True

# Get the target window
# matches is ordered by ascending distance, so matches[0] is the best-scoring entry.
# NOTE(review): presumably this is the target's own window — verify.
target_window = get_window_match(df, matches[0])
target_projection = target_window.window.loc[target_window.projection_start:]
# Close at the end of the matched part; projections are rescaled relative to it.
scale_base = target_window.window.close.loc[target_window.match_end]

confirmed_windows = [window_match for window_match in window_matches if window_match.match_end in confirmed_matches]
confirmed_projections = [window_match.window.loc[window_match.projection_start:] for window_match in confirmed_windows if pd.notna(window_match.projection_start)]

# Find longest projection, extend target_projection to include the others
# (max raises ValueError when no match is confirmed)
longest_projection = max(len(projection) for projection in confirmed_projections)
longest_projection = max(longest_projection, len(target_projection))
common_project_index = pd.date_range(start=target_window.projection_start, periods=longest_projection, freq=freq)

# Create new target window by extending using the common projection index
extended_target_index = target_window.window.loc[:target_window.match_end].index.append(common_project_index)
extended_target_index = extended_target_index.drop_duplicates()
if not extended_target_index.is_monotonic_increasing:
  raise Exception('Extended target index is not monotonic increasing')
extended_target_window = pd.DataFrame(index=extended_target_index, columns=target_window.window.columns)
extended_target_window.update(target_window.window)

# Keyword arguments: WindowMatch's field order is (window, match_end,
# projection_start), so passing projection_start and match_end positionally
# in that order would store each timestamp in the other's field.
target_window = WindowMatch(
    extended_target_window,
    match_end=target_window.match_end,
    projection_start=target_window.projection_start,
)
target_chart = create_chart(target_window, show_projection=True)

def get_line_data(line_data: pd.Series|pd.DataFrame):
  if isinstance(line_data, pd.Series):
    line_data = line_data.to_frame()
  line_data.rename(columns={line_data.columns[0]: 'value'}, inplace=True)
  line_data.index = line_data.index.tz_localize(None)
  return line_data

# Add projections to the chart
scaled_projections = []
for projection in confirmed_projections:
  # Align projection with the target index
  # A float array padded with NaN; np.full_like on the datetime index would
  # inherit the index's dtype instead of a numeric one.
  aligned_values = np.full(len(common_project_index), np.nan)
  aligned_values[:len(projection.close.values)] = projection.close.values
  aligned_projection = pd.DataFrame(index=common_project_index, data=aligned_values, columns=['close'])

  # Scale projection data to the target window
  # .iloc[0] is the first value by position; [0] on a datetime-indexed Series
  # relies on a deprecated positional fallback.
  norm = normalize_window(aligned_projection.close, aligned_projection.close.iloc[0])
  scaled = (norm * scale_base) + scale_base
  scaled = get_line_data(scaled)
  scaled_projections.append(scaled)

# Calculate the row-wise average of all scaled projections
scaled_projects_combined = pd.concat(scaled_projections, axis=1)
average_projection = scaled_projects_combined.mean(axis=1)
average_projection = get_line_data(average_projection)

# Add average projection as a line to the target chart
line = target_chart.create_line(color='#FFA500', width=2)
line.set(average_projection)

if plot_bands:
  # Calculate percentile bands (20th and 80th percentile across projections)
  percentile_high = scaled_projects_combined.quantile(0.8, axis=1)
  percentile_low = scaled_projects_combined.quantile(0.2, axis=1)
  line = target_chart.create_line(color='rgba(255, 255, 255, 0.6)', style='dashed', width=1)
  line.set(get_line_data(percentile_high))
  line = target_chart.create_line(color='rgba(255, 255, 255, 0.6)', style='dashed', width=1)
  line.set(get_line_data(percentile_low))
else:
  # Add scaled projection as a line to the target chart
  for projection in scaled_projections:
    line = target_chart.create_line(color='rgba(255, 255, 255, 0.6)', style='dashed', width=1)
    line.set(projection)


# Plot Projections from Target

In [None]:
# Display the chart
# NOTE(review): presumably renders the chart returned by create_chart, including
# the projection lines added to it, in this cell's output — confirm against the
# plotting module.
target_chart.load()

# Logger

In [None]:
# @title
# Send loguru output to an ipywidgets Output area displayed below this cell.
logger.remove()
log_output = widgets.Output()
level = 'INFO'
# level = 'DEBUG'

# The format already carries the calling function's name via {function}.
fmt="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{function}: {message}</level>"

def custom_log_handler(log_record):
    """Print one formatted log message inside the log output widget."""
    with log_output:
        print(log_record)

logger.add(custom_log_handler, format=fmt, level=level)
display(log_output)
