<a href="https://colab.research.google.com/github/orilevi2809/DL/blob/main/project/mobileNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import cv2

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

from torchvision import models, transforms
import torchvision.transforms as transforms

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

from PIL import Image

In [None]:
# Side length, in pixels, of the square images fed to the network.
IMG_SIZE = 224  # TODO: check whether a smaller input (e.g. 32x32) is sufficient

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Root folder holding one sub-directory of images per label name.
img_dir = "./labels/"

# Label name -> numeric code. The codes are distinct powers of two, NOT
# contiguous class indices.
# NOTE(review): nn.CrossEntropyLoss expects targets in [0, num_classes - 1];
# confirm these codes are remapped before being used as training targets.
labels_dict = {
    "crimp": 1,
    "pinch": 2,
    "pocket": 4,
    "jug": 8,
    "edge": 16,
    "sloper": 32,
}

# Per-channel statistics used for normalisation (the values torchvision
# documents for its ImageNet-pretrained models).
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

# Final two steps shared by both pipelines: PIL image -> normalised tensor.
_to_normalized_tensor = [
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
]

# Deterministic preprocessing: resize, convert to tensor, normalise.
test_transform = transforms.Compose(
    [transforms.Resize((IMG_SIZE, IMG_SIZE))] + _to_normalized_tensor
)

# Same preprocessing with random augmentation (flip, brightness jitter,
# small rotation) inserted between the resize and the tensor conversion.
train_transform = transforms.Compose(
    [
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.ColorJitter(brightness=0.3),
        transforms.RandomRotation(10),
    ]
    + _to_normalized_tensor
)

In [None]:
def save_model(model, model_file):
    """Write the parameters of *model* to *model_file*.

    Only the ``state_dict`` is serialised, not the module object itself, so
    loading requires building the same architecture first and then calling
    ``load_state_dict``.
    """
    weights = model.state_dict()
    torch.save(weights, model_file)


In [None]:
def load_images_paths(img_dir, labels):
    """Collect image file paths and their numeric labels from per-label folders.

    Each entry of *labels* names a sub-directory of *img_dir*. Every file in
    that sub-directory whose name ends in ``.jpg``, ``.jpeg`` or ``.png``
    (compared case-insensitively) is recorded together with
    ``labels_dict[label]``. A per-label image count is printed at the end.

    Args:
        img_dir: Root directory containing one sub-directory per label.
        labels: Iterable of label names (keys of the module-level
            ``labels_dict``).

    Returns:
        A ``(data, target)`` pair of equally long lists: image paths and the
        matching numeric label codes.

    Raises:
        FileNotFoundError: If a label's sub-directory does not exist
            (propagated from ``os.listdir``).
        KeyError: If a label with at least one image is missing from
            ``labels_dict``.
    """
    image_extensions = (".jpg", ".jpeg", ".png")
    data = []
    target = []
    label_count = {}  # label name -> number of images found for it

    for label in labels:
        label_dir = os.path.join(img_dir, label)
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # resulting lists (and any seeded split built on them) reproducible.
        filenames = sorted(
            name
            for name in os.listdir(label_dir)
            if name.lower().endswith(image_extensions)
        )
        data.extend(os.path.join(label_dir, name) for name in filenames)
        # Generator form: labels_dict is only consulted when images exist.
        target.extend(labels_dict[label] for _ in filenames)
        label_count[label] = len(filenames)

    # Print the count of images for each label
    for label, count in label_count.items():
        print(f"Number of images for label '{label}': {count}")
    return data, target


In [None]:
def _open_rgb(img_path):
    """Open *img_path*, return an RGB copy, and close the underlying file."""
    with Image.open(img_path) as handle:
        return handle.convert("RGB")


