In [None]:
# This notebook is intended to generate alt text image descriptions
# from a metadata file and a folder containing a set of images, using
# versions of the Qwen2.5-VL vision-language model. It is adapted from
# https://huggingface.co/Ertugrul/Qwen2-VL-7B-Captioner-Relaxed
# Inference can run on CPU, CUDA or Apple MLX (your mileage may vary).

In [None]:
import csv
import string

import pandas as pd
import torch
from datasets import Dataset
from PIL import Image
from qwen_vl_utils import process_vision_info
from tabulate import tabulate
from transformers import (
    AutoProcessor,
    Qwen2_5_VLForConditionalGeneration,
)

# For Apple ARM/MLX
if torch.backends.mps.is_available():
    from mlx_vlm import generate, load
    from mlx_vlm.prompt_utils import apply_chat_template
    from mlx_vlm.utils import load_config

    # model_id = "mlx-community/Qwen2.5-VL-7B-Instruct-8bit"
    # We'll use a cloned version of the model with a slightly tweaked processor_config.json file
    # instead, due to a bug that causes the MLX-LVM version of the model to inherit a bad setting.
    # (This bug may eventually be fixed in the upstream version)
    # Specifically, the "image_processor_type" should be "Qwen2VLImageProcessor"
    # ("AutoImageProcessor" probably would work, too) instead of "Qwen2_5_VLImageProcessor"
    # see https://huggingface.co/Qwen/Qwen2.5-VL-7B-Instruct/discussions/24
    model_id = "broadwell/Qwen2.5-VL-7B-Instruct-8bit-fixed-config"
    torch.set_default_device("mps:0")
    device = "mps:0"
else:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model_id = "Qwen/Qwen2.5-VL-7B-Instruct"

# The metadata file can be tab-separated (.txt) or comma-separated (.csv)
# Every line must contain the following:
# - filename (the path to the image file)
# - caption (the original caption of the image; can be an empty string)
# - permalink (link to the image's original page; can be an empty string)
# - place (the place associated with the image; can be an empty string)
# - date (can be an empty string)
# If the metadata includes an *image_url* column, this is used in place of the
# local *filename* field to display the image in the HTML of the results table

# The metadata file and a folder containing all of the images to the captioned
# must be provided and specified here
metadata_csv = "datasets/next-spotlight/next-SPOTLIGHT-metadata.csv"
images_dir = "datasets/next-spotlight/images"

# metadata_csv = "../captions/loc_images_metadata.txt"
# images_dir = "../ma/japanese_images"

MAX_RECORDS = -1  # Set to -1 to process all records

PROMPT = """Please write a caption in English for this image that can be used as alt text on a website and is at most 125 characters long. Please do not mention proper names (persons, locations) unless they are included in the following information, which you may use when writing the text: """
UNPROMPT = """Please write a caption in English for this image that can be used as alt text on a website and is at most 125 characters long. Please do not mention proper names (persons, locations) unless contained in the picture itself."""

In [None]:
prompted_inputs = []
unprompted_inputs = []

with open(metadata_csv, mode="r", encoding="utf-8") as data_file:
    if metadata_csv.endswith(".csv"):
        csvFile = csv.DictReader(data_file)
    else:
        csvFile = csv.DictReader(data_file, delimiter="\t", quoting=csv.QUOTE_NONE)
    for l, line in enumerate(csvFile):
        if MAX_RECORDS > 0 and l >= MAX_RECORDS:
            break
        extra_metadata = []
        if line["place"].strip() != "":
            extra_metadata.append(
                "The location of the image is " + line["place"].strip() + "."
            )
        if line["date"].strip() != "":
            extra_metadata.append(
                "The approximate date of the image is " + line["date"].strip() + "."
            )
        if line["caption"].strip() != "":
            extra_metadata.append(
                "A factual description of the image is the following: "
                + line["caption"].strip()
            )

        prompt = PROMPT + " ".join(extra_metadata)

        desc = line["caption"]

        img = Image.open(images_dir + "/" + line["filename"].replace("tif", "jpg"))

        prompted_inputs.append(
            {
                "file": line["filename"],
                "image": img.copy(),
                "query": prompt,
                "label": [desc.strip()],
            }
        )
        unprompted_inputs.append(
            {
                "file": line["filename"],
                "image": img.copy(),
                "query": UNPROMPT,
                "label": [desc.strip()],
            }
        )

        img.close()

