In [1]:
import matplotlib.pyplot as plt
from PIL import Image
%matplotlib inline
from torch import from_numpy
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.autograd as autograd
import torch
import numpy as np
import math
from scipy.spatial import distance
import os 
from scipy.misc import imresize
import os.path as osp
import h5py

# Experiment identifier; used as the prefix of the saved model files.
name = 'casia-px_ad_view-pei-62-nm'
# Directory that holds the .mat file read by the loading cell.
data_path = './data'

# Subject ids are 1-based: 1..124.
label_list = list(np.arange(1, 125))
# The first 62 subjects are used for training, the last 62 for testing.
train_list = list(range(1, 63))
test_list = list(range(63, 125))

# Covariate ids -- normal walking: 1-6, clothing variation: 7-8, carrying variation: 9-10.
cov_list = list(range(1, 7))
# Covariate ids that make up the probe set and the gallery set.
probe_cov = [5, 6]
gallery_cov = [1, 2, 3, 4]
# Viewing-angle ids (1-based, 11 angles).
view_list = list(range(1, 12))

class Config:
    """Hyper-parameters shared by the networks and the training cells.

    Everything is a class attribute, so ``Config()`` instances simply expose
    these values. Note that the four list attributes at the bottom are created
    once on the class and are therefore shared by all instances.
    """
    # --- optimisation ---
    batch_size = 128
    lrG = 5e-5
    lrD = 5e-5
    max_iter_step = 40000

    # --- model dimensions ---
    view_dim = 11     # width of the view encodings
    num_class = 62    # width of the identity part of the discriminator output
    num_channel = 5   # channels per sample (also the width of the channel one-hot)
    z_dim = 128       # size of the latent representation

    # --- image / feature-map sizes: sK_1 and sK_2 are the two spatial sides at stage K ---
    s1_1, s1_2 = 64, 64
    s2_1, s2_2 = 32, 32
    s3_1, s3_2 = 16, 16
    s4_1, s4_2 = 8, 8
    s5_1, s5_2 = 4, 4

    # --- bookkeeping containers ---
    lossD = []
    lossPX = []
    lossV = []
    acc_list = []
    
# Open the dataset explicitly read-only. Without a mode argument, older h5py
# releases open in append mode, which can create or modify the file on disk.
# The handle `f` is intentionally left open: `data`, `label`, `view` and `cov`
# are h5py dataset handles here and are only turned into NumPy arrays by the
# preprocessing cell that follows.
f = h5py.File(osp.join(data_path,'pei_CASIA_5_3.mat'), 'r')
data = f['data']
label = f['label']
view = f['view']
cov = f['cov']

In [None]:
def filter_samples(label, label_list, view, view_list, cov, cov_list):
    """Return the positions of the samples that match all three filters.

    Position ``i`` is kept when ``label[i]`` is in ``label_list``, ``view[i]``
    is in ``view_list`` and ``cov[i]`` is in ``cov_list``. ``label`` must expose
    ``.shape[0]``; the result is a plain list of ints in ascending order.
    """
    selected = []
    for i in range(label.shape[0]):
        if label[i] not in label_list:
            continue
        if view[i] not in view_list:
            continue
        if cov[i] not in cov_list:
            continue
        selected.append(i)
    return selected
   
opt = Config()

# NOTE: this cell overwrites `data`, `label`, `view` and `cov` with values
# derived from themselves, so it must be run exactly once after the loading cell.

# Reverse the axis order of the stored array and scale the values by 1/255.
data = np.transpose(data, [3,2,1,0])/255.0
label = np.reshape(label,[-1]).astype(int)
view = np.reshape(view,[-1]).astype(int)
cov = np.reshape(cov,[-1]).astype(int)

# One sum per sample, computed once. A sample is discarded when its sum is NaN
# (it contains a NaN) or exactly zero. `idx` is True for discarded samples.
# The flattened view is a temporary, so the unfiltered array can be released
# as soon as `data` is rebound below.
sample_sum = np.sum(np.reshape(data, [data.shape[0], -1]), 1)
idx = np.isnan(sample_sum) | (sample_sum == 0)
keep = ~idx

data = data[keep,:,:,:]
label = label[keep]
cov = cov[keep]
view = view[keep]

