In [70]:
import sys
import h5py
import torch
from torch import nn, optim

# Basic pytorch setup

In [71]:
# Connect torch to GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(device)

cuda


In [None]:
# Load test set
f = h5py.File('data/sdr_wifi_test.hdf5', 'r')
X_test = f['X'][()]
y_test = f['y'][()]
f.close()

# Load train set
f = h5py.File('data.sdr_wifi_train.hdf5', 'r')
X_train = f['X'][()]
y_train = f['y'][()]
f.close()

# Description of input variables

- `input_size` is the amount of frequencies that have been measured
- `hidden_size` is the amount of neurons in the hidden layer
- `num_layers` is the amount of hidden layers that perform changes to get the correct prediction (chosen based on https://stats.stackexchange.com/questions/181/how-to-choose-the-number-of-hidden-layers-and-nodes-in-a-feedforward-neural-netw)
- `output_size` is the amount of frequencies that are predicted, these are equal to the input since we want to see from the full prediction which frequency most likely has the lowest interference
- `seq_length` is the amount of history that gets taken into account to make the new prediction
- `num_epochs` is the amount of training rounds
- `learning_rate` is the rate at which the weights of the hidden layers are updated to improve prediction results (cannot be too high because it might overshoot)


In [73]:
data = torch.FloatTensor(X_train).to(device)
labels = torch.FloatTensor(y_train).to(device)

print(data.shape)
print(labels.shape)

# Neural network input
input_size = data.shape[1]
hidden_size = 64
num_layers = 1
output_size = labels.shape[1]

learning_rate = 1e-3
num_epochs = 5
batch_size = 64

torch.Size([1439971, 4, 29])
torch.Size([1439971, 4])


In [74]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        out, _ = self.lstm(x)
        out = self.fc(out[:, -1, :])  # Fully connected on the last output
        out = self.sigmoid(out)  # Apply sigmoid activation
        return out

# Initialize the model, loss function, and optimizer
model = LSTMModel(input_size, hidden_size, output_size, num_layers).to(device)
criterion = nn.MSELoss()  # Binary Cross Entropy Loss for outputs between 0 and 1
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [75]:
# Training loop
for epoch in range(num_epochs):
    model.train()

    for i in range(0, len(data), batch_size):
        # Get batch
        x_batch = data[i:i+batch_size]
        y_batch = labels[i:i+batch_size]

        x_batch = x_batch.permute(0, 2, 1)  # Reshape to (batch_size, seq_len, input_size)

        # Forward pass
        outputs = model(x_batch)
        loss = criterion(outputs, y_batch)
        
        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        sys.stdout.write(f"\rEpoch [{epoch+1}/{num_epochs}], ({i} / {len(data)}) Loss: {loss.item():.4f}")

Epoch [5/5], (1439936 / 1439971) Loss: 0.0041

In [76]:
model.eval()
with torch.no_grad():
    total_loss = 0
    correct_predictions = 0
    total_samples = 0

    for i in range(0, len(X_test), batch_size):
        X_batch = torch.FloatTensor(X_test[i:i+batch_size]).to(device)
        y_batch = torch.FloatTensor(y_test[i:i+batch_size]).to(device)

        X_batch = X_batch.permute(0, 2, 1)  # Reshape to (batch_size, seq_len, input_size)

        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)

        total_loss += loss.item()

        # Convert probabilities to binary predictions
        predicted = (outputs > 0.5).float()

        # Calculate the number of correct predictions
        correct_predictions += (predicted == y_batch).sum().item()
        total_samples += y_batch.numel()

        sys.stdout.write(f"\rTest Loss: {loss.item():.4f}")

    accuracy = correct_predictions / total_samples
    print(f"\nTotal Test Loss: {total_loss}")
    print(f"Test Accuracy: {accuracy:.4f}")

print("\nDone!")

Test Loss: 0.0084
Total Test Loss: 13.430269373166084
Test Accuracy: 0.9931

Done!


In [83]:
# Make a prediction based on the last sample
model.eval()

with torch.no_grad():
    X_sample = torch.FloatTensor(X_test[-1]).to(device)
    X_sample = X_sample.view(1, X_sample.shape[1], X_sample.shape[0])
    output = model(X_sample)

    predicted = (output > 0.5).float()

    print(f"Predicted: {predicted}")
    print(f"Actual: {y_test[-1]}")
    print(f"Output: {output}")

Predicted: tensor([[1., 0., 0., 1.]], device='cuda:0')
Actual: [0 0 1 1]
Output: tensor([[0.9479, 0.1858, 0.3457, 0.9773]], device='cuda:0')
