In [None]:
#!pip install -q git+https://github.com/huggingface/peft.git transformers datasets

In [None]:
import torch
import random
import numpy as np
import os

def set_seed(seed=42):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random``, NumPy's global generator and PyTorch (CPU
    plus the CUDA seeding entry points), then asks cuDNN for deterministic
    kernels and switches its benchmark mode off.

    Args:
        seed: Integer seed handed to each generator. Defaults to 42.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seed_fn in seeders:
        seed_fn(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False


set_seed(42)

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(
    f"Using device: {device}",
    f"CUDA available: {torch.cuda.is_available()}",
    sep="\n",
)
if torch.cuda.is_available():
    # Report the name of the current GPU and the total memory of device 0 in GiB.
    print(
        f"GPU: {torch.cuda.get_device_name()}",
        f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB",
        sep="\n",
    )

In [None]:
from datasets import Dataset, Image
import json
from PIL import Image as PILImage


def load_custom_dataset(jsonl_path, base_dir):
    """
    Build a Hugging Face dataset from a JSON-Lines manifest.

    Each non-empty line of ``jsonl_path`` is parsed as one JSON object. The
    object's ``'image'`` value is joined onto ``base_dir`` and the resulting
    ``'image'`` column is cast to the ``datasets`` ``Image`` feature.

    Args:
        jsonl_path: Path to the ``.jsonl`` manifest (read as UTF-8).
        base_dir: Directory that the per-record ``'image'`` paths are
            relative to. The joined paths are absolute only if ``base_dir``
            is absolute (or the record path already is).

    Returns:
        The dataset built from all parsed records.

    Raises:
        json.JSONDecodeError: If a non-empty line is not valid JSON.
        KeyError: If a record has no ``'image'`` key.
    """
    # Imported locally under aliases: a later cell in this notebook runs
    # `from torch.utils.data import Dataset`, which rebinds the global name
    # `Dataset` and would otherwise break this function when it is re-run.
    from datasets import Dataset as HFDataset, Image as HFImage

    data = []
    with open(jsonl_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                # Tolerate blank lines (e.g. a trailing newline at end of file).
                continue
            item = json.loads(line)
            # Resolve the image path against the dataset directory.
            item['image'] = os.path.join(base_dir, item['image'])
            data.append(item)

    dataset = HFDataset.from_list(data)
    dataset = dataset.cast_column('image', HFImage())

    return dataset

# Location of the Kaggle input dataset; the JSONL manifest sits in its root.
dataset_path = '/kaggle/input/polanecvlm'
jsonl_path = os.path.join(dataset_path, 'dataset.jsonl')

dataset = load_custom_dataset(jsonl_path, dataset_path)

# Fetch the first record once instead of indexing the dataset three times.
first_example = dataset[0]

print(first_example['text'])
print(first_example['image'])
first_example['image']

In [None]:
from torch.utils.data import Dataset, DataLoader

# Text prepended to every target description before tokenization.
PROMPT = "Question: What kind of anomaly is on this object? Answer:"


class ImageDataset(Dataset):
    """Map-style dataset that pairs processed image tensors with prompted text.

    Wraps an indexable collection whose items expose ``"image"`` and
    ``"text"`` entries.

    Args:
        dataset: Indexable, sized collection of records.
        processor: Callable invoked as
            ``processor(images=..., padding="max_length", return_tensors="pt")``
            that returns a mapping of tensors.
        prompt: String placed directly in front of each record's text
            (no separator is inserted). Defaults to ``PROMPT``.
    """

    def __init__(self, dataset, processor, prompt = PROMPT):
        self.dataset, self.processor, self.prompt = dataset, processor, prompt

    def __len__(self):
        """Return the number of wrapped records."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return the processed tensors for record ``idx`` plus a ``"text"`` string."""
        record = self.dataset[idx]

        features = self.processor(
            images=record["image"],
            padding="max_length",
            return_tensors="pt",
        )
        # Drop every size-1 dimension from each tensor the processor returned.
        sample = {name: tensor.squeeze() for name, tensor in features.items()}

        # The text stays a raw string here; tokenization happens at collate time.
        sample["text"] = self.prompt + record["text"]
        return sample

def collate_fn(batch, tokenizer=None):
    """Collate a list of per-sample dicts into one batch dict.

    Every key other than ``"text"`` is stacked with ``torch.stack``. The
    ``"text"`` strings are tokenized together (padded to the longest entry
    in the batch) and stored as ``"input_ids"`` and ``"attention_mask"``.

    Args:
        batch: Non-empty list of dicts that all share the same keys.
        tokenizer: Optional callable invoked as
            ``tokenizer(texts, padding=True, return_tensors="pt")``. When
            omitted, the notebook-global ``processor.tokenizer`` is used, so
            ``collate_fn`` can still be passed to ``DataLoader`` as-is. Pass
            it explicitly (e.g. via ``functools.partial``) to avoid relying
            on that global.

    Returns:
        Dict with the stacked tensors and, if the samples carry text,
        ``"input_ids"`` and ``"attention_mask"``.
    """
    if tokenizer is None:
        # Resolved at call time: `processor` is created in a later cell.
        tokenizer = processor.tokenizer

    first = batch[0]
    processed_batch = {
        key: torch.stack([example[key] for example in batch])
        for key in first
        if key != "text"
    }

    if "text" in first:
        text_inputs = tokenizer(
            [example["text"] for example in batch], padding=True, return_tensors="pt"
        )
        processed_batch["input_ids"] = text_inputs["input_ids"]
        processed_batch["attention_mask"] = text_inputs["attention_mask"]

    return processed_batch
    

In [None]:
from transformers import AutoProcessor, Blip2ForConditionalGeneration, BitsAndBytesConfig
from peft import LoraConfig, get_peft_model

# Single checkpoint id shared by the processor and the base model.
BASE_CHECKPOINT = "Salesforce/blip2-opt-2.7b"

processor = AutoProcessor.from_pretrained(BASE_CHECKPOINT, use_fast=True)

# Load the base model with float16 weights and automatic device placement.
model = Blip2ForConditionalGeneration.from_pretrained(
    BASE_CHECKPOINT,
    device_map="auto",
    torch_dtype=torch.float16,
)

# LoRA adapter settings; adapters are attached to modules whose names match
# the entries in `target_modules`.
lora_config = LoraConfig(
    target_modules=["q_proj", "k_proj", "v_proj", "o_proj", "fc1", "fc2"],
    r=16,
    lora_alpha=32,
    lora_dropout=0.05,
    bias="none",
)

# Wrap the base model with the adapters and report how many weights will train.
model = get_peft_model(model, lora_config)
model.print_trainable_parameters()

In [None]:
# Hold out 20% of the records for evaluation; the seed keeps the split stable.
dataset_split = dataset.train_test_split(test_size=0.2, seed=42)

train_split_dataset = dataset_split["train"]
test_split_dataset  = dataset_split["test"]

train_dataset = ImageDataset(train_split_dataset, processor)
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=4, collate_fn=collate_fn)

test_dataset = ImageDataset(test_split_dataset, processor)
# Evaluation data is iterated in a fixed order so results are reproducible
# and line up with `test_split_dataset` indices.
test_dataloader = DataLoader(test_dataset, shuffle=False, batch_size=1, collate_fn=collate_fn)

print('Dataloaders setup!')

In [None]:
import torch
from tqdm import tqdm 
from transformers import get_scheduler

# Training hyperparameters.
NUM_EPOCHS = 10
LEARNING_RATE = 1e-4

# Hand the optimizer only the parameters that actually receive gradients;
# everything else is frozen and would never be updated anyway.
trainable_params = [param for param in model.parameters() if param.requires_grad]
optimizer = torch.optim.AdamW(trainable_params, lr=LEARNING_RATE, weight_decay=0.01)

# Linear schedule with a warmup over the first 3% of all optimizer steps.
num_training_steps = NUM_EPOCHS * len(train_dataloader)
warmup_steps = int(0.03 * num_training_steps)  # the scheduler expects a step count
lr_scheduler = get_scheduler(
    name="linear",
    optimizer=optimizer,
    num_warmup_steps=warmup_steps,
    num_training_steps=num_training_steps,
)

device = "cuda" if torch.cuda.is_available() else "cpu"

# Mean loss of every finished epoch, in order.
training_losses = []

model.train()
for epoch in range(NUM_EPOCHS):

    progress_bar = tqdm(train_dataloader, desc=f'Training Epoch {epoch+1}/{NUM_EPOCHS}')

    epoch_losses = []
    for batch in progress_bar:

        input_ids = batch.pop("input_ids").to(device)
        pixel_values = batch.pop("pixel_values").to(device, torch.float16)
        attention_mask = batch.pop("attention_mask").to(device)

        # Padding positions must not contribute to the language-modeling loss.
        # The loss ignores label -100, so mask every position the tokenizer
        # marked as padding (attention_mask == 0).
        labels = input_ids.masked_fill(attention_mask == 0, -100)

        outputs = model(input_ids=input_ids,
                        attention_mask=attention_mask,
                        pixel_values=pixel_values,
                        labels=labels)

        loss = outputs.loss
        loss_value = loss.item()
        epoch_losses.append(loss_value)

        loss.backward()

        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()

        progress_bar.set_postfix({'loss': f'{loss_value:.4f}'})


    avg_epoch_loss = sum(epoch_losses) / len(epoch_losses)
    training_losses.append(avg_epoch_loss)
    print(f'Epoch {epoch + 1} Average Loss: {avg_epoch_loss:.4f}')

print("Training completed!")
print(f"Final average loss: {training_losses[-1]:.4f}")

# Inference

In [None]:
# Position of the held-out sample to inspect.
index = 10
assert index < len(test_split_dataset), (
    f"index {index} is out of range for a test split of {len(test_split_dataset)} samples"
)

# Fetch the record once; indexing the split again would load it a second time.
example = test_split_dataset[index]
example_text = example['text']
example_image = example['image']

print(example_text)
example_image

## Saving model on HuggingFace

```python
from huggingface_hub import notebook_login
notebook_login()
```
---
```python
model.push_to_hub("tonipol/blip2-opt-2.7b-anomaly-detection-description")
```

## How to load from HuggingFace

```python
import torch
from transformers import Blip2ForConditionalGeneration, AutoProcessor
from peft import PeftModel, PeftConfig

peft_model_id = "tonipol/blip2-opt-2.7b-anomaly-detection-description"
config = PeftConfig.from_pretrained(peft_model_id)

# Load the base model in float16 so its weights match the float16 inputs used when generating captions.
model = Blip2ForConditionalGeneration.from_pretrained(config.base_model_name_or_path, device_map="auto", torch_dtype=torch.float16)
model = PeftModel.from_pretrained(model, peft_model_id)

processor = AutoProcessor.from_pretrained("Salesforce/blip2-opt-2.7b")
```

## How to generate captions

```python
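# `example_image` is a PIL image (e.g. the test sample selected in the Inference section).
inputs = processor(images=example_image, text=PROMPT, return_tensors="pt").to(device, torch.float16)
pixel_values = inputs.pixel_values

# Pass the tokenized prompt together with the image so generation is conditioned on the question.
generated_ids = model.generate(
    pixel_values=pixel_values, 
    input_ids=inputs.input_ids,
    attention_mask=inputs.attention_mask,
    max_new_tokens=50,
    temperature=0.7,  
    top_p=0.8,
    do_sample=True
    )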
generated_caption = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]

```