In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import torch
from torch.utils.data import Dataset, random_split
from torchvision import transforms
import os
from PIL import Image
import torch.nn as nn
import torch.nn.functional as F

In [None]:
# Root directory of the Kaggle butterfly image-classification dataset
# (the CSV index files and the image folders are located under it).
DATA_DIR = "/kaggle/input/butterfly-image-classification/"

In [None]:
# Show the installed PyTorch version (useful when reproducing results).
torch.__version__

# Define Dataset class

In [None]:
# CSV index files of the dataset. os.path.join is used instead of string
# concatenation so the paths stay valid whether or not DATA_DIR ends with "/".
TRAIN_PATH = os.path.join(DATA_DIR, "Training_set.csv")
TEST_PATH = os.path.join(DATA_DIR, "Testing_set.csv")

In [None]:
class ButterflyDataset(Dataset):
    """Image dataset backed by a CSV index file.

    The CSV must contain a ``filename`` column and, when ``train`` is true,
    a ``label`` column.

    Args:
        csv_path: Path to the CSV file listing the images.
        img_dir: Directory containing the image files named in the CSV.
        transform: Optional callable applied to every loaded PIL image.
        train: When true, labels are read from the CSV and ``__getitem__``
            returns ``(image, label)``; otherwise it returns only the image.

    Attributes:
        image_names: ``filename`` column of the CSV (pandas Series).
        labels: ``label`` column of the CSV, or ``None`` when ``train`` is false.
        classes: Sorted list of distinct labels (empty when ``train`` is false).
    """

    def __init__(self, csv_path, img_dir, transform=None, train=True):
        df = pd.read_csv(csv_path)
        self.image_names = df["filename"]
        self.train = train
        if train:
            self.labels = df["label"]
            # Sorted so the class order does not depend on set/hash ordering,
            # which can change from one interpreter session to the next.
            self.classes = sorted(set(self.labels))
        else:
            # Define the attributes anyway so unlabeled datasets can be
            # inspected without raising AttributeError.
            self.labels = None
            self.classes = []
        self.img_dir = img_dir
        self.transform = transform

    def __getitem__(self, index):
        """Return the transformed image at ``index`` (plus its label when ``train``)."""
        path = os.path.join(self.img_dir, self.image_names.iloc[index])
        # The context manager closes the underlying file handle; convert()
        # forces the pixel data to be read and guarantees three channels even
        # for grayscale, palette or RGBA files.
        with Image.open(path) as raw:
            img = raw.convert("RGB")
        if self.transform is not None:
            img = self.transform(img)
        if self.train:
            return img, self.labels.iloc[index]
        return img

    def __len__(self):
        """Number of images listed in the CSV."""
        # image_names exists for both labeled and unlabeled datasets, whereas
        # labels is only populated when train is true.
        return len(self.image_names)

# Define Dataloader

In [None]:
from torch.utils.data import DataLoader

# Pre-processing shared by both pipelines:
#   * ToTensor converts the PIL image to a float tensor (8-bit images are
#     scaled to [0, 1]).
#   * Normalize applies (x - 0.1307) / 0.3081; the single-element mean/std are
#     broadcast over all channels, so values end up in roughly [-0.42, 2.82],
#     not [-1, 1].
# NOTE(review): 0.1307 / 0.3081 look like the usual MNIST statistics; consider
# replacing them with per-channel statistics computed on this dataset.
# No resize/crop is applied, so batching relies on every image in the dataset
# already having the same size.
train_transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ]
)
test_transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ]
)

# Labeled dataset; the image folder is derived from DATA_DIR so the data
# location is configured in a single place.
train_dataset = ButterflyDataset(
    csv_path=TRAIN_PATH,
    img_dir=os.path.join(DATA_DIR, "train"),
    transform=train_transform,
)

# Split dataset into train and validation sets

In [None]:
# Hold out 20% of the labeled data for validation.
val_ratio = 0.2
train_ratio = 1 - val_ratio
train_size = int(train_ratio * len(train_dataset))
# Computed by subtraction so the two sizes always add up to the dataset length.
val_size = len(train_dataset) - train_size

# A dedicated, seeded generator makes the split identical on every run, so
# validation metrics stay comparable across kernel restarts.
split_seed = 42
train_subset, val_subset = random_split(
    train_dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(split_seed),
)

In [None]:
# Mini-batch loader over the training split.
train_loader = DataLoader(
    dataset=train_subset,
    batch_size=64,
    shuffle=True,  # reshuffle the training samples every epoch
    num_workers=2,  # number of worker processes used to load batches
)

