# **Import Library**

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torchvision
import torchvision.transforms as transforms
from torch.optim.lr_scheduler import ReduceLROnPlateau

import time
import random
import copy

# **Define Model**

In [None]:
"""# **1) Model define**
### trans_VGG에서 사용할 함수인 conv_2 define
"""

def conv_2(in_dim, out_dim):
    model = nn.Sequential(
        nn.Conv2d(in_dim, out_dim, kernel_size = 3, padding = 1),
        nn.ReLU(),# Model define
        nn.Conv2d(out_dim, out_dim, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.MaxPool2d(2,2)
    )
    return model

def conv_3(in_dim, out_dim):
    model = nn.Sequential(
        nn.Conv2d(in_dim, out_dim, kernel_size = 3, padding = 1),
        nn.ReLU(),# Model define
        nn.Conv2d(out_dim, out_dim, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.Conv2d(out_dim, out_dim, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.Conv2d(out_dim, out_dim, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.MaxPool2d(2,2)
    )
    return model

In [None]:
# 코드의 구조
# conv_2는 두 개의 합성곱-활성화 조합과 하나의 맥스풀링 층으로 구성되어 있다.
# conv_3는 네 개의 합성곱-활성화 조합과 하나의 맥스풀링 층으로 구성되어 있다.

# **Define trans_VGG class**

In [None]:
class trans_VGG(nn.Module):
    def __init__(self, base_dim):
        super(trans_VGG, self).__init__()
        self.feature = nn.Sequential(
            conv_2(3, base_dim),
            conv_2(base_dim, base_dim*2),
            conv_2(base_dim*2, base_dim*4),
            conv_3(base_dim*4, base_dim*8),
            conv_3(base_dim*8, base_dim*8)
        )
        self.fc_layer = nn.Sequential(
            nn.Linear(base_dim*8*7*7, base_dim*4*7*7),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(base_dim*4*7*7, base_dim*2*7*7),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(base_dim*2*7*7, base_dim*7*7)
        )
        for param in self.parameters():
            param.requires_grad = True

    def forward(self, x):
        x = self.feature(x)
        x = x.view(x.size(0), -1)
        x = self.fc_layer(x)
        return x

In [None]:
# 코드의 구조

# 초기화 메서드 (__init__)
# base_dim: 네트워크의 기본 채널 수를 결정하는 파라미터

# 특징 추출 부분 (self.feature)
# 이전에 정의한 합성곱 블록 함수로, 각각 2개와 4개의 합성곱 층과 활성화 함수를 포함한 convolution layer

"""
- 첫 번째 블록 (conv_2(3, base_dim)):
입력 채널: 3 (RGB 이미지)
출력 채널: base_dim

- 두 번째 블록 (conv_2(base_dim, base_dim*2)):
입력 채널: base_dim
출력 채널: base_dim*2

- 세 번째 블록 (conv_2(base_dim*2, base_dim*4)):
입력 채널: base_dim*2
출력 채널: base_dim*4

- 네 번째 블록 (conv_3(base_dim*4, base_dim*8)):
입력 채널: base_dim*4
출력 채널: base_dim*8

- 다섯 번째 블록 (conv_3(base_dim*8, base_dim*8)):
입력 및 출력 채널: base_dim*8
"""

# 완전 연결 층 (self.fc_layer)
"""
nn.Linear: 선형 변환(완전 연결 층)을 수행합니다.

- 활성화 함수 및 드롭아웃
nn.ReLU(True): 활성화 함수로 ReLU를 사용하며, 인플레이스 연산을 수행합니다.
nn.Dropout(): 과적합을 방지하기 위해 드롭아웃을 적용합니다.
"""

# 파라미터 학습 설정
# 모든 파라미터의 requires_grad 속성을 True로 설정하여 역전파 시 파라미터가 업데이트되도록 합니다.

# 순전파 메서드 (forward)
# x = self.feature(x): 입력 데이터를 특징 추출 부분에 통과시켜 특징 맵을 얻습니다.
# x = x.view(x.size(0), -1): 다차원 텐서를 2차원 텐서로 변환하여 완전 연결 층에 입력할 수 있도록 합니다.
# x = self.fc_layer(x): 완전 연결 층을 통과하여 최종 출력을 생성합니다.

"""
< 모델의 전체적인 흐름>
입력 단계: 배치 크기의 RGB 이미지를 입력으로 받습니다.
특징 추출 단계 (self.feature): 합성곱 및 풀링 층을 통해 이미지의 공간적 및 계층적 특징을 추출합니다. 각 블록에서 채널 수가 증가하여 더 복잡한 특징을 학습할 수 있습니다.
평탄화 단계: 합성곱 층의 출력을 펼쳐서 완전 연결 층의 입력으로 사용합니다.
분류 또는 회귀 단계 (self.fc_layer): 완전 연결 층을 통해 최종 출력을 계산합니다.활성화 함수와 드롭아웃을 사용하여 모델의 일반화 성능을 향상시킵니다.
"""

- Hyper_paremeter : Learning rate, momentum, weight decay 등은 논문의 Hyper peremeter value로 초기화


In [None]:
import torch.nn.init as init

seed = time.time()

def custom_init_weights(m):
  if seed is not None:
    torch.manual_seed(seed)
  if isinstance(m, torch.nn.Linear) and m.weight is not None:
    init.normal_(m.weight, mean=1, std=0.01)
    if m.bias is not None:
      init.constant_(m.bias, 0)

model = trans_VGG(base_dim=64)

loss = nn.BCELoss()
optimizer =torch.optim.SGD(model.parameters(), lr = 0.01,momentum = 0.9, weight_decay = 0.0005)
scheduler = ReduceLROnPlateau(optimizer, mode='max', patience=10, factor=0.1, verbose=True)

transform = transforms.Compose(
    [transforms.ToTensor(), transforms.RandomCrop(224)])

from google.colab import drive
drive.mount('/content/drive')

In [None]:
# 코드의 구조

# custom_init_weights 함수 : 모델 내의 nn.Linear 계층의 가중치와 바이어스를 특정 방식으로 초기화
# 가중치 초기화: init.normal_(m.weight, mean=1, std=0.01): 가중치를 평균이 1이고 표준편차가 0.01인 정규분포로 초기화
# 바이어스 초기화: init.constant_(m.bias, 0): 바이어스를 0으로 초기화

# 모델 인스턴스화
# trans_VGG 클래스: 이전에 정의한 VGG 네트워크 기반의 커스텀 신경망 모델

# 손실함수 정의
# nn.BCELoss(): 이진 분류를 위한 바이너리 크로스 엔트로피 손실 함수입니다.
# 이 손실 함수를 사용할 때는 모델의 출력이 시그모이드 함수를 통과하여 0과 1 사이의 값이 되어야한다.

# 옵티마이저 설정 : torch.optim.SGD: 확률적 경사 하강법(SGD) 옵티마이저를 사용합니다
# lr=0.01: 학습률을 0.01로 설정합니다.
# momentum=0.9: 모멘텀을 사용하여 학습 속도를 가속화하고 진동을 줄입니다.
# weight_decay=0.0005: 가중치 감쇠(L2 정규화)를 적용하여 과적합을 방지합니다.

# 학습률 스케줄러 설정
# 모델의 성능 향상이 멈췄을 때 학습률을 줄여주는 스케줄러

# **Import Dataset**

In [None]:
import os
from PIL import Image
import numpy as np
from torch.utils.data import Dataset

# Project 3 폴더 경로
project_folder = '/content/drive/MyDrive/Project3'

image = []
label = []

# Project 3 폴더 내부의 세부 폴더를 확인하고 이미지와 라벨 데이터 생성
for subdir, _, files in os.walk(project_folder):
    for file in files:
        # 이미지 파일인지 확인
        if file.endswith(('png', 'jpg', 'jpeg')):
            image_path = os.path.join(subdir, file)
            image.append(image_path)

            # 이미지가 속한 세부 폴더의 이름을 라벨로 사용
            label_name = os.path.basename(subdir)
            label.append(label_name)

indices = np.random.permutation(len(image))
IMAGE = [image[i] for i in indices]
LABEL = [label[i] for i in indices]

class CustomDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        label = self.labels[idx]
        image = Image.open(image_path).convert('RGB')
        image = transforms.RandomCrop(224)(image)
        image = transforms.ToTensor()(image)

        return image, label

BATCH_SIZE = 1

TRAINING_image = []
TRAINING_label = []
TEST_image = []
TEST_label = []

for i in range(0,80):
  for j in range(0,20):
    for k in range(0,2):
      TRAINING_image.append(image[200*j+i+k])
      TRAINING_label.append(label[200*j+i+k])

for i in range(80,100):
  for j in range(0,20):
    for k in range(0,2):
      TEST_image.append(image[200*j+i+k])
      TEST_label.append(label[200*j+i+k])

train_dataset = CustomDataset(TRAINING_image, TRAINING_label, transform = transform)
train_loader = DataLoader(train_dataset, batch_size = BATCH_SIZE,num_workers=2)
test_dataset = CustomDataset(TEST_image, TEST_label, transform = transform)
test_loader = DataLoader(test_dataset, batch_size = BATCH_SIZE,num_workers=2)

In [None]:
# 코드의 구조

# 데이터 수집 및 전처리: 지정된 디렉토리에서 이미지 파일과 해당 라벨을 수집합니다. 데이터를 무작위로 섞어 순서에 의한 영향을 최소화합니다.
# 데이터셋 분할:수집된 데이터를 훈련용과 테스트용으로 분할합니다. 분할 로직은 인덱스 계산을 통해 특정 이미지를 선택하는 방식입니다.
# 데이터셋 및 데이터로더 생성: 커스텀 데이터셋 클래스를 정의하고, 이를 통해 데이터셋 객체를 만듭니다. 데이터로더를 사용하여 배치 단위로 데이터를 모델에 공급할 수 있도록 준비합니다.

# **Training**

In [None]:
"""# **3) TRAINING**"""

EPOCH = 80

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(DEVICE)

start_time = time.time()
train_acc_lst, test_acc_lst = [],[]

for epoch in range(EPOCH):
  model.train()
  correct_pred, num_examples = 0, 3200
  for i, (_image1, _label1) in enumerate(train_loader):
    image1 = _image1.to(DEVICE)
    label1 = _label1[0]
    vector1_tensor = model(image1)

    if (i == 0): #Exception Case
      image2 = image1
      label2 = label1
      vector2_tensor = vector1_tensor

    similarity =  F.cosine_similarity(vector1_tensor, vector2_tensor, dim= -1)
    scaled_similarity = torch.sigmoid(similarity)

    if label1 == label2 and scaled_similarity.item() > 0.5:
        correct_pred += 1
    elif label1 != label2 and scaled_similarity.item() < 0.5:
        correct_pred += 1

    if label1 == label2:
      target_vector = [1]
    else :
      target_vector = [0]

    target_tensor = torch.tensor(target_vector).float()
    target_tensor = target_tensor.to(DEVICE)
    optimizer.zero_grad()
    cost = loss(scaled_similarity, target_tensor)
    cost.backward()
    optimizer.step()

    if not i % 40:
      print (f'Epoch: {epoch:03d}/{EPOCH:03d} | '
            f'Batch {i:03d}/{len(train_loader):03d} |'
             f' Cost: {cost:.4f}')

    #연산량 감소를 위한 텐서 재활용
    image2 = image1.clone()
    label2 = label1
    vector2_tensor = vector1_tensor.detach().clone()

elapsed = (time.time() - start_time)/60
print(f'Total Training Time: {elapsed:.2f} min')

In [None]:
# 코드의 구조
# 1. 모델 및 환경 설정
# 모델을 GPU 또는 CPU로 이동하고, 필요한 변수들을 초기화합니다.

# 2.훈련 루프 실행: 지정된 에포크 수만큼 모델을 학습시킵니다.
# 각 에포크마다 모델을 훈련 모드로 설정하고, 정확도 계산을 위한 변수를 초기화합니다.

# 3. 데이터 로딩 및 전처리: train_loader를 통해 배치 단위로 데이터를 불러옵니다.
# 이미지를 DEVICE로 이동시키고, 라벨을 가져옵니다.
# 이미지를 모델에 입력하여 특징 벡터를 얻습니다.

# 4. 유사도 계산 및 손실 함수 적용
# 현재 배치의 특징 벡터와 이전 배치의 특징 벡터 간의 코사인 유사도를 계산합니다.
# 유사도를 시그모이드 함수를 통해 [0, 1] 범위로 변환합니다.
# 예측된 유사도와 실제 타깃 값 간의 손실을 계산합니다.

# 5. 역전파 및 파라미터 업데이트
# 옵티마이저의 그라디언트를 초기화하고, 역전파를 통해 그라디언트를 계산한 후, 파라미터를 업데이트합니다.

# 6. 정확도 계산 및 출력
# 예측 결과를 기반으로 정확도를 계산합니다.
# 일정 간격으로 진행 상황과 손실 값을 출력합니다.

# 7. 이전 배치 정보 업데이트
# 다음 배치에서 사용할 이전 배치의 정보(이미지, 라벨, 특징 벡터)를 업데이트합니다.

# 8. 훈련 시간 측정
# 전체 훈련이 완료된 후, 총 훈련 시간을 계산하고 출력합니다.

### 코드에서 개선 했으면 하는 부분

In [None]:
# 데이터 분할 로직의 개선
from sklearn.model_selection import train_test_split
train_image, test_image, train_label, test_label = train_test_split(
    IMAGE, LABEL, test_size=0.2, stratify=LABEL, random_state=42)
# 현재 데이터 분할 방식은 인덱스 중복과 범위 오류의 위험이 있기에 데이터를 클래스별로 그룹화하고, train_test_split 함수를 사용하여 분할하는 것이 좋음

> 이전 코드에서 정의된 scheduler가 사용되지 않고 있다. 에포크가 끝날 때마다 검증 손실이나 정확도를 기반으로 학습률을 조정할 수 있을 것 같다.
