### Get Dataset from Google Drive

In [None]:
# Mount Google Drive inside the Colab runtime at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Shell command: copy the dataset JSON from the mounted Drive into the current working directory.
!cp '/content/drive/My Drive/physiobank_dataset.json' .

## Imports

In [None]:
import json
import random
import torch
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader

from utils import ProgressBar
from dataset import PhysioBank

In [None]:
# Seed both the PyTorch and the Python RNGs with a fixed value so that
# later random operations in this notebook are repeatable between runs.
torch.manual_seed(0)
random.seed(0)

# Use the GPU when PyTorch can see one, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

## Dataset

In [None]:
class PhysioBank:
    """Load the PhysioBank JSON samples, split them, and serve DataLoaders.

    The samples are shuffled and divided into train / validation / test
    subsets. Min-max normalization constants for ``ihr`` and ``age`` are
    computed from the training subset only and shared with all three subsets.
    """

    def __init__(
        self, path, train_batch_size=1, val_batch_size=1, test_batch_size=1,
        cuda=False, num_workers=1, train_split=0.7, val_split=0.15, mean=78.78, std=28.35
    ):
        """Store the configuration and build the three dataset splits.

        Args:
            path: Path of the JSON file holding the list of sample dicts.
            train_batch_size: Batch size of the training loader.
            val_batch_size: Batch size of the validation loader.
            test_batch_size: Batch size of the test loader.
            cuda: When true, loaders use ``num_workers`` workers and pinned memory.
            num_workers: Worker count handed to the DataLoader (only when ``cuda``).
            train_split: Fraction of samples assigned to the training set.
            val_split: Fraction of samples assigned to the validation set;
                whatever remains after both splits becomes the test set.
            mean: Stored on the instance; not used by any method of this class.
            std: Stored on the instance; not used by any method of this class.
        """

        self.path = path
        self.cuda = cuda
        self.num_workers = num_workers
        self.train_split = train_split
        self.val_split = val_split
        self.train_batch_size = train_batch_size
        self.val_batch_size = val_batch_size
        self.test_batch_size = test_batch_size
        self.mean = mean
        self.std = std

        # Get data
        self._create_data(self._read_data())

    def _read_data(self):
        """Return the parsed content of the JSON file at ``self.path``."""
        with open(self.path) as f:
            return json.load(f)

    @staticmethod
    def _min_and_range(values):
        """Return ``(min, max - min)`` of ``values``, using 1 for a zero range.

        A zero range would later be used as a divisor when samples are
        normalized, so a constant feature falls back to a scale of 1.
        """
        lowest = min(values)
        span = max(values) - lowest
        return lowest, (span if span else 1)

    def _get_normalization(self, samples):
        """Compute min-max normalization constants from ``samples``.

        Sets ``self.transition`` (per-feature minimum) and ``self.scale``
        (per-feature range) for the ``'ihr'`` and ``'age'`` features.

        Raises:
            ValueError: If ``samples`` is empty (there is no minimum to take).
        """
        self.transition, self.scale = {}, {}

        # IHR: every reading of every sample contributes to the statistics.
        samples_ihr = [y for x in samples for y in x['ihr']]
        self.transition['ihr'], self.scale['ihr'] = self._min_and_range(samples_ihr)

        # Age: a single value per sample.
        samples_age = [x['age'] for x in samples]
        self.transition['age'], self.scale['age'] = self._min_and_range(samples_age)

    def _create_data(self, samples):
        """Shuffle ``samples`` in place and build the train/val/test datasets."""
        random.shuffle(samples)

        # Calculate number of samples in each set
        train_limit = int(len(samples) * self.train_split)
        val_limit = int(len(samples) * self.val_split)

        # Normalization constants come from the training portion only, so no
        # statistics of the validation/test samples are used.
        self._get_normalization(samples[:train_limit])
        self.train_data = PhysioBankDataset(samples[:train_limit], self.transition, self.scale)
        self.val_data = PhysioBankDataset(samples[train_limit:train_limit + val_limit], self.transition, self.scale)
        self.test_data = PhysioBankDataset(samples[train_limit + val_limit:], self.transition, self.scale)

    def loader(self, type='train', shuffle=True):
        """Return a DataLoader over one of the splits.

        Args:
            type: ``'train'`` or ``'val'``; any other value selects the test split.
            shuffle: Passed to the DataLoader as its ``shuffle`` argument.
        """
        loader_args = {'shuffle': shuffle}

        # Worker processes and pinned memory are only requested when a GPU is used.
        if self.cuda:
            loader_args['num_workers'] = self.num_workers
            loader_args['pin_memory'] = True

        if type == 'train':
            loader_args['batch_size'] = self.train_batch_size
            return DataLoader(self.train_data, **loader_args)
        elif type == 'val':
            loader_args['batch_size'] = self.val_batch_size
            return DataLoader(self.val_data, **loader_args)
        else:
            loader_args['batch_size'] = self.test_batch_size
            return DataLoader(self.test_data, **loader_args)


