In [1]:
!pip install thop

Collecting thop
  Downloading thop-0.1.1.post2209072238-py3-none-any.whl.metadata (2.7 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch->thop)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch->thop)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch->thop)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch->thop)
  Downloading nvidia_curand_cu12-10.3.5.147-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cusolver-cu12==11.6.1.9 (from torch->thop)
  Downloading nvidia_cusolver_cu12-11.6.1.9-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cusparse-cu12==12.3.1.170 (from torch->thop)
  Downloading nvidia_cusparse_cu12-12.3.1.170-py3-none-manylinux

In [2]:
import time
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms, datasets
from torch.utils.data import DataLoader, random_split
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from torchsummary import summary
from thop import profile  # For FLOPs and MACs calculation
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve
from sklearn.preprocessing import label_binarize
import os
import copy
import json

# Helper function for formatting large numbers
def format_units(num):
    """Format large numbers with units (K, M, G, etc.)"""
    magnitude = 0
    while abs(num) >= 1000:
        magnitude += 1
        num /= 1000.0
    return f"{num:.2f} {['', 'K', 'M', 'G', 'T', 'P'][magnitude]}"

# Device Configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Parameters
batch_size = 16
img_size = 224  # Standard size for models
num_epochs = 50
learning_rate = 1e-4
split_ratio = [0.7, 0.15, 0.15]  # 70% training, 15% validation, 15% test

# Dataset Directory
dataset_dir = "/kaggle/input/drone-data/clean_spectrograms"

