In [1]:
#!pip install giotto-tda

In [2]:
import warnings
warnings.filterwarnings("ignore")

import pandas as pd
import numpy as np
import os
import json
from tqdm import tqdm

import sklearn

from datetime import datetime
from datetime import timedelta

from gtda.homology import VietorisRipsPersistence
from gtda.time_series import SingleTakensEmbedding
from gtda.diagrams import PersistenceLandscape, PersistenceImage, BettiCurve

In [3]:
# Read the notebook settings from a JSON file; the path is relative to the
# current working directory.
with open('config/config.json', 'r') as file:
    config = json.load(file)
    
# Presumably the random seed for stochastic steps; `rs` is not referenced by
# any of the cells visible in this notebook section.
rs = config['random_state']

# Data loading

In [4]:
# Load the preprocessed ticker table; the first CSV column becomes the row index.
df = pd.read_csv('../' + config['ticker_data_preprocessed'], index_col=0)

# Drop the 'sector' column and transpose, so that the remaining column labels
# become the index and can be compared against the cutoff string.
data_temp = df.drop(columns=['sector']).T
data_temp = data_temp.loc[data_temp.index < '2021-01-01']

# Transpose back (one row per row of `df`) and keep only the raw values.
data = data_temp.T.to_numpy()

## Topological data analysis (TDA)

In [5]:
# NOTE(review): these three steps repeat, statement for statement, the slicing
# done at the end of the data-loading cell; they rebuild the same `data` array.
# Remove the 'sector' column and transpose so labels can be filtered as an index.
data_temp = df.drop(columns=['sector']).T
# Keep only the labels that sort before the cutoff string.
data_temp = data_temp.loc[data_temp.index < '2021-01-01']
# One row per row of `df`, as a NumPy array.
data = data_temp.T.to_numpy()

In [6]:
def plot_diag(persistence):
    """Scatter-plot a persistence diagram with one colour per homology dimension.

    Parameters
    ----------
    persistence : array-like
        Diagram whose last axis holds one point per row: the first two
        columns are used as the x/y coordinates (birth and death in the
        giotto-tda diagram format) and the last column as the homology
        dimension. Leading axes, such as the sample axis of size one
        produced by ``fit_transform``, are flattened.

    Notes
    -----
    Draws through the ``plt`` (``matplotlib.pyplot``) state machine. ``plt``
    must already exist in the notebook namespace; the import cell visible in
    this notebook does not import it.
    """
    points = np.asarray(persistence)
    if points.size == 0:
        # Nothing to draw; also avoids calling max() on an empty array.
        return
    # Flatten to (n_points, n_columns). Unlike squeeze(), this keeps a
    # single-point diagram two-dimensional so the column slicing still works.
    points = points.reshape(-1, points.shape[-1])

    lim = points[:, :2].max() + 0.01
    for dim in np.unique(points[:, -1]):
        data_dim = points[points[:, -1] == dim][:, :2]
        plt.scatter(x=data_dim[:, 0], y=data_dim[:, 1], label=f'Dim {int(dim)}')
    plt.xlim(left=-0.01, right=lim)
    plt.ylim(-0.01, lim)
    # The x == y diagonal must span the whole visible range; a fixed [0, 1]
    # segment is cut short as soon as the diagram contains values above 1.
    plt.plot([0, lim], [0, lim], linestyle='--')
    plt.legend()

In [7]:
def fit_optimal_emb(data, max_embedding_dimension=10, max_time_delay=30, stride=5):
    """Estimate one shared Takens-embedding setting for a set of time series.

    Each row of ``data`` is fitted with a ``SingleTakensEmbedding`` in
    ``"search"`` mode, and the per-series results are averaged.

    Parameters
    ----------
    data : numpy.ndarray
        2-D array with one time series per row.
    max_embedding_dimension : int, default 10
        Value passed as ``dimension``; in search mode giotto-tda treats it as
        the upper bound of the embedding-dimension search.
    max_time_delay : int, default 30
        Value passed as ``time_delay``; in search mode it is the upper bound
        of the time-delay search.
    stride : int, default 5
        Stride passed to the embedder.

    Returns
    -------
    tuple of (int, int)
        ``(mean_dim, mean_time_delay)``: the means of the fitted
        ``dimension_`` and ``time_delay_`` values, truncated by ``int()``.

    Raises
    ------
    ValueError
        If ``data`` contains no rows, since a mean cannot be formed.
    """
    if data.shape[0] == 0:
        raise ValueError("fit_optimal_emb needs at least one time series")

    params_list = []
    for i in tqdm(range(data.shape[0])):
        # A fresh embedder per series, so fitted attributes never carry over.
        embedder = SingleTakensEmbedding(
            parameters_type="search",
            time_delay=max_time_delay,
            dimension=max_embedding_dimension,
            stride=stride,
        )
        embedder.fit(data[i])
        params_list.append([embedder.dimension_, embedder.time_delay_])

    # Column 0 holds the fitted dimensions, column 1 the fitted time delays.
    params = np.asarray(params_list)
    mean_dim = int(params[:, 0].mean())
    mean_time_delay = int(params[:, 1].mean())
    return mean_dim, mean_time_delay

