ResNeXT 50 architecture implementation

In [1]:
import torch
import torch.nn as nn
print(torch.__version__)
print(torch.cuda.is_available())

2.1.1+cu121
True


In [2]:
class CardinalityBlock(nn.Module):
    def __init__(self, in_channels, out_channels, identity_downsample=None, stride=1, C=32):
        super(CardinalityBlock, self).__init__()
        self.expansion = 2
        self.C = C
        self.small_out_channels = out_channels // C
        self.branch = self._make_branch(in_channels, stride)
        self.identity_downsample = identity_downsample
        self.relu = nn.ReLU()

    def _make_branch(self, in_channels, stride):
        branch_layers = nn.Sequential(
            nn.Conv2d(in_channels, self.small_out_channels, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(self.small_out_channels),
            nn.Conv2d(self.small_out_channels, self.small_out_channels, kernel_size=3, stride=stride, padding=1),
            nn.BatchNorm2d(self.small_out_channels),
            nn.Conv2d(self.small_out_channels, self.small_out_channels * self.expansion, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(self.small_out_channels * self.expansion)
        )
        return branch_layers

    def forward(self, x):
        identity = x
        branches = [self.branch(x) for _ in range(self.C)]
        x = torch.cat(branches, 1)

        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)

        x += identity
        x = self.relu(x)
        return x

In [3]:
class ResNeXt(nn.Module):
    def __init__(self, cardinalityBlock, num_repeat, image_channels, num_classes):
        super(ResNeXt, self).__init__()
        self.in_channels = 64
        self.initial_layers = self._init_layers(image_channels)
        self.conv2, self.conv3, self.conv4, self.conv5 = self._make_layers(cardinalityBlock, num_repeat)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(1024 * 2, num_classes)

    def _init_layers(self, image_channels):
        layers = nn.Sequential(
            nn.Conv2d(image_channels, 64, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        return layers

    def _make_layers(self, block, num_repeat):
        layers = []
        out_channels_list = [128, 256, 512, 1024]
        for idx, num_blocks in enumerate(num_repeat):
            stride = 1 if idx == 0 else 2
            layer = self._create_resBlock(block, num_blocks, out_channels_list[idx], stride)
            layers.append(layer)
        return layers

    def _create_resBlock(self, block, num_blocks, out_channels, stride):
        identity_downsample = None
        if self.in_channels != out_channels * 2:
            identity_downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels * 2, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels * 2)
            )
        layers = [block(self.in_channels, out_channels, identity_downsample, stride)]
        self.in_channels = out_channels * 2
        layers.extend([block(self.in_channels, out_channels) for _ in range(num_blocks - 1)])
        return nn.Sequential(*layers)
    
    def forward(self, x):
        x = self.initial_layers(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)

        x = self.avgpool(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fc(x)
        return x

In [4]:
def ResNeXt50(image_channels=3, num_classes=1000):
    return ResNeXt(CardinalityBlock, [3, 4, 6, 3], image_channels, num_classes)

In [5]:
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import ImageFolder
from tqdm import tqdm

In [6]:
# Data transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to the input size of the model
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Loading datasets
train_data = ImageFolder(root='train', transform=transform)
val_data = ImageFolder(root='validation', transform=transform)
test_data = ImageFolder(root='test', transform=transform)

# Creating data loaders
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
val_loader = DataLoader(val_data, batch_size=32, shuffle=False)
test_loader = DataLoader(test_data, batch_size=32, shuffle=False)

In [7]:
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Initialize the model
model = ResNeXt50()
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 2)  # Adjust the number of output classes
model = model.to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)


Using device: cuda


In [8]:
# from tqdm import tqdm

# def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10, patience=3):
#     model.to(device)
#     best_val_loss = float('inf')
#     epochs_no_improve = 0
#     best_model_state = None

#     for epoch in range(num_epochs):
#         model.train()
#         total_train_loss = 0

#         # Training loop with progress bar
#         train_progress_bar = tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs} [Training]', unit='batch')
#         for inputs, labels in train_progress_bar:
#             inputs, labels = inputs.to(device), labels.to(device)
#             optimizer.zero_grad()
#             outputs = model(inputs)
#             loss = criterion(outputs, labels)
#             loss.backward()
#             optimizer.step()

