In [18]:
import torch
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from torchvision.models import resnet50, ResNet50_Weights
import torch.nn as nn
from PIL import Image
from utils import CarDDDataset

# Load the .pt dataset
train_data = torch.load("train_dataset.pt")

# # Define a custom Dataset class
# class CustomDataset(Dataset):
#     def __init__(self, data, transform=None):
#         self.data = data
#         self.transform = transform

#     def __len__(self):
#         return len(self.data)

#     def __getitem__(self, idx):
#         item = self.data[idx]
#         image = item["image"]  # PIL image
#         label = item["labels"]  # Tensor

#         if self.transform:
#             image = self.transform(image)

#         return image, label

# # Define image transformations (resize, normalize, etc.)
# transform = transforms.Compose([
#     transforms.Resize((224, 224)),  # Resize to ResNet's input size
#     transforms.ToTensor(),          # Convert to tensor
#     transforms.Normalize(mean=[0.485, 0.456, 0.406],  # Normalize using ImageNet stats
#                          std=[0.229, 0.224, 0.225])
# ])

# Define a custom Dataset class
class CustomDataset(Dataset):
    def __init__(self, data, transform=None):
        self.data = data
        # If no transform is provided, use ResNet50 default transforms
        if transform is None:
            weights = ResNet50_Weights.DEFAULT
            self.transform = weights.transforms()
        else:
            self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        try:
            item = self.data[idx]
            image = item["image"]  # PIL image
            label = item["labels"]  # Tensor
        
            if self.transform:
                image = self.transform(image)
            
            return image, label
        except Exception as e:
            print(f"Error loading item {idx}: {str(e)}")
            raise
    # Example usage:
"""
# Load your dataset
dataset = torch.load("train_dataset.pt")

# Create dataset instance (will use ResNet50 transforms by default)
custom_dataset = CustomDataset(data=dataset)

# Create dataloader
dataloader = DataLoader(
    custom_dataset,
    batch_size=32,
    shuffle=True,
    num_workers=4
)
"""

'\n# Load your dataset\ndataset = torch.load("train_dataset.pt")\n\n# Create dataset instance (will use ResNet50 transforms by default)\ncustom_dataset = CustomDataset(data=dataset)\n\n# Create dataloader\ndataloader = DataLoader(\n    custom_dataset,\n    batch_size=32,\n    shuffle=True,\n    num_workers=4\n)\n'

In [19]:
# Create a dataset and DataLoader
train_dataset = CustomDataset(train_data)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0,  # Start with 0 workers for debugging
        pin_memory=True if torch.cuda.is_available() else False)

In [20]:
# Load pre-trained ResNet50 model
model = resnet50(weights=ResNet50_Weights.DEFAULT)

# Freeze all layers except the final layer
for param in model.parameters():
    param.requires_grad = False

# Replace the final fully connected layer
num_classes = 6  # Update based on your dataset
model.fc = nn.Linear(model.fc.in_features, num_classes)

# Move the model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

In [21]:
import torch.optim as optim

# Define loss function (binary cross-entropy for multi-label classification)
criterion = nn.BCEWithLogitsLoss()  # Use sigmoid with multi-label outputs

# Only optimize the final layer
optimizer = optim.Adam(model.fc.parameters(), lr=0.001)

In [23]:
# Training loop
epochs = 10
model.train()

for epoch in range(epochs):
    running_loss = 0.0

    for batch_idx, (images, labels) in enumerate(train_loader):
        try:
            images, labels = images.to(device), labels.to(device)

            # Zero the gradient
            optimizer.zero_grad()

            # Forward pass
            outputs = model(images)

            # Compute the loss
            loss = criterion(outputs, labels)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            if batch_idx % 10 == 0:
                print(f"Epoch {epoch+1}/{epochs}, Batch {batch_idx}/{len(train_loader)}, "
                    f"Loss: {loss.item():.4f}")
        
        except Exception as e:
            print(f"Error in batch")
            
            continue

