In [None]:
from models import * 
from model_utils import *
from visualization import *
from os.path import join
from dataset import *
import torch
from torch.utils.data import DataLoader
from pymatreader import read_mat
from matplotlib import pyplot as plt 
torch.set_default_dtype(torch.float64)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
import pandas as pd 
from utils import *
import seaborn as sns
import matlab.engine 
eng = matlab.engine.start_matlab()

## Here keep the best model: 

In [None]:
best_models = ['Regs\\2024_2_2_17_25AbsUsed_1_Unrolling_1_model_LSTM_UNet.pt',
            'Regs\\2023_8_12_3_6AbsUsed_1_Unrolling_1_model_LSTM_UNet.pt',
            'Regs\\2023_6_13_8_16AbsUsed_1_Unrolling_1_model_LSTM_UNet.pt',
            'Regs\\2023_6_22_21_12AbsUsed_1_Unrolling_1_model_LSTM_UNet.pt',
            'Regs\\2023_6_22_12_22AbsUsed_1_Unrolling_1_model_LSTM_UNet.pt']

reordering = read_mat('newnode_order_3.mat')['node_order'].astype(int) - 1

In [None]:
EXP_IDX = -1
# file = get_the_latest_file_in_path(join('Regs','*.pt'),EXP_IDX)
file = best_models[0]
print(file)
load_results = torch.load(file)
model_class = load_results['modelclass']
model = model_class(**load_results['essential_dict']).to(device)

model.load_state_dict(load_results['model'])
model.eval()
reg_params = load_results['reg_params']

f = visualize_results(file);
f.suptitle("Bi-LSTM Spatial UNet",fontweight="bold",fontsize=24)

In [None]:
## This cell is for the original same geometry reconstructions

# A = read_mat("ForwMat_HT.mat")
# A = torch.from_numpy(A['Trf_HT_leads']).to(device)
# A = A[:, reordering-1]

# Select the test data you want to apply 

In [None]:
noise = 30
direction = 'z'
angle = '21'
TEST_data = torch.load(join("Test_Dictionaries","test_dictionary_"+direction+"_"+str(angle)+"_Noise"+str(noise)+".pt")) #torch.load('test_dictionary.pt')
A = torch.tensor(TEST_data[0]['inverse_matrix']).to(device)
A_np = A.to('cpu').numpy()


# Beat by beat, uneven batch/time samples

In [None]:
reg_param = load_results["reg_params"]
BATCH_SIZE = 64 if not 'batch_size' in load_results['essential_dict'].keys() else load_results['essential_dict']['batch_size'] 
results = test_model(model, reg_params, TEST_data, A, device, use_abs=True, batch_size=BATCH_SIZE)

In [None]:
np.mean([results[str(i)]["AvgTime"] for i in range(17)] )

In [None]:
sum([p.numel() for p in model.parameters() if p.requires_grad])

In [None]:
columns = [('Test Data',''),
           ('Spatial CC', 'Proposed'), ('Spatial CC', 'MAP'),('Spatial CC', 'Tik'),
           ('Temporal CC', 'Proposed'), ('Temporal CC', 'MAP'),('Temporal CC', 'Tik'),
            ('Spatial RE', 'Proposed'), ('Spatial RE', 'MAP'), ('Spatial RE', 'Tik'),
           ('Temporal RE', 'Proposed'), ('Temporal RE', 'MAP'),('Temporal RE', 'Tik'),
           ('AT CC', 'Proposed'), ('AT CC', 'MAP'),('AT CC', 'Tik'),
           ('Localization Error', 'Proposed'), ('Localization Error', 'MAP'),('Localization Error', 'Tik'),
           ]
df = pd.DataFrame(columns=pd.MultiIndex.from_tuples(columns), data=np.round(np.zeros((18,19)),3))

test_data_col = np.arange(1,18).tolist() + ["Median (IQR)"]
df["Test Data"] = test_data_col

In [None]:
df

