In [9]:
import matplotlib.pyplot as plt 
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from tqdm.notebook import tqdm
import pandas as pd 

import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MinMaxScaler
import torch.nn as nn
from imblearn.combine import SMOTETomek
from sklearn.feature_selection import SelectKBest, chi2
from torch.autograd import Variable
import torch
import torch.optim as optim
from sklearn import metrics
import torch.nn.functional as F
import time
from datetime import timedelta
import warnings
warnings.filterwarnings('ignore')
from torch.utils.data import Dataset , DataLoader
from sklearn.metrics import confusion_matrix
import plotly.io as pio
pio.renderers.default = 'jupyterlab'

In [3]:

data_path = "../FeatureTestTraining/binary/data_merged_bp_time_ens.csv"
df = pd.read_csv(data_path ,index_col=False)
df.head(1)

Unnamed: 0.1,Unnamed: 0,HR_mean,HR_std,meanNN,SDNN,medianNN,meanSD,SDSD,RMSSD,pNN20,...,hfp,SD1,SD2,pA,pQ,ApEn,shanEn,D2,subject,label
0,0,83.089179,20.181149,758.572049,153.235127,812.5,124.344406,126.424279,177.326336,0.014844,...,0.454831,125.827001,47497.302502,5976443.0,0.002649,0.161377,4.773326,1.318076,2,1


In [4]:
labels = ['Stress','Non-Stress']
values = df['label'].value_counts()
colors = ['red', 'royalblue','green','yellow','pink','grey']
fig = go.Figure(data=[go.Pie(labels=labels, values=values, hole=.5)])
fig.update_traces(hoverinfo='label+value',textfont_size=15,marker=dict(colors=colors))
fig.update_layout(annotations=[dict(text='Stress Condition ', 
                                    x=0.50, y=0.5, font_size=15, 
                                    showarrow=False)])
fig.show()



In [4]:


X = df.drop('label', axis=1)
y = df['label']
ros = SMOTETomek(random_state=42)
X_resampled, y_resampled = ros.fit_resample(X, y)

In [5]:
balanced_df = pd.DataFrame(data=X_resampled, columns=df.columns)
balanced_df['label'] = y_resampled

In [6]:
balanced_df = balanced_df.drop(columns=['Unnamed: 0'])

In [7]:
balanced_df.head(1)

Unnamed: 0,HR_mean,HR_std,meanNN,SDNN,medianNN,meanSD,SDSD,RMSSD,pNN20,pNN50,...,hfp,SD1,SD2,pA,pQ,ApEn,shanEn,D2,subject,label
0,83.089179,20.181149,758.572049,153.235127,812.5,124.344406,126.424279,177.326336,0.014844,0.010937,...,0.454831,125.827001,47497.302502,5976443.0,0.002649,0.161377,4.773326,1.318076,2,1


In [8]:
labels = ['Stress','Non-Stress']
values = balanced_df['label'].value_counts()
colors = ['red', 'royalblue','green','yellow','pink','grey']
fig = go.Figure(data=[go.Pie(labels=labels, values=values, hole=.5)])
fig.update_traces(hoverinfo='label+value',textfont_size=15,marker=dict(colors=colors))
fig.update_layout(annotations=[dict(text='Stress Condition ', 
                                    x=0.50, y=0.5, font_size=15, 
                                    showarrow=False)])
fig.show()



#### Description of a Simple Heart Rate Variability (HRV) Plot

1. **X-Axis (Samples)**: The horizontal axis represents time, with data points evenly spaced or timestamped along the axis. It typically spans a specific duration or time period during which HRV measurements were taken. Time may be displayed in minutes, hours, or other relevant units, depending on the data's time resolution.

2. **Y-Axis (HRV Metric)**: The vertical axis represents the HRV metric being measured. Common HRV metrics include:

   - **SDNN (Standard Deviation of NN Intervals)**: A measure of overall HRV, reflecting both sympathetic and parasympathetic nervous system influence on heart rate.
   
   - **RMSSD (Root Mean Square of Successive Differences)**: A marker of parasympathetic (vagal) activity, specifically reflecting short-term HRV and respiratory sinus arrhythmia.
   
   - **pNN50 (Percentage of NN50 Intervals)**: Indicates the proportion of adjacent NN intervals (R-R intervals) differing by more than 50 ms, often used as a measure of high-frequency HRV.

