In [590]:
import torch 
import numpy as np 
import matplotlib.pyplot as plt 
from torch import nn 
import config

In [591]:
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

In [592]:
# setup training device
device = "cuda" if torch.cuda.is_available() else "cpu"

# get efficientnet-b0 as a backbone
efficientnet = torch.hub.load('NVIDIA/DeepLearningExamples:torchhub', 'nvidia_efficientnet_b0', pretrained=True)

# delete the last classifier layer
efficientnet.classifier.fc= nn.Identity()

# freeze all parameters for efficientnet-b0
for param in efficientnet.parameters():
    param.requires_grad = False

# putting model on device 
efficientnet= efficientnet.to(device)

Using cache found in C:\Users\moaaz/.cache\torch\hub\NVIDIA_DeepLearningExamples_torchhub


In [630]:
# siamese network
class Model(nn.Module): 
    def __init__(self  ): 
        """ 
        model input shape is (batch size , CH, W , H) 
        """
        super().__init__()
        self.pool = nn.Sequential( 
            nn.AvgPool1d(kernel_size = 3, stride=2 , padding=1)
        )
        self.f1 = nn.Sequential( 
            nn.Linear(in_features=640, out_features=1024 ),
            nn.ReLU(),
            nn.Dropout(p=0.2 ),
            nn.BatchNorm1d(num_features=1024)
        ) 
        self.f2 = nn.Sequential( 
            nn.Linear(in_features=1024, out_features=512 ),
            nn.ReLU(),
            nn.Dropout(p=0.2 ),
            nn.BatchNorm1d(num_features=512)
        )
        self.f3= nn.Sequential( 
            nn.Linear(in_features=512, out_features=256 ),
            nn.ReLU(),
            nn.Dropout(p=0.2 ),
            nn.BatchNorm1d(num_features=256))
        self.embed= nn.Sequential( 
            nn.Linear(in_features=256, out_features=128 ),
        ) 
    def forward (self , x ):
        x= efficientnet(x)
        x=self.pool(x)
        x= self.f1(x)
        x= self.f2(x)
        x= self.f3(x)
        x= self.embed(x)
        return x
        
model = Model().to(device)

In [631]:
# our loss calculation class 
class loss_fn(): 
    def __init__(self, margin , model):
        
        self.margin = torch.tensor(margin).to(device)
        self.model = model
        
    def _embedding(self,inputs : list[torch.tensor]):
        """
        inputs : list[torch.tensor] with the shape (positive, anchor, negative)
        output : list[torch.tensor] with the shape (positive, anchor, negative)
        """
        # [3,3,256,256]
        positive = self.model(inputs[0])
        anchor = self.model(inputs[1])
        negative = self.model(inputs[2])

        return [positive,anchor,negative]
        
    def compute_distance(self, inputs):

        embeddings = self._embedding(inputs)
        
        anchorEmbedding = embeddings[1]
        positiveEmbedding = embeddings[0]
        negativeEmbedding = embeddings[2]
    
        # calculate the anchor to positive and negative distance
        apDistance = torch.sum( torch.square(anchorEmbedding - positiveEmbedding), axis=-1)
        anDistance = torch.sum( torch.square(anchorEmbedding - negativeEmbedding), axis=-1 )
        return apDistance , anDistance
    
    def compute_loss(self , apDistance , anDistance):
        # return the distances
        loss = apDistance - anDistance
        loss = torch.max(loss + self.margin, torch.tensor(0.0).to(device))
        return loss

In [632]:
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.00001)

In [633]:
loss_function = loss_fn(0.5, model)

# Data loader 