# Data Transformations
transform = transforms.Compose([
    transforms.Resize((img_size, img_size)),
    transforms.Grayscale(num_output_channels=3),  # Convert grayscale to RGB if needed
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Function to create data loaders with proper dataset splits and consistent indices
def create_data_loaders(full_dataset):
    # Get a generator with fixed seed for consistent splits
    generator = torch.Generator().manual_seed(42)
    
    # Split into Train, Validation, and Test
    train_size = int(split_ratio[0] * len(full_dataset))
    val_size = int(split_ratio[1] * len(full_dataset))
    test_size = len(full_dataset) - train_size - val_size
    
    train_dataset, val_dataset, test_dataset = random_split(
        full_dataset, 
        [train_size, val_size, test_size],
        generator=generator
    )
    
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    
    return train_loader, val_loader, test_loader, train_dataset, val_dataset, test_dataset

# Load Full Dataset
full_dataset = datasets.ImageFolder(root=dataset_dir, transform=transform)

# Create data loaders
train_loader, val_loader, test_loader, train_dataset, val_dataset, test_dataset = create_data_loaders(full_dataset)

# Print dataset information
print(f"Total number of samples: {len(full_dataset)}")
class_to_idx = full_dataset.class_to_idx
print("Class to index mapping:", class_to_idx)
for class_name, idx in class_to_idx.items():
    class_samples = len([x for x, y in full_dataset.samples if y == idx])
    print(f"Class {class_name}: {class_samples} samples")

# Number of Classes
num_classes = len(full_dataset.classes)
class_names = full_dataset.classes
print(f"Number of classes: {num_classes}")
print(f"Class Names: {class_names}")

# Function to extract SNR from filename
def extract_snr(filename):
    try:
        # Assuming filename format like "sample_0_snr_-14.png"
        parts = os.path.basename(filename).split('_')
        snr_idx = parts.index('snr') + 1
        return int(parts[snr_idx])
    except (ValueError, IndexError):
        return None

# Define a function to train and evaluate a model
def train_and_evaluate_model(model_name, model, train_loader, val_loader, test_loader, test_dataset, full_dataset):
    print(f"\n{'='*50}")
    print(f"Training and Evaluating {model_name}")
    print(f"{'='*50}")
    
    # Move model to device
    model = model.to(device)
    
    # Print model summary
    print(f"\n{model_name} Summary:")
    try:
        summary(model, (3, img_size, img_size))
    except Exception as e:
        print(f"Could not generate detailed summary due to: {str(e)}")
        print("Continuing with training...")
    
    # Loss and Optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)
    
    # Training Loop
    best_val_acc = 0.0
    history = {'train_loss': [], 'val_loss': [], 'val_acc': []}
    early_stop_counter = 0
    early_stop_patience = 5
    best_model_weights = None
    
    print(f"\nStarting training {model_name}...")
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        running_loss = 0.0
        
        for i, (images, labels) in enumerate(train_loader):
            images, labels = images.to(device), labels.to(device)
            
            # Zero the parameter gradients
            optimizer.zero_grad()
            
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)
            
            # Backward pass and optimize
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
            
            # Print statistics (optional)
            if (i + 1) % 10 == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}')
        
        avg_train_loss = running_loss / len(train_loader)
        history['train_loss'].append(avg_train_loss)
        
        # Validation phase
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                loss = criterion(outputs, labels)
                
                val_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        
        avg_val_loss = val_loss / len(val_loader)
        val_accuracy = correct / total
        history['val_loss'].append(avg_val_loss)
        history['val_acc'].append(val_accuracy)
        
        # Update learning rate scheduler
        scheduler.step(avg_val_loss)
        
        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')
        
        # Save the best model
        if val_accuracy > best_val_acc:
            best_val_acc = val_accuracy
            best_model_weights = copy.deepcopy(model.state_dict())
            torch.save(model.state_dict(), f"{model_name.lower().replace('-', '_')}_drone_rf.pth")
            print(f"Model saved with validation accuracy: {val_accuracy:.4f}")
            early_stop_counter = 0
        else:
            early_stop_counter += 1
        
        # Early stopping
        if early_stop_counter >= early_stop_patience:
            print(f"Early stopping triggered after {epoch+1} epochs")
            break
    
    # Plot training history
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(history['train_loss'], label='Train Loss')
    plt.plot(history['val_loss'], label='Validation Loss')
    plt.legend()
    plt.title(f'{model_name} - Loss Over Epochs')
    
    plt.subplot(1, 2, 2)
    plt.plot(history['val_acc'], label='Validation Accuracy')
    plt.legend()
    plt.title(f'{model_name} - Accuracy Over Epochs')
    plt.savefig(f"{model_name.lower().replace('-', '_')}_drone_rf_training_history.png")
    plt.close()
    
    # Load the best model for evaluation
    model.load_state_dict(best_model_weights)
    model.eval()
    
    # Testing the Model
    print(f"\nEvaluating {model_name} on test set...")
    y_true, y_pred, y_scores = [], [], []
    
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, preds = torch.max(outputs.data, 1)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(preds.cpu().numpy())
            y_scores.extend(torch.nn.functional.softmax(outputs, dim=1).cpu().numpy())
    
    # Classification Report
    cls_report = classification_report(y_true, y_pred, target_names=class_names, output_dict=True)
    print(f"\n{model_name} Classification Report:")
    print(classification_report(y_true, y_pred, target_names=class_names))
    
    # Confusion Matrix
    conf_matrix = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(10, 8))
    sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=class_names, yticklabels=class_names)
    plt.xlabel("Predicted Labels")
    plt.ylabel("True Labels")
    plt.title(f"{model_name} Confusion Matrix")
    plt.savefig(f"{model_name.lower().replace('-', '_')}_drone_rf_confusion_matrix.png")
    plt.close()
    
    # ROC Curve (for multi-class classification)
    roc_auc = None
    if num_classes > 2:
        y_true_bin = label_binarize(y_true, classes=np.arange(num_classes))
        y_scores_array = np.array(y_scores)
        
        roc_auc = roc_auc_score(y_true_bin, y_scores_array, multi_class="ovr")
        print(f"\n{model_name} Multi-class ROC AUC Score: {roc_auc:.4f}")
    
        plt.figure(figsize=(10, 8))
        for i in range(num_classes):
            fpr, tpr, _ = roc_curve(y_true_bin[:, i], y_scores_array[:, i])
            auc_score = roc_auc_score(y_true_bin[:, i], y_scores_array[:, i])
            plt.plot(fpr, tpr, label=f"Class {class_names[i]} (AUC = {auc_score:.2f})")
    
        plt.plot([0, 1], [0, 1], "k--")
        plt.xlabel("False Positive Rate")
        plt.ylabel("True Positive Rate")
        plt.title(f"{model_name} ROC Curve")
        plt.legend()
        plt.savefig(f"{model_name.lower().replace('-', '_')}_drone_rf_roc_curve.png")
        plt.close()
    
    # SNR-based performance analysis
    print(f"\nAnalyzing {model_name} performance by SNR levels...")
    
    # Create a dictionary to store predictions by SNR
    snr_results = {}
    
    # Re-run through test dataset to get filenames and predictions
    test_dataset_files = [full_dataset.samples[i][0] for i in test_dataset.indices]
    test_dataset_labels = [full_dataset.samples[i][1] for i in test_dataset.indices]
    
    # Match predictions with SNR values
    for i, (file_path, true_label) in enumerate(zip(test_dataset_files, test_dataset_labels)):
        snr = extract_snr(file_path)
        if snr is not None:
            if snr not in snr_results:
                snr_results[snr] = {'correct': 0, 'total': 0}
            snr_results[snr]['total'] += 1
            if y_pred[i] == y_true[i]:
                snr_results[snr]['correct'] += 1
    
    # Calculate accuracy by SNR
    snr_accuracy = {snr: results['correct'] / results['total'] 
                    for snr, results in snr_results.items() if results['total'] > 0}
    
    # Plot SNR vs. Accuracy
    sorted_snrs = sorted(snr_accuracy.keys())
    accuracies = [snr_accuracy[snr] for snr in sorted_snrs]
    
    plt.figure(figsize=(10, 6))
    plt.plot(sorted_snrs, accuracies, 'o-')
    plt.xlabel('Signal-to-Noise Ratio (dB)')
    plt.ylabel('Classification Accuracy')
    plt.title(f'{model_name} Performance vs. Signal-to-Noise Ratio')
    plt.grid(True)
    plt.savefig(f"{model_name.lower().replace('-', '_')}_drone_rf_snr_performance.png")
    plt.close()
    
    # Print SNR performance table
    print(f"\n{model_name} Performance by SNR level:")
    print("SNR (dB) | Accuracy | Samples")
    print("-" * 30)
    snr_table = []
    for snr in sorted_snrs:
        acc = snr_accuracy[snr]
        samples = snr_results[snr]['total']
        print(f"{snr:7d} | {acc:.4f} | {samples}")
        snr_table.append({"snr": snr, "accuracy": acc, "samples": samples})
    
    # Inference Time Calculation
    sample_input = torch.randn(1, 3, img_size, img_size).to(device)
    num_samples = 100
    start_time = time.time()
    with torch.no_grad():
        for _ in range(num_samples):
            _ = model(sample_input)
    inference_time = (time.time() - start_time) / num_samples
    
    # Number of Parameters
    num_params = sum(p.numel() for p in model.parameters())
    
    # FLOPs & MACs Calculation
    try:
        flops, macs = profile(model, inputs=(sample_input,), verbose=False)
    except Exception as e:
        print(f"Error calculating FLOPs and MACs: {str(e)}")
        flops, macs = 0, 0
    
    # Convert inference time to milliseconds
    inference_time_ms = inference_time * 1000
    
    print(f"\n{model_name} Total Number of Parameters: {num_params:,}")
    print(f"{model_name} Average Inference Time per Sample: {inference_time_ms:.3f} ms")
    print(f"{model_name} FLOPs: {flops:,} ({format_units(flops)})")
    print(f"{model_name} MACs: {macs:,} ({format_units(macs)})\n")
    
    # Per-class accuracy
    class_accuracy = conf_matrix.diagonal() / conf_matrix.sum(axis=1)
    class_acc_dict = {}
    for i, acc in enumerate(class_accuracy):
        print(f"✅ {model_name} Accuracy for class '{class_names[i]}': {acc:.2%}")
        class_acc_dict[class_names[i]] = float(acc)
    
    # Calculate and display test accuracy with 3 decimal places
    test_correct = sum([1 for i, j in zip(y_true, y_pred) if i == j])
    test_total = len(y_true)
    test_accuracy = test_correct / test_total
    print(f"\n✅ {model_name} Test Set Accuracy: {test_accuracy:.3f}")
    
    # Calculate and display model size in MB
    # Each parameter is typically stored as a 32-bit float (4 bytes)
    model_size_bytes = num_params * 4
    model_size_mb = model_size_bytes / (1024 * 1024)
    print(f"📊 {model_name} Model Size: {model_size_mb:.2f} MB")
    
    # Model characteristics based on model name
    characteristics = []
    if model_name == "SqueezeNet":
        characteristics = [
            "Extremely lightweight model (around 3MB)",
            "Uses 'fire modules' with squeeze and expand layers",
            "Achieves AlexNet-level accuracy with 50x fewer parameters",
            "Ideal for resource-constrained environments like drones"
        ]
    elif model_name == "ShuffleNet":
        characteristics = [
            "Designed for mobile devices with limited computing power",
            "Uses pointwise group convolutions and channel shuffle operations",
            "Very computationally efficient with low FLOPs",
            "Good balance between accuracy and model size"
        ]
    elif model_name == "EfficientNet":
        characteristics = [
            "Uses compound scaling to balance network depth, width, and resolution",
            "Achieves state-of-the-art accuracy with fewer parameters",
            "Family of models with different size-accuracy tradeoffs (B0-B7)",
            "Uses MBConv blocks with squeeze-and-excitation optimization"
        ]
    
    # Print characteristics
    print(f"\nKey characteristics of {model_name}:")
    for char in characteristics:
        print(f"- {char}")
    
    # Save all metrics to a JSON file
    metrics = {
        "model_name": model_name,
        "test_accuracy": float(test_accuracy),
        "inference_time_ms": float(inference_time_ms),
        "model_size_mb": float(model_size_mb),
        "parameters": int(num_params),
        "flops": int(flops),
        "macs": int(macs),
        "roc_auc_score": float(roc_auc) if roc_auc is not None else None,
        "per_class_accuracy": class_acc_dict,
        "snr_performance": snr_table,
        "classification_report": cls_report,
        "characteristics": characteristics
    }
    
    with open(f"{model_name.lower().replace('-', '_')}_drone_rf_metrics.json", "w") as f:
        json.dump(metrics, f, indent=4)
    
    print(f"\nMetrics saved to {model_name.lower().replace('-', '_')}_drone_rf_metrics.json")
    
    return model

