In [None]:
import numpy as np
from numpy import linalg as la
import matplotlib.pyplot as plt
from sklearn.preprocessing import scale

from data_cube import DataCube
from ssm import SSM
from similarity_network_fusion import SNF, cumulated_euc_ts

In [None]:
from tslearn.metrics import dtw, dtw_path

In [None]:
# Select every subject, gestures 3-6 and channels 2, 4, 6 and 8 from the
# "parsed" data group.
cube_options = {
    "subjects": "all",
    "gestures": ["3", "4", "5", "6"],
    "channels": ["2", "4", "6", "8"],
    "data_grp": "parsed",
}
dc = DataCube(**cube_options)

# Load, RMS-smooth and normalize; later cells read `dc.data_set_smooth`.
# NOTE(review): 300 and 20 are presumably the RMS window length and step --
# confirm against DataCube.rms_smooth.
dc.load_data()
dc.rms_smooth(300, 20)
dc.normalize_modalities(smooth=True)

---

In [None]:
def pass_synergist_channels(gnum, array):
    """
    Keep only the time index and the two synergist channels of a gesture.

    gnum - gesture identifier, one of "3", "4", "5" or "6" (the integers
           3-6 are accepted too and converted to their string form)
    array - 2-D array of all channels; column 0 is the time index and
            columns 1-4 are the channels in the order 2, 4, 6, 8

    Returns a new array of shape (n_samples, 3): the time index followed by
    the two synergist channels of the requested gesture.
    Raises KeyError when the gesture identifier is not one of the four above.
    """
    # column indices into `array`; index 0 is always the time index
    synrgsts = {"3": [0, 1, 4], # channels 2 & 8
                "4": [0, 2, 3], # channels 4 & 6
                "5": [0, 1, 2], # channels 2 & 4
                "6": [0, 3, 4]} # channels 6 & 8
    columns = synrgsts[str(gnum)]
    # selecting with a list of columns copies the data, so the caller's
    # array is never aliased by the result
    return np.asarray(array)[:, columns]

---

In [None]:
def _fused_template(synergist_array):
    """Run the full SNF pipeline on one recording and return its fused template."""
    fuser = SNF(synergist_array, k=0.2, metric=cumulated_euc_ts)
    # graph weights, used to find the k nearest neighbours
    fuser.calc_weights()
    fuser.normalize_weights()
    # k-nearest-neighbour graphs, generated and normalized
    fuser.calc_knn_weights()
    fuser.normalize_knn_weights()
    # fuse the graphs
    fuser.network_fusion(eta=0.2, iters=50)
    return fuser.fused_similarity_template


# subject -> gesture key -> fused similarity template
dict_synerg = {}
for subject, recordings in dc.data_set_smooth.items():
    print(subject)
    dict_synerg[subject] = {
        # the first character of the gesture key selects the synergist channels
        gesture: _fused_template(pass_synergist_channels(gesture[0], recording))
        for gesture, recording in recordings.items()
    }

---

In [None]:
# subject -> gesture key -> standardized projection of the fused template
# onto its leading eigenvector
decomp_syn = {}

for subject, templates in dict_synerg.items():
    subject_scores = decomp_syn.setdefault(subject, {})
    for gesture, template in templates.items():
        # eigen-decompose and pick the eigenvector with the largest real eigenvalue
        eigvals, eigvecs = la.eig(template)
        order = np.argsort(-eigvals.real)
        leading = eigvecs[:, order[0]]
        # project the template onto that eigenvector, then standardize
        score = scale((template @ leading).real)
        subject_scores[gesture] = score

        # top panel: projection; bottom panel: the four smoothed channels
        recording = dc.data_set_smooth[subject][gesture]
        tidx = recording[:, 0]
        plt.subplot(211)
        plt.plot(tidx, score)
        plt.subplot(212)
        for column in range(1, 5):
            plt.plot(tidx, recording[:, column])
        plt.show()

---

In [None]:
# Flatten the per-subject / per-gesture vectors into parallel lists.
subj_lab = []
gest_lab = []
arrays = []

for s, gdict in decomp_syn.items():
    for g, a in gdict.items():
        subj_lab.append(s)
        gest_lab.append(int(g[0]))
        arrays.append(a)


