In [None]:
import os
import random
import glob 
import cv2
import pandas as pd
import numpy as np
import math
from tqdm.notebook import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

from timm.scheduler.cosine_lr import CosineLRScheduler

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

from facenet_pytorch import InceptionResnetV1

In [None]:
def set_seed(seed):
    """Seed every random number generator used by this notebook.

    Seeds Python's ``random`` module, NumPy, and PyTorch (CPU, the current
    CUDA device and all CUDA devices), exports ``PYTHONHASHSEED`` and switches
    cuDNN to its deterministic, non-benchmarking mode.

    Args:
        seed: Seed value handed to every generator.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)

    # cuDNN determinism settings: deterministic kernels on, auto-tuner off.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# Fix the seed used for the whole run.
seed = 42
set_seed(seed)

# Device selection: CUDA when available, CPU otherwise.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

In [None]:
def get_lr(optimizer):
    """Return the learning rate of the optimizer's first parameter group.

    Args:
        optimizer: Object exposing ``param_groups``, an iterable of dicts
            that each carry an ``'lr'`` entry.

    Returns:
        The ``'lr'`` value of the first group, or ``None`` when there are no
        parameter groups.
    """
    first_group = next(iter(optimizer.param_groups), None)
    if first_group is None:
        return None
    return first_group['lr']

In [None]:
# Entries found directly under ./train (presumably one folder per identity — verify).
people_list = os.listdir('./train')
print(len(people_list), people_list[:5], sep='\n')

In [None]:
# 80% of the entries go to training, the remainder to validation.
num_people = len(people_list)
train_len = int(num_people * 0.8)
val_len = num_people - train_len

print(f'{train_len} {val_len}')

In [None]:
# Labels [0, train_len) are training identities; the next val_len labels are validation.
train_label_list = list(range(train_len))
valid_label_list = list(range(train_len, train_len + val_len))
print(f'{len(train_label_list)} {len(valid_label_list)}')

In [None]:
# Convert to sets so the membership tests in the parsing loop are O(1).
train_label_list, valid_label_list = set(train_label_list), set(valid_label_list)
print(f'{len(train_label_list)} {len(valid_label_list)}')

In [None]:
# Each line of the ID list is "<label> <image_path>".
file_list = './ID_List.txt'

train_images_path, train_labels = [], []
valid_images_path, valid_labels = [], []

with open(file_list) as f:
    files = f.read().splitlines()

print(len(files)) # 3917311
print(files[-1]) # 86875 train/m.0_nk/85-FaceId-0_align.jpg

# Route every entry to the train or validation split according to its label;
# labels belonging to neither set are skipped.
for file in tqdm(files):
    label, image_path = file.split()
    label = int(label)

    if label in train_label_list:
        target_paths, target_labels = train_images_path, train_labels
    elif label in valid_label_list:
        target_paths, target_labels = valid_images_path, valid_labels
    else:
        continue

    target_paths.append(image_path)
    target_labels.append(label)

In [None]:
# Paths and labels of each split must have matching lengths.
for split_paths, split_labels in (
    (train_images_path, train_labels),
    (valid_images_path, valid_labels),
):
    print(len(split_paths), len(split_labels))

In [None]:
# Peek at the first ten training samples.
print(train_images_path[:10], train_labels[:10], sep='\n')

In [None]:
class TrainDataset(Dataset):
    """Classification dataset yielding ``(image, label)`` pairs.

    Args:
        images_path: Sequence of image file paths.
        labels: Sequence of labels aligned index-by-index with ``images_path``.
        transform: Optional callable invoked as ``transform(image=img)`` whose
            result is indexed with ``'image'``; ``None`` returns the RGB array
            produced by OpenCV unchanged.
    """

    def __init__(self,images_path,labels,transform):
        self.images_path = images_path
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Number of samples (one per image path)."""
        return len(self.images_path)

    def __getitem__(self,idx):
        """Load sample ``idx``.

        Returns:
            Tuple ``(img, label)``.

        Raises:
            FileNotFoundError: If the image cannot be read from disk.
        """
        img_path = self.images_path[idx]
        label = self.labels[idx]

        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals failure by returning None; fail with the path
            # instead of a cryptic cvtColor error inside a DataLoader worker.
            raise FileNotFoundError(f'Could not read image: {img_path}')
        img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)

        if self.transform is not None:
            img = self.transform(image=img)['image']

        return img, label