In [3]:
view_num = opt.view_dim

# Arguments follow filter_samples' signature:
# (label, label_list, view, view_list, cov, cov_list).
train_idx = filter_samples(label, train_list, view, view_list, cov, cov_list)
test_idx = filter_samples(label, test_list, view, view_list, cov, cov_list)

train_n = len(train_idx)
test_n = len(test_idx)
train_label = label[train_idx]
test_label = label[test_idx]
train_cov = cov[train_idx]
test_cov = cov[test_idx]
train_view = view[train_idx]
test_view = view[test_idx]

# Group the training samples by identity in a single pass instead of
# rescanning the whole training set for every sample.
same_identity = {}
for sample, subject in zip(train_idx, train_label):
    same_identity.setdefault(subject, []).append(sample)

# idxs[i] lists every training sample that shares the identity of train_idx[i]
# (including train_idx[i] itself).
idxs = [same_identity[subject] for subject in train_label]

# `index` holds every ordered (sample, same-identity sample) pair, one per row.
index = []
for (i,idx) in enumerate(idxs):
    index.extend([(train_idx[i],j) for j in idx])
# reshape keeps the array two-dimensional even when there are no pairs.
index = np.array(index).reshape(-1, 2)

In [None]:
from sklearn.preprocessing import OneHotEncoder

# Identity encoder, fitted on the training labels (one column per training subject).
train_label_column = np.asarray(train_label).reshape(-1, 1)
label_enc = OneHotEncoder().fit(train_label_column)
# Width of the one-hot identity code.
num_class = label_enc.transform(train_label_column).shape[1]

# View encoder, fitted on the views present in the training set.
view_enc = OneHotEncoder().fit(np.asarray(train_view).reshape(-1, 1))

# Channel encoder, fitted on the channel positions 0 .. opt.num_channel-1.
channel_enc = OneHotEncoder().fit(np.arange(opt.num_channel).reshape(-1, 1))



# When transforming the latent representation from viewing angle 0 to viewing angle 36,
# this function returns [[1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]] when batch_size equals 1.
def view_transform_encoder(v1,v2):
    """Encode, per sample, the step from source view ``v1`` to target view ``v2``.

    ``v1`` and ``v2`` are NumPy arrays of 1-based view ids of equal length.
    The result is a float CUDA Variable of shape ``[len(v1), opt.view_dim]``.
    Each row is all zeros when the two views are equal. Otherwise it holds
    +1 or -1 on the positions between the two (0-based) views when they are at
    most 5 apart, and on the positions outside that span, with the opposite
    sign, when they are more than 5 apart.
    """
    src = (v1 - 1).astype(int)
    dst = (v2 - 1).astype(int)
    n_rows = src.shape[0]
    code = np.zeros([n_rows, opt.view_dim])
    for row in range(n_rows):
        a = src[row]
        b = dst[row]
        if a > b:
            if a - b > 5:
                # far apart: mark everything outside [b, a) with +1
                code[row, a:] = 1
                code[row, 0:b] = 1
            else:
                code[row, b:a] = -1
        elif a < b:
            if b - a > 5:
                # far apart: mark everything outside [a, b) with -1
                code[row, b:] = -1
                code[row, 0:a] = -1
            else:
                code[row, a:b] = 1
    return autograd.Variable(from_numpy(code).float().cuda())

# Calculate rank-k accuracy
def _encode_samples(sample_idx):
    """Encode the given samples with netE and estimate their view with netVes.

    Returns ``(features, views)``: an ``[n, opt.z_dim]`` array of latent codes
    and an ``[n]`` array of estimated 1-based view ids.
    """
    n = len(sample_idx)
    feats = np.zeros([n, opt.z_dim])
    est_view = np.zeros([n])
    for k in range(0, n, opt.batch_size):
        batch = autograd.Variable(from_numpy(data[sample_idx[k:k+opt.batch_size],:,:,:]).float().cuda())
        z = netE(batch)
        feats[k:k+opt.batch_size,:] = z.data.cpu().numpy()
        # The arg-max of the logits equals the arg-max of their softmax.
        est_view[k:k+opt.batch_size] = torch.max(netVes(z),1)[1].data.cpu().numpy()+1
    return feats, est_view


