In [1]:
import os
import time
import yaml
import json
import argparse
import re
import base64
import torch
from torch.autograd import Variable
from PIL import Image
from io import BytesIO
from pprint import pprint
import numpy as np
from skimage import transform
from scipy import ndimage
from skimage import io

import torchvision.transforms as transforms
import vqa.lib.utils as utils
import vqa.datasets as datasets
import vqa.models as models
import vqa.models.convnets as convnets
from vqa.datasets.vqa_processed import tokenize_mcb

def load_checkpoint(model, optimizer, path_ckpt):
    path_ckpt_info  = path_ckpt + '_info.pth.tar'
    path_ckpt_model = path_ckpt + '_model.pth.tar'
    path_ckpt_optim = path_ckpt + '_optim.pth.tar'
    if os.path.isfile(path_ckpt_info):
        info = torch.load(path_ckpt_info)
        start_epoch = 0
        best_acc1   = 0
        exp_logger  = None
        if 'epoch' in info:
            start_epoch = info['epoch']
        else:
            print('Warning train.py: no epoch to resume')
        if 'best_acc1' in info:
            best_acc1 = info['best_acc1']
        else:
            print('Warning train.py: no best_acc1 to resume')
        if 'exp_logger' in info:
            exp_logger = info['exp_logger']
        else:
            print('Warning train.py: no exp_logger to resume')
    else:
        print("Warning train.py: no info checkpoint found at '{}'".format(path_ckpt_info))
    if os.path.isfile(path_ckpt_model):
        model_state = torch.load(path_ckpt_model)
        model.load_state_dict(model_state)
    else:
        print("Warning train.py: no model checkpoint found at '{}'".format(path_ckpt_model))
    if optimizer is not None and os.path.isfile(path_ckpt_optim):
        optim_state = torch.load(path_ckpt_optim)
        optimizer.load_state_dict(optim_state)
    else:
        print("Warning train.py: no optim checkpoint found at '{}'".format(path_ckpt_optim))
    print("=> loaded checkpoint '{}' (epoch {}, best_acc1 {})"
              .format(path_ckpt, start_epoch, best_acc1))
    return start_epoch, best_acc1, exp_logger

def process_question(question_str, trainset):
    question_tokens = tokenize_mcb(question_str)
    question_data = torch.LongTensor(1, len(question_tokens))
    for i, word in enumerate(question_tokens):
        if word in trainset.word_to_wid:
            question_data[0][i] = trainset.word_to_wid[word]
        else:
            question_data[0][i] = trainset.word_to_wid['UNK']
    if torch.cuda.is_available():
        question_data = question_data.cuda(non_blocking=True)
    with torch.no_grad():
        question_input = Variable(question_data, volatile=True)
    #print('question', question_str, question_tokens, question_data)
    
    return question_input

def process_answer(answer_var, trainset, model):
    with torch.no_grad():
        answer_sm = torch.nn.functional.softmax(answer_var.data[0].cpu(), dim=0)
        max_, aid = answer_sm.topk(5, 0, True, True)
    ans = []
    val = []
    for i in range(5):
        ans.append(trainset.aid_to_ans[aid.data[i]])
        val.append(max_.data[i].cpu().item())
    """
    att = []
    for x_att in model.list_att:
        img = x_att.view(1,14,14).cpu()
        img = transforms.ToPILImage()(img)
        buffer_ = BytesIO()
        img.save(buffer_, format="PNG")
        img_str = base64.b64encode(buffer_.getvalue()).decode()
        img_str = 'data:image/png;base64,'+img_str
        att.append(img_str)
        buffer_.close()
    """
    answer = {'ans':ans,'val':val} # 'att': att
    answer_str = json.dumps(answer)

    return answer_str

def process_visual(visual_pil,transform, options, cnn):
    #visual_strb64 = re.sub('^data:image/.+;base64,', '', visual_strb64)
    #visual_PIL = Image.open(BytesIO(base64.b64decode(visual_strb64)))
    visual_PIL = visual_pil
    visual_tensor = transform(visual_PIL)
    visual_data = torch.FloatTensor(1, 3,
                                       visual_tensor.size(1),
                                       visual_tensor.size(2))
    visual_data[0][0] = visual_tensor[0]
    visual_data[0][1] = visual_tensor[1]
    visual_data[0][2] = visual_tensor[2]
    #visual_data = visual_pil
    #print('visual', visual_data.size(), visual_data.mean())
    if torch.cuda.is_available():
        visual_data = visual_data.cuda(non_blocking=True)
    with torch.no_grad():
        visual_input = Variable(visual_data, volatile=True)
        visual_features = cnn(visual_input)
    if 'NoAtt' in options['model']['arch']:
        nb_regions = visual_features.size(2) * visual_features.size(3)
        with torch.no_grad():
            visual_features = visual_features.sum(3).sum(2).div(nb_regions).view(-1, 2048)
    return visual_features

