In [None]:
!pip install pycocotools --quiet
!git clone https://github.com/pytorch/vision.git
!git checkout v0.3.0

!cp vision/references/detection/utils.py ./
!cp vision/references/detection/transforms.py ./
!cp vision/references/detection/coco_eval.py ./
!cp vision/references/detection/engine.py ./
!cp vision/references/detection/coco_utils.py ./

In [None]:
# Standard library
import os
import pickle
import random
import sys
import time
import timeit
import warnings
import zipfile
from xml.etree import ElementTree as et

warnings.filterwarnings('ignore')

# Third-party
import cv2 as cv
import matplotlib.patches as patches
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.models as models
from google.colab.patches import cv2_imshow
from torchvision import transforms
from torchvision import transforms as torchtrans
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# for image augmentations
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

# Detection helper modules copied from vision/references/detection in the first cell.
from engine import train_one_epoch, evaluate
import utils
import transforms as T

# Extragerea datelor de antrenare pentru task-urile 1 si 2

In [None]:
def extragere_exemple_pozitive():
    """Crop every annotated box from the training images and save it as a 36x36 patch.

    For each character folder, reads ``antrenare/<folder>_annotations.txt``.
    In every row the first column is used as the image file name and columns
    1-4 as the crop box ``xmin ymin xmax ymax``. Each crop is resized to 36x36
    and written to ``data/exemplePozitive/img_<NNNNN>.jpg``, where the counter
    runs consecutively over all folders.

    Rows whose image cannot be read, or whose box yields an empty crop, are
    reported and skipped; they still consume a counter value so the numbering
    of the remaining files does not shift.
    """
    folders = ["barney", "betty", "fred", "wilma"]
    output_dir = os.path.join("data", "exemplePozitive")
    # cv.imwrite does not create missing folders, so make sure the target exists.
    os.makedirs(output_dir, exist_ok=True)

    nr_img = 0
    for folder_name in folders:
        cale_txt = os.path.join("antrenare", f"{folder_name}_annotations.txt")
        # ndmin=2 keeps a file with a single annotation from collapsing into a 1-D row.
        continut_txt = np.loadtxt(cale_txt, dtype='str', ndmin=2)

        for linie in continut_txt:
            nr_img += 1
            cale_img = os.path.join("antrenare", f"{folder_name}", linie[0])
            short_path = f"img_{nr_img:05}.jpg"
            cale_output = os.path.join(output_dir, short_path)

            imagine = cv.imread(cale_img)
            if imagine is None:
                print(f"Skipping unreadable image: {cale_img}")
                continue

            coord = np.array(linie[1:5], np.int32)
            crop = imagine[coord[1]:coord[3], coord[0]:coord[2]]
            if crop.size == 0:
                # cv.resize raises on an empty array (box outside the image or degenerate).
                print(f"Skipping empty crop {coord.tolist()} in {cale_img}")
                continue

            chip_frumos = cv.resize(crop, (36, 36))
            cv.imwrite(cale_output, chip_frumos)


