<h1>Content</h1>
<ol>
<li>Setup Colab Environment</li>
<li>Data augmentation</li>
<li>Detection
<ol>
<li>Training</li>
<li>Evaluation</li>
</ol>
</li>
<li>Recognition</li>
</ol>

# Setup Colab Environment

In [9]:
# Clone the project repository into the current working directory.
!git clone "https://github.com/jakhin03/PROJECT_EAR_DETECTION"

Cloning into 'PROJECT_EAR_DETECTION'...
remote: Enumerating objects: 100216, done.[K
remote: Total 100216 (delta 0), reused 0 (delta 0), pack-reused 100216[K
Receiving objects: 100% (100216/100216), 628.28 MiB | 21.73 MiB/s, done.
Resolving deltas: 100% (19560/19560), done.
Updating files: 100% (184850/184850), done.


In [10]:
!pip install ultralytics



In [1]:
# Base directory that every data/model path below is built from.
# NOTE(review): the repository is cloned into ./PROJECT_EAR_DETECTION above;
# confirm the notebook runs from inside that folder, otherwise point this at it.
pwd = "./"

# Data augmentation

In [2]:
import torchvision.transforms as transforms
import os
from PIL import Image
import shutil
import random

Set the path

In [3]:
# Name of the dataset folder; substituted into the paths built below.
dataset="EarVN1"

In [4]:
# Source image folder and the train/validation folders the split is written to,
# all derived from the base directory and the dataset name.
dataset_path, train_path, valid_path = (
    template % (pwd, dataset)
    for template in (
        '%s/data/datasets/%s/Images',
        '%s/data/data_train/%s/train',
        '%s/data/data_train/%s/val',
    )
)

Set the percentage of data for validation

In [5]:
# Fraction of each subject's images that goes to the validation set.
validation_split = 0.2

Split datasets and move to respective directories

In [6]:
# Split every subject folder into train/validation subsets and COPY the images
# (the originals stay in place).
# A dedicated seeded generator combined with sorted listings makes the split
# reproducible: re-running this cell copies the same files to the same side
# instead of mixing a new random split into the existing folders, which would
# leak images between train and validation.
split_rng = random.Random(42)

# Iterate through the subject subdirectories
for subject_dir in sorted(os.listdir(dataset_path)):
    subject_path = os.path.join(dataset_path, subject_dir)

    # Skip stray files that are not subject folders
    if not os.path.isdir(subject_path):
        continue

    # Create the training and validation subdirectories
    train_subject_path = os.path.join(train_path, subject_dir)
    valid_subject_path = os.path.join(valid_path, subject_dir)
    os.makedirs(train_subject_path, exist_ok=True)
    os.makedirs(valid_subject_path, exist_ok=True)

    # Collect the image file paths (regular files only, in a stable order)
    image_paths = sorted(
        os.path.join(subject_path, image_file)
        for image_file in os.listdir(subject_path)
        if os.path.isfile(os.path.join(subject_path, image_file))
    )
    num_images = len(image_paths)

    # Shuffle the image paths deterministically
    split_rng.shuffle(image_paths)

    # Split the dataset: the first `validation_split` share goes to validation
    num_valid_images = int(num_images * validation_split)
    valid_images = image_paths[:num_valid_images]
    train_images = image_paths[num_valid_images:]

    # Copy the images to the respective directories
    for image_path in train_images:
        shutil.copy(image_path, train_subject_path)

    for image_path in valid_images:
        shutil.copy(image_path, valid_subject_path)


Training data transform

In [7]:
# Augmentation pipeline: random horizontal flip, a rotation between -15 and +15
# degrees, and a 5x5 Gaussian blur applied with probability 0.5.
augmentation_steps = [
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=(-15, 15)),
    transforms.RandomApply(
        [transforms.GaussianBlur(kernel_size=(5, 5))],
        p=0.5,
    ),
]
augment_transform = transforms.Compose(augmentation_steps)

Save transformed images

# Detection

In [8]:
from ultralytics import YOLO

## Training

Load a pretrained YOLO model from ultralytics

In [None]:
# Build the detector from the checkpoint file stored under <pwd>/Models.
model = YOLO(pwd+"/Models/yolov8n.pt")

Train model on custom datasets

In [None]:
# Train on the dataset described by data.yaml. The folder name comes from
# `dataset` so this path stays in sync with the other dataset paths.
model.train(data="%s/data/data_train/%s/data.yaml" % (pwd, dataset), epochs=100)    # running time = 10 minutes

## Evaluation

### Evaluate on validation set

In [None]:
# Run validation with the model's current settings and keep the returned metrics.
metrics = model.val()

<h3>Evaluations on custom dataset</h3>

### Evaluate the trained model on the training dataset

In [None]:
# Evaluate on the 'train' split with save_json=True; the return value is discarded.
_ = model.val(split='train', save_json=True)

Evaluate the trained model on the validation dataset

In [None]:
# Evaluate on the 'val' split with save_json=True; the return value is discarded.
_ = model.val(split='val', save_json=True)

### Realtime Detection

In [None]:
import cv2
from ultralytics import YOLO

Load the YOLOv8 model

In [None]:
# Load the detector checkpoint used for the webcam demo (this rebinds `model`).
model = YOLO('ear_model_5_subjects.pt')
# Alternative checkpoint:
# model = YOLO('ear_model_2_subjects.pt')

Define a video capture object

In [None]:
# Open capture device 0 as the frame source for the detection loop.
vid = cv2.VideoCapture(0)

Detection:

In [None]:
# Real-time detection: read frames from `vid`, run the detector on each one and
# show the annotated result until 'q' is pressed or no more frames arrive.
if not vid.isOpened():
    raise RuntimeError("Could not open the video capture device")

# The capture FPS is a device property, so query and print it once instead of
# flooding the output on every frame.
fps = vid.get(cv2.CAP_PROP_FPS)
print('fps:', fps)

try:
    while True:
        # Capture the video frame by frame
        ret, frame = vid.read()
        if not ret:
            # A failed read returns no frame; stop instead of passing None to the model
            print('No frame received from the capture device, stopping.')
            break

        # Run YOLOv8 inference on the frame
        results = model(frame)

        # Visualize the results on the frame
        annotated_frame = results[0].plot()

        # Display the annotated frame
        cv2.imshow("YOLOv8 Inference", annotated_frame)

        # 'q' is the quit key; any other key could be used instead
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always release the capture object and close the windows, even on errors
    vid.release()
    cv2.destroyAllWindows()

# Recognition

In [9]:
import torch
from torch import nn
from torch import optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torchsummary import summary
import time
from datetime import timedelta
from torch.utils.tensorboard import SummaryWriter

In [10]:
# TensorBoard writer that receives the per-epoch loss/accuracy values.
writer = SummaryWriter(log_dir='./runs')
# Train/validation image folders. The dataset folder comes from `dataset` so
# these stay consistent with the paths the data split writes to.
training_dir = '%s/data/data_train/%s/train' % (pwd, dataset)
validation_dir = '%s/data/data_train/%s/val' % (pwd, dataset)
# https://www.sciencedirect.com/science/article/pii/S2352340919309850

In [11]:
# Size every image is resized to before entering the network.
# Smaller alternatives are kept below for reference.
# input_dim = (32, 64)
# input_dim = (64, 128)
input_dim = (128, 256)

In [12]:
# ImageNet stats: per-channel mean and standard deviation used for normalisation.
mean = (0.485, 0.456, 0.406)
std = (0.229, 0.224, 0.225)
# Alternative: symmetric 0.5 statistics.
# mean = (0.5, 0.5, 0.5)
# std = (0.5, 0.5, 0.5)

In [13]:
# Preprocessing applied to every image: resize to input_dim, convert to a
# tensor, then normalise each channel with mean/std.
transform = transforms.Compose([
    transforms.Resize(size=input_dim),
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])

In [14]:
# Build the datasets from the train/validation folders, applying `transform`
# to every image.
training_dataset = torchvision.datasets.ImageFolder(root=training_dir, transform=transform)
validation_dataset = torchvision.datasets.ImageFolder(root=validation_dir, transform=transform)

In [15]:
# Mini-batch loaders: training data is shuffled, validation data keeps its order.
train_batch_size = 32
val_batch_size = 256
train_dataloader = DataLoader(training_dataset, batch_size=train_batch_size, shuffle=True)
validation_dataloader = DataLoader(validation_dataset, batch_size=val_batch_size, shuffle=False)

In [16]:
# use resnext: load the pretrained ResNeXt-50 (32x4d) from torch.hub (this rebinds `model`).
# NOTE(review): the network keeps its pretrained classification head; confirm
# that its output size is what is wanted for the ear classes.
model = torch.hub.load('pytorch/vision:v0.10.0', 'resnext50_32x4d', pretrained=True)

Using cache found in C:\Users\giakh/.cache\torch\hub\pytorch_vision_v0.10.0


In [17]:
# Pick the compute backend: CUDA first, then Apple MPS, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

Using cpu device


In [18]:
# Move the network to the selected device and print a layer-by-layer summary
# for a 3-channel input of size input_dim.
model = model.to(device)
summary(model, (3, input_dim[0], input_dim[1]))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1          [-1, 64, 64, 128]           9,408
       BatchNorm2d-2          [-1, 64, 64, 128]             128
              ReLU-3          [-1, 64, 64, 128]               0
         MaxPool2d-4           [-1, 64, 32, 64]               0
            Conv2d-5          [-1, 128, 32, 64]           8,192
       BatchNorm2d-6          [-1, 128, 32, 64]             256
              ReLU-7          [-1, 128, 32, 64]               0
            Conv2d-8          [-1, 128, 32, 64]           4,608
       BatchNorm2d-9          [-1, 128, 32, 64]             256
             ReLU-10          [-1, 128, 32, 64]               0
           Conv2d-11          [-1, 256, 32, 64]          32,768
      BatchNorm2d-12          [-1, 256, 32, 64]             512
           Conv2d-13          [-1, 256, 32, 64]          16,384
      BatchNorm2d-14          [-1, 256,

In [19]:
# Hyperparameters: epoch count, learning rate, loss and the optimizer that
# updates all model parameters.
epochs = 10
learning_rate = 1e-3
loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)

In [20]:
def train(dataloader, model, loss_function, optimizer, epoch):
    """Train `model` for one pass over `dataloader` and log the epoch metrics.

    Each batch is moved to the module-level `device`. The loss of every 20th
    batch is printed as a progress line. At the end, the mean of the batch
    losses and the accuracy in percent are passed to `statistics` under the
    name 'training'.

    Args:
        dataloader: iterable of (inputs, labels) batches exposing `.dataset`.
        model: network to optimise; it is switched to training mode.
        loss_function: callable mapping (predictions, labels) to a scalar loss.
        optimizer: optimizer holding the model parameters.
        epoch: epoch index forwarded to `statistics`.
    """
    model.train()
    size = len(dataloader.dataset)
    running_loss = 0
    num_correct = 0

    for step, (inputs, targets) in enumerate(dataloader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Forward pass and loss
        outputs = model(inputs)
        batch_loss = loss_function(outputs, targets)
        running_loss += batch_loss.item()

        # Backward pass and parameter update
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        # The index of the largest output is taken as the predicted class
        predicted_labels = torch.max(outputs.data, 1)[1]
        num_correct += (predicted_labels == targets).sum().item()

        if step % 20 == 0:
            loss = batch_loss.item()
            current = step * len(inputs)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

    mean_batch_loss = running_loss / len(dataloader)
    accuracy_percent = 100 * num_correct / len(dataloader.dataset)
    statistics('training', accuracy_percent, mean_batch_loss, epoch)

In [21]:
def evaluate_validation(dataloader, model, loss_function, epoch):
    """Evaluate `model` on `dataloader` without gradients and log the metrics.

    Batches are moved to the module-level `device`. The mean of the batch
    losses and the accuracy in percent are passed to `statistics` under the
    name 'validation'.

    Args:
        dataloader: iterable of (images, labels) batches exposing `.dataset`.
        model: network to evaluate; it is switched to evaluation mode.
        loss_function: callable mapping (predictions, labels) to a scalar loss.
        epoch: epoch index forwarded to `statistics`.
    """
    model.eval()
    loss_total = 0
    num_correct = 0

    with torch.no_grad():
        for batch in dataloader:
            images = batch[0].to(device)
            labels = batch[1].to(device)
            outputs = model(images)
            loss_total += loss_function(outputs, labels).item()
            # The index of the largest output is taken as the predicted class
            predicted_labels = torch.max(outputs.data, 1)[1]
            num_correct += (predicted_labels == labels).sum().item()

    mean_batch_loss = loss_total / len(dataloader)
    accuracy_percent = 100 * num_correct / len(dataloader.dataset)
    statistics('validation', accuracy_percent, mean_batch_loss, epoch)

In [22]:
def statistics(dataset, accuracy, loss, epoch):
    """Log one epoch's loss and accuracy to TensorBoard and print a summary.

    Args:
        dataset: name of the split (e.g. 'training'); used as the tag suffix
            and, title-cased, as the label of the printed line.
        accuracy: accuracy value to record.
        loss: loss value to record.
        epoch: step index under which both scalars are written.
    """
    # Writes go through the module-level `writer`: loss first, then accuracy.
    for tag_prefix, value in (('Loss/', loss), ('Accuracy/', accuracy)):
        writer.add_scalar(tag_prefix + dataset, value, epoch)

    summary_line = "{},\tLoss: {:.3f}\t| Accuracy: {:.3f}".format(dataset.title(), loss, accuracy)
    print(summary_line)

In [23]:
def optimize(epochs, train_dataloader, validation_dataloader, model, loss_function, optimizer):
    """Run the full schedule: one training pass plus one validation pass per epoch.

    Prints a header before every epoch and the total wall-clock time, rounded
    to whole seconds, once all epochs are done.

    Args:
        epochs: number of epochs to run.
        train_dataloader: loader passed to `train`.
        validation_dataloader: loader passed to `evaluate_validation`.
        model: network being trained and evaluated.
        loss_function: loss passed to both passes.
        optimizer: optimizer passed to `train`.
    """
    started_at = time.time()

    for i in range(epochs):
        print(f"\nEpoch {i+1}\n----------------------------------------------")
        train(train_dataloader, model, loss_function, optimizer, i)
        evaluate_validation(validation_dataloader, model, loss_function, i)

    elapsed_seconds = int(round(time.time() - started_at))
    print("Time usage: " + str(timedelta(seconds=elapsed_seconds)))

## Training

In [24]:
# Run the training/validation schedule with the objects configured above.
optimize(epochs, train_dataloader, validation_dataloader, model, loss_function, optimizer)


Epoch 1
----------------------------------------------


loss: 9.129079  [    0/50071]


In [None]:
# Training is done: save the learned weights (state dict only) and close the
# TensorBoard writer.
print('Finished Training')
# training time, 3hrs 30 mins
torch.save(model.state_dict(), "ear_classifier.pth")
writer.close()

## Evaluation

In [None]:
import torch
from PIL import Image
import torchvision.transforms as transforms

In [None]:
# Load the test image and force 3-channel RGB so it matches the three-channel
# mean/std normalisation used by the preprocessing transform (a grayscale,
# CMYK or RGBA file would otherwise fail there).
img = Image.open("./test_input_ear.jpg").convert("RGB")

In [None]:
# Select the inference device (CUDA, then MPS, else CPU) and name the
# checkpoint file to load.
device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"
print(f"Using {device} device")
path = "ear_classifier.pth"

In [None]:
# Rebuild the architecture without pretrained weights, then restore the trained
# parameters. `map_location` remaps the stored tensors to the device chosen
# here, so a checkpoint saved on a GPU also loads on a CPU-only machine.
model = torch.hub.load('pytorch/vision:v0.10.0', 'resnext50_32x4d', pretrained=False)
model.load_state_dict(torch.load(path, map_location=device))
model = model.to(device)

In [None]:
# ImageNet stats: per-channel mean/std and input size for inference
# preprocessing (same values as in the training section).
mean = (0.485, 0.456, 0.406)
std = (0.229, 0.224, 0.225)
# Alternative: symmetric 0.5 statistics.
# mean = (0.5, 0.5, 0.5)
# std = (0.5, 0.5, 0.5)
input_dim = (128, 256)

In [None]:
# Inference preprocessing: resize to input_dim, convert to a tensor, then
# normalise each channel with mean/std.
transform = transforms.Compose([
    transforms.Resize(size=input_dim),
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])

In [None]:
# Apply the preprocessing pipeline to the loaded image.
input_tensor = transform(img)

In [None]:
# Insert a leading batch dimension and move the tensor to the inference device.
# NOTE: re-running this cell adds another dimension, because it rebinds its own input.
input_tensor = input_tensor.unsqueeze(0)  # Add batch dimension if needed
input_tensor = input_tensor.to(device)

In [None]:
# Display the tensor shape as a sanity check before inference.
input_tensor.shape

In [None]:
# Switch to evaluation mode and run a forward pass without tracking gradients.
model.eval()
with torch.no_grad():
    output = model(input_tensor)

In [None]:
# Interpret the output: turn the scores of the first (only) batch entry into
# probabilities, pick the most likely class index and print it shifted by one.
# NOTE(review): the +1 presumably maps the 0-based index to a 1-based subject
# number; verify against the class ordering of the training folders.
probabilities = output[0].softmax(dim=0)
predicted_class = probabilities.argmax()
print(predicted_class+1)

## Graphs

In [None]:
from tensorboard import program
import webbrowser

In [None]:
# Launch TensorBoard on the training logs and open it in the browser.
log_dir = './runs/'
tb = program.TensorBoard()
tb.configure(argv=[None, '--logdir', log_dir])
url = tb.launch()
print(f"TensorBoard listening on {url}")
# Open the address returned by launch() rather than a hard-coded one, so the
# browser follows whatever host/port TensorBoard actually bound to.
webbrowser.open_new(url)

# Kill process
# Windows
# netstat -ano | findstr :6006
# taskkill /F /PID {PID}