In [None]:
class ValidDataset(Dataset):
    """Verification dataset yielding ``(image, second_image, label)``.

    For every sample a second image is drawn at random from the same identity
    folder under ``train/`` so that the pair shares a label.

    Args:
        images_path: Sequence of paths shaped like
            ``train/<folder>/<file_name>`` (exactly three ``/``-separated parts).
        labels: Sequence of labels aligned index-by-index with ``images_path``.
        transform: Optional callable invoked as ``transform(image=img)`` whose
            result is indexed with ``'image'``; applied to both images.
    """

    def __init__(self,images_path,labels,transform):
        self.images_path = images_path
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Number of samples (one per image path)."""
        return len(self.images_path)

    @staticmethod
    def _read_rgb(path):
        """Read ``path`` with OpenCV and convert BGR -> RGB; raise if unreadable."""
        img = cv2.imread(path)
        if img is None:
            # cv2.imread signals failure by returning None.
            raise FileNotFoundError(f'Could not read image: {path}')
        return cv2.cvtColor(img,cv2.COLOR_BGR2RGB)

    @staticmethod
    def _pick_second_path(img_path):
        """Choose another image from the same identity folder as ``img_path``."""
        # e.g. train/m.0_nk/85-FaceId-0_align.jpg
        # Pick one random image among those in m.0_nk (excluding the image itself).
        _,folder,file_name = img_path.split('/')
        candidate_dir = os.path.join('train',folder)
        candidates = [name for name in os.listdir(candidate_dir) if name != file_name]
        if not candidates:
            # Identity with a single image: retrying random.choice until a
            # different file shows up would never terminate, so pair the
            # image with itself instead.
            return os.path.join(candidate_dir,file_name)
        return os.path.join(candidate_dir,random.choice(candidates))

    def __getitem__(self,idx):
        """Load sample ``idx`` together with a same-identity partner image.

        Returns:
            Tuple ``(img, second_img, label)``.

        Raises:
            FileNotFoundError: If either image cannot be read from disk.
        """
        img_path = self.images_path[idx]
        second_img_path = self._pick_second_path(img_path)

        label = self.labels[idx]
        img = self._read_rgb(img_path)
        second_img = self._read_rgb(second_img_path)

        if self.transform is not None:
            img = self.transform(image=img)['image']
            second_img = self.transform(image=second_img)['image']
        return img, second_img, label

In [None]:
# Per-channel mean / std of a dataset
def compute_mean_std(dataset):
    """Estimate per-channel mean and std of the images in ``dataset``.

    Per-image statistics are taken over the first two axes of each image and
    then averaged across images; the std is therefore the mean of per-image
    stds, not a pooled std. Both results are divided by 255.

    Args:
        dataset: Indexable of ``(img, label)`` items with a ``len``.

    Returns:
        Tuple ``(means, stds)``.
    """
    per_image_means, per_image_stds = [], []
    for idx in tqdm(range(len(dataset))): # each item has the form (img, label)
        image = dataset[idx][0]
        per_image_means.append(np.mean(image, axis=(0, 1)))
        per_image_stds.append(np.std(image, axis=(0, 1)))

    means = np.mean(per_image_means, axis=0) / 255.0
    stds = np.mean(per_image_stds, axis=0) / 255.0
    return means, stds

In [None]:
# # no transform mean, std calculate
# dataset = TrainDataset(train_images_path,train_labels,None)
# print(compute_mean_std(dataset))
# # # # (array([0.60122519, 0.46037095, 0.39414122]), array([0.20757387, 0.18196426, 0.17056237]))

In [None]:
# Build the Albumentations transform pipelines.
# Training: random flip / rotation augmentation, then normalisation and
# conversion to a tensor.
train_transform = A.Compose([
    A.HorizontalFlip(p=0.5),  # the comma here was missing, which made the cell a SyntaxError
    A.Rotate(limit=(-20,20), interpolation=cv2.INTER_LINEAR, border_mode=cv2.BORDER_CONSTANT, p=0.5),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225], max_pixel_value=255, p=1.0),
    ToTensorV2(),
])

# Validation: no augmentation, same normalisation as training.
valid_transform = A.Compose([
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225], max_pixel_value=255, p=1.0),
    ToTensorV2(),
])

In [None]:
# Training samples are reshuffled by the loader; validation keeps file order.
train_dataset = TrainDataset(train_images_path, train_labels, transform=train_transform)
train_dataloader = DataLoader(
    train_dataset,
    batch_size=1024,
    shuffle=True,
    num_workers=8,
)

valid_dataset = ValidDataset(valid_images_path, valid_labels, transform=valid_transform)
valid_dataloader = DataLoader(
    valid_dataset,
    batch_size=1024,
    shuffle=False,
    num_workers=8,
)

In [None]:
# Number of validation samples and number of validation batches.
print(f'{len(valid_dataset)} {len(valid_dataloader)}')

In [None]:
# InceptionResnetV1 variant whose forward pass skips the final L2 normalisation
class MyInceptionResnetV1(InceptionResnetV1):
    """InceptionResnetV1 that returns un-normalised embeddings.

    The forward pass runs the parent's layers in sequence but, when
    ``self.classify`` is false, returns the output of ``last_bn`` directly
    instead of L2-normalising it.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def forward(self, x):
        """Run the layer stack; return logits if ``self.classify`` else raw embeddings."""
        stage_names = (
            'conv2d_1a', 'conv2d_2a', 'conv2d_2b',
            'maxpool_3a',
            'conv2d_3b', 'conv2d_4a', 'conv2d_4b',
            'repeat_1', 'mixed_6a',
            'repeat_2', 'mixed_7a',
            'repeat_3', 'block8',
            'avgpool_1a', 'dropout',
        )
        for stage_name in stage_names:
            x = getattr(self, stage_name)(x)

        # Flatten to (batch, features) before the linear head.
        flat = x.view(x.shape[0], -1)
        x = self.last_bn(self.last_linear(flat))

        if self.classify:
            return self.logits(x)
        # Normalisation deliberately removed here (the parent would apply
        # F.normalize(x, p=2, dim=1) in this branch).
        return x

