In [1]:
import os
import warnings

import numpy as np
import numpy.linalg as la
import pandas as pd
import scipy.io as sio
from sklearn.preprocessing import scale,LabelEncoder

In [2]:
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F


class NeuralNet(nn.Module):
    """
    Two-layer fully connected network: Linear -> ReLU -> Linear -> softmax.

    Parameters
    ----------
    input_size : int
        Number of input features.
    hidden_size : int
        Width of the hidden layer.
    output_size : int
        Number of output classes.
    """
    def __init__(self,input_size=1000,hidden_size=100,output_size=10):
        super().__init__()
        # fc1 is created before fc2 so seeded weight initialisation stays reproducible
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self,X):
        """
        Return ``(hidden, prob)`` for a 2-D batch ``X``.

        ``hidden`` is the ReLU activation of the first layer and ``prob`` is the
        softmax (over dim 1) of the second layer's output.
        """
        hidden = torch.relu(self.fc1(X))
        logits = self.fc2(hidden)
        return hidden, torch.softmax(logits, dim=1)
    
def RCS(FX,y,l,alpha=0.5,sigma=100,lamda=1e-2):
    """
    Kernel-based discrepancy term computed from features, class labels and domain labels.

    Parameters
    ----------
    FX : torch.Tensor, 2-D with one row per sample
        Feature matrix (m rows).
    y : 1-D array/tensor of length m
        Class label of every row; the kernel is zeroed between rows of different class.
    l : 1-D array/tensor of length m
        Domain label of every row.
    alpha : float
        Mixing weight between the two terms of ``H`` (see the code below).
    sigma : float or 0-dim tensor
        Divisor of the squared distance inside the Gaussian kernel.
    lamda : float
        Ridge term added to the diagonal of ``H`` before solving for ``theta``.

    Returns
    -------
    torch.Tensor (0-dim, float64)
        ``2 * h.theta - theta.H.theta - 1`` with ``theta = (H + lamda*I)^-1 h``.

    NOTE(review): this has the shape of a regularised least-squares divergence
    estimate; confirm the exact definition against the reference paper.
    """
    m = FX.shape[0]
    # Same-class / same-domain indicator matrices (m x m), stored in float64 so
    # that everything downstream is promoted to double precision.
    Deltay = torch.as_tensor(y[:,None]==y,dtype=torch.float64,device=FX.device)
    Deltal = torch.as_tensor(l[:,None]==l,dtype=torch.float64,device=FX.device)

    # Squared Euclidean distances via ||a||^2 + ||b||^2 - 2 a.b
    FX_norm = torch.sum(FX ** 2, dim=-1)
    sq_dist = FX_norm[:,None] + FX_norm[None,:] - 2 * torch.matmul(FX, FX.t())
    K = torch.exp(-sq_dist / sigma) * Deltay

    P = K * Deltal
    H = ((1.0 - alpha) / m**2) * torch.matmul(K,K) * torch.matmul(Deltal,Deltal) + (1.0 * alpha / m) * torch.matmul(P,P)
    h = torch.mean(P,dim=0)

    # Solve the regularised system directly instead of forming an explicit
    # inverse (faster and numerically more stable). The identity is built in
    # H's dtype so `lamda` is not rounded to float32 first.
    A = H + lamda * torch.eye(m,dtype=H.dtype,device=H.device)
    theta = torch.linalg.solve(A,h)

    D = 2 * torch.matmul(h,theta) - torch.matmul(theta,torch.matmul(H,theta)) - 1
    return D