class MutanAttInference():
    """
    MutanAtt model wrapper
    """

    def __init__(self, dir_logs='logs/vqa/mutan_att_trainval', config='options/vqa/mutan_att_trainval.yaml', resume='ckpt'):
        self.options = {
            'logs': {
                'dir_logs': dir_logs
            }
        }
        with open(config, 'r') as handle:
            options_yaml = yaml.load(handle)
        self.options = utils.update_values(self.options, options_yaml)
        
        self.trainset = datasets.factory_VQA(self.options['vqa']['trainsplit'],
                                        self.options['vqa'])

        normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                    std=[0.229, 0.224, 0.225])
        self.transform = transforms.Compose([
            transforms.Resize(self.options['coco']['size']),
            transforms.CenterCrop(self.options['coco']['size']),
            transforms.ToTensor(),
            normalize,
        ])

        opt_factory_cnn = {
            'arch': self.options['coco']['arch']
            }
        self.cnn = convnets.factory(opt_factory_cnn, cuda=torch.cuda.is_available(), data_parallel=False)

        self.model = models.factory(
            self.options['model'],
            self.trainset.vocab_words(),
            self.trainset.vocab_answers(),
            cuda=torch.cuda.is_available(),
            data_parallel=False
            )
        start_epoch, best_acc1, _ = load_checkpoint(self.model, None,
            os.path.join(self.options['logs']['dir_logs'], resume))
        activation = {}
        def get_activation(name):
            def hook(model, input, output):
                activation[name] = output.detach()
            return hook
        with torch.no_grad():
            self.model.seq2vec.register_forward_hook(get_activation('seq2vec'))
            self.model.conv_att.register_forward_hook(get_activation('conv_att'))
        self.activation = activation
        
    def interpolate(self, img, heat_map):
        height = img.shape[0]
        width = img.shape[1]

        # resize heat map
        heat_map_resized = transform.resize(heat_map, (height, width,1))
        # normalize
        max_value = np.max(heat_map_resized)
        min_value = np.min(heat_map_resized)
        normalized_heat_map = (heat_map_resized - min_value) / (max_value - min_value)
        return normalized_heat_map

    def grad_cam(self, img, logits, activations, sigma=1):
        img = img.permute(1,2,0)
        relu = torch.nn.ReLU()
        pred = logits.argmax(dim=1)
        logits[:,pred.item()].backward()
        gradients = self.model.conv_att.weight.grad
        pooled_gradients = torch.mean(gradients, dim=[0,2,3])
        for i in range(2):
            activations[:, i, :, :] *= pooled_gradients[i]
        activations = relu(activations)

        heat = torch.mean(activations, dim=1).squeeze()
        heat = ndimage.filters.gaussian_filter(heat.cpu(), sigma=sigma)

        heat = self.interpolate(np.array(img), 1 * heat)
        #heat[heat < 0.1] = 0
        #heat[heat < 0.5] *= 0.5
        heat = torch.Tensor(heat)
        fg_im = (heat * img).permute(2,0,1)
        bg_im = ((1-heat) * img).permute(2,0,1)
        
        return heat, fg_im[None,:,:], bg_im[None,:,:]
    
    def infer(self, img, question):
        """
        :param img: PIL image object
        :param question (str): 

        Returns:
            The top five answers, the final logits weight vector, the question embedding, and the attention map
        """
        with torch.no_grad():
            v = process_visual(img, self.transform, self.options, self.cnn)
            q = process_question(question, self.trainset)
        
        # get the output after the first layer
        logits = self.model(v,q) # logit weight vector of the answers
        a = process_answer(logits, self.trainset, self.model)
        
        q_emb = self.activation['seq2vec']
        att_activation = self.activation['conv_att']
        del v, q
        return a, logits, q_emb, att_activation
    
    
