In [None]:
!pip install mplfinance
!pip install seaborn
!pip install finance-datareader
!pip install pandas_datareader 

In [None]:
from __future__ import division
import matplotlib.pyplot as plt
from statsmodels.nonparametric.kernel_regression import KernelReg
from numpy import linspace
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import warnings
import FinanceDataReader as fdr
from scipy.signal import argrelextrema
# Apply seaborn's default styling to the matplotlib figures created below.
sns.set()

# Default size and resolution for every figure created after this point.
plt.rcParams['figure.figsize'] = [16, 9]
plt.rcParams['figure.dpi'] = 200
# Silence FutureWarning messages for the rest of the session.
warnings.simplefilter(action='ignore', category=FutureWarning)

## 극점 찾기

In [None]:
# Number of most recent observations (plus a 2-row margin) in which extrema are
# ignored by find_max_min.
identification_lag=4

def find_max_min(prices):
    """Locate local price extrema with the help of a kernel-smoothed series.

    The prices are regressed on the observation number (1..n) with
    ``KernelReg`` using a fixed bandwidth of 0.8.  Every local maximum /
    minimum of the smoothed curve is then mapped back to the raw series by
    taking the highest / lowest raw price among rows ``i-2 .. i+1`` around the
    smoothed extremum ``i``.  Extrema in the first two rows or within
    ``identification_lag + 2`` rows of the end are skipped.

    Unlike the previous implementation this version works purely on row
    positions, so it
      * does not rename (mutate) the caller's ``prices`` series, and
      * does not depend on the index being datetime-like or on the column
        name that ``reset_index()`` happens to produce.

    Args:
        prices (pd.Series): price series; one row per observation.

    Returns:
        tuple: ``(max_min, smooth_prices)`` where ``max_min`` is a Series named
        ``'price'`` holding the raw prices at the detected extrema, indexed by
        their row position (index name ``'day_num'``), and ``smooth_prices``
        is the kernel-smoothed series on the original index.
    """
    n_obs = len(prices)
    # Observation numbers 1..n are used as the regressor.
    day_axis = linspace(1., n_obs, n_obs)
    kr = KernelReg([prices.values], [day_axis], var_type='c',  bw=np.full((1, 1), 0.8))
    fitted = kr.fit([day_axis])
    smooth_prices = pd.Series(data=fitted[0], index=prices.index)

    local_max = argrelextrema(smooth_prices.values, np.greater)[0]
    local_min = argrelextrema(smooth_prices.values, np.less)[0]

    # Positional copy of the raw prices: idxmax/idxmin then return row numbers.
    positional = pd.Series(prices.values)
    upper_bound = n_obs - identification_lag - 2

    def _snap_to_raw(smooth_positions, pick):
        """Map smoothed extrema to the row of the matching raw extremum."""
        snapped = set()
        for i in smooth_positions:
            if 1 < i < upper_bound:
                snapped.add(int(pick(positional.iloc[i-2:i+2])))
        return snapped

    # A row can be picked from both sides; the set union keeps it only once.
    extrema_rows = sorted(
        _snap_to_raw(local_max, pd.Series.idxmax) | _snap_to_raw(local_min, pd.Series.idxmin)
    )
    day_nums = np.asarray(extrema_rows, dtype='int64')

    max_min = pd.Series(prices.values[day_nums],
                        index=pd.Index(day_nums, name='day_num'),
                        name='price')

    return max_min, smooth_prices

In [None]:
# Allowed span of a candidate pattern, measured as the difference between the
# day_num of its last and first extremum (used by find_patterns).
window_size_min = 18
window_size_max = 22

## 규칙에 맞는 패턴 찾기

In [20]:
from collections import defaultdict