In [None]:
if metadata_csv.endswith(".csv"):
    df = pd.read_csv(metadata_csv, sep=",")
else:
    df = pd.read_csv(metadata_csv, sep="\t")

In [None]:
system_message = """You are a Vision Language Model specialized in generating descriptive alt text for images on websites.
Your task is to analyze the provided image and produce a concise description of the image, based on the visual information in the image and any additional data provided in the prompt.
Do not provide any critical or positive evaluation of the image and its aesthetic qualities.
Do not provide the names of people, places or works of art related to the image unless they are present in the prompt."""
# The description should be no longer than 125 characters. -- This is better placed in the instruction/prompt.

In [None]:
def format_data(sample):
    return [
        {
            "role": "system",
            "content": [{"type": "text", "text": system_message}],
        },
        {
            "role": "user",
            "content": [
                {"type": "image", "image": sample["image"]},
                {"type": "text", "text": sample["query"]},
            ],
        },
        {
            "role": "assistant",
            "content": [{"type": "text", "text": sample["label"][0]}],
        },
        {
            "role": "metadata",
            "content": [{"type": "text", "text": sample["file"]}],
        },
    ]

In [None]:
prompted_dataset = [
    format_data(sample) for sample in Dataset.from_list(prompted_inputs)
]
unprompted_dataset = [
    format_data(sample) for sample in Dataset.from_list(unprompted_inputs)
]

In [None]:
# A sample of he conditioned prompt data:
prompted_dataset[6]

In [None]:
# A sample of the "unconditioned" prompt data:
unprompted_dataset[6]

In [None]:
# An image in the input to the captioning model
prompted_dataset[6][1]["content"][0]["image"]

In [None]:
if torch.backends.mps.is_available():
    model, processor = load(model_id)
    config = load_config(model_id)
else:
    model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
        model_id, torch_dtype=torch.bfloat16, device_map="auto"
    )
    processor = AutoProcessor.from_pretrained(model_id)
    config = None

In [None]:
def generate_text_from_sample(
    model,
    processor,
    sample,
    device="cuda",
    config=None,
    max_new_tokens=1024,
):
    if device == "mps:0":
        images = [images_dir + "/" + sample[3]["content"][0]["text"]]
        formatted_prompt = apply_chat_template(
            processor, config, sample[:2], num_images=len(images)
        )
        output = generate(model, processor, formatted_prompt, images, verbose=False)

        return output

    else:
        # Prepare the text input by applying the chat template
        text_input = processor.apply_chat_template(
            sample[
                :2
            ],  # 1:2 would be just the sample (and prompt) without the system message
            tokenize=False,
            add_generation_prompt=True,
        )

        # Process the visual input from the sample
        image_inputs, _ = process_vision_info(sample)

        # Prepare the inputs for the model
        model_inputs = processor(
            text=[text_input],
            images=image_inputs,
            return_tensors="pt",
        ).to(device)  # Move inputs to the specified device

        # Generate text with the model
        generated_ids = model.generate(**model_inputs, max_new_tokens=max_new_tokens)

        # Trim the generated token ids to remove the input ids
        trimmed_generated_ids = [
            out_ids[len(in_ids) :]
            for in_ids, out_ids in zip(model_inputs.input_ids, generated_ids)
        ]

        # Decode the output text
        output_text = processor.batch_decode(
            trimmed_generated_ids,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False,
        )

        return output_text[0]  # Return the first decoded output text

In [None]:
captioning_results = open("output_captions.txt", "w", encoding="utf-8")

captioning_results.write(
    "\t".join(["filename", "path", "permalink", "human", "collaborative", "model"])
    + "\n"
)