In [None]:
test_files = glob(join('TestData','EP','*.mat'))
training_files = np.array(glob('TrainingData/*.mat'))
train_idx, validation_idx = train_test_split(np.arange(0,training_files.__len__()),random_state=101,test_size=0.05)
training_files = training_files[train_idx]
training_files.__len__()

training_data = np.empty((490,0))
for item in training_files:
    data = read_mat(item)
    qrs_begin = data['features']['QRSbegin'] - 1
    qrs_end = data['features']['QRSend'] - 1 
    potvals = data['ts']['potvals'][reordering,qrs_begin:qrs_end]
    training_data = np.hstack((training_data,potvals))
    
training_data = training_data[:,1:]
print(training_data.shape)


In [None]:
Cx, meanX = create_moments(training_data)
Cx.shape , meanX.shape

In [None]:
metrics = np.empty_like((1,))
metric_names = np.empty_like((1,))
test_data = np.empty_like((1,))
method = np.empty_like((1,))

for i in tqdm(range(17)):

    data = TEST_data[i]
    x = data['x'].numpy()
    y = data['y'].numpy()
    badleads = data['badleads']
    std_n = data['std_n']
    at = data['at']
    paceloc = data['paceloc']
    Cn = std_n**2*np.eye(y.shape[0])
    
    x_hat = map_solution(A_np,Cx,Cn,y,meanX) 
    spat_cc, spat_re, temp_cc, temp_re = metrics_with_bad_leads(torch.Tensor(x.T),torch.Tensor(x_hat.T),badleads)
    AT_Results = eng.AT_and_LE(x_hat,paceloc.item(),np.asmatrix(at).T,badleads.cpu().numpy()-1,nargout=2)
    

    
    metrics_for_test_data = np.concatenate((spat_cc.flatten(), spat_re.flatten(), temp_cc.flatten(), temp_re.flatten()))
    metric_names_for_test_data = np.concatenate((np.array(spat_cc.size*['Spatial CC']),np.array(spat_re.size*['Spatial RE']),
                    np.array(temp_cc.size*['Temporal CC']),np.array(temp_re.size*['Temporal RE'])))
    testdata_for_test_data = (i+1)*np.ones(metric_names_for_test_data.shape)
    solution_method = np.array(metric_names_for_test_data.size*['MAP'])

    metrics = np.concatenate((metrics,metrics_for_test_data))
    metric_names = np.concatenate((metric_names,metric_names_for_test_data))
    test_data = np.concatenate((test_data,testdata_for_test_data))
    method = np.concatenate((method ,solution_method))
    
    df.loc[i,("Spatial CC","MAP")] = format(np.median(spat_cc),'.3f') + ' (' + format(np.subtract(*np.percentile(spat_cc,[75, 25])),'.3f') + ')'
    df.loc[i,("Spatial RE","MAP")] = format(np.median(spat_re),'.3f') + ' (' + format(np.subtract(*np.percentile(spat_re,[75, 25])),'.3f') + ')'
    df.loc[i,("Temporal CC","MAP")] = format(np.median(temp_cc),'.3f') + ' (' + format(np.subtract(*np.percentile(temp_cc,[75, 25])),'.3f') + ')'
    df.loc[i,("Temporal RE","MAP")] = format(np.median(temp_re),'.3f') + ' (' + format(np.subtract(*np.percentile(temp_re,[75, 25])),'.3f') + ')'
    df.loc[i,("AT CC","MAP")] = format(AT_Results[0],'.3f')
    df.loc[i,("Localization Error","MAP")] = format(AT_Results[1],'.3f')
    
    results[str(i)]["MAP"] = x_hat
    
    ##############################################################################################################################################
    
    x_hat = DFBlock(A.to(device),torch.Tensor([1.75e-4]).to(device),torch.from_numpy(y).to(device),torch.zeros_like(torch.from_numpy(x).to(device)),device,use_abs=True)
    spat_cc, spat_re, temp_cc, temp_re = metrics_with_bad_leads(torch.Tensor(x.T).to(device),torch.Tensor(x_hat.T).to(device),badleads)
    AT_Results = eng.AT_and_LE(x_hat.detach().cpu().numpy(),paceloc.item(),np.asmatrix(at).T,badleads.cpu().numpy()-1,nargout=2)
    
    metrics_for_test_data = np.concatenate((spat_cc.flatten(), spat_re.flatten(), temp_cc.flatten(), temp_re.flatten()))
    metric_names_for_test_data = np.concatenate((np.array(spat_cc.size*['Spatial CC']),np.array(spat_re.size*['Spatial RE']),
                    np.array(temp_cc.size*['Temporal CC']),np.array(temp_re.size*['Temporal RE'])))
    testdata_for_test_data = (i+1)*np.ones(metric_names_for_test_data.shape)
    solution_method = np.array(metric_names_for_test_data.size*['Tik'])

    metrics = np.concatenate((metrics,metrics_for_test_data))
    metric_names = np.concatenate((metric_names,metric_names_for_test_data))
    test_data = np.concatenate((test_data,testdata_for_test_data))
    method = np.concatenate((method ,solution_method))
    
    df.loc[i,("Spatial CC","Tik")] = format(np.median(spat_cc),'.3f') + ' (' + format(np.subtract(*np.percentile(spat_cc,[75, 25])),'.3f') + ')'
    df.loc[i,("Spatial RE","Tik")] = format(np.median(spat_re),'.3f') + ' (' + format(np.subtract(*np.percentile(spat_re,[75, 25])),'.3f') + ')'
    df.loc[i,("Temporal CC","Tik")] = format(np.median(temp_cc),'.3f') + ' (' + format(np.subtract(*np.percentile(temp_cc,[75, 25])),'.3f') + ')'
    df.loc[i,("Temporal RE","Tik")] = format(np.median(temp_re),'.3f') + ' (' + format(np.subtract(*np.percentile(temp_re,[75, 25])),'.3f') + ')'
    df.loc[i,("AT CC","Tik")] = format(AT_Results[0],'.3f')
    df.loc[i,("Localization Error","Tik")] = format(AT_Results[1],'.3f')
    
    results[str(i)]["Tik"] = x_hat.detach().cpu().numpy()
        
    ##############################################################################################################################################
    
    temp_cc = results[str(i)]['temp_cc']
    temp_re = results[str(i)]['temp_re']
    spat_cc = results[str(i)]['spat_cc']
    spat_re = results[str(i)]['spat_re']
    LE = results[str(i)]['LE']
    ATCC = results[str(i)]['AT_CC']

    
    metrics_for_test_data = np.concatenate((spat_cc.flatten(), spat_re.flatten(), temp_cc.flatten(), temp_re.flatten()))
    metric_names_for_test_data = np.concatenate((np.array(spat_cc.size*['Spatial CC']),np.array(spat_re.size*['Spatial RE']),
                    np.array(temp_cc.size*['Temporal CC']),np.array(temp_re.size*['Temporal RE'])))
    testdata_for_test_data = (i+1)*np.ones(metric_names_for_test_data.shape)
    solution_method = np.array(metric_names_for_test_data.size*['NN'])

    metrics = np.concatenate((metrics,metrics_for_test_data))
    metric_names = np.concatenate((metric_names,metric_names_for_test_data))
    test_data = np.concatenate((test_data,testdata_for_test_data))
    method = np.concatenate((method ,solution_method))
    
    
    df.loc[i,("Spatial CC","Proposed")] = format(np.median(spat_cc),'.3f') + ' (' + format(np.subtract(*np.percentile(spat_cc,[75, 25])),'.3f') + ')'
    df.loc[i,("Spatial RE","Proposed")] = format(np.median(spat_re),'.3f') + ' (' + format(np.subtract(*np.percentile(spat_re,[75, 25])),'.3f') + ')'
    df.loc[i,("Temporal CC","Proposed")] = format(np.median(temp_cc),'.3f') + ' (' + format(np.subtract(*np.percentile(temp_cc,[75, 25])),'.3f') + ')'
    df.loc[i,("Temporal RE","Proposed")] = format(np.median(temp_re),'.3f') + ' (' + format(np.subtract(*np.percentile(temp_re,[75, 25])),'.3f') + ')'
    df.loc[i,("AT CC","Proposed")] = format(ATCC,'.3f')
    df.loc[i,("Localization Error","Proposed")] = format(LE,'.3f')
    
    results[str(i)]["GT"] = x
    results[str(i)]["y"] = y
    