def _transform_to_view(feats, src_view, dst_view):
    """Return a new array with ``feats`` moved from ``src_view`` to ``dst_view`` by netV."""
    moved = np.zeros(feats.shape)
    target = np.ones(src_view.shape) * dst_view
    for k in range(0, feats.shape[0], opt.batch_size):
        z = autograd.Variable(from_numpy(feats[k:k+opt.batch_size,:]).float().cuda())
        enc = view_transform_encoder(src_view[k:k+opt.batch_size], target[k:k+opt.batch_size])
        moved[k:k+opt.batch_size,:] = netV(z, enc).data.cpu().numpy()
    return moved


def _l2_normalize(feats):
    """Scale every row of ``feats`` to unit Euclidean length."""
    return feats / np.reshape(np.linalg.norm(feats, 2, 1), [-1, 1])


def count_cmc_curve(label_list, num_rank):
    """Compute rank-k identification accuracy for every (probe view, gallery view) pair.

    Parameters
    ----------
    label_list : subject ids to evaluate on.
    num_rank   : number of ranks to report.

    Returns
    -------
    acc : array of shape ``[num_rank, opt.view_dim, opt.view_dim]``.
        ``acc[k, i-1, j-1]`` is the percentage of probe samples at view ``i``
        whose identity occurs among the ``k+1`` nearest gallery samples at view
        ``j``. Entries are NaN when the probe or the gallery set is empty.

    The probe set uses covariates ``probe_cov`` and the gallery ``gallery_cov``.
    Both sets are transformed to the most frequent *estimated* view of the
    gallery before being compared.

    NOTE(review): the networks are called in whatever mode they are currently
    in; nothing here switches them to eval mode, so the BatchNorm layers of
    netE behave as they do during training -- confirm this is intended.
    """
    acc = np.zeros([num_rank,opt.view_dim,opt.view_dim])

    # Each gallery view only depends on j, so encode, transform and normalise
    # it once instead of once per probe view.
    gallery = []
    for j in range(1,opt.view_dim+1):
        x2_idx = filter_samples(label, label_list, view, [j], cov, gallery_cov)
        if len(x2_idx) == 0:
            gallery.append(None)
            continue
        x2_raw, v2 = _encode_samples(x2_idx)
        # Most frequent estimated view; ties resolve to the smallest view id.
        candidates, counts = np.unique(v2, return_counts=True)
        target_view = candidates[np.argmax(counts)]
        x2 = _l2_normalize(_transform_to_view(x2_raw, v2, target_view))
        gallery.append((label[x2_idx], x2, target_view))

    for i in range(1,opt.view_dim+1):
        x1_idx = filter_samples(label, label_list, view, [i], cov, probe_cov)
        y1 = label[x1_idx]
        x1_raw, v1 = _encode_samples(x1_idx)

        for j in range(1,opt.view_dim+1):
            entry = gallery[j-1]
            if entry is None or len(x1_idx) == 0:
                acc[:,i-1,j-1] = np.nan
                continue
            y2, x2, target_view = entry

            # Always start from the untouched probe features: transforming in
            # place would stack one transform on top of another across j.
            x1 = _l2_normalize(_transform_to_view(x1_raw, v1, target_view))
            dist = distance.cdist(x1,x2)

            # Calculate the rank-k accuracies of all probe and gallery pairs
            order = np.argsort(dist,1)
            match = np.zeros([order.shape[0]])
            for k in range(num_rank):
                # Ranks beyond the gallery size cannot add new matches.
                if k < order.shape[1]:
                    match += (y1==y2[order[:,k]])
                acc[k,i-1,j-1]=(np.average(match>0)*100)

    return acc

