In [101]:
import torchvision
import torchvision.datasets as dset
import torchvision.transforms as transforms
import torch
from torch.utils.data import DataLoader,Dataset
import random
import os
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
from torch.utils.data.sampler import Sampler
import numpy as np
import pandas as pd

In [100]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from torch.optim.lr_scheduler import StepLR
import numpy as np
import os
import math
import argparse
import scipy as sp
import scipy.stats


# Hyper Parameters
#FEATURE_DIM = 32
#RELATION_DIM = 8
CLASS_NUM = 5
SAMPLE_NUM_PER_CLASS = 1   #5
BATCH_NUM_PER_CLASS = 10   #10
EPISODE = 1000
TEST_EPISODE = 20
LEARNING_RATE = 0.001
GPU = 0
HIDDEN_UNIT = 10
PIXEL_SIM_METHOD = 'cosine'
RESNET_OUT_H=4
SEG=False# using segmentation result as dataset when SEG=True


In [6]:
def stanford_dog_folders(seg=True):
    if (seg):
      train_folder = "data/seg/train"
      test_folder = "data/seg/test"                
      val_folder = "data/seg/val"
    else:
      train_folder = "data/sp/train"
      test_folder = "data/sp/test"                
      val_folder = "data/sp/val"

    metatrain_folders = [os.path.join(train_folder, label) \
                for label in os.listdir(train_folder) \
                if os.path.isdir(os.path.join(train_folder, label)) \
                ]
    metatest_folders = [os.path.join(test_folder, label) \
                for label in os.listdir(test_folder) \
                if os.path.isdir(os.path.join(test_folder, label)) \
                ]
    metaval_folders = [os.path.join(val_folder, label) \
                for label in os.listdir(val_folder) \
                if os.path.isdir(os.path.join(val_folder, label)) \
                ]
    
    #print(metaval_folders)

    random.seed(1)
    random.shuffle(metatrain_folders)
    random.shuffle(metatest_folders)
    random.shuffle(metaval_folders)
    

    return metatrain_folders,metatest_folders, metaval_folders

In [198]:
import pandas as pd
import numpy as np
def generate_csv(seg=False):
    #return 3 list:train_folder,vla_folder,test_folder
    #each list contains name of the specific class of the dog
    #eg: train_folder=['data/sp/tain/n02085620-Chihuahua',...,...]
    train_class_name_list=[]
    val_class_name_list=[]
    test_class_name_list=[]
    train_img_name_list=[]
    val_img_name_list=[]
    test_img_name_list=[]
    
    if (seg):#if use segmentation data
      train_folder = "data/seg/train"
      test_folder = "data/seg/test"
      val_folder = "data/seg/val"
    else:
      train_folder = "data/sp/train"
      test_folder = "data/sp/test"                
      val_folder = "data/sp/val"
    
    
    for train_class_name in os.listdir(train_folder):
      for train_img_name in os.listdir(os.path.join(train_folder,train_class_name)):
        train_class_name_list.append(train_class_name)
        train_img_name_list.append(os.path.join(os.path.join(train_folder,train_class_name),train_img_name))
    df_train=pd.DataFrame({'category_name':'train','class_name':train_class_name_list,'img_name':train_img_name_list})
      
    for test_class_name in os.listdir(test_folder):
      for test_img_name in os.listdir(os.path.join(test_folder,test_class_name)):
        test_class_name_list.append(test_class_name)
        test_img_name_list.append(os.path.join(os.path.join(test_folder,test_class_name),test_img_name))
    df_test=pd.DataFrame({'category_name':'test','class_name':test_class_name_list,'img_name':test_img_name_list})
      
    for val_class_name in os.listdir(val_folder):
      for val_img_name in os.listdir(os.path.join(val_folder,val_class_name)):
        val_class_name_list.append(val_class_name)
        val_img_name_list.append(os.path.join(os.path.join(val_folder,val_class_name),val_img_name))
    df_val=pd.DataFrame({'category_name':'val','class_name':val_class_name_list,'img_name':val_img_name_list})
    return (df_train,df_test,df_val)

