In [1]:
import time
import copy

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from torch import nn
from torch.utils.data import DataLoader
from torchvision.utils import make_grid

import tqdm
from tqdm.auto import tqdm;

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Make a device agnostic code
device = "cuda" if torch.cuda.is_available() else "cpu"

In [3]:
device

'cuda'

In [4]:
#GPUS
n_gpus = torch.cuda.device_count()
print(f"Number of gpus: {n_gpus}")

Number of gpus: 1


In [5]:
class ConvBlock (nn.Module):
    def __init__(self,
                in_channels: int,
                out_channels: int,
                 **kwargs): # this means that the key arguments are arbitrary
        super().__init__()

        self.relu = nn.ReLU()
        self.conv = nn.Conv2d(in_channels=in_channels,
                            out_channels=out_channels,
                              **kwargs,
                            device=device)
        self.batchnorm = nn.BatchNorm2d(num_features=out_channels) # to improve performance
    
    def forward(self,x):
        x = self.conv(x)
        x = self.batchnorm(x)
        x = self.relu(x)
        return x

In [6]:
class L2NormLayer(nn.Module):
    def __init__(self,dim=1):
        super().__init__()
        self.dim=dim
    
    def forward(self,x):
        return nn.functional.normalize(x,p=2,dim=self.dim)

In [7]:
class InceptionBlock (nn.Module):
        def __init__(self,
                in_channels: int,
                out_1x1: int,

                red_3x3: int,
                out_3x3: int,

                red_5x5: int,
                out_5x5: int,

                out_1x1pool: int,
                
                l2_mode:bool=False):
        
                super().__init__()

                self.branch1 = ConvBlock(in_channels=in_channels,
                                        out_channels=out_1x1,
                                        kernel_size=1)
                
                self.branch2 = nn.Sequential(
                        ConvBlock(in_channels=in_channels,
                                out_channels=red_3x3,
                                kernel_size=1),
                        ConvBlock(in_channels=red_3x3,
                                out_channels=out_3x3,
                                kernel_size=3,
                                padding=1) # ojo (btw no ponemos el S bc por defecto es 1)
                )
                
                self.branch3 = nn.Sequential(
                        ConvBlock(in_channels=in_channels,
                                out_channels=red_5x5,
                                kernel_size=1),
                        ConvBlock(in_channels=red_5x5,
                                out_channels=out_5x5,
                                kernel_size=5,
                                padding=2) # ojo (btw no ponemos el S bc por defecto es 1)
                )

                if l2_mode:
                        self.branch4=nn.Sequential(
                                L2NormLayer(dim=1),
                                ConvBlock(in_channels=in_channels,
                                out_channels=out_1x1pool,
                                kernel_size=1)
                        )
                else:
                        self.branch4 = nn.Sequential(
                                nn.MaxPool2d(kernel_size=3,stride=1,padding=1),
                                ConvBlock(in_channels=in_channels,
                                        out_channels=out_1x1pool,
                                        kernel_size=1)
                        )

        def forward(self,x):
        # N x filters x 28 x 28 → 0th x 1st x 2nd x 3rd dimension (we use 1)
                return torch.cat([self.branch1(x),self.branch2(x),self.branch3(x),self.branch4(x)],1)

In [8]:
class InceptionBlock_2(nn.Module):
    def __init__(self, 
                in_channels:int, 
                red_3x3:int,
                out_3x3:int) -> None:
        super().__init__()
        self.branch=nn.Sequential(
            ConvBlock(in_channels=in_channels,
                    out_channels=red_3x3,
                    kernel_size=1),
            ConvBlock(in_channels=red_3x3,
                    out_channels=out_3x3,
                    kernel_size=3)
        )
    def forward(self,x):
        return self.branch(x)