In [9]:
"""
An HRV plot displays heart rate variability metrics over time. 
It helps assess cardiovascular health, stress levels, and autonomic nervous system activity."
"""

fig = make_subplots(rows=2, cols=2, subplot_titles=("HR_mean", "medianNN", "SDNN", "RMSSD"))

trace1 = go.Scatter(x=balanced_df.index[:70], y=balanced_df['HR_mean'][:70], mode='lines', name='HR_mean')
trace2 = go.Scatter(x=balanced_df.index[:70], y=balanced_df['medianNN'][:70], mode='lines', name='medianNN')
trace3 = go.Scatter(x=balanced_df.index[:70], y=balanced_df['SDNN'][:70], mode='lines', name='SDNN')
trace4 = go.Scatter(x=balanced_df.index[:70], y=balanced_df['RMSSD'][:70], mode='lines', name='RMSSD')

fig.add_trace(trace1, row=1, col=1)
fig.add_trace(trace2, row=2, col=1)
fig.add_trace(trace3, row=1, col=2)
fig.add_trace(trace4, row=2, col=2)

fig.update_xaxes(title_text="Sample", row=1, col=1)
fig.update_xaxes(title_text="Sample", row=2, col=1)
fig.update_xaxes(title_text="Sample", row=1, col=2)
fig.update_xaxes(title_text="Sample", row=2, col=2)

# Customize the layout with a black background
fig.update_layout(
    title="Heart Rate Analysis",
    paper_bgcolor='black',  # Set background color to black
    plot_bgcolor='black',   # Set plot background color to black
    font=dict(color='white'),  # Set font color to white
    showlegend=True,
    xaxis=dict(showline=True, showgrid=False),
    yaxis=dict(showline=True, showgrid=False),
)

fig.update_annotations(font=dict(color='white'))

fig.update_layout(width=900, height=600)
fig.show()


In [10]:
feats = ['HR_mean', 'HR_std', 'meanNN', 'SDNN', 'medianNN', 'meanSD',
       'SDSD', 'RMSSD', 'pNN20', 'pNN50', 'TINN']
label_dict = {'non-stress': 0, 'stress': 1}
int_to_label = {0: 'non-stress', 1: 'stress'}


In [11]:
def select_features_and_return_balanced_df(balanced_df, number_features=11):
    X = balanced_df.drop('label', axis=1)
    y = balanced_df['label']
    
    scaler = MinMaxScaler()
    X_scaled = scaler.fit_transform(X)
    
    best_features = SelectKBest(score_func=chi2, k=number_features)
    
    best_features.fit(X_scaled, y)
    
    selected_feature_indices = best_features.get_support(indices=True)
    
    selected_features = X.columns[selected_feature_indices]
    
    print(f"Number of selected features: {len(selected_features)}")
    print(f"Selected features: {list(selected_features)}")
    
    X_selected = best_features.transform(X_scaled)
    
    selected_df = pd.DataFrame(data=X_selected, columns=selected_features)
    selected_df['label'] = y
    
    return selected_df

In [12]:
best_selected_df = select_features_and_return_balanced_df(balanced_df , number_features=11)

Number of selected features: 11
Selected features: ['HR_mean', 'HR_std', 'meanNN', 'medianNN', 'meanSD', 'RMSSD', 'pNN20', 'pNN50', 'SD1', 'SD2', 'pA']


In [13]:
best_selected_df.head(1)

Unnamed: 0,HR_mean,HR_std,meanNN,medianNN,meanSD,RMSSD,pNN20,pNN50,SD1,SD2,pA,label
0,0.34068,0.680308,0.478054,0.531915,0.502317,0.611273,0.536,0.553191,0.610237,0.415737,0.273026,1


