
In this experiment, we fine-tune the multi-modal IDEFICS2 vision-language model, following the example put together by Nitin Tiwari (https://github.com/NSTiwari/Fine-tune-IDEFICS-Vision-Language-Model).

Working this through, I created training data by hand. But this is also a situation where a larger model like Gemini 2 might help you generate training data; you could give Gemini 2 an image of, e.g., a context sheet and the following prompt:

system prompt: You generate high-quality training data for image question answering from scans of archaeological context sheets. It is important to work with high-quality extracted text. The meaning of the texts in the scans can be deduced from the boxes in which the text was entered. Stratigraphic relationships can be inferred from the placement of context numbers in the rows of boxes; the present context will always be in the centre, and other contexts will be entered in stratigraphic relationship above or below as appropriate. Data should be returned in csv format: query, answer. Like so: What are the general easting and northing for this site?,"['443281.71','258449.217']". For each image provided, generate a dozen high-quality questions and answers.

It's worth exploring.

#### Step 0: Install libraries and dependencies.

In [None]:
# Install transformers straight from the GitHub repository, then the
# supporting libraries (accelerate, datasets, peft, bitsandbytes) from PyPI.
!pip install -q git+https://github.com/huggingface/transformers.git
!pip install -q accelerate datasets peft bitsandbytes

In [None]:
# Drag and drop a zip with your images into the file tray, then extract it
# into a folder called training_images (created next to the zip).
!unzip your-images.zip -d training_images

### Step 1. Make Your Dataset

Drag and drop your csv into the file tray. You should have:

```
|-training_images
    |-image1.jpg
    |-image2.jpg
    |-etc
|-qa_pairs.csv
```

In [None]:
from datasets import Dataset, DatasetDict, Image
import pandas as pd
import os

# NOTE(review): TRAIN_SAMPLES and TEST_SAMPLES are not read anywhere in this
# cell; only TEST_SIZE controls the split below.
TRAIN_SAMPLES = 1000
TEST_SAMPLES = 200
TEST_SIZE = 0.166  # fraction of rows held back for the test split

# Directories searched for each image. The unzip cell extracts into
# ./training_images, which is /content/training_images on Colab and matches
# the folder layout described above (training_images next to qa_pairs.csv).
train_images_directory = '/content/training_images'
test_images_directory = '/content/test/'

# Extensions tried for every id, in order. '.jpg' comes first so that ids
# which already resolved to a .jpg keep resolving to exactly the same file.
IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

# Read the CSV Q&A text: one row per question, with an id column holding the
# image filename without its extension.
qa_text = pd.read_csv('/content/qa_pairs.csv')

# The answer column has been described both as 'answer' and 'answers';
# accept either spelling instead of failing with a bare KeyError.
if 'answers' in qa_text.columns:
    answer_column = 'answers'
elif 'answer' in qa_text.columns:
    answer_column = 'answer'
else:
    raise KeyError("qa_pairs.csv needs an 'answers' (or 'answer') column")

# Get the list of ids from the csv, which we'll use to match filenames.
ids_from_csv = qa_text['id'].tolist()

print(ids_from_csv)


def find_image_path(file_id):
    """Return the first existing image file for *file_id*, or None.

    For each extension in IMAGE_EXTENSIONS the training directory is checked
    before the test directory.
    """
    for extension in IMAGE_EXTENSIONS:
        for directory in (train_images_directory, test_images_directory):
            candidate = os.path.join(directory, f'{file_id}{extension}')
            if os.path.exists(candidate):
                return candidate
    return None


# Resolve every csv id to an image path; stop at the first id with no image
# so a broken dataset is never built silently.
image_paths = []
for file_id in ids_from_csv:
    image_path = find_image_path(file_id)
    if image_path is None:
        raise ValueError(f"Could not find a relevant image file for {file_id} from csv id")
    image_paths.append(image_path)

# Collect the remaining columns; all four lists are aligned row by row.
ids = ids_from_csv
queries = qa_text['query'].tolist()
answers = qa_text[answer_column].tolist()

# Create the dataset dictionary. The key stays 'answers' because the data
# collator reads example["answers"].
dataset_dict = {
    'id': ids,
    'image': image_paths,
    'query': queries,
    'answers': answers
}

# Create the dataset.
dataset = Dataset.from_dict(dataset_dict)

# Cast the 'image' column (currently file paths) to the datasets Image type.
# NOTE: a later cell re-imports the name Image from PIL; re-run this cell's
# imports before re-running the cast.
dataset = dataset.cast_column("image", Image())

# Split the dataset into train and test. shuffle=False keeps csv order, so
# the test split is the tail of the csv.
split_dataset = dataset.train_test_split(test_size=TEST_SIZE, shuffle=False)

print(split_dataset)



#### Step 2: Push the dataset on Hugging Face Hub (optional)

In [None]:
# Open the interactive Hugging Face login widget inside the notebook.
from huggingface_hub import notebook_login
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [None]:
# Upload the train/test splits to the Hub as one dataset repo.
# Change your-username to your own account name!
split_dataset.push_to_hub("your-username/DocumentIDEFICS_QA_archae_test")

#### Step 3: Load the dataset

In [None]:
from datasets import load_dataset

# Load the "train" and "test" splits from the Hub dataset repo.
# Change your-username here too so it matches the repo you pushed to.
train_dataset = load_dataset("your-username/DocumentIDEFICS_QA_archae_test", split="train")
eval_dataset = load_dataset("your-username/DocumentIDEFICS_QA_archae_test", split="test")


Inspect the training data

In [None]:
# Display the first row of the training split (as the cell's output).
train_dataset[0]

In [None]:
# Display the image stored in that first training row.
train_dataset[0]['image']

#### Step 4: Configure LoRA adapters

This is a particular fine-tuning strategy. Don't touch these cells. Run them, but don't modify them.

In [None]:
import torch
from peft import LoraConfig
from transformers import AutoProcessor, BitsAndBytesConfig, Idefics2ForConditionalGeneration

# Device that later cells move tensors (and, for full fine-tuning, the model) to.
DEVICE = "cuda:0"
# Fine-tuning mode switches read by the model-loading cell: if either is True
# a LoRA adapter is attached; USE_QLORA additionally enables 4-bit loading.
USE_LORA = False
USE_QLORA = True

# Processor (tokenizer + image preprocessing) for the IDEFICS2 8B checkpoint.
# NOTE(review): do_image_splitting=False presumably keeps each image as a
# single input to reduce memory use; confirm against the processor docs.
processor = AutoProcessor.from_pretrained(
    "HuggingFaceM4/idefics2-8b",
    do_image_splitting=False
)

In [None]:
# Load the IDEFICS2 model in one of three modes, chosen by USE_QLORA/USE_LORA:
# QLoRA (4-bit base + adapter), LoRA (fp16 base + adapter), or full fine-tuning.
if USE_QLORA or USE_LORA:
    # Adapter settings shared by LoRA and QLoRA. target_modules is a regex
    # selecting the projection layers inside text_model, modality_projection
    # and perceiver_resampler.
    lora_config = LoraConfig(
        r=8,
        lora_alpha=8,
        lora_dropout=0.1,
        target_modules='.*(text_model|modality_projection|perceiver_resampler).*(down_proj|gate_proj|up_proj|k_proj|q_proj|v_proj|o_proj).*$',
        # DoRA is switched on only for plain LoRA; it is off under QLoRA.
        use_dora=False if USE_QLORA else True,
        init_lora_weights="gaussian"
    )
    if USE_QLORA:
        # 4-bit NF4 quantisation with fp16 compute. bnb_config is only
        # defined on this path.
        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.float16
        )
    model = Idefics2ForConditionalGeneration.from_pretrained(
        "HuggingFaceM4/idefics2-8b",
        torch_dtype=torch.float16,
        quantization_config=bnb_config if USE_QLORA else None,
    )
    # Attach the adapter and make sure it is active.
    # Note: this branch never calls .to(DEVICE) itself.
    model.add_adapter(lora_config)
    model.enable_adapters()