#             total_train_loss += loss.item()

#         # Validation loop with progress bar
#         model.eval()
#         total_val_loss = 0
#         val_progress_bar = tqdm(val_loader, desc=f'Epoch {epoch + 1}/{num_epochs} [Validation]', unit='batch')
#         with torch.no_grad():
#             for inputs, labels in val_progress_bar:
#                 inputs, labels = inputs.to(device), labels.to(device)
#                 outputs = model(inputs)
#                 loss = criterion(outputs, labels)
#                 total_val_loss += loss.item()

#         avg_train_loss = total_train_loss / len(train_loader)
#         avg_val_loss = total_val_loss / len(val_loader)

#         print(f'Epoch {epoch + 1}/{num_epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}')

#         # Early stopping logic
#         if avg_val_loss < best_val_loss:
#             best_val_loss = avg_val_loss
#             best_model_state = model.state_dict()
#             epochs_no_improve = 0
#         else:
#             epochs_no_improve += 1
#             if epochs_no_improve == patience:
#                 print(f'Early stopping triggered after {epoch + 1} epochs!')
#                 model.load_state_dict(best_model_state)
#                 break

#     return model

In [11]:
# New Training Function
import time
import torch

def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10, patience=3, save_best_model=False, verbose=True):
    model.to(device)
    best_val_loss = float('inf')
    epochs_no_improve = 0
    best_model_state = None
    # scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.01) 

    for epoch in range(num_epochs):
        model.train()
        total_train_loss = 0
        total_train_correct = 0
        start_time = time.time()

        # Training loop with progress bar
        train_progress_bar = tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs} [Training]', unit='batch')
        for inputs, labels in train_progress_bar:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_train_loss += loss.item()
            _, preds = torch.max(outputs, 1)
            total_train_correct += torch.sum(preds == labels.data)

        # scheduler.step()  # Update learning rate

        # Validation loop with progress bar
        model.eval()
        total_val_loss = 0
        total_val_correct = 0
        with torch.no_grad():
            val_progress_bar = tqdm(val_loader, desc=f'Epoch {epoch + 1}/{num_epochs} [Validation]', unit='batch')
            for inputs, labels in val_progress_bar:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)

                total_val_loss += loss.item()
                _, preds = torch.max(outputs, 1)
                total_val_correct += torch.sum(preds == labels.data)

        avg_train_loss = total_train_loss / len(train_loader.dataset)
        avg_val_loss = total_val_loss / len(val_loader.dataset)
        train_acc = total_train_correct.double() / len(train_loader.dataset)
        val_acc = total_val_correct.double() / len(val_loader.dataset)
        epoch_duration = time.time() - start_time

        if verbose:
            print(f'Epoch {epoch + 1}/{num_epochs}, Duration: {epoch_duration:.2f}s, Train Loss: {avg_train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {avg_val_loss:.4f}, Val Acc: {val_acc:.4f}')

        # Early stopping and saving best model
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            best_model_state = model.state_dict()
            if save_best_model:
                torch.save(model.state_dict(), 'best_model.pth')
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
            if epochs_no_improve == patience:
                if verbose:
                    print(f'Early stopping triggered after {epoch + 1} epochs!')
                model.load_state_dict(best_model_state)
                break

    return model

In [13]:
num_epochs = 20  # Adjust the number of epochs as needed
patience = 5     # Adjust the patience for early stopping
save_best_model = True  # Set to True if you want to save the best model

# Call the updated train_model function with all required arguments
trained_model = train_model(
    model, 
    train_loader, 
    val_loader, 
    criterion, 
    optimizer, 
    num_epochs=num_epochs, 
    patience=patience, 
    save_best_model=save_best_model,
    verbose=True  
)

# Optionally, you can save the final trained model state as well
model_save_path = 'final_trained_model.pth'
torch.save(trained_model.state_dict(), model_save_path)
print(f'Final trained model state dictionary saved to {model_save_path}')

Epoch 1/20 [Training]: 100%|██████████████████████████████████████████████████████| 225/225 [05:29<00:00,  1.46s/batch]
Epoch 1/20 [Validation]: 100%|██████████████████████████████████████████████████████| 48/48 [00:22<00:00,  2.09batch/s]


