# **Step 0. 데이터셋 준비하기**   
인공지능 모델 학습에 필요한 라이브러리, 유틸리티 함수, 데이터셋을 준비한다.

**Step 0-1. 기본 라이브러리 함수 준비**

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy, math, pdb, sys, time, importlib, datetime
import torchvision.transforms as transforms
import random
import glob
import os
import torchvision

from torch.utils.data import Dataset, DataLoader
from torch.cuda.amp import autocast, GradScaler
from tqdm import tqdm
from PIL import Image
from sklearn import metrics
from operator import itemgetter

def accuracy(output, target, topk=(1,)):
    """Compute the top-k accuracy (in percent) for each requested k.

    Args:
        output: Score tensor. The top-k selection runs along dimension 1, so
            one row per sample and one column per class is expected.
        target: Tensor of ground-truth class indices, one per sample.
        topk: Iterable of k values to evaluate.

    Returns:
        A list with one single-element tensor per entry of ``topk``, holding
        the percentage of samples whose target is among the k best-scoring
        classes.
    """
    maxk = max(topk)
    batch_size = target.size(0)

    # Indices of the maxk best-scoring classes, transposed so that row r holds
    # the rank-r prediction of every sample.
    _, pred = output.topk(maxk, 1, True, True)
    pred = pred.t()
    correct = pred.eq(target.view(1, -1).expand_as(pred))

    res = []
    for k in topk:
        # reshape() instead of view(): the slice of the transposed comparison
        # is not contiguous, and view(-1) raises a RuntimeError for k > 1.
        correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
        res.append(correct_k.mul_(100.0 / batch_size))
    return res

def tuneThresholdfromScore(scores, labels, target_fa, target_fr = None):
    """Derive operating thresholds and the equal error rate from scored trials.

    Args:
        scores: Similarity scores, one per trial.
        labels: Ground-truth labels, one per trial (1 marks the positive class).
        target_fa: Iterable of target false-acceptance rates.
        target_fr: Optional iterable of target false-rejection rates.

    Returns:
        A tuple ``(tunedThreshold, eer, fpr, fnr)``. ``tunedThreshold`` is a
        list of ``[threshold, fpr, fnr]`` entries, first for every value of
        ``target_fr`` (if given) and then for every value of ``target_fa``.
        ``eer`` is the equal error rate in percent, and ``fpr`` / ``fnr`` are
        the full ROC arrays.
    """
    fpr, tpr, thresholds = metrics.roc_curve(labels, scores, pos_label=1)
    fnr = 1 - tpr

    def _operating_point(rates, wanted):
        # ROC point whose rate lies closest to the requested value.
        pos = numpy.nanargmin(numpy.absolute((wanted - rates)))
        return [thresholds[pos], fpr[pos], fnr[pos]]

    tunedThreshold = []
    if target_fr:
        tunedThreshold.extend(_operating_point(fnr, tfr) for tfr in target_fr)
    tunedThreshold.extend(_operating_point(fpr, tfa) for tfa in target_fa)

    # The EER is read off where the two error curves are closest to each other.
    crossing = numpy.nanargmin(numpy.absolute((fnr - fpr)))
    eer = max(fpr[crossing], fnr[crossing]) * 100

    return (tunedThreshold, eer, fpr, fnr)

def round_down(num, divisor):
    """Return ``num`` lowered to the nearest multiple of ``divisor``."""
    remainder = num % divisor
    return num - remainder

def worker_init_fn(worker_id):
    """Re-seed NumPy's global RNG with a seed offset by ``worker_id``.

    The new seed is the first word of the current global RNG state plus the
    worker id, so workers that start from the same state end up on different
    random streams.
    """
    rng_state = numpy.random.get_state()
    base_seed = rng_state[1][0]
    numpy.random.seed(base_seed + worker_id)

**Step 0-2. Google Drive 에서 인물 사진 데이터를 가져오기**

