In [41]:
import pandas as pd 
import glob
import cv2 as cv
import random
import os

import matplotlib.pyplot as plt
import numpy as np
import random
from PIL import Image
import PIL.ImageOps    

import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
import torchvision.utils
import torch
from torch.autograd import Variable
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

from torch.cuda.amp import autocast
from torch.cuda.amp import GradScaler
from tqdm.auto import tqdm
import timm
import math
from sklearn.model_selection import train_test_split



# Make DataFrame

## columns: review_img_path, product_img_path, label 
---

In [42]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [43]:
all_product_img_list = glob.glob('./masked_data/product_img/*')
all_product_img_list = sorted(all_product_img_list)
# print(all_product_img_list)

In [44]:
review_img_path = []
product_img_path = []
label = []
for product_img in all_product_img_list:
    glob_img_list = []
    str = product_img.split("/")[3]
    str = str.split(".")[0]
    glob_img_list = glob.glob('./masked_data/review_img/'+str +'_review_img/O/*.jpg')
    review_img_path = review_img_path + glob_img_list
    product_img_path = product_img_path + [product_img  for i in range(len(glob_img_list))]

In [45]:
print(len(product_img_path))
print(len(review_img_path))


5389
5389


In [46]:
for i in range(len(review_img_path)):
    random_number = random.randint(0, 1)
    if random_number == 0:
        label.append(0)
    else:
        review_num = review_img_path[i].split('/')[3]
        review_num = review_num.split("_")[0]
        random_number = random.randint(0, len(all_product_img_list) - 1)
        while review_num == random_number:
            random_number = random.randint(0, len(all_product_img_list) - 1)
        product_img_path[i] = all_product_img_list[random_number]
        label.append(1)

In [47]:
df = pd.DataFrame(columns=['review_img_path','product_img_path', 'label'])
df['review_img_path'] = review_img_path
df['product_img_path'] = product_img_path
df['label'] = label

df

Unnamed: 0,review_img_path,product_img_path,label
0,./masked_data/review_img/0_review_img/O/404.jpg,./masked_data/product_img/0.jpg,0
1,./masked_data/review_img/0_review_img/O/174.jpg,./masked_data/product_img/0.jpg,0
2,./masked_data/review_img/0_review_img/O/93.jpg,./masked_data/product_img/0.jpg,0
3,./masked_data/review_img/0_review_img/O/177.jpg,./masked_data/product_img/62.jpg,1
4,./masked_data/review_img/0_review_img/O/399.jpg,./masked_data/product_img/0.jpg,0
...,...,...,...
5384,./masked_data/review_img/98_review_img/O/194.jpg,./masked_data/product_img/14.jpg,1
5385,./masked_data/review_img/98_review_img/O/148.jpg,./masked_data/product_img/10.jpg,1
5386,./masked_data/review_img/98_review_img/O/401.jpg,./masked_data/product_img/77.jpg,1
5387,./masked_data/review_img/98_review_img/O/433.jpg,./masked_data/product_img/98.jpg,0


---

# Siames Net

In [48]:
CFG = {
    'IMG_SIZE':256,
    'EPOCHS':10,
    'LEARNING_RATE':3e-4,
    'BATCH_SIZE':4,
    'SEED':41
}

In [49]:
train, val, _, _ = train_test_split(df, df['label'], test_size=0.3, stratify=df['label'], random_state=CFG['SEED'])


In [50]:
class SiameseNetworkDataset(Dataset):
    def __init__(self,review_img_path,product_img_path,label,transform=None):
        self.review_img_path = review_img_path
        self.product_img_path = product_img_path
        self.label = label
        self.transform = transform
        
    def __getitem__(self,index):
        review_img = cv.imread(self.review_img_path[index],cv.COLOR_BGR2GRAY)
        product_img = cv.imread(self.product_img_path[index], cv.COLOR_BGR2GRAY)
        # review_img = Image.open(self.review_img_path[index])
        # product_img = Image.open(self.product_img_path[index])
        # review_img = review_img.convert("L")
        # product_img = product_img.convert("L")
        

        if self.transform is not None:
            review_img = self.transform(image= review_img)['image']
            product_img = self.transform(image= product_img)['image']
        # print(np.array([int(label[index])], dtype=np.float32))
        return review_img, product_img, torch.from_numpy(np.array([int(label[index])], dtype=np.float32))
    
    def __len__(self):
        return len(self.review_img_path)

In [51]:
train_transform = A.Compose([
                            A.Resize(CFG['IMG_SIZE'],CFG['IMG_SIZE']),
                            # A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            ToTensorV2()
                            ])


In [52]:
train_dataset = SiameseNetworkDataset(train["review_img_path"].values, train["product_img_path"].values, train["label"].values, train_transform)
train_loader = DataLoader(train_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=False, num_workers=0)


