In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split
from google.colab import drive
import os
import time
import numpy as np
from tqdm import tqdm
import pandas as pd
from torch.utils.data import Dataset
from PIL import Image

In [None]:
drive.mount('/content/drive')

In [None]:
data_path = '/content/drive/MyDrive/102flowers'
labels_csv_path = '/content/drive/MyDrive/flowers_labeled_data.csv'

In [None]:
# Define dataset and data loaders
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

labeled_data = pd.read_csv(labels_csv_path)
image_paths = labeled_data['image_path'].tolist()
labels = (labeled_data['label'] - 1).astype(int).tolist()

valid_indices = [i for i, label in enumerate(labels) if 0 <= label < 102]
image_paths = [image_paths[i] for i in valid_indices]
labels = [labels[i] for i in valid_indices]

valid_image_paths = []
valid_labels = []
for img_path, label in zip(image_paths, labels):
    if os.path.exists(img_path):
        valid_image_paths.append(img_path)
        valid_labels.append(label)

if not valid_image_paths:
    print(f"Warning: No valid image files found in the CSV file {labels_csv_path}. Please ensure that image files are available.")
    valid_image_paths = image_paths
    valid_labels = labels

# Custom Dataset
class CustomImageDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        try:
            image = Image.open(self.image_paths[idx]).convert('RGB')
        except FileNotFoundError:
            raise FileNotFoundError(f"Image at {self.image_paths[idx]} not found. Please check the file path.")
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        return image, label

# Create dataset
full_dataset = CustomImageDataset(image_paths=valid_image_paths, labels=valid_labels, transform=transform)

In [None]:
# Split dataset into training and testi
train_size = int(0.8 * len(full_dataset))
test_size = len(full_dataset) - train_size
train_dataset, test_dataset = random_split(full_dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

In [None]:
# Load pretrained EfficientNetV2 model
model = models.efficientnet_v2_s(weights=None)
model.classifier[1] = nn.Linear(model.classifier[1].in_features, 102)

In [None]:
# Training settings
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# First move model to CPU, then GPU
model.to('cpu')
model.to(device)

In [None]:
# Early stopping parameters
patience = 3
best_loss = np.inf
trigger_times = 0

In [None]:
# Train the model
epochs = 12
best_loss = float('inf')
trigger_times = 0
patience = 3

for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    correct_predictions = 0
    total_samples = 0
    start_time = time.time()

    for batch_idx, (inputs, labels) in enumerate(tqdm(train_loader, desc=f"Epoch [{epoch+1}/{epochs}]")):
        inputs, labels = inputs.to(device), labels.to(device)
        labels = labels.long()

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Calculate accuracy
        _, predicted = torch.max(outputs, 1)
        correct_predictions += (predicted == labels).sum().item()
        total_samples += labels.size(0)

        if (batch_idx + 1) % 10 == 0:
            print(f"Step [{batch_idx+1}/{len(train_loader)}], Loss: {loss.item():.4f}")

    epoch_loss = running_loss / len(train_loader)
    epoch_accuracy = correct_predictions / total_samples
    epoch_time = time.time() - start_time

    print(f"Epoch [{epoch+1}/{epochs}] completed in {epoch_time:.2f} seconds, Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}")

    # Early stopping check
    if epoch_loss < best_loss:
        best_loss = epoch_loss
        trigger_times = 0
    else:
        trigger_times += 1
        print(f"Early stopping trigger times: {trigger_times}")
        if trigger_times >= patience:
            print("Early stopping activated.")
            break

In [None]:
# Evaluate the model
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in tqdm(test_loader, desc="Evaluating"):
        inputs, labels = inputs.to(device), labels.to(device)
        labels = labels.long()
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f'Accuracy of the model on the test set: {100 * correct / total:.2f}%')