def find_patterns(max_min, min_days=None, max_days=None):
    """Scan consecutive groups of five extrema for technical chart patterns.

    Every run of five consecutive extrema (E1..E5) whose span, in day numbers,
    lies within ``[min_days, max_days]`` is tested against the rules below.
    The rules are evaluated in order and the first match wins.

    Fixes relative to the previous version:
      * the window ending on the most recent extremum is now examined
        (the loop used to stop one window early);
      * the (inverse) head-and-shoulders neckline tolerance is measured against
        the average of E2 and E4 rather than the average of E1 and E5;
      * rectangle bottom requires the lowest top (E2/E4) to lie above the
        highest bottom (E1/E3/E5); the comparison used to be inverted.

    Args:
        max_min (pd.Series): extrema prices indexed by day number, as returned
            by ``find_max_min``.
        min_days (int, optional): minimum span; defaults to the module-level
            ``window_size_min``.
        max_days (int, optional): maximum span; defaults to the module-level
            ``window_size_max``.

    Returns:
        defaultdict(list): pattern name -> list of ``(first_day, last_day)``
        tuples.  Only patterns that occurred at least once have a key.
    """
    if min_days is None:
        min_days = window_size_min
    if max_days is None:
        max_days = window_size_max

    patterns = defaultdict(list)

    # "+ 1" so that the slice [i-5:i] also reaches the last extremum.
    for i in range(5, len(max_min) + 1):
        window = max_min.iloc[i-5:i]
        bounds = (window.index[0], window.index[-1])

        # The pattern must span between min_days and max_days (inclusive).
        span = bounds[1] - bounds[0]
        if span > max_days or span < min_days:
            continue

        # Using the notation from the paper to avoid mistakes
        e1, e2, e3, e4, e5 = (window.iloc[k] for k in range(5))

        # Averages of the odd (E1, E3, E5) and even (E2, E4) extrema.
        odd_avg = np.mean([e1, e3, e5])
        even_avg = np.mean([e2, e4])

        # |a - b| <= 3% of their mean  <=>  both within 1.5% of their mean.
        shoulders_level = abs(e1 - e5) <= 0.03*np.mean([e1, e5])
        neckline_level = abs(e2 - e4) <= 0.03*np.mean([e2, e4])

        # Rectangle rule: each group stays within 0.75% of its own average.
        odd_flat = all(abs(e - odd_avg)/odd_avg < 0.0075 for e in (e1, e3, e5))
        even_flat = all(abs(e - even_avg)/even_avg < 0.0075 for e in (e2, e4))

        # Head and Shoulders
        if (e1 > e2) and (e3 > e1) and (e3 > e5) and shoulders_level and neckline_level:
            patterns['HS'].append(bounds)

        # Inverse Head and Shoulders
        elif (e1 < e2) and (e3 < e1) and (e3 < e5) and shoulders_level and neckline_level:
            patterns['IHS'].append(bounds)

        # Broadening Top
        elif (e1 > e2) and (e1 < e3) and (e3 < e5) and (e2 > e4):
            patterns['BTOP'].append(bounds)

        # Broadening Bottom
        elif (e1 < e2) and (e1 > e3) and (e3 > e5) and (e2 < e4):
            patterns['BBOT'].append(bounds)

        # Triangle Top
        elif (e1 > e2) and (e1 > e3) and (e3 > e5) and (e2 < e4):
            patterns['TTOP'].append(bounds)

        # Triangle Bottom
        elif (e1 < e2) and (e1 < e3) and (e3 < e5) and (e2 > e4):
            patterns['TBOT'].append(bounds)

        # Rectangle Top: E1/E3/E5 are tops, E2/E4 bottoms; lowest top above highest bottom.
        elif (e1 > e2) and odd_flat and even_flat and (min(e1, e3, e5) > max(e2, e4)):
            patterns['RTOP'].append(bounds)

        # Rectangle Bottom: E1/E3/E5 are bottoms, E2/E4 tops; lowest top above highest bottom.
        elif (e1 < e2) and odd_flat and even_flat and (min(e2, e4) > max(e1, e3, e5)):
            patterns['RBOT'].append(bounds)

    return patterns


# Predictive Power of Technical Patterns

