In [None]:
!pip install git+https://github.com/cleanlab/cleanvision.git
import os
import random

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from PIL import Image
from torchvision import models
from torchvision.transforms import v2
from tqdm import tqdm

In [None]:
import os

def count_files_in_folders(directory):
    """Print the number of regular files in each immediate sub-folder.

    One line is printed per sub-folder of ``directory`` (files directly inside
    that sub-folder only; nested folders are not descended into), followed by
    the average file count per sub-folder. When ``directory`` has no
    sub-folders a single notice is printed instead.

    Args:
        directory: Path of the folder whose sub-folders are inspected.

    Returns:
        None. All results are written to standard output.
    """
    folder_count = 0
    file_count = 0

    for entry in os.listdir(directory):
        folder = os.path.join(directory, entry)
        # Plain files sitting next to the sub-folders are ignored.
        if not os.path.isdir(folder):
            continue

        files_here = len([
            child
            for child in os.listdir(folder)
            if os.path.isfile(os.path.join(folder, child))
        ])
        print(f"Folder '{entry}' contains {files_here} file(s).")
        file_count += files_here
        folder_count += 1

    if folder_count == 0:
        print("No folders found.")
        return

    print(f"\nAverage number of files per folder is: {file_count / folder_count:.2f}")

# Dataset root used throughout the notebook; its sub-folders are summarized
# right away by count_files_in_folders.
DIRECTORY = "data/Mushrooms"
count_files_in_folders(directory=DIRECTORY)

In [None]:
# Training pipeline: resize to 224x224, light augmentation (random horizontal
# flip and RandomRotation(10)), tensor conversion, then per-channel
# normalization.
# NOTE(review): mean/std look like the usual ImageNet statistics — confirm.
transform_train = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(degrees=10),
        transforms.ToTensor(),
        v2.ToDtype(torch.float32, scale=True),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Evaluation pipeline: same resize / tensor conversion / normalization as the
# training pipeline, without the random augmentation steps.
transform_test = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        v2.ToDtype(torch.float32, scale=True),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

In [None]:
def get_image_paths(directory):
    """Collect the paths of all files located one level below ``directory``.

    Every immediate sub-folder of ``directory`` is scanned (with a tqdm
    progress bar per sub-folder) and each regular file found directly inside
    it is returned as ``directory/<sub-folder>/<file>``.

    ``os.listdir`` returns entries in arbitrary order, so both directory
    listings are sorted; the returned list is therefore identical from run to
    run, which is what makes a seeded train/test split reproducible.

    Args:
        directory: Dataset root whose sub-folders contain the files.

    Returns:
        list[str]: File paths, ordered by sub-folder name and then file name.
    """
    image_paths = []

    for rel_path1 in sorted(os.listdir(directory)):
        subdir_path = os.path.join(directory, rel_path1)
        if not os.path.isdir(subdir_path):
            continue
        for rel_path2 in tqdm(sorted(os.listdir(subdir_path))):
            full_path = os.path.join(subdir_path, rel_path2)
            if os.path.isfile(full_path):
                image_paths.append(full_path)

    return image_paths

def split_train_test(image_paths, test_ratio=0.1, seed=None):
    """Randomly partition ``image_paths`` into train and test lists.

    The input sequence is copied before shuffling, so the caller's list is
    left untouched.

    Args:
        image_paths: Sequence of items (file paths) to split.
        test_ratio: Fraction of the items assigned to the test list; the test
            size is ``int(len(image_paths) * test_ratio)``.
        seed: Optional seed for a private random generator, giving a
            reproducible split. When ``None`` the global ``random`` module
            state is used.

    Returns:
        tuple[list, list]: ``(train_image_paths, test_image_paths)``.

    Raises:
        ValueError: If ``test_ratio`` is outside ``[0, 1]``.
    """
    if not 0.0 <= test_ratio <= 1.0:
        raise ValueError(f"test_ratio must be within [0, 1], got {test_ratio!r}")

    # Work on a copy: shuffling in place would silently reorder the caller's list.
    shuffled_paths = list(image_paths)
    rng = random if seed is None else random.Random(seed)
    rng.shuffle(shuffled_paths)

    split_index = int(len(shuffled_paths) * test_ratio)
    test_image_paths = shuffled_paths[:split_index]
    train_image_paths = shuffled_paths[split_index:]
    return train_image_paths, test_image_paths

# Seed Python's RNG so the shuffle performed by split_train_test yields the
# same train/test partition on every Restart & Run All.
SPLIT_SEED = 42
random.seed(SPLIT_SEED)