In [5]:
def weights_init(m):
    """Initialise one module in place; intended for ``net.apply(weights_init)``.

    Modules whose class name contains ``Conv`` or ``Linear`` get Xavier-uniform
    weights (their bias is left untouched). Modules whose class name contains
    ``BatchNorm`` get weights drawn from N(1.0, 0.02) and a zero bias. Any other
    module is ignored, and so is any module whose ``weight``/``bias`` is missing
    or ``None`` (for example ``BatchNorm2d(affine=False)``).
    """
    # `xavier_uniform_` is the current in-place name; fall back to the legacy
    # name on PyTorch versions that do not provide it yet.
    xavier = getattr(nn.init, 'xavier_uniform_', None) or nn.init.xavier_uniform
    classname = m.__class__.__name__
    weight = getattr(m, 'weight', None)
    bias = getattr(m, 'bias', None)
    if classname.find('Conv') != -1 or classname.find('Linear') != -1:
        if weight is not None:
            xavier(weight.data)
    elif classname.find('BatchNorm') != -1:
        if weight is not None:
            weight.data.normal_(1.0, 0.02)
        if bias is not None:
            bias.data.fill_(0)
        
class Generator(nn.Module):
    """Decode a latent code (plus channel one-hot) into a single image.

    The input is reshaped to ``[-1, opt.z_dim + opt.num_channel, 1, 1]`` and
    passed through five transposed convolutions. The tanh-activated result is
    returned with shape ``[-1, opt.s1_1, opt.s1_2]``.
    """

    def __init__(self):
        super(Generator, self).__init__()
        latent_width = opt.z_dim + opt.num_channel
        # Layers are registered in this order; attribute names are the state_dict keys.
        self.conv1 = nn.ConvTranspose2d(latent_width, 128, 4, stride=2, padding=0, bias=False)
        self.bn1 = nn.BatchNorm2d(128)
        self.conv2 = nn.ConvTranspose2d(128, 64, 4, stride=2, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.ConvTranspose2d(64, 32, 4, stride=2, padding=1, bias=False)
        self.bn3 = nn.BatchNorm2d(32)
        self.conv4 = nn.ConvTranspose2d(32, 16, 4, stride=2, padding=1, bias=False)
        self.bn4 = nn.BatchNorm2d(16)
        self.conv5 = nn.ConvTranspose2d(16, 1, 4, stride=2, padding=1, bias=False)

    def forward(self, x):
        # Treat every latent vector as a 1x1 feature map; note the leaky ReLU
        # applied to the latent itself before the first layer.
        h = F.leaky_relu(x.view(-1, opt.z_dim + opt.num_channel, 1, 1))
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
            (self.conv4, self.bn4),
        )
        for deconv, norm in stages:
            h = F.leaky_relu(norm(deconv(h)))
        image = torch.tanh(self.conv5(h))
        return image.view(-1, opt.s1_1, opt.s1_2)

class Encoder(nn.Module):
    """Encode a multi-channel sample into one ``opt.z_dim`` latent vector.

    Every channel is passed through the convolutional stack as an independent
    single-channel image. The resulting feature maps are averaged over the
    ``opt.num_channel`` channels of each sample and projected by a linear layer.
    """

    def __init__(self):
        super(Encoder, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, 5, stride=2, padding=2, bias=False)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, 5, stride=2, padding=2, bias=False)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 256, 3, stride=2, padding=1, bias=False)
        self.bn4 = nn.BatchNorm2d(256)
        self.linear = nn.Linear(256*opt.s5_1*opt.s5_2, opt.z_dim)

    def forward(self, x):
        # Fold the channel axis into the batch axis: one image per channel.
        x = x.view(-1,1,opt.s1_1,opt.s1_2)
        x = F.leaky_relu(self.bn1(self.conv1(x)))
        x = F.leaky_relu(self.bn2(self.conv2(x)))
        x = F.leaky_relu(self.bn3(self.conv3(x)))
        x = F.leaky_relu(self.bn4(self.conv4(x)))
        # Derive the flattened feature size from the tensor itself (as the
        # Discriminator does) instead of hard-coding 256*4*4, so forward stays
        # consistent with the size used for self.linear in __init__.
        feat_dim = x.size(1)*x.size(2)*x.size(3)
        # Regroup the per-channel features of each sample and average them.
        x = x.view(-1,opt.num_channel,feat_dim)
        x = torch.mean(x,1)
        x = self.linear(x.view(-1,feat_dim))
        return x

