In [127]:
import numpy as np
import pyedflib
import statistics
import plotly.graph_objects as go
import pandas as pd
from gtda.time_series import SingleTakensEmbedding
from gtda.homology import VietorisRipsPersistence
from gtda.diagrams import PersistenceEntropy, Amplitude, NumberOfPoints, ComplexPolynomial, PersistenceLandscape, HeatKernel, Silhouette, BettiCurve, PairwiseDistance, ForgetDimension
from gtda.plotting import plot_point_cloud, plot_heatmap, plot_diagram
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.ensemble import RandomForestClassifier
from sklearn.decomposition import PCA, FastICA
from gtda.pipeline import Pipeline 
from numpy.linalg import norm

# Load Data and set important variables

In [128]:
# Choose if you want to look at EEG or EMG data

data_type = "EEG"
#data_type = "EMG"

In [129]:
label_list = [0, 1, 2, 3, 4]

In [130]:
# Load persistence diagrams

train_persistence_diagrams = {} # dictionary with labels as keys, persistence diagrams of the respective classes as values
test_persistence_diagrams = {} # dictionary with labels as keys, persistence diagrams of the respective classes as values


for label in label_list:
    train_persistence_diagrams[label] = np.load("Embeddings_and_Persistence_Diagrams/Train_"+str(data_type)+"_PD"+str(label)+".npy", allow_pickle=True)
    test_persistence_diagrams[label] = np.load("Embeddings_and_Persistence_Diagrams/Test_"+str(data_type)+"_PD"+str(label)+".npy", allow_pickle=True)

# HeatKernel Intensity

In a way, the Heat Kernel shows an "average distribution" of the persistence diagrams for each label, seperated per hole dimensionality.

In [131]:
HK = HeatKernel(sigma=0.00003, n_bins=100)

In [132]:
def heat_kernel_intensity(heatkernel, homology_dimension):
    """ Computes mean intensity of a heatkernel. Only takes positive values because otherwise the mean would
      always be zero.

    Parameters:
    - heatkernel (list of lists): heatkernel of all homology dimensions
    - homology_dimension (int): Which homology dimension to look at (0, 1 or 2)

    Returns:
    - mean intensity of heatkernel of homology dimension homology_dimension
    """
    
    positives =  [x for inner_list in heatkernel[0][homology_dimension] for x in inner_list if x > 0]
    
    return np.mean(positives)

In [133]:
# Training data

train_kernel_densities = []

train_kernel_intensity_dim0 = {}
train_kernel_intensity_dim1 = {}
train_kernel_intensity_dim2 = {}

for label in label_list:

    # Initialize intensity lists of our label
    train_kernel_intensity_dim0[label] = []
    train_kernel_intensity_dim1[label] = []
    train_kernel_intensity_dim2[label] = []

    for diagram in train_persistence_diagrams[label]:
        heatkernel = HK.fit_transform([diagram.astype("float")])
        train_kernel_intensity_dim0[label].append(heat_kernel_intensity(heatkernel, 0))
        train_kernel_intensity_dim1[label].append(heat_kernel_intensity(heatkernel, 1))
        if heat_kernel_intensity(heatkernel, 2) == heat_kernel_intensity(heatkernel, 2): # TODO these checks should be there for all dimensions
            train_kernel_intensity_dim2[label].append(heat_kernel_intensity(heatkernel, 2))
        else:
            train_kernel_intensity_dim2[label].append(0)


train_kernel_densities.append(train_kernel_intensity_dim0)
train_kernel_densities.append(train_kernel_intensity_dim1)
train_kernel_densities.append(train_kernel_intensity_dim2)

  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


In [134]:
# Test

test_kernel_densities = []


test_kernel_intensity_dim0 = {}
test_kernel_intensity_dim1 = {}
test_kernel_intensity_dim2 = {}

for label in label_list:

    # Initialize intensity lists of our label
    test_kernel_intensity_dim0[label] = []
    test_kernel_intensity_dim1[label] = []
    test_kernel_intensity_dim2[label] = []

    for diagram in test_persistence_diagrams[label]:
        heatkernel = HK.fit_transform([diagram.astype("float")])
        test_kernel_intensity_dim0[label].append(heat_kernel_intensity(heatkernel, 0))
        test_kernel_intensity_dim1[label].append(heat_kernel_intensity(heatkernel, 1))
        if heat_kernel_intensity(heatkernel, 2) == heat_kernel_intensity(heatkernel, 2): # TODO these checks should be there for all dimensions
            test_kernel_intensity_dim2[label].append(heat_kernel_intensity(heatkernel, 2))
        else:
            test_kernel_intensity_dim2[label].append(0)