In [None]:
# Loss / margin layer definition
class SphereFaceLoss(nn.Module):
    """Angular-margin logit layer with an annealed margin strength.

    Despite the name, ``forward`` returns per-class logits rather than a
    scalar loss; the caller feeds them to a classification criterion.

    Args:
        in_features: Size of each input feature vector.
        out_features: Number of classes (rows of the weight matrix).
        m: Angular margin multiplier; indexes ``self.mlambda`` (0..5).
    """

    def __init__(self, in_features, out_features, m=4):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.m = m

        # Annealing schedule: lambda = max(lambda_min, base * (1 + gamma * iter) ** -power)
        self.base = 1000.0
        self.gamma = 0.0001
        self.power = 2
        self.lambda_min = 5.0
        self.iter = 0  # number of forward calls so far

        self.weight = nn.Parameter(torch.FloatTensor(out_features, in_features))
        nn.init.xavier_uniform_(self.weight)

        # Chebyshev polynomials: entry j maps cos(theta) to cos(j * theta).
        self.mlambda = [
            lambda c: c ** 0,
            lambda c: c ** 1,
            lambda c: 2 * c ** 2 - 1,
            lambda c: 4 * c ** 3 - 3 * c,
            lambda c: 8 * c ** 4 - 8 * c ** 2 + 1,
            lambda c: 16 * c ** 5 - 20 * c ** 3 + 5 * c,
        ]

    def forward(self, x, label):
        """Compute margin-adjusted logits.

        Args:
            x: Features of shape (B, in_features).
            label: Integer class indices with B elements.

        Returns:
            Tensor of shape (B, out_features): cosine logits scaled by the
            feature norm, with the target-class entry replaced by the blended
            margin term.
        """
        # Every call advances the annealing schedule.
        self.iter += 1
        annealed = self.base * (1 + self.gamma * self.iter) ** (-1 * self.power)
        self.cur_lambda = max(self.lambda_min, annealed)

        # ---- cos(theta) and phi(theta) -------------------------------------
        unit_features = F.normalize(x)
        unit_weights = F.normalize(self.weight)  # each class row normalised over in_features
        cos_theta = F.linear(unit_features, unit_weights).clamp(-1, 1)  # (B, out)

        multi_angle_cos = self.mlambda[self.m]
        cos_m_theta = multi_angle_cos(cos_theta)  # applied element-wise, (B, out)

        # theta only selects the interval index k, so it is kept out of autograd.
        theta = cos_theta.detach().acos()
        k = (self.m * theta / math.pi).floor()
        phi_theta = ((-1.0) ** k) * cos_m_theta - 2 * k
        blended_phi = (self.cur_lambda * cos_theta + phi_theta) / (1 + self.cur_lambda)

        feature_norm = x.norm(2, 1)  # (B,)

        # ---- one-hot mask of the target class ------------------------------
        one_hot = torch.zeros_like(cos_theta).scatter_(1, label.view(-1, 1), 1)

        # ---- assemble the logits -------------------------------------------
        logits = one_hot * blended_phi + (1 - one_hot) * cos_theta
        return logits * feature_norm.view(-1, 1)