In [None]:
# Mount Google Drive, unpack the image archive and copy the validation pair
# list next to the extracted images. The assignments inside the TODO banner are
# placeholders that must be filled in before this cell can run.
from google.colab import drive
import zipfile

drive.mount('/content/drive')
############################################# TODO #############################################

zip_path =       # specify the location of the zipped image archive
output_path =    # specify the directory to extract the archive into
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(output_path)

val_path =      # specify the location of val_pairs.csv
# IPython shell escape: copies the file at val_path into output_path.
!cp "$val_path" "$output_path"

################################################################################################

**Step 0-3. 학습 데이터셋 준비**

In [None]:
class meta_loader(Dataset):
    """Dataset over image files stored as ``<train_path>/<class>/<file>.<train_ext>``.

    Each item is requested with a *list* of file indices (the form produced by
    ``meta_sampler``) and returns the stacked, transformed images together
    with the class label of the last index in that list.

    Attributes set by ``__init__``:
        transform:  callable applied to every opened image.
        label_dict: maps class label -> list of indices into ``data_list``.
        data_list:  file path of every sample.
        data_label: class label of every sample, parallel to ``data_list``.
    """

    def __init__(self, train_path, train_ext, transform):
        """Scan ``train_path`` and build the file and label tables.

        Args:
            train_path: Root directory holding one sub-directory per class.
            train_ext: File extension (without the dot) of the images.
            transform: Callable applied to each PIL image in ``__getitem__``.
        """
        ## Read Training Files
        files = glob.glob('%s/*/*.%s'%(train_path,train_ext))

        ## Make a mapping from Class Name to Class Number
        # The class name is the parent directory of the file. os.path is used
        # instead of splitting on '/' so this works with whatever separator
        # glob returns on the current platform.
        class_names = [os.path.basename(os.path.dirname(path)) for path in files]
        class_to_label = {
            name: ii for ii, name in enumerate(sorted(set(class_names)))
        }

        self.transform  = transform

        self.label_dict = {}
        self.data_list  = []
        self.data_label = []

        for lidx, (file, class_name) in enumerate(zip(files, class_names)):
            speaker_label = class_to_label[class_name]

            self.label_dict.setdefault(speaker_label, []).append(lidx)
            self.data_label.append(speaker_label)
            self.data_list.append(file)

        print('{:d} files from {:d} classes found.'.format(len(self.data_list),len(self.label_dict)))

    def __getitem__(self, indices):
        """Load and transform the images at ``indices``.

        Args:
            indices: Iterable of indices into ``data_list``.

        Returns:
            ``(images, label)`` where ``images`` is a float tensor stacking
            the transformed images along a new first axis, and ``label`` is
            the class label of the last index.

        Raises:
            ValueError: if ``indices`` is empty (nothing to stack).
        """
        feat = []
        label = None
        for index in indices:
            # Close the file as soon as the transform has consumed the image.
            # The transform must therefore return materialised data (tensor or
            # array), not the lazily loaded PIL image itself.
            with Image.open(self.data_list[index]) as img:
                feat.append(self.transform(img))
            label = self.data_label[index]
        feat = numpy.stack(feat, axis=0)

        return torch.FloatTensor(feat), label

    def __len__(self):
        """Return the number of image files found."""
        return len(self.data_list)

