In [2]:
%matplotlib inline
import os
import torchvision
import torchvision.datasets as dset
import torchvision.transforms as transforms
from torch.utils.data import DataLoader,Dataset
import matplotlib.pyplot as plt
import torchvision.utils
import numpy as np
import random
from PIL import Image
import torch
from torch.autograd import Variable
import PIL.ImageOps    
import torch.nn as nn
from torch import optim
import torch.nn.functional as F

## 辅助函数
一些绘制图像等功能的辅助函数

In [None]:
def imshow(img,text=None,should_save=False):
    """Display a 3-D tensor as an image, optionally with a caption.

    Args:
        img: Object exposing ``.numpy()``. Its axes are reordered with
            ``(1, 2, 0)`` before plotting (presumably a ``(C, H, W)`` tensor
            -- confirm against callers).
        text: Optional caption drawn at data coordinates (75, 8).
        should_save: Accepted for compatibility; this function does not use it.
    """
    array = img.numpy()  # convert to a numpy array
    plt.axis("off")
    if text:
        caption_box = {'facecolor':'white', 'alpha':0.8, 'pad':10}
        plt.text(75, 8, text, style='italic', fontweight='bold', bbox=caption_box)
    # Move axis 0 to the end so matplotlib receives the layout it expects.
    channels_last = np.transpose(array, (1, 2, 0))
    plt.imshow(channels_last)
    plt.show()

def show_plot(iteration,loss):
    """Plot ``loss`` (y axis) against ``iteration`` (x axis) and show the figure."""
    x_values, y_values = iteration, loss
    plt.plot(x_values, y_values)
    plt.show()

## 定义配置类
用于管理配置

In [None]:
class Config:
    """Static settings read by the training and evaluation cells."""

    # Dataset roots used for training and for evaluation respectively.
    training_dir = "./data/embeddingVectors/training/"
    testing_dir = "./data/embeddingVectors/testing/"

    # Training schedule: number of passes over the data and mini-batch size.
    train_number_epochs = 100
    train_batch_size = 64

## 数据集处理类
这个类会生成一对向量和一个标签，0表示相似，1表示不相似

In [None]:
class SiameseNetworkDataset(Dataset):
    """Dataset yielding random same-class / different-class sample pairs.

    ``folderDataset`` is the path of a directory that contains one
    sub-directory per class; every entry inside a class sub-directory is one
    sample. Each item is a ``(sample0, sample1, label)`` triple where
    ``label`` is a float32 tensor of shape ``(1,)`` holding 0 when both
    samples come from the same class and 1 otherwise.

    Args:
        folderDataset: Path of the dataset root directory.
        loader: Optional callable that receives a sample's file path and
            returns the loaded sample. When omitted, the bare file names are
            returned.
    """

    def __init__(self,folderDataset,loader=None):
        # Path of the dataset root.
        self.folderDataset = folderDataset
        self.loader = loader
        # Built lazily on first use so construction stays free of disk access.
        self._index = None

    def _class_files(self):
        """Return ``{class_name: [file names]}`` for every non-empty class directory."""
        if self._index is None:
            index = {}
            for name in sorted(os.listdir(self.folderDataset)):
                class_path = os.path.join(self.folderDataset, name)
                # Stray files next to the class directories are not classes.
                if not os.path.isdir(class_path):
                    continue
                files = sorted(os.listdir(class_path))
                # An empty class cannot provide a sample, so leave it out.
                if files:
                    index[name] = files
            self._index = index
        return self._index

    def _load(self, class_name, file_name):
        """Return the file name, or the loader's result when a loader is set."""
        if self.loader is None:
            return file_name
        return self.loader(os.path.join(self.folderDataset, class_name, file_name))

    def __getitem__(self,index):
        """Draw a random pair; ``index`` is ignored because pairs are sampled.

        Raises:
            RuntimeError: If the root contains no non-empty class directory.
            ValueError: If a different-class pair is requested but only one
                class exists.
        """
        classes = self._class_files()
        if not classes:
            raise RuntimeError(
                "no non-empty class directories found in %r" % (self.folderDataset,))

        class_names = list(classes)
        vec0_folder = random.choice(class_names)
        # Roughly half of the pairs should come from the same class.
        should_get_same_class = random.randint(0,1)
        if should_get_same_class:
            vec1_folder = vec0_folder
        else:
            other_classes = [name for name in class_names if name != vec0_folder]
            if not other_classes:
                raise ValueError(
                    "a different-class pair needs at least two classes in %r"
                    % (self.folderDataset,))
            vec1_folder = random.choice(other_classes)

        sample0 = self._load(vec0_folder, random.choice(classes[vec0_folder]))
        sample1 = self._load(vec1_folder, random.choice(classes[vec1_folder]))
        label = torch.from_numpy(
            np.array([int(vec1_folder != vec0_folder)], dtype=np.float32))
        return sample0, sample1, label

    def __len__(self):
        """Return the total number of samples across all class directories."""
        return sum(len(files) for files in self._class_files().values())