In [None]:
def extragere_exemple_negative(num_regions=3, region_size=(36, 36), max_attempts=1000):
    """Sample random patches that do not touch any annotated box and save them.

    For each character folder, images ``0001.jpg`` .. ``1000.jpg`` under
    ``antrenare/<folder>/`` are processed. Up to ``num_regions`` windows are
    drawn uniformly at random per image; a window is kept only if it does not
    intersect any box listed for that image in
    ``antrenare/<folder>_annotations.txt``. Kept windows are written to
    ``data/exempleNegative/<folder>_<NNNN>_<j>.jpg``.

    Args:
        num_regions: number of patches to extract per image.
        region_size: ``(height, width)`` of each patch in pixels.
        max_attempts: upper bound on random draws per image, so an image in
            which no free window exists cannot stall the loop. Such an image
            yields fewer than ``num_regions`` patches.
    """
    folders = ["barney", "betty", "fred", "wilma"]
    region_h, region_w = region_size[0], region_size[1]
    output_dir = os.path.join("data", "exempleNegative")
    # cv.imwrite does not create missing folders, so make sure the target exists.
    os.makedirs(output_dir, exist_ok=True)

    for folder_name in folders:
        cale_txt = os.path.join("antrenare", f"{folder_name}_annotations.txt")
        # ndmin=2 keeps a file with a single annotation from collapsing into a 1-D row.
        continut_txt = np.loadtxt(cale_txt, dtype='str', ndmin=2)

        for i in range(1000):
            short_path = f"{i+1:04}.jpg"
            cale_img = os.path.join("antrenare", f"{folder_name}", short_path)
            imagine = cv.imread(cale_img)
            if imagine is None:
                print(f"Skipping unreadable image: {cale_img}")
                continue

            img_h, img_w = imagine.shape[:2]
            if img_h < region_h or img_w < region_w:
                # No window of the requested size fits inside this image.
                continue

            indici_per_poza = np.where(continut_txt[:, 0] == short_path)[0]
            face_boxes = [tuple(np.array(continut_txt[idx][1:5], np.int32)) for idx in indici_per_poza]

            non_face_regions = []
            attempts = 0
            while len(non_face_regions) < num_regions and attempts < max_attempts:
                attempts += 1
                # randint's upper bound is exclusive; +1 allows the last valid
                # offset and keeps the call legal when the image is exactly
                # as large as the region.
                rand_top = np.random.randint(0, img_h - region_h + 1)
                rand_left = np.random.randint(0, img_w - region_w + 1)

                overlaps_with_face = any(
                    not (rand_left + region_w < xmin or rand_top + region_h < ymin or
                         rand_left > xmax or rand_top > ymax)
                    for xmin, ymin, xmax, ymax in face_boxes
                )

                if not overlaps_with_face:
                    non_face_regions.append((rand_left, rand_top, region_w, region_h))

            for j, (left, top, patch_w, patch_h) in enumerate(non_face_regions):
                region = imagine[top:top + patch_h, left:left + patch_w]
                cale_output = os.path.join(output_dir, f"{folder_name}_{i+1:04}_{j}.jpg")
                cv.imwrite(cale_output, region)


In [None]:
# Number of training epochs used by the classification training loops.
num_epochs = 10
# Root folder of the extracted patches. The extraction functions write to
# "data/exemplePozitive" and "data/exempleNegative"; an empty root would turn
# the paths below into "/exemplePozitive" and "/exempleNegative" at the
# filesystem root.
data_path = "data"
train_path_pozitive = os.path.join(data_path, "exemplePozitive")
train_path_negative = os.path.join(data_path, "exempleNegative")


# Antrenarea modelelor

In [None]:
# Six independent, randomly initialised AlexNet instances loaded from the
# torchvision v0.10.0 hub entry: `model_alexnet` is trained in the Task 1
# section, the remaining five in the Task 2 section.
(
    model_alexnet,
    model_barney,
    model_betty,
    model_fred,
    model_wilma,
    model_unknown,
) = (
    torch.hub.load('pytorch/vision:v0.10.0', 'alexnet', pretrained=False)
    for _ in range(6)
)

In [None]:
# Output width of classifier[4], i.e. the input width expected by the final layer.
nr_filters = model_alexnet.classifier[4].out_features

# Swap the last classifier layer of every network for a fresh 2-output linear layer.
for _net in (model_alexnet, model_barney, model_betty, model_fred, model_wilma, model_unknown):
    _net.classifier[6] = nn.Linear(nr_filters, 2)

### Task 1

In [None]:
# os.listdir returns entries in arbitrary order. Sorting makes the order of the
# training list, and every index-based operation derived from it, reproducible
# across runs and machines.
list_images_paths_pozitive = sorted(os.listdir(train_path_pozitive))
list_images_paths_negative = sorted(os.listdir(train_path_negative))

In [None]:
# Start the training list with the positive patches, each paired with label 1.
current_label = 1  # positive images
train_dataset = [
    (cv.imread(train_path_pozitive + "/" + image_name), current_label)
    for image_name in list_images_paths_pozitive
]

In [None]:
current_label = 0 # negative images

# Append the negative patches (label 0) to the list started with the positives.
for image_name in list_images_paths_negative:
    # The file names come from the negative folder, so they must be read from
    # train_path_negative. Reading them from the positive folder makes
    # cv.imread return None for every file.
    image = cv.imread(train_path_negative + "/" + image_name)
    if image is None:
        print(f"Skipping unreadable image: {train_path_negative}/{image_name}")
        continue
    train_dataset.append((image, current_label))