class meta_sampler(torch.utils.data.Sampler):
    """Sampler that yields lists of ``nPerClass`` same-class sample indices.

    On every pass, at most ``max_img_per_cls`` samples per class (rounded down
    to a multiple of ``nPerClass``) are drawn at random and cut into groups.
    The groups are then shuffled, and a group is dropped if its class already
    appears in the current window of ``batch_size`` kept groups.
    """

    def __init__(self, data_source, nPerClass, max_img_per_cls, batch_size):
        # Only the label -> indices table of the dataset is needed.
        self.label_dict = data_source.label_dict
        self.nPerClass = nPerClass
        self.max_img_per_cls = max_img_per_cls
        self.batch_size = batch_size

        # Number of groups produced by the latest __iter__ call (0 until then).
        self.num_iters = 0

    def __iter__(self):
        groups = []
        group_labels = []

        ## Draw and chunk the samples of every class
        for class_pos, key in enumerate(sorted(self.label_dict.keys())):
            members = self.label_dict[key]
            usable = round_down(min(len(members), self.max_img_per_cls), self.nPerClass)

            chosen = numpy.random.permutation(len(members))[:usable]
            for start in range(0, len(chosen), self.nPerClass):
                chunk = chosen[start:start + self.nPerClass]
                groups.append([members[i] for i in chunk])
                group_labels.append(class_pos)

        ## Visit the groups in random order
        order = numpy.random.permutation(len(group_labels))
        batch_labels = []
        kept = []

        ## Prevent two groups of the same class in the same batch
        for gi in order:
            window_start = len(batch_labels) - len(batch_labels) % self.batch_size
            if group_labels[gi] in batch_labels[window_start:]:
                continue
            batch_labels.append(group_labels[gi])
            kept.append(gi)

        batch_indices = [groups[gi] for gi in kept]
        self.num_iters = len(batch_indices)

        return iter(batch_indices)

    def __len__(self):
        return self.num_iters

def get_data_loader(batch_size, max_img_per_cls, nDataLoaderThread, nPerClass, train_path, train_ext, transform):
    """Build the training ``DataLoader``.

    A ``meta_loader`` dataset is created from ``train_path`` / ``train_ext`` /
    ``transform`` and paired with a ``meta_sampler``. The returned loader
    drops the last incomplete batch and seeds every worker through
    ``worker_init_fn``.
    """
    dataset = meta_loader(train_path, train_ext, transform)
    sampler = meta_sampler(dataset, nPerClass, max_img_per_cls, batch_size)

    loader_options = dict(
        batch_size=batch_size,
        sampler=sampler,
        num_workers=nDataLoaderThread,
        worker_init_fn=worker_init_fn,
        pin_memory=False,
        drop_last=True,
    )
    return torch.utils.data.DataLoader(dataset, **loader_options)

**Step 0-4. 테스트 데이터셋 준비**

In [None]:
class test_dataset_loader(Dataset):
    """Dataset that loads evaluation images by file name.

    Args:
        test_list: Sequence of image file names, relative to ``test_path``.
        test_path: Directory the file names are resolved against.
        transform: Callable applied to every opened image.
    """

    def __init__(self, test_list, test_path, transform):
        self.test_path  = test_path
        self.data_list  = test_list
        self.transform  = transform

    def __getitem__(self, index):
        """Return ``(transformed image, file name)`` for entry ``index``."""
        name = self.data_list[index]
        # Close the file as soon as the transform has consumed the image. The
        # transform must therefore return materialised data (tensor or array),
        # not the lazily loaded PIL image itself.
        with Image.open(os.path.join(self.test_path, name)) as img:
            return self.transform(img), name

    def __len__(self):
        """Return the number of listed files."""
        return len(self.data_list)


# **Step 1. 인공지능 모델 구성하기**   
얼굴 인식을 위한 인공지능 모델을 만든다.

**Step 1-1. Backbone network 선택**   
이미지를 임베딩 할 수 있는 Backbone network을 만든다.

In [None]:
def MainModel(nOut=1024):
    """Create the backbone network that maps an image to an embedding.

    Args:
        nOut: Size of the network output, passed as ``num_classes``.

    Returns:
        The example backbone: a torchvision ResNet-18 with ``nOut`` outputs.
    """
    ## Define your own backbone network
    ############################################# TODO #############################################

    ################################################################################################
    backbone = torchvision.models.resnet18(num_classes=nOut) ## Example: Resnet
    return backbone


**Step 1-2. Loss Function 선택**   
임베딩한 벡터와 원본 Label과의 차이를 비교할 수 있는 Loss Function을 만든다.

In [None]:

class LossFunction(nn.Module):
    """Example loss: triplet loss with randomly chosen negatives.

    Args:
        hard_rank: Stored on the instance; not used by this example.
        hard_prob: Stored on the instance; not used by this example.
        margin: Margin added to the squared-distance difference.
        **kwargs: Accepted and ignored.
    """
    ## Define your own loss function
    ############################################# TODO #############################################

    ################################################################################################

    # Example: Triplet Loss
    def __init__(self, hard_rank=0, hard_prob=0, margin=0, **kwargs):
        super(LossFunction, self).__init__()

        self.test_normalize = True
        self.hard_rank  = hard_rank
        self.hard_prob  = hard_prob
        self.margin     = margin

        print('Initialised Triplet Loss')

    def forward(self, x, label=None):
        """Return the mean triplet loss for a batch of anchor/positive pairs.

        Args:
            x: Tensor indexed as ``x[:, 0, :]`` (anchors) and ``x[:, 1, :]``
                (positives); its second dimension must have size 2.
            label: Unused by this example.

        Returns:
            Scalar tensor: mean over the batch of
            ``relu(d(a, p)^2 - d(a, n)^2 + margin)``.
        """
        assert x.size()[1] == 2

        # Normalize anchor and positive
        out_anchor      = F.normalize(x[:,0,:], p=2, dim=1)
        out_positive    = F.normalize(x[:,1,:], p=2, dim=1)

        # Choose appropriate negative indices
        negidx      = self.choose_negative(out_anchor.detach(),out_positive.detach(),type='any')

        # Negatives are the positives of other batch entries
        out_negative = out_positive[negidx,:]

        ## Calculate positive and negative pair distances
        pos_dist    = F.pairwise_distance(out_anchor,out_positive)
        neg_dist    = F.pairwise_distance(out_anchor,out_negative)

        ## Triplet loss function
        nloss   = torch.mean(F.relu(torch.pow(pos_dist, 2) - torch.pow(neg_dist, 2) + self.margin))

        return nloss
    ## ===== ===== ===== ===== ===== ===== ===== =====
    ## Negative mining
    ## ===== ===== ===== ===== ===== ===== ===== =====
    def choose_negative(self, embed_a, embed_p, type=None):
        """Pick, for every batch entry, the index of a different entry.

        Args:
            embed_a: Anchor embeddings; only the batch size is read.
            embed_p: Positive embeddings; unused by random mining.
            type: Mining strategy. Only ``'any'`` (uniform random) is defined.

        Returns:
            List with one negative index per batch entry; an entry is never
            paired with itself.

        Raises:
            ValueError: if ``type`` is not a supported mining strategy.
            IndexError: if the batch holds a single entry (no other entry to
                choose from).
        """
        # The original built this exception without raising it, so an unknown
        # type silently produced an empty index list.
        if type != 'any':
            raise ValueError('Undefined type of mining.')

        # Get batch size
        batch_size = embed_a.size(0)

        negidx = []
        for idx in range(batch_size):
            # Random negative mining: any entry except the current one
            candidates = [other for other in range(batch_size) if other != idx]
            negidx.append(random.choice(candidates))

        return negidx

**Step 1-3. Optimizer 선택**   
인공지능 모델의 파라미터들을 업데이트 할 optimizer를 만든다.

In [None]:
def Optimizer(parameters, lr, weight_decay):
    """Create the optimizer that updates ``parameters``.

    Args:
        parameters: Parameters (or parameter groups) to optimise.
        lr: Learning rate.
        weight_decay: Weight decay coefficient.

    Returns:
        The example optimizer: ``torch.optim.Adam``.
    """
    ## Define your own optimizer
    ############################################# TODO #############################################

    ################################################################################################
    adam_options = {"lr": lr, "weight_decay": weight_decay}
    return torch.optim.Adam(parameters, **adam_options)  ## Example: Adam optimizer

**Step 1-4. Learning Rate Scheduler 선택**   
Learning Rate을 조절할 scheduler를 만든다.