# Directory listings come back in arbitrary order; sorting first makes the
# seeded shuffle start from the same sequence each time.
image_paths = sorted(get_image_paths(DIRECTORY))
train_image_paths, test_image_paths = split_train_test(image_paths)

print(f"Number of images:\nTest: {len(test_image_paths)}\nTrain: {len(train_image_paths)}")

In [None]:
def create_class_to_index_mapping(directory):
    """Map each class sub-folder name of ``directory`` to an integer label.

    Only sub-directories count as classes; stray files in the dataset root
    (for example OS metadata files) are skipped so they cannot shift the label
    indices or inflate the number of classes. Names are sorted so the same
    folder layout always yields the same indices.

    Args:
        directory: Dataset root containing one sub-folder per class.

    Returns:
        dict[str, int]: ``{class_name: index}`` with indices ``0..n-1``
        assigned in sorted name order.
    """
    class_names = sorted(
        entry
        for entry in os.listdir(directory)
        if os.path.isdir(os.path.join(directory, entry))
    )
    return {class_name: idx for idx, class_name in enumerate(class_names)}

# Build the class-name -> label-index lookup from the dataset root and show it.
class_to_index = create_class_to_index_mapping(directory=DIRECTORY)
print(class_to_index)

In [None]:
class OurMushroom(torch.utils.data.Dataset):
    """Map-style dataset yielding ``(image, label)`` pairs from image paths.

    The label of an image is looked up from the name of the directory that
    directly contains it (``.../<class_name>/<file>``).

    Args:
        image_paths: Sequence of image file paths.
        transform: Optional callable applied to the loaded RGB PIL image.
        class_to_index: Optional mapping from class-folder name to integer
            label. When omitted, the notebook-level ``class_to_index``
            mapping is used.
    """

    def __init__(self, image_paths, transform=None, class_to_index=None):
        self.transform = transform
        self.image_paths = image_paths
        if class_to_index is None:
            # Backward-compatible default: the mapping built earlier in the
            # notebook. The parameter shadows that name, hence globals().
            class_to_index = globals()["class_to_index"]
        self.class_to_index = class_to_index

    def _label_for(self, img_path):
        """Return the integer label for ``img_path`` from its parent folder name."""
        class_name = os.path.basename(os.path.dirname(img_path))
        return self.class_to_index[class_name]

    def __getitem__(self, idx):
        """Load item ``idx`` and return ``(image, label)``.

        Raises:
            KeyError: If the image's parent folder name is not in the mapping.
        """
        img_path = self.image_paths[idx]
        # Image.open is the PIL entry point for reading a file. Converting to
        # RGB guarantees three channels, matching the 3-value mean/std used by
        # the Normalize transforms, and forces the pixel data to be read
        # before the file handle is closed.
        with Image.open(img_path) as source:
            img = source.convert('RGB')
        label = self._label_for(img_path)
        if self.transform is not None:
            img = self.transform(img)
        return img, label

    def __len__(self):
        """Return the number of image paths in the dataset."""
        return len(self.image_paths)

In [None]:
# Wrap the two path lists in datasets, each with its own transform pipeline.
train_data = OurMushroom(image_paths=train_image_paths, transform=transform_train)
test_data = OurMushroom(image_paths=test_image_paths, transform=transform_test)

# Batches of 32; only the training loader reshuffles its samples.
train_loader = torch.utils.data.DataLoader(
    dataset=train_data,
    batch_size=32,
    shuffle=True,
)
test_loader = torch.utils.data.DataLoader(
    dataset=test_data,
    batch_size=32,
    shuffle=False,
)

In [None]:
# Count training images per class index; the class is the name of the folder
# that directly contains each image.
label_dict = {}
for path in train_image_paths:
    label_name = os.path.basename(os.path.dirname(path))
    label_idx = class_to_index.get(label_name)
    if label_idx is None:
        # Folder name is not a known class: leave it out of the counts.
        continue
    label_dict.setdefault(label_idx, 0)
    label_dict[label_idx] += 1

# (class index, count) pairs in ascending class-index order.
sorted_label_dict = sorted(label_dict.items())
print(sorted_label_dict)

In [None]:
# Inverse-frequency class weights, rescaled so the largest weight equals 1.
# NOTE(review): the class count 9 is hard-coded and a class with no training
# image would raise KeyError here — confirm this matches the dataset.
# NOTE(review): in the cells shown, final_weights is only printed and never
# passed to the loss — confirm whether CrossEntropyLoss(weight=...) was intended.
num = [label_dict[class_idx] for class_idx in range(9)]
total_num = sum(num)
weights = [(1 / class_count) * total_num for class_count in num]
max_weight = max(weights)
final_weights = [class_weight / max_weight for class_weight in weights]
print(final_weights)

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