In [14]:
train , val =  train_test_split(best_selected_df, test_size=0.3)


In [15]:


colors = ['red', 'royalblue','green','yellow','pink','grey']

values_train = train['label'].value_counts()
values_test = val['label'].value_counts()

labels = ['Stress', 'Non-Stress']

fig = make_subplots(rows=1, cols=2, subplot_titles=['Stress/non-Stress Label Counts (val)', 'Stress/non-Stress Label Counts (train)'],
                    specs=[[{'type':'pie'}, {'type':'pie'}]])

fig_val = go.Figure(data=[go.Pie(labels=labels, values=values_test, hole=.5, marker=dict(colors=colors), name='HR_mean')])
fig_val.update_traces(hoverinfo='label+value', textfont_size=15)

fig_train = go.Figure(data=[go.Pie(labels=labels, values=values_train, hole=.5, marker=dict(colors=colors), name="Training")])
fig_train.update_traces(hoverinfo='label+value', textfont_size=15)

fig.add_trace(fig_val.data[0], row=1, col=1)
fig.add_trace(fig_train.data[0], row=1, col=2)

# Update the layout
fig.update_layout(
    title="Labels Counts Analysis",
    paper_bgcolor='white',  
    plot_bgcolor='black',  
    font=dict(color='white'),  
    showlegend=False,  
    xaxis=dict(showline=True, showgrid=False),
    yaxis=dict(showline=True, showgrid=False),
)

# Update subplot titles
fig.update_annotations(
    dict(text='Stress Condition', font_size=15, showarrow=False),
    selector=dict(row=1, col=1)
)
fig.update_annotations(
    dict(text='', font_size=15, showarrow=False),  
    selector=dict(row=1, col=2)
)

fig.update_layout(annotations=[dict(text='Stress Condition ', 
                                    x=0.50, y=0.5, font_size=15, 
                                    showarrow=False)])
fig.show()


In [16]:
# def one_hot_encode(x, n_classes):
#     """
#     One hot encode a list of sample labels. Return a one-hot encoded vector for each label.
#     : x: List of sample Labels
#     : return: Numpy array of one-hot encoded labels
#      """
#     return np.eye(n_classes)[x]


# n_classes = 2
# Encoder_val= one_hot_encode(val["label"] , n_classes)
# Encoder_train = one_hot_encode(train["label"] , n_classes)
# train["label"]=list(Encoder_train)
# val["label"]=list(Encoder_val)

In [17]:
class WESAD_DATASET(Dataset):
    def __init__(self, data: pd.DataFrame, transformer=None):
        super(WESAD_DATASET, self).__init__()
        self.transformer = transformer
        self.data = data
        self.y = torch.tensor(data["label"].values).float()
        
        if 'subject' in self.data.columns:
            self.data.drop(['label', 'subject'], axis=1, inplace=True)
        else:
            self.data.drop(['label'], axis=1, inplace=True)
            
        if self.transformer is not None:
            self.data = self.normalize(self.data)
        
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        x = self.data.iloc[idx].values
        y = self.y[idx]
    
        return torch.tensor(x), y

    def normalize(self,data):
        normalized_balanced_df=(data-data.min())/(data.max()-data.min())
        return normalized_balanced_df

In [18]:
feats = ['subject','HR_mean', 'HR_std', 'meanNN', 'SDNN', 'medianNN', 'meanSD',
       'SDSD', 'RMSSD', 'pNN20', 'pNN50', 'TINN','label']
best_features = best_selected_df.columns
label_dict = {'no-stress': 0, 'stress': 1}
int_to_label = {0: 'no-stress', 1: 'stress'}


In [19]:
frequency_domain_features = [
    'subject','LF', 'HF', 'ULF', 'VLF',
    'LFHF', 'total_power', 'lfp', 'hfp', 'label'
]

spatial_domain_features = [
    'subject','HR_mean', 'HR_std', 'meanNN', 'SDNN', 'medianNN', 'meanSD',
    'SDSD', 'RMSSD', 'pNN20', 'pNN50', 'TINN', 'label'
]

