In [1]:
import pandas as pd 
import glob
import cv2 as cv
import random
import os

import matplotlib.pyplot as plt
import numpy as np
import random
from PIL import Image
import PIL.ImageOps    

import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
import torchvision.utils
import torch
from torch.autograd import Variable
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
from sklearn.metrics import f1_score, accuracy_score


from torch.cuda.amp import autocast
from torch.cuda.amp import GradScaler
from tqdm.auto import tqdm
import timm
import math
from sklearn.model_selection import train_test_split
import pytorch_model_summary



In [7]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Load Data
---

In [None]:
def SplitDataset(image_dir, val_size, seed = 42):
    fake_images = glob( image_dir + 'fake_images/*.png')
    real_images = glob( image_dir + 'real_images/*.png')
    labels = [1] * len(fake_images) + [0] * len(real_images)

    X_train, X_val, y_train, y_val = train_test_split(fake_images + real_images, labels, test_size=val_size, random_state=seed, shuffle=True)

    return X_train, X_val, y_train, y_val

In [None]:
X_train, X_val, Y_train, Y_val = SplitDataset(img_dir = './DATA/train/',
                val_size = 0.3,
                seed = 41,)

In [None]:
class CustomDataset(Dataset):
    def __init__(self, X, y):
        '''
        X: list of image path
        y: list of label (0->real, 1->fake)
        '''
        self.X = X
        self.y = y
        self.transforms = transforms.Compose([
            transforms.ToTensor(),
            transforms.Resize((256, 256)),
            transforms.Normalize([0.485, 0.456,0.406],[0.229,0.224,0.225])
        ])

    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, index):
        impath = self.X[index]
        fname = os.path.basename(impath)
        img = Image.open(impath).convert('RGB')
        img = self.transforms(img)
        target = self.y[index]

        return img, target, fname

In [None]:
train_dataset = CustomDataset(X = X_train, y = Y_train) # + CustomDataset2(X = X_train, y = Y_train)
val_dataset = CustomDataset(X = X_val, y = Y_val)

---

# DataLoader

In [8]:
CFG = {
    'IMG_SIZE':224,
    'EPOCHS':100,
    'LEARNING_RATE':3e-4,
    # 'LEARNING_RATE':10,
    'BATCH_SIZE':8,
    'SEED':42,
    'NUM_WORKERS':1,
}

In [9]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = 'a'
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # Seed 고정

In [None]:
train_dataloader = DataLoader(dataset = train_dataset,
                              batch_size = CFG['BATCH_SIZE'],
                              num_workers = CFG['NUM_WORKERS'],
                              shuffle = True,
                              pin_memory = True,
                              drop_last = True,)

val_dataloader = DataLoader(dataset = val_dataset,
                            batch_size = CFG['BATCH_SIZE'],
                            num_workers = CFG['NUM_WORKERS'], 
                            shuffle = False,
                            pin_memory = True,
                            drop_last = True)

# Set model
---

In [1]:
# import timm 
# avail_pretrained_models = timm.list_models(pretrained=True)
# print(avail_pretrained_models)

