In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
from torchvision import models
import torch.nn as nn
import torch.optim as optim
import gc
import os
import numpy as np
import shutil
import random
from sklearn.metrics import precision_score, recall_score, f1_score, roc_curve, auc, confusion_matrix
from sklearn.preprocessing import label_binarize
import matplotlib.pyplot as plt
import seaborn as sns 

In [2]:
# Select the compute device: the first CUDA GPU when one is visible, else the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    print("Using GPU:", torch.cuda.get_device_name(0))
else:
    print("Using CPU")


Using GPU: Tesla P100-PCIE-16GB


In [None]:
# Run the garbage collector first so tensors that are only kept alive by
# unreachable reference cycles are actually freed; only then can
# empty_cache() hand their (now unoccupied) cached blocks back to the driver.
gc.collect()
torch.cuda.empty_cache()


In [3]:
# Source dataset root handed to tt_split as input_dir (each sub-directory is treated as one class).
# NOTE(review): absolute, machine-specific mount path — adjust when running elsewhere.
in_dir = "/mnt/nis_lab_research/data/class_data/far_shah_b1-b3"
# Destination root handed to tt_split as out_dir; 'train' and 'test' folders are created beneath it.
out_dir = "../../data/classifier/far_shah_b1-b3"
# Value passed as num_workers to both the train and the test DataLoader.
num_workers = 8


In [7]:
# Preprocessing pipeline applied to every image loaded through ImageFolder.
preprocess_steps = [
    # Resize to the input size expected by ResNet
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    # CHANGE TO BE DATA SPECIFIC
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
]
transform = transforms.Compose(preprocess_steps)


In [None]:
def tt_split(input_dir, out_dir, train_ratio, seed=None):
    """
    Splits the dataset in the given directory into train and test sets.

    Every sub-directory of ``input_dir`` is treated as one class. Its regular
    files are shuffled and copied into ``out_dir/train/<class>`` and
    ``out_dir/test/<class>``; the source files are left untouched.

    Note: calling this twice on the same ``out_dir`` with different shuffles
    leaves the earlier copies in place, so a file can end up in both train and
    test. Pass a fixed ``seed`` (or start from an empty ``out_dir``) to avoid that.

    :param input_dir: Path to the input directory (one sub-directory per class).
    :param out_dir: Path under which the 'train' and 'test' directories are created.
    :param train_ratio: Ratio of train set (between 0 and 1).
    :param seed: Optional seed for a reproducible split. When None, the global
        ``random`` module state is used (the original behaviour).
    :raises ValueError: If ``train_ratio`` is outside [0, 1].
    """
    if not 0 <= train_ratio <= 1:
        raise ValueError("Train ratio must be between 0 and 1")

    train_dir = os.path.join(out_dir, 'train')
    test_dir = os.path.join(out_dir, 'test')

    # Create train and test directories
    for directory in (train_dir, test_dir):
        os.makedirs(directory, exist_ok=True)

    # A private generator keeps a seeded split from disturbing global random state.
    shuffler = random if seed is None else random.Random(seed)

    # Sorted iteration makes the result independent of the OS listing order.
    for class_name in sorted(os.listdir(input_dir)):
        class_dir = os.path.join(input_dir, class_name)
        if not os.path.isdir(class_dir):
            continue

        # Only regular files are images; a nested directory would make
        # shutil.copy2 raise, so it is skipped.
        images = sorted(
            entry for entry in os.listdir(class_dir)
            if os.path.isfile(os.path.join(class_dir, entry))
        )
        shuffler.shuffle(images)

        # Split images into train and test
        split_point = int(len(images) * train_ratio)
        subsets = (
            (train_dir, images[:split_point]),
            (test_dir, images[split_point:]),
        )

        # Copy images to train and test directories
        for subset_root, subset_images in subsets:
            destination = os.path.join(subset_root, class_name)
            os.makedirs(destination, exist_ok=True)
            for image in subset_images:
                shutil.copy2(os.path.join(class_dir, image), destination)

