In [116]:
import os
import torch

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
import tqdm
from tqdm import trange

In [82]:
class CNN1D(nn.Module):
    def __init__(self, number_of_categories):
        super(CNN1D, self).__init__()
        # convolution layer
        # in_channels must be the same as the number of subcarriers, out_channels can be any value
        self.conv1d = nn.Conv1d(in_channels=64, out_channels=40, kernel_size=3, stride=1)
        self.maxpool = nn.MaxPool1d(kernel_size=2)
        self.relu = nn.ReLU()

        # calculate the in_features
        self.fc1 = nn.Linear(in_features=40 * 19, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=number_of_categories)

    def forward(self, x):
        # input tensor size [batch_size, 64 (features), 40 (sequence length)]
        x = self.conv1d(x)
        # [batch_size, 64 (features), 40 (sequence length)]
        x = self.relu(x)
        x = self.maxpool(x)
        x = x.view(-1, x.shape[1] * x.shape[2])  # Adjust the input size based on the output size after convolutions and pooling
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        
        return x

In [83]:
class CSIDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.int)
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

In [112]:
def importData():
    X_train, X_val, X_test, y_train, y_val, y_test = [], [], [], [], [], []
    for i in range(10):
        temp_train = np.load(f'../data/processed_data/{i}_table_train.npy')
        temp_val = np.load(f'../data/processed_data/{i}_table_val.npy')
        temp_test = np.load(f'../data/processed_data/{i}_table_test.npy')
        if i == 0:
            X_train = temp_train
            X_val = temp_val
            X_test = temp_test
            y_train = np.array([[i] for x in range(X_train.shape[0])])
            y_val = np.array([[i] for x in range(X_val.shape[0])])
            y_test = np.array([[i] for x in range(X_test.shape[0])])
        else:
            X_train = np.append(X_train, temp_train, axis=0)
            X_val = np.append(X_val, temp_val, axis=0)
            X_test = np.append(X_test, temp_test, axis=0)
            y_train = np.append(y_train, np.array([[i] for _ in range(temp_train.shape[0])]), axis=0)
            y_val = np.append(y_val, np.array([[i] for _ in range(temp_val.shape[0])]), axis=0)
            y_test = np.append(y_test, np.array([[i] for _ in range(temp_test.shape[0])]), axis=0)
            
    return X_train, X_val, X_test, y_train, y_val, y_test

In [142]:
class RunningAverage:
    def __init__(self):
        self.steps = 0
        self.total = 0

    def update(self, val):
        self.total += val
        self.steps += 1

    def __call__(self):
        return self.total / float(self.steps)

def accuracy(outputs, labels):
    outputs = np.argmax(outputs.cpu().detach().numpy(), axis=1)
    labels = labels.squeeze()
    # compare outputs with labels
    return np.sum([1 if first == second else 0 for first, second in zip(labels, outputs)]) / float(len(labels))

In [140]:
def train(model, optimizer, trainLoader, loss_fn, epoch, iterations):
    model.train()
    train_loss = RunningAverage()
    train_acc = RunningAverage()
    
    with trange(iterations) as pbar:
        for X_batch, y_batch in trainLoader:
            y_batch = y_batch.type(torch.LongTensor).squeeze(1)
            logits = model(X_batch)
            y_pred = F.log_softmax(logits, dim=1) # use this for accuracy 
            loss = loss_fn(logits, y_batch)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            train_loss.update(loss)
            train_acc.update(accuracy(y_pred, y_batch))
            pbar.update(1)
        
    print(f"Train: Epoch {epoch}: Loss {train_loss()}, Accuracy {train_acc()}")
    
def eval(model, valLoader, loss_fn, epoch):
    model.eval()
    eval_loss = RunningAverage()
    eval_acc = RunningAverage()
    
    with torch.no_grad():
        for X_batch, y_batch in valLoader:
            y_batch = y_batch.type(torch.LongTensor).squeeze(1)
            logits = model(X_batch)
            y_pred = F.log_softmax(logits, dim=1) # use this for accuracy 
            loss = loss_fn(logits, y_batch)
            
            eval_loss.update(loss)
            eval_acc.update(accuracy(y_pred, y_batch))
            
    print(f"Eval: Epoch {epoch}: Loss {eval_loss()}, Accuracy {eval_acc()}")
    