class ViewTransformLayer(nn.Module):
    """Shift a latent code by a learned, bias-free linear map of the view encoding."""

    def __init__(self):
        super(ViewTransformLayer, self).__init__()
        # Maps an opt.view_dim-wide view encoding to an opt.z_dim-wide offset.
        self.view_trans = nn.Linear(opt.view_dim, opt.z_dim, bias=False)

    def forward(self, x, view_encode):
        offset = self.view_trans(view_encode)
        return x + offset
    
    
class Discriminator(nn.Module):
    """Score single images with four convolutions and one linear layer.

    The output has ``opt.num_class + opt.num_channel + opt.view_dim`` values
    per image. The ``bn1``..``bn4`` layers are registered (so they appear in
    the state_dict and in ``parameters()``) but ``forward`` does not use them.
    """

    def __init__(self):
        super(Discriminator, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, 5, stride=2, padding=2, bias=False)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, 5, stride=2, padding=2, bias=False)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 256, 3, stride=2, padding=1, bias=False)
        self.bn4 = nn.BatchNorm2d(256)

        out_width = opt.num_class + opt.num_channel + opt.view_dim
        self.linear1 = nn.Linear(256*opt.s5_1*opt.s5_2, out_width)

    def forward(self, x):
        h = x.view(-1, 1, opt.s1_1, opt.s1_2)
        # Convolution + leaky ReLU only; no normalisation is applied here.
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            h = F.leaky_relu(conv(h))
        flat = h.view(-1, 256*h.size(2)*h.size(3))
        return self.linear1(flat)

class ViewEstimator(nn.Module):
    """Two-layer classifier mapping a latent code to ``opt.view_dim`` view logits."""

    def __init__(self):
        super(ViewEstimator, self).__init__()
        self.linear1 = nn.Linear(opt.z_dim, opt.z_dim)
        self.linear2 = nn.Linear(opt.z_dim, opt.view_dim)

    def forward(self, x):
        hidden = F.leaky_relu(self.linear1(x))
        # Raw scores; no softmax is applied here.
        return self.linear2(hidden)
        

# Create the five networks on GPU 0. Each one is moved to the GPU, cast to
# float32 and only then re-initialised with weights_init, in this order.
with torch.cuda.device(0):
    netE = Encoder().cuda().float().apply(weights_init)
    netV = ViewTransformLayer().cuda().float().apply(weights_init)
    netG = Generator().cuda().float().apply(weights_init)
    netD = Discriminator().cuda().float().apply(weights_init)
    netVes = ViewEstimator().cuda().float().apply(weights_init)
    

In [6]:
def augmentation(X):
    """Randomly shift and shrink every channel of every sample.

    ``X`` is indexed as ``[n, period, mx, my]``; the code assumes 64x64 frames
    and processes the first ``opt.num_channel`` entries along axis 1.

    For each frame:
      1. it is shifted along the last axis by a random offset in [-8, 7],
         with the uncovered columns keeping the fill value ``X[0, 0, 0, 0]``;
      2. it is resized with ``imresize`` to a square whose side is 64 minus a
         random amount in [0, 11];
      3. the resized frame is pasted at a random diagonal offset into a 64x64
         canvas pre-filled with the resized frame's top-left value.

    The canvases are returned divided by 255.0 (presumably because imresize
    yields 0-255 values -- confirm against the installed SciPy).
    """
    n_samples = X.shape[0]
    shifted = np.ones(X.shape) * X[0, 0, 0, 0]
    canvas = np.ones(X.shape)
    for i in range(n_samples):
        for t in range(opt.num_channel):
            # The three random draws below must stay in this order.
            dx = np.random.randint(-8, 8)
            if dx > 0:
                shifted[i, t, :, dx:64] = X[i, t, :, 0:64 - dx]
            else:
                shifted[i, t, :, 0:64 + dx] = X[i, t, :, -dx:64]

            shrink = np.random.randint(0, int(64 * 0.2))
            side = 64 - shrink
            resized = imresize(shifted[i, t, :, :], [side, side])

            corner = np.random.randint(0, shrink + 1)
            canvas[i, t, :, :] *= resized[0, 0]
            canvas[i, t, corner:corner + side, corner:corner + side] = resized
    return canvas / 255.0