else:
    # Full fine-tuning: fp16 weights moved explicitly to DEVICE.
    model = Idefics2ForConditionalGeneration.from_pretrained(
        "HuggingFaceM4/idefics2-8b",
        torch_dtype=torch.float16,
        _attn_implementation="flash_attention_2", # This works for A100 or H100
    ).to(DEVICE)

#### Step 5: Create Data Collator for IDEFICS2 format.

In [None]:
import random
import ast

class MyDataCollator:
    """Turn a list of dataset rows into one padded IDEFICS2 training batch.

    Each row must provide ``image`` (an object with ``.convert("RGB")``),
    ``query`` (the question text) and ``answers``. ``answers`` may be a
    single value, a list, or the string form of a list such as
    ``"['443281.71', '258449.217']"``; one candidate is picked at random per
    call.

    The collator relies on two notebook-level globals: ``model`` (for
    ``model.config.image_token_id``) and ``DEVICE``.
    """

    def __init__(self, processor):
        self.processor = processor

    @staticmethod
    def _candidate_answers(answer):
        """Return the non-empty list of acceptable answers held in *answer*.

        A string is only unpacked when it is the literal form of a list or
        tuple; any other string (``"clay"``, ``"007"``, ``"1.5 m"``) is kept
        verbatim as the single answer rather than being evaluated.

        Raises:
            ValueError: if *answer* is (or encodes) an empty list.
        """
        if isinstance(answer, str):
            try:
                parsed = ast.literal_eval(answer)
            except (ValueError, TypeError, SyntaxError,
                    MemoryError, RecursionError):
                parsed = None  # plain text, not a Python literal
            if isinstance(parsed, (list, tuple)):
                answer = parsed

        if isinstance(answer, (list, tuple)):
            candidates = list(answer)
        else:
            candidates = [answer]

        if not candidates:
            raise ValueError("Every example needs at least one answer, got an empty list")
        return candidates

    def __call__(self, examples):
        """Build the model inputs and labels for *examples*.

        Returns a dict of tensors (processor outputs plus ``labels``) already
        moved to ``DEVICE``.
        """
        texts = []
        images = []
        for example in examples:
            image = example["image"].convert("RGB")  # my images are greyscale
            question = example["query"]
            candidates = self._candidate_answers(example["answers"])
            # The chat template needs text, so numeric answers are stringified.
            answer = str(random.choice(candidates))

            messages = [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "Answer briefly."},
                        {"type": "image"},
                        {"type": "text", "text": question}
                    ]
                },
                {
                    "role": "assistant",
                    "content": [
                        {"type": "text", "text": answer}
                    ]
                }
            ]
            text = self.processor.apply_chat_template(
                messages, add_generation_prompt=False)
            texts.append(text.strip())
            images.append([image])  # one image list per text

        batch = self.processor(text=texts, images=images,
                               return_tensors="pt", padding=True)
        # Labels are the input ids with padding and image placeholder tokens
        # masked out (-100) so they do not contribute to the loss.
        labels = batch.input_ids.clone()
        labels[labels == self.processor.tokenizer.pad_token_id] = -100
        labels[labels == model.config.image_token_id] = -100
        batch["labels"] = labels
        batch = {k: v.to(DEVICE) for k, v in batch.items()}
        return batch