if device.type != 'cuda':
    print('Code is being executed using CPU (slow)')
else:
    print(
        'Code is being executed using CUDA [GPU]\n'
        f'Using GPU Device: {torch.cuda.get_device_name(device)}'
    )

In [None]:
# Number of output classes for the classifier head.
num_class = 9

# Unweighted cross-entropy loss, moved to the selected device.
# NOTE(review): criterion is assigned again in a later cell — confirm both
# definitions are needed.
criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)

In [None]:
# ResNet-101 created without pretrained weights (weights=None); its final
# fully connected layer is replaced by one with num_class outputs.
model = models.resnet101(weights=None)
num_features = model.fc.in_features
model.fc = nn.Linear(in_features=num_features, out_features=num_class)
model = model.to(device)

In [None]:
# Adam over all model parameters with a learning rate of 0.001.
optimizer = optim.Adam(params=model.parameters(), lr=0.001)

# Unweighted cross-entropy loss on the selected device.
criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)

In [None]:
# Training-run settings: class count, best test accuracy seen so far (starts
# at 0) and the number of epochs to train for.
num_class = 9
best_accuracy = 0
number_of_epochs = 25

In [None]:
# Train for number_of_epochs epochs. After each epoch the model is evaluated
# on test_loader, and its weights are written to "model.pth" whenever the test
# accuracy is at least as high as the best value seen so far.
# NOTE(review): the checkpoint is selected on the same split that is reported
# as "Test" — confirm whether a separate validation split is wanted.
for epoch in range(number_of_epochs):
    # ---- training pass ----
    model.train()
    train_loss, train_correct = 0.0, 0

    for images, labels in tqdm(train_loader):
        optimizer.zero_grad()
        labels, images = labels.to(device), images.to(device)
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        train_correct += (predicted == labels).sum().item()

    # Loss is averaged over batches; accuracy is a percentage over samples.
    train_loss /= len(train_loader)
    train_accuracy = train_correct * 100.0 / len(train_data)

    # ---- evaluation pass (no gradient tracking) ----
    model.eval()
    test_loss, test_correct = 0.0, 0

    with torch.no_grad():
        for images, labels in tqdm(test_loader):
            labels, images = labels.to(device), images.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            test_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            test_correct += (predicted == labels).sum().item()

    test_loss /= len(test_loader)
    test_accuracy = test_correct * 100.0 / len(test_data)

    # ">=" also re-saves on ties, so a checkpoint exists after the first epoch
    # even when the accuracy is 0.
    if test_accuracy >= best_accuracy:
        print(f"Test accuracy {test_accuracy:.2f} matches or beats the previous best "
              f"{best_accuracy:.2f}; saving model to model.pth")
        best_accuracy = test_accuracy
        torch.save(model.state_dict(), "model.pth")

    print(f"Epoch: {epoch + 1} / {number_of_epochs}\n"
          f"Train Loss: {train_loss:.2f}, Train Accuracy: {train_accuracy:.2f}\n"
          f"Test Loss: {test_loss:.2f}, Test Accuracy: {test_accuracy:.2f}\n")

In [None]:
from PIL import Image

# Rebuild the architecture on the CPU and restore the saved checkpoint.
# map_location="cpu" lets a checkpoint written from a GPU run load on a
# machine without CUDA; weights_only=True restricts unpickling to tensor data,
# which is all a state_dict needs.
model = models.resnet101(weights=None, num_classes=9)
model.load_state_dict(torch.load("model.pth", map_location="cpu", weights_only=True))
model.eval()

def predict_image(image_path, model):
    """Classify a single image file with ``model``.

    The image is opened, converted to RGB, passed through ``transform_test``
    and given a leading batch dimension before the forward pass. The input is
    moved to the device of the model's parameters, so the function works with
    a model on either CPU or GPU.

    Args:
        image_path: Path of the image file to classify.
        model: Network returning one row of class scores per input image.
            NOTE(review): the caller is presumably expected to have called
            ``model.eval()`` beforehand — this function does not change the
            model's mode.

    Returns:
        tuple[int, float]: Index of the highest-probability class and the
        softmax probability assigned to it.
    """
    # Close the file handle as soon as the pixel data has been read.
    with Image.open(image_path) as source:
        image = source.convert('RGB')
    batch = transform_test(image).unsqueeze(0)

    # Run the input on whatever device the model lives on.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        batch = batch.to(first_param.device)

    with torch.no_grad():
        output = model(batch)
        probabilities = torch.softmax(output, dim=1)
        predicted_mushroom = torch.argmax(probabilities, dim=1).item()
    return predicted_mushroom, probabilities[0, predicted_mushroom].item()