# SimCLR transforms

In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
np.random.seed(0)


class GaussianBlur(object):
    # Implements Gaussian blur as described in the SimCLR paper
    def __init__(self, kernel_size, min=0.1, max=2.0):
        self.min = min
        self.max = max
        # kernel size is set to be 10% of the image height/width
        self.kernel_size = kernel_size

    def __call__(self, sample):
        sample = np.array(sample)

        # blur the image with a 50% chance
        prob = np.random.random_sample()

        if prob < 0.5:
            sigma = (self.max - self.min) * np.random.random_sample() + self.min
            sample = cv2.GaussianBlur(sample, (self.kernel_size, self.kernel_size), sigma)

        return sample

In [None]:
### import numpy as np
from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
import torchvision.transforms as transforms
from torchvision import datasets
from PIL import Image
import random

class DataSetWrapper(object):
    def __init__(self, batch_size, num_workers, valid_size, input_shape, strength):
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.valid_size = valid_size
        self.input_shape = input_shape
        self.strength = strength

    def get_data_loaders(self):
        data_augment = self._simclr_transform()
        train_dataset = datasets.ImageFolder(root='extra/',
                                               transform=SimCLRDataTransform(data_augment))
        
        train_loader, valid_loader = self.get_train_validation_data_loaders(train_dataset)
        return train_loader, valid_loader 

    def _simclr_transform(self):
        
        data_transforms: transforms.Compose

        s = self.strength # color distortion strength
        color_jitter = transforms.ColorJitter(0.8*s, 0.8*s, 0.8*s, 0.2*s)
        gaussian_blur = GaussianBlur(kernel_size = int(0.1* self.input_shape[0]))
        
        data_transforms = transforms.Compose([
            transforms.Reszie((512,512)),
            transforms.RandomApply([color_jitter], p=0.4), # Random color distortions
            transforms.RandomGrayscale(p=0.2),
            transforms.ToTensor()
        ])

        return data_transforms

    def get_train_validation_data_loaders(self, train_dataset):
        # obtain training indices that will be used for validation
        num_train = len(train_dataset)
        indices = list(range(num_train))
        np.random.shuffle(indices)

        split = int(np.floor(self.valid_size * num_train))
        train_idx, valid_idx = indices[split:], indices[:split]

        # define samplers for obtaining training and validation batches
        train_sampler = SubsetRandomSampler(train_idx)
        valid_sampler = SubsetRandomSampler(valid_idx)

        train_loader = DataLoader(train_dataset, batch_size=self.batch_size, sampler=train_sampler,
                                  num_workers=self.num_workers, drop_last=True, shuffle=False)

        valid_loader = DataLoader(train_dataset, batch_size=self.batch_size, sampler=valid_sampler,
                                  num_workers=self.num_workers, drop_last=True)
        return train_loader, valid_loader


class SimCLRDataTransform(object):
    def __init__(self, transform):
        self.transform = transform
    
    #Credits: https://github.com/JDAI-CV/DCL
    #DCL based jigsaw patch shuffling
    def swap(self,img, crop):
        def crop_image(image, cropnum):
            width, high = image.size
            crop_x = [int((width / cropnum[0]) * i) for i in range(cropnum[0] + 1)]
            crop_y = [int((high / cropnum[1]) * i) for i in range(cropnum[1] + 1)]
            im_list = []
            for j in range(len(crop_y) - 1):
                for i in range(len(crop_x) - 1):
                    im_list.append(image.crop((crop_x[i], crop_y[j], min(crop_x[i + 1], width), min(crop_y[j + 1], high))))
            return im_list

        widthcut, highcut = img.size
        img = img.crop((10, 10, widthcut-10, highcut-10))
        images = crop_image(img, crop)
        pro = 5
        if pro >= 5:          
            tmpx = []
            tmpy = []
            count_x = 0
            count_y = 0
            k = 1
            RAN = 2
            for i in range(crop[1] * crop[0]):
                tmpx.append(images[i])
                count_x += 1
                #ensuring shuffling only in neighboring patch
                if len(tmpx) >= k:
                    tmp = tmpx[count_x - RAN:count_x]
                    random.shuffle(tmp)
                    tmpx[count_x - RAN:count_x] = tmp
                if count_x == crop[0]:
                    tmpy.append(tmpx)
                    count_x = 0
                    count_y += 1
                    tmpx = []
                if len(tmpy) >= k:
                    tmp2 = tmpy[count_y - RAN:count_y]
                    random.shuffle(tmp2)
                    tmpy[count_y - RAN:count_y] = tmp2
            random_im = []
            for line in tmpy:
                random_im.extend(line)
        
            width, high = img.size
            iw = int(width / crop[0])
            ih = int(high / crop[1])
            toImage = Image.new('RGB', (iw * crop[0], ih * crop[1]))
            x = 0
            y = 0
            for i in random_im:
                i = i.resize((iw, ih), Image.ANTIALIAS)
                toImage.paste(i, (x * iw, y * ih))
                x += 1
                if x == crop[0]:
                    x = 0
                    y += 1
        else:
            toImage = img
        toImage = toImage.resize((widthcut, highcut))
        return toImage
    
    #applying transforms
    def __call__(self, sample):

        sample2 = self.swap(sample,(7,7)) #change size here for different jigsaw size
        xi = self.transform(sample2)
        xj = self.transform(sample)
        return xi, xj