class JPDA:
    """
    Trainer that fits a `NeuralNet` classifier on several labelled source domains.

    Every optimisation step minimises
        negative log-likelihood + gamma * RCS(hidden features, class labels, domain labels).

    One shuffled loader is built per (domain, class) pair and each step concatenates
    one mini-batch from every loader, so the true training batch size per iteration is
    batch_size * num_class * num_domain (when every domain contains every class).

    Parameters
    ----------
    input_size, hidden_size, output_size : int
        Layer sizes forwarded to `NeuralNet`.
    seed : int
        Passed to `torch.manual_seed` at the start of `fit`.
    device : torch.device
        Device on which the network and the batches are placed.
    epoch : int
        Number of passes over the loaders.
    alpha, lamda : float
        Forwarded unchanged to `RCS`.
    sigma : float or None
        Kernel bandwidth forwarded to `RCS`. When None, `fit` estimates it from the
        first training batch as the median non-zero squared pairwise input distance.
    gamma : float
        Weight of the RCS term in the loss.
    batch_size : int
        Mini-batch size of every per-(domain, class) loader.
    lr : float
        SGD learning rate (momentum is fixed at 0.9).
    log : bool
        When truthy, print training error and test accuracy after every epoch.

    Attributes set by `fit`
    -----------------------
    net : the trained network.
    sigma_ : the bandwidth actually used (equals `sigma` unless that was None).
    """
    def __init__(self,input_size=1024,hidden_size=512,output_size=68,seed=1000,device=torch.device('cpu'),
                 epoch=200,alpha=0.5,sigma=None,lamda=1e-2,gamma=1,batch_size=4,lr=1e-3,log=False):
        # Explicit assignments (instead of iterating over locals()) keep the
        # attribute set visible to readers and tooling.
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.seed = seed
        self.device = device
        self.epoch = epoch
        self.alpha = alpha
        self.sigma = sigma
        self.lamda = lamda
        self.gamma = gamma
        self.batch_size = batch_size
        self.lr = lr
        self.log = log

    def _build_loaders(self,X_list,y_list):
        """
        Return one shuffled DataLoader per (domain, class) pair.

        Every dataset row is ``[features | class label | domain index]`` where the
        domain index is the position of the domain in `X_list`.
        """
        loaders = []
        for domain_idx,(Xs,ys) in enumerate(zip(X_list,y_list)):
            for label,count in zip(*np.unique(ys,return_counts=True)):
                mask = ys == label
                rows = np.hstack((Xs[mask],ys[mask][:,None],domain_idx * np.ones((count,1))))
                loaders.append(torch.utils.data.DataLoader(dataset=torch.tensor(rows),batch_size=self.batch_size,shuffle=True,drop_last=True))
        return loaders

    def _accuracy(self,net,Xt,yt):
        """Return the percentage of rows of `Xt` whose arg-max prediction under `net` equals `yt`."""
        with torch.no_grad():
            Xt = torch.as_tensor(Xt,dtype=torch.float32,device=self.device)
            yt = torch.as_tensor(yt,dtype=torch.int64,device=self.device)
            pred = torch.argmax(net(Xt)[1],dim=1)
            correct = torch.sum(pred == yt).item()
        return (correct / len(yt)) * 100

    def fit(self,X_list,y_list,Xt,yt):
        """
        Train a fresh network on the source domains and store it in `self.net`.

        Parameters
        ----------
        X_list, y_list : sequences of arrays
            Features and class labels of every source domain (same order).
        Xt, yt : arrays
            Target data; only used to report test accuracy when `log` is truthy.
        """
        torch.manual_seed(self.seed)
        net = NeuralNet(input_size=self.input_size,hidden_size=self.hidden_size,output_size=self.output_size).to(self.device)
        optimizer = optim.SGD(params=net.parameters(),lr=self.lr,momentum=0.9)

        # The datasets never change between epochs, so the loaders are built
        # once; shuffle=True still reshuffles each time a loader is iterated.
        loaders = self._build_loaders(X_list,y_list)
        if min((len(loader) for loader in loaders),default=0) == 0:
            warnings.warn("No training batch can be formed: at least one (domain, class) group has fewer "
                          "than batch_size samples, or no source data was given; the network will not be updated.")

        # Work on a local copy so the `sigma` hyper-parameter is not overwritten
        # and a later fit() on other data re-estimates the bandwidth.
        sigma = self.sigma
        # Smallest positive normal float32; lower bound for probabilities before the log.
        eps = torch.finfo(torch.float32).tiny

        for epoch in range(self.epoch):
            train_err,m_train = 0.0,0

            # zip stops at the shortest loader: one step = one batch from every loader
            for batches in zip(*loaders):
                Xyl = torch.cat(batches,dim=0)
                X = Xyl[:,:-2].to(self.device,torch.float32)
                y = Xyl[:,-2].to(self.device,torch.int64)
                domain = Xyl[:,-1].to(self.device,torch.int64)

                m = X.shape[0]
                FX,prob = net(X)

                # Pick the true-class probability and clamp it before the log.
                # log(prob) * one_hot would give -inf * 0 = NaN as soon as any
                # softmax entry underflows to zero.
                true_prob = prob.gather(1,y[:,None]).squeeze(1)
                negative_log_loss = -torch.mean(torch.log(true_prob.clamp_min(eps)))

                if sigma is None:
                    pairwise_dist = torch.cdist(X,X,p=2)**2
                    sigma = torch.median(pairwise_dist[pairwise_dist!=0])

                rcs_loss = RCS(FX,y,domain,alpha=self.alpha,sigma=sigma,lamda=self.lamda)
                loss = negative_log_loss + self.gamma * rcs_loss
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                train_err += loss.item() * m
                m_train += m

            # Test accuracy is only needed for the log line, so skip it otherwise
            if self.log:
                mean_err = train_err / m_train if m_train else float('nan')
                print('epoch ',epoch,', training error ',mean_err,', test acc. ',self._accuracy(net,Xt,yt))

        self.net = net
        self.sigma_ = sigma

    def score(self,Xt,yt):
        """Return the accuracy (in percent) of the fitted network on `Xt`, `yt`."""
        return self._accuracy(self.net,Xt,yt)

