In [None]:
import warnings
warnings.filterwarnings("ignore")
from tslearn.clustering import TimeSeriesKMeans
from sklearn import preprocessing
import numpy as np
import pandas as pd
from tslearn import clustering
import matplotlib.pylab as plt
from tslearn.clustering import KShape
import matplotlib as mpl
from sklearn import metrics
from tslearn.metrics import soft_dtw
from tslearn.metrics import dtw
import seaborn as sns

In [None]:
# In this case study we analyze the CLOSE price of SPY from 1993 to 2020.
# --   build a training set of time series of length 15 (i.e. 15 consecutive closes, T through T+14) and assign them to 10 different clusters
# --   apply a self-computed score function to quantify the quality of the clusters
# --   show some analysis and graphs based on the clustering result

In [None]:
def read_input_data(path, header=None, sep=None, cols=None):
    """Read a delimited text file into a DataFrame and optionally rename its columns.

    Args:
        path: File path or file-like object, forwarded to ``pd.read_csv``.
        header: Forwarded to ``pd.read_csv``; the default ``None`` treats the
            first line as data rather than as a header row.
        sep: Forwarded to ``pd.read_csv``.
        cols: Optional sequence of column names. ``None`` or an empty sequence
            leaves the parsed column labels untouched.

    Returns:
        The parsed ``pandas.DataFrame``.
    """
    df = pd.read_csv(path, header=header, sep=sep)
    # Explicit None/length check: a bare `if cols:` raises "truth value is
    # ambiguous" when cols is a NumPy array or a pandas Index.
    if cols is not None and len(cols) > 0:
        df.columns = cols
    return df

def generate_training_data(df, length, step, start_index, end_index, col):
    """Slice ``df[col]`` into fixed-length windows and normalize each window.

    A window is taken every ``step`` rows, beginning at ``start_index`` and
    continuing while the window's *start* is strictly below ``end_index``.
    Each window is passed through ``preprocessing.normalize`` on its own.

    NOTE: ``end_index`` only bounds where a window starts, so the last windows
    can reach up to ``length - 1`` rows past ``end_index``.

    Args:
        df: DataFrame holding the series.
        length: Number of observations per window (must be >= 1).
        step: Row distance between consecutive window starts (must be >= 1).
        start_index: Positional index of the first window start.
        end_index: Exclusive upper bound for window starts.
        col: Name of the column to slice.

    Returns:
        ``(X_train, X_index)``: ``X_train`` is a 2-D array with one normalized
        window per row and ``X_index`` is a 1-D array with the positional start
        of each row. When no complete window exists the arrays are empty, with
        shapes ``(0, length)`` and ``(0,)``.

    Raises:
        ValueError: If ``length`` or ``step`` is smaller than 1.
    """
    if length < 1:
        raise ValueError("length must be a positive integer")
    if step < 1:
        # A non-positive step would never advance past end_index.
        raise ValueError("step must be a positive integer")

    column = df[col]
    starts = []
    windows = []
    for start in range(start_index, end_index, step):
        window = column.iloc[start:start + length].values
        if window.shape[0] != length:
            # The slice ran past the available rows; a short window cannot be
            # stacked with the full-length ones, so it is left out.
            continue
        starts.append(start)
        # normalize each time series
        windows.append(preprocessing.normalize(window.reshape(1, -1)))

    if not windows:
        return (np.empty((0, length)), np.asarray([], dtype=int))

    # Stack once at the end instead of re-allocating the matrix per window.
    X_train = np.vstack(windows)
    X_index = np.asarray(starts)
    return (X_train, X_index)

In [None]:
# Load the SPY price file and build the clustering training set.
# Windows are 15 observations long and start every 5 rows; window starts stop
# 100 rows before the end of the data so the tail can be used for testing.
data = read_input_data(
    'SPY.txt',
    sep=',',
    cols=['Date', 'Open', 'High', 'Low', 'Close', 'Volumn'],
)
length, step = 15, 5
start_index = 0
end_index = data.shape[0] - 100
X_train, X_index = generate_training_data(
    df=data,
    length=length,
    step=step,
    start_index=start_index,
    end_index=end_index,
    col='Close',
)