In [None]:
# Mini-batch loader over the validation split.
val_loader = DataLoader(
    dataset=val_subset,
    batch_size=64,
    shuffle=False,  # keep a fixed order; shuffling is not needed for evaluation
    num_workers=2,  # number of worker processes used to load batches
)

In [None]:
# Look the labels up directly in the label column. Indexing the dataset itself
# (train_dataset[idx]) would open and transform every image just to read its
# label.
train_labels = train_dataset.labels.iloc[train_subset.indices].tolist()

# Get labels on validation set
val_labels = train_dataset.labels.iloc[val_subset.indices].tolist()

In [None]:
# Number of samples in the training split.
len(train_labels)

In [None]:
# Size of the training split according to random_split; should equal len(train_labels).
len(train_subset.indices)

In [None]:
#train_labels

In [None]:
#val_labels

In [None]:
# Number of distinct labels in the full labeled dataset (before splitting).
train_dataset.labels.nunique()

In [None]:
# Unlabeled test images; with train=False the dataset yields images only.
# The image folder is derived from DATA_DIR so the data location is
# configured in a single place.
test_dataset = ButterflyDataset(
    csv_path=TEST_PATH,
    img_dir=os.path.join(DATA_DIR, "test"),
    transform=test_transform,
    train=False,
)

# Fixed order (shuffle=False) keeps predictions aligned with the rows of the
# test CSV; num_workers=0 loads batches in the main process.
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=64,
    shuffle=False,
    num_workers=0,
)

In [None]:
# train_classes = list(set([train_labels[idx] for idx in train_subset.indices]))
# val_classes = list(set([val_labels[idx] for idx in val_subset.indices]))

# num_classes = len(train_classes)

In [None]:
# Count classes on the full labeled dataset rather than on the training split:
# the label-to-index mapping is built from the full dataset as well, so a class
# that happened to land only in the validation split must still be counted.
num_classes = train_dataset.labels.nunique()

In [None]:
# Sanity check: batch size configured on the training loader.
train_loader.batch_size

# Define model architecture

In [None]:
class AlexNet(nn.Module):
    """AlexNet-style convolutional classifier for 3-channel images.

    Five convolution layers (the first two followed by local response
    normalisation and max-pooling, the last by max-pooling) feed three fully
    connected layers. The first fully connected layer expects a 256 x 5 x 5
    feature map, which is what the convolutional stack produces for
    224 x 224 inputs.

    Args:
        num_classes: Size of the output layer. Defaults to 75, the value that
            was previously hard-coded.
    """

    def __init__(self, num_classes=75):
        super().__init__()

        self.conv1 = nn.Conv2d(3, 96, kernel_size=11, stride=4)
        self.relu1 = nn.ReLU()
        self.lrn1 = nn.LocalResponseNorm(2, k=2)
        self.pool1 = nn.MaxPool2d(3, stride=2)

        self.conv2 = nn.Conv2d(96, 256, kernel_size=5, padding=2)
        self.relu2 = nn.ReLU()
        self.lrn2 = nn.LocalResponseNorm(2, k=2)
        self.pool2 = nn.MaxPool2d(3, stride=2)

        self.conv3 = nn.Conv2d(256, 384, kernel_size=3, padding=1)
        self.relu3 = nn.ReLU()
        self.conv4 = nn.Conv2d(384, 384, kernel_size=3, padding=1)
        self.relu4 = nn.ReLU()
        self.conv5 = nn.Conv2d(384, 256, kernel_size=3, padding=1)
        self.relu5 = nn.ReLU()
        self.pool5 = nn.MaxPool2d(kernel_size=3, stride=2)

        self.fc1 = nn.Linear(256 * 5 * 5, 4096)
        self.relu6 = nn.ReLU()

        self.fc2 = nn.Linear(4096, 4096)
        self.relu7 = nn.ReLU()

        self.fc3 = nn.Linear(4096, num_classes)

    def forward(self, x):
        """Return unnormalised class scores (logits) of shape ``(batch, num_classes)``."""
        x = self.pool1(self.lrn1(self.relu1(self.conv1(x))))
        x = self.pool2(self.lrn2(self.relu2(self.conv2(x))))

        x = self.relu3(self.conv3(x))
        x = self.relu4(self.conv4(x))
        x = self.pool5(self.relu5(self.conv5(x)))

        # Flatten everything except the batch dimension. Unlike
        # view(-1, 256 * 5 * 5), this never folds a mismatched feature map
        # into the batch dimension; an unexpected input size then fails
        # loudly at fc1 instead of silently changing the batch size.
        x = torch.flatten(x, 1)

        x = self.relu6(self.fc1(x))
        x = self.relu7(self.fc2(x))
        return self.fc3(x)
  

In [None]:
# Choose the compute backend in order of preference:
# CUDA GPU first, then Apple's MPS backend, and finally the CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

