In [1]:
import torch
import torch.nn as nn
import numpy as np
from sklearn.model_selection import train_test_split
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import pickle

In [21]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


Basic Block: Conv → BN → ReLU → Conv → BN → Add skip → ReLU

In [16]:
class BasicBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3, stride=2, downsample=None):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride, padding=1)
        self.batchnorm1 = nn.BatchNorm1d(out_channels)
        self.relu1 = nn.ReLU()
        self.conv2 = nn.Conv1d(in_channels=out_channels, out_channels=out_channels, kernel_size=kernel_size, padding=1)
        self.batchnorm2 = nn.BatchNorm1d(out_channels)
        # add skip
        self.downsample = downsample
        self.relu2 = nn.ReLU(inplace=True)


    def forward(self, x):
        ip = x
        x = self.conv1(x)
        x = self.batchnorm1(x)
        x = self.relu1(x)
        x = self.conv2(x)
        x = self.batchnorm2(x)
        
        if self.downsample is not None:
            ip = self.downsample(ip)
        x = x + ip # add the skipped input
        
        x = self.relu2(x)

        return x   
    

In [3]:
with open(r"C:\Users\tom_r\Desktop\Machine-Learning\RF_spectrum_analysis\subset.pkl", 'rb') as file:
    dataset = pickle.load(file)

In [4]:
for k, v in dataset.items():
    print(f"{k}: shape {v.shape}, dtype {v.dtype}")

X: shape (532480, 1024, 2), dtype float16
y: shape (532480,), dtype int64
snr: shape (532480,), dtype int64


In [5]:
unique_labels = np.unique(dataset['y'])
label_map = {old: new for new, old in enumerate(unique_labels)}

# Remap y labels
mapped_y = np.vectorize(label_map.get)(dataset['y'])

In [6]:
X_tensor = torch.tensor(dataset['X'], dtype=torch.float32)  
X_tensor = X_tensor.permute(0, 2, 1)  # (N, 2, 1024)

In [17]:
downsample = nn.Sequential(
    nn.Conv1d(in_channels=2, out_channels=64, kernel_size=1, stride=2),
    nn.BatchNorm1d(64)
)

basic = BasicBlock(in_channels=2, out_channels=64, downsample=downsample)


output = basic.forward(X_tensor[0].unsqueeze(0)) 
output.shape

torch.Size([1, 64, 512])

In [32]:
class ResNet(nn.Module):
    def __init__(self):
        super().__init__()

        self.layer1 = BasicBlock(2, 64, downsample=nn.Sequential(
            nn.Conv1d(2, 64, kernel_size=1, stride=2),
            nn.BatchNorm1d(64)
        ))

        self.layer2 = BasicBlock(64, 128, downsample=nn.Sequential(
            nn.Conv1d(64, 128, kernel_size=1, stride=2),
            nn.BatchNorm1d(128)
        ))

        self.layer3 = BasicBlock(128, 256, downsample=nn.Sequential(
            nn.Conv1d(128, 256, kernel_size=1, stride=2),
            nn.BatchNorm1d(256)
        ))

        self.layer4 = BasicBlock(256, 512, downsample=nn.Sequential(
            nn.Conv1d(256, 512, kernel_size=1, stride=2),
            nn.BatchNorm1d(512)
        ))

        #classifier block
        self.adppool = nn.AdaptiveAvgPool2d(1)
        self.classifier = nn.Linear(512, 5)
        

    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        return x


In [28]:
class RFDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32).permute(0, 2, 1)  # (N, 2, 1024)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

rf_dataset = RFDataset(dataset['X'], mapped_y)
rf_loader = DataLoader(rf_dataset, batch_size=254, shuffle=True)

# calculate steps per epoch for training
trainSteps = len(rf_loader.dataset) // 64
trainSteps

8320

In [29]:
model = ResNet().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

num_epochs = 5 

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    correct = 0
    total = 0

    for batch_idx, (batch_X, batch_y) in enumerate(rf_loader):
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)

        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        correct += (predicted == batch_y).sum().item()
        total += batch_y.size(0)

        # Print progress every 100 batches
        if (batch_idx + 1) % 100 == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}], Step [{batch_idx+1}/{len(rf_loader)}], Loss: {loss.item():.4f}")

    avg_loss = total_loss / len(rf_loader)
    accuracy = 100 * correct / total
    print(f"Epoch {epoch+1} finished → Avg Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%")


RuntimeError: Expected target size [254, 64], got [254]