# Collator instance handed to the Trainer; it wraps the processor loaded earlier.
data_collator = MyDataCollator(processor)

#### Step 6: Setup training parameters

In [None]:
from transformers import TrainingArguments, Trainer

# Training configuration for a short run: 60 optimizer steps, evaluating
# every 10 steps and checkpointing every 25.
training_args = TrainingArguments(
    output_dir = "IDEFICS_DocVQA_1",
    learning_rate = 2e-4,
    fp16 = True,
    per_device_train_batch_size = 2,
    per_device_eval_batch_size = 2,
    # 2 samples x 8 accumulation steps = 16 samples per optimizer step.
    gradient_accumulation_steps = 8,
    dataloader_pin_memory = False,
    save_total_limit = 3,
    # `eval_strategy` is the current name of this option; the older
    # `evaluation_strategy` spelling is deprecated, and this notebook installs
    # transformers from the main branch.
    eval_strategy = "steps",
    save_strategy = "steps",
    eval_steps = 10,
    save_steps = 25,
    max_steps = 60,
    logging_steps = 5,
    # Keep the raw image/query/answers columns: the data collator reads them.
    remove_unused_columns = False,
    push_to_hub=False,
    label_names = ["labels"],
    load_best_model_at_end = False,
    report_to = "none",
    optim = "paged_adamw_8bit",
)