In [None]:
def compute_pattern_returns(prices, indentification_lag=4):
    """Average standardized next-day return observed after each detected pattern.

    For every pattern found in ``prices`` the one-day-ahead return at
    ``pattern end day + indentification_lag`` is looked up in the standardized
    (zero mean, unit standard deviation) return series and the values are
    averaged per pattern name.

    Args:
        prices (pd.Series): price series for one instrument.
        indentification_lag (int): number of rows after the pattern's last
            extremum at which the return is measured.  The misspelled name is
            kept so existing keyword callers continue to work.  Note that
            ``find_max_min`` reads the module-level ``identification_lag``
            independently of this argument.

    Returns:
        pd.Series: float series indexed by pattern name (only patterns that
        occurred).  Lagged days falling outside the return series contribute
        NaN and are ignored by the mean instead of raising ``KeyError``.
    """
    max_min, _ = find_max_min(prices)
    patterns = find_patterns(max_min)

    # Return realised over the following row, aligned on row numbers 0..n-1.
    returns = (prices.pct_change(1)
                     .shift(-1)
                     .reset_index(drop=True))
    standardized_returns = (returns - returns.mean()) / returns.std()

    pattern_mean_returns = {}
    for name, start_end_day_nums in patterns.items():
        lagged_end_days = [end_day + indentification_lag
                           for _, end_day in start_end_day_nums]
        # reindex (rather than .loc) tolerates day numbers past the end of the data.
        pattern_mean_returns[name] = standardized_returns.reindex(lagged_end_days).mean()

    return pd.Series(pattern_mean_returns, dtype='float64')


### Function to download daily stock data with FinanceDataReader and save it to a CSV

In [None]:
def download_csv_data(ticker, start_date, end_date, path):
    """Download daily OHLCV data for one ticker, save it as CSV and plot the close.

    Column names returned by the data reader are lower-cased before the
    ``open/high/low/close/volume`` columns are selected, so the function works
    whether the reader returns ``Close`` or ``close``.

    Args:
        ticker (str): symbol passed to ``fdr.DataReader``.
        start_date: first date requested.
        end_date: last date requested.
        path (str): destination CSV file.

    Raises:
        KeyError: if any of the five OHLCV columns is missing from the download.
    """
    raw = fdr.DataReader(ticker, start_date, end_date)
    raw = raw.rename(columns=lambda c: c.lower() if isinstance(c, str) else c)

    # .copy() so the column assignments below act on an independent frame.
    df = raw[['open', 'high', 'low', 'close', 'volume']].copy()
    df['date'] = df.index
    df['dividend'] = 0  # placeholder dividend column
    df['split'] = 1  # placeholder split column

    # save data to csv for later ingestion
    df.to_csv(path, header=True, index=True)

    # plot the time series
    df['close'].plot(title='{} prices --- {}:{}'.format(ticker, start_date, end_date));

### Fetching S&P 500 stocks list from wikipedia

In [None]:
import bs4 as bs
import pickle
import requests
import os

def save_sp500_tickers():
    """Scrape the S&P 500 constituents table from Wikipedia.

    The ticker symbols (first cell of each table row) are pickled to
    ``sp500tickers.pickle`` in the working directory and returned.

    Returns:
        list[str]: ticker symbols in table order.

    Raises:
        requests.HTTPError: if the page request does not succeed.
        ValueError: if the page has no table with id ``constituents``.
    """
    resp = requests.get('https://en.wikipedia.org/wiki/List_of_S%26P_500_companies',
                        timeout=30)
    # Fail loudly on 4xx/5xx instead of parsing an error page.
    resp.raise_for_status()

    soup = bs.BeautifulSoup(resp.text, 'lxml')
    table = soup.find('table', {'id': 'constituents'})
    if table is None:
        raise ValueError("table with id 'constituents' not found on the page")

    tickers = []
    for row in table.find_all('tr')[1:]:
        cells = row.find_all('td')
        if not cells:
            # Row without data cells (e.g. an extra header row).
            continue
        tickers.append(cells[0].text.strip())

    with open("sp500tickers.pickle","wb") as f:
        pickle.dump(tickers,f)

    return tickers

tickers= save_sp500_tickers()

