In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import ConcatDataset
import torchmetrics as tm
import os


In [2]:
# Preprocessing / augmentation pipelines.
# The step lists are kept as plain Python lists (not Compose objects) so that
# other cells can extend them before composing.
IMAGE_SIZE = (180, 180)
# Per-channel mean/std passed to Normalize (these are the statistics commonly
# used with ImageNet-pretrained torchvision models).
NORM_MEAN = [0.485, 0.456, 0.406]
NORM_STD = [0.229, 0.224, 0.225]

# Steps applied to the PIL image for training: fixed resize + colour jitter.
train_transforms = [
    transforms.Resize(size=IMAGE_SIZE),
    transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5, hue=0.5),
]

# Training steps followed by a grayscale conversion that keeps 3 channels.
grayscale_transforms = train_transforms + [transforms.Grayscale(num_output_channels=3)]

# Tail shared by the training pipelines: tensor conversion, then normalisation.
train_transforms_end = [
    transforms.ToTensor(),
    transforms.Normalize(NORM_MEAN, NORM_STD),
]

# Validation pipeline: resize and normalise only, no colour jitter.
val_pipeline = transforms.Compose([
    transforms.Resize(size=IMAGE_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(NORM_MEAN, NORM_STD),
])

data_transforms = {
    'train': transforms.Compose(train_transforms + train_transforms_end),
    'val': val_pipeline,
}



In [3]:
# Locate the dataset folder relative to the current working directory.
ROOT_DIR = os.path.abspath(os.curdir)
print(ROOT_DIR)
dataset_name = 'dataset3'
data_dir = os.path.join(ROOT_DIR, dataset_name)
print(data_dir)

SPLITS = ['train', 'val']


def _augmented_folder(split, steps):
    """Return an ImageFolder over `split` applying `steps` followed by `train_transforms_end`."""
    pipeline = transforms.Compose(steps + train_transforms_end)
    return datasets.ImageFolder(os.path.join(data_dir, split), pipeline)


# Base datasets: 'train' uses the jittered pipeline, 'val' the plain one.
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x]) for x in SPLITS}

# Grayscale copies, and grayscale + always-flipped copies.
# The flipped pipeline is built as a NEW list instead of appending to
# `grayscale_transforms`: appending mutated a list owned by another cell, so
# re-running this cell stacked a second flip (cancelling the first) and also
# leaked the flip into the "grayscale only" pipeline.
grayscale_flipped_transforms = grayscale_transforms + [transforms.RandomHorizontalFlip(p=1)]
tr_grayscale_dataset = {x: _augmented_folder(x, grayscale_transforms) for x in SPLITS}
tr_grayscale_flipped_dataset = {x: _augmented_folder(x, grayscale_flipped_transforms) for x in SPLITS}

# NOTE(review): the validation split receives the same augmented copies
# (colour jitter, grayscale, flips, rotations) as the training split, so
# validation metrics are computed on randomly perturbed images — confirm this
# is intended.
concatdatasets = [
    image_datasets['train'],
    tr_grayscale_dataset['train'],
    tr_grayscale_flipped_dataset['train'],
]
concatdatasets_val = [
    image_datasets['val'],
    tr_grayscale_dataset['val'],
    tr_grayscale_flipped_dataset['val'],
]

# Rotated copies: one dataset per multiple of `rotation_step` degrees.
r_times = 5
rotation_step = 60
print(train_transforms)
for i in range(r_times):
    rotate_transf = train_transforms.copy()
    if i > 0:
        # A single fixed-angle rotation (min == max) by i * 60 degrees. The
        # previous code chained i separate 60-degree rotations, which cuts off
        # the image corners again on every pass.
        angle = rotation_step * i
        rotate_transf.append(transforms.RandomRotation(degrees=(angle, angle)))
    # i == 0 adds an unrotated (but independently colour-jittered) copy,
    # which keeps the dataset sizes the same as before.
    concatdatasets.append(_augmented_folder('train', rotate_transf))
    concatdatasets_val.append(_augmented_folder('val', rotate_transf))

image_datasets['train'] = ConcatDataset(concatdatasets)
image_datasets['val'] = ConcatDataset(concatdatasets_val)
print(len(image_datasets['train']))
print(len(image_datasets['val']))