Epoch 1/10, Batch 0/88, Loss: 0.5321
Epoch 1/10, Batch 10/88, Loss: 0.4728
Epoch 1/10, Batch 20/88, Loss: 0.4376
Epoch 1/10, Batch 30/88, Loss: 0.4135
Epoch 1/10, Batch 40/88, Loss: 0.4067
Epoch 1/10, Batch 50/88, Loss: 0.3862
Epoch 1/10, Batch 60/88, Loss: 0.3894
Epoch 1/10, Batch 70/88, Loss: 0.3648
Epoch 1/10, Batch 80/88, Loss: 0.3269
Epoch 2/10, Batch 0/88, Loss: 0.2968
Epoch 2/10, Batch 10/88, Loss: 0.3491
Epoch 2/10, Batch 20/88, Loss: 0.3117
Epoch 2/10, Batch 30/88, Loss: 0.3477
Epoch 2/10, Batch 40/88, Loss: 0.3551
Epoch 2/10, Batch 50/88, Loss: 0.3775
Epoch 2/10, Batch 60/88, Loss: 0.3404
Epoch 2/10, Batch 70/88, Loss: 0.2717
Epoch 2/10, Batch 80/88, Loss: 0.3083
Epoch 3/10, Batch 0/88, Loss: 0.3531
Epoch 3/10, Batch 10/88, Loss: 0.3288
Epoch 3/10, Batch 20/88, Loss: 0.3087
Epoch 3/10, Batch 30/88, Loss: 0.2878
Epoch 3/10, Batch 40/88, Loss: 0.3626
Epoch 3/10, Batch 50/88, Loss: 0.2917
Epoch 3/10, Batch 60/88, Loss: 0.2808
Epoch 3/10, Batch 70/88, Loss: 0.3049
Epoch 3/10, Bat

In [26]:
torch.save(model.state_dict(), "resnet50_1.pth")

# Evaluation

## Inference ot test data

In [29]:
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
import numpy as np

class TestDataset(Dataset):
    def __init__(self, data):
        self.data = data
        self.weights = ResNet50_Weights.DEFAULT
        self.preprocess = self.weights.transforms()
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        item = self.data[idx]
        image = item["image"]
        label = item["labels"]  # Assuming this is already a tensor of shape [6]
        
        if self.preprocess:
            image = self.preprocess(image)
            
        return image, label