In [None]:
def Scheduler(optimizer, test_interval, max_epoch, lr_decay):
    """Create the learning-rate scheduler.

    Args:
        optimizer: Optimizer whose learning rate is scheduled.
        test_interval: Number of scheduler steps between two decays.
        max_epoch: Unused by the example scheduler.
        lr_decay: Multiplicative decay factor.

    Returns:
        ``(scheduler, lr_step)``; ``lr_step`` is ``'epoch'`` for the example.
    """
    ## Define your own learning rate scheduler
    ############################################# TODO #############################################

    ################################################################################################

    ## Example: stepLR scheduler
    lr_step = 'epoch'
    sche_fn = torch.optim.lr_scheduler.StepLR(
        optimizer,
        gamma=lr_decay,
        step_size=test_interval,
    )
    return sche_fn, lr_step

**Step 1-5. 임베딩 네트워크 구성**   
위의 요소들을 사용하여 임베딩 네트워크를 구성한다.

In [None]:
# Template for the module that wraps the backbone and the loss function.
# The TODO banners mark the parts to be implemented; the bare `outp =` and
# `nloss =` lines are placeholders and are not valid Python until completed.
class EmbedNet(nn.Module):

    # nPerClass: number of examples per identity in a batch.
    # nOut: presumably the embedding size for the backbone — it is not used in
    #       the code shown here; TODO confirm when filling in the template.
    def __init__(self, nPerClass, nOut):
        super(EmbedNet, self).__init__();

        ## Define an embedding model(Backbone network)
        ############################################# TODO #############################################

        ################################################################################################

        ## Define a loss function
        ############################################# TODO #############################################

        ################################################################################################

        ## Number of examples per identity per batch
        self.nPerClass = nPerClass

    # With label left at None the TODO branch is expected to return the
    # embedding; otherwise the loss value nloss is returned.
    def forward(self, data, label=None):
        # Collapse every leading dimension so the input becomes a flat batch
        # whose last three dimensions are kept unchanged.
        data    = data.reshape(-1,data.size()[-3],data.size()[-2],data.size()[-1])
        ## Extract embedding vector from data
        ############################################# TODO #############################################
        outp =
        ################################################################################################

        # NOTE(review): `label == None` relies on ==; `label is None` is the
        # usual identity test.
        if label == None:
            ## Eval-only. Just return the embedding vector
            ############################################# TODO #############################################

            ################################################################################################
        else:
            # Regroup the flat outputs to (nPerClass, -1, last dim), swap the
            # first two axes, and drop axis 1 when it has size 1.
            outp    = outp.reshape(self.nPerClass,-1,outp.size()[-1]).transpose(1,0).squeeze(1)
            ## Calculate loss
            ############################################# TODO #############################################
            nloss =
            ################################################################################################
            return nloss

**Step 1-6. 모델 학습 오브젝트 생성**   
임베딩 네트워크를 학습하기 위한 학습 오브젝트를 생성한다.

In [None]:
# Template for the object that bundles the model with its optimizer,
# learning-rate scheduler and mixed-precision state. The two assignments inside
# the TODO banners are placeholders and are not valid Python until completed.
class ModelTrainer(object):

    # embed_model: model stored as self.__model__.
    # mixedprec:   flag stored as self.mixedprec.
    # lr, weight_decay, test_interval, max_epoch, lr_decay: not used in the
    #   code shown here — presumably inputs for the optimizer and scheduler
    #   TODOs; confirm when filling them in.
    def __init__(self, embed_model, mixedprec, lr, weight_decay, test_interval, max_epoch, lr_decay):

        self.__model__  = embed_model

        ## Define optimizer
        ############################################# TODO #############################################
        self.__optimizer__ =
        ################################################################################################

        ## Define learning rate scheduler
        ############################################# TODO #############################################
        self.__scheduler__, self.lr_step =
        ################################################################################################

        ## For mixed precision training
        self.scaler = GradScaler()
        self.mixedprec = mixedprec

        # lr_step must say whether the scheduler advances per epoch or per iteration.
        assert self.lr_step in ['epoch', 'iteration']


