In [1]:
import torch
import torch.fx as fx
import numpy as np
from torch.fx import symbolic_trace
from torch import nn
import torchvision
from torchvision import models
from torchvision import transforms
from PIL import Image
from scipy.stats import ttest_ind
import torch.optim as optim
import math
import torchvision
import pandas as pd
from numpy import array
from numpy.linalg import norm
from torch.optim import lr_scheduler

In [2]:
def get_few_shot_learing_data(_class,shot_number,trainloader):
    dataiter = iter(trainloader)
    _data = [ [] for i in range(len(_class))]
    _data_label = [ [] for i in range(len(_class))]

    while True:
        image, label = dataiter.next()

        if label in _class:
            if len(_data[label])!=shot_number:
                _data[label].append(image)
                _data_label[label].append(label)

        if sum([len(tab) for tab in _data]) ==   shot_number * len(_class):
             break

    out_data= []
    out_label= [] 

    out_data_tensor = torch.Tensor(len(_class)*shot_number,3, 224, 224)
    out_label_tensor= torch.Tensor(len(_class)*shot_number)

    for i in range(len(_class)):
        for j in range(shot_number):
            out_data.append(torch.tensor(_data[i][j]).clone().detach().unsqueeze(0))        
            out_label.append(torch.tensor(_data_label[i][j]).clone().detach()) 

    torch.cat(out_data , out=out_data_tensor)    
    torch.cat(out_label , out=out_label_tensor)    
    out_data_tensor=out_data_tensor.squeeze()
    out_label_tensor = out_label_tensor.type(torch.long)
    trainset_few_shot = out_data_tensor.squeeze(),out_label_tensor.squeeze()
    return trainset_few_shot



In [3]:
np.random.seed = 123
torch.manual_seed(0)

transform = transforms.Compose([                        #[1]
             transforms.Resize(256),                    #[2]
             transforms.CenterCrop(224),                #[3]
             transforms.ToTensor(),                     #[4]
             transforms.Normalize(                      #[5]
             mean=[0.485, 0.456, 0.406],                #[6]
             std=[0.229, 0.224, 0.225]                  #[7]
             )])


trainset = torchvision.datasets.CIFAR100(root='./data', train=True,
                                        download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=1,
                                          shuffle=False, num_workers=2)


_class = [0,1,2,3,4]
shot_number = 5

trainset_few_shot = get_few_shot_learing_data(_class,shot_number,trainloader)


Files already downloaded and verified


  out_data.append(torch.tensor(_data[i][j]).clone().detach().unsqueeze(0))
  out_label.append(torch.tensor(_data_label[i][j]).clone().detach())


In [4]:

class ATL(nn.Module):
    def __init__(self, train, Nclasses, Nlayer=3, Pmax=0.4):
        # call constructor from superclass
        super().__init__()
        self.resnet = models.resnet50(pretrained=True)
        layers = self.get_layers(train,Nlayer)
        fm_indicies, resnet_out_size = self.get_featuremaps_idicies(train,layers) 
        self.resnet = self._transform(self.resnet, Nlayer, layers, fm_indicies)
        for param in self.resnet.parameters():
            param.requires_grad = False
        self.relu = nn.ReLU()
        self.fcl = nn.Linear(resnet_out_size, Nclasses)
        self.softmax = torch.nn.Softmax(-1)
            
    def get_layers(self, train,number_of_layers):
        images, labels = train
        conv_layers,model_weights = self.get_resnet_conv_layers()
        feature_map_outputs_for_images = [self.get_feature_map_outputs(image) for image in images]
        labels_set = set(np.asarray(labels))
        
        R_scores_for_layers = []

        for conv_ind in range(len(conv_layers)):
            
            centroids_for_class =[]
            LAV_vec = []             
                                
            for image_ind,image in enumerate(images):
                LAV_vec_image = []
                out_fm = feature_map_outputs_for_images[image_ind]
                for fm_ind in range(len(model_weights[conv_ind])):
                    LAV_vec_image.append(self.LAV(out_fm[conv_ind][fm_ind])) ##lav_vec jest vectorem przyjmującym lav dla każdego obrazka
                LAV_vec.append(LAV_vec_image)
            
              
                #iterujemy po klasach w danych
            for class_id, _class in enumerate(labels_set):
                    LAV_vec_curent_class = []
                    for i,fm in enumerate(LAV_vec):
                        if labels[i]==_class:
                            LAV_vec_curent_class.append(fm)
                    
                    n = len(LAV_vec_curent_class)
                    suma = np.asarray([0.0]*len(LAV_vec_curent_class[0]))
                    for k in range(n):
                        norm_l2 = norm(LAV_vec_curent_class[k])
                        nmn = np.asarray(LAV_vec_curent_class[k])/norm_l2
                        suma += nmn
                    centroid = 1/n * suma
                    centroids_for_class.append(centroid)
            
            #R-score
            r_val = []
            for a in range(len(centroids_for_class)):
                for b in range(len(centroids_for_class)):
                    if a !=b:
                        rrrr = np.linalg.norm(centroids_for_class[a]-centroids_for_class[b])
                        r_val.append(rrrr)
            R = min(r_val)
            R_scores_for_layers.append(R)

        # choosing the best n layers
        lst = pd.Series(R_scores_for_layers)
        i = lst.nlargest(number_of_layers)        
        return i.index.values.tolist() # najlepiej indeksy warstw konwolucyjnych 
        
    def get_featuremaps_idicies(self, train, layers, N_feature  = 10):
        
        dim0=[112]
        dim1=[56]*9
        dim2=[28]*12
        dim3=[14]*19
        dim4=[7]*9
        fm_dim = np.concatenate((dim0,dim1,dim2,dim3,dim4))        
        images, labels = train
        #print(labels)
        labels_set = set(np.asarray(labels))                
        conv_layers,model_weights = self.get_resnet_conv_layers()    
        choose_fm = []
        choose_fm_output_len=0        
        feature_map_outputs_for_images = [self.get_feature_map_outputs(image) for image in images]
                
        # iteruje po warstwach konwolucyjnuch 
        for conv_ind in layers:
            choose_fm_curent_conv_layer = []            
            p_score_for_classs_and_maps  = [{} for i in range(len(labels_set))]
            
            # iteruje po mapach             
            for fm_ind in range(len(model_weights[conv_ind])):                
                LAV_vec = []
                    
                for image_ind,image in enumerate(images):
                    out_fm = feature_map_outputs_for_images[image_ind]
                    LAV_vec.append(self.LAV(out_fm[conv_ind][fm_ind]))
                 
                #iterujemy po klasach w danych
                for class_id, _class in enumerate(labels_set):
                    LAV_vec_curent_class = []
                    LAV_vec_other_class = []
                    for i,fm in enumerate(LAV_vec):
                        if labels[i]==_class:
                            LAV_vec_curent_class.append(fm)
                        else:     
                            LAV_vec_other_class.append(fm)
                    
                    # zwraca nan przy LAV_vec_current_class długości 1 !!
                    # czyli wtedy gdy z jakiejś klasy była tylko jedna obserwacja
                    # przy treningu nie będzie takiej sytucji
                    t_stat, p = ttest_ind(LAV_vec_curent_class, LAV_vec_other_class)
                    
                    if(math.isnan(p)): 
                        print("error: klasa o jednym elemencie")
                        p=1
                    
                    _dict = p_score_for_classs_and_maps[class_id]
                    _dict[p] = fm_ind
            
            for class_id, p_scores_and_map_indexes in enumerate(p_score_for_classs_and_maps): 
                
                #wybranie N_feature, "najlepszych" map dla każdej klasy"
                p_scores = list(p_scores_and_map_indexes.keys())                
                sort_p_scores =  np.sort(p_scores)
                low_p_scores = sort_p_scores[:N_feature]
                
                for p_score in low_p_scores:
                    fm_ind = p_scores_and_map_indexes.get(p_score)
                    choose_fm_curent_conv_layer.append(fm_ind)
                    choose_fm_output_len += fm_dim[conv_ind]**2
                                        
            choose_fm.append(choose_fm_curent_conv_layer)
        
        print (choose_fm)
        print (choose_fm_output_len)
        
        # Musi zwracać obiekt który ma N iterowalnych rzeczy, z których każda ma ileś indeksów feature map. 
        # W sumie ilość indeksów musi być równa ilości klas * Nfeature z 2.3 w artykule
        #return [[1],[1]], 1568
        
        return choose_fm, choose_fm_output_len 
    
    
    def LAV(self,featureMap):
        return featureMap.max().detach().numpy().item(0)
    
    def _transform(self, m: torch.nn.Module, n, layers, idx) -> torch.nn.Module:
        gm : torch.fx.GraphModule = torch.fx.symbolic_trace(m)
        graph=gm.graph
        blocks_in_layers = [1,3,4,6,3]
        idx_to_layer_name = ['conv1'] +[f"layer{i}_{j}_conv{k}" for i in range(1,5) for j in range(blocks_in_layers[i]) for k in range(1,4)]
        layer_names = [idx_to_layer_name[idx] for idx in layers]
        final_nodes=[]
        last_node=None

        for node in graph.nodes:
            if node.name in layer_names:
                final_nodes.append(node)
            if not last_node and len(final_nodes)==n:
                last_node = node
            if node.name == 'output':
                out_node = node

        i=0
        nodes_to_output=[]
        for i in range(n):
            with graph.inserting_after(last_node):
            # Insert a new `call_function` node calling `torch.relu`
                new_node = graph.call_function(torch.tensor,
                                               args=(idx[i],),
                                              kwargs={"dtype":torch.int32})
                last_node = new_node

            with graph.inserting_after(last_node):
                new_node = graph.call_function(torch.index_select,
                                              args=(final_nodes[i], 1, last_node))
                last_node = new_node

            with graph.inserting_after(last_node):
                new_node = graph.call_function(torch.flatten,
                                              args=(last_node,1))
                nodes_to_output.append(new_node)
                last_node = new_node
        with graph.inserting_after(last_node):
                new_node = graph.call_function(torch.cat,
                                              args=(nodes_to_output,1))
        out_node.args=(new_node,)
        graph.eliminate_dead_code()
        graph.lint() 
        gm.recompile()

        return gm
        
    
    def forward(self, x):
        x = self.resnet(x)
        x = self.relu(x)
        x = self.fcl(x)
        x = self.softmax(x)
        return x
    
    
    def get_resnet_conv_layers(self):
        model = self.resnet
        # we will save the conv layer weights in this list
        model_weights =[]

        #we will save the 49 conv layers in this list
        conv_layers = []

        # get all the model children as list
        model_children = list(model.children())

        #counter to keep count of the conv layers
        counter = 0
        #append all the conv layers and their respective wights to the list

        for i in range(len(model_children)):

            if type(model_children[i]) == nn.Conv2d:
                counter+=1
                model_weights.append(model_children[i].weight)
                conv_layers.append(model_children[i])

            elif type(model_children[i]) == nn.Sequential:
                for j in range(len(model_children[i])):
                    for child in model_children[i][j].children():
                        if type(child) == nn.Conv2d:
                            counter+=1
                            model_weights.append(child.weight)
                            conv_layers.append(child)

        #print(f"Total convolution layers: {counter}")
        #print("conv_layers")
        return (conv_layers,model_weights)
    
    def get_feature_map_outputs(self,image):
        conv_layers, _ = self.get_resnet_conv_layers()
        outputs = []
        names = []

        for layer in conv_layers[0:]:
            image = layer(image)
            outputs.append(image)
            names.append(str(layer))
            
        #print(len(outputs))

        # print feature_maps
        #for feature_map in outputs:
            #print(feature_map.shape)
            
        return outputs;   