In [None]:
# Materialise an 80/20 train/test copy of the dataset under out_dir.
tt_split(input_dir=in_dir, out_dir=out_dir, train_ratio=0.8)

In [8]:
# Training split, read from a class-per-folder directory tree.
# NOTE(review): this root differs from out_dir used by tt_split above — confirm it is the intended dataset.
train_root = '../../data/classifier/far_shah-b1-b2_cln/train'
train_set = ImageFolder(root=train_root, transform=transform)
train_loader = DataLoader(
    dataset=train_set,
    batch_size=128,
    shuffle=True,
    num_workers=num_workers,
)


In [9]:
# Held-out split used only for evaluation.
# NOTE(review): this root differs from out_dir used by tt_split above — confirm it is the intended dataset.
test_root = '../../data/classifier/far_shah-b1-b2_cln/test'
test_set = ImageFolder(root=test_root, transform=transform)
# Evaluation gains nothing from shuffling; a fixed order keeps the collected
# predictions aligned with test_set.samples and reproducible between runs.
test_loader = DataLoader(
    test_set,
    batch_size=128,
    shuffle=False,
    num_workers=num_workers,
)

In [None]:
# ResNet-50 with pretrained weights; the final fully connected layer is
# replaced so the network emits one logit per class.
num_classes = 26  # 27 total classes - text captcha have 0 so it is removed for now
model = models.resnet50(pretrained=True)
model.fc = nn.Linear(in_features=model.fc.in_features, out_features=num_classes)


In [None]:
# Move the model to the target device BEFORE building the optimizer, which is
# the order the PyTorch optimizer documentation recommends, so the optimizer
# is guaranteed to hold the parameters that are actually trained.
model = model.to(device)

criterion = nn.CrossEntropyLoss()
# Alternative tried earlier: optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
optimizer = optim.Adam(model.parameters(), lr=0.001)


In [None]:
NUM_EPOCHS = 25   # full passes over the training set
LOG_EVERY = 50    # batches between progress lines

for epoch in range(NUM_EPOCHS):  # loop over the dataset multiple times
    model.train()
    running_loss = 0.0
    num_batches = len(train_loader)
    for batch_idx, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        # Periodic progress line instead of printing every batch index,
        # which floods the notebook output over 25 epochs.
        if (batch_idx + 1) % LOG_EVERY == 0:
            print(f"  epoch {epoch+1}: batch {batch_idx+1}/{num_batches}")
    # Mean of the per-batch losses for this epoch.
    print(f"Epoch {epoch+1}, Loss: {running_loss / num_batches}")
print('Finished Training')

In [None]:
# Can be saved directly from the GPU.
# torch.save does not create missing parent directories, so make sure the
# checkpoint folder exists first.
# NOTE(review): the load cell below reads './pth/far_shah_b1-b3_rn50_ep25.pth', not this file.
save_path = './pth/test_ep25.pth'
os.makedirs(os.path.dirname(save_path), exist_ok=True)
torch.save(model, save_path)

In [4]:
# To load the model later.
# map_location remaps the stored tensors onto the device chosen at the top of
# the notebook, so a checkpoint written on a GPU also loads on a CPU-only machine.
# SECURITY: torch.load unpickles the file; a whole-model checkpoint can run
# arbitrary code, so only load files you created yourself.
# NOTE(review): recent PyTorch releases default to weights_only=True, which rejects
# whole-model pickles — pass weights_only=False there if this checkpoint is trusted.
model = torch.load('./pth/far_shah_b1-b3_rn50_ep25.pth', map_location=device)
model.eval()  # Set it to evaluation mode

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [10]:
# Inference over the test loader: gather, per sample, the predicted class,
# the ground-truth label and the full softmax probability vector.
model.eval()  # Set the model to evaluation mode

y_pred, y_true, y_score = [], [], []