In [None]:
def resize_image(image, size=64):
    """Return ``image`` resized to a ``size`` x ``size`` square with cv.resize's default settings."""
    target_shape = (size, size)
    return cv.resize(image, target_shape)


# Built once: the same normalisation is applied to every image.
imagenet_normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

# (image tensor, label) pairs ready to be fed to a DataLoader.
transformed_data = []
for image, label in train_dataset:

    transformed_image = resize_image(image)
    # (H, W, C) -> (C, H, W), then scale the pixel values to [0, 1]: the
    # mean/std constants above are expressed on that scale, not on 0-255.
    # NOTE(review): any inference code for this model must apply the same
    # scaling before normalising.
    transformed_image = torch.tensor(transformed_image).permute(2, 0, 1).float() / 255.0
    transformed_image = imagenet_normalize(transformed_image)
    transformed_data.append((transformed_image, label))


In [None]:
from torch.utils.data import DataLoader, random_split

# transformed_data lists all positives first and all negatives afterwards, so
# slicing it at a fixed index would leave the validation part with only the
# tail of the negatives. Split at random instead, with a fixed seed so the
# split is reproducible.
n_train = min(15000, len(transformed_data))
n_validation = len(transformed_data) - n_train
train_subset, validation_subset = random_split(
    transformed_data,
    [n_train, n_validation],
    generator=torch.Generator().manual_seed(0),
)

data_loader = {
    'train' : DataLoader(train_subset, batch_size=100, shuffle=True, num_workers=1),
    # Evaluation does not depend on sample order, so no shuffling is needed.
    'validation' : DataLoader(validation_subset, batch_size=100, shuffle=False, num_workers=1)
}

In [None]:
# Task 1: train `model_alexnet` on the positive/negative patches and report
# training and validation accuracy.

# Defined here so the cell does not depend on a `device` created further down
# the notebook.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_alexnet = model_alexnet.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model_alexnet.parameters(), lr=0.001, momentum=0.9)

# Running training accuracy counters, accumulated over all epochs.
total = 0
correct = 0
start = time.time()

for epoch in range(num_epochs):

    # Back to training mode after the previous epoch's validation pass.
    model_alexnet.train()
    for i, data in enumerate(data_loader['train']):
        images, labels = data
        images = images.to(device)
        # CrossEntropyLoss expects integer (long) class indices on the same
        # device as the model outputs.
        labels = labels.to(device).long()

        optimizer.zero_grad()
        outputs = model_alexnet(images.float())

        _, predicted = torch.max(outputs, 1)

        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        loss = criterion(outputs, labels)
        if(i % 50 == 0):
            print(f'Epoca: {epoch} Batch: {i} loss: {loss.item()}')
            print(f'Acuratete antrenare: {(correct/total)*100}')

        loss.backward()
        optimizer.step()

    # Validation pass: no gradients, evaluation mode.
    model_alexnet.eval()
    validation_correct = 0
    validation_total = 0
    with torch.no_grad():
        for i, data in enumerate(data_loader['validation'], 1):
            images, labels = data
            images = images.to(device)
            labels = labels.to(device)

            outputs = model_alexnet(images.float())
            _, predicted = torch.max(outputs, 1)

            validation_total += labels.size(0)
            validation_correct += (predicted == labels).sum().item()
            if(i % 10 == 0):
                print(f'Acuratete validare: {(validation_correct/validation_total)*100}\n\n')


print(f'Timp antrenare {time.time()-start} secunde')
print(f'Acuratete antrenare {(correct/total)*100}')



In [None]:
# Persist the trained Task 1 weights (state dict only) to Google Drive.
_task1_weights_path = '/content/drive/MyDrive/Colab Notebooks/proiect_cava2/model_alexnet_task1.pth'
torch.save(model_alexnet.state_dict(), _task1_weights_path)

### Task 2

In [None]:
# Task 2 class names; the position in this tuple is the numeric class id.
_character_names = ("barney", "betty", "fred", "wilma", "unknown")

# One independent, initially empty list per character.
dict_characters = {name: [] for name in _character_names}

dict_characters_transformed = {name: [] for name in _character_names}

# Character name -> integer class id.
labels = {name: class_id for class_id, name in enumerate(_character_names)}