In [None]:
# Next step: train a TimeSeriesKMeans model with a preset distance metric.
# The number of clusters is set to 10.
n_cluster = 10
metric = 'dtw'
# Fix the seed so the cluster ids are the same on every Restart & Run All.
kmeans_seed = 42
km = TimeSeriesKMeans(n_clusters=n_cluster, metric=metric, random_state=kmeans_seed)
labels_train = km.fit_predict(X_train)

In [None]:
# Now let's try the model on held-out data.
# Training windows only start before end_index, which leaves the tail of SPY for testing.
# Pick one full-length window from that tail at random and predict its cluster,
# then plot the window against the median trace of the training series in that cluster.
# NOTE(review): training windows that start just before end_index still extend up to
# length-1 rows into this tail, so the earliest test windows overlap training data.
n_holdout = len(data) - end_index
# A window starting at offset o covers rows o .. o+length-1, so the valid offsets are
# 0 .. n_holdout-length inclusive. Deriving the bound from `length` replaces the
# hard-coded 85, which excluded the last valid start and assumed length == 15.
rng = np.random.default_rng(42)  # seeded so the chosen sample is reproducible
rdm_idx = end_index + int(rng.integers(n_holdout - length + 1))
X_test = data['Close'].iloc[rdm_idx:rdm_idx+length].values.reshape(1,-1)
X_test = preprocessing.normalize(X_test)
test_label = km.predict(X_test)
target_cluster = test_label[0]
X_cluster_test = X_train[labels_train == target_cluster]
plt.plot(range(1,length+1),np.median(X_cluster_test,axis=0),label='median trace of cluster ' + str(target_cluster))
plt.plot(range(1,length+1),X_test.flatten(),label='test time series')
plt.xlabel('Day in window')
plt.ylabel('Normalized close')
plt.legend()
plt.show()

In [None]:
# Visual pattern check: draw a random sample of the series assigned to one cluster
def plt_cluster(i_cluster, X, labels, period, model, num_obs):
    """Plot up to ``num_obs`` randomly chosen series belonging to cluster ``i_cluster``.

    Args:
        i_cluster: Cluster id to display.
        X: 2-D array of series, one per row.
        labels: Array of cluster ids aligned with the rows of ``X``.
        period: Number of x positions; each series is drawn against ``range(period)``.
        model: Fitted model exposing ``cluster_centers_``; a sampled series that is
            exactly equal to the flattened centroid of ``i_cluster`` is not drawn.
        num_obs: Maximum number of series to sample (without replacement).

    Returns:
        None. Nothing is drawn when the cluster has fewer than two members.
    """
    members = X[labels == i_cluster]
    n_members = members.shape[0]
    if n_members < 2:
        return

    plt.title('Cluster: {!r}'.format(i_cluster))

    # Sample without replacement, capped at the cluster size
    sample_size = min(n_members, num_obs)
    picked_rows = np.random.choice(n_members, size=sample_size, replace=False)
    for member in members[picked_rows, :]:
        centroid = model.cluster_centers_[i_cluster].flatten()
        if not np.array_equal(centroid, member):
            plt.plot(range(period), member)

    plt.grid()
    plt.show()

In [None]:
# Draw up to 50 randomly chosen series from the predicted cluster to see
# whether a common shape stands out (ascending, descending, convex, concave, ...).
plt_cluster(
    i_cluster=target_cluster,
    X=X_train,
    labels=labels_train,
    period=length,
    model=km,
    num_obs=50,
)

In [None]:
# For every series in a cluster, compute the one-day log returns (in percent)
# at the requested day offsets from the series' start index.
def get_abs_cluster_return(df, index_arr, return_days, col):
    """Compute one-day log returns, in percent, at given offsets from each start index.

    For each ``index`` in ``index_arr`` and each ``day`` in ``return_days`` the
    value is ``(log(p[index + day]) - log(p[index + day - 1])) * 100`` where
    ``p`` is ``df[col]`` addressed by position.

    Args:
        df: DataFrame holding the price column.
        index_arr: Iterable of positional start indices.
        return_days: Iterable of integer day offsets added to each start index.
        col: Name of the price column.

    Returns:
        Float array of shape ``(len(index_arr), len(return_days))``. Entries
        whose rows fall outside ``df`` are ``NaN``.
    """
    return_days = list(return_days)
    prices = df[col].values
    n_obs = len(df)

    rows = []
    for index in index_arr:
        row = []
        for day in return_days:
            curr_index = index + day
            prev_index = curr_index - 1
            if curr_index < n_obs and prev_index >= 0:
                r = (np.log(prices[curr_index]) - np.log(prices[prev_index])) * 100
                row.append(r)
            else:
                # NaN keeps the array numeric; an empty-string placeholder
                # would turn the whole result into a string array.
                row.append(np.nan)
        rows.append(row)

    # Explicit reshape keeps the result 2-D even when there are no rows.
    return np.asarray(rows, dtype=float).reshape(len(rows), len(return_days))

