# **Import Library**

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torchvision
import torchvision.transforms as transforms
from torch.optim.lr_scheduler import ReduceLROnPlateau

import time
import random
import copy

# **Define Model**

In [None]:
"""# **1) Model define**
### trans_VGG에서 사용할 함수인 conv_2 define
"""

def conv_2(in_dim, out_dim):
    model = nn.Sequential(
        nn.Conv2d(in_dim, out_dim, kernel_size = 3, padding = 1),
        nn.ReLU(),# Model define
        nn.Conv2d(out_dim, out_dim, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.MaxPool2d(2,2)
    )
    return model

def conv_3(in_dim, out_dim):
    model = nn.Sequential(
        nn.Conv2d(in_dim, out_dim, kernel_size = 3, padding = 1),
        nn.ReLU(),# Model define
        nn.Conv2d(out_dim, out_dim, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.Conv2d(out_dim, out_dim, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.Conv2d(out_dim, out_dim, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.MaxPool2d(2,2)
    )
    return model

conv_2함수: 2개의 합성곱 층과 MaxPooling 층이 포함된 모델이다. 활성화 함수는 ReLU이다. <br>
conv_3함수: 4개의 합성곱 층과 MaxPooling 층이 포함된 모델이다. 활성화 함수는 ReLU이다. <br>
두 합성곱 층 모두 3x3크기의 커널을 사용하고 padding은 1로 설정해 출력크기와 입력 크기가 동일하다. 모두 마지막에 2x2 MaxPooling 층이 있어 공간 차원을 절반으로 줄인다.

# **Define trans_VGG class**

In [None]:
class trans_VGG(nn.Module):
    def __init__(self, base_dim):
        super(trans_VGG, self).__init__() # 부모 클래스인 nn.Module의 초기화 메서드를 호출
        self.feature = nn.Sequential(
            conv_2(3, base_dim),
            conv_2(base_dim, base_dim*2),
            conv_2(base_dim*2, base_dim*4),
            conv_3(base_dim*4, base_dim*8),
            conv_3(base_dim*8, base_dim*8)
        )
        self.fc_layer = nn.Sequential(
            nn.Linear(base_dim*8*7*7, base_dim*4*7*7), # nn.Linear: 선형 변환 구현. 가중치화 편향을 학습한다.
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(base_dim*4*7*7, base_dim*2*7*7),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(base_dim*2*7*7, base_dim*7*7)
        )
        for param in self.parameters(): # 모델의 모든 파라미터에 대해 기울기 계산 활성화
            param.requires_grad = True

    def forward(self, x):
        x = self.feature(x) # 모델의 특징 추출
        x = x.view(x.size(0), -1) # 특징 맵을 1차원으로 변환. x.size(0)은 배치 크기를 의미한다.
        x = self.fc_layer(x) # 최종 출력 값을 생성하는 부분
        return x

- self.feature는 conv_2, conv_3 함수로 여러 합성곱 블록을 쌓아 특징을 추츨하는 과정이다.<br>
- self.fc_layer는 특징 맵을 **1차원으로 변환**하는 과정이다. fully connected layer는 입력 데이터의 형태가 1차원 벡터일 때만 작동하기 때문에, 특징 맵을 1차원으로 변환시켜야한다. <br>
각 층 사이에 ReLU와 Dropout이 적용된다.(Dropout은 딥러닝 모델에서 과적합을 방지하기 위해 사용되는 정규화 기법 중 하나이다.)
- forward는 입력을 받아서 특징을 추출하고, 1차원으로 변환한 후, 완전 연결층(fc_layer)를 통과시켜 최종 출력을 생성한다.

Dropout
1. 무작위 뉴런 비활성화: 훈련 과정에서 각 뉴런을 무작위로 선택해 비활성화한다. 비활성화란 출력이 0이 되도록 하는 것을 말한다. 일반적으로 특정 비율의 뉴런이 비활성화된다.<br>
2. Dropout은 훈련 단계에서만 적용되고, 테스트 단계에서는 모든 뉴런이 활성화되는 특징을 가진다.

문제점 및 개선사항
1. Dropout 비율을 설정하지 않았다.
2. nn.Linear(base_dim*8*7*7, ...)에서 base_dim*8*7*7은 고정된 입력 크기를 가진다. 입력 이미지의 크기가 다르면 값이 달라질 수 있는 문제가 생긴다.

- Hyper_paremeter : Learning rate, momentum, weight decay 등은 논문의 Hyper peremeter value로 초기화


In [None]:
import torch.nn.init as init # PyTorch의 초기화 모듈. 모델의 가중치를 초기화하는 데 사용된다.

seed = time.time()

def custom_init_weights(m):
  if seed is not None:
    torch.manual_seed(seed)
  if isinstance(m, torch.nn.Linear) and m.weight is not None:
    init.normal_(m.weight, mean=1, std=0.01)
    if m.bias is not None:
      init.constant_(m.bias, 0)

model = trans_VGG(base_dim=64) # 모델 생성. base_dim은 모델의 차원 설정

loss = nn.BCELoss() # 이진 분류를 위한 손실 함수
optimizer =torch.optim.SGD(model.parameters(), lr = 0.01,momentum = 0.9, weight_decay = 0.0005) # 확률적 경사 하강법(SGD) 옵티마이저를 설정
scheduler = ReduceLROnPlateau(optimizer, mode='max', patience=10, factor=0.1, verbose=True)

transform = transforms.Compose(
    [transforms.ToTensor(), transforms.RandomCrop(224)])

from google.colab import drive
drive.mount('/content/drive')

- custom_init_weights는 가중치 초기화 함수이다. 주어진 모듈이 Linear 층일 경우, 가중치를 정규 분포로 초기화하고, 편향을 0으로 초기화하는 함수이다. <br>
가중치를 초기화하는 이유는 정규 분포로 초기화하여 뉴런이 균형 있게 작동하고 학습이 원활하게 진행될 수 있게 하기 위해서이다.
편향을 0으로 초기화하는 이유는 초기 상태에서 뉴런의 활성화에 영향을 주지 않게 하기 위해서이다. 편향은 이후 학습에서 자연스럽게 조정된다.
- scheduler는 성능이 개선되지 않을 때 학습률을 감소시킨다. patience는 개선이 없는 에폭 수를 의미한다.  

# **Import Dataset**

In [None]:
import os
from PIL import Image
import numpy as np
from torch.utils.data import Dataset

# Project 3 폴더 경로
project_folder = '/content/drive/MyDrive/Project3'

image = []
label = []

# Project 3 폴더 내부의 세부 폴더를 확인하고 이미지와 라벨 데이터 생성
for subdir, _, files in os.walk(project_folder):
    for file in files:
        # 이미지 파일인지 확인
        if file.endswith(('png', 'jpg', 'jpeg')):
            image_path = os.path.join(subdir, file)
            image.append(image_path)

            # 이미지가 속한 세부 폴더의 이름을 라벨로 사용
            label_name = os.path.basename(subdir)
            label.append(label_name)

indices = np.random.permutation(len(image))
IMAGE = [image[i] for i in indices]
LABEL = [label[i] for i in indices]

class CustomDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        label = self.labels[idx]
        image = Image.open(image_path).convert('RGB')
        image = transforms.RandomCrop(224)(image)
        image = transforms.ToTensor()(image)

        return image, label

BATCH_SIZE = 1

TRAINING_image = []
TRAINING_label = []
TEST_image = []
TEST_label = []

for i in range(0,80):
  for j in range(0,20):
    for k in range(0,2):
      TRAINING_image.append(image[200*j+i+k])
      TRAINING_label.append(label[200*j+i+k])

for i in range(80,100):
  for j in range(0,20):
    for k in range(0,2):
      TEST_image.append(image[200*j+i+k])
      TEST_label.append(label[200*j+i+k])

train_dataset = CustomDataset(TRAINING_image, TRAINING_label, transform = transform)
train_loader = DataLoader(train_dataset, batch_size = BATCH_SIZE,num_workers=2)
test_dataset = CustomDataset(TEST_image, TEST_label, transform = transform)
test_loader = DataLoader(test_dataset, batch_size = BATCH_SIZE,num_workers=2)

# **Training**

In [None]:
"""# **3) TRAINING**"""

EPOCH = 80 # 훈련할 에폭 수 설정

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 사용할 디바이스 결정
model = model.to(DEVICE) # 모델을 해당 디바이스로 이동

start_time = time.time()
train_acc_lst, test_acc_lst = [],[]

for epoch in range(EPOCH):
  model.train()
  correct_pred, num_examples = 0, 3200
  for i, (_image1, _label1) in enumerate(train_loader):
    image1 = _image1.to(DEVICE)
    label1 = _label1[0]
    vector1_tensor = model(image1)

    if (i == 0): #Exception Case
      image2 = image1
      label2 = label1
      vector2_tensor = vector1_tensor

    similarity =  F.cosine_similarity(vector1_tensor, vector2_tensor, dim= -1) # 두 벡터 간 코사인 유사도 계산
    scaled_similarity = torch.sigmoid(similarity) # 시그모이드 함수를 통해 값 스케일링

    if label1 == label2 and scaled_similarity.item() > 0.5: # 예측의 정확성을 평가하는 조건문
        correct_pred += 1
    elif label1 != label2 and scaled_similarity.item() < 0.5:
        correct_pred += 1

    if label1 == label2:
      target_vector = [1]
    else :
      target_vector = [0]

    target_tensor = torch.tensor(target_vector).float()
    target_tensor = target_tensor.to(DEVICE)
    optimizer.zero_grad()
    cost = loss(scaled_similarity, target_tensor)
    cost.backward()
    optimizer.step()

    if not i % 40:
      print (f'Epoch: {epoch:03d}/{EPOCH:03d} | '
            f'Batch {i:03d}/{len(train_loader):03d} |'
             f' Cost: {cost:.4f}')

    #연산량 감소를 위한 텐서 재활용
    image2 = image1.clone()
    label2 = label1
    vector2_tensor = vector1_tensor.detach().clone()

elapsed = (time.time() - start_time)/60
print(f'Total Training Time: {elapsed:.2f} min')