# Filled by the loading cell with (image tensor, class id) pairs.
transformed_list_labels = []

In [None]:
# Images are read from <task2_root>/train_<character>/.
task2_root = "/content/drive/MyDrive/Colab Notebooks/proiect_cava2"
# Built once: the same normalisation is applied to every image.
imagenet_normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

for elem, lista_elem in dict_characters.items():
    elem_dir = f"{task2_root}/train_{elem}"
    # Sorted so the order of the loaded examples is reproducible.
    elem_images_path = sorted(os.listdir(elem_dir))
    print(f"{elem} are {len(elem_images_path)} imagini")

    for image_name in elem_images_path:
        image = cv.imread(f"{elem_dir}/{image_name}")
        if image is None:
            print(f"Skipping unreadable image: {elem_dir}/{image_name}")
            continue
        lista_elem.append(image)

        # (H, W, C) -> (C, H, W), then scale the pixel values to [0, 1]: the
        # mean/std constants above are expressed on that scale, not on 0-255.
        # NOTE(review): any inference code for these models must apply the
        # same scaling before normalising.
        transformed_image = torch.tensor(image).permute(2, 0, 1).float() / 255.0
        transformed_image = imagenet_normalize(transformed_image)

        transformed_list_labels.append((transformed_image, labels[elem]))

In [None]:
# Task 2: train one binary (one-vs-rest) AlexNet per character class.

# Defined here so the cell does not depend on a `device` created further down
# the notebook.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
num_epochs = 10

# The position in this list must match the class ids in the `labels` dict
# (barney=0, betty=1, fred=2, wilma=3, unknown=4).
character_models = [
    ("barney", model_barney),
    ("betty", model_betty),
    ("fred", model_fred),
    ("wilma", model_wilma),
    ("unknown", model_unknown),
]

for idx, (character_name, face_model) in enumerate(character_models):
    face_model = face_model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(face_model.parameters(), lr=0.001, momentum=0.9)

    # Relabel the data for this model: 1 for the class being trained, 0 for every other class.
    train_model_data = [
        (image_tensor, 1 if class_id == idx else 0)
        for image_tensor, class_id in transformed_list_labels
    ]
    # len() instead of np.shape(): the list holds (tensor, int) tuples, which
    # NumPy cannot turn into a regular array.
    print(len(train_model_data))

    data_loader_class = torch.utils.data.DataLoader(train_model_data,
                                          batch_size=100,
                                          shuffle=True,
                                          num_workers=1)

    # Print the class name rather than the model object, whose repr is the
    # whole layer listing.
    print(f'Antrenare model {character_name}')
    # Running training accuracy counters for this model, accumulated over all epochs.
    total = 0
    correct = 0
    start = time.time()

    for epoch in range(num_epochs):

        face_model.train()
        for i, data in enumerate(data_loader_class):
            # Named batch_labels so the module-level `labels` dict
            # (character name -> class id) is not overwritten by a tensor.
            images, batch_labels = data
            images = images.to(device)
            # CrossEntropyLoss expects integer (long) class indices on the
            # same device as the model outputs.
            batch_labels = batch_labels.to(device).long()

            optimizer.zero_grad()
            outputs = face_model(images.float())
            _, predicted = torch.max(outputs, 1)

            total += batch_labels.size(0)
            correct += (predicted == batch_labels).sum().item()

            loss = criterion(outputs, batch_labels)
            if(i % 50 == 0):
                print(f'Epoca: {epoch} Batch: {i} loss: {loss.item()}')
                print(f'Acuratete antrenare: {(correct/total)*100}')
            loss.backward()
            optimizer.step()


    print(f'Timp antrenare {time.time()-start} secunde')
    print(f'Acuratete antrenare {(correct/total)*100}')



In [None]:
# Persist the weights (state dict only) of the five per-character networks to Google Drive.
# NOTE(review): the barney file name is spelled "modeL_..." with a capital L,
# unlike the other four. It is kept verbatim here; confirm against whatever
# code loads these files before renaming it.
for _net, _weights_path in (
    (model_barney, '/content/drive/MyDrive/Colab Notebooks/proiect_cava2/modeL_barney_alexnet.pth'),
    (model_betty, '/content/drive/MyDrive/Colab Notebooks/proiect_cava2/model_betty_alexnet.pth'),
    (model_fred, '/content/drive/MyDrive/Colab Notebooks/proiect_cava2/model_fred_alexnet.pth'),
    (model_wilma, '/content/drive/MyDrive/Colab Notebooks/proiect_cava2/model_wilma_alexnet.pth'),
    (model_unknown, '/content/drive/MyDrive/Colab Notebooks/proiect_cava2/model_unknown_alexnet.pth'),
):
    torch.save(_net.state_dict(), _weights_path)