# print(image_datasets['train'][0])
#transforms.functional.adjust_saturation(image_datasets, saturation_factor=0.3),
#v2.functional.adjust_hue(image_datasets,hue_factor=0.3),
#image_datasets

/home/lemawul/PyTorch
/home/lemawul/PyTorch/dataset3
[Resize(size=(180, 180), interpolation=bilinear, max_size=None, antialias=True), ColorJitter(brightness=(0.5, 1.5), contrast=(0.5, 1.5), saturation=(0.5, 1.5), hue=(-0.5, 0.5))]
2768
288


In [4]:
# One DataLoader per split: batches of 32, shuffled, loaded by 4 workers.
dataloaders = {}
for split in ['train', 'val']:
    dataloaders[split] = torch.utils.data.DataLoader(
        image_datasets[split],
        batch_size=32,
        shuffle=True,
        num_workers=4,
    )

# Number of samples in each split.
dataset_sizes = {
    'train': len(image_datasets['train']),
    'val': len(image_datasets['val']),
}
print(dataset_sizes)

# The class list is read from the first dataset inside the training ConcatDataset.
class_names = image_datasets['train'].datasets[0].classes
class_names

#print(dataloaders['train'])

{'train': 2768, 'val': 288}


['Circle', 'Cross', 'Goat', 'Person', 'Spiral', 'Stag', 'Zigzag']

In [5]:
# Load a pre-trained ResNet-18.
# `weights='DEFAULT'` replaces the deprecated `pretrained=True` flag and is
# the form already used for the other architectures in this notebook.
model = models.resnet18(weights='DEFAULT')
# Alternatives tried (their head is `model.classifier` for DenseNet):
#model = models.densenet201(weights='DEFAULT')
#model = models.resnext101_64x4d(weights='DEFAULT')

# Replace the final fully connected layer with one output per class in
# `class_names`. No layer is frozen: every parameter is fine-tuned.
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, len(class_names))

# Move the model to the GPU if available, before the optimizer is created
# from its parameters.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Define the loss function and optimizer.
criterion = nn.CrossEntropyLoss()
# lr = 0.001 * 2.82. The original note mentions sqrt(64/4) -> 0.004 for
# batches of 64; 2.82 is approximately sqrt(32/4), so this is presumably the
# same square-root scaling applied to a batch size of 32 — TODO confirm.
optimizer = optim.SGD(model.parameters(), lr=0.001*2.82, momentum=0.9)




In [6]:
def save_checkpoint(model, optimizer, save_path, epoch):
    """Write a training checkpoint to `save_path`.

    The checkpoint is a dict with three entries:
    'model_state_dict', 'optimizer_state_dict' and 'epoch' (stored exactly as
    passed in).

    When `save_path` is a filesystem path, the data is first written to
    `<save_path>.tmp` and then moved over the target with `os.replace`, so an
    interruption in the middle of the write does not leave a truncated
    checkpoint behind. Any other kind of target (e.g. a file-like object) is
    handed to `torch.save` directly.
    """
    state = {
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'epoch': epoch
    }
    if isinstance(save_path, (str, os.PathLike)):
        final_path = os.fspath(save_path)
        tmp_path = final_path + '.tmp'
        torch.save(state, tmp_path)
        # Replace the previous checkpoint only after the new one is complete.
        os.replace(tmp_path, final_path)
    else:
        torch.save(state, save_path)

In [7]:
def load_checkpoint(model, optimizer, load_path, map_location='cpu'):
    """Restore `model` and `optimizer` in place from a checkpoint file.

    The file is expected to hold a dict with the keys 'model_state_dict',
    'optimizer_state_dict' and 'epoch'.

    Args:
        model: module whose `load_state_dict` receives the saved weights.
        optimizer: optimizer whose `load_state_dict` receives the saved state.
        load_path: location passed to `torch.load`.
        map_location: forwarded to `torch.load`. The default 'cpu' lets a
            checkpoint written on a GPU machine be read on a machine without
            CUDA; the values are then copied into `model` / `optimizer` by
            their own `load_state_dict` calls.

    Returns:
        The tuple `(model, optimizer, epoch)`, where `epoch` is the value
        stored in the checkpoint.
    """
    # NOTE: torch.load deserialises with pickle — only load checkpoints from
    # a trusted source.
    checkpoint = torch.load(load_path, map_location=map_location)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    epoch = checkpoint['epoch']

    return model, optimizer, epoch

