In [2]:
import time
import pickle
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import torch.nn.functional as F

# Load RadioML dataset
def load_radioml_data(filepath='RML2016.10a_dict.pkl'):
    with open(filepath, 'rb') as f:
        data_dict = pickle.load(f, encoding='latin1')

    data = []
    labels = []
    for key, value in data_dict.items():
        mod_type, snr = key
        data.append(value)
        labels.extend([mod_type] * value.shape[0])

    data = np.vstack(data)
    label_set = sorted(list(set(labels)))
    label_to_int = {label: i for i, label in enumerate(label_set)}
    labels = np.array([label_to_int[label] for label in labels])

    return data, labels, label_to_int

# Dataset class for PyTorch
class RadioMLDataset(Dataset):
    def __init__(self, data, labels):
        data = data[:, np.newaxis, :, :]  # Add channel dimension (1, 2, 1024)
        self.data = torch.tensor(data, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

# Depthwise Separable Convolution Layer
class DepthwiseSeparableConv(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3, padding=1):
        super(DepthwiseSeparableConv, self).__init__()
        self.depthwise = nn.Conv2d(in_channels, in_channels, kernel_size=kernel_size, padding=padding, groups=in_channels)
        self.pointwise = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        x = self.depthwise(x)
        x = self.pointwise(x)
        return x

# Original Paper Model
class PaperAMCModel(nn.Module):
    def __init__(self):
        super(PaperAMCModel, self).__init__()

        # Initial Convolution Layer with stride 2 to replace pooling
        self.conv1 = nn.Conv2d(1, 16, kernel_size=(3, 3), stride=2, padding=1)
        self.bn1 = nn.BatchNorm2d(16)

        # Second Convolution Layer with stride 2
        self.conv2 = nn.Conv2d(16, 32, kernel_size=(3, 3), stride=2, padding=1)
        self.bn2 = nn.BatchNorm2d(32)

        # Block 1
        self.block1 = nn.Sequential(
            nn.Conv2d(32, 32, kernel_size=(1, 1), padding=0),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 32, kernel_size=(3, 1), padding=(1, 0)),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 32, kernel_size=(1, 3), padding=(0, 1)),
            nn.BatchNorm2d(32),
            nn.ReLU()
        )

        # Block 2
        self.block2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=(1, 1), padding=0),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=(3, 1), padding=(1, 0)),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=(1, 3), padding=(0, 1)),
            nn.BatchNorm2d(64),
            nn.ReLU()
        )

        # Block 3
        self.block3 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=(1, 1), padding=0),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Conv2d(128, 128, kernel_size=(3, 1), padding=(1, 0)),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Conv2d(128, 128, kernel_size=(1, 3), padding=(0, 1)),
            nn.BatchNorm2d(128),
            nn.ReLU()
        )

        # Global Pooling and Fully Connected Layer
        self.global_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(128, 24)  # Assuming 24 classes in the dataset

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.global_pool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