**Step 1-7. 인공지능 학습**   
해당 인공지능 모델을 학습시키는 함수를 작성한다.

In [None]:
# Template for one training pass over `loader`; returns loss/counter.
# The bare `loss +=`, `counter +=` and `index +=` lines inside the TODO banner
# are placeholders and are not valid Python until completed.
def train_network(trainer, loader):
    trainer.__model__.train();
    stepsize = loader.batch_size;
    # Running totals; loss/counter is shown on the progress bar and returned.
    counter = 0;
    index   = 0;
    loss    = 0;

    with tqdm(loader, unit="batch") as tepoch:
        for data, label in tepoch:
            # Refresh the bar total on every iteration.
            # NOTE(review): presumably needed because the sampler only knows
            # its length once iteration has started — confirm.
            tepoch.total = tepoch.__len__()
            # Swap the first two dimensions of the batch.
            data    = data.transpose(1,0)

            ## Reset gradients
            ############################################# TODO #############################################

            ################################################################################################

            ## Forward and backward passes
            ############################################# TODO #############################################

            ################################################################################################

            ## Update status
            ############################################# TODO #############################################
            loss +=
            counter +=
            index +=
            ################################################################################################

            # Print statistics to progress bar
            tepoch.set_postfix(loss=loss/counter)

            # Step the scheduler after every batch or once after the loop,
            # depending on trainer.lr_step.
            if trainer.lr_step == 'iteration': trainer.__scheduler__.step()
        if trainer.lr_step == 'epoch': trainer.__scheduler__.step()

    return (loss/counter);

**Step 1-8. 인공지능 평가**   
해당 인공지능 모델을 테스트하는 함수를 작성한다.

In [None]:
# Template for scoring every trial listed in `test_list`.
# Each line of the list is comma separated: field 0 is the integer label and
# fields 1 and 2 are the two file names of the trial. Returns three parallel
# lists: scores, labels and "file1,file2" trial strings.
# The bare `ref_feat =` and `score =` lines inside the TODO banners are
# placeholders and are not valid Python until completed.
# NOTE(review): print_interval is not used in the code shown here.
def evaluateFromList(trainer, test_list, test_path, nDataLoaderThread, transform, print_interval=100):

    trainer.__model__.eval();
    # Maps file name -> embedding computed for that file.
    feats       = {}

    ## Read all lines
    with open(test_list) as f:
        lines = f.readlines()

    ## Get a list of unique file names
    # The last two comma-separated fields of every line are file names.
    files = sum([x.strip().split(',')[-2:] for x in lines],[])
    setfiles = list(set(files))
    setfiles.sort()

    ## Define test data loader
    test_dataset = test_dataset_loader(setfiles, test_path, transform=transform)
    test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=1,
        shuffle=False,
        num_workers=nDataLoaderThread,
        drop_last=False,
    )

    print('Generating embeddings')

    ## Extract features for every image
    for data in tqdm(test_loader):
        # batch_size is 1, so [0] selects the single image of the batch; it is
        # moved to the GPU before the embedding is computed.
        inp1                = data[0][0].cuda()
        ############################################# TODO #############################################
        ref_feat            =
        ################################################################################################
        # data[1][0] is the file name that belongs to this image.
        feats[data[1][0]]   = ref_feat

    all_scores = [];
    all_labels = [];
    all_trials = []

    print('Computing similarities')

    ## Read files and compute all scores
    for line in tqdm(lines):

        data = line.strip().split(',');

        ref_feat = feats[data[1]]
        com_feat = feats[data[2]]

        ## Get cosine similarity between the two features
        ############################################# TODO #############################################
        score =
        ################################################################################################

        # score.item() is called, so score must be a single-element tensor.
        all_scores.append(score.item());
        all_labels.append(int(data[0]));
        all_trials.append(data[1] + "," + data[2])

    return (all_scores, all_labels, all_trials)

