In [1]:
import torch
from torch import nn
import torch.nn.functional as F
from pandas import read_csv
from sklearn.metrics import accuracy_score
from numpy import vstack
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import random_split
from torch.nn import BCELoss
from torch.optim import SGD

In [8]:
class CSVDataset(Dataset):
    def __init__(self, path):
        df = read_csv(path, header=None)

        self.X = df.values[:, :-1]
        self.y = df.values[:, -1]
        self.X = self.X.astype('float32')
        self.y = LabelEncoder().fit_transform(self.y)
        self.y = self.y.astype('float32')
        self.y = self.y.reshape((len(self.y), 1))

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return [self.X[idx], self.y[idx]]

    def get_splits(self, n_test=0.33):
        test_size = round(n_test * len(self.X))
        train_size = len(self.X) - test_size
        return random_split(self, [train_size, test_size])

def prepare_data(path):
    dataset = CSVDataset(path)
    train, test = dataset.get_splits()
    train_dl = DataLoader(train, batch_size=32, shuffle=True)
    test_dl = DataLoader(test, batch_size=1024, shuffle=False)
    return train_dl, test_dl

In [9]:
class CustomLayer(nn.Module):
    def __init__(self, in_units: int):
        super().__init__()
        self.W = torch.randn(in_units, in_units, requires_grad=True)
        self.b = torch.ones(in_units, requires_grad=True)
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        y1 = torch.matmul(x, self.W.T) + self.b
        return y1*y1 - x

In [10]:
def train_model(train_dl: DataLoader, model: nn.Module):
    criterion = BCELoss()
    optimizer = SGD(model.parameters(), lr=0.01, momentum=0.9)
    for epoch in range(100):
        for i, (inputs, targets) in enumerate(train_dl):
            optimizer.zero_grad()
            yhat = model(inputs)
            loss = criterion(yhat, targets)
            loss.backward()
            optimizer.step()

def evaluate_model(test_dl: DataLoader, model: nn.Module):
    predictions, actuals = list(), list()
    for i, (inputs, targets) in enumerate(test_dl):
        yhat = model(inputs)
        yhat = yhat.detach().numpy()
        actual = targets.numpy()
        actual = actual.reshape((len(actual), 1))
        yhat = yhat.round()
        predictions.append(yhat)
        actuals.append(actual)
    predictions, actuals = vstack(predictions), vstack(actuals)
    acc = accuracy_score(actuals, predictions)
    return acc

In [12]:
path = 'https://raw.githubusercontent.com/jbrownlee/Datasets/master/ionosphere.csv'
train_dl, test_dl = prepare_data(path)
print(len(train_dl.dataset), len(test_dl.dataset))

model = nn.Sequential(
    CustomLayer(34),
    nn.ReLU(),
    nn.Linear(34, 12),
    nn.ReLU(),
    nn.Linear(12, 1),
    nn.Sigmoid()
)
train_model(train_dl, model)
acc = evaluate_model(test_dl, model)
print('Accuracy: %.3f' % acc)

235 116
Accuracy: 0.922