In [None]:
# Instantiate the network and move its parameters to the selected device,
# then print the layer structure.
model = AlexNet().to(device)
print(model)

# Define loss function and optimizer

In [None]:
# Cross-entropy loss applied to the raw scores returned by the model,
# optimised with Adam at a learning rate of 1e-4.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

In [None]:
# Inspect the label column read from the training CSV.
train_dataset.labels

In [None]:
# Map every label to an integer class index. The labels are sorted first so
# that the mapping is identical in every kernel session: iterating a set of
# strings depends on hash randomisation, which would otherwise assign different
# indices after a restart and make previously saved weights unusable.
label_encoder = {
    label: idx for idx, label in enumerate(sorted(set(train_dataset.labels)))
}
def train(dataloader, model, loss_fn, optimizer):
    """Train ``model`` for one pass over ``dataloader``.

    Relies on the module-level ``label_encoder`` (label -> class index) and
    ``device`` defined elsewhere in the notebook.

    Args:
        dataloader: Yields ``(images, labels)`` batches, where ``labels`` is a
            sequence of keys of ``label_encoder``.
        model: Network mapping an image batch to per-class scores.
        loss_fn: Loss called as ``loss_fn(scores, targets)`` with integer
            class-index targets (e.g. ``nn.CrossEntropyLoss``).
        optimizer: Optimizer bound to ``model``'s parameters.

    Prints the loss every 100 batches and the training accuracy over all
    samples seen once the pass is finished.
    """
    size = len(dataloader.dataset)
    model.train()
    correct, seen = 0, 0
    for batch, (X, y) in enumerate(dataloader):
        # Labels arrive as raw label values; encode them as class indices.
        # CrossEntropyLoss takes indices directly, so no one-hot encoding is
        # needed, and X is already a tensor, so it is not copied again.
        y = torch.tensor([label_encoder[label] for label in y], dtype=torch.long)
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation (clear stale gradients before accumulating new ones)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Count hits on every batch so the reported accuracy covers the
        # whole pass instead of only the batches that are logged.
        correct += (pred.argmax(1) == y).sum().item()
        seen += len(X)

        if batch % 100 == 0:
            print(f"loss: {loss.item():>7f}  [{seen:>5d}/{size:>5d}]")

    # Guard against an empty dataloader.
    accuracy = correct / seen if seen else 0.0
    print(f"Train Accuracy: {(100*accuracy):>0.1f}%\n")

In [None]:
def val(dataloader, model, loss_fn):
    """Evaluate ``model`` on ``dataloader`` and print accuracy and mean loss.

    Relies on the module-level ``label_encoder`` (label -> class index) and
    ``device`` defined elsewhere in the notebook.

    Args:
        dataloader: Yields ``(images, labels)`` batches, where ``labels`` is a
            sequence of keys of ``label_encoder``.
        model: Network mapping an image batch to per-class scores.
        loss_fn: Loss called as ``loss_fn(scores, targets)`` with integer
            class-index targets (e.g. ``nn.CrossEntropyLoss``).

    Returns:
        Tuple ``(accuracy, avg_loss)``: the fraction of correctly classified
        samples and the loss averaged over batches.
    """
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0.0, 0
    with torch.no_grad():
        for X, y in dataloader:
            # Encode the raw labels as class indices. X is already a tensor,
            # so it is moved to the device without an extra copy.
            y = torch.tensor([label_encoder[label] for label in y], dtype=torch.long)
            X, y = X.to(device), y.to(device)

            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).sum().item()

    # max(..., 1) avoids a ZeroDivisionError when the loader is empty.
    test_loss /= max(num_batches, 1)
    correct /= max(size, 1)
    print(f"Val Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    return correct, test_loss

In [None]:
# Full training schedule: for every epoch, one pass over the training split
# followed by an evaluation on the validation split.
epochs = 20
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_loader, model, loss_fn, optimizer)
    val(val_loader, model, loss_fn)
print("Done!")

In [None]:
#val(val_loader, model, loss_fn)

In [None]:
# labels = len(list(set([label for label, index in label_encoder.items()])))
# print(labels)
# print(num_classes)



In [None]:
# for label, index in label_encoder.items():
#     print(f"Label: {label}, Encoded Index: {index}")


In [None]:
#os.mkdir("/kaggle/working/model")

In [None]:
# Persist the trained weights. torch.save does not create missing parent
# directories, so make sure the target folder exists first; exist_ok=True
# keeps the cell re-runnable.
model_path = "/kaggle/working/model/model.pth"
os.makedirs(os.path.dirname(model_path), exist_ok=True)
torch.save(model.state_dict(), model_path)