In [None]:
# Drop every symbol that contains ".B" (this removes tickers such as "BRK.B").
tickers= [x for x in tickers if ".B" not in x]

In [32]:
# Date range handed to the download step; the trailing comment on the first
# line keeps an alternative start date.
text_start_date= '2008-01-01'#'2024-01-01'
text_end_date= '2021-07-31'

In [None]:
# Root folder for everything this notebook writes (CSV files, figures, centroids).
bundle_name = 'SP500_traditional'

folder_path = bundle_name + '/daily/'
os.makedirs(folder_path, exist_ok=True)

# Download each ticker; a failure is reported with its cause and the loop continues.
failed_tickers = []
for ticker in tickers:
    try:
        download_csv_data(ticker=ticker,
                          start_date=text_start_date,
                          end_date=text_end_date,
                          path=folder_path + ticker.strip(' .^')+'.csv')
    except Exception as exc:  # KeyboardInterrupt is deliberately not caught
        failed_tickers.append(ticker)
        print(ticker + " failed: " + repr(exc))

In [None]:
import glob
import pandas as pd

# Collect every CSV file in the data folder.  The list is sorted so that the
# column order of the combined frames is the same on every run.
# NOTE(review): this reads from '/Data/BTC', not from the folder the download
# step writes to, and expects 'Date', 'Close', 'Volume' and 'RSI' columns —
# confirm this is the intended input.
data_folder = '/Data/BTC'
csv_files = sorted(glob.glob(data_folder+'/*.csv'))
if not csv_files:
    raise FileNotFoundError("No CSV files found in " + data_folder)

# One single-column frame per file and per variable.
dfs = []
dfs_volumn = []
dfs_rsi = []

for file in csv_files:
    # Read the file with the 'Date' column parsed and used as index.
    df = pd.read_csv(file, parse_dates=['Date'], index_col='Date')

    # Each selected column is renamed to the file name without its extension.
    column_name = os.path.splitext(os.path.basename(file))[0]
    df_close = df[['Close']].rename(columns={'Close': column_name})
    df_volume = df[['Volume']].rename(columns={'Volume': column_name})
    df_rsi = df[['RSI']].rename(columns={'RSI': column_name})

    dfs.append(df_close)
    dfs_volumn.append(df_volume)
    dfs_rsi.append(df_rsi)


# Join the per-file columns side by side (aligned on the date index).
# The index is named 'index' in all three frames.
df_combined = pd.concat(dfs, axis=1)
df_combined.index.name = 'index'
df_combined_volumn = pd.concat(dfs_volumn, axis=1)
df_combined_volumn.index.name = 'index'
df_combined_rsi = pd.concat(dfs_rsi, axis=1)
df_combined_rsi.index.name = 'index'

In [37]:
# Remove, in place, the row(s) labelled with the last index value of each frame.
# NOTE(review): not idempotent — every re-run of this cell drops one more row.
df_combined.drop(df_combined.index[-1], inplace=True)
df_combined_volumn.drop(df_combined_volumn.index[-1], inplace=True)
df_combined_rsi.drop(df_combined_rsi.index[-1], inplace=True)

In [38]:
# Cast every column of the three frames to float.
df_combined = df_combined.astype(float)
df_combined_volumn = df_combined_volumn.astype(float)
df_combined_rsi = df_combined_rsi.astype(float)

In [39]:
# Drop, in place, every column that contains at least one missing value.
# NOTE(review): the three frames are filtered independently, so they can end up
# with different column sets — confirm that later per-ticker lookups into the
# volume and RSI frames still find every ticker kept in df_combined.
df_combined.dropna(axis=1, inplace=True)
df_combined_volumn.dropna(axis=1, inplace=True)
df_combined_rsi.dropna(axis=1, inplace=True)

# Report how many columns remain in the close-price frame.
print(f"컬럼 개수: {df_combined.shape[1]}")


컬럼 개수: 418


In [41]:
# Display order of the relabelled patterns: "(P)" labels first, then "(N)".
new_order = ['IHS(P)', 'BBOT(P)', 'TBOT(P)', 'RBOT(P)', 'HS(N)', 'BTOP(N)', 'RTOP(N)', 'TTOP(N)']

