# Reward Model Class

In [None]:
import yaml
import numpy as np
from tqdm import tqdm
import torch

from models.model_itm import ALBEF as ALBEF_itm
from models.vit import interpolate_pos_embed
from transformers import BertTokenizer
from torchvision.transforms import Compose, Normalize, Resize, InterpolationMode
from PIL import Image

class REWARD_MODULE:
    """Image-text matching (ITM) reward scorer wrapping an ALBEF ITM model.

    The wrapped model's ``cls_head`` output is indexed with
    ``itm_labels['positive']`` (column 2) to obtain the reward.
    """

    def __init__(self,
                 config,
                 checkpoint,
                 input_resolution):
        """Load the tokenizer, the YAML config and the ALBEF checkpoint.

        Args:
            config: Path to the YAML configuration file handed to ALBEF.
            checkpoint: Path to a checkpoint whose ``'model'`` entry holds the weights.
            input_resolution: Stored on the instance; not used elsewhere in this class.
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        # NOTE: yaml.Loader can construct arbitrary Python objects; only load trusted configs.
        with open(config, 'r') as config_file:
            self.config = yaml.load(config_file, Loader=yaml.Loader)
        self.input_resolution = input_resolution
        self.itm_labels = {'negative':0, 'positive':2}

        self.load_albef_itm(checkpoint)

    def load_albef_itm(self, checkpoint_path):
        """Instantiate ALBEF, load ``checkpoint_path`` into it and store it as ``self.model``.

        The model is left in eval mode. A warning is emitted when the
        checkpoint does not provide every model parameter, because the load is
        non-strict and would otherwise hide that silently.
        """
        import warnings

        model = ALBEF_itm(config=self.config,
                          text_encoder='bert-base-uncased',
                          tokenizer=self.tokenizer
                          ).to(self.device)
        checkpoint = torch.load(checkpoint_path, map_location='cpu')
        state_dict = checkpoint['model']
        # Swap in the positional embedding produced by interpolate_pos_embed
        # for this model's visual encoder.
        state_dict['visual_encoder.pos_embed'] = interpolate_pos_embed(
            state_dict['visual_encoder.pos_embed'], model.visual_encoder)
        load_result = model.load_state_dict(state_dict, strict=False)
        if load_result.missing_keys:
            warnings.warn(
                f"Checkpoint {checkpoint_path!r} does not provide "
                f"{len(load_result.missing_keys)} model parameter(s); they keep "
                f"their initial values (first few: {load_result.missing_keys[:5]})."
            )
        self.model = model.eval()

    def _encode_image(self, image):
        """Return the visual encoder output and an all-ones attention mask for it."""
        image_embeds = self.model.visual_encoder(image)
        image_atts = torch.ones(image_embeds.size()[:-1],
                                dtype=torch.long,
                                device=image.device)
        return image_embeds, image_atts

    def _positive_score(self, image_embeds, image_atts, text):
        """Run the text encoder with image cross-attention and return the 'positive' column."""
        output = self.model.text_encoder(text.input_ids,
                                         attention_mask=text.attention_mask,
                                         encoder_hidden_states=image_embeds,
                                         encoder_attention_mask=image_atts,
                                         return_dict=True)
        # The classification head is applied to the first token's hidden state.
        prediction = self.model.cls_head(output.last_hidden_state[:,0,:])
        return prediction[:,self.itm_labels['positive']]

    def forward(self, image, text):
        """Score already-batched images against already-tokenized text.

        Args:
            image: Image batch accepted by the model's visual encoder.
            text: Tokenizer output exposing ``input_ids`` and ``attention_mask``.

        Returns:
            Tensor holding the 'positive' score for each batch row.
        """
        image_embeds, image_atts = self._encode_image(image)
        return self._positive_score(image_embeds, image_atts, text)

    def predict_itm(self, images, reports):
        """Score one un-batched image against ``reports`` and return a NumPy array.

        Args:
            images: A single image tensor; a leading batch axis is added here.
            reports: Text passed to the tokenizer.

        Returns:
            NumPy array of 'positive' scores. If tokenizing or scoring the
            text fails, a warning is emitted and ``[0.]`` is returned.
        """
        import warnings

        image = images.unsqueeze(0).to(self.device, dtype=torch.float)
        # The result is detached and converted to NumPy, so no graph is needed.
        with torch.no_grad():
            image_embeds, image_atts = self._encode_image(image)
            try:
                text = self.tokenizer(reports, padding='longest', return_tensors='pt').to(self.device)
                positive_score = self._positive_score(image_embeds, image_atts, text)
            except Exception as err:
                # Best-effort fallback kept from the original code, but made
                # visible and allocated on self.device so it also works
                # without a GPU.
                warnings.warn(f"predict_itm could not score the text ({err!r}); returning 0.")
                positive_score = torch.zeros(1, device=self.device)

        return positive_score.detach().cpu().numpy()

In [None]:
# Construct the ITM reward scorer from the ALBEF config and checkpoint.
reward_model = REWARD_MODULE(
    input_resolution=384,
    checkpoint="/nfs/turbo/umms-vgvinodv/models/ALBEF/checkpoint_7.pth",
    config="configs/ITM.yaml",
)

In [None]:
from pathlib import Path
import datasets
# dataset_config: 'mimic-cxr' or 'mimic-iii'
# split: 'train', 'validate' or 'test'

def build_dataset(dataset_config, split):
    """Build a ``datasets.Dataset`` of findings, impressions and image paths.

    Reads ``<split>.findings.tok``, ``<split>.impression.tok`` and
    ``<split>.image.tok`` from the ``dataset_config`` sub-directory of the
    hard-coded data root.

    Args:
        dataset_config: Name of the dataset sub-directory, e.g. ``'mimic-cxr'``.
        split: Split prefix of the ``.tok`` files, e.g. ``'train'``.

    Returns:
        Dataset with columns ``text``, ``summary`` and ``img_path``, with the
        rows under the ``p10`` path component removed.
    """
    data_path = '/nfs/turbo/umms-vgvinodv/data/bioNLP23-Task-1B/data/'
    split_dir = Path(data_path).joinpath(dataset_config)

    def read_lines(suffix):
        # Context manager so the file handle is closed as soon as it is read.
        with open(split_dir.joinpath(split + suffix)) as handle:
            return [line.strip() for line in handle]

    findings = read_lines('.findings.tok')
    impression = read_lines('.impression.tok')
    # Only the first comma-separated entry of each line is used as the image path.
    image_paths = [str(split_dir.joinpath(line.split(',')[0]))
                   for line in read_lines('.image.tok')]

    dataset = datasets.Dataset.from_dict({"text":findings,"summary":impression,"img_path":image_paths})

    def is_outside_p10(example):
        # Keeps rows whose 11th '/'-separated component is not 'p10'.
        # NOTE(review): index 10 depends on the depth of data_path, and this
        # presumably skips a folder whose images are unavailable - confirm.
        return example["img_path"].split('/')[10] != 'p10'

    return dataset.filter(is_outside_p10)

# Build the filtered 'mimic-cxr' training split used by the following cell.
dataset = build_dataset("mimic-cxr", "train")

In [None]:
from torchvision import transforms

# Score one training example against both of its report sections.
sample = dataset[500]
caption = sample["summary"]
text = sample["text"]
# caption = "a random irrelevant caption"

# Load the image as RGB at the 384x384 size used for this check.
raw_image = Image.open(sample["img_path"])
raw_image = raw_image.convert("RGB")
raw_image = raw_image.resize((384, 384))

normalize = transforms.Normalize(
    mean=(0.48145466, 0.4578275, 0.40821073),
    std=(0.26862954, 0.26130258, 0.27577711),
)
transform = transforms.Compose([transforms.ToTensor(), normalize])
image = transform(raw_image)

display(raw_image)

# Impression ("summary") score first, then the findings ("text") score.
print(f"\nImpression Text:\n{caption}\n")
print(reward_model.predict_itm(image,caption))

print(f"\nFindings:\n{text}\n")
print(reward_model.predict_itm(image, text))

# Testing

In [None]:
import yaml
import numpy as np
from tqdm import tqdm
import torch
from torch import nn
import torch.nn.functional as F

from models.model_itm import ALBEF as ALBEF_itm
from models.vit import interpolate_pos_embed
from transformers import BertTokenizer
from torchvision.transforms import Compose, Normalize, Resize, InterpolationMode
from PIL import Image

class REWARD_MODULE(nn.Module):
    """``nn.Module`` wrapper around an ALBEF ITM model that returns the 'positive' score.

    The wrapped model's ``cls_head`` output is indexed with
    ``itm_labels['positive']`` (column 2).
    """

    def __init__(self,
                 config,
                 checkpoint,
                 input_resolution):
        """Load the tokenizer, the YAML config and the ALBEF checkpoint.

        Args:
            config: Path to the YAML configuration file handed to ALBEF.
            checkpoint: Path to a checkpoint whose ``'model'`` entry holds the weights.
            input_resolution: Stored on the instance; not used elsewhere in this class.
        """
        # Must run before any sub-module is assigned; without it,
        # `self.model = ...` raises
        # "cannot assign module before Module.__init__() call".
        super().__init__()

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        # NOTE: yaml.Loader can construct arbitrary Python objects; only load trusted configs.
        with open(config, 'r') as config_file:
            self.config = yaml.load(config_file, Loader=yaml.Loader)
        self.input_resolution = input_resolution
        self.itm_labels = {'negative':0, 'positive':2}

        self.load_albef_itm(checkpoint)

    def load_albef_itm(self, checkpoint_path):
        """Instantiate ALBEF, load ``checkpoint_path`` into it and store it as ``self.model``.

        The model is left in eval mode. A warning is emitted when the
        checkpoint does not provide every model parameter, because the load is
        non-strict and would otherwise hide that silently.
        """
        import warnings

        model = ALBEF_itm(config=self.config,
                          text_encoder='bert-base-uncased',
                          tokenizer=self.tokenizer
                          ).to(self.device)
        checkpoint = torch.load(checkpoint_path, map_location='cpu')
        state_dict = checkpoint['model']
        # Swap in the positional embedding produced by interpolate_pos_embed
        # for this model's visual encoder.
        state_dict['visual_encoder.pos_embed'] = interpolate_pos_embed(
            state_dict['visual_encoder.pos_embed'], model.visual_encoder)
        load_result = model.load_state_dict(state_dict, strict=False)
        if load_result.missing_keys:
            warnings.warn(
                f"Checkpoint {checkpoint_path!r} does not provide "
                f"{len(load_result.missing_keys)} model parameter(s); they keep "
                f"their initial values (first few: {load_result.missing_keys[:5]})."
            )
        self.model = model.eval()

    def forward(self, image, text):
        """Score already-batched images against already-tokenized text.

        Args:
            image: Image batch accepted by the model's visual encoder.
            text: Tokenizer output exposing ``input_ids`` and ``attention_mask``.

        Returns:
            Tensor holding the 'positive' score for each batch row.
        """
        image_embeds = self.model.visual_encoder(image)
        # All-ones mask over every dimension of the embeddings except the last.
        image_atts = torch.ones(image_embeds.size()[:-1],
                                dtype=torch.long,
                                device=image.device)

        output = self.model.text_encoder(text.input_ids,
                                         attention_mask=text.attention_mask,
                                         encoder_hidden_states=image_embeds,
                                         encoder_attention_mask=image_atts,
                                         return_dict=True)
        # The classification head is applied to the first token's hidden state.
        prediction = self.model.cls_head(output.last_hidden_state[:,0,:])
        return prediction[:,self.itm_labels['positive']]
        

In [None]:
# Construct the nn.Module reward scorer from the ALBEF config and checkpoint.
reward_model = REWARD_MODULE(
    input_resolution=384,
    checkpoint="/nfs/turbo/umms-vgvinodv/models/ALBEF/checkpoint_7.pth",
    config="configs/ITM.yaml",
)

In [None]:
import yaml
import numpy as np
from tqdm import tqdm
import torch

from models.model_itm import ALBEF
from models.vit import interpolate_pos_embed
from transformers import BertTokenizer
from PIL import Image

# Module-level settings read by get_reward_model().
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased') 
# Read the config through a context manager so the file handle is closed.
# NOTE: yaml.Loader can construct arbitrary Python objects; only load trusted configs.
with open("configs/ITM.yaml", 'r') as _config_file:
    config = yaml.load(_config_file, Loader=yaml.Loader)
input_resolution = 384
# Column index of each ITM class in the model output.
itm_labels = {'negative':0, 'positive':2}
checkpoint_path = "/nfs/turbo/umms-vgvinodv/models/ALBEF/checkpoint_7.pth"

def get_reward_model(): 
    """Build the ALBEF model, load its checkpoint weights and return it in eval mode.

    Reads the module-level ``config``, ``tokenizer``, ``device`` and
    ``checkpoint_path``. A warning is emitted when the checkpoint does not
    provide every model parameter, because the load is non-strict and would
    otherwise hide that silently.

    Returns:
        The ALBEF model on ``device`` with ``eval()`` applied.
    """
    import warnings

    model = ALBEF(config=config,
                  text_encoder='bert-base-uncased',
                  tokenizer=tokenizer
                  ).to(device)
    checkpoint = torch.load(checkpoint_path, map_location='cpu')
    state_dict = checkpoint['model']
    # Swap in the positional embedding produced by interpolate_pos_embed for
    # this model's visual encoder.
    state_dict['visual_encoder.pos_embed'] = interpolate_pos_embed(
        state_dict['visual_encoder.pos_embed'], model.visual_encoder)
    load_result = model.load_state_dict(state_dict, strict=False)
    if load_result.missing_keys:
        warnings.warn(
            f"Checkpoint {checkpoint_path!r} does not provide "
            f"{len(load_result.missing_keys)} model parameter(s); they keep "
            f"their initial values (first few: {load_result.missing_keys[:5]})."
        )
    return model.eval()

In [None]:
from pathlib import Path
import datasets
# dataset_config: 'mimic-cxr' or 'mimic-iii'
# split: 'train', 'validate' or 'test'

def build_dataset(dataset_config, split):
    """Build a ``datasets.Dataset`` of findings, impressions and image paths.

    Reads ``<split>.findings.tok``, ``<split>.impression.tok`` and
    ``<split>.image.tok`` from the ``dataset_config`` sub-directory of the
    hard-coded data root.

    Args:
        dataset_config: Name of the dataset sub-directory, e.g. ``'mimic-cxr'``.
        split: Split prefix of the ``.tok`` files, e.g. ``'train'``.

    Returns:
        Dataset with columns ``text``, ``summary`` and ``img_path``, with the
        rows under the ``p10`` path component removed.
    """
    data_path = '/nfs/turbo/umms-vgvinodv/data/bioNLP23-Task-1B/data/'
    split_dir = Path(data_path).joinpath(dataset_config)

    def read_lines(suffix):
        # Context manager so the file handle is closed as soon as it is read.
        with open(split_dir.joinpath(split + suffix)) as handle:
            return [line.strip() for line in handle]

    findings = read_lines('.findings.tok')
    impression = read_lines('.impression.tok')
    # Only the first comma-separated entry of each line is used as the image path.
    image_paths = [str(split_dir.joinpath(line.split(',')[0]))
                   for line in read_lines('.image.tok')]

    dataset = datasets.Dataset.from_dict({"text":findings,"summary":impression,"img_path":image_paths})

    def is_outside_p10(example):
        # Keeps rows whose 11th '/'-separated component is not 'p10'.
        # NOTE(review): index 10 depends on the depth of data_path, and this
        # presumably skips a folder whose images are unavailable - confirm.
        return example["img_path"].split('/')[10] != 'p10'

    return dataset.filter(is_outside_p10)

# Build the filtered 'mimic-cxr' training split used by the following cell.
dataset = build_dataset("mimic-cxr", "train")

In [None]:
from torchvision import transforms

# Prepare one example (image tensor plus both report sections) for later cells.
sample = dataset[500]
caption = sample["summary"]
text = sample["text"]
# caption = "a random irrelevant caption"

# Load the image as RGB at the 384x384 size used for this check.
raw_image = Image.open(sample["img_path"])
raw_image = raw_image.convert("RGB")
raw_image = raw_image.resize((384, 384))

normalize = transforms.Normalize(
    mean=(0.48145466, 0.4578275, 0.40821073),
    std=(0.26862954, 0.26130258, 0.27577711),
)
transform = transforms.Compose([transforms.ToTensor(), normalize])
image = transform(raw_image)

# Disabled inspection and scoring calls:
#display(raw_image)
#print(f"\nImpression Text:\n{caption}\n")
#print(reward_model.predict_itm(image,caption))
#print(f"\nFindings:\n{text}\n")
#print(reward_model.predict_itm(image, text))

In [None]:
# Run the reward model on a single (image, findings) pair.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

images = [image, image]

#texts = [caption, text]
texts = [text]
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased') 
text_inputs = tokenizer(texts, padding='longest', return_tensors="pt").to(device)  

# NOTE(review): no visible cell defines `model`; it is built here with
# get_reward_model() so the notebook runs top to bottom - confirm this is the
# model that was intended.
model = get_reward_model()

#prediction = model(images,text_inputs)
# Send the image to the same device as the text inputs; an unconditional
# .cuda() fails on CPU-only hosts.
prediction = model(image[None,:,:,:].to(device),text_inputs)

In [None]:
# Show the raw model output computed in the previous cell.
print(prediction)

In [None]:
# Print only the 'positive' column, detached from the graph and moved to the CPU.
print(prediction[:,itm_labels['positive']].detach().cpu())

In [1]:
from pathlib import Path
import datasets
from datasets import Image
from torchvision import transforms

# dataset_config: 'mimic-cxr' or 'mimic-iii'
# split: 'train', 'validate' or 'test'
def build_dataset(dataset_config, tokenizer, split, image_size=384):
    """Build a dataset of tokenized findings with on-the-fly image tensors.

    Reads ``<split>.findings.tok`` and ``<split>.image.tok`` from the
    ``dataset_config`` sub-directory of the hard-coded data root. The
    impression file is not read because nothing in this function uses it.

    Args:
        dataset_config: Name of the dataset sub-directory, e.g. ``'mimic-cxr'``.
        tokenizer: Callable applied to the ``"summarize: "``-prefixed findings;
            its ``"input_ids"`` entry becomes the ``input_ids`` column.
        split: Split prefix of the ``.tok`` files, e.g. ``'train'``.
        image_size: Side length of the square the images are resized to
            (default 384, the previously hard-coded value).

    Returns:
        Dataset with ``image`` and ``input_ids`` columns and a transform that
        adds a ``query`` entry holding the normalized image tensors on access.
    """
    data_path = '/nfs/turbo/umms-vgvinodv/data/bioNLP23-Task-1B/data/'
    split_dir = Path(data_path).joinpath(dataset_config)

    def read_lines(suffix):
        # Context manager so the file handle is closed as soon as it is read.
        with open(split_dir.joinpath(split + suffix)) as handle:
            return [line.strip() for line in handle]

    findings = read_lines('.findings.tok')
    # Only the first comma-separated entry of each line is used as the image path.
    image_paths = [str(split_dir.joinpath(line.split(',')[0]))
                   for line in read_lines('.image.tok')]

    dataset = datasets.Dataset.from_dict({"text":findings,"image":image_paths})

    def is_outside_p10(example):
        # Keeps rows whose 11th '/'-separated component is not 'p10'.
        # NOTE(review): index 10 depends on the depth of data_path, and this
        # presumably skips a folder whose images are unavailable - confirm.
        return example["image"].split('/')[10] != 'p10'

    dataset = dataset.filter(is_outside_p10)
    # Filter on the path strings first, then turn the column into an image feature.
    dataset = dataset.cast_column("image", Image())

    def tokenize(samples):
        input_text = ["summarize: "+text for text in samples["text"]]
        samples["input_ids"] = tokenizer(input_text)["input_ids"]
        return samples

    dataset = dataset.map(tokenize, batched=True, num_proc=4, remove_columns=["text"])

    normalize = transforms.Normalize((0.48145466, 0.4578275, 0.40821073), (0.26862954, 0.26130258, 0.27577711))
    transform = transforms.Compose([
        transforms.ToTensor(),
        normalize,
    ])

    def image_transforms(samples):
        # Applied at access time via set_transform below.
        samples["query"] = [
            transform(image.convert("RGB").resize((image_size, image_size)))
            for image in samples["image"]
        ]
        return samples

    dataset.set_transform(image_transforms)

    return dataset

In [2]:
from transformers import AutoTokenizer

# Tokenizer loaded from the "GanjinZero/biobart-base" checkpoint; build_dataset
# uses it to produce the "input_ids" column.
tokenizer = AutoTokenizer.from_pretrained("GanjinZero/biobart-base")

# Build the 'mimic-cxr' training split with that tokenizer.
dataset = build_dataset("mimic-cxr", tokenizer, "train")

Filter:   0%|          | 0/125417 [00:00<?, ? examples/s]

Map (num_proc=4):   0%|          | 0/113182 [00:00<?, ? examples/s]

In [3]:
from models.reward import get_reward_model
import torch

# Prefer the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Reward model as built by get_reward_model imported from models.reward.
reward_model = get_reward_model()

In [4]:
# Fetch one example; "query" and "input_ids" are produced inside build_dataset.
batch = dataset[0]
images = batch["query"].cuda()
# NOTE(review): this is the raw "input_ids" value from the tokenizer passed to
# build_dataset, not a tokenizer output object carrying an attention mask.
text_inputs = batch["input_ids"]

#print(images)
#print(text_inputs)
# NOTE(review): the recorded output of this cell is
# "AttributeError: 'list' object has no attribute 'size'", presumably because
# text_inputs is a plain list - confirm which text input reward_model expects.
prediction = reward_model(images.unsqueeze(0),text_inputs)

AttributeError: 'list' object has no attribute 'size'