# Initialize and train models sequentially

# 1. SqueezeNet
print("\n\n" + "="*80)
print("TRAINING SQUEEZENET")
print("="*80)
squeezenet = models.squeezenet1_1(pretrained=True)
squeezenet.classifier[1] = nn.Conv2d(512, num_classes, kernel_size=(1, 1), stride=(1, 1))
train_and_evaluate_model('SqueezeNet', squeezenet, train_loader, val_loader, test_loader, test_dataset, full_dataset)

# 2. ShuffleNet
print("\n\n" + "="*80)
print("TRAINING SHUFFLENET")
print("="*80)
shufflenet = models.shufflenet_v2_x1_0(pretrained=True)
shufflenet.fc = nn.Linear(shufflenet.fc.in_features, num_classes)
train_and_evaluate_model('ShuffleNet', shufflenet, train_loader, val_loader, test_loader, test_dataset, full_dataset)

# 3. EfficientNet
print("\n\n" + "="*80)
print("TRAINING EFFICIENTNET")
print("="*80)
efficientnet = models.efficientnet_b0(pretrained=True)
efficientnet.classifier[1] = nn.Linear(efficientnet.classifier[1].in_features, num_classes)
train_and_evaluate_model('EfficientNet', efficientnet, train_loader, val_loader, test_loader, test_dataset, full_dataset)

