In [None]:
# Colab setup: install the pdf2image Python package and the poppler-utils system package
# (presumably the PDF rendering backend pdf2image relies on -- confirm), then mount Google Drive.
!pip install -q pdf2image
!sudo apt-get install -q -y poppler-utils
from google.colab import drive
# Later cells read and write their JSON files below /content/drive/MyDrive/Data.
drive.mount('/content/drive')

Reading package lists...
Building dependency tree...
Reading state information...
The following NEW packages will be installed:
  poppler-utils
0 upgraded, 1 newly installed, 0 to remove and 49 not upgraded.
Need to get 186 kB of archives.
After this operation, 696 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 poppler-utils amd64 22.02.0-2ubuntu0.5 [186 kB]
Fetched 186 kB in 0s (609 kB/s)
debconf: unable to initialize frontend: Dialog
debconf: (No usable dialog-like program is installed, so the dialog based frontend cannot be used. at /usr/share/perl5/Debconf/FrontEnd/Dialog.pm line 78, <> line 1.)
debconf: falling back to frontend: Readline
debconf: unable to initialize frontend: Readline
debconf: (This frontend requires a controlling tty.)
debconf: falling back to frontend: Teletype
dpkg-preconfigure: unable to re-open stdin: 
Selecting previously unselected package poppler-utils.
(Reading database ... 123630 files and direc

In [None]:
from pdf2image import convert_from_path
from IPython.display import display
from io import BytesIO
import base64
from PIL import Image
import torch
import cv2
import numpy as np
from pathlib import Path
from typing import List, Dict
from tqdm import tqdm
import json
import os

def encode_base64_image(image):
    """Serialize an image to a base64-encoded JPEG string.

    Args:
        image: PIL-style image exposing ``save(fp, format=...)`` and, for
            non-JPEG-compatible modes, ``mode`` and ``convert``.

    Returns:
        str: Base64 text of the JPEG-encoded image bytes.
    """
    # JPEG cannot store alpha or palette data; saving e.g. an "RGBA", "LA" or "P"
    # image raises in Pillow. Convert such images to RGB first (alpha is dropped).
    # Modes listed here are the ones Pillow's JPEG writer accepts unchanged.
    jpeg_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")
    if getattr(image, "mode", "RGB") not in jpeg_modes:
        image = image.convert("RGB")

    buffered = BytesIO()
    image.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


def decode_base64_image(base64_string):
    """Rebuild an image object from its base64-encoded file bytes.

    The string is base64-decoded and the resulting bytes are handed to
    ``Image.open`` through an in-memory buffer.
    """
    raw_bytes = base64.b64decode(base64_string)
    return Image.open(BytesIO(raw_bytes))

def unique_positive_hash_64bit(value):
    """Return a non-negative integer below 2**64 derived from ``value``.

    Text and bytes are hashed with BLAKE2b (8-byte digest) so the same input
    yields the same id in every Python process. The built-in ``hash()`` is
    salted per interpreter session for ``str``/``bytes``, which would give a
    record a different id on every notebook restart.

    Despite the name, uniqueness is not guaranteed: distinct inputs can collide.

    Args:
        value: ``str`` (encoded as UTF-8), bytes-like, or any other hashable object.

    Returns:
        int: Value in ``[0, 2**64)``.
    """
    import hashlib  # local import keeps this helper self-contained

    if isinstance(value, str):
        value = value.encode("utf-8")
    if isinstance(value, (bytes, bytearray)):
        digest = hashlib.blake2b(value, digest_size=8).digest()
        return int.from_bytes(digest, "big")

    # Other hashables (e.g. ints) keep the original behaviour: built-in hash,
    # reinterpreted as a 64-bit unsigned integer.
    return hash(value) & ((1 << 64) - 1)


def resize_image(image, max_height=800):
    """Downscale ``image`` so its height does not exceed ``max_height``.

    The aspect ratio is preserved. Images that already fit are returned
    unchanged (same object); images are never upscaled.

    Args:
        image: PIL-style image exposing ``size`` as ``(width, height)`` and ``resize``.
        max_height: Maximum allowed height in pixels.

    Returns:
        The original image, or the result of ``image.resize(...)``.
    """
    width, height = image.size
    if height <= max_height:
        return image

    ratio = max_height / height
    # int() truncates, so a very narrow image could end up 0 px wide, which is
    # not a valid target size; keep every dimension at 1 px or more.
    new_width = max(1, int(width * ratio))
    new_height = max(1, int(height * ratio))
    return image.resize((new_width, new_height))


def save_json(data, filename):
    """Write ``data`` to ``filename`` as JSON, pretty-printed with a 2-space indent."""
    with open(filename, 'w') as out_file:
        out_file.write(json.dumps(data, indent=2))

def load_json(filename):
    """Read and parse the JSON document stored at ``filename``.

    The file is decoded as UTF-8 explicitly, so non-ASCII content (e.g.
    German umlauts in manual titles) loads the same on every platform
    instead of depending on the locale's default encoding.

    Returns:
        The parsed JSON value (whatever ``json.load`` produces for the file).
    """
    with open(filename, 'r', encoding='utf-8') as f:
        return json.load(f)

def combine_json_files(directory: str):
    """Concatenate the contents of every ``*.json`` file in ``directory``.

    Files are processed in sorted filename order so the combined result is
    reproducible; ``os.listdir`` alone returns entries in arbitrary order.

    Args:
        directory: Folder to scan (not recursive).

    Returns:
        list: All items from all files, appended via ``list.extend`` -- each
        file is therefore expected to hold a JSON array.
    """
    combined_data = []

    for filename in sorted(os.listdir(directory)):
        if not filename.endswith('.json'):
            continue
        combined_data.extend(load_json(os.path.join(directory, filename)))

    return combined_data

In [None]:
# Load the per-page manual records from Drive. Later cells read the keys "title",
# "text", "page_number", "headings" and "base64" from each record.
manuals = load_json("/content/drive/MyDrive/Data/manuals-data.json")

In [None]:
# Inspect the headings of one sample page; each entry appears to be a
# [heading text, level] pair (see the output below).
manuals[20]["headings"]

[['MiR 24V Battery Troubleshooting and Technical Guide', 0], ['4. Storage', 1]]

In [None]:
# coding=utf-8
# Copyright 2024 The GTE Team Authors and Alibaba Group.
# Licensed under the Apache License, Version 2.0 (the "License");

from collections import defaultdict
from typing import Dict, List, Tuple

import numpy as np
import torch
from transformers import AutoModelForTokenClassification, AutoTokenizer
from transformers.utils import is_torch_npu_available


class GTEEmbeddidng(torch.nn.Module):
    """Dense + sparse text embedder built on a Hugging Face token-classification checkpoint.

    ``encode`` produces, per input text, a dense vector (hidden state of the first
    token, optionally truncated to ``dimension`` and L2-normalized) and/or a sparse
    mapping from token id to weight (ReLU of the per-token logit, keeping the
    maximum weight when a token id occurs more than once).
    """

    def __init__(self,
                 model_name: str = None,
                 normalized: bool = True,
                 use_fp16: bool = True,
                 device: str = None
                ):
        """Load tokenizer and model and move the model to the target device.

        Args:
            model_name: Hugging Face model id or local path of the checkpoint.
            normalized: If True, dense embeddings are L2-normalized.
            use_fp16: Load the weights in float16. Forced to False when the
                device is auto-detected as CPU.
            device: Explicit torch device string; if omitted, the first available
                of cuda, mps, npu is used, falling back to cpu.

        Note:
            The model is loaded with ``trust_remote_code=True``, i.e. Python code
            shipped with the checkpoint repository is executed.
        """
        super().__init__()
        self.normalized = normalized
        if device:
            self.device = torch.device(device)
        else:
            if torch.cuda.is_available():
                self.device = torch.device("cuda")
            elif torch.backends.mps.is_available():
                self.device = torch.device("mps")
            elif is_torch_npu_available():
                self.device = torch.device("npu")
            else:
                self.device = torch.device("cpu")
                use_fp16 = False
        self.use_fp16 = use_fp16
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForTokenClassification.from_pretrained(
            model_name, trust_remote_code=True, torch_dtype=torch.float16 if self.use_fp16 else None
        )
        self.vocab_size = self.model.config.vocab_size
        self.model.to(self.device)

    def _process_token_weights(self, token_weights: np.ndarray, input_ids: list):
        """Collapse per-position weights of one text into ``{token_id: max weight}``.

        Special tokens (CLS, EOS, PAD, UNK) and non-positive weights are skipped.

        Args:
            token_weights: One weight per token position.
            input_ids: Token id per position, aligned with ``token_weights``.

        Returns:
            defaultdict: Token id (int) -> largest positive weight seen for it.
        """
        result = defaultdict(int)
        unused_tokens = {self.tokenizer.cls_token_id, self.tokenizer.eos_token_id,
                         self.tokenizer.pad_token_id, self.tokenizer.unk_token_id}

        for w, idx in zip(token_weights, input_ids):
            idx = int(idx)  # Ensure idx is an integer
            if idx not in unused_tokens and w > 0:
                # Max-pool: keep the strongest weight of a repeated token.
                if w > result[idx]:
                    result[idx] = w
        return result

    @torch.no_grad()
    def encode(self,
               texts: List[str],
               dimension: int = None,
               max_length: int = 8192,
               batch_size: int = 16,
               return_dense: bool = True,
               return_sparse: bool = False):
        """Embed one text or a list of texts, processing ``batch_size`` texts at a time.

        Args:
            texts: A single string or a list of strings.
            dimension: Number of leading dense dimensions to keep; defaults to
                the model's hidden size.
            max_length: Tokenizer truncation length.
            batch_size: Number of texts per forward pass.
            return_dense: Whether to compute dense embeddings.
            return_sparse: Whether to compute sparse token weights.

        Returns:
            dict with
              * ``"dense_embeddings"``: tensor with one row per text, or an empty
                list when ``return_dense`` is False or ``texts`` is empty;
              * ``"token_weights"``: list with one ``{token_id: weight}`` mapping
                per text (empty when ``return_sparse`` is False).
        """
        if dimension is None:
            dimension = self.model.config.hidden_size
        if isinstance(texts, str):
            texts = [texts]

        dense_batches = []
        all_token_weights = []
        for start in range(0, len(texts), batch_size):
            batch = texts[start: start + batch_size]
            results = self._encode(batch, dimension, max_length, batch_size, return_dense, return_sparse)
            if return_dense:
                dense_batches.append(results['dense_embeddings'])
            if return_sparse:
                all_token_weights.extend(results['token_weights'])

        # Merge the per-batch tensors into one tensor. torch.cat rejects an empty
        # list, so fall back to [] when nothing dense was produced.
        all_dense_vecs = torch.cat(dense_batches, dim=0) if dense_batches else []
        return {
            "dense_embeddings": all_dense_vecs,
            "token_weights": all_token_weights
        }

    @torch.no_grad()
    def _encode(self,
                texts: List[str] = None,
                dimension: int = None,
                max_length: int = 1024,
                batch_size: int = 16,
                return_dense: bool = True,
                return_sparse: bool = False):
        """Run one tokenized batch through the model.

        Args:
            texts: Batch of strings passed to the tokenizer.
            dimension: Number of leading hidden dimensions kept for dense vectors.
            max_length: Tokenizer truncation length.
            batch_size: Unused here; kept for signature compatibility.
            return_dense: Add ``"dense_embeddings"`` to the output.
            return_sparse: Add ``"token_weights"`` to the output.

        Returns:
            dict containing only the requested entries.
        """
        text_input = self.tokenizer(texts, padding=True, truncation=True, return_tensors='pt', max_length=max_length)
        text_input = {k: v.to(self.model.device) for k, v in text_input.items()}
        model_out = self.model(**text_input, return_dict=True)

        output = {}
        if return_dense:
            # First-token hidden state, truncated to the requested dimension.
            dense_vecs = model_out.last_hidden_state[:, 0, :dimension]
            if self.normalized:
                dense_vecs = torch.nn.functional.normalize(dense_vecs, dim=-1)
            output['dense_embeddings'] = dense_vecs
        if return_sparse:
            # One non-negative weight per token position
            # (assumes logits carry a trailing size-1 dimension -- TODO confirm).
            token_weights = torch.relu(model_out.logits).squeeze(-1)
            token_weights = list(map(self._process_token_weights,
                                     token_weights.detach().cpu().numpy().tolist(),
                                     text_input['input_ids'].cpu().numpy().tolist()))
            output['token_weights'] = token_weights

        return output

    def _compute_sparse_scores(self, embs1, embs2):
        """Dot product of two sparse ``{token_id: weight}`` mappings."""
        scores = 0
        # Now using token IDs directly
        for token_id, weight in embs1.items():
            if token_id in embs2:
                scores += weight * embs2[token_id]
        return scores

    def compute_sparse_scores(self, embs1, embs2):
        """Pairwise sparse scores for two aligned lists of token-weight mappings."""
        scores = [self._compute_sparse_scores(emb1, emb2) for emb1, emb2 in zip(embs1, embs2)]
        return np.array(scores)

    def compute_dense_scores(self, embs1, embs2):
        """Row-wise dot product of two equally shaped dense embedding tensors (NumPy array)."""
        scores = torch.sum(embs1*embs2, dim=-1).cpu().detach().numpy()
        return scores

    @torch.no_grad()
    def compute_scores(self,
        text_pairs: List[Tuple[str, str]],
        dimension: int = None,
        max_length: int = 1024,
        batch_size: int = 16,
        dense_weight=1.0,
        sparse_weight=0.1):
        """Score each ``(text1, text2)`` pair with a weighted sum of dense and sparse similarity.

        Returns:
            list of float: ``dense_weight * dense + sparse_weight * sparse`` per pair.
        """
        text1_list = [text_pair[0] for text_pair in text_pairs]
        text2_list = [text_pair[1] for text_pair in text_pairs]
        embs1 = self.encode(text1_list, dimension, max_length, batch_size, return_dense=True, return_sparse=True)
        embs2 = self.encode(text2_list, dimension, max_length, batch_size, return_dense=True, return_sparse=True)
        scores = self.compute_dense_scores(embs1['dense_embeddings'], embs2['dense_embeddings']) * dense_weight + \
            self.compute_sparse_scores(embs1['token_weights'], embs2['token_weights']) * sparse_weight
        scores = scores.tolist()
        return scores

In [None]:
# Hugging Face model id of the checkpoint used for every embedding in this notebook.
model_name_or_path = 'Alibaba-NLP/gte-multilingual-base'
# Instantiation loads tokenizer and model weights (downloaded on first use, see output below).
model = GTEEmbeddidng(model_name_or_path)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/1.15k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/17.1M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/964 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.43k [00:00<?, ?B/s]

configuration.py:   0%|          | 0.00/7.13k [00:00<?, ?B/s]

A new version of the following files was downloaded from https://huggingface.co/Alibaba-NLP/new-impl:
- configuration.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.


modeling.py:   0%|          | 0.00/59.0k [00:00<?, ?B/s]

A new version of the following files was downloaded from https://huggingface.co/Alibaba-NLP/new-impl:
- modeling.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.


model.safetensors:   0%|          | 0.00/611M [00:00<?, ?B/s]

In [None]:
# Token count of every manual page, in page order (input for the statistics below).
tokens = [len(model.tokenizer.tokenize(page["text"])) for page in manuals]

In [None]:
# Per-page token count statistics: (mean, standard deviation, max, min).
np.mean(tokens), np.std(tokens), np.max(tokens), np.min(tokens)

(278.4397031539889, 136.03028685972873, 938, 17)

In [None]:
# Split every page into overlapping token windows: each chunk holds at most
# `max_tokens` tokens and shares `overlap` tokens with its predecessor.
max_tokens = 120
overlap = 20
# The window must advance by at least one token, otherwise range() below
# gets a zero/negative step.
assert 0 <= overlap < max_tokens, "overlap must be non-negative and smaller than max_tokens"
stride = max_tokens - overlap
pages_chunked = []

for page in manuals:
    page_tokens = model.tokenizer.tokenize(page["text"])
    chunk_number = 1
    for start in range(0, len(page_tokens), stride):
        chunk = model.tokenizer.convert_tokens_to_string(page_tokens[start:start + max_tokens])
        # Shallow copy: the chunk gets its own "text"/"chunk_number" but shares
        # the remaining values (e.g. the headings list) with the source page.
        page_chunk = page.copy()
        page_chunk["text"] = chunk
        page_chunk["chunk_number"] = chunk_number
        chunk_number += 1
        pages_chunked.append(page_chunk)
        # This window already reached the end of the page. Any further window
        # would start inside the overlap and only repeat tokens already covered,
        # so stop instead of emitting a redundant tail chunk.
        if start + max_tokens >= len(page_tokens):
            break

In [None]:
for page in tqdm(pages_chunked, desc="currently embedding ...", total=len(pages_chunked)):
    # Identifier derived from title, page number and chunk number.
    iid_source = page["title"] + str(page["page_number"]) + str(page["chunk_number"])
    page["iid"] = unique_positive_hash_64bit(iid_source)

    # Dense embedding: chunk text only.
    dense_result = model.encode([page["text"]], return_dense=True, return_sparse=False)
    page["dense_embeddings"] = dense_result["dense_embeddings"]

    # Sparse embedding: heading titles (each followed by a space) prepended to the text.
    heading_prefix = "".join(heading[0] + " " for heading in page["headings"])
    input_sparse = heading_prefix + page["text"]
    sparse_result = model.encode([input_sparse], return_dense=False, return_sparse=True)
    page["sparse_embeddings"] = sparse_result["token_weights"]


In [None]:
import json

def convert_to_json_compatible(data):
    """
    Convert document records into plain, JSON-serializable dictionaries.

    Handles both chunked records (with a 'chunk_number') and page-level records
    (without one), so a single definition serves both pipelines.

    Args:
        data: Iterable of records with the keys 'title', 'text', 'page_number',
            'iid', 'headings', 'base64', 'dense_embeddings', 'sparse_embeddings'
            and optionally 'chunk_number'.

    Returns:
        list of dict: One converted record per input record, containing only
        the keys listed above.
    """
    def convert_tensor(tensor_data):
        # A tensor/array converts directly to a nested list.
        if hasattr(tensor_data, "tolist"):
            return tensor_data.tolist()
        # Otherwise expect a list of per-batch tensors and flatten it into one
        # list of rows; entries that are already plain lists are kept as they are.
        rows = []
        for batch in tensor_data:
            rows.extend(batch.tolist() if hasattr(batch, "tolist") else [batch])
        return rows

    def convert_defaultdict(d):
        # Convert defaultdict to regular dict with float values
        return {k: float(v) for k, v in d.items()}

    json_data = []
    for doc in data:
        converted_doc = {
            'title': doc['title'],
            'text': doc['text'],
            'page_number': doc['page_number'],
            'iid': doc['iid'],
            'headings': doc['headings'],
        }
        # Only chunked records carry a chunk number.
        if 'chunk_number' in doc:
            converted_doc['chunk_number'] = doc['chunk_number']
        converted_doc['base64'] = doc['base64']
        converted_doc['dense_embeddings'] = convert_tensor(doc['dense_embeddings'])
        converted_doc['sparse_embeddings'] = [convert_defaultdict(d) for d in doc['sparse_embeddings']]
        json_data.append(converted_doc)

    return json_data

# Build the JSON-serializable version of the chunked, embedded records.
converted_data = convert_to_json_compatible(pages_chunked)


In [None]:
# Persist the chunked, embedded records to Drive.
save_json(converted_data, "/content/drive/MyDrive/Data/chunked-and-embedded-manuals.json")

In [None]:
# Reload the file that was just written.
data = load_json("/content/drive/MyDrive/Data/chunked-and-embedded-manuals.json")

In [None]:
# Spot-check the iid of the first reloaded record.
data[0]["iid"]

int

In [None]:
data = load_json("/content/drive/MyDrive/Data/chunked-and-embedded-manuals.json")
# Titles of the German-language documents; every other title is tagged as English.
german_titles = ("Einbau der 24-V-Batterie mit erhöhter Kapazität am MiR100 und MiR200 1.0_de", "Mir200 Betriebsanleitung (de)")
for record in data:
    # The Q&A document is a "case"; everything else is a "manual".
    if record["title"] == "batterydoesntchargeQ&A":
        record["label"] = "case"
    else:
        record["label"] = "manual"
    if record["title"] in german_titles:
        record["language"] = "de"
    else:
        record["language"] = "en"

In [None]:
# Write the records, now tagged with label and language, back to the same file.
save_json(data, "/content/drive/MyDrive/Data/chunked-and-embedded-manuals.json")

In [None]:
data = load_json("/content/drive/MyDrive/Data/embedded-manuals.json")
# Titles of the German-language documents; every other title is tagged as English.
german_titles = ("Einbau der 24-V-Batterie mit erhöhter Kapazität am MiR100 und MiR200 1.0_de", "Mir200 Betriebsanleitung (de)")
for record in data:
    # The Q&A document is a "case"; everything else is a "manual".
    if record["title"] == "batterydoesntchargeQ&A":
        record["label"] = "case"
    else:
        record["label"] = "manual"
    if record["title"] in german_titles:
        record["language"] = "de"
    else:
        record["language"] = "en"
# Overwrite the page-level file with the tagged records.
save_json(data, "/content/drive/MyDrive/Data/embedded-manuals.json")

# Page-wise embedding (whole pages, no chunking)

In [None]:
for page in tqdm(manuals, desc="currently embedding ...", total=len(manuals)):
    # Identifier derived from title and page number.
    iid_source = page["title"] + str(page["page_number"])
    page["iid"] = unique_positive_hash_64bit(iid_source)

    # Dense embedding: page text only.
    dense_result = model.encode([page["text"]], return_dense=True, return_sparse=False)
    page["dense_embeddings"] = dense_result["dense_embeddings"]

    # Sparse embedding: heading titles (each followed by a space) prepended to the text.
    heading_prefix = "".join(heading[0] + " " for heading in page["headings"])
    input_sparse = heading_prefix + page["text"]
    sparse_result = model.encode([input_sparse], return_dense=False, return_sparse=True)
    page["sparse_embeddings"] = sparse_result["token_weights"]


currently embedding ...: 100%|██████████| 1078/1078 [1:01:25<00:00,  3.42s/it]


In [None]:
# Attach label and language metadata to every page record.
german_titles = ("Einbau der 24-V-Batterie mit erhöhter Kapazität am MiR100 und MiR200 1.0_de", "Mir200 Betriebsanleitung (de)")
for record in manuals:
    # The Q&A document is a "case"; everything else is a "manual".
    if record["title"] == "batterydoesntchargeQ&A":
        record["label"] = "case"
    else:
        record["label"] = "manual"
    # Only the listed titles are German; every other title is tagged as English.
    if record["title"] in german_titles:
        record["language"] = "de"
    else:
        record["language"] = "en"

In [None]:
def convert_to_json_compatible(data):
    """
    Convert document records into plain, JSON-serializable dictionaries.

    Handles both page-level records (no 'chunk_number') and chunked records
    (with one), so this definition can be used for either pipeline.

    Args:
        data: Iterable of records with the keys 'title', 'text', 'page_number',
            'iid', 'headings', 'base64', 'dense_embeddings', 'sparse_embeddings'
            and optionally 'chunk_number'.

    Returns:
        list of dict: One converted record per input record, containing only
        the keys listed above.
    """
    def convert_tensor(tensor_data):
        # A tensor/array converts directly to a nested list.
        if hasattr(tensor_data, "tolist"):
            return tensor_data.tolist()
        # Otherwise expect a list of per-batch tensors and flatten it into one
        # list of rows; entries that are already plain lists are kept as they are.
        rows = []
        for batch in tensor_data:
            rows.extend(batch.tolist() if hasattr(batch, "tolist") else [batch])
        return rows

    def convert_defaultdict(d):
        # Convert defaultdict to regular dict with float values
        return {k: float(v) for k, v in d.items()}

    json_data = []
    for doc in data:
        converted_doc = {
            'title': doc['title'],
            'text': doc['text'],
            'page_number': doc['page_number'],
            'iid': doc['iid'],
            'headings': doc['headings'],
        }
        # Only chunked records carry a chunk number.
        if 'chunk_number' in doc:
            converted_doc['chunk_number'] = doc['chunk_number']
        converted_doc['base64'] = doc['base64']
        converted_doc['dense_embeddings'] = convert_tensor(doc['dense_embeddings'])
        converted_doc['sparse_embeddings'] = [convert_defaultdict(d) for d in doc['sparse_embeddings']]
        json_data.append(converted_doc)

    return json_data


In [None]:
# Convert the page-level records to JSON-serializable form and persist them to Drive.
converted = convert_to_json_compatible(manuals)
save_json(converted, "/content/drive/MyDrive/Data/embedded-manuals.json")

# Blocker

In [None]:
# Load the saved chunked records; their sparse embeddings are recomputed in the next cell.
data = load_json("/content/drive/MyDrive/Data/chunked-and-embedded-manuals.json")

In [None]:
for page in tqdm(data, total= len(data)):
    # Sparse input: heading titles (each followed by a space) prepended to the chunk text.
    heading_prefix = "".join(heading[0] + " " for heading in page["headings"])
    input_sparse = heading_prefix + page["text"]
    sparse_result = model.encode(input_sparse, return_dense=False, return_sparse=True)
    page["sparse_embeddings"] = sparse_result["token_weights"]


100%|██████████| 3529/3529 [00:42<00:00, 83.87it/s]


In [None]:
# Overwrite the chunked file with the recomputed sparse embeddings.
save_json(data, "/content/drive/MyDrive/Data/chunked-and-embedded-manuals.json")

In [None]:
data_without_chunks = load_json("/content/drive/MyDrive/Data/embedded-manuals.json")
for page in tqdm(data_without_chunks, total= len(data_without_chunks)):
    # Sparse input: heading titles (each followed by a space) prepended to the page text.
    heading_prefix = "".join(heading[0] + " " for heading in page["headings"])
    input_sparse = heading_prefix + page["text"]
    sparse_result = model.encode(input_sparse, return_dense=False, return_sparse=True)
    page["sparse_embeddings"] = sparse_result["token_weights"]
# Overwrite the page-level file with the recomputed sparse embeddings.
save_json(data_without_chunks, "/content/drive/MyDrive/Data/embedded-manuals.json")

 31%|███       | 1078/3529 [00:13<00:30, 79.80it/s]