file="TestResults//ResultsDict_Model_"+direction+"_"+str(angle)+"_"+'_Noise'+ str(noise) + file.split(".")[0].split("//")[1]
np.save(file=file,arr=results,allow_pickle=True)

In [None]:
file.split('.')

In [None]:
data = np.vstack((metric_names[1:],
            test_data[1:].astype(int),
            np.round(metrics[1:],3).astype(float),
            method[1:]))
data.shape
columns = ['Metrics','TestData','Values','Method']
df2 = pd.DataFrame(columns=columns, data=data.T)
df2['Values'] = pd.to_numeric(df2['Values'])

In [None]:
font = {'family' : 'serif',
        'weight' : 'bold',
        'size'   : 22}

import matplotlib
matplotlib.rc('font', **font)

In [None]:

plt.figure(figsize=(40,25))

flierprops = dict(markersize=5,marker='o',
              linestyle='none')

for i,selected_metric in enumerate(['Temporal CC','Spatial CC','Temporal RE','Spatial RE']):
    subdf = df2.loc[df2['Metrics']==selected_metric]
    subdf.head()
    plt.figure(figsize=(20,12))
    # plt.subplot(2,2,i+1)
    sns.boxplot(data=subdf, x='TestData',y='Values',hue='Method',flierprops=flierprops,showfliers=False)
    plt.ylabel(selected_metric,fontweight='bold')
    plt.xlabel('Test Data Index',fontweight='bold')
    
    if selected_metric == 'Temporal RE':
        plt.ylim([0, 2])
        
    elif selected_metric == 'Spatial CC':
        plt.ylim([-0.3, 1])

    plt.savefig('TestResults//'+str(BATCH_SIZE)+'_dropout_shuffled_'+'_Noise'+ str(noise) + selected_metric.replace(" ","") + "_" + direction + "_" + str(angle) + "_" + 
                '.pdf', format = 'pdf', dpi=2000)


