In [1]:
import pandas as pd
import numpy as np
import os
import utils

Eros norm:
$$
EROS(\mathbf{A,B},w) = \sum_{i=1}^n w_i\cdot|<a_i,b_i>|
$$

In [7]:
path = "tctodd/"
dirs = os.listdir(path=path)
weeks = sorted([i for i in dirs if i != ".DS_Store"])
filenames = sorted(os.listdir(path+weeks[1]))

data = []
labels = dict()
label_cnt = 0

for w in weeks:
    temp_path = path+w+"/"
    filenames = sorted(os.listdir(temp_path))
    for fn in filenames:
        label = fn.split('.')[0][:-2]
        
        if label not in labels:
            labels[label] = label_cnt
            label_cnt += 1
            
        data.append({'label':labels[label], 'time_series':pd.read_csv(temp_path+fn, header=None, sep='\t',).values})
        
df = pd.DataFrame(data, columns=['label', 'time_series'])

#from sklearn.utils import shuffle
import sklearn as sk
from sklearn.model_selection import train_test_split
seed = 0

# Shuffle the dataframe
shuffled_df = sk.utils.shuffle(df, random_state=seed)

'''
from utils import extract_matrix
# Create the matrix version
shuffled_df.time_series = shuffled_df.time_series.apply(extract_matrix)
shuffled_df["shape"] = shuffled_df.time_series.apply(lambda x: x.shape)
shuffled_df.head()
'''
X_train, X_test, y_train, y_test = train_test_split(shuffled_df['time_series'], shuffled_df['label'], stratify=shuffled_df['label'], train_size=.7, random_state=seed)

In [8]:
def compute_S_matrix(ts_series:pd.Series) -> tuple[np.array, list]:
    """function to compute the S matrix of shape n x N (n = number of predictors, N = number of examples). 
    Such matrix will be used to compute
    the weight vector needed by Eros norm

    Args:
        ts_series (pd.Series): Series containing the dataset of time series.
        Each entry is a list of vectors. 
        Each vector is a component of the i-th time series

    Returns:
        tuple[np.array, list]: returns the matrix S and the list of
        right eigenvectors matrices computed for each time series
    """
    s_matrix = np.zeros(shape=(len(ts_series), ts_series.iloc[0].shape[-1]))
    v_list = [] # list of right eigenvector matrix
    for i in range(len(ts_series)):
        ts = ts_series.iloc[i] # time x predictors
        
        #The matrix S will be nxN where n is the predictor dimension and N is the number of time-series examples.
        #Hence, we will use the transpose to compute the covariance matrix.
        ts = ts.T # predictors x time
        
        # Compute the covariance matrix of the i-th example of the dataset
        cov_ts = np.cov(ts)
        # Compute the SVD of the covariance matrix
        u, s, v_t = np.linalg.svd(cov_ts)
        s_matrix[i] = s
        v_list.append(v_t.T)
    return s_matrix.T, v_list

In [9]:
S, v_list_train = compute_S_matrix(X_train)

In [10]:
v_list_train[0].shape

(22, 22)

In [11]:
S.shape

(22, 1795)

In [12]:
_, v_list_test = compute_S_matrix(X_test)

In [15]:
def compute_weight_vector(S:np.ndarray, aggregation:str='mean', algorithm:int=1) -> np.array:
    """compute the weight vector used in the computation of Eros norm

    Args:
        S (np.ndarray): matrix containing eigenvalues of each predictor
        aggregation (str, optional): aggregation function to use. Defaults to 'mean'.
        algorithm(int): choose the algorithm to use to compute weight vector.
        - Algorithm 1: do not normalize rows of the S matrix. Perform directly the computation of w
        - Algorithm 2: first normalize rows of the S matrix and then compute w.
    Returns:
        np.array: return the normalized weight vector
    """
    n = S.shape[0] # number of predictors
    if (algorithm == 2):
        # first normalize each eigenvalues
        #print(S[0])
        S = S/np.sum(S, axis=-1).reshape(-1,1)
        #print(S.shape)
        #print(S[0])
    if (aggregation == 'mean'):
        w = np.mean(S, axis=-1)
    elif (aggregation == 'min'):
        w = np.min(S, axis=-1)
    elif (aggregation == 'max'):
        w = np.max(S, axis=-1)
    return w/np.sum(w)

In [16]:
w = compute_weight_vector(S, algorithm=2)
w.shape

(22,)

In [17]:
def eros_norm(weight_vector:np.array, A:np.array, B:np.array):
    """compute eros norm

    Args:
        weight_vector (np.array): weight vector
        A (np.array): time_series_1
        B (np.array): time_series_2

    Returns:
        float: distance between the 2 time series. Bounded in (0,1]
    """
    # since we want to use a_i and b_i which 
    # are the orthonormal column vectors of A and B,
    # we decide to transpose A and B
    A = A.T
    B = B.T
    
    n = A.shape[0] # number of predictors
    
    eros = 0
    
    for i in range(n):
        eros += weight_vector[i]*np.abs(np.dot(A[i], B[i]))
    return eros

