In [2]:
import os
import numpy as np
from pathlib import Path
from typing import List, cast
from pdf2image import convert_from_path
from PIL import Image
import torch
from torch import nn
from transformers import LlamaTokenizerFast, PaliGemmaProcessor
from transformers.models.paligemma.configuration_paligemma import PaliGemmaConfig
from transformers.models.paligemma.modeling_paligemma import PaliGemmaForConditionalGeneration, PaliGemmaPreTrainedModel
import os

## Define ColPali model

Chức năng chính của lớp này là tạo ra các biểu diễn đầu ra (embeddings) đã được chiếu (projected embeddings) và chuẩn hoá.

In [3]:
class ColPali(PaliGemmaPreTrainedModel):
    def __init__(self, config: PaliGemmaConfig):
        super(ColPali, self).__init__(config=config)
        self.model = PaliGemmaForConditionalGeneration(config)
        self.dim = 128
        self.custom_text_proj = nn.Linear(self.model.config.text_config.hidden_size, self.dim)
        self.main_input_name = "doc_input_ids"

    def forward(self, *args, **kwargs) -> torch.Tensor:
        outputs = self.model(*args, output_hidden_states=True, **kwargs)
        last_hidden_states = outputs.hidden_states[-1]
        proj = self.custom_text_proj(last_hidden_states)
        proj = proj / proj.norm(dim=-1, keepdim=True)
        '''
        attention_mask ban đầu có dạng [batch_size, sequence_length], tức là một ma trận 2 chiều. 
        Mỗi giá trị trong attention_mask tương ứng với một token trong chuỗi đầu vào.
        Phép .unsqueeze(-1) thêm một chiều mới ở vị trí cuối cùng, biến đổi attention_mask từ [batch_size, sequence_length] thành [batch_size, sequence_length, 1]. 
        Điều này giúp cho phép nhân với tensor proj (kích thước [batch_size, sequence_length, self.dim]) có thể thực hiện được.
        
        Khi đó, phép nhân giữa proj và attention_mask.unsqueeze(-1) sẽ chỉ giữ lại các embedding của các token thực (với mask là 1) 
        và làm bằng 0 các embedding của token padding (với mask là 0).
        '''
        proj = proj * kwargs["attention_mask"].unsqueeze(-1)
        return proj

## Define input classes

- ColPaliTextInput: Được dùng để chứa dữ liệu đầu vào dạng văn bản (text), với các thành phần chính là input_ids (mã token) và attention_mask (mặt nạ).

- ColPaliImageInput: Được dùng để chứa dữ liệu đầu vào là văn bản kèm với hình ảnh. Ngoài input_ids và attention_mask, lớp này còn chứa một tensor pixel_values để lưu thông tin pixel của ảnh.

- Cả hai lớp đều có phương thức to(device), cho phép dễ dàng di chuyển dữ liệu sang các thiết bị khác nhau, như GPU hoặc CPU, giúp tối ưu hóa quá trình xử lý.

In [4]:
class ColPaliTextInput:
    def __init__(self, input_ids, attention_mask):
        self.input_ids = input_ids
        self.attention_mask = attention_mask

    def to(self, device):
        return ColPaliTextInput(
            input_ids=self.input_ids.to(device),
            attention_mask=self.attention_mask.to(device),
        )

class ColPaliImageInput:
    def __init__(self, input_ids, pixel_values, attention_mask):
        self.input_ids = input_ids
        self.pixel_values = pixel_values
        self.attention_mask = attention_mask

    def to(self, device):
        return ColPaliImageInput(
            input_ids=self.input_ids.to(device),
            pixel_values=self.pixel_values.to(device),
            attention_mask=self.attention_mask.to(device),
        )

## Define ColPali Processor

In [5]:
class ColPaliProcessor:
    def __init__(self, processor: PaliGemmaProcessor):
        self.processor = processor
        self.tokenizer = cast(LlamaTokenizerFast, self.processor.tokenizer)

    @staticmethod
    def from_pretrained(model_name: str) -> 'ColPaliProcessor':
        return ColPaliProcessor(processor=PaliGemmaProcessor.from_pretrained(model_name))

    def process_text(self, text: str | List[str], padding: str = "longest", return_tensors: str = "pt", add_special_tokens: bool = True) -> ColPaliTextInput:
        if add_special_tokens:
            if isinstance(text, str):
                text = self.tokenizer.bos_token + text + "\n"
            elif isinstance(text, list):
                text = [self.tokenizer.bos_token + t + "\n" for t in text]
            else:
                raise ValueError("text must be a string or a list of strings.")

        batch_output = self.tokenizer(text, padding=padding, return_tensors=return_tensors, add_special_tokens=add_special_tokens)

        return ColPaliTextInput(
            input_ids=batch_output["input_ids"],
            attention_mask=batch_output["attention_mask"],
        )

    def process_image(self, image: Image.Image | List[Image.Image], padding: str = "longest", do_convert_rgb: bool = True, return_tensors: str = "pt", add_special_prompt: bool = True) -> ColPaliImageInput:
        special_prompt = "Describe the image." if add_special_prompt else None
        if isinstance(image, Image.Image):
            text_input = [special_prompt]
        elif isinstance(image, list):
            text_input = [special_prompt] * len(image)
        else:
            raise ValueError("image must be a PIL Image or a list of PIL Images.")

        batch_output = self.processor(
            text=text_input,
            images=image,
            padding=padding,
            do_convert_rgb=do_convert_rgb,
            return_tensors=return_tensors,
        )

        return ColPaliImageInput(
            input_ids=batch_output["input_ids"],
            pixel_values=batch_output["pixel_values"],
            attention_mask=batch_output["attention_mask"],
        )

    def decode(self, *args, **kwargs):
        return self.tokenizer.decode(*args, **kwargs)

    def batch_decode(self, *args, **kwargs):
        return self.tokenizer.batch_decode(*args, **kwargs)