## 定义神经网络

In [3]:
class SiameseNetwork(nn.Module):
    """Twin network that embeds both inputs with one shared MLP.

    Args:
        in_features: Size of each input vector (default 64).
        hidden_features: Width of the two hidden layers (default 500).
        out_features: Size of the produced embedding (default 5).
    """

    def __init__(self, in_features=64, hidden_features=500, out_features=5):
        super(SiameseNetwork, self).__init__()  # run nn.Module's constructor

        # The layer order inside ``fc1`` is kept as-is so existing
        # ``state_dict`` keys (fc1.0, fc1.2, fc1.4) continue to load.
        self.fc1 = nn.Sequential(
            nn.Linear(in_features, hidden_features),
            nn.ReLU(inplace=True),

            nn.Linear(hidden_features, hidden_features),
            nn.ReLU(inplace=True),

            nn.Linear(hidden_features, out_features))

    def forward_once(self, x):
        """Embed a single batch with the shared weights."""
        return self.fc1(x)

    def forward(self, input1, input2):
        """Embed both inputs with the same weights and return the two outputs."""
        output1 = self.forward_once(input1)
        output2 = self.forward_once(input2)
        return output1, output2

## 定义损失函数

In [None]:
class ContrastiveLoss(torch.nn.Module):
    """
    Contrastive loss over pairs of embeddings.

    A pair labelled 0 is penalised by its squared distance; a pair labelled 1
    is penalised by the squared amount by which its distance falls short of
    ``margin``.
    Based on: http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf
    """

    def __init__(self, margin=2.0):
        super().__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        # Euclidean distance per pair; keepdim=True keeps the trailing axis of
        # size 1 on the distance.
        distance = F.pairwise_distance(output1, output2, keepdim = True)

        # label == 0: pull the two embeddings together.
        pull_term = (1 - label) * distance.pow(2)
        # label == 1: push them apart until they are at least ``margin`` away.
        shortfall = (self.margin - distance).clamp(min=0.0)
        push_term = label * shortfall.pow(2)

        return (pull_term + push_term).mean()

## 开始训练!

In [None]:
# SiameseNetworkDataset walks the directory tree itself (os.listdir), so it
# needs the root *path*; a torchvision ImageFolder object is not a path.
# The root is expected to contain one sub-folder per class.
folder_dataset = Config.training_dir
siamese_dataset = SiameseNetworkDataset(folderDataset=folder_dataset)
train_dataloader = DataLoader(siamese_dataset,
                        shuffle=True,
                        num_workers=8,
                        batch_size=Config.train_batch_size)

# Use the GPU when one is present, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
net = SiameseNetwork().to(device)
criterion = ContrastiveLoss()
optimizer = optim.Adam(net.parameters(),lr = 0.0005 )

counter = []
loss_history = []
iteration_number= 0
net.train()
for epoch in range(0,Config.train_number_epochs):
    for i, data in enumerate(train_dataloader,0):
        sample0, sample1 , label = data
        # NOTE(review): the `.to(device)` calls require the dataset to yield
        # tensors; confirm its samples are loaded vectors, not file names.
        sample0, sample1 , label = sample0.to(device), sample1.to(device), label.to(device)
        optimizer.zero_grad()  # clear gradients left over from the previous step
        output1,output2 = net(sample0,sample1)
        loss_contrastive = criterion(output1,output2,label)
        loss_contrastive.backward()
        optimizer.step()
        # Record and report the loss every 10 batches.
        if i %10 == 0 :
            print("Epoch number {}\n Current loss {}\n".format(epoch,loss_contrastive.item()))
            iteration_number +=10
            counter.append(iteration_number)
            loss_history.append(loss_contrastive.item())
show_plot(counter,loss_history)

## 测试
用欧式距离表示相似性；label 为 0 表示相似，为 1 表示不相似。
The last 3 subjects were held out from training and are used for testing. The distance between each pair denotes the degree of similarity the model found between the two samples: a smaller distance means the model found them more similar, while a larger distance indicates it found them dissimilar.

In [None]:
# SiameseNetworkDataset lists the directory itself, so pass the root path
# rather than a torchvision ImageFolder object.
folder_dataset_test = Config.testing_dir
siamese_dataset = SiameseNetworkDataset(folderDataset=folder_dataset_test)

test_dataloader = DataLoader(siamese_dataset,num_workers=6,batch_size=1,shuffle=True)
dataiter = iter(test_dataloader)

# Run on whichever device the trained network already lives on.
eval_device = next(net.parameters()).device
net.eval()
with torch.no_grad():  # inference only: no autograd bookkeeping needed
    # zip stops after 10 pairs, or earlier if the loader runs out of batches.
    for _, (x0, x1, label2) in zip(range(10), dataiter):
        output1,output2 = net(x0.to(eval_device), x1.to(eval_device))
        euclidean_distance = F.pairwise_distance(output1, output2)
        print("euclidean_distance: %lf , lable %d" %(euclidean_distance.item(), label2.item()))