Epoch 1/20, Duration: 352.39s, Train Loss: 0.0104, Train Acc: 0.8470, Val Loss: 0.0113, Val Acc: 0.8210


Epoch 2/20 [Training]: 100%|██████████████████████████████████████████████████████| 225/225 [05:26<00:00,  1.45s/batch]
Epoch 2/20 [Validation]: 100%|██████████████████████████████████████████████████████| 48/48 [00:19<00:00,  2.45batch/s]


Epoch 2/20, Duration: 345.93s, Train Loss: 0.0101, Train Acc: 0.8487, Val Loss: 0.0137, Val Acc: 0.8242


Epoch 3/20 [Training]: 100%|██████████████████████████████████████████████████████| 225/225 [05:25<00:00,  1.44s/batch]
Epoch 3/20 [Validation]: 100%|██████████████████████████████████████████████████████| 48/48 [00:22<00:00,  2.10batch/s]


Epoch 3/20, Duration: 347.96s, Train Loss: 0.0095, Train Acc: 0.8622, Val Loss: 0.0121, Val Acc: 0.8197


Epoch 4/20 [Training]: 100%|██████████████████████████████████████████████████████| 225/225 [05:23<00:00,  1.44s/batch]
Epoch 4/20 [Validation]: 100%|██████████████████████████████████████████████████████| 48/48 [00:22<00:00,  2.15batch/s]


Epoch 4/20, Duration: 346.01s, Train Loss: 0.0098, Train Acc: 0.8710, Val Loss: 1.5453, Val Acc: 0.4564


Epoch 5/20 [Training]: 100%|██████████████████████████████████████████████████████| 225/225 [05:26<00:00,  1.45s/batch]
Epoch 5/20 [Validation]: 100%|██████████████████████████████████████████████████████| 48/48 [00:22<00:00,  2.12batch/s]


Epoch 5/20, Duration: 349.02s, Train Loss: 0.0097, Train Acc: 0.8665, Val Loss: 0.1004, Val Acc: 0.8099


Epoch 6/20 [Training]: 100%|██████████████████████████████████████████████████████| 225/225 [05:27<00:00,  1.46s/batch]
Epoch 6/20 [Validation]: 100%|██████████████████████████████████████████████████████| 48/48 [00:22<00:00,  2.09batch/s]

Epoch 6/20, Duration: 350.90s, Train Loss: 0.0092, Train Acc: 0.8669, Val Loss: 0.2242, Val Acc: 0.8164
Early stopping triggered after 6 epochs!
Final trained model state dictionary saved to final_trained_model.pth





FINAL TRAINED MODEL TESTING

In [17]:
import torch
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader

# Load the model architecture
model = ResNeXt50()
model_save_path = 'final_trained_model.pth'  # Replace with your model's path
model.fc = nn.Linear(num_features, 2)

# Load the saved state dictionary
model.load_state_dict(torch.load(model_save_path, map_location=device))

# Move model to the right device and set to evaluation mode
model.to(device)
model.eval()