# Per-column pattern returns, relabelled and arranged in the order above.
pattern_returns = (
    df_combined
    .apply(compute_pattern_returns)
    .rename(index={'BBOT': 'BBOT(P)', 'BTOP': 'BTOP(N)',
                   'HS': 'HS(N)', 'IHS': 'IHS(P)', 'RTOP': 'RTOP(N)',
                   'TBOT': 'TBOT(P)', 'TTOP': 'TTOP(N)', 'RBOT': 'RBOT(P)'})
    .reindex(new_order)
)

In [None]:
import pandas as pd
import numpy as np
from collections import defaultdict



# Turn detected pattern ranges into the matching time-series slices.
def convert_patterns_to_prices(df_combined, smoothing_combined,
                               volume_combined=None, rsi_combined=None):
    """Collect smoothed-price, volume and RSI slices for every detected pattern.

    Patterns are detected per column of ``df_combined`` with ``find_max_min``
    and ``find_patterns``; each ``(start, end)`` row range is then cut
    (inclusive of ``end``) out of the three companion frames.

    Args:
        df_combined (pd.DataFrame): close prices, one column per ticker.
        smoothing_combined (pd.DataFrame): smoothed prices, same layout.
        volume_combined (pd.DataFrame, optional): volumes, same layout.
            Defaults to the module-level ``df_combined_volumn``.
        rsi_combined (pd.DataFrame, optional): RSI values, same layout.
            Defaults to the module-level ``df_combined_rsi``.

    Returns:
        defaultdict(list): pattern name -> list of
        ``[smoothed_slice, volume_slice, rsi_slice]`` entries.
    """
    if volume_combined is None:
        volume_combined = df_combined_volumn
    if rsi_combined is None:
        rsi_combined = df_combined_rsi

    all_patterns = defaultdict(list)

    # Process every ticker (column) in turn.
    for ticker in df_combined.columns:
        max_min, _ = find_max_min(df_combined[ticker])
        pattern_results = find_patterns(max_min)

        for pattern_name, index_ranges in pattern_results.items():
            for start_idx, end_idx in index_ranges:
                # Row positions -> slices of the three series (end row included).
                rows = slice(start_idx, end_idx + 1)
                time_series = smoothing_combined[ticker].iloc[rows]
                time_series_volumn = volume_combined[ticker].iloc[rows]
                time_series_rsi = rsi_combined[ticker].iloc[rows]
                all_patterns[pattern_name].append([time_series, time_series_volumn, time_series_rsi])

    return all_patterns


# Observation numbers 1..n, shared as regressor and prediction grid by every fit.
day_axis = np.linspace(1., len(df_combined), len(df_combined))

# Kernel-smooth each ticker's close prices (fixed bandwidth 0.8).
smoothing_dict = {}
for ticker in df_combined.columns:
    kr = KernelReg([df_combined[ticker].values],
                   [day_axis],
                   var_type='c',
                   bw=np.full((1, 1), 0.8))
    f = kr.fit([day_axis])
    # Store the fitted values on the original index.
    smoothing_dict[ticker] = pd.Series(data=f[0], index=df_combined.index)

# Assemble all smoothed series into one DataFrame in a single step.
smoothing_combined = pd.DataFrame(smoothing_dict, index=df_combined.index).copy()

# Extract the per-pattern time-series slices.
pattern_time_series = convert_patterns_to_prices(df_combined, smoothing_combined)

# Report how often each pattern was found.
for pattern, series_list in pattern_time_series.items():
    print("{}: {} occurences".format(pattern, len(series_list)))

In [None]:
!pip install tslearn
!pip install fastdtw

1. 각 timeseries마다 min-max 정규화
2. interpolation
3. close price 변수에 대해 패턴별 dba mean 계산
4. 각 패턴 집합 속 dba mean과 close price의 path matrix 계산 후 volume, rsi에 매칭
5. path matrix를 적용한 volume, rsi의 mean 계산

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tslearn.barycenters import dtw_barycenter_averaging
from collections import defaultdict
from fastdtw import fastdtw