In [None]:
# Backbone producing 512-d embeddings (see the (B,512) note in the training
# loop), the margin layer over 69500 classes, and the classification criterion.
model = MyInceptionResnetV1(
    pretrained='vggface2',
    classify=False,
).to(device)
margin = SphereFaceLoss(
    in_features=512,
    out_features=69500,
).to(device)
criterion = nn.CrossEntropyLoss()

In [None]:
# One SGD optimizer over both the backbone and the margin-layer weights,
# driven by a cosine learning-rate schedule.
optimizer = optim.SGD(
    [
        {'params': model.parameters()},
        {'params': margin.parameters()},
    ],
    lr=0.01,
    weight_decay=5e-4,
)
scheduler = CosineLRScheduler(
    optimizer,
    t_initial=10,
    lr_min=1e-5,
    cycle_decay=0.7,
)

In [None]:
# torch.save does not create missing directories, so make sure the target exists.
os.makedirs('./pth_file', exist_ok=True)

# Best validation accuracy over ALL epochs. This must live outside the epoch
# loop; resetting it per epoch would overwrite the "best" checkpoint every time.
best_accuracy = 0.0

for epoch in range(10):
    # ---------------------------- training ----------------------------
    model.train()
    margin.train()
    train_loss = 0.0  # sum of per-sample losses seen so far in this epoch

    for batch_idx, (images,labels) in enumerate(tqdm(train_dataloader)):
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        outputs = model(images) # (B,512)
        outputs = margin(outputs,labels)
        loss = criterion(outputs,labels)
        loss.backward()

        train_loss += loss.item() *len(images)

        if batch_idx % 100 == 0:
            print(f'Epoch : {epoch+1} | Iter : ({batch_idx+1}/{len(train_dataloader)}) | train_loss : {train_loss / ((batch_idx + 1) * len(images))}, now_lr = {get_lr(optimizer)}')

        optimizer.step()
        # Fractional epoch so the schedule advances within an epoch as well.
        scheduler.step(epoch + batch_idx/len(train_dataloader))

    # --------------------------- validation ---------------------------
    # A same-identity pair counts as correct when the cosine similarity of
    # its two embeddings is at least 0.5.
    model.eval()
    valid_collect_count = 0

    with torch.no_grad():
        for images,second_images,labels in tqdm(valid_dataloader):
            images = images.to(device)
            second_images = second_images.to(device)

            outputs = model(images) # (Batch,512)
            outputs = F.normalize(outputs,p=2,dim=1)
            second_outputs = model(second_images)
            second_outputs = F.normalize(second_outputs,p=2,dim=1)

            cos_sim = F.cosine_similarity(outputs, second_outputs) # (Batch,)

            # .item() keeps the running count a plain Python int, so the
            # accuracy below is a float rather than a device tensor.
            valid_collect_count += (cos_sim >= 0.5).sum().item()

    valid_accuracy = valid_collect_count / len(valid_dataset)
    if valid_accuracy>best_accuracy:
        best_accuracy = valid_accuracy
        path = f"./pth_file/InceptionResnetV1_sphere_best_model.pt"
        path_margin = f"./pth_file/InceptionResnetV1_sphere_best_margin.pt"
        torch.save(model.state_dict(), path)
        torch.save(margin.state_dict(), path_margin)

    print(f'Epoch : {epoch+1} | train_loss = {train_loss / len(train_dataset)} | valid_accuracy = {valid_accuracy}')