print("\n\nAll models have been trained and evaluated!")
print("Results have been saved to individual files for each model.")

Using device: cuda


Downloading: "https://download.pytorch.org/models/squeezenet1_1-b8a52dc0.pth" to /root/.cache/torch/hub/checkpoints/squeezenet1_1-b8a52dc0.pth


Total number of samples: 17744
Class to index mapping: {'DJI': 0, 'FutabaT14': 1, 'FutabaT7': 2, 'Graupner': 3, 'Noise': 4, 'Taranis': 5, 'Turnigy': 6}
Class DJI: 1280 samples
Class FutabaT14: 3472 samples
Class FutabaT7: 801 samples
Class Graupner: 801 samples
Class Noise: 8872 samples
Class Taranis: 1663 samples
Class Turnigy: 855 samples
Number of classes: 7
Class Names: ['DJI', 'FutabaT14', 'FutabaT7', 'Graupner', 'Noise', 'Taranis', 'Turnigy']


TRAINING SQUEEZENET


100%|██████████| 4.73M/4.73M [00:00<00:00, 100MB/s]


Training and Evaluating SqueezeNet






SqueezeNet Summary:
----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 111, 111]           1,792
              ReLU-2         [-1, 64, 111, 111]               0
         MaxPool2d-3           [-1, 64, 55, 55]               0
            Conv2d-4           [-1, 16, 55, 55]           1,040
              ReLU-5           [-1, 16, 55, 55]               0
            Conv2d-6           [-1, 64, 55, 55]           1,088
              ReLU-7           [-1, 64, 55, 55]               0
            Conv2d-8           [-1, 64, 55, 55]           9,280
              ReLU-9           [-1, 64, 55, 55]               0
             Fire-10          [-1, 128, 55, 55]               0
           Conv2d-11           [-1, 16, 55, 55]           2,064
             ReLU-12           [-1, 16, 55, 55]               0
           Conv2d-13           [-1, 64, 55, 55]           1,088
             ReLU-