## Helper functions
- convert_pdf_to_images: Chuyển đổi từng tệp PDF thành hình ảnh và lưu trữ chúng.
- process_pdfs_with_colpali: Xử lý từng tệp PDF bằng mô hình, lấy embedding của từng trang, và lưu kết quả vào các tệp .npy

In [6]:
def convert_pdf_to_images(pdf_file: str, save_folder: str) -> List[Image.Image]:
    images = convert_from_path(pdf_file)
    os.makedirs(save_folder, exist_ok=True)
    saved_images = []
    for i, image in enumerate(images):
        image_path = os.path.join(save_folder, f"page_{i+1}.jpg")
        image.save(image_path, "JPEG")
        saved_images.append(Image.open(image_path))
    return saved_images

def process_pdfs_with_colpali(pdf_files, output_dir, model, processor):
    all_embeddings = []
    all_page_info = []

    for pdf_file in pdf_files:
        pdf_images = convert_pdf_to_images(pdf_file, os.path.join(output_dir, "pdf_images"))

        for page_num, image in enumerate(pdf_images):
            image_input = processor.process_image(image).to(model.device)
            with torch.no_grad():
                page_embedding = model(**vars(image_input))

            all_embeddings.append(page_embedding.cpu().numpy().squeeze()) # the last .squeeze() call removes the batch dimension
            all_page_info.append({"pdf": pdf_file, "page": page_num})

    embeddings_array = np.array(all_embeddings)

    np.save(Path(output_dir) / "embeddings.npy", embeddings_array)
    np.save(Path(output_dir) / "page_info.npy", all_page_info)

    return embeddings_array, all_page_info


In [7]:
def answer_query_with_colpali(query, embeddings_array, page_info, model, processor):
    query_input = processor.process_text(query).to(model.device)
    with torch.no_grad():
        query_embedding = model(**vars(query_input))

    # Reshape embeddings if necessary
    if len(embeddings_array.shape) == 3:
        embeddings_array = embeddings_array.mean(axis=1)  # Average over sequence dimension
    if len(query_embedding.shape) == 3:
        query_embedding = query_embedding.mean(axis=1)  # Average over sequence dimension

    # Ensure both embeddings are 2D
    embeddings_array = embeddings_array.squeeze()
    query_embedding = query_embedding.cpu().numpy().squeeze() # remove batch_dimension with the last squeeze call

    # Compute similarity scores
    similarity_scores = np.dot(embeddings_array, query_embedding.T)

    K = 5
    top_k_indices = np.argsort(similarity_scores.flatten())[-K:][::-1]

    top_results = [
        {"score": similarity_scores.flatten()[i], "info": page_info[i]}
        for i in top_k_indices
    ]

    return top_results

In [8]:
device = "cuda:0" if torch.cuda.is_available() else "cpu"

model_path = "google/paligemma-3b-mix-448" # can also try smaller models
lora_path = "vidore/colpali"

model = ColPali.from_pretrained(model_path) # torch_dtype = torch.bfloat16 # there is also a `bitsandbytes` setup for 8-bit / 4-bit
model.load_adapter(lora_path, adapter_name="colpali")
model.to(device);

`config.hidden_act` is ignored, you should use `config.hidden_activation` instead.
Gemma's activation function will be set to `gelu_pytorch_tanh`. Please, use
`config.hidden_activation` if you want to override this behaviour.
See https://github.com/huggingface/transformers/pull/29402 for more details.


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

Some weights of ColPali were not initialized from the model checkpoint at google/paligemma-3b-mix-448 and are newly initialized: ['custom_text_proj.bias', 'custom_text_proj.weight', 'language_model.lm_head.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
The installed version of bitsandbytes was compiled without GPU support. 8-bit optimizers, 8-bit multiplication, and GPU quantization are unavailable.


In [9]:
processor = ColPaliProcessor.from_pretrained(model_path)  # Load processor

# Danh sách các đường dẫn tới tệp PDF trên máy của bạn
pdf_files = [
    r"C:\Users\DuyTVB\PycharmProjects\ChatRAG\doc\ChartReportUTE.pdf" # Thay thế bằng đường dẫn thực tế của tệp trên máy bạn
]
# Output directory to save embeddings
output_dir = "colpali_output"
# Kiểm tra xem thư mục output có tồn tại không, nếu không thì tạo mới
os.makedirs(output_dir, exist_ok=True)
# Process the PDF files and save embeddings
embeddings, page_info = process_pdfs_with_colpali(pdf_files, output_dir, model, processor)

Starting from v4.46, the `logits` model output will have the same type as the model (except at train time, where it will always be FP32)


In [10]:
query = "Sơ đồ về tổng thí sinh nhập học năm 2019?" # The answer should be contained in NBA-mvp-voting.pdf
results = answer_query_with_colpali(query, embeddings, page_info, model, processor)

# Print results
for result in results:
  print(f"Score: {result['score']}, PDF: {result['info']['pdf']}, Page: {result['info']['page']}")


Score: 0.4399455189704895, PDF: C:\Users\DuyTVB\PycharmProjects\ChatRAG\doc\ChartReportUTE.pdf, Page: 0
Score: 0.4128127098083496, PDF: C:\Users\DuyTVB\PycharmProjects\ChatRAG\doc\ChartReportUTE.pdf, Page: 1
Score: 0.37878865003585815, PDF: C:\Users\DuyTVB\PycharmProjects\ChatRAG\doc\ChartReportUTE.pdf, Page: 2
Score: 0.34476253390312195, PDF: C:\Users\DuyTVB\PycharmProjects\ChatRAG\doc\ChartReportUTE.pdf, Page: 3