non_linear_hrv_measures = [
    'subject','SD1', 'SD2', 'pA', 'pQ', 'ApEn',
    'shanEn', 'D2', 'label'
]


In [22]:
!pip install ptflops


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.3.1[0m[39;49m -> [0m[32;49m23.2.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [10]:
class LinearStress(nn.Module): 
    def __init__(self,args):
        super(LinearStress, self).__init__()
        self.in_features = args["in_features"]
        self.out_features = args["in_features"]
        self.out_classes = args["out_classes"]
        self.fc = nn.Sequential(
                        nn.Linear(self.in_features,self.out_features),
                        #nn.Dropout(0.5),
                        nn.ReLU(),
                        nn.Linear(self.out_features, self.out_classes),
                        #nn.Dropout(0.5),
                        nn.LogSoftmax(dim=1))
        
    def forward(self, x):
        return self.fc(x)    



In [11]:
model_args = {
        "in_features": 11,
        "out_features": 128,
        "out_classes": 2,
    }
model = LinearStress(model_args)


In [19]:
total_params = sum(p.numel() for p in model.parameters())
total_params

156

In [26]:
import torchvision.models as models
import torch
from ptflops import get_model_complexity_info

# if torch.cuda.is_available() and self.use_cuda:
#             self.device = 'cuda:0'
macs, params = get_model_complexity_info(model, (1, 11), as_strings=True,
                                       print_per_layer_stat=True, verbose=True)
print('{:<30}  {:<8}'.format('Computational complexity: ', macs))
print('{:<30}  {:<8}'.format('Number of parameters: ', params))

LinearStress(
  156, 100.000% Params, 167.0 Mac, 100.000% MACs, 
  (fc): Sequential(
    156, 100.000% Params, 167.0 Mac, 100.000% MACs, 
    (0): Linear(132, 84.615% Params, 132.0 Mac, 79.042% MACs, in_features=11, out_features=11, bias=True)
    (1): ReLU(0, 0.000% Params, 11.0 Mac, 6.587% MACs, )
    (2): Linear(24, 15.385% Params, 24.0 Mac, 14.371% MACs, in_features=11, out_features=2, bias=True)
    (3): LogSoftmax(0, 0.000% Params, 0.0 Mac, 0.000% MACs, dim=1)
  )
)
Computational complexity:       167.0 Mac
Number of parameters:           156     


In [20]:
for param in model.parameters():
    print(f"param :{(param.numel())}")

param :121
param :11
param :22
param :2


In [21]:
def get_data_loaders(balanced_df, features,train_batch_size=1, test_batch_size=1):
    #balanced_df = pd.read_csv('data/m14_merged.csv', index_col=0)[feats]

    # train_balanced_df = balanced_df[ balanced_df['subject'] != subject_id].reset_index(drop=True)
    # test_balanced_df = balanced_df[ balanced_df['subject'] == subject_id].reset_index(drop=True)
    train_balanced_df , val_balanced_df =  train_test_split(balanced_df[features], test_size=0.3)

    train_dset = WESAD_DATASET(train_balanced_df,transformer=True)
    test_dset = WESAD_DATASET(val_balanced_df,transformer=True)

    train_loader = torch.utils.data.DataLoader(train_dset, batch_size=train_batch_size, shuffle=True)
    test_loader = torch.utils.data.DataLoader(test_dset, batch_size=test_batch_size,shuffle=False)
    
    return train_loader, test_loader

In [22]:
train_loader, _=get_data_loaders(best_selected_df,best_features)
for batch in train_loader:
    feat , label = batch 
    print(feat.shape, label.shape)
    break
    

torch.Size([1, 11]) torch.Size([1])


In [23]:
seq_len = len(feats) 
embed_dim = balanced_df.shape[0] 
print(f"seq_len : {seq_len} \n embed_dim : {embed_dim}")

seq_len : 13 
 embed_dim : 1082


In [24]:
def save_model(model, model_path):
    """Save model."""
    torch.save(model.state_dict(), model_path)


def load_model(model, model_path, use_cuda=False):
    """Load model."""
    map_location = 'cpu'
    if use_cuda and torch.cuda.is_available():
        map_location = 'cuda:0'
    model.load_state_dict(torch.load(model_path, map_location))
    return model

In [25]:
class Trainer(object):
    """Trainer."""

    def __init__(self, **kwargs):
        self.n_epochs = kwargs["epochs"]
        self.batch_size = kwargs["batch_size"]
        self.validate = kwargs["validate"]
        self.save_best_dev = kwargs["save_best_dev"]
        self.use_cuda = kwargs["use_cuda"]
        self.print_every_step = kwargs["print_every_step"]
        self.optimizer = kwargs["optimizer"]
        self.model_path = kwargs["model_path"]
        self.eval_metrics = kwargs["eval_metrics"]
        self._best_accuracy = 0.0
        self.criterion = nn.CrossEntropyLoss()

        self.device = 'cpu'
        if torch.cuda.is_available() and self.use_cuda:
            self.device = 'cuda:0'

    def train(self, network, data_train, dev_data=None):
        # transfer model to gpu if available
        network = network.to(self.device)

        # define batch iterator
        train_iter = data_train

        
        # define Tester over dev data
        if self.validate:
            default_valid_args = {
                "batch_size":  self.batch_size,
                "use_cuda": self.use_cuda}
            validator = Tester(**default_valid_args)

        start = time.time()
        for epoch in range(1, self.n_epochs + 1):
            # turn on network training mode
            network.train()

            # one forward and backward pass
            self._train_step(
                train_iter, network, start=start,
                n_print=self.print_every_step, epoch=epoch)

            # validation
            if self.validate:
                if dev_data is None:
                    raise RuntimeError(
                        "self.validate is True in trainer, "
                        "but dev_data is None."
                        " Please provide the validation data.")
                eval_results = validator.test(network, dev_data)

                if self.save_best_dev and self.best_eval_result(eval_results):
                    save_model(network, self.model_path)
                    print("Saved better model selected by validation.")

    def _train_step(self, data_iterator, network, **kwargs):
        """Training process in one epoch.
        """
        step = 0
        for batch in data_iterator:
            feats, target = batch
            feats , target = feats.to(self.device),target.long().to(self.device)
            self.optimizer.zero_grad()
            logits = network(feats.float())
        


            loss = self.criterion(logits, target)
            loss.backward()
            self.optimizer.step()

            if kwargs["n_print"] > 0 and step % kwargs["n_print"] == 0:
                end = time.time()
                diff = timedelta(seconds=round(end - kwargs["start"]))
                print_output = "[epoch: {:>3} step: {:>4}]" \
                    " train loss: {:>4.6} time: {}".format(
                        kwargs["epoch"], step, loss.item(), diff)
                print(print_output)

            step += 1

    def best_eval_result(self, eval_results):
        """Check if the current epoch yields better validation results.

        :param eval_results: dict, format {metrics_name: value}
        :return: bool, True means current results on dev set is the best.
        """
        assert self.eval_metrics in eval_results, \
            "Evaluation doesn't contain metrics '{}'." \
            .format(self.eval_metrics)

        accuracy = eval_results[self.eval_metrics]
        if accuracy > self._best_accuracy:
            self._best_accuracy = accuracy
            return True
        else:
            return False


In [26]:
class Tester(object):
    """Tester."""

    def __init__(self, **kwargs):
        self.batch_size = kwargs["batch_size"]
        self.use_cuda = kwargs["use_cuda"]
        self.device = 'cpu'
        if torch.cuda.is_available() and self.use_cuda:
            self.device = 'cuda:0'

    def test(self, network, dev_data, threshold=0.87):
        # transfer model to gpu if available
        network = network.to(self.device)

        # turn on the testing mode; clean up the history
        network.eval()
        output_list = []
        truth_list = []

        # define batch iterator
        data_iter = dev_data


        # predict
        for batch in data_iter:
            feats ,  target = batch
            feats ,  target = feats.to(self.device),target.long().to(self.device)


            with torch.no_grad():
                prediction = network(feats.float())

            output_list.append(prediction.detach())
            truth_list.append(target.detach())

        # evaluate
        eval_results = self.evaluate(output_list, truth_list, threshold)
        print("[tester] {}".format(self.print_eval_results(eval_results)))
    
        return eval_results

    def evaluate(self, predict, truth, threshold=0.33):
        
        """Compute evaluation metrics.

        :param predict: list of Tensor
        :param truth: list of dict
        :param threshold: threshold of positive probability
        :return eval_results: dict, format {name: metrics}.
        """

        y_trues, y_preds = [], []
        for y_true, logit in zip(truth, predict):
            _, y_pred = torch.max(logit, 1)
            y_pred = (y_pred > threshold).long().cpu().numpy()
            y_true = y_true.cpu().numpy()
            y_trues.append(y_true)
            y_preds.append(y_pred)
        
        y_true = np.concatenate(y_trues, axis=0)
        y_pred = np.concatenate(y_preds, axis=0)
       
        
        precision = metrics.precision_score(y_true, y_pred,pos_label=1)
        accuracy = metrics.accuracy_score(y_true, y_pred)
        recall = metrics.recall_score(y_true, y_pred,pos_label=1)
        f1 = metrics.f1_score(y_true, y_pred,pos_label=1)

        metrics_dict = {"accuracy":accuracy, "precision": precision, "recall": recall, "f1": f1}

        return metrics_dict 



    def print_eval_results(self, results):
        """Override this method to support more print formats.
        :param results: dict, (str: float) is (metrics name: value)
        """
        return ", ".join(
            [str(key) + "=" + "{:.4f}".format(value)
             for key, value in results.items()])

In [27]:
class Predictor(object):
    """An interface for predicting outputs based on trained models.
    """

    def __init__(self, batch_size=8, use_cuda=False):
        self.batch_size = batch_size
        self.use_cuda = use_cuda

        self.device = 'cpu'
        if torch.cuda.is_available() and self.use_cuda:
            self.device = 'cuda:0'

    def predict(self, network, data, threshold=0.33):
        # transfer model to gpu if available
        network = network.to(self.device)

        # turn on the testing mode; clean up the history
        network.eval()
        batch_output = []
        index_col =[]
        
        # define batch iterator
        data_iter = data

        data_iter

        for idx_col , batch in enumerate(data_iter):
            feats, _ = batch
            feats = feats.to(self.device)
            with torch.no_grad():
                prediction = network(feats.float())
                

                

            batch_output.append(prediction.detach())
            index_col.append(idx_col)

        return self._post_processor(batch_output, threshold) , index_col

    def _post_processor(self, batch_output, threshold=0.33):
        """Convert logit tensor to label."""
        y_preds = []
        for logit in batch_output:
            _, y_pred = torch.max(logit, 1)
            y_pred = (y_pred > threshold).long().cpu().numpy()
            y_preds.append(y_pred)
        y_pred = np.concatenate(y_preds, axis=0)

        return y_pred

In [28]:
def train():
    """Train model.
    """

    print("Training...")
    train_loader, test_loader=get_data_loaders(df,best_features)



    model_args = {
        "in_features": 11,
        "out_features": 128,
        "out_classes": 2,
    }
    model = LinearStress(model_args)
    model_path="save_checkpoint/LinearStress.pth"

    # define trainer
    trainer_args = {
        "epochs":50,
        "batch_size": 1,
        "validate": True,
        "save_best_dev": True,
        "use_cuda": False,
        "print_every_step": 1000,
        "optimizer": torch.optim.Adam(model.parameters(), lr=1e-3),
        "model_path": model_path,
        "eval_metrics": "f1",
    }
    trainer = Trainer(**trainer_args)

    trainer.train(model, data_train=train_loader, dev_data=test_loader)

    print('')


In [29]:
train()

Training...
[epoch:   1 step:    0] train loss: 0.513826 time: 0:00:00
[tester] accuracy=0.9135, precision=0.9296, recall=0.9536, f1=0.9415
Saved better model selected by validation.
[epoch:   2 step:    0] train loss: 0.199385 time: 0:00:01
[tester] accuracy=0.9436, precision=0.9735, recall=0.9485, f1=0.9608
Saved better model selected by validation.
[epoch:   3 step:    0] train loss: 0.1287 time: 0:00:01
[tester] accuracy=0.9436, precision=0.9735, recall=0.9485, f1=0.9608
[epoch:   4 step:    0] train loss: 0.165471 time: 0:00:02
[tester] accuracy=0.9436, precision=0.9735, recall=0.9485, f1=0.9608
[epoch:   5 step:    0] train loss: 0.0157611 time: 0:00:02
[tester] accuracy=0.9436, precision=0.9735, recall=0.9485, f1=0.9608
[epoch:   6 step:    0] train loss: 1.80841 time: 0:00:03
[tester] accuracy=0.9286, precision=0.9888, recall=0.9124, f1=0.9491
[epoch:   7 step:    0] train loss: 0.00995351 time: 0:00:03
[tester] accuracy=0.9436, precision=0.9735, recall=0.9485, f1=0.9608
[epoch

In [30]:
def test():
    """Train model.
    """

    print("Testing...")
    model_path="save_checkpoint/LinearStress.pth"


    model_args = {
        "in_features": 11,
        "out_features": 128,
        "out_classes": 2,
    }
    model = LinearStress(model_args)

    # define tester
    tester_args = {
        "batch_size": 1,
        "use_cuda": False,
    }
    tester = Tester(**tester_args)

    # test and threshold selection
    _, test_loader=  get_data_loaders(df,best_features)
    #_ , test_loader = get_data_loaders(scaled_data_df, feats_test)


    best_thresh, best_f1 = 0., 0.
    for thresh in np.arange(0.33, 0.87, 0.01):
        thresh = np.round(thresh, 2)
        f1 = tester.test(model, test_loader, threshold=thresh)["f1"]
        print("threshold: {:.2f} f1: {}".format(thresh, f1))
        if f1 > best_f1:
            best_thresh, best_f1 = thresh, f1
            print(f"best f1 on dev: {best_f1} threshold: {best_thresh:.2f}")

    print('')


In [31]:
args = test()

Testing...
[tester] accuracy=0.8910, precision=0.9577, recall=0.8960, f1=0.9258
threshold: 0.33 f1: 0.9258312020460359
best f1 on dev: 0.9258312020460359 threshold: 0.33
[tester] accuracy=0.8910, precision=0.9577, recall=0.8960, f1=0.9258
threshold: 0.34 f1: 0.9258312020460359
[tester] accuracy=0.8910, precision=0.9577, recall=0.8960, f1=0.9258
threshold: 0.35 f1: 0.9258312020460359
[tester] accuracy=0.8910, precision=0.9577, recall=0.8960, f1=0.9258
threshold: 0.36 f1: 0.9258312020460359
[tester] accuracy=0.8910, precision=0.9577, recall=0.8960, f1=0.9258
threshold: 0.37 f1: 0.9258312020460359
[tester] accuracy=0.8910, precision=0.9577, recall=0.8960, f1=0.9258
threshold: 0.38 f1: 0.9258312020460359
[tester] accuracy=0.8910, precision=0.9577, recall=0.8960, f1=0.9258
threshold: 0.39 f1: 0.9258312020460359
[tester] accuracy=0.8910, precision=0.9577, recall=0.8960, f1=0.9258
threshold: 0.40 f1: 0.9258312020460359
[tester] accuracy=0.8910, precision=0.9577, recall=0.8960, f1=0.9258
thres

### Predictions:

In [32]:
Test_path = "../FeatureTestTraining/binary/test/test.csv"

test_df = pd.read_csv(Test_path, index_col=False)
test_df['condition'].unique()

array(['no stress', 'time pressure', 'interruption'], dtype=object)

In [34]:
test_df = test_df[test_df['condition'] != 'interruption']

In [35]:
test_df['condition'].unique()

array(['no stress', 'time pressure'], dtype=object)

In [36]:
def change_label(label):
    if label == 'time pressure':
        return 'stress'
    else:
        return 'no stress'
test_df['condition'] = test_df['condition'].apply(change_label)

In [37]:
test_df.rename(columns={'condition': 'label'}, inplace=True)

In [38]:
labels = ['no stress', 'stress']
values = test_df['label'].value_counts()
colors = ['red', 'royalblue','green','yellow','pink','grey']
fig = go.Figure(data=[go.Pie(labels=labels, values=values, hole=.5)])
fig.update_traces(hoverinfo='label+value',textfont_size=15,marker=dict(colors=colors))
fig.update_layout(annotations=[dict(text='Stress Condition ', 
                                    x=0.50, y=0.5, font_size=15, 
                                    showarrow=False)])
fig.show()



In [39]:
le = LabelEncoder()
le.fit(test_df['label'])
test_df['label'] = le.transform(test_df['label'])


In [40]:
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(test_df)
scaled_data_df = pd.DataFrame(scaled_data, columns=test_df.columns)

In [41]:
scaled_data_df.columns

Index(['MEAN_RR', 'MEDIAN_RR', 'SDRR', 'RMSSD', 'SDSD', 'SDRR_RMSSD', 'HR',
       'pNN25', 'pNN50', 'SD1', 'SD2', 'KURT', 'SKEW', 'MEAN_REL_RR',
       'MEDIAN_REL_RR', 'SDRR_REL_RR', 'RMSSD_REL_RR', 'SDSD_REL_RR',
       'SDRR_RMSSD_REL_RR', 'KURT_REL_RR', 'SKEW_REL_RR', 'VLF', 'VLF_PCT',
       'LF', 'LF_PCT', 'LF_NU', 'HF', 'HF_PCT', 'HF_NU', 'TP', 'LF_HF',
       'HF_LF', 'sampen', 'higuci', 'datasetId', 'label'],
      dtype='object')

In [42]:
feats_test = ['MEAN_RR', 'MEDIAN_RR', 'SDRR', 'RMSSD', 'SDSD', 'SDRR_RMSSD', 'HR',
       'pNN25', 'pNN50','MEAN_REL_RR','SDRR_RMSSD_REL_RR','label']

In [43]:
len(feats_test)

12

In [44]:
def infer():
    """Inference using model.
    """

    print("Predicting...")
    int_to_label = {0: 'no-stress', 1: 'stress'}

    model_args = {
        "in_features": 11,
        "out_features": 128,
        "out_classes": 2,
    }
    model = LinearStress(model_args)
    model_path="save_checkpoint/LinearStress.pth"

    load_model(model, model_path, use_cuda=True)

    # define predictor
    predictor = Predictor(batch_size=1, use_cuda=True)

    # predict
    _, data_test = get_data_loaders(test_df, feats_test)

    threshold = 0.50
    y_pred, index_col = predictor.predict(model, data_test, threshold=threshold)

    # Map integer labels to their corresponding string labels
    y_pred_labels = [int_to_label[label] for label in y_pred]

    # print result 
    for i in range(0,10):
        print(f"prediction Stress Detection on HRV as {y_pred_labels[i]} \n index_features {index_col[i]}")
    print('')


In [45]:
infer()

Predicting...
prediction Stress Detection on HRV as no-stress 
 index_features 0
prediction Stress Detection on HRV as stress 
 index_features 1
prediction Stress Detection on HRV as stress 
 index_features 2
prediction Stress Detection on HRV as stress 
 index_features 3
prediction Stress Detection on HRV as stress 
 index_features 4
prediction Stress Detection on HRV as no-stress 
 index_features 5
prediction Stress Detection on HRV as stress 
 index_features 6
prediction Stress Detection on HRV as no-stress 
 index_features 7
prediction Stress Detection on HRV as stress 
 index_features 8
prediction Stress Detection on HRV as stress 
 index_features 9