In [None]:
df.iloc[17,1:13] = "-"
medians = df.iloc[:17,13:].astype(float).median().round(3)
iqrs = df.iloc[:17,13:].astype(float).apply(lambda x: x.quantile(0.75) - x.quantile(0.25)).round(3)

total_col = medians.astype(str) + "(" + iqrs.astype(str) + ")"
df.iloc[17,13:] = total_col

In [None]:
df

In [None]:
medians = [df.iloc[1:17,i].apply(lambda x: x.split("(")[0]).astype(float).round(3).median() for i in range(1,13)]
iqrs = [df.iloc[1:17,i].apply(lambda x: x.split("(")[0]).astype(float).round(3).quantile(0.75) - df.iloc[1:17,i].apply(lambda x: x.split("(")[0]).astype(float).round(3).quantile(0.25)  for i in range(1,13)]

df.iloc[17,1:13] = [str(round(item1,3)) + "(" + str(round(item2,3)) + ")" for item1,item2 in zip(medians,iqrs)]

In [None]:
df

In [None]:
styled_df = df.style.set_table_styles([
    {'selector': 'th', 'props': [('text-align', 'center')]}
])
# Display the styled DataFrame
display(styled_df)

In [None]:
df.style.hide(axis="index").to_latex('TestResults//'+str(BATCH_SIZE) + '_dropout_shuffled' + direction + "_" + str(angle) +'_Noise'+ str(noise) +'.tex',column_format=df.columns.__len__()*'c')