for i, item in enumerate(unprompted_dataset):
    output = (
        generate_text_from_sample(model, processor, prompted_dataset[i], device, config)
        .replace('"', "")
        .replace("\n", " ")
        .replace("\t", " ")
    )
    imageonlyoutput = (
        generate_text_from_sample(model, processor, item, device, config)
        .replace('"', "")
        .replace("\n", " ")
        .replace("\t", " ")
    )
    filename = item[3]["content"][0]["text"]
    record = df[df["filename"] == filename]
    desc = item[2]["content"][0]["text"].replace("\n", " ").replace("\t", " ")
    print(
        f"{filename} | {desc} | {output} ({str(len(output))} chars) | {imageonlyoutput} ({str(len(imageonlyoutput))} chars)"
    )
    if "image_url" in df.columns:
        path = record["image_url"].item()
    else:
        path = "images/" + filename.replace("tif", "jpg")

    captioning_results.write(
        "\t".join(
            [filename, path, record["permalink"].item(), desc, output, imageonlyoutput]
        )
        + "\n"
    )

captioning_results.close()

In [None]:
presentation_results = []

human_color = "blue"
model_color = "red"
combined_color = "purple"


# Remove punctuation from a word
def clean_word(word):
    return "".join([char for char in word if char not in string.punctuation])


# Remove punctuation from all words in a list and return the "cleaned" words as a set
def get_clean_words(words):
    clean = set()
    for word in words:
        term = clean_word(word)
        clean.add(term)
    return clean


def colorize_string(s, human_terms, model_terms, combined_terms):
    colorized_words = []
    for word in s.split():
        clean = clean_word(word)
        word_color = None
        if clean in combined_terms:
            word_color = combined_color
        elif clean in human_terms:
            word_color = human_color
        elif clean in model_terms:
            word_color = model_color

        if word_color is not None:
            colorized_words.append(f"<span style='color: {word_color}'>{word}</span>")
        else:
            colorized_words.append(word)
    return " ".join(colorized_words)


with open("output_captions.txt", "r", encoding="utf-8") as captioning_results:
    for line in captioning_results:
        filename, path, permalink, desc, output, imageonlyoutput = line.strip().split(
            "\t"
        )
        if filename == "filename":
            continue

        human_terms = get_clean_words(desc.split())
        model_terms = get_clean_words(imageonlyoutput.split())
        combined_terms = human_terms.intersection(model_terms)

        human_colorized = f"<span style='color: {human_color}'>{desc}</span>"
        model_colorized = f"<span style='color: {model_color}'>{imageonlyoutput}</span>"
        combined_colorized = colorize_string(
            output, human_terms, model_terms, combined_terms
        )

        presentation_results.append(
            {
                "Image": '<a target="_blank" href="'
                + permalink
                + '"> <img style = "width:400px;" src="'
                + path
                + '"/ ></a>',
                "Original Caption": f"🧑‍🏫 {human_colorized}",
                "Collaborative Caption": f"🤖🤝🧑‍🏫 {combined_colorized} ({str(len(output))} chars)",
                "Qwen2.5-VL-7B": f"🤖 {model_colorized} ({str(len(imageonlyoutput))} chars)",
            }
        )

presentation_df = pd.DataFrame(presentation_results)
results_table = tabulate(presentation_df, tablefmt="unsafehtml", headers="keys")
print(results_table)
results_html = f"""
<!DOCTYPE html>
<html>
  <head>
    <meta charset="UTF-8">
    <style>
      table, th, td {{
        border: 1px solid;
      }}
    </style>
  </head>
{results_table}
</html>
"""
with open("results.html", "w", encoding="utf-8") as html_results:
    html_results.write(results_html)

<table>
<thead>
<tr><th style="text-align: right;">  </th><th>Image                                                                                                                                                                                              </th><th>Original Caption                                                                                                                                                                                                                                                                                                                                                                </th><th>Collaborative Caption                                                                                                                                                                                                                                                                                                                                                