In [None]:
# Wire the model, training arguments, custom collator and both dataset splits
# into a Trainer.
trainer = Trainer(
    model = model,
    args = training_args,
    data_collator = data_collator,
    train_dataset = train_dataset,
    eval_dataset = eval_dataset
)

#### Step 7: Start Training

In [None]:
# Run the fine-tuning loop configured in the previous step.
trainer.train()

#### Step 8: Evaluate the model

In [None]:
# Display a summary of the held-out split.
eval_dataset

In [None]:
# Pick one held-out record to try the model on, then display it.
test_example = eval_dataset[3] # ie, the 4th record in the data held back for testing
test_example

In [None]:
# Display the image belonging to the chosen test record.
test_example["image"]

In [None]:
# Ask the fine-tuned model the stored question about the chosen test image.
model.eval()

image = test_example["image"]
query = test_example["query"]
print(query)

# Same message layout as in training (instruction, image, question), but the
# instruction here is "Give your best answer." whereas the training collator
# used "Answer briefly.".
messages = [
    {
        "role": "user",
        "content": [
            {"type": "text", "text": "Give your best answer."},
            {"type": "image"},
            {"type": "text", "text": query}
        ]
    }
]


# add_generation_prompt=True: the prompt is built for the model to continue
# with its own answer (training used False, with the answer included).
text = processor.apply_chat_template(messages, add_generation_prompt=True)
inputs = processor(text=[text.strip()], images=[image], return_tensors="pt", padding=True)

# Move inputs to the same device as the model
inputs = inputs.to(DEVICE)  # This line is added

generated_ids = model.generate(**inputs, max_new_tokens=64)
# Drop the prompt tokens so only the newly generated answer is decoded.
generated_texts = processor.batch_decode(generated_ids[:, inputs["input_ids"].size(1):], skip_special_tokens=True)
print(generated_texts)

#### Step 9: Push the model on Hugging Face

In [None]:
#push the complete model:
from transformers import IdeficsForVisionText2Text, AutoProcessor
from huggingface_hub import whoami, upload_folder, create_repo

# First, save the model and processor properly
def save_model(model, processor, output_dir):
    """Write the model, then the processor, into *output_dir*.

    Both objects are saved through their own ``save_pretrained`` method so
    the folder can be uploaded as a single unit.
    """
    for artefact in (model, processor):
        artefact.save_pretrained(output_dir)

# Then push to Hugging Face
# output_dir is the same folder the Trainer wrote its checkpoints to, so the
# upload below includes whatever is in that folder.
output_dir = "IDEFICS_DocVQA_1"
repo_name = "IDEFICS2-DocVQA-fine-tuned-PALP"

# Assuming you have your fine-tuned model in a variable called 'model'
# and processor in a variable called 'processor'
# NOTE(review): with a LoRA/QLoRA adapter attached, save_pretrained may write
# only the adapter weights rather than the full model; confirm before
# relying on this being the "complete" model.
save_model(model, processor, output_dir)

