# The Server Runs the Testing Process on the Plain Testing Dataset

In [5]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import h5py
import tenseal as ts
import pandas as pd
import matplotlib.pyplot as plt
# plt.style.use('dark_background')
from pathlib import Path

# The project root is taken to be the parent of the current working directory;
# the dataset files are looked up relative to it.
project_path = Path.cwd().parent
print(f'project_path: {project_path}')
print(f'torch version: {torch.__version__}')
print(f'tenseal version: {ts.__version__}')

# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
    print(f'device: {torch.cuda.get_device_name(0)}')
else:
    device = torch.device('cpu')
    print('device: cpu')

project_path: /home/dk/Desktop/split-learning-1D-HE
torch version: 1.8.1+cu102
tenseal version: 0.3.10
device: NVIDIA GeForce GTX 1070 Ti


## Dataset

In [6]:
class PTBXL(Dataset):
    """
    One split of the PTB-XL data, read from an HDF5 file under
    ``project_path/data`` and kept on the instance as ``x`` and ``y``.

    Args:
        train (bool): when True, read ``X_train``/``y_train`` from
            ``train_ptbxl.hdf5``; otherwise read ``X_test``/``y_test``
            from ``test_ptbxl.hdf5``.
    """
    def __init__(self, train=True):
        if train:
            filename, x_key, y_key = 'data/train_ptbxl.hdf5', 'X_train', 'y_train'
        else:
            filename, x_key, y_key = 'data/test_ptbxl.hdf5', 'X_test', 'y_test'

        # Slice with [:] inside the ``with`` block so the values are read
        # before the file is closed.
        with h5py.File(project_path/filename, 'r') as hdf:
            self.x = hdf[x_key][:]
            self.y = hdf[y_key][:]

    def __len__(self):
        """Number of samples in the loaded split."""
        return len(self.x)

    def __getitem__(self, idx):
        """Return ``(sample, label)`` for ``idx``; the sample is cast to float32."""
        sample = torch.tensor(self.x[idx], dtype=torch.float)
        label = torch.tensor(self.y[idx])
        return sample, label

# Only the test split is needed here; batches are served in dataset order.
batch_size = 4
test_dataset = PTBXL(train=False)
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=batch_size,
    shuffle=False,
)

## Loading the trained model

In [7]:
# Load the trained parameters of the client part (convolutions) and the server
# part (linear layer) of the model.
# map_location='cpu' lets a checkpoint that was saved from a GPU be opened on a
# CPU-only machine; the assembled model is moved to `device` before testing.
# NOTE(review): torch.load unpickles the file, so only load checkpoints that
# come from a trusted source.
client = torch.load('./weights/trained_client_ptbxl_4096.pth', map_location='cpu')
server = torch.load('weights/trained_server_ptbxl_4096.pth', map_location='cpu')

class ECGModel(nn.Module):
    """
    1D CNN assembled from the separately stored client and server weights.

    Two Conv1d -> LeakyReLU -> MaxPool1d stages (parameters read from the
    module-level ``client`` mapping) feed one linear layer (parameters read
    from the module-level ``server`` mapping), followed by a softmax over
    the 5 outputs.
    """

    def __init__(self) -> None:
        super().__init__()
        # Trailing comments give channels x length after each layer.
        self.conv1 = nn.Conv1d(12, 16, kernel_size=7, stride=1, padding=3)  # 16 x 1000
        self.relu1 = nn.LeakyReLU()
        self.pool1 = nn.MaxPool1d(2)  # 16 x 500
        self.conv2 = nn.Conv1d(16, 8, kernel_size=5, padding=2)  # 8 x 500
        self.relu2 = nn.LeakyReLU()
        self.pool2 = nn.MaxPool1d(2)  # 8 x 250

        self.linear = nn.Linear(8 * 250, 5)
        self.softmax = nn.Softmax(dim=1)

        # Replace the random initialisation with the trained parameters.
        self.load_weights()

    def load_weights(self):
        """Point every layer's parameters at the tensors from ``client``/``server``."""
        first, second, head = self.conv1, self.conv2, self.linear
        first.weight.data, first.bias.data = client["conv1.weight"], client["conv1.bias"]
        second.weight.data, second.bias.data = client["conv2.weight"], client["conv2.bias"]
        head.weight.data, head.bias.data = server["W"], server["b"]

    def forward(self, x):
        """Return the softmax output (one row of 5 values per sample) for ``x``."""
        features = self.pool1(self.relu1(self.conv1(x)))
        features = self.pool2(self.relu2(self.conv2(features)))
        # Flatten each sample to the 8*250 features the linear layer expects.
        flat = features.view(-1, 8 * 250)
        return self.softmax(self.linear(flat))

# Build the network; its constructor calls load_weights(), which reads the
# `client` and `server` checkpoints loaded above.
model = ECGModel()

## The testing loop

In [8]:


def test(model):
    """
    Evaluate ``model`` on ``test_loader`` and print the mean loss and accuracy.

    The model is switched to evaluation mode for the run and its previous
    train/eval mode is restored afterwards. Each batch is moved to the
    module-level ``device``; the model is expected to be on that device
    already.

    Args:
        model (nn.Module): network mapping a batch of inputs to one score
            per class.

    Raises:
        ValueError: if ``test_loader`` yields no batches.
    """
    # NOTE(review): ECGModel in this file already ends with a Softmax, while
    # CrossEntropyLoss applies log-softmax itself, so the loss below is
    # computed on probabilities rather than logits. Left as is so the printed
    # value stays comparable with earlier runs -- confirm this is intended.
    criterion = nn.CrossEntropyLoss()

    was_training = model.training
    model.eval()
    test_loss = 0.0
    correct, total, num_batches = 0, 0, 0
    try:
        with torch.no_grad():
            for x, y in test_loader:
                x, y = x.to(device), y.to(device)
                y_hat = model(x)
                test_loss += criterion(y_hat, y).item()
                correct += torch.sum(y_hat.argmax(dim=1) == y).item()
                total += len(y)
                num_batches += 1
    finally:
        # Leave the model in whatever mode the caller had it in.
        model.train(was_training)

    if num_batches == 0:
        raise ValueError("test_loader yielded no batches; nothing to evaluate")

    print(f"test_loss: {(test_loss/num_batches):.4f}, "
          f"test_acc: {((correct/total)*100):.2f}")

# Move the model to the selected device, then run the evaluation.
test(model.to(device))

test_loss: 1.2598, test_acc: 64.22