def load_image_data(img_paths, labels, transform, augment_unbalanced=False):
    """Load images from disk into tensors, optionally oversampling rare labels.

    Args:
        img_paths: Sequence of image file paths. When ``augment_unbalanced``
            is False this may also be a sequence of already-loaded tensors,
            which is returned unchanged.
        labels: Sequence of labels aligned with *img_paths*.
        transform: Callable mapping a PIL image to a tensor.
            With ``augment_unbalanced=False`` it is applied to every image.
            With ``augment_unbalanced=True`` it is the augmentation pipeline
            used for the extra copies; the originals always go through the
            module-level ``test_transform``.
        augment_unbalanced: When True, every image whose label currently has
            fewer samples than the most frequent label gets one additional
            augmented copy, until that label reaches the maximum count. This
            is a single pass, so a label can at most double in size.

    Returns:
        A ``(data, target)`` pair of equally long lists holding image tensors
        and their labels. Two empty lists are returned for empty input.
    """
    if len(img_paths) == 0:
        return [], []

    if not augment_unbalanced:
        if isinstance(img_paths[0], torch.Tensor):  # data is already loaded
            return img_paths, labels
        data = []
        target = []
        for img_path, label in zip(img_paths, labels):
            data.append(transform(_open_rgb(img_path)))
            target.append(label)
        return data, target

    pairs = list(zip(img_paths, labels))

    # Count samples per label up front so each image is opened only once.
    label_counts = {label: 0 for label in set(labels)}
    for _, label in pairs:
        label_counts[label] += 1
    print(label_counts)

    # The largest count never changes: other labels are only topped up to it.
    largest = max(label_counts.values())
    is_unbalanced = any(count < largest for count in label_counts.values())
    if is_unbalanced:
        print("unbalanced")

    data = []
    target = []
    # TODO: repeat passes until all labels reach the same count
    for img_path, label in pairs:
        img = _open_rgb(img_path)
        data.append(test_transform(img))
        target.append(label)

        if label_counts[label] < largest:
            # Under-represented label: add one augmented copy of this image.
            data.append(transform(img))
            target.append(label)
            label_counts[label] += 1

    if is_unbalanced:
        print(label_counts)
    return data, target

In [None]:

def train_classifier(train_dataloader, val_dataloader, model, criterion, optimizer, device, num_epochs=10):
    """Train *model* and report training/validation metrics after each epoch.

    Args:
        train_dataloader: Iterable of batches; element 0 of each batch is the
            inputs and element 1 the integer class targets.
        val_dataloader: Iterable of batches in the same layout, used for
            validation after every epoch.
        model: The ``nn.Module`` to optimise; it is updated in place.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer already bound to the model's parameters.
        device: Device each batch is moved to before the forward pass.
        num_epochs: Number of passes over *train_dataloader* (default 10,
            the value that was previously hard-coded).

    Returns:
        The same *model* object, left in evaluation mode if at least one
        epoch ran.
    """
    for epoch in range(num_epochs):
        running_loss = 0.0
        total_correct = 0
        total_samples = 0
        train_batches = 0

        # Switch model to training mode
        model.train()

        for batch in train_dataloader:
            inputs, labels = batch[0].to(device), batch[1].to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            train_batches += 1

            # Count the correct predictions in the current batch
            predicted = outputs.argmax(dim=1)
            total_correct += (predicted == labels).sum().item()
            total_samples += labels.size(0)

        # Switch model to evaluation mode for validation
        model.eval()

        val_loss = 0.0
        val_correct = 0
        val_samples = 0
        val_batches = 0
        with torch.no_grad():
            for batch in val_dataloader:
                inputs, labels = batch[0].to(device), batch[1].to(device)
                outputs = model(inputs)
                val_loss += criterion(outputs, labels).item()
                val_batches += 1
                predicted = outputs.argmax(dim=1)
                val_correct += (predicted == labels).sum().item()
                val_samples += labels.size(0)

        # Guard every average so an empty dataloader reports 0 instead of
        # raising ZeroDivisionError.
        train_accuracy = 100 * total_correct / total_samples if total_samples else 0.0
        val_accuracy = 100 * val_correct / val_samples if val_samples else 0.0
        mean_train_loss = running_loss / train_batches if train_batches else 0.0
        mean_val_loss = val_loss / val_batches if val_batches else 0.0

        print(f'Epoch: {epoch + 1}, Training loss: {mean_train_loss}, Training accuracy: {train_accuracy}%, Validation loss: {mean_val_loss}, Validation accuracy: {val_accuracy}%')

    print('Finished Training')
    return model