In [None]:
class TestDataset(Dataset):
    """Pair dataset yielding ``(left_img, right_img)`` for verification.

    Sample ``idx`` is read from ``<root>/left_face_<idx>.jpg`` and
    ``<root>/right_face_<idx>.jpg``.

    Args:
        transform: Optional callable invoked as ``transform(image=img)`` whose
            result is indexed with ``'image'``; applied to both images.
        root: Directory holding the image pairs (default ``'test'``).
        length: Number of pairs in the dataset (default ``6000``).
    """

    def __init__(self,transform,root='test',length=6000):
        self.transform = transform
        self.root = root
        self.length = length

    def __len__(self):
        """Number of image pairs."""
        return self.length

    def _load(self, side, idx):
        """Read ``<root>/<side>_face_<idx>.jpg`` as RGB and apply the transform."""
        img_path = os.path.join(self.root,f'{side}_face_{idx}.jpg')
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals failure by returning None.
            raise FileNotFoundError(f'Could not read image: {img_path}')
        img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)

        if self.transform is not None:
            img = self.transform(image=img)['image']
        return img

    def __getitem__(self,idx):
        """Load pair ``idx``.

        Returns:
            Tuple ``(left_img, right_img)``.

        Raises:
            FileNotFoundError: If either image cannot be read from disk.
        """
        left_img = self._load('left', idx)
        right_img = self._load('right', idx)
        return left_img, right_img


In [None]:
# Per-channel mean / std of a paired dataset
def compute_test_mean_std(dataset):
    """Estimate per-channel mean and std over both images of every pair.

    Per-image statistics are taken over the first two axes of each image and
    then averaged across all images (left and right alike); the std is the
    mean of per-image stds, not a pooled std. Both results are divided by 255.

    Args:
        dataset: Indexable of ``(img1, img2)`` items with a ``len``.

    Returns:
        Tuple ``(means, stds)``.
    """
    per_image_means, per_image_stds = [], []
    for idx in tqdm(range(len(dataset))): # each item has the form (img1, img2)
        for side in (0, 1):
            # The item is fetched once per side, exactly like the two separate lookups.
            image = dataset[idx][side]
            per_image_means.append(np.mean(image, axis=(0, 1)))
            per_image_stds.append(np.std(image, axis=(0, 1)))

    means = np.mean(per_image_means, axis=0) / 255.0
    stds = np.mean(per_image_stds, axis=0) / 255.0
    return means, stds

In [None]:
# # no transform mean, std calculate
# test_dataset = TestDataset(transform=None)
# print(compute_test_mean_std(test_dataset))
# # (array([0.60026345, 0.45291217, 0.36765159]), array([0.19051669, 0.16606976, 0.15597025]))

In [None]:
# Inference must preprocess images exactly like training/validation did.
# The network was trained with the ImageNet mean/std given to A.Normalize in
# the train and valid pipelines; normalising the test images with test-set
# statistics instead would shift the input distribution the model sees.
test_transform = A.Compose([
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225], max_pixel_value=255, p=1.0),
    ToTensorV2()
])

In [None]:
# Order is preserved (no shuffling) so predictions line up with pair indices.
test_dataset = TestDataset(transform=test_transform)
test_dataloader = DataLoader(
    test_dataset,
    batch_size=1024,
    shuffle=False,
    num_workers=8,
)

In [None]:
model.eval()

# Cosine similarity of every (left, right) test pair, gathered batch by batch.
batch_similarities = []

with torch.no_grad():
    for left_batch, right_batch in tqdm(test_dataloader):
        left_embeddings = model(left_batch.to(device)) # (Batch,512)
        right_embeddings = model(right_batch.to(device))

        cos_sim = F.cosine_similarity(left_embeddings, right_embeddings) # (Batch,)
        print(cos_sim)
        batch_similarities.append(cos_sim)

    # Single 1-D tensor holding one score per test pair.
    answer_list = torch.cat(batch_similarities, dim=0)
print(answer_list)

In [None]:
# Show all similarity scores.
print(answer_list)

In [None]:
# Fill the sample submission's 'answer' column with the similarity scores.
submission = pd.read_csv('./submission/sample_submission.csv').assign(
    answer=answer_list.tolist()
)

In [None]:
# Write the submission file without the index column.
submission.to_csv(
    './submission/InceptionResnetV1_sphere_submission.csv',
    index=False,
)