In [9]:
class InceptionBlock_m_3x3 (nn.Module):
        def __init__(self,
                in_channels: int,

                red_3x3: int,
                out_3x3: int,

                red_5x5: int,
                out_5x5: int):
        
                super().__init__()

                self.branch2 = nn.Sequential(
                        ConvBlock(in_channels=in_channels,
                                out_channels=red_3x3,
                                kernel_size=1),
                        ConvBlock(in_channels=red_3x3,
                                out_channels=out_3x3,
                                kernel_size=3,
                                padding=1,
                                stride=2) # ojo (btw no ponemos el S bc por defecto es 1)
                )
                
                self.branch3 = nn.Sequential(
                        ConvBlock(in_channels=in_channels,
                                out_channels=red_5x5,
                                kernel_size=1),
                        ConvBlock(in_channels=red_5x5,
                                out_channels=out_5x5,
                                kernel_size=5,
                                padding=2,
                                stride=2) # ojo (btw no ponemos el S bc por defecto es 1)
                )

                self.branch4 = nn.Sequential(
                        nn.MaxPool2d(kernel_size=3,stride=2,padding=1), #TODO: VERIFICAR LO DEL PADDING CON JAVIER
                )

        def forward(self,x):
        # N x filters x 28 x 28 → 0th x 1st x 2nd x 3rd dimension (we use 1)
                return torch.cat([self.branch2(x),self.branch3(x),self.branch4(x)],1)

## Structure of the Model

In [10]:
class NN2 (nn.Module):
    def __init__(self,
                in_channels = 3):
        super().__init__()

        self.conv1 = ConvBlock(in_channels=in_channels,
                            out_channels=64,
                            kernel_size=7,
                            stride=2,
                            padding=3)
        
        self.inception2 = InceptionBlock_2(in_channels=64, #la cantidad de canales que entran y salen es la misma. ¿esto es correcto?
                                        red_3x3=64,
                                        out_3x3=192)
        
        self.inception3a = InceptionBlock( in_channels= 192, out_1x1= 64, red_3x3= 96, out_3x3= 128, red_5x5= 16,out_5x5= 32, out_1x1pool= 32)#ok
        self.inception3b = InceptionBlock( in_channels= 256, out_1x1= 64, red_3x3= 96, out_3x3= 128, red_5x5= 32,out_5x5= 64, out_1x1pool= 64,l2_mode=True)
        #modificar desactivando branch 1 y 4
        self.inception3c = InceptionBlock_m_3x3( in_channels= 320, red_3x3= 128, out_3x3= 256, red_5x5= 32,out_5x5= 64)
        
        self.inception4a = InceptionBlock( in_channels= 640, out_1x1= 256, red_3x3= 96, out_3x3= 192, red_5x5= 32,out_5x5= 64, out_1x1pool= 128,l2_mode=True)
        self.inception4b = InceptionBlock( in_channels= 640, out_1x1= 224, red_3x3= 112, out_3x3= 224, red_5x5= 32,out_5x5= 64, out_1x1pool= 128,l2_mode=True)
        self.inception4c = InceptionBlock( in_channels= 640, out_1x1= 192, red_3x3= 128, out_3x3= 256, red_5x5= 32,out_5x5= 64, out_1x1pool= 128,l2_mode=True)
        self.inception4d = InceptionBlock( in_channels= 640, out_1x1= 160, red_3x3= 144, out_3x3= 288, red_5x5= 32,out_5x5= 64, out_1x1pool= 128,l2_mode=True)
        #modificar desactivando branch 1 y 4
        self.inception4e = InceptionBlock_m_3x3( in_channels= 640, red_3x3= 160, out_3x3= 256, red_5x5= 64,out_5x5= 128)
        
        self.inception5a = InceptionBlock( in_channels= 1024, out_1x1= 384, red_3x3= 192, out_3x3= 384, red_5x5= 48,out_5x5= 128, out_1x1pool= 128,l2_mode=True)
        self.inception5b = InceptionBlock( in_channels= 1024, out_1x1= 384, red_3x3= 192, out_3x3= 384, red_5x5= 48,out_5x5= 128, out_1x1pool= 128)#ok
        
        self.maxpool = nn.MaxPool2d(kernel_size=3,stride=2,padding=1) #por que se aplico un max pool?

        self.avgpool = nn.AvgPool2d(kernel_size=7) # stride? padding?
        
        self.FC = nn.Linear(1024,128)

        self.norm=nn.BatchNorm2d(num_features=64)


    def forward(self,x):
        x = self.conv1(x)
        x = self.maxpool(x)

        x=self.norm(x)

        x = self.inception2(x)
        x = self.maxpool(x)

        x = self.inception3a(x)
        x = self.inception3b(x)
        x = self.inception3c(x)

        x = self.inception4a(x)
        x = self.inception4b(x)
        x = self.inception4c(x)
        x = self.inception4d(x)
        x = self.inception4e(x)

        x = self.inception5a(x)
        x = self.inception5b(x)
        #print('Despues del inception 5b',x.shape)
        x = self.avgpool(x)
        #print('Despues del avg pool',x.shape)

        x=x.view(x.shape[0],-1)

        x = self.FC(x)
        #print('Despues de fully connected layer',x.shape)
        x = nn.functional.normalize(x,p=2,dim=1)

        return x