with torch.no_grad():
    for batch_images, batch_labels in test_loader:
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        logits = model(batch_images)
        # Softmax turns the raw scores into per-class probabilities.
        probabilities = torch.softmax(logits, dim=1)
        # Index of the largest logit is the predicted class.
        predicted = logits.argmax(dim=1)

        # Accumulate true labels and predictions
        y_pred.extend(predicted.cpu().numpy())
        y_true.extend(batch_labels.cpu().numpy())
        y_score.extend(probabilities.cpu().numpy())
        

In [51]:
# Number of distinct ground-truth classes present in the test predictions.
len(np.unique(y_true))

26

In [48]:
# Bucket every [ground truth, prediction] pair by its ground-truth class index
# (0..25) in a single pass; pairs keep their original order inside each bucket.
gt_by_class = [[] for _ in range(26)]
for gt, pred in zip(y_true, y_pred):
    if 0 <= gt < 26:
        gt_by_class[gt].append([gt, pred])
     

In [1]:
# Per-class accuracy: for each ground-truth class, count the correctly
# predicted samples, print "<class index> <correct count>", and accumulate the
# class accuracy into `tot` (divided by the class count in the next cell).
tot = 0
for i, class_ in enumerate(gt_by_class):
    class_cntr = sum(1 for gt, pred in class_ if gt == pred)
    print(i, class_cntr)
    if class_:
        tot = tot + class_cntr / len(class_)
    else:
        # A class without test samples would divide by zero; it adds 0 to the sum.
        print(f"class {i} has no test samples; counted as 0")

NameError: name 'gt_by_class' is not defined

In [56]:
# Mean of the per-class accuracies summed into `tot` above (each class weighted equally).
tot/len(gt_by_class)

0.48408166036774475

In [None]:

# Aggregate evaluation: accuracy, weighted precision/recall/F1, confusion
# matrix heatmap and micro-averaged ROC curve.

# Convert accumulated predictions and labels to numpy arrays
y_pred = np.array(y_pred)
y_true = np.array(y_true)
y_score = np.array(y_score)

# Take the class list from the score columns (one per model output) rather
# than from the labels seen in y_true: if a class is missing from the test set
# the binarized labels would otherwise have fewer columns than y_score and the
# flattened arrays passed to roc_curve would no longer line up.
n_classes = y_score.shape[1]
classes = np.arange(n_classes)
y_true_binarized = label_binarize(y_true, classes=classes)
if n_classes == 2:
    # label_binarize returns a single column for two classes; expand it so it
    # matches the two probability columns.
    y_true_binarized = np.hstack([1 - y_true_binarized, y_true_binarized])

# Calculate metrics
accuracy = np.mean(y_pred == y_true)
precision = precision_score(y_true, y_pred, average="weighted", labels=classes)
recall = recall_score(y_true, y_pred, average="weighted", labels=classes)
f1 = f1_score(y_true, y_pred, average="weighted", labels=classes)

# ROC Curve and AUC for Micro-average: every (sample, class) cell is treated
# as one binary decision.
fpr, tpr, _ = roc_curve(y_true_binarized.ravel(), y_score.ravel())
roc_auc = auc(fpr, tpr)

# Print metrics
print(f'Accuracy: {accuracy * 100:.2f}%')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'F1 Score: {f1:.4f}')

# Calculate and visualize the confusion matrix
cm = confusion_matrix(y_true, y_pred, labels=classes)
plt.figure(figsize=(12, 12))
sns.heatmap(cm, annot=True, fmt="d", cmap='Blues', xticklabels=classes, yticklabels=classes)
plt.title('Confusion Matrix')
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.show()

# Plot ROC Curve for Micro-average
plt.figure()
lw = 2
plt.plot(fpr, tpr, color='darkorange', lw=lw, label='Micro-average ROC curve (area = {0:0.2f})'.format(roc_auc))
plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')  # chance diagonal
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic - Micro-average')
plt.legend(loc='lower right')
plt.show()