def construct_train(opt):
    """Sample one training batch of same-identity pairs and move it to the GPU.

    A batch of rows is drawn (with replacement) from the global ``index`` pair
    table. The first sample of each pair is the augmented multi-channel input;
    from the second sample one randomly chosen channel is used as the target.

    Returns a tuple of CUDA Variables:
      x1             -- augmented inputs, all channels
      x2             -- target images, one channel per pair
      view1          -- 0-based view ids of x1 (long)
      enc_view_trans -- view_transform_encoder(view of x1, view of x2)
      enc_z_channel  -- one-hot code of the chosen target channel
      enc_y          -- one-hot identity code of the pair
      enc_view2      -- one-hot view code of x2
    """
    def to_gpu(array):
        return autograd.Variable(from_numpy(array).float().cuda())

    # The random draws (pairs, channel, then augmentation) stay in this order.
    pair_rows = np.random.randint(0, index.shape[0], [opt.batch_size])
    idx1 = index[pair_rows,0]
    idx2 = index[pair_rows,1]

    enc_y = label_enc.transform(np.reshape(label[idx2],[-1,1])).toarray()

    z_channel = np.random.randint(0,opt.num_channel,opt.batch_size)
    x1 = data[idx1,:,:,:]
    # Pick the chosen channel of each target directly instead of first copying
    # all channels of the batch.
    x2 = data[idx2,z_channel,:,:]
    enc_z_channel = channel_enc.transform(np.reshape(z_channel,[-1,1])).toarray()

    view1 = view[idx1]
    view2 = view[idx2]
    enc_view2 = view_enc.transform(np.reshape(view2,[-1,1])).toarray()

    x1 = to_gpu(augmentation(x1))
    x2 = to_gpu(x2)
    enc_view_trans = view_transform_encoder(view1,view2)
    view1 = autograd.Variable(from_numpy(view1-1).long().cuda())
    enc_z_channel = to_gpu(enc_z_channel)
    enc_y = to_gpu(enc_y)
    enc_view2 = to_gpu(enc_view2)

    return x1, x2, view1, enc_view_trans, enc_z_channel, enc_y, enc_view2

# Learning rates come from Config (lrG / lrD) instead of being repeated here.
# optG updates the encoder, the view-transform layer and the generator together.
optG = optim.RMSprop(
    [{'params': netE.parameters()}, {'params': netV.parameters()}, {'params': netG.parameters()}],
    lr=opt.lrG, weight_decay=1.5e-4)
optD = optim.RMSprop(netD.parameters(), lr=opt.lrD, weight_decay=1.5e-4)
# The view estimator is trained separately with Adam.
optVes = optim.Adam([{'params': netVes.parameters()}], lr=1e-4, weight_decay=1.5e-4)



In [None]:
# Train the encoder, generator, discriminator and the view transform layer.

# torch.save does not create missing directories; make sure the checkpoint
# directory exists before training starts rather than failing at iteration 1000.
model_dir = './model'
if not osp.isdir(model_dir):
    os.makedirs(model_dir)