# Leave-one-out 1-nearest-neighbour prediction under cumulated_euc_ts.
predicts = []
for n, g1 in enumerate(arrays):
    # The sample's own slot stays at +inf so it can never be chosen as its
    # own neighbour, even when another sample ties with it at distance 0
    # (argsort(...)[1] gave no such guarantee).
    g1_dists = np.full(len(arrays), np.inf)
    for m, g2 in enumerate(arrays):
        if m != n:
            g1_dists[m] = cumulated_euc_ts(g1, g2)
    pred_idx = int(np.argmin(g1_dists)) # index of the closest *other* array
    predicts.append(gest_lab[pred_idx])

acc = (sum(np.array(gest_lab) == np.array(predicts)) / len(gest_lab)) * 100

print(f"accuracy: {acc}%")

---

In [None]:
# organize vectors by gesture
# Group the decomposed vectors by gesture; the gesture id is the first
# character of each key. Keys for any other gesture are skipped.
gest_dict = {gesture_id: [] for gesture_id in ("3", "4", "5", "6")}

for per_gesture in decomp_syn.values():
    for key, vector in per_gesture.items():
        gesture_id = key[0]
        if gesture_id in gest_dict:
            gest_dict[gesture_id].append(vector)

In [None]:
# comp_dict[g1][g2] collects the DTW score of every vector of gesture g1
# against every vector of gesture g2.
gestures = ["3", "4", "5", "6"]
comp_dict = {g1: {g2: [] for g2 in gestures} for g1 in gestures}

for g1 in gestures:
    for g2 in gestures:
        # Iterate over the vectors actually present instead of a hard-coded
        # count of 144, which raised IndexError for smaller data sets and
        # silently ignored any extra vectors in larger ones.
        for i, ts1 in enumerate(gest_dict[g1]):
            for j, ts2 in enumerate(gest_dict[g2]):
                # within one gesture, skip comparing a vector with itself
                if i == j and g1 == g2: continue
                comp_dict[g1][g2].append(dtw(ts1, ts2))

In [None]:
# Report, for each gesture, the mean DTW score against every gesture
# (including itself).
gesture_order = ["3", "4", "5", "6"]
for reference in gesture_order:
    print(f"gesture {reference} dtw similarities w/ other gestures:")
    for other in gesture_order:
        average = np.mean(comp_dict[reference][other])
        print(f"avg similarity vs gesture {other}: {average}")

---

In [None]:
from ripser import ripser, lower_star_img
from ripser import Rips
from persim import plot_diagrams, PersImage, bottleneck
from TDA_helper_fcns import sublevel_set_time_series_dist

In [None]:
# Flatten the fused similarity templates into parallel lists.
subj_lab = []
gest_lab = []
arrays = []

for s, gdict in dict_synerg.items():
    for g, a in gdict.items():
        subj_lab.append(s)
        gest_lab.append(int(g[0]))
        arrays.append(a)

# One lower-star persistence diagram per template, computed once up front
# instead of being recomputed for every pair inside the nested loop.
diagrams = [lower_star_img(a) for a in arrays]

# calculate bottleneck distance between all pds and make predictions
predicts = []
for n, dgm1 in enumerate(diagrams):
    # The sample's own slot stays at +inf so it can never be chosen as its
    # own neighbour, even when another diagram is at distance 0.
    g1_bottlenecks = np.full(len(diagrams), np.inf)
    for m, dgm2 in enumerate(diagrams):
        if m != n:
            # Only the distance is needed; the matching was requested but
            # never used, and its return layout depends on the persim version.
            g1_bottlenecks[m] = bottleneck(dgm1, dgm2)
    pred_idx = int(np.argmin(g1_bottlenecks)) # closest *other* diagram by bottleneck distance
    predicts.append(gest_lab[pred_idx])

acc = (sum(np.array(gest_lab) == np.array(predicts)) / len(gest_lab)) * 100

print(f"accuracy: {acc}%")

---

In [None]:
# Flatten the decomposed vectors into parallel lists.
subj_lab = []
gest_lab = []
arrays = []