In [11]:
torch.manual_seed(42)
model_1 = NN2(in_channels=3).to(device)

In [12]:
#Sample de prueba

sample=torch.randn(size=(1,3,224,224)).to(device)

In [13]:
sample.shape

torch.Size([1, 3, 224, 224])

In [14]:
model_1.eval()
with torch.inference_mode():
    model_1(sample)

## Transform

In [15]:
transform=transforms.Compose([
    transforms.Resize((224,224)),  #Para pasar a 224 x 224 pixels
    transforms.ToTensor()          #Para convertir la imagen a tensor
])

## Path of dataset

In [16]:
import os

print(os.getcwd())

c:\Tesis\Tesis\Codigo\model_pytorch


In [17]:
from pathlib import Path

root_data_path=Path('data')
train_path=root_data_path / 'train'

test_path=root_data_path / 'test'

train_path

WindowsPath('data/train')

In [18]:
img_path_list=list(test_path.glob('*/*.jpg'))

import random
from PIL import Image

random_image_path=random.choice(img_path_list)
img=Image.open(random_image_path)

transform(img).shape

torch.Size([3, 224, 224])

In [19]:
train_data=torchvision.datasets.ImageFolder(train_path,
                                        transform=transform,
                                        target_transform=None)

test_data=torchvision.datasets.ImageFolder(test_path,
                                        transform=transform)

In [20]:
train_data.samples

