In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
import os
import cv2
from torchvision import models
from torch.autograd import Variable
from torchsummary import summary

# Define whether to use GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def save_checkpoint(state, save_dir, filename='checkpoint.pth'):
    save_name = save_dir + '_{}'.format(filename)
    torch.save(state, save_name)
########################################################################
# The output of torchvision datasets are PILImage images of range [0, 1].
# We transform them to Tensors of normalized range [-1, 1].
class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()

        # calculate same padding:
        # (w - k + 2*p)/s + 1 = o
        # => p = (s(o-1) - w + k)/2

        self.feature = nn.Sequential(
            nn.Conv2d(in_channels=3,
                      out_channels=64,
                      kernel_size=(3, 3),
                      stride=(1, 1),
                      # (1(32-1)- 32 + 3)/2 = 1
                      padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(in_channels=64,
                      out_channels=64,
                      kernel_size=(3, 3),
                      stride=(1, 1),
                      padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 2),
                         stride=(2, 2)),
                         
            nn.Conv2d(in_channels=64,
                      out_channels=256,
                      kernel_size=(3, 3),
                      stride=(1, 1),
                      padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Conv2d(in_channels=256,
                      out_channels=256,
                      kernel_size=(3, 3),
                      stride=(1, 1),
                      padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 2),
                         stride=(2, 2)),
            nn.Conv2d(in_channels=256,
                      out_channels=256,
                      kernel_size=(3, 3),
                      stride=(1, 1),
                      padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Conv2d(in_channels=256,
                      out_channels=256,
                      kernel_size=(3, 3),
                      stride=(1, 1),
                      padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Conv2d(in_channels=256,
                      out_channels=256,
                      kernel_size=(3, 3),
                      stride=(1, 1),
                      padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 2),
                         stride=(2, 2)),
        #     nn.Conv2d(in_channels=256,
        #               out_channels=1024,
        #               kernel_size=(3, 3),
        #               stride=(1, 1),
        #               padding=1),
        #     nn.BatchNorm2d(1024),
        #     nn.ReLU(),
        #     nn.Conv2d(in_channels=1024,
        #               out_channels=1024,
        #               kernel_size=(3, 3),
        #               stride=(1, 1),
        #               padding=1),
        #     nn.BatchNorm2d(1024),
        #     nn.ReLU(),
        #     nn.Conv2d(in_channels=1024,
        #               out_channels=1024,
        #               kernel_size=(3, 3),
        #               stride=(1, 1),
        #               padding=1),
        #     nn.BatchNorm2d(1024),
        #     nn.ReLU(),
        #     nn.MaxPool2d(kernel_size=(2, 2),
        #                  stride=(2, 2)),
        #     nn.Conv2d(in_channels=1024,
        #               out_channels=2048,
        #               kernel_size=(3, 3),
        #               stride=(1, 1),
        #               padding=1),
        #     nn.BatchNorm2d(2048),
        #     nn.ReLU(),
        #     nn.Conv2d(in_channels=2048,
        #               out_channels=2048,
        #               kernel_size=(3, 3),
        #               stride=(1, 1),
        #               padding=1),
        #     nn.BatchNorm2d(2048),
        #     nn.ReLU(),
        #     nn.Conv2d(in_channels=2048,
        #               out_channels=2048,
        #               kernel_size=(3, 3),
        #               stride=(1, 1),
        #               padding=1),
        #     nn.BatchNorm2d(2048),
        #     nn.ReLU(),
        #     nn.MaxPool2d(kernel_size=(2, 2),
        #                  stride=(2, 2))                         
         )


        self.classifier = nn.Sequential(
            nn.Linear(256, 128),
            nn.ReLU(True),
            nn.Dropout(p=0.5),
            nn.Linear(128, 64),
            nn.ReLU(True),
            nn.Dropout(p=0.5),
            nn.Linear(64, 10),
        )

        for m in self.modules():
            if isinstance(m, torch.nn.Conv2d) or isinstance(m, torch.nn.Linear):
                nn.init.kaiming_uniform_(m.weight, mode='fan_in', nonlinearity='leaky_relu')

                if m.bias is not None:
                    m.bias.detach().zero_()



    def forward(self, x):

        x = self.feature(x)

        x = x.view(x.size(0), -1)
        x = self.classifier(x)

        return x

def preprocess_image(cv2im, resize_im=True):

    # Resize image
    if resize_im:
        cv2im = cv2.resize(cv2im, (64, 64))
    im_as_arr = np.float32(cv2im)
    im_as_arr = np.ascontiguousarray(im_as_arr[..., ::-1])
    im_as_arr = im_as_arr.transpose(2, 0, 1)  # Convert array to D,W,H
    # Normalize the channels
    for channel, _ in enumerate(im_as_arr):
        im_as_arr[channel] /= 255
    # Convert to float tensor
    im_as_ten = torch.from_numpy(im_as_arr).float()
    # Add one more channel to the beginning. Tensor shape = 1,3,224,224
    im_as_ten.unsqueeze_(0)
    # Convert to Pytorch variable
    im_as_var = Variable(im_as_ten, requires_grad=True)
    return im_as_var





class FeatureVisualization():
    def __init__(self,img_path,selected_layer):
        self.img_path=img_path
        self.selected_layer=selected_layer
        # Load pretrained model
        net = Net().to(device)
        checkpoint = torch.load('./test_epoch1_checkpoint.pth')
        net.load_state_dict(checkpoint['net'])
        self.pretrained_model = net.feature
        #self.pretrained_model = net.features
        self.pretrained_model2 = net

    def process_image(self):
        img=cv2.imread(self.img_path)
        img=preprocess_image(img)
        return img

    def get_feature(self):
        # Image  preprocessing
        input=self.process_image()
        #print("input.shape:{}".format(input.shape))
        x=input.to(device)
        for index,layer in enumerate(self.pretrained_model):
            x=layer(x)
            #print("x:{}".format(x.shape))
            if (index == self.selected_layer):
                return x

    def get_single_feature(self):
        # Get the feature map

        features=self.get_feature()
        #print(features.shape)
        feature=features[:,0,:,:]
        feature=feature.view(feature.shape[1],feature.shape[2])

        #print("feature")
        #print(feature.shape)
        return feature

    def get_multi_feature(self):
        # Get the feature map
        features=self.get_feature()
        #print(features.shape)
        result_path = './feat_first_' + str(self.selected_layer)

        if not os.path.exists(result_path):
            os.makedirs(result_path)
        print("On layer:{}, We can get the {} feature maps".format(self.selected_layer,features.shape[1]))    
        #print(features.shape[1])
        for i in range(features.shape[1]):
            feature=features[:,i,:,:]
            feature=feature.view(feature.shape[1],feature.shape[2])
            feature = feature.cpu().data.numpy()
            feature = 1.0 / (1 + np.exp(-1 * feature))
            feature = np.round(feature * 255)
            save_name = result_path + '/' + str(i) + '.jpg'
            cv2.imwrite(save_name, feature)
    def get_multi_feature1(self):
        # Get the feature map
        features=self.get_feature()
        #print(features.shape)
        result_path = './feat_second_' + str(self.selected_layer)

        if not os.path.exists(result_path):
            os.makedirs(result_path)
        print("On layer:{}, We can get the {} feature maps".format(self.selected_layer,features.shape[1]))    
        #print(features.shape[1])
        for i in range(features.shape[1]):
            feature=features[:,i,:,:]
            feature=feature.view(feature.shape[1],feature.shape[2])
            feature = feature.cpu().data.numpy()
            feature = 1.0 / (1 + np.exp(-1 * feature))
            feature = np.round(feature * 255)
            save_name = result_path + '/' + str(i) + '.jpg'
            cv2.imwrite(save_name, feature)        


    def save_feature_to_img1(self):
        #to numpy
        feature=self.get_single_feature()
        self.get_multi_feature()
        feature=feature.cpu().data.numpy()

        #use sigmod to [0,1]
        # print(feature[0])
        feature= 1.0/(1+np.exp(-1*feature))

        # to [0,255]
        feature=np.round(feature*255)
        #print(self.selected_layer)
        save_name = './feat_first' + str(self.selected_layer) + '.jpg'
        cv2.imwrite(save_name, feature)
    def save_feature_to_img2(self):
        #to numpy
        feature=self.get_single_feature()
        self.get_multi_feature1()
        feature=feature.cpu().data.numpy()

        #use sigmod to [0,1]
        # print(feature[0])
        feature= 1.0/(1+np.exp(-1*feature))

        # to [0,255]
        feature=np.round(feature*255)
        #print(self.selected_layer)
        save_name = './feat_second' + str(self.selected_layer) + '.jpg'
        cv2.imwrite(save_name, feature)    

    def plot_probablity(self,outputs):

        outputs = outputs.cpu().data.numpy()
        outputs = np.ndarray.tolist(outputs)

        x = range(0, 1000)
        plt.bar(x, outputs[0])
        plt.xlabel("Class")
        plt.ylabel("Probablity")
        plt.title("Image classifier")
        plt.show()


    def predict(self):
        input=self.process_image().cuda()
        outputs = self.pretrained_model2(input)

        return outputs



    




    


In [3]:
if __name__=="__main__":
    def imshow(img):
        img = img / 2 + 0.5  # unnormalize
        npimg = img.cpu().numpy()
        plt.imshow(np.transpose(npimg, (1, 2, 0)))
        plt.show()


    transform = transforms.Compose(
        [transforms.Resize(64),transforms.ToTensor(),
         transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                            download=True, transform=transform)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=4,
                                              shuffle=True, num_workers=2)

    testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                           download=True, transform=transform)
    testloader = torch.utils.data.DataLoader(testset, batch_size=4,
                                             shuffle=False, num_workers=2)






    net = Net().to(device)
 
    #checkpoint = torch.load('./test_epoch1_checkpoint.pth')
    #net.load_state_dict(checkpoint['net'])
    
    print(net)
    
    # %debug
    # summary(net,(3, 32, 32))
    
    ########################################################################
    # 3. Define a Loss function and optimizer
    # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    # Let's use a Classification Cross-Entropy loss and Adam with momentum.



    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(net.parameters(), lr=0.001)

    ########################################################################
    # 4. Train the network
    # ^^^^^^^^^^^^^^^^^^^^
    #
    # This is when things start to get interesting.
    # We simply have to loop over our data iterator, and feed the inputs to the
    # network and optimize.

    for epoch in range(3):  # loop over the dataset multiple times

        running_loss = 0.0
        for i, data in enumerate(trainloader, 0):
            # get the inputs; data is a list of [inputs, labels]
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            # print statistics
            running_loss += loss.item()
            if i % 2000 == 1999:    # print every 2000 mini-batches
        
                print("Epoch : {} steps : {} Training Loss : {}".format(epoch + 1, i + 1, running_loss / 2000) )
                running_loss = 0.0
        save_checkpoint({'net':net.state_dict()}, 'test_epoch{}'.format(epoch+1))        

    print('Finished Training')