def compute_kernel_matrix(num_examples:int, weight_vector:np.array, v_list:list[np.array]) -> np.array:
    """compute the kernel matrix to be used in PCA

    Args:
        num_examples (int): number of examples in the dataset
        weight_vector (np.array): weight vector 
        v_t_list (list[np.array]): list of right eigenvector matrices

    Returns:
        np.array: kernel matrix with pairwise eros norm
    """
    N = num_examples
    K_eros = np.zeros(shape=(N,N))

    for i in range(N):
        j = 0
        while (j <= i):
            K_eros[i,j] = eros_norm(weight_vector, v_list[i], v_list[j])
            if (i != j): 
                K_eros[j,i] = K_eros[i,j]
            j += 1

    # check whether the kernel matrix is positive semi definite (PSD) or not
    is_psd = np.all(np.linalg.eigvals(K_eros) >= 0)
    threshold = 1e-10
    # if not PSD, add to the diagonal the minimal value among eigenvalues of K_eros
    if is_psd == False:
        delta = np.min(np.linalg.eigvals(K_eros))
        delta_ary = [np.abs(delta) + threshold for _ in range(K_eros.shape[0])]
        K_eros += np.diag(delta_ary)
    is_psd = np.all(np.linalg.eigvals(K_eros) >= 0)
    if is_psd == True:
        print("now PSD")
        return K_eros
    else:
        print("not PSD")
    return K_eros

In [18]:
def perform_PCA(num_examples:int, weight_vector:np.array, v_list:list[np.array]) -> tuple[np.ndarray, np.array]:
    """extract principal components in the feature space

    Args:
        num_examples (int): number of examples in the dataset
        weight_vector (np.array): weight vector 
        v_t_list (list[np.array]): list of right eigenvector matrices

    Returns:
        tuple[np.ndarray, np.array]:
        - K_eros matrix
        - eigenvectors (principal components) of the feature space
    """
    K_eros = compute_kernel_matrix(num_examples, weight_vector, v_list)
    O = np.ones(shape=(num_examples,num_examples))
    O *= 1/num_examples
    K_eros_mc = K_eros - O@K_eros - K_eros@O + O@K_eros@O # K_eros mean centered
    eig_vals, eig_vecs = np.linalg.eig(K_eros_mc)
    return K_eros, eig_vecs, eig_vals


def compute_test_kernel_matrix(num_training_examples:int, num_test_examples:int, weight_vector:np.array, v_list_train:list[np.array], v_list_test:list[np.array]) -> np.array:
    """compute the K eros test kernel matrix used to project test data

    Args:
        num_examples_train (int): number of examples in the training dataset
        num_examples_test (int): number of examples in the test dataset
        weight_vector (np.array): weight vector 
        v_list_train (list[np.array]): list of right eigenvector matrices of the training dataset
        v_list_test (list[np.array]): list of right eigenvector matrices of the test dataset

    Returns:
        np.array: kernel matrix with pairwise eros norm
    """
    N_train = num_training_examples
    N_test = num_test_examples
    K_eros_test = np.zeros(shape=(N_test,N_train))

    for i in range(N_test):
        for j in range(N_train):
            K_eros_test[i,j] = eros_norm(weight_vector, v_list_test[i], v_list_train[j])
    return K_eros_test

In [19]:
#K_eros_train = compute_kernel_matrix(len(X_train), weight_vector=w, v_list=v_list_train)

# V column vectors, use v[:,i] to select the corresponding eigenvector
K_eros_train, V, eig_vals = perform_PCA(len(X_train), weight_vector=w, v_list=v_list_train)

K_eros_test = compute_test_kernel_matrix(len(X_train), len(X_test), w, v_list_train, v_list_test)

now PSD


In [20]:
np.all(np.linalg.eigvals(K_eros_train) >= 0)
delta = min(np.linalg.eigvals(K_eros_train))
delta_I = np.diag(np.array([np.abs(delta) for _ in range(len(K_eros_train))]))
np.all(np.linalg.eigvals(K_eros_train) >= 0)
delta

9.999917569731185e-11

In [39]:
N_test = len(X_test)
N_train = len(X_train)
O_test = np.ones(shape=(N_test, N_train))*(1/N_train)
O_train = np.ones(shape=(N_train, N_train))*(1/N_train)

K_eros_test_mc = K_eros_test - O_test@K_eros_train - K_eros_test@O_train + O_test@K_eros_train@O_train

Y = K_eros_test_mc @ V

the $i$-th MTS item in the test set is represented as features (length = num examples in the training set) in the $i$-th row of $\mathbf{Y}$

In [40]:
Y.shape, V.shape

((770, 51), (1795, 1795))

In [41]:
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score

'''
from sklearn.preprocessing import StandardScaler

SC = StandardScaler()
SC.fit(V)
V_scaled = SC.transform(V)
Y_scaled = SC.transform(Y)

prova_train = y_train.to_numpy().astype('int')
prova_test = y_test.to_numpy().astype('int')
'''

svc = SVC()
svc.fit(V[:,:51], y_train)

predictions = svc.predict(Y[:,:51])

print(accuracy_score(y_test, predictions))

0.01038961038961039


In [34]:
norm_eig_vals = eig_vals/np.sum(eig_vals)

In [35]:
np.sum(norm_eig_vals[:51])

0.1358448438450274