# CIFAR100 

# transform = transforms.Compose([                        #[1]
#              transforms.Resize(256),                    #[2]
#              transforms.CenterCrop(224),                #[3]
#              transforms.ToTensor(),                     #[4]
#              transforms.Normalize(                      #[5]
#              mean=[0.485, 0.456, 0.406],                #[6]
#              std=[0.229, 0.224, 0.225]                  #[7]
#              )])


# trainset = torchvision.datasets.CIFAR100(root='./data', train=True,
#                                         download=True, transform=transform)

# trainloader = torch.utils.data.DataLoader(trainset, batch_size=10,
#                                           shuffle=True, num_workers=2)

# testset = torchvision.datasets.CIFAR100(root='./data', train=False,
#                                        download=True, transform=transform)
# testloader = torch.utils.data.DataLoader(testset, batch_size=4,
#                                          shuffle=False, num_workers=2)

# classes = ('plane', 'car', 'bird', 'cat',
#            'deer', 'dog', 'frog', 'horse', 'ship', 'truck')



# torch.manual_seed(0)
# # get some random training images
# dataiter = iter(trainloader)
# images, labels = dataiter.next()
# trainset = images, labels
# #print(trainset)

# print(images)
# print(labels)



m = ATL(trainset_few_shot, 10, 3)
m.eval()