for s, gdict in decomp_syn.items():
    for g, a in gdict.items():
        subj_lab.append(s)
        gest_lab.append(int(g[0]))
        arrays.append(a)

# One 0-dimensional persistence diagram per vector, computed once up front
# from its sublevel-set distance matrix instead of once per pair.
# `Rips` comes from the ripser package and must be imported with the other
# ripser names.
rips = Rips(maxdim=0, verbose=False) # initialize rips complex
diagrams = [
    rips.fit_transform(sublevel_set_time_series_dist(a), distance_matrix=True)[0]
    for a in arrays
]

# calculate bottleneck distance between all pds and make predictions
predicts = []
for n, dgm1 in enumerate(diagrams):
    # The sample's own slot stays at +inf so it can never be chosen as its
    # own neighbour, even when another diagram is at distance 0.
    g1_bottlenecks = np.full(len(diagrams), np.inf)
    for m, dgm2 in enumerate(diagrams):
        if m != n:
            # Only the distance is needed; the matching was requested but
            # never used, and its return layout depends on the persim version.
            g1_bottlenecks[m] = bottleneck(dgm1, dgm2)
    pred_idx = int(np.argmin(g1_bottlenecks)) # closest *other* diagram by bottleneck distance
    predicts.append(gest_lab[pred_idx])

acc = (sum(np.array(gest_lab) == np.array(predicts)) / len(gest_lab)) * 100

print(f"accuracy: {acc}%")

---

In [None]:
# Persistence-image settings. They are defined here because this cell reads
# them; previously they were first assigned in a later cell, so a fresh
# Restart & Run All failed with NameError.
px = 20   # the image is px x px pixels
sd = 0.5  # value passed as PersImage's `spread`

# `Rips` comes from the ripser package and must be imported with the other
# ripser names.
rips = Rips(maxdim=2, verbose=False)
pim = PersImage(pixels=[px,px], spread=sd)

In [None]:
# generate persistence diagrams and convert to pim
# (only the 1-dimensional diagram, dgm[1], is turned into an image)
subj_lab = []
gest_lab = []
arrays = []

px = 20
sd = 0.5

# The Rips object does not depend on the loop variables, so build it once.
# `Rips` comes from the ripser package and must be imported with the other
# ripser names.
rips = Rips(maxdim=1, verbose=False)

for s, gdict in dict_synerg.items():
    for g, a in gdict.items():
        subj_lab.append(s)
        gest_lab.append(int(g[0]))
        # calculate persistence diagram
        # NOTE(review): `a` is a fused *similarity* template but is passed
        # with distance_matrix=True -- confirm this is intended.
        dgm = rips.fit_transform(a, distance_matrix=True)
        # A fresh PersImage is kept per diagram on purpose.
        # NOTE(review): PersImage may remember the birth/death range of the
        # first diagram it transforms, so sharing one instance could change
        # the images -- confirm before hoisting it out of the loop.
        pim = PersImage(pixels=[px,px], spread=sd, verbose=False)
        pim_vec = pim.transform(dgm[1])
        arrays.append(pim_vec)

In [None]:
# Leave-one-out 1-nearest-neighbour prediction on the persistence images,
# using the norm of their difference.
predicts = []
for n, g1 in enumerate(arrays):
    # The sample's own slot stays at +inf so it can never be chosen as its
    # own neighbour. Identical images give distance 0, and argsort(...)[1]
    # could then return the sample itself.
    g1_dists = np.full(len(arrays), np.inf)
    for m, g2 in enumerate(arrays):
        if m != n:
            g1_dists[m] = la.norm(g1 - g2)
    pred_idx = int(np.argmin(g1_dists)) # index of the closest *other* image
    predicts.append(gest_lab[pred_idx])

acc = (sum(np.array(gest_lab) == np.array(predicts)) / len(gest_lab)) * 100

print(f"accuracy: {acc}%")

In [None]:
# Show the persistence images of samples 5 through 9, each preceded by its
# gesture label.
for sample_idx in range(5, 10):
    label = gest_lab[sample_idx]
    print(label)
    image = arrays[sample_idx].reshape(px, px)
    plt.imshow(image)
    plt.show()