class MutanAttInference2():
    """
    MutanAtt model wrapper
    """

    def __init__(self, classes_idxs=None, dir_logs='logs/vqa/mutan_att_trainval', config='options/vqa/mutan_att_trainval.yaml', resume='ckpt'):
        self.options = {
            'logs': {
                'dir_logs': dir_logs
            }
        }
        with open(config, 'r') as handle:
            options_yaml = yaml.load(handle)
        self.options = utils.update_values(self.options, options_yaml)
        
        self.trainset = datasets.factory_VQA(self.options['vqa']['trainsplit'],
                                        self.options['vqa'])

        normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                    std=[0.229, 0.224, 0.225])
        self.transform = transforms.Compose([
            transforms.Resize(self.options['coco']['size']),
            transforms.CenterCrop(self.options['coco']['size']),
            transforms.ToTensor(),
            normalize,
        ])

        opt_factory_cnn = {
            'arch': self.options['coco']['arch']
            }
        self.cnn = convnets.factory(opt_factory_cnn, cuda=torch.cuda.is_available(), data_parallel=False)

        self.model = models.factory(
            self.options['model'],
            self.trainset.vocab_words(),
            self.trainset.vocab_answers(),
            cuda=torch.cuda.is_available(),
            data_parallel=False
            )
        start_epoch, best_acc1, _ = load_checkpoint(self.model, None,
            os.path.join(self.options['logs']['dir_logs'], resume))
        self.classes = classes_idxs
        activation = {}
        def get_activation(name):
            def hook(model, input, output):
                activation[name] = output.detach()
            return hook
        with torch.no_grad():
            self.model.seq2vec.register_forward_hook(get_activation('seq2vec'))
            self.model.conv_att.register_forward_hook(get_activation('conv_att'))
        self.activation = activation

    def interpolate(self, img, heat_map):
        height = img.shape[0]
        width = img.shape[1]

        # resize heat map
        heat_map_resized = transform.resize(heat_map, (height, width,1))
        # normalize
        max_value = np.max(heat_map_resized)
        min_value = np.min(heat_map_resized)
        normalized_heat_map = (heat_map_resized - min_value) / (max_value - min_value)
        return normalized_heat_map

    def grad_cam(self, img, logits, activations):
        img = img.permute(1,2,0)
        relu = torch.nn.ReLU()
        pred = logits.argmax(dim=1)
        logits[:,pred.item()].backward()
        gradients = self.model.conv_att.weight.grad
        pooled_gradients = torch.mean(gradients, dim=[0,2,3])
        for i in range(2):
            activations[:, i, :, :] *= pooled_gradients[i]
        activations = relu(activations)

        heat = torch.mean(activations, dim=1).squeeze()
        heat = ndimage.filters.gaussian_filter(heat.cpu(), sigma=0)

        heat = self.interpolate(np.array(img), 1 * heat)
        heat[heat < 0.1] = 0
        heat = torch.Tensor(heat)
        fg_im = (heat * img).permute(2,0,1)
        bg_im = ((1.-heat) * img).permute(2,0,1)
        
        return heat, fg_im[None,:,:], bg_im[None,:,:]
    
    def infer(self, img, question):
        """
        :param img: PIL image object
        :param question (str): 

        Returns:
            The top five answers, the final logits weight vector, and the question embedding
        """
        with torch.no_grad():
            v = process_visual(img, self.transform, self.options, self.cnn)
            q = process_question(question, self.trainset)
        
        # get the output after the first layer
        
        logits = self.model(v,q) # logit weight vector of the answers
        a = process_answer(logits, self.trainset, self.model)

        #logits_cls = torch.clone(logits)
        #logits_cls = logits_cls.cpu().detach().numpy()[:,self.classes]
        #logits_cls = torch.Tensor(logits_cls).to("cuda")
        q_emb = self.activation['seq2vec']
        att_activation = self.activation['conv_att']
        
        del v,q
        return a, logits, q_emb, att_activation

You must use pip==1.16.1

In [2]:
import numpy as np
from medcam import medcam
print(np.__version__)
#np_load_old = np.load
#np.load = lambda *a,**k: np_load_old(*a, allow_pickle=True, **k)
#with torch.no_grad():
model = MutanAttInference(dir_logs='logs/vqa/mutan_att_trainval', config='options/vqa/mutan_att_trainval.yaml')
model.model.to("cuda:0")
    
model.model.eval()

1.19.5


  options_yaml = yaml.load(handle)


KeyboardInterrupt: 

In [None]:
import glob
gen_files = glob.glob("../../model/evaluation/training/y_gen_q*.png")
gen_files.sort(key=os.path.getmtime,reverse=True)
orig_files = glob.glob("../../model/evaluation/training/orig_q*.png")
orig_files.sort(key=os.path.getmtime, reverse=True)

In [None]:
import re
rx = r'(\{[^{}]+\})'


In [None]:
from PIL import Image
import requests
import cv2
#url = "http://images.cocodataset.org/train2017/000000505539.jpg"
idx = 10

q = json.loads(re.search(rx, gen_files[idx])[0].replace("'", "\""))["question"]
print(q)
orig = Image.open(orig_files[idx])
gen = Image.open(gen_files[idx])