# Antrenarea modelului Faster-RCNN

In [None]:
class FlintstoneImagesDataset(torch.utils.data.Dataset):
    """Detection dataset built from an image folder and a plain-text annotation file.

    Every non-empty line of ``labels_file`` must have the form
    ``<image name> <xmin> <ymin> <xmax> <ymax> <class name>``; several lines
    may refer to the same image. One dataset item corresponds to one distinct
    image name, with the names taken in sorted order.

    Args:
        files_dir: folder containing the image files.
        labels_file: path of the annotation text file.
        transforms: optional callable invoked as
            ``transforms(image=..., bboxes=..., labels=...)`` that returns a
            mapping with the keys ``'image'``, ``'bboxes'`` and ``'labels'``.
    """

    def __init__(self, files_dir, labels_file, transforms=None):
        self.transforms = transforms
        self.files_dir = files_dir
        self.labels_file = labels_file

        # Read the labels from the text file.
        self.annotations = self.read_annotations(labels_file)
        # Sorted once here so the index -> image mapping is stable and
        # __getitem__ does not re-sort the keys on every access.
        self.image_names = sorted(self.annotations)

        # The classes (assumed to be distinct and ordered). A class's position
        # in this list is its numeric label; index 0 is the '_' placeholder.
        self.classes = ['_', 'barney', 'betty', 'fred', 'wilma', 'unknown']

    def read_annotations(self, labels_file):
        """Parse ``labels_file`` into ``{image name: [[xmin, ymin, xmax, ymax, class name], ...]}``.

        Blank lines are ignored.

        Raises:
            ValueError: if a non-empty line has fewer than six fields or a
                coordinate is not an integer.
        """
        annotations = {}
        with open(labels_file, 'r') as file:
            for line_no, line in enumerate(file, 1):
                parts = line.strip().split()
                if not parts:
                    # Blank line, e.g. a trailing newline at the end of the file.
                    continue
                if len(parts) < 6:
                    raise ValueError(
                        f"{labels_file}:{line_no}: expected "
                        f"'<image> <xmin> <ymin> <xmax> <ymax> <class>', got {line!r}"
                    )
                img_name, bbox = parts[0], list(map(int, parts[1:5]))
                class_name = parts[5]
                annotations.setdefault(img_name, []).append(bbox + [class_name])
        return annotations

    def __getitem__(self, idx):
        """Return ``(image, target)`` for the ``idx``-th image name.

        ``image`` is the RGB image as float32 scaled to [0, 1], or whatever
        ``transforms`` returns under ``'image'`` when transforms are set.
        ``target`` is a dict with the keys ``boxes`` (float32, shape (N, 4)),
        ``labels`` (int64), ``area`` (box areas), ``iscrowd`` (all zeros) and
        ``image_id`` (one-element tensor holding ``idx``).

        Raises:
            FileNotFoundError: if the image file cannot be read.
        """
        img_name = self.image_names[idx]
        image_path = os.path.join(self.files_dir, img_name)
        # The notebook imports OpenCV as `cv`; `cv2` is not a bound name.
        img = cv.imread(image_path)
        if img is None:
            raise FileNotFoundError(f"Could not read image: {image_path}")
        img_rgb = cv.cvtColor(img, cv.COLOR_BGR2RGB).astype(np.float32)
        img_res = img_rgb / 255.0  # Normalize the image

        boxes = []
        labels = []
        for box in self.annotations[img_name]:
            xmin, ymin, xmax, ymax, class_name = box
            boxes.append([xmin, ymin, xmax, ymax])
            labels.append(self.classes.index(class_name))

        if self.transforms:
            # Plain Python lists go into the transform; its returned boxes and
            # labels replace the originals so the two stay aligned.
            sample = self.transforms(image=img_res, bboxes=boxes, labels=labels)
            img_res = sample['image']
            boxes = sample['bboxes']
            labels = sample['labels']

        # reshape(-1, 4) keeps a well-formed (0, 4) tensor when no box is present.
        boxes = torch.as_tensor(np.asarray(boxes, dtype=np.float32)).reshape(-1, 4)
        labels = torch.as_tensor(np.asarray(labels, dtype=np.int64))
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        iscrowd = torch.zeros((len(boxes),), dtype=torch.int64)
        # A tensor rather than the file-name string: the target dict is handed
        # to the detection helpers, which operate on tensor values.
        image_id = torch.tensor([idx])
        target = {"boxes": boxes, "labels": labels, "area": area, "iscrowd": iscrowd, "image_id": image_id}

        return img_res, target

    def __len__(self):
        """Number of distinct image names found in the annotation file."""
        return len(self.image_names)