In [199]:
(df_train,df_test,df_val)=generate_csv()

In [216]:
def get_task(df,n_way,m_shot,t_query):
    #generate task
    #input: df:dataframe ie:df_train
    #       n_way: how many class in each task
    #       m_shot: use how many img in each class as support set
    #       t_query: use how many img in each class to test the model
    #output: df_support_set: dataframe of support_set,
    #                        columns=['category_name','class_name','img_name'],
    #                        len=(n_way*m_shot)
    #
    #        df_query_set: dataframe of query_set,
    #                        columns=['category_name','class_name','img_name'],
    #                        len=(n_way*t_query)
    #
    #        class_name_dict: dictionary from classname to class_number
    df_support_set=pd.DataFrame(columns=['category_name','class_name','img_name'])
    df_query_set=pd.DataFrame(columns=['category_name','class_name','img_name'])
    class_name_dict={}
    class_name_list=df.class_name.sample(n_way).values#randomly sample n_way different class
    
    for class_num in range(n_way):
        class_name=class_name_list[class_num]
        #print(class_name,class_num)
        class_name_dict[class_name]=class_num
        #print('mshot+tquery:',m_shot+t_query)
        df_class_n=df[df['class_name']==class_name]
        #print('class n len:',len(df_class_n))
        df_class_n_random_pick=df_class_n.sample((m_shot+t_query))
        #df_class_n=df[df['class_name']==class_name].sample((m_shot+t_query))
        df_support_set=pd.concat([df_support_set,df_class_n_random_pick.head(m_shot)],axis=0,ignore_index=True)
        df_query_set=pd.concat([df_query_set,df_class_n_random_pick.tail(t_query)],axis=0,ignore_index=True)
    #df_support_set=df_support_set.reset_index()
    #df_query_set=df_query_set.reset_index() 
    return (df_support_set,df_query_set,class_name_dict)
        
            


In [217]:
class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, df, class_name_dict, transform = None):
        self.df = df
        self.transform = transform
        self.class2index = class_name_dict
        #print(self.class2index)

    def __len__(self):
        return len(self.df)
    def __getitem__(self, index):
        filename = self.df.iloc[index].img_name
        label = self.class2index[self.df.iloc[index].class_name]
        image = Image.open( filename)
        if self.transform is not None:
            image = self.transform(image)
        return image, label

In [218]:
def set_transform(normalizing=True):
    if(normalizing):
        normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],std=[0.229, 0.224, 0.225])
        #resizing =  transforms.Resize(224, interpolation=2)
        transform_method=transforms.Compose([transforms.ToTensor(),normalize])
    else:
        transform_method=transforms.ToTensor()
    return transform_method

transform_method=set_transform(normalizing=True)
support_dataset=CustomDataset(df=df_support_set,class_name_dict=class_name_dict,transform=transform_method)

In [219]:
import torchvision.models as models
class PreTrainedResNet(nn.Module):
  def __init__(self):
    super(PreTrainedResNet, self).__init__()
    
    self.resnet18 = models.resnet18(pretrained=True)

    #Set gradients to false
    
    for param in self.resnet18.parameters():
        param.requires_grad = False
    
    num_feats = self.resnet18.fc.in_features
    self.resnet18 = nn.Sequential(*(list(self.resnet18.children())[:-1]))

    
  def forward(self, x):
    #print('in defining network, input of Resnet is:',x.shape)
    x = self.resnet18 (x)
    #print(x.size())

    x = x.view(x.shape[0],32,4,4)
    #print('in defining network, output of Resnet is:',x.shape)
    return x