[('data\\train\\cinthia\\rostro_0.jpg', 0),
 ('data\\train\\cinthia\\rostro_1.jpg', 0),
 ('data\\train\\cinthia\\rostro_2.jpg', 0),
 ('data\\train\\cinthia\\rostro_3.jpg', 0),
 ('data\\train\\cinthia\\rostro_4.jpg', 0),
 ('data\\train\\cinthia\\rostro_5.jpg', 0),
 ('data\\train\\cinthia\\rostro_6.jpg', 0),
 ('data\\train\\cinthia\\rostro_7.jpg', 0),
 ('data\\train\\cinthia\\rostro_8.jpg', 0),
 ('data\\train\\cinthia\\rostro_9.jpg', 0),
 ('data\\train\\wia\\rostro_0.jpg', 1),
 ('data\\train\\wia\\rostro_1.jpg', 1),
 ('data\\train\\wia\\rostro_2.jpg', 1),
 ('data\\train\\wia\\rostro_3.jpg', 1),
 ('data\\train\\wia\\rostro_4.jpg', 1),
 ('data\\train\\wia\\rostro_5.jpg', 1),
 ('data\\train\\wia\\rostro_6.jpg', 1),
 ('data\\train\\wia\\rostro_7.jpg', 1),
 ('data\\train\\wia\\rostro_8.jpg', 1),
 ('data\\train\\wia\\rostro_9.jpg', 1),
 ('data\\train\\william\\rostro_0.jpg', 2),
 ('data\\train\\william\\rostro_1.jpg', 2),
 ('data\\train\\william\\rostro_2.jpg', 2),
 ('data\\train\\william\\ros

## Split the data

In [57]:
BATCH_SIZE=10

In [22]:
train_data

Dataset ImageFolder
    Number of datapoints: 30
    Root location: data\train
    StandardTransform
Transform: Compose(
               Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
               ToTensor()
           )

In [40]:
test_data

Dataset ImageFolder
    Number of datapoints: 6
    Root location: data\test
    StandardTransform
Transform: Compose(
               Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
               ToTensor()
           )

In [58]:
train_loader=DataLoader(dataset=train_data,
                        batch_size=BATCH_SIZE,
                        num_workers=2,
                        shuffle=True)

test_loader=DataLoader(dataset=test_data,
                        batch_size=BATCH_SIZE,
                        num_workers=2,
                        shuffle=False)

In [45]:
len(train_loader)

6

In [24]:
image_batch,label_batch=next(iter(train_loader))

In [25]:
len(train_loader)

2

In [26]:
label_batch

tensor([0, 0, 1, 1, 2])

In [27]:
optimizer=optim.Adam(params=model_1.parameters(),
                    lr=0.1)
loss_fn=nn.TripletMarginLoss(margin=0.2,p=2)

In [28]:
train_data.class_to_idx

{'cinthia': 0, 'wia': 1, 'william': 2}

In [47]:
dict_reference=dict()
for key,value in train_data.class_to_idx.items():
    dict_reference[value]=[]

dict_reference

{0: [], 1: [], 2: []}

In [30]:
range(len(dict_reference.keys()))

range(0, 3)

In [31]:
batch_images,batch_labels=next(iter(train_loader))
batch_labels

tensor([0, 0, 1, 1, 2])

In [55]:
from copy import deepcopy
import random

In [None]:
def embedding_extracted():
    return

In [60]:
# #CODE

# for batch, (X,y) in enumerate(train_loader):

#     X,y=X.to(device),y.to(device)
#     y_preds=model_1(X) #embedding vector

#     # #loss=loss_fn(y_preds,y)
#     # #optimizer.zero_grad()
    
#     # #loss_fn.backward()
#     # #optimizer.step()

#     print(f'Batch #{batch}')

#     data_loss_triple_function = deepcopy(dict_reference) 

#     for embedding,label in zip(y_preds,y):
#         data_loss_triple_function[(label.cpu().item())].append(embedding)
    
#     print('Embeddings 0',len(data_loss_triple_function[0]))
#     print('Embeddings 1',len(data_loss_triple_function[1]))
#     print('Embeddings 2',len(data_loss_triple_function[2]))
#     # print('-------------------------------------------------')

#     for clase in data_loss_triple_function.keys():
#         if len(data_loss_triple_function[clase]) >1 :
#             print('Useful case',clase)
            


    
#     print('-------------------------------------------------')

#     # for elements in range(len(data_loss_triple_function.keys())):
#     #     print(batch)



Batch #0
Embeddings 0 5
Embeddings 1 3
Embeddings 2 2
Useful case 0
Useful case 1
Useful case 2
-------------------------------------------------
Batch #1
Embeddings 0 2
Embeddings 1 5
Embeddings 2 3
Useful case 0
Useful case 1
Useful case 2
-------------------------------------------------
Batch #2
Embeddings 0 3
Embeddings 1 2
Embeddings 2 5
Useful case 0
Useful case 1
Useful case 2
-------------------------------------------------


In [33]:
# def train_step(
#         model:nn.Module,
#         dataloader:torch.utils.data.dataloader,
#         loss_fn:nn.Module,
#         optimizer:torch.optim.Optimizer,
#         device):
    
#     model.train()

#     train_loss,train_acc=0,0

#     for batch, (X,y) in enumerate(dataloader):
#         X,y=X.to(device),y.to(device)
#         y_preds=model(X)
#         loss=loss_fn(y_preds,y)
#         train_loss+=loss

#         optimizer.zero_grad()

#         loss_fn.backward()

#         optimizer.step()


In [65]:
from pytorch_metric_learning.distances import CosineSimilarity
from pytorch_metric_learning.reducers import ThresholdReducer
from pytorch_metric_learning.regularizers import LpRegularizer
from pytorch_metric_learning import losses
from pytorch_metric_learning.miners import TripletMarginMiner

distance=CosineSimilarity()
reducer = ThresholdReducer(high=0.3)
embedding_regularizer = LpRegularizer()

In [66]:
loss_func = losses.TripletMarginLoss(distance = CosineSimilarity(), 
                                    reducer = ThresholdReducer(high=0.3), 
                                    embedding_regularizer = LpRegularizer())

In [112]:
minig_func_train=TripletMarginMiner(margin=0.2,distance=distance,type_of_triplets='hard')

minig_func_test=TripletMarginMiner(margin=0.2,distance=distance,type_of_triplets='easy')

In [109]:
def train_step(
        model:torch.nn.Module,
        dataloader:torch.utils.data.dataloader,
        loss_fn:torch.nn.Module,
        optimizer:torch.optim.Optimizer,
        mining_func,
        device):
    
    model.train()

    train_loss,train_acc=0,0

    for batch, (X,y) in enumerate(dataloader):
        #Send data to the target device
        X,y=X.to(device),y.to(device)

        #1. Forward pass
        embeddings=model(X) #Output model logits

        indices_tuple=mining_func(embeddings,y)

        #2. Calculate the loss

        anchors=embeddings[indices_tuple[0]]
        positives=embeddings[indices_tuple[0]]
        negatives=embeddings[indices_tuple[0]]

        loss=loss_fn(anchors,positives,negatives)

        loss=torch.nan_to_num(loss,nan=0.0)

        train_loss+=loss.item()

        #3.Optimizer zero grad
        optimizer.zero_grad()

        #4. Loss backward
        loss.backward()

        #5. Optimizer step
        optimizer.step()

        #Calculate accuracy metric
        # y_pred_class=torch.argmax(torch.softmax(embeddings,dim=1),dim=1)
        # train_acc+= (y_pred_class == y).sum().item() / len(embeddings)

    #Adjust metrics to get average loss and accuracy per batch
    train_loss= train_loss/len(dataloader)
    #train_acc=train_acc / len(dataloader)
    print(train_loss)
    #return train_loss,train_acc
    return train_loss

In [110]:
train_loss_model,train_acc_model=train_step(model=model_1,
                                            dataloader=train_loader,
                                            loss_fn=loss_fn,
                                            optimizer=optimizer,
                                            mining_func=minig_func_train,
                                            device=device)

In [117]:
train_loss_model

0.13333333532015482

In [113]:
def test_step(
        model:torch.nn.Module,
        dataloader:torch.utils.data.dataloader,
        loss_fn:torch.nn.Module,
        mining_func,
        device=device):
    
    #Put model in eval mode
    model.eval()

    test_loss,test_acc=0,0

    with torch.inference_mode():
        #Loop through DataLoader batches

        for batch, (X,y) in enumerate(dataloader):
            #Send data to the target device
            X,y=X.to(device),y.to(device)

            #1. Forward pass
            embeddings=model(X)

            indices_tuple=mining_func(embeddings,y)

            anchors=embeddings[indices_tuple[0]]
            positives=embeddings[indices_tuple[0]]
            negatives=embeddings[indices_tuple[0]]

            #2. Calculate the loss
            loss=loss_fn(anchors,positives,negatives)

            loss=torch.nan_to_num(loss,nan=0.0)

            test_loss+=loss.item()

            #3. Calculate the accuracy
            # test_preds_labels=torch.argmax(torch.softmax(embeddings,dim=1),dim=1)
            # test_acc+= (test_preds_labels==y).sum().item() / len(test_preds_labels)

    #Adjust the metrics to get averague loss and accuracy per batch
    test_loss=test_loss / len(dataloader)
    #test_acc=test_acc / len(dataloader)

    #return test_loss,test_acc
    return test_loss

In [114]:
test_loss=test_step(model=model_1,
        dataloader=test_loader,
        loss_fn=loss_fn,
        mining_func=minig_func_test,
        device=device)

In [115]:
test_loss

0.0

In [135]:
def train_model(model:torch.nn.Module,
        train_dataloader:torch.utils.data.dataloader,
        test_dataloader:torch.utils.data.dataloader,
        optimizer:torch.optim.Optimizer,
        loss_fn:torch.nn.Module,
        minig_func_train,
        minig_func_test,
        epochs=5,
        device=device):
    
        #2. Create empty results dictionary
        # results={"train_loss":[],
        #         "train_acc":[],
        #         "test_loss":[],
        #         "test_acc":[]}

        results={"train_loss":[],
                "test_loss":[]}

        #3. Loop through training and testing step for a number of epochs
        for epoch in tqdm(range(epochs)):
                train_loss=train_step(model=model,
                                    dataloader=train_dataloader,
                                    loss_fn=loss_fn,
                                    optimizer=optimizer,
                                    mining_func=minig_func_train,
                                    device=device)
                test_loss=test_step(model=model,
                                    dataloader=test_dataloader,
                                    loss_fn=loss_fn,
                                    mining_func=minig_func_test,
                                    device=device)
        
                #4. Print out what's happening
                #print('Train loss',train_loss)
                #print('Test loss',test_loss)
                print(f"Epoch: {epoch} | Train loss:{train_loss} | Test loss : {test_loss}")
                #5. Update results diccionary
                results["train_loss"].append(train_loss)
                results["test_loss"].append(test_loss)

        return results

In [136]:
torch.manual_seed(42)
model_1 = NN2(in_channels=3).to(device)
optimizer=optim.Adam(params=model_1.parameters(),
                    lr=0.1)
loss_fn=nn.TripletMarginLoss(margin=0.2,p=2)

In [137]:
results_model_1=train_model(
    model=model_1,
    train_dataloader=train_loader,
    test_dataloader=test_loader,
    optimizer=optimizer,
    loss_fn=loss_fn,
    minig_func_train=minig_func_train,
    minig_func_test=minig_func_test,
    epochs=10,
    device=device
)

 10%|█         | 1/10 [00:05<00:48,  5.37s/it]

Epoch: 0 | Train loss:(0.13333334028720856, 0.0) | Test loss : 0.0


 20%|██        | 2/10 [00:10<00:43,  5.38s/it]

Epoch: 1 | Train loss:(0.13333334028720856, 0.0) | Test loss : 0.0


 30%|███       | 3/10 [00:15<00:36,  5.26s/it]

Epoch: 2 | Train loss:(0.13333333532015482, 0.0) | Test loss : 0.0


 40%|████      | 4/10 [00:21<00:31,  5.25s/it]

Epoch: 3 | Train loss:(0.20000000298023224, 0.0) | Test loss : 0.0


 50%|█████     | 5/10 [00:26<00:26,  5.27s/it]

Epoch: 4 | Train loss:(0.20000000298023224, 0.0) | Test loss : 0.0


 60%|██████    | 6/10 [00:31<00:21,  5.34s/it]

Epoch: 5 | Train loss:(0.06666666766007741, 0.0) | Test loss : 0.0


 70%|███████   | 7/10 [00:37<00:15,  5.32s/it]

Epoch: 6 | Train loss:(0.13333334028720856, 0.0) | Test loss : 0.0


 80%|████████  | 8/10 [00:42<00:10,  5.40s/it]

Epoch: 7 | Train loss:(0.06666666766007741, 0.0) | Test loss : 0.0


 90%|█████████ | 9/10 [00:48<00:05,  5.39s/it]

Epoch: 8 | Train loss:(0.06666666766007741, 0.0) | Test loss : 0.0


100%|██████████| 10/10 [00:53<00:00,  5.36s/it]

Epoch: 9 | Train loss:(0.20000000794728598, 0.0) | Test loss : 0.0





In [138]:
results_model_1

{'train_loss': [(0.13333334028720856, 0.0),
  (0.13333334028720856, 0.0),
  (0.13333333532015482, 0.0),
  (0.20000000298023224, 0.0),
  (0.20000000298023224, 0.0),
  (0.06666666766007741, 0.0),
  (0.13333334028720856, 0.0),
  (0.06666666766007741, 0.0),
  (0.06666666766007741, 0.0),
  (0.20000000794728598, 0.0)],
 'test_loss': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]}