# SimCLR model
Credit: https://github.com/ssumin6/SimCLR

In [None]:
import torch 
import torch.nn as nn
from torchvision.models import resnet18

class SimCLR(nn.Module):
    def __init__(self, out_dim):
        super(SimCLR, self).__init__()
        resnet = resnet18()
        res_out_dim = resnet.fc.in_features
        #resnet encoder
        self.f = nn.Sequential(*list(resnet.children())[:-1])
        #non linear projection head
        self.g = nn.Sequential(nn.Linear(res_out_dim, res_out_dim), nn.ReLU(), nn.Linear(res_out_dim, out_dim))

    def forward(self, xi, xj):
        x = torch.cat([xi, xj], dim=0)
        h = self.f(x)
        z = self.g(h.squeeze())
        return h, z

    def get_hidden(self, x):
        h = self.f(x)
        return h.squeeze()

# SimCLR training

In [None]:
import os
import time
import argparse
import numpy as np
import torch
import torch.nn.functional as F

def main():
    device = 'cuda' if torch.cuda.is_available else 'cpu'

    ### Hyperparameters setting ###
    epochs = 500
    batch_size = 64
    T = 0.5
    proj_dim = 512
    num_worker = 8
    valid_size = 0.2
    strength = 1.0
    trainloss = []
    valloss = []

    ### DataLoader ###
    dataset = DataSetWrapper(batch_size, num_worker , valid_size, input_shape = (512,512, 3), strength=strength)
    train_loader , valid_loader = dataset.get_data_loaders()

    model = SimCLR(out_dim=proj_dim).to(device)

    ### Optimizer & scheduler ###
    optimizer = torch.optim.Adam(model.parameters(), 1e-3, weight_decay=1e-5)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=len(train_loader), eta_min=0, last_epoch=-1)

    '''
    Model-- ResNet18(encoder network) + MLP with one hidden layer(projection head)
    Loss -- NT-Xent Loss
    '''
    val_loss = 1e9
    
    #Loss function
    def NTXent(z, N):
        z = F.normalize(z, dim=1)
        mask = torch.eye(N*2).to(device).bool()
        tmp = torch.mm(z, z.T).masked_fill(mask, float('-inf'))
        loss_matrix = - F.log_softmax(tmp / T, dim=1) 
        loss = sum(torch.diag(loss_matrix[:N, N:]))+sum(torch.diag(loss_matrix[N:, :N]))
        loss /= 2 * N
        return loss

    def train(xi, xj):
        xi, xj = xi.to(device), xj.to(device)
        h, z = model(xi, xj)
        optimizer.zero_grad()
        loss = NTXent(z, xi.shape[0])
        loss.backward()
        optimizer.step()
        return loss

    def valid(xi, xj):
        xi, xj = xi.to(device), xj.to(device)
        h, z = model(xi, xj)
        loss = NTXent(z, xi.shape[0])
        return loss
    
    print('started train')
    #Training and validation
    for epoch in range(epochs):
        start = time.time()
        for (xi, xj),_ in train_loader:
            loss = train(xi, xj)
        print("[Epoch %d] Train Loss %f. Time takes %s" %(epoch, loss, time.time()-start))
        trainloss.append(loss)

        with torch.no_grad():
            losses = []
            for (val_xi, val_xj),_ in valid_loader:
                losses.append(valid(val_xi, val_xj))
            loss = sum(losses) / len(losses)
            valloss.append(loss)
            print("[Epoch %d] Valid Loss %f" %(epoch, loss))
            
            if (loss < val_loss):
                val_loss = loss
                torch.save({"model": model.state_dict(), "epoch": epoch, "loss": loss}, "model_trial.ckpt")
                
    eps = range(1, len(trainloss) + 1)
    plt.plot(eps, trainloss, 'b', label='Training Accuracy')
    plt.plot(eps, valloss, 'r', label='Validation Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.legend()
    plt.savefig('Simclr_acc_plot.png')
            
main()

#  Attaching classifier to Resnet encoder

In [None]:
import torch 
import torch.nn as nn

#Creating custom model by attaching classifier to SimCLR encoder

class CustomModel(nn.Module):
    def __init__(self):
        super(CustomModel, self).__init__()
        proj_dim = 512
        hidden_dim = 256
        self.encoder = SimCLR(out_dim=proj_dim)
        ckpt = torch.load('model_trial.ckpt')
        print("Load checkpoint trained for %d epochs. Loss is %f." %(ckpt["epoch"], ckpt["loss"]))
        (self.encoder).load_state_dict(ckpt["model"])
        self.fc = nn.Sequential(nn.Linear(proj_dim, hidden_dim), nn.ReLU(), nn.Linear(hidden_dim, 5))

    def forward(self,x):
        
        out = self.encoder.get_hidden(x)
        out = self.fc(out)
        
        return out

# Downstream task training

In [None]:
import time
import torch
import argparse
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets
from torch.utils.data import DataLoader
from torchvision.models import resnet18
import torchvision.transforms as transforms
import matplotlib.pyplot as plt

def evaluate(test_loader, model, device):
    correct = 0    
    total = 0
    confusion_matrix = torch.zeros(5,5)
    model.eval()
    
    #Prints confusion matrix
    with torch.no_grad():
        for test_x, test_y in test_loader:
            test_x, test_y = test_x.to(device), test_y.to(device) 
            h = model.encoder.get_hidden(test_x)
            logits = model.fc(h)
              
            pred = torch.argmax(logits, dim=1)
            total += test_y.shape[0]
            correct += (pred == test_y).sum().item()
            for l, p in zip(test_y.view(-1), pred.view(-1)):
                confusion_matrix[l.long(), p.long()] += 1
                
    total_acc = 100.0 * correct / total
    print(confusion_matrix)
    return total_acc

def main2():
    device = 'cuda' if torch.cuda.is_available else 'cpu'

    ## Hyperparameters
    finetune = True 
    proj_dim = 512
    hidden_dim = 256

    model = CustomModel().to(device)
    
    img_transform = transforms.Compose([transforms.Resize(size=(512,512)), transforms.ToTensor()])
    train_dataset = datasets.ImageFolder(root='train_combined',transform=img_transform)
    
    num_train = len(train_dataset)
    indices = list(range(num_train))
    np.random.shuffle(indices)

    split = int(np.floor(0.2 * num_train))
    train_idx, valid_idx = indices[split:], indices[:split]

    # define samplers for obtaining training and validation batches
    train_sampler = SubsetRandomSampler(train_idx)
        
    valid_sampler = SubsetRandomSampler(valid_idx)

    train_loader = DataLoader(train_dataset, batch_size=16,sampler=train_sampler,num_workers=8, drop_last=True, shuffle=False)

    test_loader = DataLoader(train_dataset, batch_size=16,sampler=valid_sampler,num_workers=8, drop_last=True)
    n_classes = 5

    # Freeze encoder network
    if finetune:
        for p in model.encoder.f.parameters():
            p.requires_grad = False

    optimizer = torch.optim.Adam(list(model.encoder.f.parameters())+list(model.fc.parameters()), lr=3e-04) 
    
    target_acc = 0.0 
    token = 0 # For early stopping
    trainAcc = []
    valAcc = []
    
    # Train Linear Classifier
    for epoch in range(100):
        start = time.time()
        t_total = 0
        t_correct = 0
        for x, y in train_loader:
            x, y = x.to(device), y.to(device)
            h = model.encoder.get_hidden(x)
            logits = model.fc(h)
            
            pred = torch.argmax(logits, dim=1)
            t_total += y.shape[0]
            t_correct += (pred == y).sum().item()           
            
            
            optimizer.zero_grad()
            loss = F.cross_entropy(logits, y)
            loss.backward()
            optimizer.step()
            
        trainAcc.append(100.0 * t_correct / t_total)
        correct = 0    
        total = 0
        model.eval()
        with torch.no_grad():
            for test_x, test_y in test_loader:
                test_x, test_y = test_x.to(device), test_y.to(device) 
                h = model.encoder.get_hidden(test_x)
                logits = model.fc(h)
              
                pred = torch.argmax(logits, dim=1)
                total += test_y.shape[0]
                correct += (pred == test_y).sum().item()
        valid_acc = 100.0 * correct / total
        valAcc.append(valid_acc)
        
        print("[Epoch %2d] Valid Acc %f. Time takes %s" %(epoch, valid_acc, time.time()-start))
        if (valid_acc > target_acc):
            target_acc = valid_acc
            torch.save({"model": model.state_dict(), "epoch": epoch, "loss": loss}, "fullmodel_trial.ckpt")
            token = 0 
        elif token >= 5:
            print("Early Stop at Epoch %d." %(epoch))
            break
        else:
            token += 1

    # Evaluate
    eps = range(1, len(trainAcc) + 1)
    plt.plot(eps, trainAcc, 'b', label='Training Accuracy')
    plt.plot(eps, valAcc, 'r', label='Validation Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.legend()
    plt.savefig('Simclr_acc_trial.png')
    acc = evaluate(test_loader, model, device)

main2()

# Generate Grad-CAM
Credits: https://github.com/yaleCat/Grad-CAM-pytorch

In [None]:
import torch
import argparse
import cv2
import numpy as np
import torch
from torch.autograd import Function
from torchvision import models, transforms


#Generates Grad-CAM for an input image

class FeatureExtractor():
    """ Class for extracting activations and
    registering gradients from targetted intermediate layers """

    def __init__(self, model, target_layers):
        self.model = model
        self.target_layers = target_layers
        self.gradients = []

    def save_gradient(self, grad):
        self.gradients.append(grad)

    def __call__(self, x):
        outputs = []
        self.gradients = []
        for name, module in self.model._modules.items():
            x = module(x)
            if name in self.target_layers:
                x.register_hook(self.save_gradient)
                outputs += [x]
        return outputs, x.squeeze()

class ModelOutputs():
    """ Class for making a forward pass, and getting:
    1. The network output.
    2. Activations from intermeddiate targetted layers.
    3. Gradients from intermeddiate targetted layers. """

    def __init__(self, model, feature_module, target_layers):
        self.model = model
        self.feature_module = feature_module
        self.feature_extractor = FeatureExtractor(self.feature_module, target_layers)

    def get_gradients(self):
        return self.feature_extractor.gradients

    def __call__(self, x):
        target_activations = []
        for name, module in self.model._modules.items():
            if "avgpool" in name.lower():
                x = module(x)
                x = x.view(x.size(0),-1)
            elif "encoder" in name.lower():
                target_activations, x = self.feature_extractor(x)
            else:
                x = module(x)
        
        return target_activations, x

def preprocess_image(img):
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
    preprocessing = transforms.Compose([
        transforms.ToTensor(),
        normalize,
    ])
    return preprocessing(img.copy()).unsqueeze(0)

def show_cam_on_image(img, mask):
    heatmap = cv2.applyColorMap(np.uint8(255 * mask), cv2.COLORMAP_JET)
    heatmap = np.float32(heatmap) / 255
    cam = heatmap + np.float32(img)
    cam = cam / np.max(cam)
    return np.uint8(255 * cam)

class GradCam:
    def __init__(self, model, feature_module, target_layer_names, use_cuda):
        self.model = model
        self.feature_module = feature_module
        self.model.eval()
        self.cuda = use_cuda
        if self.cuda:
            self.model = model.cuda()

        self.extractor = ModelOutputs(self.model, self.feature_module, target_layer_names)

    def forward(self, input_img):
        return self.model(input_img)

    def __call__(self, input_img, target_category=None):
        if self.cuda:
            input_img = input_img.cuda()

        features, output = self.extractor(input_img)

        if target_category == None:
            target_category = np.argmax(output.cpu().data.numpy())

        one_hot = np.zeros((1, output.size()[-1]), dtype=np.float32)
        one_hot[0][target_category] = 1
        one_hot = torch.from_numpy(one_hot).requires_grad_(True)
        if self.cuda:
            one_hot = one_hot.cuda()
        
        one_hot = torch.sum(one_hot * output)

        self.feature_module.zero_grad()
        self.model.zero_grad()
        one_hot.backward(retain_graph=True)

        grads_val = self.extractor.get_gradients()[-1].cpu().data.numpy()

        target = features[-1]
        target = target.cpu().data.numpy()[0, :]

        weights = np.mean(grads_val, axis=(2, 3))[0, :]
        cam = np.zeros(target.shape[1:], dtype=np.float32)

        for i, w in enumerate(weights):
            cam += w * target[i, :, :]

        cam = np.maximum(cam, 0)
        cam = cv2.resize(cam, input_img.shape[2:])
        cam = cam - np.min(cam)
        cam = cam / np.max(cam)
        return cam


class GuidedBackpropReLU(Function):
    @staticmethod
    def forward(self, input_img):
        positive_mask = (input_img > 0).type_as(input_img)
        output = torch.addcmul(torch.zeros(input_img.size()).type_as(input_img), input_img, positive_mask)
        self.save_for_backward(input_img, output)
        return output

    @staticmethod
    def backward(self, grad_output):
        input_img, output = self.saved_tensors
        grad_input = None

        positive_mask_1 = (input_img > 0).type_as(grad_output)
        positive_mask_2 = (grad_output > 0).type_as(grad_output)
        grad_input = torch.addcmul(torch.zeros(input_img.size()).type_as(input_img),
                                   torch.addcmul(torch.zeros(input_img.size()).type_as(input_img), grad_output,
                                                 positive_mask_1), positive_mask_2)
        return grad_input


class GuidedBackpropReLUModel:
    def __init__(self, model, use_cuda):
        self.model = model
        self.model.eval()
        self.cuda = use_cuda
        if self.cuda:
            self.model = model.cuda()

        def recursive_relu_apply(module_top):
            for idx, module in module_top._modules.items():
                recursive_relu_apply(module)
                if module.__class__.__name__ == 'ReLU':
                    module_top._modules[idx] = GuidedBackpropReLU.apply

        # replace ReLU with GuidedBackpropReLU
        recursive_relu_apply(self.model)

    def forward(self, input_img):
        return self.model(input_img)

    def __call__(self, input_img, target_category=None):
        if self.cuda:
            input_img = input_img.cuda()

        input_img = input_img.requires_grad_(True)

        output = self.forward(input_img)

        if target_category == None:
            target_category = np.argmax(output.cpu().data.numpy())

        one_hot = np.zeros((1, output.size()[-1]), dtype=np.float32)
        one_hot[0][target_category] = 1
        one_hot = torch.from_numpy(one_hot).requires_grad_(True)
        if self.cuda:
            one_hot = one_hot.cuda()

        one_hot = torch.sum(one_hot * output)
        one_hot.backward(retain_graph=True)

        output = input_img.grad.cpu().data.numpy()
        output = output[0, :, :, :]

        return output

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--use-cuda', action='store_true', default=False,
                        help='Use NVIDIA GPU acceleration')
    parser.add_argument('--image-path', type=str, default='./examples/both.png',
                        help='Input image path')
    args = parser.parse_args()
    args.use_cuda = args.use_cuda and torch.cuda.is_available()
    if args.use_cuda:
        print("Using GPU for acceleration")
    else:
        print("Using CPU for computation")

    return args

def deprocess_image(img):
    """ see https://github.com/jacobgil/keras-grad-cam/blob/master/grad-cam.py#L65 """
    img = img - np.mean(img)
    img = img / (np.std(img) + 1e-5)
    img = img * 0.1
    img = img + 0.5
    img = np.clip(img, 0, 1)
    return np.uint8(img*255)

if __name__ == '__main__':
    """ python grad_cam.py <path_to_image>
    1. Loads an image with opencv.
    2. Preprocesses it for ResNet50 and converts to a pytorch variable.
    3. Makes a forward pass to find the category index with the highest score,
    and computes intermediate activations.
    Makes the visualization. """

    #Load saved best model
    model = CustomModel()
    ckpt = torch.load('fullmodel_trial.ckpt')
    model.load_state_dict(ckpt["model"])
    
    #specifying layer to visualize activations
    grad_cam = GradCam(model=model, feature_module=model.encoder.f, \
                       target_layer_names=["7"], use_cuda=True)

    img = cv2.imread('train/cgm/train-cgm-651.jpg', 1)
    img = np.float32(img) / 255
    # Opencv loads as BGR:
    img = img[:, :, ::-1]
    input_img = preprocess_image(img)

    # If None, returns the map for the highest scoring category.
    # Otherwise, targets the requested category.
    target_category = None
    grayscale_cam = grad_cam(input_img, target_category)

    grayscale_cam = cv2.resize(grayscale_cam, (img.shape[1], img.shape[0]))
    cam = show_cam_on_image(img, grayscale_cam)

    gb_model = GuidedBackpropReLUModel(model=model, use_cuda=True)
    gb = gb_model(input_img, target_category=target_category)
    gb = gb.transpose((1, 2, 0))

    cam_mask = cv2.merge([grayscale_cam, grayscale_cam, grayscale_cam])
    cam_gb = deprocess_image(cam_mask*gb)
    gb = deprocess_image(gb)

    cv2.imwrite("cgm.jpg", cam)
    cv2.imwrite('gb.jpg', gb)
    cv2.imwrite('cam_gb.jpg', cam_gb)
    print('done')