def evaluate_model(model, test_data_path, device, batch_size=32, num_classes=6):
    """
    Evaluate the model on test data and return metrics for multi-class classification
    
    Args:
        model: The trained PyTorch model
        test_data_path: Path to test_data.pt file
        device: torch.device object
        batch_size: Batch size for testing
        num_classes: Number of classes (default: 6)
    """
    # Load test data
    test_data = torch.load(test_data_path)
    test_dataset = TestDataset(test_data)
    test_loader = DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=0
    )
    
    # Set model to evaluation mode
    model.eval()
    
    # Lists to store predictions and true labels
    all_predictions = []
    all_labels = []
    
    # Disable gradient computation for inference
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            
            # Forward pass
            outputs = model(images)
            
            # Convert outputs to predictions using sigmoid for multi-label
            predictions = torch.sigmoid(outputs) > 0.5
            
            # Store predictions and labels
            all_predictions.extend(predictions.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    # Convert to numpy arrays
    all_predictions = np.array(all_predictions)
    all_labels = np.array(all_labels)
    
    # Calculate metrics for each class
    class_metrics = {}
    for i in range(num_classes):
        precision, recall, f1, _ = precision_recall_fscore_support(
            all_labels[:, i],
            all_predictions[:, i],
            average='binary'
        )
        
        class_metrics[f'class_{i}'] = {
            'precision': precision,
            'recall': recall,
            'f1': f1
        }
    
    # Calculate overall metrics
    # Micro average considers each class prediction equally
    precision_micro, recall_micro, f1_micro, _ = precision_recall_fscore_support(
        all_labels,
        all_predictions,
        average='micro'
    )
    
    # Macro average calculates metrics for each class and takes unweighted mean
    precision_macro, recall_macro, f1_macro, _ = precision_recall_fscore_support(
        all_labels,
        all_predictions,
        average='macro'
    )
    
    # Calculate exact match ratio (all classes predicted correctly)
    exact_match = np.mean(np.all(all_predictions == all_labels, axis=1))
    
    # Calculate per-class accuracy
    class_accuracy = np.mean(all_predictions == all_labels, axis=0)
    
    results = {
        'exact_match_ratio': exact_match,
        'micro_avg': {
            'precision': precision_micro,
            'recall': recall_micro,
            'f1': f1_micro
        },
        'macro_avg': {
            'precision': precision_macro,
            'recall': recall_macro,
            'f1': f1_macro
        },
        'per_class': class_metrics,
        'class_accuracy': {f'class_{i}': acc for i, acc in enumerate(class_accuracy)}
    }
    
    return results, all_labels, all_predictions

## Print metrics

In [30]:
# Evaluate model
results, all_labels, all_predictions = evaluate_model(model, "test_dataset.pt", device)
    
    # Print results
print("\nEvaluation Results:")
print(f"\nExact Match Ratio: {results['exact_match_ratio']:.4f}")
    
print("\nMicro Average Metrics:")
for metric, value in results['micro_avg'].items():
    print(f"{metric}: {value:.4f}")
    print("\nMacro Average Metrics:")
        
for metric, value in results['macro_avg'].items():
    print(f"{metric}: {value:.4f}")
            
    print("\nPer-Class Accuracy:")
for class_name, accuracy in results['class_accuracy'].items():
    print(f"{class_name}: {accuracy:.4f}")
            
    print("\nPer-Class Metrics:")
for class_name, metrics in results['per_class'].items():
    print(f"\n{class_name}:")
for metric, value in metrics.items():
    print(f"{metric}: {value:.4f}")

# # Optional: Save results to file
# with open('evaluation_results.txt', 'w') as f:
#     f.write("Evaluation Results:\n")
#                 f.write(f"Exact Match Ratio: {results['exact_match_ratio']:.4f}\n")
#                 f.write("\nMicro Average Metrics:\n")
#                 for metric, value in results['micro_avg'].items():
#                     f.write(f"{metric}: {value:.4f}\n")
#                 # ... add other metrics as needed


Evaluation Results:

Exact Match Ratio: 0.4733

Micro Average Metrics:
precision: 0.7527

Macro Average Metrics:
recall: 0.7459

Macro Average Metrics:
f1: 0.7493

Macro Average Metrics:
precision: 0.7787

Per-Class Accuracy:
recall: 0.6862

Per-Class Accuracy:
f1: 0.7154

Per-Class Accuracy:
class_0: 0.7807

Per-Class Metrics:
class_1: 0.7701

Per-Class Metrics:
class_2: 0.8877

Per-Class Metrics:
class_3: 0.9706

Per-Class Metrics:
class_4: 0.8690

Per-Class Metrics:
class_5: 0.9813

Per-Class Metrics:

class_0:

class_1:

class_2:

class_3:

class_4:

class_5:
precision: 0.9286
recall: 0.8387
f1: 0.8814


In [33]:
all_predictions

array([[False, False, False, False, False,  True],
       [False,  True, False, False, False, False],
       [ True,  True, False, False, False, False],
       ...,
       [ True, False, False, False, False, False],
       [False, False, False,  True, False, False],
       [False,  True, False, False, False, False]])

In [34]:
all_labels

array([[0., 0., 0., 0., 0., 1.],
       [0., 1., 0., 0., 0., 0.],
       [1., 1., 0., 0., 0., 0.],
       ...,
       [1., 0., 0., 0., 0., 0.],
       [0., 0., 0., 1., 0., 0.],
       [0., 0., 1., 0., 0., 0.]], dtype=float32)

## Print metrics with sklearn

In [35]:
from sklearn.metrics import multilabel_confusion_matrix, classification_report

cm = multilabel_confusion_matrix(all_labels, all_predictions)
print(cm)
print( classification_report(all_labels,all_predictions))

[[[182  35]
  [ 47 110]]

 [[117  74]
  [ 12 171]]

 [[318   8]
  [ 34  14]]

 [[302   1]
  [ 10  61]]

 [[293  16]
  [ 33  32]]

 [[341   2]
  [  5  26]]]
              precision    recall  f1-score   support

           0       0.76      0.70      0.73       157
           1       0.70      0.93      0.80       183
           2       0.64      0.29      0.40        48
           3       0.98      0.86      0.92        71
           4       0.67      0.49      0.57        65
           5       0.93      0.84      0.88        31

   micro avg       0.75      0.75      0.75       555
   macro avg       0.78      0.69      0.72       555
weighted avg       0.76      0.75      0.74       555
 samples avg       0.77      0.78      0.75       555



  _warn_prf(average, modifier, msg_start, len(result))