**Step 1-9. 파라미터 저장/불러오기**   
학습한 파라미터들을 저장하거나 가져오는 함수를 작성한다.

In [None]:
def saveParameters(model, path):
    """Write the state dict of ``model.__model__`` to ``path``."""
    state = model.__model__.state_dict()
    torch.save(state, path)

In [None]:
def loadParameters(model, path):
    """Copy matching parameters from the checkpoint at ``path`` into the model.

    Entries whose name is unknown to ``model.__model__`` or whose size differs
    from the model's tensor are reported with ``print`` and skipped; all other
    entries are copied in place.

    Args:
        model: Object exposing the network as ``model.__model__``.
        path: File written by ``torch.save`` containing a state dict.
    """
    self_state = model.__model__.state_dict()
    # Load onto the CPU first so that a checkpoint saved from a GPU can also
    # be read on a machine without one; copy_ below moves the values to the
    # device of the destination tensor.
    loaded_state = torch.load(path, map_location="cpu")
    for name, param in loaded_state.items():
        if name not in self_state:
            print("{} is not in the model.".format(name))
            continue

        if self_state[name].size() != param.size():
            print("Wrong parameter length: {}, model: {}, loaded: {}".format(name, self_state[name].size(), param.size()))
            continue

        self_state[name].copy_(param)

# **Step 2. 인공지능 모델 학습**   
얼굴 인식을 위한 인공지능 모델을 학습한다.

**Step 2-1. 하이퍼 파라미터 설정**   
학습에 필요한 하이퍼 파라미터를 설정한다.

In [None]:
# Dataloader
batch_size = 64                                       # Batch size
max_img_per_cls = 500                                 # Maximum number of images per class per epoch
nDataLoaderThread = 5                                 # Number of loader threads

# Training
test_interval = 5                                     # Test and save the model every this many epochs
max_epoch = 100                                       # Maximum number of epochs

# Optimizer
lr = 0.001                                            # Learning rate
lr_decay = 0.90                                       # Learning rate decay for steplr scheduler
weight_decay = 0                                      # Weight decay in the optimizer

# Loss
margin = 0.1                                          # Loss margin for some loss functions
scale = 30                                            # Loss scale for some loss functions
nPerClass = 2                                         # Number of images per class per batch, only for metric learning based losses
nClasses = 2000                                       # Number of classes in the softmax layer, only for softmax-based losses

# Load/Save
initial_model = ""                                    # Path of initial model's parameters
save_path = ""                                        # Output path for logs and parameters

# Data
train_path = ""                                       # Path of train dataset
test_path = ""                                        # Path of validation dataset
test_list = ""                                        # Path of list for validation (val_pairs.csv)
train_ext = "jpg"                                     # File extension of the training images

# Model
nOut = 1024                                           # Size of embedding feature vectors

# Test only
# NOTE(review): the name `eval` shadows the Python builtin of the same name.
eval = False                                          # Evaluation-only knob
output = ""                                           # File the evaluation scores are written to; "" disables writing

# Other
mixedprec = False                                     # Mixed-precision flag handed to the trainer
gpu = 0                                               # GPU index exported through CUDA_VISIBLE_DEVICES

**Step 2-2. 인공지능 모델 학습**   
인공지능 모델을 학습하는 코드를 작성한다.

In [None]:
# Main training / evaluation script.
# It builds the network, trainer and loader, resumes from the newest checkpoint
# in save_path if one exists, and then either evaluates once (eval == True) or
# trains up to max_epoch, evaluating and saving every test_interval epochs.
# The assignments inside the TODO banners are placeholders and are not valid
# Python until completed.
os.environ["CUDA_VISIBLE_DEVICES"]='{}'.format(gpu)

if not (os.path.exists(save_path)):
  os.makedirs(save_path)