In [3]:
def _load_domain(name,data_dir):
    """Load `<data_dir>/<name>.mat` and return (scaled features, raw labels)."""
    data = sio.loadmat(os.path.join(data_dir,name + '.mat'))
    X,y = data['fea'].astype(np.float64),data['gnd'].ravel()
    # Normalise every row to unit sum, then standardise every column
    X = scale(X / X.sum(axis=1,keepdims=True))
    return X,y


def readData(tg,domains,data_dir='datasets/PIE/'):
    """
    Load one target domain and all remaining domains as sources.

    Parameters
    ----------
    tg : str
        Name of the target domain (file `<tg>.mat`).
    domains : iterable of str
        All domain names; every entry different from `tg` is loaded as a source.
    data_dir : str
        Directory containing the `.mat` files.

    Returns
    -------
    Xs_list, ys_list : lists of ndarray
        Scaled features and encoded labels (float64) of every source domain.
    Xt, yt : ndarray
        Scaled features and encoded labels (float64) of the target domain.

    Labels are encoded with a single encoder fitted on the union of all loaded
    label sets, so the same original label gets the same index in every domain
    even when a domain is missing some classes.
    """
    Xt,yt_raw = _load_domain(tg,data_dir)
    sources = [_load_domain(sc,data_dir) for sc in domains if sc != tg]

    encoder = LabelEncoder().fit(np.concatenate([yt_raw] + [ys_raw for _,ys_raw in sources]))
    yt = encoder.transform(yt_raw).astype(np.float64)

    Xs_list = [Xs for Xs,_ in sources]
    ys_list = [encoder.transform(ys_raw).astype(np.float64) for _,ys_raw in sources]

    return Xs_list,ys_list,Xt,yt

# Names of the available domains; each one corresponds to a `<name>.mat` file
domains = [
    'PIE05',
    'PIE07',
    'PIE09',
    'PIE27',
    'PIE29',
]

In [4]:
# Use the first GPU when one is visible, otherwise fall back to the CPU so the
# cell also runs on machines without CUDA.
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# 'PIE05' is the target domain; every other entry of `domains` is a source
X_list,y_list,Xt,yt = readData('PIE05',domains)

instance = JPDA(input_size=1024,hidden_size=512,output_size=68,seed=0,device=DEVICE,
                         epoch=300,alpha=0.5,lamda=1e-3,gamma=100,batch_size=5,lr=1e-2,log=False)
instance.fit(X_list,y_list,Xt,yt)
# Last expression of the cell: target accuracy in percent
instance.score(Xt,yt)

86.46458583433373