[[23, 56, 35, 13, 42, 36, 33, 29, 32, 59, 19, 1, 22, 47, 12, 37, 4, 10, 29, 63, 19, 10, 22, 40, 51, 37, 57, 24, 33, 21, 35, 14, 0, 49, 22, 50, 2, 61, 37, 6, 36, 54, 56, 32, 13, 24, 46, 55, 15, 48], [88, 496, 21, 161, 261, 226, 434, 428, 130, 239, 187, 261, 117, 162, 157, 301, 185, 114, 306, 257, 328, 23, 416, 330, 84, 269, 69, 204, 18, 212, 385, 48, 488, 421, 427, 412, 74, 122, 457, 462, 328, 397, 461, 167, 323, 330, 60, 331, 172, 190], [163, 421, 181, 82, 51, 161, 351, 110, 94, 218, 76, 47, 73, 348, 234, 477, 301, 499, 369, 12, 76, 471, 117, 91, 481, 60, 98, 27, 478, 286, 220, 85, 188, 105, 429, 310, 282, 307, 499, 12, 218, 91, 480, 286, 421, 181, 414, 509, 394, 371]]
632100


ATL(
  (resnet): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Module(
      (0): Module(
        (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (downsample): Module(
          (0): Conv2d(64, 2

In [5]:
criterion = nn.CrossEntropyLoss()
#optimizer = optim.SGD(m.parameters(), lr=0.001, momentum=0.9)
optimizer = torch.optim.Adam(m.parameters(),lr=0.01)
scheduler = lr_scheduler.MultiplicativeLR(optimizer, lr_lambda=0.8)

In [6]:
for epoch in range(50):  # loop over the dataset multiple times
    

    
    running_loss = 0.0
    # get the inputs
    inputs, labels = trainset_few_shot #data


    # zero the parameter gradients
    optimizer.zero_grad()
        
    outputs = m(inputs)
    #print(len(labels))
    #print(len(outputs))
    

        
    loss = criterion(outputs, labels)
    
    running_loss += loss.item()
    print('[%d] loss: %.3f' %
        (epoch + 1, running_loss / 25))
        
    loss.backward()
    optimizer.step()

    # print statistics
    
#     print('[%d] loss: %.3f' %
#         (epoch + 1, running_loss / 25))
        
    if epoch % 20 == 0:
        pass
        #scheduler.step()

print('Finished Training')

[1] loss: 0.093
[2] loss: 0.070
[3] loss: 0.070
[4] loss: 0.070
[5] loss: 0.070
[6] loss: 0.070
[7] loss: 0.070
[8] loss: 0.070
[9] loss: 0.070
[10] loss: 0.070
[11] loss: 0.070
[12] loss: 0.070
[13] loss: 0.070
[14] loss: 0.070
[15] loss: 0.070
[16] loss: 0.070
[17] loss: 0.070
[18] loss: 0.070
[19] loss: 0.070
[20] loss: 0.070
[21] loss: 0.070
[22] loss: 0.070
[23] loss: 0.070
[24] loss: 0.070
[25] loss: 0.070
[26] loss: 0.070
[27] loss: 0.070
[28] loss: 0.070
[29] loss: 0.070
[30] loss: 0.070
[31] loss: 0.070
[32] loss: 0.070
[33] loss: 0.070
[34] loss: 0.070
[35] loss: 0.070
[36] loss: 0.070
[37] loss: 0.070
[38] loss: 0.070
[39] loss: 0.070
[40] loss: 0.070
[41] loss: 0.070
[42] loss: 0.070
[43] loss: 0.070
[44] loss: 0.070
[45] loss: 0.070
[46] loss: 0.070
[47] loss: 0.070
[48] loss: 0.070
[49] loss: 0.070
[50] loss: 0.070
Finished Training