['adv_inception_v3', 'bat_resnext26ts', 'beit_base_patch16_224', 'beit_base_patch16_224_in22k', 'beit_base_patch16_384', 'beit_large_patch16_224', 'beit_large_patch16_224_in22k', 'beit_large_patch16_384', 'beit_large_patch16_512', 'beitv2_base_patch16_224', 'beitv2_base_patch16_224_in22k', 'beitv2_large_patch16_224', 'beitv2_large_patch16_224_in22k', 'botnet26t_256', 'cait_m36_384', 'cait_m48_448', 'cait_s24_224', 'cait_s24_384', 'cait_s36_384', 'cait_xs24_384', 'cait_xxs24_224', 'cait_xxs24_384', 'cait_xxs36_224', 'cait_xxs36_384', 'coat_lite_mini', 'coat_lite_small', 'coat_lite_tiny', 'coat_mini', 'coat_tiny', 'coatnet_0_rw_224', 'coatnet_1_rw_224', 'coatnet_bn_0_rw_224', 'coatnet_nano_rw_224', 'coatnet_rmlp_1_rw_224', 'coatnet_rmlp_2_rw_224', 'coatnet_rmlp_nano_rw_224', 'coatnext_nano_rw_224', 'convit_base', 'convit_small', 'convit_tiny', 'convmixer_768_32', 'convmixer_1024_20_ks9_p14', 'convmixer_1536_20', 'convnext_atto', 'convnext_atto_ols', 'convnext_base', 'convnext_base_384_in

In [15]:
class BaseModel(nn.Module):
    def __init__(self):
        super(BaseModel, self).__init__()
        self.backbone = timm.create_model('efficientnet_b0', pretrained=False)
        self.classifier = nn.Linear(1000, 2)
        # self.sigmoid = nn.Sigmoid()
        
    def forward(self, x):
        x = self.model(x)
        x = nn.Sigmoid()(x)
        
        return x

---

In [16]:
def validation(model, criterion, val_loader, device):
    model.eval()
    val_loss = []
    result = []
    pp = []
    pn = []
    nn = []
    n_p = []
    
    with torch.no_grad():
        for review_img, product_img, labels in tqdm(iter(val_loader)):
            review_img = review_img.float().to(device)
            product_img = product_img.float().to(device)

            labels = labels.type(torch.LongTensor).to(device)
            
            pred = model(review_img, product_img)
            # print(pred.shape)
            
            loss = criterion(pred, labels)
            val_loss.append(loss.item())
            
            pred = pred.detach().cpu().numpy().tolist()
            labels = labels.detach().cpu().numpy().tolist()
            
            for i in range(len(pred)):
                if labels[i] == 0:
                    if pred[i][0] < 0.5:
                        result.append(1) #맞게 예측하면 1
                        pp.append(1)
                    else:
                        result.append(0)
                        pn.append(1)
                else:
                    if pred[i][0] > 0.5:
                        result.append(1) #맞게 예측하면 1
                        nn.append(1)
                    else:
                        result.append(0)
                        n_p.append(1)
            # https://scikit-learn.org/stable/modules/generated/sklearn.metrics.accuracy_score.html
            
        result_len = len(result)
        print(sum(pp)/result_len)
        print(sum(pn)/result_len)
        print(sum(nn)/result_len)
        print(sum(n_p)/result_len)
        
        _val_loss = np.mean(val_loss)
        _val_score = (sum(result)/result_len) * 100 
        
        
    
    return _val_loss, _val_score

In [17]:
# Define the Contrastive Loss Function
class ContrastiveLoss(torch.nn.Module):
    def __init__(self, margin=1.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, euclidean_distance, label):
      # Calculate the euclidian distance and calculate the contrastive loss
        # loss_contrastive = torch.mean((1-label) * torch.pow(euclidean_distance, 2) +
        #                             (label) * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2))
        # loss_contrastive = ((1-label) * torch.pow(euclidean_distance, 2) +
        #                             (label) * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2))
        
        loss_contrastive = []
        
        for i in range(len(label)):
              loss_contrastive.append((1-label[i]) * pow(euclidean_distance[i][0], 2) +
                                    (label[i]) * pow(max(self.margin - euclidean_distance[i][0],0), 2))
        return sum(loss_contrastive)/len(label)

In [18]:
def train(model, optimizer, train_loader, val_loader, scheduler, device):
    model = model.to(device)
    # criterion = nn.NLLLoss(reduction="sum").to(device)
    criterion = ContrastiveLoss().to(device)
    
    best_score = 0
    best_model = None
    
    for epoch in range(0, CFG['EPOCHS']):
        model.train()
        train_loss = []
        flag_list = []
        
        for review_img, product_img, labels in tqdm(iter(train_loader)):
            
            review_img = review_img.float().to(device)
            product_img = product_img.float().to(device)
                  
            labels = labels.type(torch.LongTensor).to(device)
            
            optimizer.zero_grad()
            output = model(review_img, product_img)
            # print(output)
            # print(output[3][0])
            
            # print(labels)
            # print(labels[3])
            
            loss = criterion(output, labels)
            # print(loss)
            
            # output = output.detach().cpu().numpy().tolist()
            # labels = labels.detach().cpu().numpy().tolist()
            # print(output)
            # print(output[3][0])
            # print(labels)
            # print(labels[1])
            # print(loss)
            
            # print(loss.item())
            loss.backward()
            optimizer.step()
            train_loss.append(loss.item())
                    
                
        _val_loss, _val_score = validation(model, criterion, val_loader, device)
        
        _train_loss = np.mean(train_loss)
        print(f'Epoch [{epoch}], Train Loss : [{_train_loss:.5f}] Val Loss : [{_val_loss:.5f}] Val accuracy score : [{_val_score:.5f}]')
       
        if scheduler is not None:
            scheduler.step(_val_score)
            
        if best_score < _val_score:
            best_score = _val_score
            best_model = model
            torch.save(best_model.state_dict(), "./distance_EffNetBase_E_Contra.pt")
            
            
    return best_model

In [19]:
model = BaseModel()
model.load_state_dict(torch.load('./distance_VGGBase_E_Contra.pt'))
model.eval()
# print(pytorch_model_summary.summary(model, torch.zeros(8,3,256,256),max_depth=None, show_parent_layers=True, show_input=True))

BaseModel(
  (backbone): EfficientNet(
    (conv_stem): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (bn1): BatchNormAct2d(
      32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True
      (drop): Identity()
      (act): SiLU(inplace=True)
    )
    (blocks): Sequential(
      (0): Sequential(
        (0): DepthwiseSeparableConv(
          (conv_dw): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
          (bn1): BatchNormAct2d(
            32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True
            (drop): Identity()
            (act): SiLU(inplace=True)
          )
          (se): SqueezeExcite(
            (conv_reduce): Conv2d(32, 8, kernel_size=(1, 1), stride=(1, 1))
            (act1): SiLU(inplace=True)
            (conv_expand): Conv2d(8, 32, kernel_size=(1, 1), stride=(1, 1))
            (gate): Sigmoid()
          )
          (conv_pw): Conv2d(32, 16, kernel_size=(1,

In [20]:
optimizer = torch.optim.Adam(params = model.parameters(), lr = CFG["LEARNING_RATE"])
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=2, threshold_mode='abs', min_lr=1e-8, verbose=True)

infer_model = train(model, optimizer, train_loader, val_loader, scheduler, device)

  0%|          | 0/1641 [00:00<?, ?it/s]

  0%|          | 0/704 [00:00<?, ?it/s]

0.37137777777777775
0.1248
0.35502222222222224
0.1488
Epoch [0], Train Loss : [0.19432] Val Loss : [0.18476] Val accuracy score : [72.64000]


  0%|          | 0/1641 [00:00<?, ?it/s]

  0%|          | 0/704 [00:00<?, ?it/s]

0.2928
0.20337777777777777
0.3848888888888889
0.11893333333333334
Epoch [1], Train Loss : [0.12688] Val Loss : [0.22306] Val accuracy score : [67.76889]


  0%|          | 0/1641 [00:00<?, ?it/s]

  0%|          | 0/704 [00:00<?, ?it/s]

0.24124444444444446
0.25493333333333335
0.4176
0.08622222222222223
Epoch [2], Train Loss : [0.07122] Val Loss : [0.24801] Val accuracy score : [65.88444]


  0%|          | 0/1641 [00:00<?, ?it/s]

  0%|          | 0/704 [00:00<?, ?it/s]

0.26222222222222225
0.23395555555555556
0.4106666666666667
0.09315555555555556
Epoch [3], Train Loss : [0.05439] Val Loss : [0.24023] Val accuracy score : [67.28889]
Epoch 00004: reducing learning rate of group 0 to 1.5000e-04.


  0%|          | 0/1641 [00:00<?, ?it/s]

  0%|          | 0/704 [00:00<?, ?it/s]

0.29457777777777777
0.2016
0.39982222222222225
0.104
Epoch [4], Train Loss : [0.04123] Val Loss : [0.20478] Val accuracy score : [69.44000]


  0%|          | 0/1641 [00:00<?, ?it/s]

KeyboardInterrupt: 