# Min-max normalisation helper
def min_max_scaling(series):
    """Rescale ``series`` linearly so its minimum maps to 0 and its maximum to 1.

    When minimum and maximum are equal the result is ``series * 0`` (all zeros).
    """
    lowest = series.min()
    highest = series.max()
    if highest == lowest:
        # Constant input: no range to scale by.
        return series * 0
    return (series - lowest) / (highest - lowest)


def interpolate_series(series_list, new_length=50):
    """
    Resample series of differing lengths onto a common number of points
    using linear interpolation.

    Args:
        series_list (list of array-like): the original series
        new_length (int): number of evenly spaced points in each output row
    Returns:
        np.array: one resampled row per input series, or None when
        series_list is empty
    """
    if not series_list:
        return None

    resampled_rows = []
    for series in series_list:
        n_points = len(series)
        old_x = np.arange(n_points)                         # original x: 0, 1, 2, ...
        new_x = np.linspace(0, n_points - 1, new_length)    # evenly spaced target x
        resampled_rows.append(np.interp(new_x, old_x, series))

    return np.array(resampled_rows)


def remove_duplicate_indices(path):
    """Keep only the first pair for every distinct second element of ``path``.

    The relative order of the kept pairs is unchanged.
    """
    first_by_target = {}
    for pair in path:
        # setdefault leaves an existing entry alone, so the first pair wins.
        first_by_target.setdefault(pair[1], pair)
    return list(first_by_target.values())

# Centroid computation and visualisation, one figure per pattern.
centroid_dict = {}

# savefig below writes into this folder; make sure it exists.
os.makedirs(bundle_name, exist_ok=True)

for pattern, series_list in pattern_time_series.items():
    # Min-max normalise every series of every occurrence.
    normalized_series_list = [[min_max_scaling(ts) for ts in sublist] for sublist in series_list]

    # Split per variable and resample everything to a common length.
    close_prices = [ts[0] for ts in normalized_series_list]
    volumes = [ts[1] for ts in normalized_series_list]
    rsis = [ts[2] for ts in normalized_series_list]

    interpolated_close_prices = interpolate_series(close_prices)
    interpolated_volumes = interpolate_series(volumes)
    interpolated_rsis = interpolate_series(rsis)

    # DBA barycenter of the close-price series.
    centroid = dtw_barycenter_averaging(interpolated_close_prices)
    # Flatten to 1-D so fastdtw compares like with like; the close series are
    # 1-D (the barycenter presumably comes back with a trailing axis, which the
    # previous code removed with .squeeze() only when storing it).
    centroid_flat = np.asarray(centroid).ravel()

    aligned_volumes = []
    aligned_rsis = []

    for close_price, volume, rsi in zip(interpolated_close_prices, interpolated_volumes, interpolated_rsis):
        _, path = fastdtw(close_price, centroid_flat)
        # Keep one source index per centroid index so every warped series has
        # the same length, then reorder volume and RSI along that path.
        path = remove_duplicate_indices(path)
        volume_warped = np.array([volume[i] for i, _ in path])
        rsi_warped = np.array([rsi[i] for i, _ in path])

        aligned_volumes.append(volume_warped)
        aligned_rsis.append(rsi_warped)

    # Point-wise mean of the aligned volume and RSI series.
    centroid_volume = np.mean(aligned_volumes, axis=0)
    centroid_rsi = np.mean(aligned_rsis, axis=0)

    # Store the three centroids for this pattern.
    centroid_dict[pattern] = {'close': centroid_flat,
                              'volume': centroid_volume,
                              'rsi': centroid_rsi}

    fig, ax = plt.subplots(figsize=(8, 5))

    # First three individual (normalised) series as dashed lines.
    for series in close_prices[:3]:
        ax.plot(np.linspace(0, 1, len(series)), series.values, alpha=0.5, linestyle='dashed')

    # Centroid as a thick solid line.
    ax.plot(np.linspace(0, 1, len(centroid_flat)), centroid_flat,
            label=f'Centroid - {pattern}', linewidth=2, color='red')

    ax.set_title(f'Centroid Visualization for {pattern}')
    ax.legend()

    # Save, show, then close so figures do not accumulate across patterns.
    fig.savefig(bundle_name + f'/centroid_{pattern}.png')
    plt.show()
    plt.close(fig)