In [None]:
def get_object_detection_model(num_classes):
    """Build a Faster R-CNN (ResNet-50 FPN) whose box predictor outputs ``num_classes`` classes.

    The network is created with ``pretrained=True`` and its box predictor is
    replaced by a new ``FastRCNNPredictor`` with the same input width.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

    # Input width of the existing classification layer, reused for the new head.
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)

    return detector

In [None]:
def get_transform():
    """Return the albumentations pipeline for the detection dataset: tensor conversion only.

    Boxes are declared in ``pascal_voc`` format with their class ids passed
    under the ``labels`` field.
    """
    steps = [ToTensorV2(p=1.0)]
    box_settings = {'format': 'pascal_voc', 'label_fields': ['labels']}
    return A.Compose(steps, bbox_params=box_settings)


In [None]:
# NOTE(review): `files_dir` and `labels_file` are not defined in any cell shown
# in this notebook; they must be set before this cell runs.
full_dataset = FlintstoneImagesDataset(files_dir, labels_file, transforms=get_transform())

# Fixed seed so the train/test split is reproducible.
torch.manual_seed(1)
indices = torch.randperm(len(full_dataset)).tolist()

test_split = 0.2
tsize = int(len(full_dataset)*test_split)
# An explicit split point avoids `indices[:-0]`, which is an empty list when tsize is 0.
n_train = len(full_dataset) - tsize

# Both subsets index into the FULL dataset. Building the test subset from the
# already-reduced training subset would apply full-dataset indices to a
# shorter dataset: out-of-range lookups and train/test overlap.
dataset = torch.utils.data.Subset(full_dataset, indices[:n_train])
dataset_test = torch.utils.data.Subset(full_dataset, indices[n_train:])

data_loader = torch.utils.data.DataLoader(
    dataset, batch_size=10, shuffle=True, num_workers=4,
    collate_fn=utils.collate_fn)

data_loader_test = torch.utils.data.DataLoader(
    dataset_test, batch_size=10, shuffle=False, num_workers=4,
    collate_fn=utils.collate_fn)

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Number of outputs requested from the new box predictor.
num_classes = 6
model = get_object_detection_model(num_classes)
model.to(device)

# Optimise only the parameters that require gradients.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(
    params,
    lr=0.005,
    momentum=0.9,
    weight_decay=0.0005,
)

# Multiply the learning rate by 0.1 after every 3 scheduler steps.
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=3,
    gamma=0.1,
)

In [None]:
# Number of passes over the detection training loader; rebinds the
# `num_epochs` that was set to 10 for the classification models.
num_epochs = 5

for epoch in range(num_epochs):
    # train_one_epoch and evaluate are imported from the engine.py helper module.
    train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=10)
    # The StepLR scheduler is stepped once per epoch, after the training pass.
    lr_scheduler.step()
    # Evaluate on the held-out loader after every epoch.
    evaluate(model, data_loader_test, device=device)

In [None]:
# Bundle the model, optimizer and scheduler states together with the index of
# the last completed epoch, and write them to a file named after that epoch.
checkpoint = dict(
    model_state_dict=model.state_dict(),
    optimizer_state_dict=optimizer.state_dict(),
    lr_scheduler_state_dict=lr_scheduler.state_dict(),
    epoch=epoch,
)
checkpoint_path = 'model_epoch_{}.pth'.format(epoch)
torch.save(checkpoint, checkpoint_path)
