In [None]:
# from curses import tparm
import os
import argparse
import json
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.utils.data import DataLoader,Dataset

from utils.util import get_mask_mnr, get_mask_bm, get_mask_rm
from utils.util import find_max_epoch, print_size, sampling, calc_diffusion_hyperparams

from imputers.DiffWaveImputer import DiffWaveImputer
from imputers.SSSDSAImputer import SSSDSAImputer
from imputers.SSSDS4Imputer import SSSDS4Imputer

from sklearn.metrics import mean_squared_error,roc_curve, auc
from statistics import mean
# from dtaidistance import dtw

import matplotlib.pyplot as plt

import pandas as pd

SEED = 1234
os.environ['PYTHONHASHSEED'] = str(SEED)
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
# torch.use_deterministic_algorithms(True)
if torch.cuda.is_available():
    torch.cuda.manual_seed(SEED)
    torch.cuda.manual_seed_all(SEED)

## Import data and result

In [None]:
imputation= np.load("results/ptbxl_248/no_trans/T200_beta00.0001_betaT0.02/imputation199.npy")
data=np.load("results/ptbxl_248/no_trans/T200_beta00.0001_betaT0.02/original11.npy")

In [None]:
mu, sigma, samples, alarm, rule_based_results, groundtruth = torch.load(
    'datasets/standardized_samples_alarms_results_embedding_labels.pt')
# mu, sigma, samples, alarm, rule_based_results, groundtruth = torch.load(
#     'datasets/2015_phy_stanardized.pt')
# samples, alarm, groundtruth,records = torch.load(
#     'datasets/samples.pt')

VT_index,VT_True_index,VT_False_index=[],[],[]
VT_index=[]
VT_F_index=[]
VFIB_index,VFIB_F_index=[],[]
for i in range(len(samples)):
    if alarm[i][0]==1:
        VT_index.append(i)
    if alarm[i][0]==1 and groundtruth[i]==0:
        VT_F_index.append(i)  
    if alarm[i][1]==1 and groundtruth[i]==1:
        VFIB_index.append(i)
    if alarm[i][1]==1 and groundtruth[i]==0:
        VFIB_F_index.append(i)

VT=True
VFIB=False
if VT==True:
    VT_samples_valid=samples[VT_index][-1000:-670]
    VT_samples_test=samples[VT_index][-670:]

    test_gt=groundtruth[VT_index][-670:]
    vaild_gt=groundtruth[VT_index][-1000:-670]
if VFIB==True:
    VFIB_samples_valid=samples[VFIB_index][-150:-100]
    VFIB_samples_test=samples[VFIB_index][-100:]

    test_gt=groundtruth[VFIB_index][-100:]
    vaild_gt=groundtruth[VFIB_index][-150:-100]


In [None]:
mask=torch.zeros(data.shape)
mask[:,:,250:] = 1
mask=~mask.bool()
mask=mask.numpy()

In [None]:
data=torch.from_numpy(data)
result=torch.from_numpy(imputation[1:,:,:])

In [None]:
def normalize(a):
    return (a-a.mean())/(a.max()-a.min()+0.00001)
for i in range(12):
    for j in range(result.shape[0]):
        result[j,i,:]=normalize(result[j,i,:])

## calculate distance

In [None]:
dists=[]
for i in range(data.shape[0]):
    dist=F.mse_loss(result[i,:,:]*mask[i,:,:],data[i,:,:]*mask[i,:,:])
    dists.append(dist)

In [None]:

if VT==True:
    vaild_dists=np.array(dists)[-1000:-670]
    test_dists=np.array(dists)[-670:]

## Find best score in val set.

In [None]:
tpr,tfr,ths=roc_curve(vaild_gt,vaild_dists)
Score=[0]
Max_THRESHOLD={}

for t in ths:
    THRESHOLD = t # Anomaly score threshold for an instance to be considered as anomaly 

    #TIME_STEPS = dataset.window_length
    test_score_df = pd.DataFrame(index=range(vaild_gt.shape[0]))
    test_score_df['loss'] = vaild_dists
    test_score_df['y'] = vaild_gt.numpy()
    test_score_df['threshold'] = THRESHOLD
    test_score_df['anomaly'] = test_score_df.loss <= test_score_df.threshold


    start_end = []
    state = 0
    for idx in test_score_df.index:
        if state==0 and test_score_df.loc[idx, 'y']==1:
            state=1
            start = idx
        if state==1 and test_score_df.loc[idx, 'y']==0:
            state = 0
            end = idx
            start_end.append((start, end))

    for s_e in start_end:
        if sum(test_score_df[s_e[0]:s_e[1]+1]['anomaly'])>0:
            for i in range(s_e[0], s_e[1]+1):
                test_score_df.loc[i, 'anomaly'] = 1
    # test_score_df.to_csv("test.csv")
    actual = np.array(test_score_df['y'])
    predicted = np.array([int(a) for a in test_score_df['anomaly']])

    predicted = np.array(predicted)
    actual = np.array(actual)

    tp = np.count_nonzero(predicted * actual)
    tn = np.count_nonzero((predicted - 1) * (actual - 1))
    fp = np.count_nonzero(predicted * (actual - 1))
    fn = np.count_nonzero((predicted - 1) * actual)
    accuracy = (tp + tn) / (tp + fp + fn + tn)
    score=(tp+tn)/(tp+tn+fp+5*fn)
    if score>= max(Score):
        Score.append(score)
        Max_THRESHOLD.clear()
        Max_THRESHOLD["thershold"]=t
        Max_THRESHOLD["TP"]=tp
        Max_THRESHOLD["TN"]=tn
        Max_THRESHOLD["FP"]=fp
        Max_THRESHOLD["FN"]=fn
        Max_THRESHOLD["accuracy"]=accuracy
        Max_THRESHOLD["score"]=score
        Max_THRESHOLD["TPR"]=tp/(tp+fn)
        Max_THRESHOLD["TNR"]=tn/(tn+fp+0.00001)