def test(model, testLoader):
    model.eval()
    for X_batch, y_batch in testLoader:
        print(X_batch.shape)
        y_batch = y_batch.type(torch.LongTensor).squeeze(1)
        logits = model(X_batch)
        y_pred = F.log_softmax(logits, dim=1) # use this for accuracy 
        
    print(f"Final Accuracy: {accuracy(y_pred, y_batch)}")

    # True positive percentage
    percentages = {0: 0, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0, 9: 0}
    outputs = np.argmax(y_pred.cpu().detach().numpy(), axis=1)
    labels = y_batch.squeeze()
    
    for i in range(len(outputs)):
        if outputs[i] == labels[i]:
            percentages[outputs[i]] += 1
            
    print(percentages)

In [128]:
def train_and_eval(model, epochs, optimizer, trainLoader, valLoader, loss_fn, iterations):
        
    for epoch in range(epochs):
        train(model, optimizer, trainLoader, loss_fn, epoch, iterations)
        eval(model, valLoader, loss_fn, epoch)
        

In [141]:
def main():
    # 
    batch_size = 64
    X_train, X_val, X_test, y_train, y_val, y_test = importData()
    trainDataset = CSIDataset(X_train, y_train)
    valDataset = CSIDataset(X_val, y_val)
    
    trainLoader = DataLoader(trainDataset, shuffle=True, batch_size=batch_size)
    valLoader = DataLoader(valDataset, shuffle=True, batch_size=batch_size)

    number_of_categories = 10
    model = CNN1D(number_of_categories)
    optimizer = optim.Adam(model.parameters(), lr=0.0001)
    loss = nn.CrossEntropyLoss()

    if (os.path.isfile("model.pth")):
        model.load_state_dict(torch.load('model.pth'))
    else:
        train_and_eval(model, 100, optimizer, trainLoader, valLoader, loss, (X_train.shape[0]//batch_size) + 1)
        
    # Simple check with test dataset
    model.eval()
    testDataset = CSIDataset(X_test, y_test)
    testLoader = DataLoader(testDataset, shuffle=True, batch_size=X_test.shape[0])
    test(model, testLoader)
    
    # save the model
    torch.save(model.state_dict(), 'model.pth')
    
main()

torch.Size([300, 64, 40])
[5 4 2 6 2 9 6 1 0 8 9 6 6 2 7 1 0 7 5 5 9 5 1 0 9 2 7 5 1 0 3 6 1 5 6 4 0
 4 8 4 3 0 1 2 4 4 5 8 7 9 5 1 5 3 6 2 1 7 2 0 8 4 2 8 3 4 7 7 4 7 2 1 5 5
 3 9 5 5 3 3 7 6 1 3 3 6 9 0 4 8 8 3 3 9 2 4 5 3 9 2 2 3 3 4 0 5 5 0 0 8 1
 3 6 9 9 7 7 8 0 7 9 7 3 2 6 7 1 3 2 3 1 4 8 0 8 2 3 9 1 7 6 6 0 3 1 5 1 3
 2 3 5 0 3 1 0 9 6 9 1 5 9 5 2 4 1 4 8 3 1 8 6 0 2 9 6 1 8 9 1 2 8 4 6 4 4
 3 1 6 5 0 7 6 8 3 4 2 3 2 3 9 2 1 5 6 8 7 2 1 3 7 1 8 8 5 0 9 7 7 7 3 1 6
 0 9 4 9 8 0 7 0 6 3 5 4 3 3 4 4 7 5 0 3 1 5 9 9 4 8 5 0 2 6 3 5 9 3 2 9 8
 4 7 8 9 1 2 1 8 1 7 2 7 9 6 4 3 0 6 4 6 4 3 5 9 8 6 0 6 4 1 0 6 7 6 3 9 5
 0 7 0 5] tensor([5, 4, 3, 6, 2, 9, 6, 4, 0, 8, 9, 6, 6, 2, 7, 1, 0, 7, 3, 5, 9, 5, 1, 0,
        9, 2, 7, 5, 1, 0, 3, 0, 1, 5, 6, 4, 0, 4, 8, 4, 2, 0, 1, 5, 4, 4, 8, 8,
        7, 9, 5, 1, 5, 3, 0, 2, 1, 7, 2, 0, 8, 4, 2, 8, 3, 4, 7, 7, 4, 7, 3, 1,
        5, 5, 3, 9, 5, 5, 3, 7, 7, 6, 1, 3, 3, 6, 9, 0, 4, 8, 8, 3, 3, 9, 2, 4,
        5, 8, 9, 6, 2, 8, 7, 4, 0, 2, 5, 9, 