class InnerproductSimilarity(nn.Module):
    def __init__(self, CLASS_NUM = 5, shots = SAMPLE_NUM_PER_CLASS, metric='cosine',resnetoutsize=RESNET_OUT_H):
        super().__init__()
        self.n_way = CLASS_NUM
        self.k_shot = shots
        self.metric = metric
        self.cs=nn.CosineSimilarity(dim=2)
        
        #self.cs=nn.PairwiseDistance()
        
        self.findmaxNotrain=nn.AvgPool3d(kernel_size=(shots,4,4),stride=(shots,4,4))
        self.softmax=nn.Softmax(dim=1)
        
    def forward(self, support_features,query_features):
        numOfSupport=support_features.size(dim=0)
        numOfQuery=query_features.size(dim=0)
        support_5d=support_features.unsqueeze(0)
        support_5d_match=support_5d.repeat(numOfQuery,1,1,1,1)
        query_5d=query_features.unsqueeze(1)
        query_5d_match=query_5d.repeat(1,numOfSupport,1,1,1)#50,25,32,4,4
        cos_similarity=self.cs(support_5d_match,query_5d_match)#50,25,4,4
        score=self.findmaxNotrain(cos_similarity)#50,5每张test图片在每一类中的夹角均值
        prob=self.softmax(score)
        return score
        
        

In [220]:
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
        m.weight.data.normal_(0, math.sqrt(2. / n))
        if m.bias is not None:
            m.bias.data.zero_()
    elif classname.find('BatchNorm') != -1:
        m.weight.data.fill_(1)
        m.bias.data.zero_()
    elif classname.find('Linear') != -1:
        n = m.weight.size(1)
        m.weight.data.normal_(0, 0.01)
        m.bias.data = torch.ones(m.bias.data.size())

In [221]:

feature_encoder = PreTrainedResNet()
#feature_encoder.apply(weights_init)
feature_encoder.cuda(0)


pixelsimilarity_network = InnerproductSimilarity(CLASS_NUM = 5,shots = SAMPLE_NUM_PER_CLASS, metric='cosine',resnetoutsize=RESNET_OUT_H)
pixelsimilarity_network.apply(weights_init)
pixelsimilarity_network.cuda(0)

feature_encoder_optim = torch.optim.Adam(feature_encoder.parameters(),lr=0.001)
feature_encoder_scheduler = StepLR(feature_encoder_optim,step_size=100000,gamma=0.5)
#pixelsimilarity_network_optim = torch.optim.Adam(pixelsimilarity_network.parameters(),lr=0.001)
#pixelsimilarity_network_scheduler = StepLR(pixelsimilarity_network_optim,step_size=100000,gamma=0.5)


In [222]:
def mean_confidence_interval(data, confidence=0.95):
    a = 1.0*np.array(data)
    n = len(a)
    m, se = np.mean(a), scipy.stats.sem(a)
    h = se * sp.stats.t._ppf((1+confidence)/2., n-1)
    return m,h

In [223]:
N_WAY=5
M_SHOT=5
T_QUERY=10

feature_encoder = PreTrainedResNet()
#feature_encoder.apply(weights_init)
feature_encoder.cuda(0)


pixelsimilarity_network = InnerproductSimilarity(CLASS_NUM = N_WAY,shots = M_SHOT, metric='cosine',resnetoutsize=RESNET_OUT_H)
pixelsimilarity_network.apply(weights_init)
pixelsimilarity_network.cuda(0)


print("Testing...")
accuracies = []
for i in range(100):
  total_rewards = 0
  df_support_set,df_query_set,class_name_dict=get_task(df=df_val,n_way=N_WAY,m_shot=M_SHOT,t_query=T_QUERY)
  
  transform_method=set_transform(normalizing=True)#preprocess of the data, usually are normalizing+totensor
  
  support_dataset=CustomDataset(df=df_support_set,class_name_dict=class_name_dict,transform=transform_method)
  query_dataset=CustomDataset(df=df_query_set,class_name_dict=class_name_dict,transform=transform_method)
  
  
  
  support_dataloader=DataLoader(dataset=support_dataset,batch_size=N_WAY*M_SHOT,shuffle=False)
  query_dataloader=DataLoader(dataset=query_dataset,batch_size=N_WAY*T_QUERY,shuffle=False)
  

  support_images,support_labels = support_dataloader.__iter__().next()
  for test_images,test_labels in query_dataloader:
    #print(test_labels)
    support_features = feature_encoder(Variable(support_images).cuda(GPU)) # 5x64
                
    test_features = feature_encoder(Variable(test_images).cuda(GPU)) # 20x64

               
    
    relations = pixelsimilarity_network(support_features,test_features)

    _,predict_labels = torch.max(relations.data,dim=1)
    

    rewards = [1 if predict_labels[j]==test_labels[j] else 0 for j in range(N_WAY*T_QUERY)]
    print('correct:{}in{}'.format(sum(rewards),len(rewards)))

    total_rewards += np.sum(rewards)
    #counter +=batch_size   #remove


  accuracy = total_rewards/(1.0*N_WAY*T_QUERY)
  #accuracy = total_rewards/1.0/counter
  accuracies.append(accuracy)