In [699]:
class custom_data_loader(datasets.ImageFolder):
    def __init__(self, *arg, **kw) :
        super(custom_data_loader, self).__init__(*arg, **kw)
        self.n_triplets =len(self.samples)
        
        self.train_triplets = self.gen_example()
    def __len__(self):
        return (len(self.targets))

    def gen_example(self ): 
       
       labels = torch.Tensor(self.targets)
    
       triplets = []
       for x in np.arange(self.n_triplets): 
            
            idx = np.random.randint(0, labels.size(0))
            idx_matches = np.where(labels.numpy() == labels[idx].numpy())[0] 
            idx_no_matches = np.where(labels.numpy() != labels[idx].numpy())[0]
            idx_a, idx_p = np.random.choice(idx_matches, 2, replace=True)
            idx_n = np.random.choice(idx_no_matches, 1)[0]
            triplets.append([  idx_a,idx_p, idx_n])
       return np.array(triplets)
    
    def set_triplets(self, triplets):
        self.train_triplets = triplets

    def __getitem__(self, idx):
        t = self.train_triplets[idx]

        path_a, _ = self.samples[t[0]]
        path_p, _ = self.samples[t[1]]
        path_n, _ = self.samples[t[2]]

        img_a = self.loader(path_a)
        img_p = self.loader(path_p)
        img_n = self.loader(path_n)
        if self.transform is not None:
            img_a = self.transform(img_a)
            img_p = self.transform(img_p)
            img_n = self.transform(img_n)
        
        return img_p , img_a , img_n

In [700]:
data_transform= data_transform = transforms.Compose([
            transforms.Resize(size=config.IMAGE_SIZE ),
            transforms.ToTensor() 
        ])
triplet_data_train  =custom_data_loader( root=config.TRAIN_DATASET, transform=data_transform  )
triplet_dataloader_train  = DataLoader(triplet_data_train, batch_size=4, shuffle=False)

In [701]:
triplet_data_test  =custom_data_loader( root=config.TEST_DATASET, transform=data_transform  )
triplet_dataloader_test  = DataLoader(triplet_data_test, batch_size=4, shuffle=False)

In [709]:

def train_step(model: torch.nn.Module,
               train_data ,
               test_data,
               loss_fn,
               optimizer,
               device: torch.device = device ,
               epochs : int = 10):
    train_loss_acc = []
    test_loss_acc= []
    for i in range(epochs):
       
        train_loss, train_acc = 0, 0
        test_loss, test_acc =0,0
        for batch, (p , a,n ) in enumerate(train_data):
        # Send data to GPU
        # postive , anchor , negative = postive.to(device), anchor.to(device), negative.to(device)
            p ,a ,n = p.to(device),a.to(device),n.to(device)
            inputs = torch.stack([p,a,n] , dim =0)
        # 2. Calculate loss
            
            apDistance , anDistance = loss_fn.compute_distance(inputs)
            loss = loss_fn.compute_loss(apDistance , anDistance  )

            train_loss += loss.sum()
        # 3. Optimizer zero grad
            optimizer.zero_grad()
        # 4. Loss backward
            loss.sum().backward()
        # 5. Optimizer step
            optimizer.step() 
        train_loss_acc.append(train_loss.detach().to("cpu").numpy())
        model.eval()
        with torch.no_grad() : 
            for batch, (p , a,n ) in enumerate(test_data):
        
        # Send data to GPU
        # postive , anchor , negative = postive.to(device), anchor.to(device), negative.to(device)
                p ,a ,n = p.to(device),a.to(device),n.to(device)
                inputs = torch.stack([p,a,n] , dim =0)
        # 2. Calculate loss
            
                apDistance , anDistance = loss_fn.compute_distance(inputs)
                loss = loss_fn.compute_loss(apDistance , anDistance  )
 
                test_loss += loss.sum()

            test_loss_acc.append(test_loss.to("cpu").detach().numpy())
        print(f"train loss: {train_loss} test loss: {test_loss} @ epoch {i}")
    return train_loss_acc , test_loss_acc


In [1]:
train_loss , test_loss = train_step(model, triplet_dataloader_train , triplet_dataloader_test, loss_function, optimizer,  device)

NameError: name 'train_step' is not defined

TODO: 
- get a better model
- evaluation metrics
- setup testing on a single input  