In [8]:
# Training loop
num_epochs = 300

# Per-class F1 and recall (average='none' gives one value per class). The
# metric objects accumulate over the batches of a phase and are reset at the
# start of every phase.
f1_metric = tm.F1Score(task='multiclass', num_classes=len(class_names), average='none').to(device)
recall_metric = tm.Recall(task='multiclass', num_classes=len(class_names), average='none').to(device)

# Resume from the per-epoch checkpoint if one exists.
save_path = f'{dataset_name}_resnet18_checkpoint.pth'
if os.path.isfile(save_path):
    model, optimizer, checkpoint_e = load_checkpoint(model, optimizer, save_path)
    # The checkpoint is written at the END of an epoch and stores that
    # epoch's index, so training continues with the following one instead of
    # repeating (and re-logging) an epoch that already finished.
    start_epoch = checkpoint_e + 1
else:
    checkpoint_e = 0
    start_epoch = 0

print(start_epoch)

writer = SummaryWriter()
try:
    for epoch in range(start_epoch, num_epochs):
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            # Start every phase with empty metric accumulators.
            f1_metric.reset()
            recall_metric.reset()

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Gradients are only tracked (and applied) in the training phase.
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()

                # criterion returns the batch mean, so weight it by the batch
                # size before averaging over the whole split below.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += (preds == labels).sum().item()
                # Update metrics with the batch results
                f1_metric.update(preds, labels)
                recall_metric.update(preds, labels)

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]
            epoch_f1 = f1_metric.compute()
            epoch_recall = recall_metric.compute()

            writer.add_scalar(f"{dataset_name}_resnet18_EpochLoss/{phase}", epoch_loss, epoch)
            writer.add_scalar(f"{dataset_name}_resnet18_EpochAcc/{phase}", epoch_acc, epoch)
            for i, (f1, recall) in enumerate(zip(epoch_f1, epoch_recall)):
                writer.add_scalar(f"{dataset_name}_rotated_resnet18_F1Score/{phase}_class_{i}", f1, epoch)
                writer.add_scalar(f"{dataset_name}_rotated_resnet18_Recall/{phase}_class_{i}", recall, epoch)
                print(f'{phase}_class_{i}_f1score at epoch {epoch}: {f1}')
                print(f'{phase}_class_{i}_recall at epoch {epoch}: {recall}')

            # Unweighted mean over the per-class values.
            avg_epoch_f1 = epoch_f1.mean()
            avg_epoch_recall = epoch_recall.mean()
            writer.add_scalar(f"{dataset_name}_rotated_resnet18_F1Score_avg/{phase}", avg_epoch_f1, epoch)
            writer.add_scalar(f"{dataset_name}_rotated_resnet18_Recall_avg/{phase}", avg_epoch_recall, epoch)

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} F1 Score AVG: {avg_epoch_f1:.4f} Recall AVG: {avg_epoch_recall:.4f}')
        # Checkpoint after both phases of the epoch have finished.
        save_checkpoint(model, optimizer, save_path, epoch)
finally:
    # Flush and close the TensorBoard writer even if training is interrupted.
    writer.flush()
    writer.close()
print("Training complete!")