Files already downloaded and verified
Files already downloaded and verified
Net(
  (feature): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU()
    (6): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
    (7): Conv2d(64, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (9): ReLU()
    (10): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (12): ReLU()
    (13): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilatio

RuntimeError: ignored

In [None]:
    from google.colab import files
    from io import BytesIO
    from PIL import Image

    uploaded = files.upload()
    im1 = Image.open(BytesIO(uploaded['Nebula.jpg']))

In [None]:
    myClass=FeatureVisualization('Highsierra.jpg',2)
    Compare=FeatureVisualization('Nebula.jpg',2)    
    print(myClass.pretrained_model2)

    #image1 = cv2.imread('./Highsierra.jpg')
    #image2 = cv2.imread('./Nebula.jpg')
    #myClass1 = cv2.resize(image1, 64)
    #Compare1 = cv2.resize(image2, 64)

    myClass.save_feature_to_img1()
    Compare.save_feature_to_img2()
    # print("The first picture classification predict:")
    myClass_vector = myClass.predict()
    # print("The second picture classification predict:")
    Compare_vector = Compare.predict()
    #Define cosine similarity
    cos= nn.CosineSimilarity(dim=1)
    #Define Euclidean distance
    euclidean_dist = torch.dist(myClass_vector,Compare_vector,p=2)
    cosine_dist = cos(myClass_vector,Compare_vector)
    print("Verification:")
    if cosine_dist > 0.4:
        print("They are the same!")
        print("Their cosine_similarity:{}".format(cosine_dist))
    else:
        print("They are not the same!")
        print("Their cosine_similarity:{}".format(cosine_dist))
       
    print("Their euclidean_dist:{}".format(euclidean_dist))