test_kernel_densities.append(test_kernel_intensity_dim0)
test_kernel_densities.append(test_kernel_intensity_dim1)
test_kernel_densities.append(test_kernel_intensity_dim2)

# L1 norms of Features

Using the L1 norm of the some features as additional ML feature improves the accuracy by a bit.

In [135]:
train_L1_norms = {}
test_L1_norms = {}

In [136]:
def compute_L1_norm_for_signature(persistence_diagrams, label_list, SG):

    L1_norms = {}
    
    for label in label_list:
        
        L1_norms[label] = []
        
        L1_norm_dim0 = []
        L1_norm_dim1 = []
        L1_norm_dim2 = []

        for diagram in persistence_diagrams[label]:
            signature = SG.fit_transform([diagram.astype("float")])
            L1_norm_dim0.append(norm(signature[0][0], 1))
            L1_norm_dim1.append(norm(signature[0][1], 1))
            L1_norm_dim2.append(norm(signature[0][2], 1))

        L1_norms[label].append(L1_norm_dim0)
        L1_norms[label].append(L1_norm_dim1)
        L1_norms[label].append(L1_norm_dim2)

    return L1_norms
    

## Persistence Landscape

In [137]:
PL = PersistenceLandscape()

In [138]:
train_L1_norms["PD"] = compute_L1_norm_for_signature(train_persistence_diagrams, label_list, PL)
test_L1_norms["PD"] = compute_L1_norm_for_signature(test_persistence_diagrams, label_list, PL)

## Betti Curve

In [139]:
BC = BettiCurve()

In [140]:
train_L1_norms["BC"] = compute_L1_norm_for_signature(train_persistence_diagrams, label_list, BC)
test_L1_norms["BC"] = compute_L1_norm_for_signature(test_persistence_diagrams, label_list, BC)

# Save Signature Features

In [141]:
def create_feature_df(data_type, kernel_densities, L1_norms, num_diagrams, label):
    """
    Create DataFrame for each label from features

    Parameters:
    - kernel_densities (list): intensities of heatkernel
    - L1_norms (list): L1 norms of signatures
    - num_diagrams (int): How many diagrams are there in total?
    - label (int): Label for which we want to create a dataframe. 1, 3, 5 or 7.

    Returns:
    - Feature DataFrame (DataFrame)
    """
    
    feature_df = pd.DataFrame(index=np.arange(0, num_diagrams))

    for homology_dim in range(3):
        feature_df[str(data_type)+"_Kernel_Intensity_Dim"+str(homology_dim)] = kernel_densities[homology_dim][label]

    for signature in L1_norms.keys():
        for homology_dim in range(3):
            feature_df[str(data_type)+"_L1_Norm_"+str(signature)+"Dim"+str(homology_dim)] = L1_norms[signature][label][homology_dim]

    # Label
    feature_df["Label"] = label

    return feature_df

In [142]:
train_dataframes = {}
test_dataframes = {}

for label in label_list:
    train_dataframes[label] = create_feature_df(data_type, train_kernel_densities, train_L1_norms, len(train_persistence_diagrams[label]), label)
    test_dataframes[label] = create_feature_df(data_type, test_kernel_densities, test_L1_norms, len(test_persistence_diagrams[label]), label)

In [143]:
# Concatenate and save features of training persistence diagrams
train_feature_df = pd.concat([train_dataframes[0], train_dataframes[1], train_dataframes[2], train_dataframes[3], train_dataframes[4]], ignore_index=True)
train_feature_df.to_csv("Features/Train_"+str(data_type)+"_Signature_Statistics.csv")

# Concatenate and save features of training persistence diagrams
test_feature_df = pd.concat([test_dataframes[0], test_dataframes[1], test_dataframes[2], test_dataframes[3], test_dataframes[4]], ignore_index=True)
test_feature_df.to_csv("Features/Test_"+str(data_type)+"_Signature_Statistics.csv")