In [None]:
import matplotlib.pyplot as plt
import numpy as np
import matplotlib as mpl
mpl.rcParams.update(mpl.rcParamsDefault)

def plot_close_only(centroid_dict, selected_keys=('IHS', 'HS', 'TBOT', 'TTOP', 'BBOT', 'BTOP')):
    """Draw the close-price centroid of each selected pattern in one row of panels.

    Each centroid is min-max normalised and then mapped into [0.1, 0.9] so the
    line keeps a margin inside the fixed 0..1 y-range.

    Args:
        centroid_dict (dict): pattern name -> dict with a ``'close'`` array.
        selected_keys (sequence of str): patterns to draw, in display order.
            Names missing from ``centroid_dict`` are skipped with a warning
            instead of raising ``KeyError``.
    """
    missing = [key for key in selected_keys if key not in centroid_dict]
    if missing:
        warnings.warn("no centroid available for: " + ", ".join(missing))

    names = [key for key in selected_keys if key in centroid_dict]
    if not names:
        return

    # squeeze=False keeps axs 2-D even when only one panel is drawn.
    fig, axs = plt.subplots(1, len(names), figsize=(len(names) * 3, 2.3), squeeze=False)

    low, high = 0.1, 0.9  # target range of the rescaled centroid

    for ax, name in zip(axs[0], names):
        close_series = np.asarray(centroid_dict[name]['close'])
        min_val, max_val = np.min(close_series), np.max(close_series)

        if max_val - min_val == 0:
            norm_series = np.zeros_like(close_series)
        else:
            norm_series = (close_series - min_val) / (max_val - min_val)

        scaled_series = norm_series * (high - low) + low

        ax.plot(scaled_series, color='orange', linewidth=4.0)
        ax.set_title(name, fontsize=30)
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
        ax.set_ylim(0, 1)

        for spine in ax.spines.values():
            spine.set_linewidth(2)  # thicker panel border

    plt.tight_layout()
    plt.show()

# Run
plot_close_only(centroid_dict)


In [None]:
import pickle

# The centroids are stored with pickle (the file only carries a .csv name).
centroid_path = bundle_name + f"/centroids.csv"

with open(centroid_path, 'wb') as out_file:
    pickle.dump(centroid_dict, out_file)

# Read the file back to confirm the round trip.
# NOTE: pickle.load must only be used on files from a trusted source.
with open(centroid_path, 'rb') as in_file:
    loaded_dict = pickle.load(in_file)

print(loaded_dict)


In [52]:
# Discard the rectangle patterns.  pop(..., None) tolerates a pattern that was
# never detected and makes the cell safe to re-run.
loaded_dict.pop('RTOP', None)
loaded_dict.pop('RBOT', None)

In [None]:
# 항목별로 stack
result = np.array([
    np.stack([v['close'], v['volume'], v['rsi']], axis=1)
    for v in loaded_dict.values()
])

result.shape

In [54]:
import numpy as np
from scipy.interpolate import interp1d

# Number of time steps after resampling.
target_length = 22

# Sizes are taken from the data rather than hard-coded, so the cell keeps
# working when the number of patterns or the centroid length changes.
n_patterns, source_length, n_channels = result.shape

# Original and new sample positions on a common 0..1 axis.
x_original = np.linspace(0, 1, source_length)
x_new = np.linspace(0, 1, target_length)

# Linear interpolation along the time axis (axis 1) for every pattern and
# channel at once.
interpolated = interp1d(x_original, result, kind='linear', axis=1)(x_new)

print(interpolated.shape)  # (n_patterns, target_length, n_channels)


(6, 22, 3)