0


  return F.conv2d(input, weight, bias, self.stride,


train_class_0_f1score at epoch 0: 0.47191011905670166
train_class_0_recall at epoch 0: 0.501838207244873
train_class_1_f1score at epoch 0: 0.1304347813129425
train_class_1_recall at epoch 0: 0.0803571417927742
train_class_2_f1score at epoch 0: 0.4747866690158844
train_class_2_recall at epoch 0: 0.5032894611358643
train_class_3_f1score at epoch 0: 0.5652173757553101
train_class_3_recall at epoch 0: 0.6825000047683716
train_class_4_f1score at epoch 0: 0.34302327036857605
train_class_4_recall at epoch 0: 0.24583333730697632
train_class_5_f1score at epoch 0: 0.25196850299835205
train_class_5_recall at epoch 0: 0.1666666716337204
train_class_6_f1score at epoch 0: 0.3838862478733063
train_class_6_recall at epoch 0: 0.2977941036224365
train Loss: 1.4352 Acc: 0.4718 F1 Score AVG: 0.3745 Recall AVG: 0.3540
val_class_0_f1score at epoch 0: 0.43809524178504944
val_class_0_recall at epoch 0: 0.359375
val_class_1_f1score at epoch 0: 0.0
val_class_1_recall at epoch 0: 0.0
val_class_2_f1score at epoch

In [9]:
# Save the model: only the weights (state_dict) are written, not the module object.
trained_weights = model.state_dict()
torch.save(trained_weights, 'teacher3_resnet18_300e.pth')

# Classification on an Unseen Image

To use the saved model to classify unseen images, you need to load the model and then apply it to the new images for inference. 

In [3]:
import torch
import torch.nn as nn
from torchvision import models, transforms
from PIL import Image

# Build the ResNeXt-101 64x4d architecture and fill it from a local weight file.
# weights=None: load_state_dict below (strict by default) overwrites every
# parameter and buffer, so fetching the pretrained weights first is redundant.
model = models.resnext101_64x4d(weights=None)
num_features = model.fc.in_features
model_name = 'resnext101_64x4d-173b62eb'
# map_location='cpu' so the file can be read on a machine without CUDA.
model.load_state_dict(torch.load(model_name+'.pth', map_location='cpu'))
# NOTE(review): the final layer is left at the architecture's stock size here
# (the 7-class replacement below is commented out), while the inference cell
# further down indexes a 7-entry class list. The classifier trained above was
# a ResNet-18 saved as 'teacher3_resnet18_300e.pth' — confirm which model this
# section is meant to load.
#model.fc = nn.Linear(num_features, 7)
model.eval()
# Re-save the same weights with the legacy (non-zipfile) serialization format.
torch.save(model.state_dict(), model_name+'_old.pth', _use_new_zipfile_serialization=False)

# # Create a new model with the correct final layer
# new_model = models.resnet18(pretrained=True)
# new_model.fc = nn.Linear(new_model.fc.in_features, 8)  # Adjust to match the desired output units

# # Copy the weights and biases from the loaded model to the new model
# new_model.fc.weight.data = model.fc.weight.data[0:2]  # Copy only the first 2 output units
# new_model.fc.bias.data = model.fc.bias.data[0:2]




Prepare your new image for classification. You should use the same data transformations you used during training. Here's an example of how to prepare an image for inference:

In [None]:
# Load and preprocess the unseen image
image_path = 'test.jpg'  # Replace with the path to your image
# Force three channels: Normalize below is given 3-channel statistics, so a
# grayscale, palette or RGBA file would otherwise fail at that step.
image = Image.open(image_path).convert('RGB')
# NOTE(review): the training/validation pipelines in this notebook resize to
# (180, 180) without cropping, whereas this cell uses Resize(256) +
# CenterCrop(224) — confirm which preprocessing matches the model being loaded.
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
input_tensor = preprocess(image)
input_batch = input_tensor.unsqueeze(0)  # Add a batch dimension


Perform inference using the model:

In [None]:
# Class names in label-index order; must match the classes used for training.
class_names = ['Circle', 'Cross',#'Dots',
               'Goat','Person','Spiral','Stag','Zigzag']

# Perform inference (no gradients needed).
with torch.no_grad():
    output = model(input_batch)

# The model must emit exactly one score per class name. Without this check a
# model with a different head either raises an opaque IndexError or silently
# maps an unrelated output index onto one of the names above.
if output.shape[1] != len(class_names):
    raise ValueError(
        f'Model produces {output.shape[1]} outputs but {len(class_names)} class names '
        'are defined; load the model that was trained on these classes.'
    )

# Get the predicted class (index of the highest score) and map it to its name.
_, predicted_class = output.max(1)
predicted_class_name = class_names[predicted_class.item()]

print(f'The predicted class is: {predicted_class_name}')


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

# Display the image with the predicted class name.
# The pixel data goes into a new variable so that `image` stays bound to the
# PIL image; rebinding it to an ndarray would break re-running the
# preprocessing step on `image`.
image_array = np.array(image)
fig, ax = plt.subplots()
ax.imshow(image_array)
ax.axis('off')
ax.text(10, 10, f'Predicted: {predicted_class_name}', fontsize=12, color='white', backgroundcolor='red')
plt.show()