In [None]:
# Next, calculate the return matrix of this cluster.
# Each training window covers rows index .. index+length-1, so offsets
# length .. length+4 from the window start are the five days right after the
# window: T+1 .. T+5, with T the last day of the window.
# The offsets are derived from `length` so they stay correct if the window size changes.
n_forward_days = 5
return_days = list(range(length, length + n_forward_days))
X_cluster_index = X_index[labels_train == target_cluster]
abs_return_arr = get_abs_cluster_return(data, X_cluster_index, return_days, 'Close')

In [None]:
def plot_density(return_arr, day):
    """Plot a histogram with a KDE overlay for one column of a return matrix.

    Args:
        return_arr: 2-D array-like of returns, one row per series.
        day: Column to plot; the title is labelled ``T+<day + 1>``.

    Raises:
        ValueError: If the selected column holds no finite numeric value.
    """
    # Coerce to float and drop missing / non-finite entries (NaN, inf, or
    # non-numeric placeholders such as ""), which cannot be plotted.
    column = pd.to_numeric(pd.Series(np.asarray(return_arr)[:, day]), errors='coerce')
    values = column.values.astype(float)
    sort_return = np.sort(values[np.isfinite(values)])
    if sort_return.size == 0:
        raise ValueError('no finite returns to plot for day index {!r}'.format(day))

    # NOTE(review): distplot is deprecated in recent seaborn releases; kept so
    # the notebook behaves the same on the seaborn version it was written for.
    sns.distplot(sort_return, hist=True, kde=True,
                 bins=10, color='darkblue',
                 hist_kws={'edgecolor': 'black'},
                 kde_kws={'linewidth': 4})
    title = 'T+' + str(day+1)
    plt.title('Density plot for return: {!r}'.format(title))
    plt.xlabel('Return')
    plt.ylabel('Density')
    plt.show()

def plot_ecdf(return_arr, day):
    """Plot the empirical CDF of one column of a return matrix with its median marked.

    Args:
        return_arr: 2-D array-like of returns, one row per series.
        day: Column to plot; the title is labelled ``T+<day + 1>``.

    Raises:
        ValueError: If the selected column holds no finite numeric value.
    """
    # Coerce to float and drop missing / non-finite entries (NaN, inf, or
    # non-numeric placeholders such as ""); they would otherwise poison the
    # median and the tick range.
    column = pd.to_numeric(pd.Series(np.asarray(return_arr)[:, day]), errors='coerce')
    values = column.values.astype(float)
    sort_return = np.sort(values[np.isfinite(values)])
    if sort_return.size == 0:
        raise ValueError('no finite returns to plot for day index {!r}'.format(day))

    # ECDF heights: the i-th smallest value sits at i / n
    y = np.arange(1, len(sort_return)+1) / len(sort_return)
    med_val = np.median(sort_return)
    # Vertical line at the median, spanning the same heights as the ECDF
    median = np.full(len(y), med_val)
    plt.plot(sort_return, y, marker='.', linestyle='none')
    plt.plot(median, y, label='median')
    title = 'T+' + str(day+1)
    plt.title('ECDF plot for return: {!r}'.format(title))
    plt.xticks(np.arange(np.floor(np.min(sort_return)),np.ceil(np.max(sort_return)) + 1,step=1))
    plt.yticks(np.arange(0,1.1,step=0.1))
    plt.xlabel("Return")
    plt.ylabel('ECDF')
    plt.legend()
    plt.show()

In [None]:
# Density (histogram + KDE) of the cluster's returns on day T+1, i.e. column 0
plot_density(return_arr=abs_return_arr, day=0)

In [None]:
# Empirical cumulative distribution of the cluster's returns on day T+1, i.e. column 0
plot_ecdf(return_arr=abs_return_arr, day=0)