# **Import Library**

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torchvision
import torchvision.transforms as transforms
from torch.optim.lr_scheduler import ReduceLROnPlateau

import time
import random
import copy

# **Define Model**

In [None]:
"""# **1) Model define**
### trans_VGG에서 사용할 함수인 conv_2 define
"""

def conv_2(in_dim, out_dim):
    model = nn.Sequential(
        nn.Conv2d(in_dim, out_dim, kernel_size = 3, padding = 1),
        nn.ReLU(),# Model define
        nn.Conv2d(out_dim, out_dim, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.MaxPool2d(2,2)
    )
    return model

def conv_3(in_dim, out_dim):
    model = nn.Sequential(
        nn.Conv2d(in_dim, out_dim, kernel_size = 3, padding = 1),
        nn.ReLU(),# Model define
        nn.Conv2d(out_dim, out_dim, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.Conv2d(out_dim, out_dim, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.Conv2d(out_dim, out_dim, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.MaxPool2d(2,2)
    )
    return model

conv_2 함수는 합성곱 레이어로 3개의 레이어를 가진다. 
1. 크기가 3인 convolution filter
2. 크기가 3인 convolution filter
3. 크기가 2인 max pooling filter

conv_3 함수는 합성곱 레이어로 5개의 레이어를 가진다.
1. 크기가 3인 convolution filter
2. 크기가 3인 convolution filter
3. 크기가 3인 convolution filter
4. 크기가 3인 convolution filter
5. 크기가 2인 max pooling filter

# **Define trans_VGG class**

In [None]:
class trans_VGG(nn.Module):
    def __init__(self, base_dim):
        super(trans_VGG, self).__init__()
        self.feature = nn.Sequential(
            conv_2(3, base_dim),
            conv_2(base_dim, base_dim*2),
            conv_2(base_dim*2, base_dim*4),
            conv_3(base_dim*4, base_dim*8),
            conv_3(base_dim*8, base_dim*8)
        )
        self.fc_layer = nn.Sequential(
            nn.Linear(base_dim*8*7*7, base_dim*4*7*7),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(base_dim*4*7*7, base_dim*2*7*7),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(base_dim*2*7*7, base_dim*7*7)
        )
        for param in self.parameters():
            param.requires_grad = True

    def forward(self, x):
        x = self.feature(x)
        x = x.view(x.size(0), -1)
        x = self.fc_layer(x)
        return x

해당 코드는 vgg 모델의 클래스를 정의 하는 과정이다.

생성자 부분
기본 차원을 입력받고 총 5개의 레이어를 거치며 채널수를 두배씩 늘린다.
또한 마지막 부분에서 conv_3를 두번 적용하여 깊은 층에서 깊이있는 특징을 추출하게 된다.

fully connected 레이어 부분
생성자에서 나온 특성을 1차원 벡터로 변환후 ReLU 활성화 함수를 거친다.
이후 과적합을 방지하기 위해 드롭아웃을 사용한다.

순전파 부분
합성곱 레이어를 통해 데이터를 처리하고 fc부분을 통해 처리되어 최종적으로 모델이 예측한 값을 반환한다.


- Hyper_paremeter : Learning rate, momentum, weight decay 등은 논문의 Hyper peremeter value로 초기화


In [None]:
import torch.nn.init as init

seed = time.time()

def custom_init_weights(m):
  if seed is not None:
    torch.manual_seed(seed)
  if isinstance(m, torch.nn.Linear) and m.weight is not None:
    init.normal_(m.weight, mean=1, std=0.01)
    if m.bias is not None:
      init.constant_(m.bias, 0)

model = trans_VGG(base_dim=64)

loss = nn.BCELoss()
optimizer =torch.optim.SGD(model.parameters(), lr = 0.01,momentum = 0.9, weight_decay = 0.0005)
scheduler = ReduceLROnPlateau(optimizer, mode='max', patience=10, factor=0.1, verbose=True)

transform = transforms.Compose(
    [transforms.ToTensor(), transforms.RandomCrop(224)])

from google.colab import drive
drive.mount('/content/drive')

랜덤 시드를 발급하고 레이어에 대한 가중치를 초기화 한다.
레이어의 가중치는 평균 = 1, 표준편차 = 0.01이며 bias와 constant는 0으로 초기화 한다.

이후 BCE 손실함수를 불러온다.

학습 스케쥴러를 성정하여 학습이 정체될 때 학습률을 동적으로 조정한다.

# **Import Dataset**

In [None]:
import os
from PIL import Image
import numpy as np
from torch.utils.data import Dataset

# Project 3 폴더 경로
project_folder = '/content/drive/MyDrive/Project3'

image = []
label = []

# Project 3 폴더 내부의 세부 폴더를 확인하고 이미지와 라벨 데이터 생성
for subdir, _, files in os.walk(project_folder):
    for file in files:
        # 이미지 파일인지 확인
        if file.endswith(('png', 'jpg', 'jpeg')):
            image_path = os.path.join(subdir, file)
            image.append(image_path)

            # 이미지가 속한 세부 폴더의 이름을 라벨로 사용
            label_name = os.path.basename(subdir)
            label.append(label_name)

indices = np.random.permutation(len(image))
IMAGE = [image[i] for i in indices]
LABEL = [label[i] for i in indices]

class CustomDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        label = self.labels[idx]
        image = Image.open(image_path).convert('RGB')
        image = transforms.RandomCrop(224)(image)
        image = transforms.ToTensor()(image)

        return image, label

BATCH_SIZE = 1

TRAINING_image = []
TRAINING_label = []
TEST_image = []
TEST_label = []

for i in range(0,80):
  for j in range(0,20):
    for k in range(0,2):
      TRAINING_image.append(image[200*j+i+k])
      TRAINING_label.append(label[200*j+i+k])

for i in range(80,100):
  for j in range(0,20):
    for k in range(0,2):
      TEST_image.append(image[200*j+i+k])
      TEST_label.append(label[200*j+i+k])

train_dataset = CustomDataset(TRAINING_image, TRAINING_label, transform = transform)
train_loader = DataLoader(train_dataset, batch_size = BATCH_SIZE,num_workers=2)
test_dataset = CustomDataset(TEST_image, TEST_label, transform = transform)
test_loader = DataLoader(test_dataset, batch_size = BATCH_SIZE,num_workers=2)

# **Training**

In [None]:
"""# **3) TRAINING**"""

EPOCH = 80

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(DEVICE)

start_time = time.time()
train_acc_lst, test_acc_lst = [],[]

for epoch in range(EPOCH):
  model.train()
  correct_pred, num_examples = 0, 3200
  for i, (_image1, _label1) in enumerate(train_loader):
    image1 = _image1.to(DEVICE)
    label1 = _label1[0]
    vector1_tensor = model(image1)

    if (i == 0): #Exception Case
      image2 = image1
      label2 = label1
      vector2_tensor = vector1_tensor

    similarity =  F.cosine_similarity(vector1_tensor, vector2_tensor, dim= -1)
    scaled_similarity = torch.sigmoid(similarity)

    if label1 == label2 and scaled_similarity.item() > 0.5:
        correct_pred += 1
    elif label1 != label2 and scaled_similarity.item() < 0.5:
        correct_pred += 1

    if label1 == label2:
      target_vector = [1]
    else :
      target_vector = [0]

    target_tensor = torch.tensor(target_vector).float()
    target_tensor = target_tensor.to(DEVICE)
    optimizer.zero_grad()
    cost = loss(scaled_similarity, target_tensor)
    cost.backward()
    optimizer.step()

    if not i % 40:
      print (f'Epoch: {epoch:03d}/{EPOCH:03d} | '
            f'Batch {i:03d}/{len(train_loader):03d} |'
             f' Cost: {cost:.4f}')

    #연산량 감소를 위한 텐서 재활용
    image2 = image1.clone()
    label2 = label1
    vector2_tensor = vector1_tensor.detach().clone()

elapsed = (time.time() - start_time)/60
print(f'Total Training Time: {elapsed:.2f} min')

1. 훈련 설정
에포크를 80으로 설정
2. 학습
이미지 데이터를 배치 단위로 가져와 gpu 또는 cpu에 맞게 학습을 진행한다.
이후 모델에서 나온 특성들을 벡터로 저장해준다.
3. 유사도 측정
결과 벡터들을 활용해 코사인 유사도를 구하고 유사도를 0.5를 기준으로 0,1로 나눈다.
4. 손실 계산
계산된 유사도와 실제 값과 손실을 구하고 손실함수에 그래디언트 역전파를 사용하여 가중치를 업데이트 한다.

## 성능 향상 제안

SE block 활용
- Spatial Squeeze and Channel Excitation

해당 알고리즘은 spatial squeeze 부분과 channel excitation 부분으로 나뉘는데
첫번째 부분은 각 채널을 대표하는 정보를 추출하고
두번째 부분은 대표값을 이용해 채널들간의 비선형 특징을 파악한다.
이후 채널별 중요도 값을 기존의 이미지 행렬에 곱한다.

결과적으로 전체적인 맥락을 고려해서 상황에 맞는 채널을 강조할 수 있어 정확도를 향상 시킬수 있다.

In [None]:
class trans_VGG_with_SE(nn.Module):
    def __init__(self, base_dim):
        super(trans_VGG_with_SE, self).__init__()
        self.feature = nn.Sequential(
            conv_2(3, base_dim),
            SEBlock(base_dim),  # SE Block 추가
            conv_2(base_dim, base_dim*2),
            SEBlock(base_dim*2),  # SE Block 추가
            conv_2(base_dim*2, base_dim*4),
            SEBlock(base_dim*4),  # SE Block 추가
            conv_3(base_dim*4, base_dim*8),
            SEBlock(base_dim*8),  # SE Block 추가
            conv_3(base_dim*8, base_dim*8),
            SEBlock(base_dim*8)  # SE Block 추가
        )
        self.fc_layer = nn.Sequential(
            nn.Linear(base_dim*8*7*7, base_dim*4*7*7),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(base_dim*4*7*7, base_dim*2*7*7),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(base_dim*2*7*7, base_dim*7*7)
        )

    def forward(self, x):
        x = self.feature(x)
        x = x.view(x.size(0), -1)
        x = self.fc_layer(x)
        return x