In [8]:
# Persistence diagrams are collected separately for the two embedding modes:
#   diags_one_param_set    - every series embedded with the same fixed parameters
#   diags_fitted_param_set - every series embedded in 'search' mode
diags_one_param_set = []
diags_fitted_param_set = []

stride = 5
homology_dimensions = [0, 1, 2]

for one_param_set in (True, False):
    if one_param_set:
        # Shared setting: the truncated means returned by fit_optimal_emb.
        mean_dimension, mean_time_delay = fit_optimal_emb(data)
        p_type, dimension, time_delay = 'fixed', mean_dimension, mean_time_delay
        sink = diags_one_param_set
    else:
        # Per-series setting: the values below are handed to the embedder in 'search' mode.
        p_type, dimension, time_delay = 'search', 10, 30
        sink = diags_fitted_param_set

    for i in tqdm(range(data.shape[0])):
        y = data[i]

        embedder = SingleTakensEmbedding(
            parameters_type=p_type,
            time_delay=time_delay,
            dimension=dimension,
            stride=stride,
        )
        # Add a leading axis of size one: the embedded series is passed on as a
        # collection containing a single point cloud.
        y_embedded = embedder.fit_transform(y)[np.newaxis, :, :]

        persistence = VietorisRipsPersistence(homology_dimensions=homology_dimensions)
        diag = persistence.fit_transform(y_embedded)
        sink.append(diag)

100%|██████████| 482/482 [00:18<00:00, 26.30it/s]
100%|██████████| 482/482 [01:05<00:00,  7.36it/s]
100%|██████████| 482/482 [01:25<00:00,  5.61it/s]


In [9]:
def format_to_csv(embeddings_list, folder_name):
    """Stack a list of embedding vectors and write them to a CSV file.

    Parameters
    ----------
    embeddings_list : list
        One embedding per entry; the list is converted with ``np.array`` and
        used as the table body, so it is expected to provide one row per
        label in ``df.index``.
    folder_name : str
        Despite the name, this is handed directly to ``DataFrame.to_csv`` as
        the output path.

    Notes
    -----
    Row labels come from the index of the notebook-level ``df``.
    """
    stacked = np.array(embeddings_list)
    pd.DataFrame(stacked, index=df.index).to_csv(folder_name)

In [10]:
# Turn every persistence diagram into fixed-length vectors and write one CSV
# per (embedding mode, representation, homology dimension index).
#
# The loop variable is called `diags` rather than `data`, so the notebook-level
# `data` array is not overwritten and the earlier cells stay re-runnable.
# Every transformer is fit on each diagram separately (fit_transform is called
# per diagram), exactly as before.
for diags, name in [(diags_one_param_set, "takens_one"), (diags_fitted_param_set, "takens_mult")]:

    # Persistence landscapes, rows 1 and 2 of the transformer output.
    persistence_landscape = PersistenceLandscape()
    for dim in [1, 2]:
        embeddings_list = [
            persistence_landscape.fit_transform(diag)[0, dim] for diag in diags
        ]
        format_to_csv(embeddings_list, f"../results/{name}_PL_{dim}.csv")

    # Persistence images on a 10 x 10 grid, flattened to 100 values.
    sigma = 0.0005
    persistence_image = PersistenceImage(sigma=sigma, n_bins=10)
    for dim in [1, 2]:
        embeddings_list = [
            persistence_image.fit_transform(diag)[0, dim].reshape(100) for diag in diags
        ]
        # Written under ../results/ like every other output of this cell
        # (this one path previously pointed at ./results/).
        format_to_csv(embeddings_list, f"../results/{name}_PI_{dim}.csv")

    # Index 0: only the slice [:, 0] of a 100 x 100 image is kept.
    # NOTE(review): presumably because all H0 points share the same birth
    # value, which would make the image effectively one-dimensional - confirm.
    persistence_image = PersistenceImage(sigma=sigma, n_bins=100)
    for dim in [0]:
        embeddings_list = [
            persistence_image.fit_transform(diag)[0, dim, :, 0] for diag in diags
        ]
        format_to_csv(embeddings_list, f"../results/{name}_PI_{dim}_sigma={sigma}.csv")

    # Betti curves sampled on 100 bins, for indices 0, 1 and 2.
    betti_curve = BettiCurve(n_bins=100)
    for dim in [0, 1, 2]:
        embeddings_list = [
            betti_curve.fit_transform(diag)[0, dim] for diag in diags
        ]
        format_to_csv(embeddings_list, f"../results/{name}_BC_{dim}.csv")
        