global_it = 0
with torch.cuda.device(0):
    for it in range(opt.max_iter_step):

        global_it += 1
        print(global_it)

        # ---- discriminator: five updates per generator update ----
        for _ in range(5):
            netD.zero_grad()
            netVes.zero_grad()

            # Sampling batchsize pairs with the same identity
            x1, x2, view1, enc_view_trans, enc_z_channel, enc_y, enc_view2 = construct_train(opt)

            # Encode gait templates x1 into latent representations z_hidden,
            # shifted according to the view information "enc_view_trans"
            z_hidden = netV(netE(x1), enc_view_trans)
            noise = np.random.rand(opt.batch_size,opt.z_dim)*1e-4
            z_hidden = z_hidden + autograd.Variable(from_numpy(noise).float().cuda())

            # Concat one hot representations of channel to z_hidden
            z = torch.cat((z_hidden,enc_z_channel), 1)

            (x2D) = netD(x2)
            # Only netD is updated in this loop, so cut the graph here: the
            # gradients of netD are unchanged and no backward pass is spent on
            # the generator, the view-transform layer and the encoder.
            fake_x1 = netG(z).detach()
            (x1D) = netD(fake_x1)

            # flag is a one-hot style mask built by concatenating the one-hot
            # codes of the target view, the identity and the target channel
            flag = torch.cat((enc_view2,enc_y,enc_z_channel),1)
            lossD = torch.mean(torch.sum((-x2D+x1D)*flag,1))

            # Define gradient penalty loss of WGANs
            alpha = torch.rand(opt.batch_size, 1, 1)
            alpha = alpha.expand(x2.size())
            alpha = autograd.Variable(alpha.float().cuda())
            # The interpolated batch is a fresh leaf that requires gradients so
            # that autograd.grad can differentiate with respect to it.
            interpolates = autograd.Variable((alpha*x2 + ((1-alpha)*fake_x1)).data, requires_grad=True)
            # NOTE(review): the score is averaged over the batch before it is
            # differentiated, so every per-sample gradient is scaled by
            # 1/batch_size. The usual WGAN-GP penalty differentiates the
            # per-sample scores. Left unchanged to keep the training dynamics
            # -- confirm this is intended.
            disc_interpolates = torch.mean(torch.sum(netD(interpolates)*flag,1))
            gradients = autograd.grad(outputs=disc_interpolates, inputs=interpolates, grad_outputs=torch.ones(disc_interpolates.size()).float().cuda(), create_graph=True, retain_graph=True, only_inputs=True)[0]
            gradients = gradients.view(-1,64*64)
            gradients_penalty = ((gradients.norm(2, dim=1)-1)**2).mean() * 10

            lossD2 = lossD + gradients_penalty  # Discriminator's adversarial loss
            lossD2.backward()
            optD.step()

        # ---- encoder / view-transform layer / generator: one update ----
        for _ in range(1):
            netE.zero_grad()
            netV.zero_grad()
            netG.zero_grad()

            x1, x2, view1, enc_view_trans, enc_z_channel, enc_y, enc_view2  = construct_train(opt)
            z_hidden = netE(x1)

            noise = np.random.rand(opt.batch_size,opt.z_dim)*1e-4
            z = torch.cat((netV(z_hidden,enc_view_trans) + autograd.Variable(from_numpy(noise).float().cuda()),enc_z_channel), 1)
            fake_x1 = netG(z)
            (x1D) = netD(fake_x1)
            flag = torch.cat((enc_view2,enc_y,enc_z_channel),1)

            lossG1 = torch.mean(torch.abs(x2-fake_x1)) # Pixel-wise loss
            lossG2 = torch.mean(torch.sum(-x1D*flag,1)) # Generator's adversarial loss
            lossG = lossG1 + 1e-5*lossG2

            # This graph is back-propagated exactly once, so it does not need
            # to be retained.
            lossG.backward()
            optG.step()

        # Checkpoint every 1000 iterations (each save overwrites the previous one).
        if global_it%1000==0:
            print('iter %d:'%(global_it))
            torch.save(netE.state_dict(), osp.join('./model','{}-E.ptm'.format(name)))
            torch.save(netV.state_dict(), osp.join('./model','{}-V.ptm'.format(name)))
            torch.save(netG.state_dict(), osp.join('./model','{}-G.ptm'.format(name)))
            torch.save(netD.state_dict(), osp.join('./model','{}-D.ptm'.format(name)))



In [None]:
# Training the view-angle classifier

# torch.save does not create missing directories.
if not osp.isdir('./model'):
    os.makedirs('./model')

# The iteration counter normally continues from the GAN training cell; start
# from zero when that cell has not been run in this kernel.
try:
    global_it
except NameError:
    global_it = 0

# The loss object is stateless, so build it once outside the loop.
lossFun = nn.CrossEntropyLoss()

with torch.cuda.device(0):
    for it in range(10000):
        global_it += 1

        netVes.zero_grad()
        x1, x2, view1, enc_view_trans, enc_z_channel, enc_y, enc_view2  = construct_train(opt)
        # Only netVes is stepped by optVes, so the encoder output is detached:
        # no gradients are computed for (or accumulated in) netE.
        z_hidden = netE(x1).detach()
        lossG3 = lossFun(netVes(z_hidden), view1)

        lossG3.backward()
        optVes.step()

        if global_it%1000==0:
            torch.save(netVes.state_dict(), osp.join('./model','{}-Ves.ptm'.format(name)))