In [53]:
CFG = {
    'IMG_SIZE':256,
    'EPOCHS':10,
    'LEARNING_RATE':3e-4,
    'BATCH_SIZE':4,
    'SEED':40
}

In [54]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = 'a'
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # Seed 고정

In [55]:
class BaseModel(nn.Module):
    def __init__(self):
        super(BaseModel, self).__init__()
        self.backbone = timm.create_model('efficientnet_b0', pretrained=True)
        self.classifier = nn.Linear(1000, 2)
        
    def forward(self, x):
        x = self.backbone(x)
        x = self.classifier(x)
        return x

In [56]:
# Define the Contrastive Loss Function
class ContrastiveLoss(torch.nn.Module):
    def __init__(self, margin=2.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
      # Calculate the euclidian distance and calculate the contrastive loss
        euclidean_distance = F.pairwise_distance(output1, output2, keepdim = True)
        loss_contrastive = torch.mean((1-label) * torch.pow(euclidean_distance, 2) +
                                    (label) * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2))
        return loss_contrastive

In [58]:
def train(model, optimizer, train_loader, device):
    model.to(device)
    criterion = ContrastiveLoss().to(device)
    scaler = GradScaler()
    
    best_score = 0
    best_model = None
    
    for epoch in range(1, CFG['EPOCHS']+1):
        model.train()
        train_loss = []
        for review_img, product_img, labels in tqdm(iter(train_loader)):
            review_img = review_img.float().to(device)
            product_img = product_img.float().to(device)
            labels = labels.to(device)
            
            optimizer.zero_grad()
            with autocast():
                output1 = model(review_img)
                output2 = model(product_img)
                loss = criterion(output1,output2,labels)

            
            # loss.backward()
            # optimizer.step()
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            
            train_loss.append(loss.item())
                    
        # _val_loss, _val_score = validation(model, criterion, val_loader, device)
        _train_loss = np.mean(train_loss)
        print(f'Epoch [{epoch}], Train Loss : [{_train_loss:.5f}]')
       
        # if scheduler is not None:
        #     scheduler.step(_val_score)
            
        # if best_score < _val_score:
        #     best_score = _val_score
        #     best_model = model
        best_model = model
    
    return best_model

In [59]:
model = BaseModel()
model.eval()
optimizer = torch.optim.Adam(params = model.parameters(), lr = CFG["LEARNING_RATE"])
infer_model = train(model, optimizer, train_loader, device)

100%|██████████| 943/943 [03:11<00:00,  4.93it/s]


Epoch [1], Train Loss : [1.32363]


100%|██████████| 943/943 [03:10<00:00,  4.95it/s]


Epoch [2], Train Loss : [1.28994]


100%|██████████| 943/943 [03:10<00:00,  4.96it/s]


Epoch [3], Train Loss : [1.23076]


100%|██████████| 943/943 [03:09<00:00,  4.97it/s]


Epoch [4], Train Loss : [1.22349]


100%|██████████| 943/943 [03:09<00:00,  4.97it/s]


Epoch [5], Train Loss : [1.18287]


100%|██████████| 943/943 [03:09<00:00,  4.98it/s]


Epoch [6], Train Loss : [1.18113]


100%|██████████| 943/943 [03:08<00:00,  5.00it/s]


Epoch [7], Train Loss : [1.22742]


100%|██████████| 943/943 [03:08<00:00,  5.01it/s]


Epoch [8], Train Loss : [1.14433]


100%|██████████| 943/943 [03:07<00:00,  5.03it/s]


Epoch [9], Train Loss : [1.09651]


100%|██████████| 943/943 [03:06<00:00,  5.05it/s]

Epoch [10], Train Loss : [1.02045]





In [62]:
# Locate the test dataset and load it into the SiameseNetworkDataset
folder_dataset_test = datasets.ImageFolder(root="./output/val")
siamese_dataset = SiameseNetworkDataset(val["review_img_path"].values, 
                                        val["product_img_path"].values,
                                        val["label"].values,
                                        transform=train_transform)
test_dataloader = DataLoader(siamese_dataset, num_workers=0, batch_size=1, shuffle=True)

# Grab one image that we are going to test
dataiter = iter(test_dataloader)
x0, _, _ = next(dataiter)

for i in range(10):
    # Iterate over 10 images and test them with the first image (x0)
    _, x1, label2 = next(dataiter)

    # Concatenate the two images together
    concatenated = torch.cat((x0, x1), 0)
    
    output1, output2 = model.parameters(x0.cuda(), x1.cuda())
    euclidean_distance = F.pairwise_distance(output1, output2)
    imshow(torchvision.utils.make_grid(concatenated), f'Dissimilarity: {euclidean_distance.item():.2f}')

TypeError: parameters() takes from 1 to 2 positional arguments but 3 were given