In [None]:
thre=Max_THRESHOLD["thershold"]

In [None]:
tpr,tfr,ths=roc_curve(test_gt,test_dists)
Score=[0]
Max_THRESHOLD={}

# for t in ths:
THRESHOLD =thre# Anomaly score threshold for an instance to be considered as anomaly 

#TIME_STEPS = dataset.window_length
test_score_df = pd.DataFrame(index=range(test_gt.shape[0]))
test_score_df['loss'] = test_dists
test_score_df['y'] = test_gt.numpy()
test_score_df['threshold'] = THRESHOLD
test_score_df['anomaly'] = test_score_df.loss <= test_score_df.threshold
# test_score_df['t'] = [x[59].item() for x in test_dataset.x]

# aUC=roc_auc_score(test_score_df['y'] ,test_score_df['anomaly'])

start_end = []
state = 0
for idx in test_score_df.index:
    if state==0 and test_score_df.loc[idx, 'y']==1:
        state=1
        start = idx
    if state==1 and test_score_df.loc[idx, 'y']==0:
        state = 0
        end = idx
        start_end.append((start, end))

for s_e in start_end:
    if sum(test_score_df[s_e[0]:s_e[1]+1]['anomaly'])>0:
        for i in range(s_e[0], s_e[1]+1):
            test_score_df.loc[i, 'anomaly'] = 1
# test_score_df.to_csv("test.csv")
actual = np.array(test_score_df['y'])
predicted = np.array([int(a) for a in test_score_df['anomaly']])

predicted = np.array(predicted)
actual = np.array(actual)

tp = np.count_nonzero(predicted * actual)
tn = np.count_nonzero((predicted - 1) * (actual - 1))
fp = np.count_nonzero(predicted * (actual - 1))
fn = np.count_nonzero((predicted - 1) * actual)
accuracy = (tp + tn) / (tp + fp + fn + tn)
score=(tp+tn)/(tp+tn+fp+5*fn)
# if score>= max(Score):
#     Score.append(score)
Max_THRESHOLD.clear()
Max_THRESHOLD["thershold"]=THRESHOLD
Max_THRESHOLD["TP"]=tp
Max_THRESHOLD["TN"]=tn
Max_THRESHOLD["FP"]=fp
Max_THRESHOLD["FN"]=fn
Max_THRESHOLD["accuracy"]=accuracy
Max_THRESHOLD["score"]=score
Max_THRESHOLD["TPR"]=tp/(tp+fn)
Max_THRESHOLD["TNR"]=tn/(tn+fp)

In [None]:
Max_THRESHOLD

In [None]:
rows=2
cols=2
j=12
plt.figure(figsize=(12, 9))
for i in range(1,5,1):
    plt.subplot(rows,cols,i)
    plt.plot(range(500),result[j+i,8,:],label="gen")
    plt.plot(range(500),data[j+i,8,:],label="Target")
    plt.legend()
    # plt.title(("Flase" if test_gt[j+i]==0 else "True")+str(test_score_df['anomaly'][j+i]))

plt.show()

In [None]:
plt.plot(test_score_df.index, test_score_df.loss, label='loss')
# plt.plot(test_score_df.index, test_score_df.threshold, label='threshold')
plt.axhline(Max_THRESHOLD["thershold"], label='threshold',color='r')
plt.plot(test_score_df.index, test_gt.numpy(), label='y')
plt.xticks(rotation=25)
plt.legend()

In [None]:
test_gt.sum()

In [None]:
for i in range(1,10):
    plt.figure(figsize=(12, 9))
    plt.subplot(2,1,1)
    plt.plot(range(500),result[i,1,:],label="Gen")
    plt.plot(range(500),data[i,1,:],label="Target")
    plt.legend()
    plt.title(("GT: Flase II" if test_gt[i]==0 else "GT: True II")+"  Label:"+str(test_score_df['anomaly'][i]))
    plt.subplot(2,1,2)
    plt.plot(range(500),result[i,8,:],label="Gen")
    plt.plot(range(500),data[i,8,:],label="Target")
    plt.legend()
    plt.title(("GT: Flase ABP" if test_gt[i]==0 else "GT: True ABP")+"  Label:"+str(test_score_df['anomaly'][i]))
    plt.savefig("sample/"+str(i)+".png")
    plt.close()

In [None]:
name=['I', 'II', 'III', 'V', 'aVF', 'aVL', 'aVR', 'PLETH','ABP','PAP','CVP','RESP']
records_name=np.array(records)[VT_test_index]

In [None]:
for i in range(1, 100):
    plt.figure(figsize=(12, 18))
    for j in range(12):
        plt.subplot(6, 2, j+1)
        plt.plot(range(500), result[i, j, :], label="Gen")
        plt.plot(range(500), data[i, j, :], label="Target")
        plt.text(250,0.45,"observation part")
        plt.text(400,0.45,"prediction part")
        plt.axvline(375,color='r',linestyle='--')
        plt.legend()

        plt.title(records_name[i-1]+"  "+("GT: False  "+name[j] if test_gt[i] == 0 else "GT: True  " +name[j]) +
                    "  Label:" + str(bool(test_score_df['anomaly'][i])))
    plt.tight_layout()
    plt.savefig("sample/" + str(i) + ".png")
    plt.close()