test_accuracy,h = mean_confidence_interval(accuracies)

print("test accuracy:",test_accuracy,"h:",h)
torch.cuda.empty_cache()


Testing...
correct:47in50
correct:48in50
correct:44in50
correct:47in50
correct:48in50
correct:46in50
correct:48in50
correct:46in50
correct:31in50
correct:49in50
correct:44in50
correct:45in50
correct:50in50
correct:45in50
correct:45in50
correct:31in50
correct:38in50
correct:48in50
correct:32in50
correct:35in50
correct:22in50
correct:49in50
correct:38in50
correct:38in50
correct:40in50
correct:37in50
correct:31in50
correct:45in50
correct:46in50
correct:46in50
correct:50in50
correct:46in50
correct:44in50
correct:47in50
correct:44in50
correct:40in50
correct:31in50
correct:46in50
correct:40in50
correct:49in50
correct:40in50
correct:50in50
correct:44in50
correct:48in50
correct:41in50
correct:45in50
correct:23in50
correct:47in50
correct:37in50
correct:48in50
correct:31in50
correct:32in50
correct:50in50
correct:42in50
correct:47in50
correct:50in50
correct:46in50
correct:48in50
correct:37in50
correct:48in50
correct:46in50
correct:49in50
correct:47in50
correct:47in50
correct:38in50
correct:46in50

超参数设置

In [152]:
class InnerproductSimilarity(nn.Module):
    def __init__(self, CLASS_NUM = 5, shots = SAMPLE_NUM_PER_CLASS, metric='cosine',resnetoutsize=RESNET_OUT_H):
        super().__init__()
        self.n_way = CLASS_NUM
        self.k_shot = shots
        self.metric = metric
        self.cs=nn.CosineSimilarity(dim=2)
        
        #self.cs=nn.PairwiseDistance()
        
        self.findmaxNotrain=nn.AvgPool3d(kernel_size=(shots,4,4),stride=(shots,4,4))
        self.softmax=nn.Softmax(dim=1)
    def forward(self, support_features,query_features):
        numOfSupport=support_features.size(dim=0)
        numOfQuery=query_features.size(dim=0)
        #support_features(25,32,4,4)    query_features(1,32,4,4)
        #print('before repeat: support',support_features.size(),'query:',query_features.size())
        support_5d=support_features.unsqueeze(0)
        support_5d_match=support_5d.repeat(numOfQuery,1,1,1,1)
        query_5d=query_features.unsqueeze(1)
        query_5d_match=query_5d.repeat(1,numOfSupport,1,1,1)#50,25,32,4,4
        #print('we are in cosin similarity forward')
        #print(support_5d_match.size(),query_5d_match.size())
        cos_similarity=self.cs(support_5d_match,query_5d_match)#50,25,4,4
        #print(cos_similarity.size())
        '''angle=self.findmax(cos_similarity)
        angle_reshape=angle.view((50,25))
        hiden_score=self.fc1(angle_reshape)
        score=self.fc2(hiden_score)'''
        score=self.findmaxNotrain(cos_similarity)#50,5每张test图片在每一类中的夹角均值
        prob=self.softmax(score)
        return score
        
        

In [79]:
pixel_certentity=torch.softmax((relations-torch.mean(relations,dim=1,keepdim=True))/torch.std(relations,dim=1,keepdim=True),dim=1)
_,b=torch.max(pixel_certentity,dim=1)
f=b.cuda(0)!=test_labels.cuda(0)
(pixel_certentity[f,predict_labels[f]]-pixel_certentity[f,test_labels[f]])*100