## Define network
############################################# TODO #############################################
s =
################################################################################################
# Epoch to start from; replaced below when a checkpoint is found.
it = 1

## Input transformations for training
train_transform = transforms.Compose(
  [transforms.ToTensor(),
    transforms.Resize(256),
    transforms.RandomCrop([224,224]),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])])

## Input transformations for evaluation
# Same pipeline as training, but with a deterministic centre crop.
test_transform = transforms.Compose(
  [transforms.ToTensor(),
    transforms.Resize(256),
    transforms.CenterCrop([224,224]),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])])

# Initialise trainer and data loader
############################################# TODO #############################################
trainLoader =
trainer =
################################################################################################

## Load model weights
# Checkpoints are written below as model%09d.model; after sorting, the last
# entry is the one with the highest epoch number.
modelfiles = glob.glob('{}/model0*.model'.format(save_path))
modelfiles.sort()

## If the target directory already exists, start from the existing file
if len(modelfiles) >= 1:
  loadParameters(trainer, modelfiles[-1]);
  print("Model {} loaded from previous state!".format(modelfiles[-1]));
  # Strip the "model" prefix from the file stem to recover the epoch number,
  # and resume at the following epoch.
  it = int(os.path.splitext(os.path.basename(modelfiles[-1]))[0][5:]) + 1
elif(initial_model != ""):
  loadParameters(trainer, initial_model);
  print("Model {} loaded!".format(initial_model));

## If the current iteration is not 1, update the scheduler
# One scheduler step per already-completed epoch.
for ii in range(1,it):
  trainer.__scheduler__.step()

## Print total number of model parameters
# NOTE(review): this reads s.__S__, which the EmbedNet template does not define
# in the code shown; the backbone attribute created in the TODO section must
# use this name — confirm.
pytorch_total_params = sum(p.numel() for p in s.__S__.parameters())
print('Total model parameters: {:,}'.format(pytorch_total_params))

## Evaluation-only code
if eval == True:
  ## Evaluate network
  ############################################# TODO #############################################
  sc, lab, trials =
  ################################################################################################
  result = tuneThresholdfromScore(sc, lab, [1, 0.1]);
  print('EER {:.4f}'.format(result[1]))
  if output != '':
    with open(output,'w') as f:
      for ii in range(len(sc)):
        # NOTE(review): '{:4f}' sets a minimum width of 4 with the default
        # precision; '{:.4f}' (four decimals) may have been intended — confirm.
        f.write('{:4f},{:d},{}\n'.format(sc[ii],lab[ii],trials[ii]))
  quit();

## Write args to scorefile for training
scorefile = open(save_path+"/scores.txt", "a+");
# NOTE(review): strtime is assigned but not used anywhere in the code shown.
strtime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
scorefile.flush()

## Core training script
for it in range(it,max_epoch+1):
    # Current learning rate of every parameter group; the largest is logged.
    clr = [x['lr'] for x in trainer.__optimizer__.param_groups]
    print(time.strftime("%Y-%m-%d %H:%M:%S"), it, "Training epoch {:d} with LR {:.5f} ".format(it,max(clr)));
    ############################################# TODO #############################################
    loss =
    ################################################################################################
    if it % test_interval == 0:
        ## Evaluate network
        ############################################# TODO #############################################
        sc, lab, trials =
        ################################################################################################
        result = tuneThresholdfromScore(sc, lab, [1, 0.1]);

        # result[1] is the equal error rate returned by tuneThresholdfromScore.
        print("IT {:d}, Val EER {:.5f}".format(it, result[1]));
        scorefile.write("IT {:d}, Val EER {:.5f}\n".format(it, result[1]));
        saveParameters(trainer, save_path+"/model{:09d}.model".format(it));

    print(time.strftime("%Y-%m-%d %H:%M:%S"), "TLOSS {:.5f}".format(loss));
    scorefile.write("IT {:d}, TLOSS {:.5f}\n".format(it, loss));

    scorefile.flush()

scorefile.close();