class PhysioBankDataset(Dataset):
    """Dataset of sample dicts that are normalized when they are accessed.

    ``transition`` and ``scale`` are dicts with ``'ihr'`` and ``'age'``
    entries; each feature is mapped to ``(value - transition) / scale``.
    """

    def __init__(self, samples, transition, scale):
        """Keep references to the samples and the normalization constants."""
        super().__init__()
        self.samples = samples
        self.transition = transition
        self.scale = scale

    def __len__(self):
        """Number of samples held by the dataset."""
        return len(self.samples)

    def __getitem__(self, index):
        """Return ``((ihr, meta), label)`` for the sample at ``index``.

        ``ihr`` is the normalized heart-rate sequence, ``meta`` holds the
        gender value followed by the normalized age, and ``label`` is a
        one-element tensor with the ``hypertensive`` flag.
        """
        record = self.samples[index]

        # Sequence feature: normalized element-wise as a float tensor.
        heart_rate = torch.FloatTensor(record['ihr'])
        heart_rate = (heart_rate - self.transition['ihr']) / self.scale['ihr']

        # Per-sample metadata: gender is used as-is, age is normalized.
        gender = record['gender']
        age = (record['age'] - self.transition['age']) / self.scale['age']
        meta = torch.FloatTensor([gender, age])

        label = torch.FloatTensor([record['hypertensive']])
        return (heart_rate, meta), label

In [None]:
# Build the train/val/test splits from the copied JSON file, using a batch
# size of 128 for every split; worker processes and pinned memory are only
# enabled when CUDA is available.
dataset = PhysioBank(
    'physiobank_dataset.json',
    train_batch_size=128,
    val_batch_size=128,
    test_batch_size=128,
    cuda=torch.cuda.is_available()
)

In [None]:
# One DataLoader per split. `shuffle` is left at its default (True) for all three.
train_loader = dataset.loader(type='train')
val_loader = dataset.loader(type='val')
test_loader = dataset.loader(type='test')

## Model