# Now push to Hugging Face
username = whoami()["name"] # you don't have to specify your username this time, since it's reading it from your token
repo_id = f"{username}/{repo_name}"

# Create or get existing repo (public); exist_ok=True makes re-running safe.
repo_id = create_repo(
    repo_id=repo_id,
    exist_ok=True,
    private=False
).repo_id

# Upload the complete model
uploaded_files = upload_folder(
    repo_id=repo_id,
    folder_path=output_dir,
    commit_message="Pushing complete fine-tuned model with all necessary files"
)

print(f"Successfully uploaded to: https://huggingface.co/{repo_id}")


In [None]:
from huggingface_hub import whoami
from pathlib import Path

# Output directory and target repository for the upload in the next cell.
output_dir = "IDEFICS_DocVQA_1"
repo_name = "IDEFICS2-DocVQA-fine-tuned-PALP"
# whoami() reads the token stored by notebook_login(). Its `token` argument
# expects an access-token string (or a bool), not a path to the cache
# folder, so no argument is passed, as in the earlier push cell.
username = whoami()["name"]
repo_id = f"{username}/{repo_name}"

In [None]:
from huggingface_hub import upload_folder, create_repo

# Create the repo if needed (no error if it already exists) and keep the
# repo id it reports back.
repo_id = create_repo(repo_id, exist_ok=True).repo_id


# Upload the contents of output_dir, skipping any paths that match
# step_* or epoch_*.
upload_folder(
    repo_id=repo_id,
    folder_path=output_dir,
    commit_message="Pushed the IDEFICS2 fine-tuned model on some archae context sheets just to figure out the workflow.",
    ignore_patterns=["step_*", "epoch_*"],
)

## Testing things out on more data

Below, we load some images the model hasn't seen, and then we try them out.

In [None]:
# Extract the unseen images. No -d target is given, so the files land at the
# paths stored inside the zip; later cells expect a random-images folder.
!unzip random-images.zip

In [None]:
#Mac users, if you see a bunch of filenames starting with a period, we can clean
#those out with this code:
#remove dot files in the folder random-images

import os

def remove_dot_files(directory):
    """Delete hidden files (names starting with '.') directly inside *directory*.

    Only regular files are deleted. Hidden directories and anything else that
    is not a regular file are left in place and reported as skipped, so
    "Removed" is only printed for entries that really were deleted. The
    search is not recursive.

    Args:
        directory: Path of the folder to clean.
    """
    for filename in os.listdir(directory):
        if not filename.startswith('.'):
            continue
        filepath = os.path.join(directory, filename)
        if not os.path.isfile(filepath):
            print(f"Skipped (not a regular file): {filepath}")
            continue
        try:
            os.remove(filepath)
        except OSError as e:
            print(f"Error removing {filepath}: {e}")
        else:
            print(f"Removed: {filepath}")

# Clean the freshly unzipped folder (path is relative to the working directory).
remove_dot_files("random-images")

In [None]:
from PIL import Image #Import from PIL, not datasets


## a function to let you ask one question at a time of the images.
## ugly, but ok