ResNeXt(
  (initial_layers): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  )
  (conv2): Sequential(
    (0): CardinalityBlock(
      (branch): Sequential(
        (0): Conv2d(64, 4, kernel_size=(1, 1), stride=(1, 1))
        (1): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): Conv2d(4, 4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (3): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (4): Conv2d(4, 8, kernel_size=(1, 1), stride=(1, 1))
        (5): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (identity_downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1))
        (1): BatchNorm2d(

In [29]:
scores = []
labels = []

with torch.no_grad():
    for inputs, target_labels in test_loader:
        inputs = inputs.to(device)
        outputs = model(inputs)

        # If your model outputs logits, convert them to probabilities using sigmoid or softmax (as appropriate)
        probabilities = torch.sigmoid(outputs).cpu().numpy()
        scores.extend(probabilities)
        labels.extend(target_labels.cpu().numpy())

In [30]:
print(scores)

[array([1., 0.], dtype=float32), array([1., 0.], dtype=float32), array([0.6661837, 0.3486223], dtype=float32), array([0.70131475, 0.32202497], dtype=float32), array([0.9466084 , 0.06499086], dtype=float32), array([1., 0.], dtype=float32), array([1., 0.], dtype=float32), array([1., 0.], dtype=float32), array([0.5407042 , 0.47433913], dtype=float32), array([0.8894755 , 0.12946999], dtype=float32), array([0.77722615, 0.24138257], dtype=float32), array([0.85954684, 0.16254094], dtype=float32), array([0.9126425 , 0.09557277], dtype=float32), array([1., 0.], dtype=float32), array([0.9978496 , 0.00318731], dtype=float32), array([0.9842593 , 0.02132888], dtype=float32), array([0.928491  , 0.08727547], dtype=float32), array([0.99887925, 0.00148984], dtype=float32), array([0.9049505 , 0.11096736], dtype=float32), array([0.69279104, 0.32414496], dtype=float32), array([1., 0.], dtype=float32), array([1., 0.], dtype=float32), array([0.83275867, 0.18525063], dtype=float32), array([0.9571998 , 0.0531

In [31]:
def calculate_metrics(labels, scores, far_target=1e-3):
    labels = np.array(labels)
    scores = np.array(scores)[:, 1]  # Take the probabilities of the positive class

    # Accuracy
    predictions = (scores > 0.5).astype(int)  # Using 0.5 as the threshold
    accuracy = accuracy_score(labels, predictions)

    # Calculate ROC Curve and EER
    fpr, tpr, thresholds = roc_curve(labels, scores)
    eer = brentq(lambda x: 1. - x - interp1d(fpr, tpr)(x), 0., 1.)

    # Find TAR at specified FAR
    far_index = np.where(fpr <= far_target)[0][-1]
    tar_at_far = tpr[far_index]

    return accuracy, eer, tar_at_far

# Calculate the metrics
accuracy, eer, tar_at_far = calculate_metrics(labels, scores)

print(f'Accuracy: {accuracy:.4f}')
print(f'EER: {eer:.4f}')
print(f'TAR at FAR={1e-3}: {tar_at_far:.4f}')

Accuracy: 0.8198
EER: 0.2898
TAR at FAR=0.001: 0.0174


Best Trained

In [32]:
import torch
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader

# Load the model architecture
model = ResNeXt50()
model_save_path = 'best_model.pth'  # Replace with your model's path
model.fc = nn.Linear(num_features, 2)

# Load the saved state dictionary
model.load_state_dict(torch.load(model_save_path, map_location=device))

# Move model to the right device and set to evaluation mode
model.to(device)
model.eval()

ResNeXt(
  (initial_layers): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  )
  (conv2): Sequential(
    (0): CardinalityBlock(
      (branch): Sequential(
        (0): Conv2d(64, 4, kernel_size=(1, 1), stride=(1, 1))
        (1): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): Conv2d(4, 4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (3): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (4): Conv2d(4, 8, kernel_size=(1, 1), stride=(1, 1))
        (5): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (identity_downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1))
        (1): BatchNorm2d(

In [33]:
scores = []
labels = []

with torch.no_grad():
    for inputs, target_labels in test_loader:
        inputs = inputs.to(device)
        outputs = model(inputs)

        # If your model outputs logits, convert them to probabilities using sigmoid or softmax (as appropriate)
        probabilities = torch.sigmoid(outputs).cpu().numpy()
        scores.extend(probabilities)
        labels.extend(target_labels.cpu().numpy())

In [34]:
def calculate_metrics(labels, scores, far_target=1e-3):
    labels = np.array(labels)
    scores = np.array(scores)[:, 1]  # Take the probabilities of the positive class

    # Accuracy
    predictions = (scores > 0.5).astype(int)  # Using 0.5 as the threshold
    accuracy = accuracy_score(labels, predictions)

    # Calculate ROC Curve and EER
    fpr, tpr, thresholds = roc_curve(labels, scores)
    eer = brentq(lambda x: 1. - x - interp1d(fpr, tpr)(x), 0., 1.)

    # Find TAR at specified FAR
    far_index = np.where(fpr <= far_target)[0][-1]
    tar_at_far = tpr[far_index]

    return accuracy, eer, tar_at_far

# Calculate the metrics
accuracy, eer, tar_at_far = calculate_metrics(labels, scores)

print(f'Accuracy: {accuracy:.4f}')
print(f'EER: {eer:.4f}')
print(f'TAR at FAR={1e-3}: {tar_at_far:.4f}')

Accuracy: 0.8191
EER: 0.2506
TAR at FAR=0.001: 0.0451