In [None]:
class HypertensionDetectorBiLSTM(nn.Module):
    """Bidirectional LSTM classifier over a scalar sequence plus metadata.

    The metadata vector is projected to ``hidden_dim`` and used as the initial
    hidden state of every LSTM layer and direction. The final hidden states of
    all layers/directions are concatenated and mapped to a single logit.
    """

    def __init__(self, hidden_dim, seq_meta_len, n_layers, dropout, device):
        """Create the layers.

        Args:
            hidden_dim: Size of the LSTM hidden state and of the first FC layer.
            seq_meta_len: Number of features in the metadata vector.
            n_layers: Number of stacked LSTM layers.
            dropout: Dropout value passed to ``nn.LSTM``.
            device: Stored as ``self.device`` for callers; the forward pass
                derives the device from its inputs instead.
        """
        super().__init__()

        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        self.device = device

        self.seq_meta_fc = nn.Linear(seq_meta_len, hidden_dim)
        self.rnn = nn.LSTM(1, hidden_dim, num_layers=n_layers, bidirectional=True, dropout=dropout)

        self.fc1 = nn.Linear(2 * n_layers * hidden_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, 1)

    def forward(self, seq, seq_meta):
        """Return one logit per batch element.

        Input shapes
            seq: [batch_size, seq_length]
            seq_meta: [batch_size, seq_meta_len]

        Returns:
            Tensor of shape [batch_size, 1] holding raw (pre-sigmoid) scores.
        """

        # Unpacking also enforces that ``seq`` is two-dimensional.
        batch_size, _ = seq.shape
        seq = seq.unsqueeze(-1).permute(1, 0, 2)  # [seq_len, batch_size, 1]

        # The projected metadata seeds the hidden state of every layer/direction.
        h0 = self.seq_meta_fc(seq_meta)  # [batch_size, hidden_dim]
        h0 = h0.unsqueeze(0).repeat(self.n_layers * 2, 1, 1)  # [n_layers * 2, batch_size, hidden_dim]

        # The cell state starts at zero; zeros_like keeps it on the same device
        # and dtype as the hidden state no matter where the module was moved.
        c0 = torch.zeros_like(h0)

        _, (hidden, _) = self.rnn(seq, (h0, c0))  # [2 * n_layers, batch_size, hidden_dim]

        hidden = hidden.permute(1, 0, 2).reshape(batch_size, -1)  # [batch_size, 2 * n_layers * hidden_dim]

        output = self.fc1(hidden)  # [batch_size, hidden_dim]
        output = self.fc2(output)  # [batch_size, 1]

        return output

In [None]:
# Positional arguments: hidden_dim=128, seq_meta_len (length of the metadata
# tensor of the first training sample), n_layers=2, dropout=0.1, device.
model = HypertensionDetectorBiLSTM(
    128, dataset.train_data[0][0][1].shape[0], 2, 0.1, device
).to(device)

In [None]:
def count_parameters(model):
    """Return the total element count of the model's trainable parameters."""
    total = 0
    for param in model.parameters():
        # Parameters that do not require gradients are not counted.
        if param.requires_grad:
            total += param.numel()
    return total

# Report the trainable-parameter count, formatted with thousands separators.
print(f'The model has {count_parameters(model):,} trainable parameters')

The model has 595,585 trainable parameters


## Train

Create optimizer and criterion

In [None]:
# Adam over all model parameters with a learning rate of 1e-4.
optimizer = optim.Adam(model.parameters(), lr=0.0001)
# Binary cross-entropy that takes raw logits (the model has no final sigmoid).
criterion = nn.BCEWithLogitsLoss().to(device)

Define training and validation functions

In [None]:
def train(model, loader, optimizer, criterion, device):
    """Run one training epoch over ``loader`` and show progress.

    Args:
        model: Network called as ``model(source, source_meta)``; returns logits.
        loader: Iterable yielding ``((source, source_meta), target)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss applied to ``(output, target)``.
        device: Device the batch tensors are moved to.
    """
    model.train()
    pbar = ProgressBar(target=len(loader), width=8)
    correct = 0
    processed = 0

    for batch_idx, data in enumerate(loader, 0):
        (source, source_meta), target = data
        source = source.to(device)
        source_meta = source_meta.to(device)
        target = target.to(device)

        optimizer.zero_grad()
        output = model(source, source_meta)

        loss = criterion(output, target)
        loss.backward()

        optimizer.step()

        # The model emits logits, so the 0.5 decision threshold has to be
        # applied to the sigmoid probability rather than to the raw output.
        pred = (torch.sigmoid(output) > 0.5).float()
        correct += pred.eq(target.view_as(pred)).sum().item()
        processed += len(target)
        # Running accuracy (in percent) over the samples seen so far this epoch.
        accuracy = 100 * correct / processed

        pbar.update(batch_idx, values=[
            ('Loss', round(loss.item(), 2)), ('Accuracy', round(accuracy, 2))
        ])

    # Final progress-bar step, reporting the values of the last batch.
    pbar.add(1, values=[
        ('Loss', round(loss.item(), 2)), ('Accuracy', round(accuracy, 2))
    ])