# Optimized Lightweight Model with Depthwise Separable Convolutions
class OptimizedAMCModel(nn.Module):
    def __init__(self, num_classes=24):
        super(OptimizedAMCModel, self).__init__()

        # Initial Depthwise Separable Convolution Layer
        self.conv1 = DepthwiseSeparableConv(1, 16, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(16)

        # Second Depthwise Separable Convolution Layer
        self.conv2 = DepthwiseSeparableConv(16, 32, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(32)

        # Block 1
        self.block1 = nn.Sequential(
            DepthwiseSeparableConv(32, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            DepthwiseSeparableConv(32, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU()
        )

        # Block 2
        self.block2 = nn.Sequential(
            DepthwiseSeparableConv(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            DepthwiseSeparableConv(64, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU()
        )

        # Block 3
        self.block3 = nn.Sequential(
            DepthwiseSeparableConv(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            DepthwiseSeparableConv(128, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU()
        )

        # Global Pooling and Fully Connected Layer
        self.global_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(128, num_classes)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.global_pool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

# Weight Initialization
def init_weights(m):
    if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
        nn.init.xavier_normal_(m.weight)
        if m.bias is not None:
            nn.init.zeros_(m.bias)

# Training function
def train_model(model, train_loader, criterion, optimizer, scheduler, test_loader, epochs=45):
    model.train()
    for epoch in range(epochs):
        running_loss = 0.0
        correct = 0
        total = 0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        train_accuracy = 100 * correct / total
        print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss/len(train_loader):.4f}, Training Accuracy: {train_accuracy:.2f}%")
        scheduler.step()
        val_accuracy = evaluate_model(model, test_loader)
        print(f"Validation Accuracy after Epoch {epoch+1}: {val_accuracy:.2f}%")

# Evaluation function
def evaluate_model(model, test_loader):
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    return accuracy_score(all_labels, all_preds) * 100

# Function to measure inference time
def measure_inference_time(model, test_loader):
    model.eval()
    start_time = time.time()
    with torch.no_grad():
        for inputs, _ in test_loader:
            _ = model(inputs)
    end_time = time.time()
    return (end_time - start_time) / len(test_loader.dataset)

# Main function to compare models
def main():
    # Load data
    data, labels, label_to_int = load_radioml_data('RML2016.10a_dict.pkl')
    X_train, X_test, y_train, y_test = train_test_split(data, labels, test_size=0.2, random_state=42)

    # DataLoaders
    train_dataset = RadioMLDataset(X_train, y_train)
    test_dataset = RadioMLDataset(X_test, y_test)
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

    models = {
        "Updated Paper Model": PaperAMCModel(),
        "Updated FPGA-Optimized Model": OptimizedAMCModel(num_classes=len(label_to_int))
    }

    results = {}

    # Training and Evaluation
    for model_name, model in models.items():
        model.apply(init_weights)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=1e-5)
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.01)
        print(f"\nTraining {model_name}...")
        train_model(model, train_loader, criterion, optimizer, scheduler, test_loader, epochs=45)

        # Evaluate accuracy
        accuracy = evaluate_model(model, test_loader)

        # Measure inference time
        inference_time = measure_inference_time(model, test_loader)

        # Count parameters
        param_count = sum(p.numel() for p in model.parameters() if p.requires_grad)

        # Store results
        results[model_name] = {
            "Accuracy": accuracy,
            "Inference Time (s/sample)": inference_time,
            "Parameter Count": param_count
        }

    # Display Comparison Table
    print("\nComparison Results:")
    print(f"{'Feature':<30} {'Updated Paper Model':<25} {'Updated FPGA-Optimized Model':<25}")
    print("="*80)
    print(f"{'Convolution Type':<30} {'Standard 3x3 & Asymmetric':<25} {'Depthwise Separable':<25}")
    print(f"{'Number of Layers':<30} {'10 layers':<25} {'Fewer layers':<25}")
    print(f"{'Parameter Reduction Technique':<30} {'Bottleneck & Asymmetric':<25} {'Depthwise Separable':<25}")
    print(f"{'Skip Connections':<30} {'Used':<25} {'Not used':<25}")
    print(f"{'Pooling Strategy':<30} {'Multiple Pooling Layers':<25} {'Single Global Pooling':<25}")
    print(f"{'Fully Connected Layers':<30} {'One Fully Connected':<25} {'Minimized with Global Pooling':<25}")
    print(f"{'Parameter Count':<30} {results['Updated Paper Model']['Parameter Count']:<25} {results['Updated FPGA-Optimized Model']['Parameter Count']:<25}")
    print(f"{'Inference Time (s/sample)':<30} {results['Updated Paper Model']['Inference Time (s/sample)']:<25.6f} {results['Updated FPGA-Optimized Model']['Inference Time (s/sample)']:<25.6f}")
    print(f"{'Accuracy':<30} {results['Updated Paper Model']['Accuracy']:<25.4f} {results['Updated FPGA-Optimized Model']['Accuracy']:<25.4f}")
    print(f"{'Suitability for FPGA':<30} {'Less suitable':<25} {'Highly suitable':<25}")

if __name__ == "__main__":
    main()



Training Updated Paper Model...
Epoch 1/45, Loss: 1.8894, Training Accuracy: 37.08%
Validation Accuracy after Epoch 1: 46.76%
Epoch 2/45, Loss: 2.1687, Training Accuracy: 22.92%
Validation Accuracy after Epoch 2: 15.60%
Epoch 3/45, Loss: 1.9071, Training Accuracy: 30.77%
Validation Accuracy after Epoch 3: 36.70%
Epoch 4/45, Loss: 1.8357, Training Accuracy: 33.54%
Validation Accuracy after Epoch 4: 32.23%
Epoch 5/45, Loss: 1.7538, Training Accuracy: 35.89%
Validation Accuracy after Epoch 5: 16.17%
Epoch 6/45, Loss: 1.7358, Training Accuracy: 36.66%
Validation Accuracy after Epoch 6: 37.77%
Epoch 7/45, Loss: 1.7030, Training Accuracy: 37.84%
Validation Accuracy after Epoch 7: 37.48%
Epoch 8/45, Loss: 1.6764, Training Accuracy: 38.57%
Validation Accuracy after Epoch 8: 41.35%
Epoch 9/45, Loss: 1.6589, Training Accuracy: 39.17%
Validation Accuracy after Epoch 9: 41.76%
Epoch 10/45, Loss: 1.6503, Training Accuracy: 39.63%
Validation Accuracy after Epoch 10: 39.41%
Epoch 11/45, Loss: 1.6451