#img = Image.open("../../model/evaluation/training/orig_q_{'question': 'What color is the sofa?', 'image_id': 200305, 'question_id': 2003051} brown_3.png")
#img1 = Image.open("../../model/evaluation/training/y_gen_q_{'question': 'What color is the sofa?', 'image_id': 200305, 'question_id': 2003051}_3.png")
#orig_img = Image.open("../../model/evaluation/input_qid_0_0.png")

In [None]:
orig

In [None]:
gen

the visual feature must be of shape N x 2048 x 14 x 14

question embedding must be of shape N x num_worods

In [None]:
#with torch.no_grad():
a, logits_pred, q_emb, activation = model.infer(orig, q)
a

# Grad - cam

In [None]:
gradients = model.model.conv_att.weight.grad
pooled_gradients = torch.mean(gradients, dim=[0,2,3])
for i in range(2):
    activation[:, i, :, :] *= pooled_gradients[i]
grad_cam(model, orig,logits_pred, activation)


In [None]:
import sys
sys.path.append("../../")
import heatmap.heatmap as heatmap
from scipy import ndimage
from skimage import io
import numpy as np
from skimage import transform
from scipy import ndimage
from skimage import io
import matplotlib.pyplot as plt

In [None]:
for i in range(2):
    activation[:, i, :, :] *= pooled_gradients[i]

In [None]:
relu = torch.nn.ReLU()

In [None]:
def my_add(img, heat_map):
    height = img.shape[0]
    width = img.shape[1]
    
    # resize heat map
    heat_map_resized = transform.resize(heat_map, (height, width,1))
    # normalize
    max_value = np.max(heat_map_resized)
    min_value = np.min(heat_map_resized)
    normalized_heat_map = (heat_map_resized - min_value) / (max_value - min_value)
    return normalized_heat_map

In [None]:
import matplotlib.pyplot as plt
activation = relu(activation)
heat = torch.mean(activation, dim=1).squeeze()
#heat = np.maximum(heat.cpu(), 0)
#heat /= torch.max(heat)
heat = ndimage.filters.gaussian_filter(heat.cpu(), sigma=1)
heat = my_add(np.array(orig),heat*(-1))
#heat = heat*0.8
#heat[heat < 0.5] *= 0.5

heatmap.add(np.array(orig), heat, cmap="turbo",axis="off")

In [None]:
heat = transform.resize(heat, (np.array(orig).shape[0], np.array(orig).shape[1], 1))
testgrad1 = (heat * orig).astype("uint8")
testgrad2 = ((1-heat) * orig).astype("uint8")
plt.imshow(testgrad1)
plt.axis("off")
plt.show()

# heat maps that actually work <3

In [None]:
orig.size

In [None]:
att = model.model.list_att
att = sum(att)
heat_map = np.reshape(att.cpu(), (14,14))
heat_map = ndimage.filters.gaussian_filter(heat_map, sigma=2)
heat_map = my_add(np.array(orig), heat_map)
#heat_map = np.repeat(heat_map[None,:], 1, axis=2)

#heatmap.add(np.array(orig), heat_map, alpha=.6, cmap="turbo")

In [None]:
#heat_map[heat_map<0.1] = 0.
test1 = (heat_map*orig).astype("uint8")
test2 = (orig * (1-heat_map)).astype("uint8") #uint8 is absolutely necessary!!!
plt.imshow(test1)

In [None]:
plt.imshow(testgrad1)

In [None]:
def get_heatmap(img, model, sigma=2):
    att = model.model.list_att
    heat_maps = [np.reshape(a.cpu(),(14,14)) for a in att]
    heat_map = sum(heat_maps) / 4
    heat_map = heat_map = ndimage.filters.gaussian_filter(heat_map, sigma=sigma)
    heatmap.add(np.array(img), heat_map, alpha=.6, cmap="turbo")

In [None]:
att = model.model.list_att
att[0].shape
heat_map = np.reshape(att[0].cpu(), (14,14))
heat_map = ndimage.filters.gaussian_filter(heat_map, sigma=2)

In [None]:
get_heatmap(orig, model)

In [None]:
heat_maps = [np.reshape(a.cpu(),(14,14)) for a in att]
heat_map = sum(heat_maps) / 4
heat_map = ndimage.filters.gaussian_filter(heat_map, sigma=2)
heatmap.add(np.array(orig), heat_map, alpha=.7, cmap="turbo")

In [None]:
with torch.no_grad():
    a2, logits_pred2, q_emb2 = model.infer(gen, q)
a2

In [None]:
get_heatmap(gen, model)