In [None]:
def evaluate_classifier(test_dataloader, model, device):
    """Evaluate *model* on *test_dataloader* and print the results.

    Prints the overall accuracy, a scikit-learn classification report and a
    confusion matrix. Nothing is returned.

    Args:
        test_dataloader: Iterable of batches; element 0 of each batch is the
            inputs and element 1 the integer class targets.
        model: The ``nn.Module`` to evaluate; it is switched to eval mode.
        device: Device each batch is moved to before the forward pass.
    """
    model.eval()  # Switch model to evaluation mode
    correct = 0
    total = 0
    all_labels = []
    all_predictions = []
    with torch.no_grad():
        for batch in test_dataloader:
            images, labels = batch[0].to(device), batch[1].to(device)
            outputs = model(images)
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            # Tensor.numpy() only works on CPU tensors, so move the batch
            # results back first; this is a no-op when device is the CPU.
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(predicted.cpu().numpy())

    # Avoid ZeroDivisionError when the dataloader yielded no samples.
    accuracy = 100 * correct / total if total else 0.0
    print(f'Accuracy of the model on the test images: {accuracy}%')
    # Generate classification report and confusion matrix
    report = classification_report(all_labels, all_predictions, zero_division=1)
    matrix = confusion_matrix(all_labels, all_predictions)
    print("Classification Report:")
    print(report)
    print("Confusion Matrix:")
    print(matrix)


In [1]:

####### Preprocess data  # TODO: refactor into a preprocess function
img_paths, labels = load_images_paths(img_dir, labels_dict.keys())

# labels_dict stores power-of-two codes (1, 2, 4, ...), but the classifier has
# len(labels_dict) outputs and nn.CrossEntropyLoss requires targets in
# [0, num_classes - 1]. Remap every code to its position in labels_dict.
label_code_to_index = {code: index for index, code in enumerate(labels_dict.values())}
labels = [label_code_to_index[code] for code in labels]

# Split the data into train, validation, and test sets (70% / 15% / 15%),
# stratified so each split keeps the overall label proportions.
img_paths_train, img_paths_test, labels_train, labels_test = train_test_split(
    img_paths, labels, test_size=0.3, stratify=labels, random_state=42
)
img_paths_val, img_paths_test, labels_val, labels_test = train_test_split(
    img_paths_test, labels_test, test_size=0.5, stratify=labels_test, random_state=42
)

# Only the training split is oversampled with augmented copies.
data_train, labels_train = load_image_data(img_paths_train, labels_train, train_transform, augment_unbalanced=True)
data_val, labels_val = load_image_data(img_paths_val, labels_val, test_transform, augment_unbalanced=False)
data_test, labels_test = load_image_data(img_paths_test, labels_test, test_transform, augment_unbalanced=False)

# Convert to tensors
data_train = torch.stack(data_train)
labels_train = torch.tensor(labels_train)

data_val = torch.stack(data_val)
labels_val = torch.tensor(labels_val)

data_test = torch.stack(data_test)
labels_test = torch.tensor(labels_test)

# Create TensorDatasets and DataLoaders
train_dataset = TensorDataset(data_train, labels_train)
val_dataset = TensorDataset(data_val, labels_val)
test_dataset = TensorDataset(data_test, labels_test)

# Shuffle only the training data; keep validation/test order fixed.
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=16, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)



SyntaxError: ignored

In [None]:
# Start from torchvision's pre-trained MobileNetV2.
model = models.mobilenet_v2(pretrained=True)

# All parameters stay trainable (full fine-tuning). To train only part of the
# network, set requires_grad = False on model.parameters() here and re-enable
# it on the layers that should learn, e.g. model.classifier.parameters().

# Swap the final linear layer for one with one output per entry of labels_dict.
model.classifier[1] = nn.Linear(
    in_features=model.last_channel,
    out_features=len(labels_dict),
)

# Move the network to the selected device.
model = model.to(device)



In [None]:
# Cross-entropy loss over the class logits.
criterion = nn.CrossEntropyLoss()

# Hand Adam only the parameters that are still marked as trainable.
trainable_params = [param for param in model.parameters() if param.requires_grad]
optimizer = torch.optim.Adam(trainable_params)

# Fine-tune the network; the trained module is rebound to `model`.
model = train_classifier(
    train_dataloader,
    val_dataloader,
    model,
    criterion,
    optimizer,
    device,
)


In [None]:

# Checkpoint file for the MobileNetV2 weights.
model_file = 'hold_classifier_MobileNet.pth'

# Write the checkpoint only when none exists yet; an existing file is neither
# overwritten nor loaded, so `model` keeps the weights it has in memory.
if not os.path.exists(model_file):
    save_model(model, model_file)

    # Read the freshly written weights back into the model for inference.
    model.load_state_dict(torch.load(model_file))




In [None]:
# Print accuracy, the classification report and the confusion matrix for the
# held-out test split.
evaluate_classifier(test_dataloader, model, device)