In [None]:
def val(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` and return its accuracy in percent.

    Args:
        model: Network called as ``model(source, source_meta)``; returns logits.
        loader: Iterable yielding ``((source, source_meta), target)`` batches.
        criterion: Loss applied to ``(output, target)``.
        device: Device the batch tensors are moved to.

    Returns:
        Percentage (0-100) of correctly classified samples; 0.0 when the
        loader yields no samples.
    """
    model.eval()
    correct = 0
    samples = 0
    batches = 0
    loss = 0

    with torch.no_grad():
        for (source, source_meta), target in loader:
            source = source.to(device)
            source_meta = source_meta.to(device)
            target = target.to(device)

            output = model(source, source_meta)

            cost = criterion(output, target)
            loss += cost.item()
            batches += 1

            # The model emits logits, so the 0.5 decision threshold has to be
            # applied to the sigmoid probability rather than to the raw output.
            pred = (torch.sigmoid(output) > 0.5).float()
            correct += pred.eq(target.view_as(pred)).sum().item()
            samples += len(target)

    # Loss is averaged per batch, accuracy per sample; an empty loader reports 0.
    loss = loss / batches if batches else 0.0
    accuracy = 100 * correct / samples if samples else 0.0
    print(
        f'Validation set: Average loss: {loss:.4f}, Accuracy: {accuracy:.2f}%\n'
    )

    return accuracy

In [None]:
# Train for a fixed number of epochs and checkpoint the best model.
epochs = 100
best_val_accuracy = 0

for epoch in range(1, epochs + 1):
    print(f'Epoch {epoch}:')
    train(model, train_loader, optimizer, criterion, device)
    accuracy = val(model, val_loader, criterion, device)

    # Save the weights only when the validation accuracy strictly improves,
    # so the file on disk always holds the best model seen so far.
    if accuracy > best_val_accuracy:
        print(f'Validation accuracy improved from {best_val_accuracy:.2f}% to {accuracy:.2f}%\n')
        best_val_accuracy = accuracy
        torch.save(model.state_dict(), 'hypertension_detector.pt')

Epoch 1:
Validation set: Average loss: 0.6918, Accuracy: 52.00%

Validation accuracy improved from 0.00% to 52.00%

Epoch 2:
Validation set: Average loss: 0.6879, Accuracy: 52.00%

Epoch 3:
Validation set: Average loss: 0.6849, Accuracy: 52.00%

Epoch 4:
Validation set: Average loss: 0.6733, Accuracy: 52.00%

Epoch 5:
Validation set: Average loss: 0.6497, Accuracy: 67.25%

Validation accuracy improved from 52.00% to 67.25%

Epoch 6:
Validation set: Average loss: 0.6090, Accuracy: 77.75%

Validation accuracy improved from 67.25% to 77.75%

Epoch 7:
Validation set: Average loss: 0.5558, Accuracy: 80.75%

Validation accuracy improved from 77.75% to 80.75%

Epoch 8:
Validation set: Average loss: 0.5158, Accuracy: 86.00%

Validation accuracy improved from 80.75% to 86.00%

Epoch 9:
Validation set: Average loss: 0.4812, Accuracy: 79.50%

Epoch 10:
Validation set: Average loss: 0.4961, Accuracy: 82.50%

Epoch 11:
Validation set: Average loss: 0.4729, Accuracy: 82.25%

Epoch 12:
Validation set

## Test

In [None]:
# Restore the checkpoint written by the training loop (best validation accuracy).
model.load_state_dict(torch.load('hypertension_detector.pt'))

<All keys matched successfully>

In [None]:
# Reuse val() on the test loader; its printed line is still labelled "Validation set".
_ = val(model, test_loader, criterion, device)

Validation set: Average loss: 0.4052, Accuracy: 85.75%

