In [1]:
import os
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import random_split
import scipy
import torch
from PIL import Image
from torchvision.transforms import transforms

In [2]:
class OxfordDataset(Dataset):
    def __init__(self, rootdir,transform):
        self.rootdir = rootdir
        self.img_dir = os.path.join(rootdir, "jpg")
        
        labels_mat = scipy.io.loadmat(os.path.join(self.rootdir, "imagelabels.mat"))
        self.labels = labels_mat['labels'][0] - 1
        self.transform = transform
        self.error_logs = []
    def __len__(self):
        return len(self.labels)
    def __getitem__(self,idx):
        try:
            img_name = f'image_{idx + 1:05d}.jpg'

            img_path = os.path.join(self.img_dir, img_name)
            image = Image.open(img_path)
            image.verify()
            image = Image.open(img_path)
            if image.size[0] < 32 or image.size[1] <32:
                raise ValueError(f"Image too small {image.size}")
            
            if image.mode != "RGB":
                image = image.convert('RGB')
            
            image = self.transform(image)
            label = self.labels[idx]
            
        
        

            return image, label
        except Exception as e:
            self.error_logs.append({
                'index':idx,
                'error':e,
                'path': img_path if 'img_path' in locals() else 'unknown'})
            next_idx = (idx+1) % len(self)
            return self.__getitem__(next_idx)



            

    


In [3]:
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406],std=[0.229,0.224,0.225])
])

In [4]:
dataset = OxfordDataset(rootdir="flower_data", transform=transform)
dataloader = DataLoader(dataset, 4, shuffle=True)

In [5]:
train_split = int(len(dataset) * 0.70)
validation_split = int(len(dataset) * 0.15)
test_split = len(dataset) - train_split - validation_split

In [9]:
print(f"train : {train_split}"
      f"test : {test_split}"
      f"validation : {validation_split}")

train : 5732test : 1229validation : 1228


In [6]:
train_dataset, validation_dataset,test_dataset = random_split(dataset, [train_split,validation_split,test_split])

In [7]:
len(train_dataset)

5732

In [12]:
len(validation_dataset)

1228

In [12]:
len(test_dataset)

1229

In [8]:
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
validation_loader = DataLoader(validation_dataset,batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset,batch_size=32, shuffle=False)

In [9]:
len(train_loader)

180

In [10]:
for images,labels in train_loader:
    print(images.shape)
    break

torch.Size([32, 3, 224, 224])


In [11]:
class OxfordClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.layers = nn.Sequential(
            nn.Linear(150528, 512),
            nn.ReLU(),
            nn.Linear(512,128),
            nn.ReLU(),
            nn.Linear(128,102)

        )
    
    def forward(self, x):
        x = self.flatten(x)
        x = self.layers(x)
        return x


In [12]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

  return torch._C._cuda_getDeviceCount() > 0


device(type='cpu')

In [13]:
model = OxfordClassifier().to(device)

optimizer = optim.Adam(model.parameters(),lr = 0.001)
loss_function = nn.CrossEntropyLoss()


In [14]:
def train_epoch(model, train_loader, optimizer,loss_function):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for batch_idx,(data,labels) in enumerate(train_loader):
        data,labels = data.to(device),labels.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = loss_function(output,labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _,predicted = output.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
        if batch_idx %  10 == 0 and batch_idx > 0:

            #validation check
            avg_loss = running_loss/10
            acc = 100. * correct/total
            print(f' [{batch_idx * len(data)}/{len(train_loader.dataset)}]'
                  f' Loss: {avg_loss:.3f} | Accuracy: {acc:.1f} %')
            running_loss = 0.0


In [15]:
def validation(model,validation_loader,device):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for idx, (data, targets) in enumerate(validation_loader):
            data, targets = data.to(device), targets.to(device)
            output = model(data)
            _,predicted = output.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()
    
    return 100. * correct/total


In [16]:
num_epochs = 10

for idx in range(num_epochs):
    print(f"Epoch : {idx}")
    train_epoch(model,train_loader,optimizer,loss_function)
    validation_acc = validation(model, validation_loader, device)
    print(f"Validation Accuracy : {validation_acc}")



Epoch : 0
 [320/5732] Loss: 19.992 | Accuracy: 1.1 %
 [640/5732] Loss: 7.739 | Accuracy: 2.4 %
 [960/5732] Loss: 5.314 | Accuracy: 2.6 %
 [1280/5732] Loss: 4.694 | Accuracy: 2.5 %
 [1600/5732] Loss: 4.666 | Accuracy: 2.7 %
 [1920/5732] Loss: 4.641 | Accuracy: 2.6 %
 [2240/5732] Loss: 4.617 | Accuracy: 2.5 %
 [2560/5732] Loss: 4.616 | Accuracy: 2.7 %
 [2880/5732] Loss: 4.631 | Accuracy: 2.6 %


KeyboardInterrupt: 