Downloading: "https://download.pytorch.org/models/shufflenetv2_x1-5666bf0f80.pth" to /root/.cache/torch/hub/checkpoints/shufflenetv2_x1-5666bf0f80.pth
100%|██████████| 8.79M/8.79M [00:00<00:00, 142MB/s]



Training and Evaluating ShuffleNet

ShuffleNet Summary:
----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 24, 112, 112]             648
       BatchNorm2d-2         [-1, 24, 112, 112]              48
              ReLU-3         [-1, 24, 112, 112]               0
         MaxPool2d-4           [-1, 24, 56, 56]               0
            Conv2d-5           [-1, 24, 28, 28]             216
       BatchNorm2d-6           [-1, 24, 28, 28]              48
            Conv2d-7           [-1, 58, 28, 28]           1,392
       BatchNorm2d-8           [-1, 58, 28, 28]             116
              ReLU-9           [-1, 58, 28, 28]               0
           Conv2d-10           [-1, 58, 56, 56]           1,392
      BatchNorm2d-11           [-1, 58, 56, 56]             116
             ReLU-12           [-1, 58, 56, 56]               0
           Conv2d-13           [-1, 58, 28, 28

Downloading: "https://download.pytorch.org/models/efficientnet_b0_rwightman-7f5810bc.pth" to /root/.cache/torch/hub/checkpoints/efficientnet_b0_rwightman-7f5810bc.pth



ShuffleNet Total Number of Parameters: 1,260,779
ShuffleNet Average Inference Time per Sample: 6.554 ms
ShuffleNet FLOPs: 151,691,960.0 (151.69 M)
ShuffleNet MACs: 1,260,779.0 (1.26 M)

✅ ShuffleNet Accuracy for class 'DJI': 77.00%
✅ ShuffleNet Accuracy for class 'FutabaT14': 82.85%
✅ ShuffleNet Accuracy for class 'FutabaT7': 88.17%
✅ ShuffleNet Accuracy for class 'Graupner': 96.26%
✅ ShuffleNet Accuracy for class 'Noise': 98.17%
✅ ShuffleNet Accuracy for class 'Taranis': 97.01%
✅ ShuffleNet Accuracy for class 'Turnigy': 90.23%

✅ ShuffleNet Test Set Accuracy: 0.925
📊 ShuffleNet Model Size: 4.81 MB

Key characteristics of ShuffleNet:
- Designed for mobile devices with limited computing power
- Uses pointwise group convolutions and channel shuffle operations
- Very computationally efficient with low FLOPs
- Good balance between accuracy and model size

Metrics saved to shufflenet_drone_rf_metrics.json


TRAINING EFFICIENTNET


100%|██████████| 20.5M/20.5M [00:00<00:00, 159MB/s]



Training and Evaluating EfficientNet

EfficientNet Summary:
----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 32, 112, 112]             864
       BatchNorm2d-2         [-1, 32, 112, 112]              64
              SiLU-3         [-1, 32, 112, 112]               0
            Conv2d-4         [-1, 32, 112, 112]             288
       BatchNorm2d-5         [-1, 32, 112, 112]              64
              SiLU-6         [-1, 32, 112, 112]               0
 AdaptiveAvgPool2d-7             [-1, 32, 1, 1]               0
            Conv2d-8              [-1, 8, 1, 1]             264
              SiLU-9              [-1, 8, 1, 1]               0
           Conv2d-10             [-1, 32, 1, 1]             288
          Sigmoid-11             [-1, 32, 1, 1]               0
SqueezeExcitation-12         [-1, 32, 112, 112]               0
           Conv2d-13         [-1, 16, 112,