def process_new_images(model, processor, image_dir, extensions=(".jpg", ".jpeg", ".png")):
    """Interactively ask one typed question per image in *image_dir*.

    For every matching file the image is opened, the user is prompted for a
    question, and the model's answer is printed. A failure on one image is
    printed and the loop moves on to the next file.

    Expects ``Image`` (from PIL) and ``DEVICE`` to be defined at notebook
    level.

    Args:
        model: Model exposing ``generate``.
        processor: Processor exposing ``apply_chat_template``, ``__call__``
            and ``batch_decode``.
        image_dir: Folder containing the images.
        extensions: File endings to include, matched case-insensitively.

    Returns:
        None; answers are printed.
    """
    suffixes = tuple(ext.lower() for ext in extensions)
    # Sorted so the images are always visited in the same order.
    for filename in sorted(os.listdir(image_dir)):
        if not filename.lower().endswith(suffixes):
            continue
        filepath = os.path.join(image_dir, filename)
        try:
            # Open the image first so an unreadable file fails before the
            # user is asked to type a question for it.
            image = Image.open(filepath).convert("RGB")
            query = input(f"Enter question for {filename}: ")  # Get question from user
            messages = [
                {"role": "user", "content": [{"type": "text", "text": "Give your best answer."}, {"type": "image"}, {"type": "text", "text": query}]}
            ]
            text = processor.apply_chat_template(messages, add_generation_prompt=True)
            inputs = processor(text=[text.strip()], images=[image], return_tensors="pt", padding=True).to(DEVICE)
            generated_ids = model.generate(**inputs, max_new_tokens=64)
            # Decode only the tokens generated after the prompt.
            generated_texts = processor.batch_decode(generated_ids[:, inputs["input_ids"].size(1):], skip_special_tokens=True)
            print(f"Answer for {filename}: {generated_texts[0]}")
        except FileNotFoundError:
            print(f"Error: Image file not found: {filepath}")
        except Exception as e:
            # Best effort by design: report and continue with the next image.
            print(f"Error processing {filename}: {e}")

#


In [None]:
# When you run this, it will iterate over each image file in your
# random-images folder, giving you a query box into which you may type your
# question for that image,
# ie, What is the nature of the soil?
process_new_images(model, processor, "/content/random-images")

In [None]:
## a function to ask the same question of every image in a folder

def process_images(model, processor, image_dir, question, extensions=(".jpg", ".jpeg", ".png")):
    """Ask the same *question* of every image in *image_dir*.

    Expects ``Image`` (from PIL) and ``DEVICE`` to be defined at notebook
    level.

    Args:
        model: Model exposing ``generate``.
        processor: Processor exposing ``apply_chat_template``, ``__call__``
            and ``batch_decode``.
        image_dir: Folder containing the images.
        question: Question text embedded in the prompt for every image.
        extensions: File endings to include, matched case-insensitively.

    Returns:
        dict mapping each processed filename to the model's answer, or to an
        "Error processing ..." message when that image failed.
    """
    suffixes = tuple(ext.lower() for ext in extensions)
    all_answers = {}  # filename -> answer (or error message)
    # Sorted so results come back in a reproducible order.
    for filename in sorted(os.listdir(image_dir)):
        if not filename.lower().endswith(suffixes):
            continue
        filepath = os.path.join(image_dir, filename)
        try:
            image = Image.open(filepath).convert("RGB")
            messages = [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": f"Give your best answer to the following question: '{question}'"},
                        {"type": "image"},
                    ]
                }
            ]
            text = processor.apply_chat_template(messages, add_generation_prompt=True)
            inputs = processor(text=[text.strip()], images=[image], return_tensors="pt", padding=True).to(DEVICE)
            generated_ids = model.generate(**inputs, max_new_tokens=64)
            # Decode only the tokens generated after the prompt.
            generated_texts = processor.batch_decode(generated_ids[:, inputs["input_ids"].size(1):], skip_special_tokens=True)
            all_answers[filename] = generated_texts[0]  # Store answer with filename
        except Exception as e:
            # Best effort by design: record the failure and keep going.
            print(f"Error processing {filename}: {e}")
            all_answers[filename] = f"Error processing {filename}: {e}"  # Store error message

    return all_answers



In [None]:
# Ask one fixed question of every image in the folder and collect the
# answers keyed by filename.
image_directory = "/content/random-images"
question_to_ask = "Identify the stratigraphic relationships present"
individual_answers = process_images(model, processor, image_directory, question_to_ask)

# Print the answers (entries for failed images hold the error message instead).
for filename, answer in individual_answers.items():
    print(f"\nImage: {filename}\nAnswer: {answer}")