## Upload the sample_corpus zip file, named 'sample_corpus.zip'


Note: Make sure it is a zip file.

In [None]:
from google.colab import files
import zipfile
import os

# Upload ZIP file.
uploaded = files.upload()  # Manually select sample_corpus.zip

# Extract ZIP file.
zip_file = "sample_corpus.zip"
if zip_file in uploaded:
    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        zip_ref.extractall("/content/")  # Extracts directly inside /content/
    print("Extracted 'sample_corpus' successfully!")
else:
    # The upload dict is keyed by the saved file name; a different name would
    # otherwise be skipped without any feedback.
    print(f"'{zip_file}' was not uploaded (got: {list(uploaded)}); nothing was extracted.")

# Verify extraction
# Walk the tree in Python: output written by an os.system() child process does
# not show up in the notebook cell (only its exit status does).
print("Extracted folder structure:")
for root, _dir_names, file_names in os.walk("/content/sample_corpus"):
    print(f"{root}:")
    for name in sorted(file_names):
        print(f"  {name}")

Saving sample_corpus.zip to sample_corpus.zip
Extracted 'sample_corpus' successfully!
Extracted folder structure:


0

## Upload the sample_benchmarks zip file (named 'sample_benchmarks.zip') containing only one .json Q&A file at a time.

Note: Make sure it is a zip file.

In [None]:
from google.colab import files
import zipfile
import os

# Upload ZIP file.
uploaded = files.upload()  # Manually select sample_benchmarks.zip

# Extract ZIP file.
zip_file = "sample_benchmarks.zip"
if zip_file in uploaded:
    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        zip_ref.extractall("/content/")  # Extracts directly inside /content/
    print("Extracted 'sample_benchmarks' successfully!")
else:
    # The upload dict is keyed by the saved file name; a different name would
    # otherwise be skipped without any feedback.
    print(f"'{zip_file}' was not uploaded (got: {list(uploaded)}); nothing was extracted.")

# Verify extraction.
# Walk the tree in Python: output written by an os.system() child process does
# not show up in the notebook cell (only its exit status does).
print("Extracted folder structure:")
for root, _dir_names, file_names in os.walk("/content/sample_benchmarks"):
    print(f"{root}:")
    for name in sorted(file_names):
        print(f"  {name}")

Saving sample_benchmarks.zip to sample_benchmarks.zip
Extracted 'sample_benchmarks' successfully!
Extracted folder structure:


0

## Chunking and Embedding Test Files for Retrieval (for Colab).

The code cell below processes the text documents stored in the sample_corpus folder (located in the /content folder in Google Colab), splits them using the selected chunking method, and generates embeddings using SentenceTransformer models. The processed chunks and their corresponding embeddings are then stored in JSON files that follow the same folder structure as the original sample_corpus folder for convenience. These chunked documents are stored in the 'sample_corpus_chunked' folder inside '/content'. These JSON files will be used for similarity search and reranking.

In [None]:
# Load model directly
# Loads the tokenizer and the masked-LM model for the "Shitao/RetroMAE"
# checkpoint through the `transformers` `from_pretrained` API.
# NOTE(review): no other visible cell reads `tokenizer`, and the retrieval cell
# later rebinds `model` to a plain string, so this cell appears to serve only as
# a download / sanity check -- confirm before relying on these names.
from transformers import AutoTokenizer, AutoModelForMaskedLM

tokenizer = AutoTokenizer.from_pretrained("Shitao/RetroMAE")
model = AutoModelForMaskedLM.from_pretrained("Shitao/RetroMAE")

In [None]:
import os
import json
import numpy as np
from sentence_transformers import SentenceTransformer
from langchain.text_splitter import RecursiveCharacterTextSplitter

#  Define Model Storage.
# Maps the short aliases accepted as `model_name` in this cell to the model
# identifiers that load_embedding_model hands to `SentenceTransformer(...)`.
supported_models = {
    "sbert": "sentence-transformers/all-mpnet-base-v2",
    "distilled_sbert": "sentence-transformers/all-MiniLM-L6-v2",
    "gte_large": "thenlper/gte-large",
    # NOTE(review): this checkpoint is also loaded elsewhere in the notebook as a
    # masked-LM model; presumably SentenceTransformer has to add its own pooling
    # for it -- confirm the resulting embeddings are what is intended.
    "retromae": "Shitao/RetroMAE"
}

#  Load Model.
# Already-constructed models, keyed by alias, so a model is built only once per
# kernel session instead of once per processed file.
_EMBEDDING_MODEL_CACHE = {}

def load_embedding_model(model_name):
    """Return the SentenceTransformer registered under `model_name`.

    Args:
        model_name: One of the aliases defined in `supported_models`.

    Returns:
        The SentenceTransformer instance for that alias. Repeated calls with the
        same alias return the same cached instance.

    Raises:
        KeyError: If `model_name` is not a key of `supported_models`.
    """
    if model_name not in supported_models:
        raise KeyError(
            f"Unknown embedding model '{model_name}'. "
            f"Supported models: {sorted(supported_models)}"
        )
    if model_name not in _EMBEDDING_MODEL_CACHE:
        _EMBEDDING_MODEL_CACHE[model_name] = SentenceTransformer(supported_models[model_name])
    return _EMBEDDING_MODEL_CACHE[model_name]

#  Chunking Function.
def naive_chunking(text, chunk_size, overlap):
    """Split `text` into fixed-size character windows that overlap.

    Consecutive chunks start `chunk_size - overlap` characters apart, so each
    chunk repeats the last `overlap` characters of the previous one. The final
    chunk may be shorter than `chunk_size`.

    Args:
        text: The string to split.
        chunk_size: Maximum number of characters per chunk; must be positive.
        overlap: Number of characters shared by neighbouring chunks; must
            satisfy ``0 <= overlap < chunk_size``.

    Returns:
        A list of chunk strings in document order (empty for empty `text`).

    Raises:
        ValueError: If the sizes would make the window stall or move backwards
            (previously this looped forever) or would skip text.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    step = chunk_size - overlap
    # Slicing clamps at the end of the string, so no explicit min() is needed.
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]

def recursive_chunking(text, chunk_size, overlap):
    """Split `text` with LangChain's RecursiveCharacterTextSplitter.

    Args:
        text: The string to split.
        chunk_size: Passed to the splitter as `chunk_size`.
        overlap: Passed to the splitter as `chunk_overlap`.

    Returns:
        Whatever `split_text` returns for `text` (the list of chunk strings).
    """
    # Candidate separators handed to the splitter, from paragraph breaks down
    # to single characters.
    split_points = ["\n\n", "\n", "!", "?", ".", ":", ";", ",", " ", ""]
    text_splitter = RecursiveCharacterTextSplitter(
        separators=split_points,
        chunk_size=chunk_size,
        chunk_overlap=overlap,
    )
    pieces = text_splitter.split_text(text)
    return pieces


#  Batch Embedding Function.
def get_embeddings(chunks, model_name, batch_size):
    """Encode `chunks` with the named model, `batch_size` chunks at a time.

    Args:
        chunks: Sequence of chunk strings to embed.
        model_name: Alias resolved through `load_embedding_model`.
        batch_size: Number of chunks passed to each `encode` call.

    Returns:
        A NumPy array built from the per-chunk embeddings, in input order.
    """
    encoder = load_embedding_model(model_name)
    vectors = []
    batch_starts = range(0, len(chunks), batch_size)
    for offset in batch_starts:
        encoded = encoder.encode(chunks[offset:offset + batch_size], convert_to_numpy=True)
        # Collect one entry per chunk rather than one entry per batch.
        vectors += list(encoded)
    return np.array(vectors)

#  Save Chunks to JSON
def save_chunks_to_json(chunks, file_name, output_folder, sub_folder):
    """Write `chunks` to `<output_folder>/<sub_folder>/<file_name>.json`.

    The target directory is created when missing, the data is serialised as
    UTF-8 JSON with a two-space indent, and the written path is printed.
    """
    target_dir = os.path.join(output_folder, sub_folder)
    json_file_path = os.path.join(target_dir, f"{file_name}.json")

    # Create the sub-directory (and any parents) before opening the file.
    os.makedirs(target_dir, exist_ok=True)

    with open(json_file_path, "w", encoding="utf-8") as handle:
        json.dump(chunks, handle, indent=2)

    print(f"Saved: {json_file_path}")

# Main Function to Process Text Files
def _locate_chunk_spans(text, chunks, overlap):
    """Return a (start, end) character span in `text` for each chunk, in order.

    Chunks are located with `str.find`. Because neighbouring chunks may share
    up to `overlap` characters, the search for a chunk starts `overlap`
    characters before the previous chunk's end (but after the previous chunk's
    start) instead of at the previous chunk's end. A chunk that cannot be
    located gets the span (-1, -1) and does not move the search position.
    """
    spans = []
    prev_start, prev_end = -1, 0
    for chunk in chunks:
        search_from = max(prev_end - max(overlap, 0), prev_start + 1, 0)
        start = text.find(chunk, search_from)
        if start == -1:
            # Retry with a wider window before giving up on this chunk.
            start = text.find(chunk, max(prev_start, 0))
        if start == -1:
            spans.append((-1, -1))
            continue
        end = start + len(chunk)
        spans.append((start, end))
        prev_start, prev_end = start, end
    return spans


def process_text_file(chunk_size, overlap, batch_size, chunking_method, model_name,
                      folder_path="/content/sample_corpus",
                      output_folder="/content/sample_corpus_chunked"):
    """Chunk and embed every .txt file of the corpus and save the result as JSON.

    For each sub-folder of `folder_path`, every ``.txt`` file is read, split
    with the selected chunking method, embedded, and written to
    ``<output_folder>/<sub-folder>/<file name>.json``. Each stored record holds
    ``chunk_id`` (a counter running across all files, starting at 1), ``text``,
    ``embedding``, ``span`` (start/end character offsets in the source text)
    and ``filepath`` (the path of the source file).

    Sub-folders and files are visited in sorted order so that chunk ids are
    reproducible between runs.

    Args:
        chunk_size: Chunk size passed to the chunking function.
        overlap: Overlap passed to the chunking function.
        batch_size: Batch size passed to `get_embeddings`.
        chunking_method: ``'naive'`` or ``'recursive'``.
        model_name: Embedding model alias passed to `get_embeddings`.
        folder_path: Corpus root; defaults to the Colab location.
        output_folder: Destination root; defaults to the Colab location.

    Returns:
        None. Problems are reported with `print`; an error in one file does not
        stop the remaining files from being processed.
    """
    chunkers = {"naive": naive_chunking, "recursive": recursive_chunking}
    if chunking_method not in chunkers:
        # Without this check every file would fail on an unbound `chunks` name.
        print(f"Unknown chunking method '{chunking_method}'. Use 'naive' or 'recursive'.")
        return
    chunker = chunkers[chunking_method]

    if not os.path.exists(folder_path):
        print(f"Folder '{folder_path}' does not exist!")
        return

    sub_folders = sorted(f for f in os.listdir(folder_path) if os.path.isdir(os.path.join(folder_path, f)))

    print("Subfolders:", sub_folders)
    chunk_id = 0

    for sub in sub_folders:
        sub_folder_path = os.path.join(folder_path, sub)
        text_files = sorted(f for f in os.listdir(sub_folder_path) if f.endswith(".txt"))

        for filename in text_files:
            file_path = os.path.join(sub_folder_path, filename)
            print(f"\nProcessing file: {file_path}")

            try:
                with open(file_path, "r", encoding="utf-8", errors="ignore") as file:
                    text = file.read()

                # Generate Chunks
                chunks = chunker(text, chunk_size, overlap)

                # Add span information
                spans = _locate_chunk_spans(text, chunks, overlap)

                print(f"Total chunks created: {len(chunks)}")

                # Compute Embeddings
                embeddings = get_embeddings(chunks, model_name, batch_size)
                print("Embeddings generated")

                # Store Chunks in JSON Format
                chunk_data = [
                    {
                        "chunk_id": chunk_id + i + 1,
                        "text": chunk,
                        "embedding": embedding.tolist(),
                        "span": spans[i],
                        "filepath": file_path,
                    }
                    for i, (chunk, embedding) in enumerate(zip(chunks, embeddings))
                ]
                chunk_id += len(chunks)
                file_base_name = filename.removesuffix(".txt")
                save_chunks_to_json(chunk_data, file_base_name, output_folder, sub)

            except Exception as e:
                # Best effort: report the failure and continue with the next file.
                print(f"Error processing {file_path}: {e}")

    print("Processing completed!")

# Run
if __name__ == "__main__":
    # Chunking configuration.
    chunking_method = 'recursive'  # select either 'naive' or 'recursive'.
    chunk_size, overlap = 500, 0

    # Embedding configuration.
    model_name = 'sbert'
    batch_size = 32

    process_text_file(
        chunk_size=chunk_size,
        overlap=overlap,
        batch_size=batch_size,
        chunking_method=chunking_method,
        model_name=model_name,
    )

Subfolders: ['contractnli', 'cuad', 'privacy_qa', 'maud']

Processing file: /content/sample_corpus/contractnli/Evelozcity%20OESA%20NDA.txt
Total chunks created: 28


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/10.4k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/571 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/438M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/363 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Embeddings generated
Saved: /content/sample_corpus_chunked/contractnli/Evelozcity%20OESA%20NDA.json

Processing file: /content/sample_corpus/contractnli/FNHA-2019RFP-02-NDA-form.txt
Total chunks created: 18
Embeddings generated
Saved: /content/sample_corpus_chunked/contractnli/FNHA-2019RFP-02-NDA-form.json

Processing file: /content/sample_corpus/contractnli/IGC-Non-Disclosure-Agreement-LSE-Sample.txt
Total chunks created: 47
Embeddings generated
Saved: /content/sample_corpus_chunked/contractnli/IGC-Non-Disclosure-Agreement-LSE-Sample.json

Processing file: /content/sample_corpus/contractnli/Geheimhaltungsvereinbarung_Abschlussarbeiten_HFU_englisch.txt
Total chunks created: 14
Embeddings generated
Saved: /content/sample_corpus_chunked/contractnli/Geheimhaltungsvereinbarung_Abschlussarbeiten_HFU_englisch.json

Processing file: /content/sample_corpus/contractnli/INFOMAGNET%20NDA.txt
Total chunks created: 28
Embeddings generated
Saved: /content/sample_corpus_chunked/contractnli/INFOMAGNET

## Chunked Data Retrieval and Similarity Search for Google Colab.

The code cell below retrieves text chunks from the JSON files stored in the sample_corpus_chunked folder and runs a similarity search over them. It supports cosine similarity and BM25.

Key steps are:
  1. Loading: reads the JSON files containing all the sampled text chunks and embeddings from the sample_corpus_chunked folder.
  2. Similarity search: retrieves the most relevant chunks for the given top-k value using either cosine similarity or BM25.
  3. Duplication: copies the sample_benchmarks folder (with its JSON file) to the 'sample_benchmarks_unranked' folder.
  4. Updating the duplicated JSON file: the copy in 'sample_benchmarks_unranked' is modified to include the retrieved text chunks for each query, in the same JSON format as the Q&A pairs.


In [None]:
import json
import numpy as np
!pip install rank_bm25
!pip install nltk
from sklearn.metrics.pairwise import cosine_similarity
from rank_bm25 import BM25Okapi
import nltk
import shutil
import os
from nltk.tokenize import word_tokenize
from google.colab import drive

# Download the NLTK 'punkt_tab' resource; presumably this is the tokenizer data
# that `word_tokenize` (used by the BM25 search in this cell) needs -- confirm.
nltk.download('punkt_tab')
# Working directory at the moment this cell runs, kept as a module-level name.
# NOTE(review): presumably /content on Colab -- confirm if run elsewhere.
current_directory = os.getcwd()

# Set base path for Colab (inside /content/)
# Root folder holding the per-document chunk/embedding JSON files.
BASE_PATH = "/content/sample_corpus_chunked"

def load_chunked_data(directory):
    """Load all precomputed chunk records from the JSON files under `directory`.

    `directory` is expected to contain one sub-folder per dataset, each holding
    ``.json`` files whose top-level value is a list of chunk records. Entries of
    `directory` that are not folders (for example a stray ``.DS_Store``), files
    that do not end in ``.json``, and JSON files whose top-level value is not a
    list are skipped.

    Sub-folders and files are read in sorted order so the returned list is
    reproducible between runs.

    Args:
        directory: Root folder of the chunked corpus.

    Returns:
        A single flat list with the records of every loaded file.
    """
    chunked_data = []

    for folder in sorted(os.listdir(directory)):
        folder_path = os.path.join(directory, folder)
        if not os.path.isdir(folder_path):
            # os.listdir() on a plain file would raise NotADirectoryError.
            continue
        for file_name in sorted(os.listdir(folder_path)):
            if not file_name.endswith(".json"):
                continue
            file_path = os.path.join(folder_path, file_name)
            with open(file_path, "r", encoding="utf-8") as f:
                file_data = json.load(f)
            if isinstance(file_data, list):
                chunked_data.extend(file_data)  # Append data from each file.

    return chunked_data

def load_specific_chunked_file(file_path_specific):
    """Load the chunk records stored in a single JSON file.

    Returns a new list with the file's records when the top-level JSON value is
    a list, and an empty list for any other top-level value.
    """
    with open(file_path_specific, "r", encoding="utf-8") as handle:
        payload = json.load(handle)
    # Copy into a fresh list so the caller never shares the parsed object.
    return list(payload) if isinstance(payload, list) else []

# Similarity Search (Cosine)
def cosine_retrieved_chunks(query, embed_model, chunked_data, top_k):
    """Return the `top_k` chunks whose embeddings are most similar to `query`.

    Args:
        query: Query string; encoded with `embed_model.encode`.
        embed_model: Object exposing ``encode(list_of_str, convert_to_numpy=True)``.
        chunked_data: List of chunk records, each with an ``"embedding"`` entry.
        top_k: Maximum number of chunks to return.

    Returns:
        Up to `top_k` records from `chunked_data`, ordered from highest to
        lowest cosine similarity. An empty `chunked_data` yields an empty list
        (previously this failed inside the similarity computation).
    """
    if not chunked_data:
        # Nothing to rank; also avoids encoding the query for no reason.
        return []

    embedding_vectors = np.array([chunk["embedding"] for chunk in chunked_data])
    if embedding_vectors.ndim == 1:
        embedding_vectors = embedding_vectors.reshape(1, -1)

    query_embedding = embed_model.encode([query], convert_to_numpy=True)
    query_embedding = np.array(query_embedding).reshape(1, -1)

    similarities = cosine_similarity(query_embedding, embedding_vectors)[0]
    # argsort is ascending, so reverse it before keeping the first top_k.
    top_indices = np.argsort(similarities)[::-1][:top_k]

    return [chunked_data[i] for i in top_indices]

# BM25 Search
def BM25_retrieved_chunks(query, chunked_data, top_k):
    """Return the `top_k` chunks that score highest for `query` under BM25.

    Chunk texts and the query are lower-cased and tokenised with
    `word_tokenize`, then scored with `BM25Okapi`.

    Args:
        query: Query string.
        chunked_data: List of chunk records, each with a ``"text"`` entry.
        top_k: Maximum number of chunks to return.

    Returns:
        Up to `top_k` records from `chunked_data`, best score first. An empty
        `chunked_data` yields an empty list instead of failing while the BM25
        index is built.
    """
    if not chunked_data:
        return []

    corpus = [word_tokenize(chunk["text"].lower()) for chunk in chunked_data]
    tokenized_query = word_tokenize(query.lower())

    bm25 = BM25Okapi(corpus)
    scores = bm25.get_scores(tokenized_query)

    top_indices = sorted(range(len(scores)), key=lambda x: scores[x], reverse=True)[:top_k]
    return [chunked_data[i] for i in top_indices]

# Select Similarity Search Method
def similarity_search_model(model, embed_model, query, chunked_data, top_k):
    """Run the similarity search selected by `model`.

    Args:
        model: ``"cosine"`` or ``"BM25"`` (case-sensitive).
        embed_model: Embedding model; only used by the cosine search.
        query: Query string.
        chunked_data: List of chunk records to search.
        top_k: Maximum number of chunks to return.

    Returns:
        The list returned by the selected search function.

    Raises:
        ValueError: If `model` names neither method. Previously the function
            returned None here, which only failed later in the caller.
    """
    if model == "cosine":
        return cosine_retrieved_chunks(query, embed_model, chunked_data, top_k)
    if model == "BM25":
        return BM25_retrieved_chunks(query, chunked_data, top_k)
    raise ValueError(f"Unknown similarity search method '{model}'. Use 'cosine' or 'BM25'.")

# Duplicate JSON Folder for Unranked Chunks
def duplicate_json_file(original_folder, duplicate_folder):
    """Copy `original_folder` to `duplicate_folder`, both resolved under /content.

    An existing destination is deleted first, so the copy always mirrors the
    current contents of the original folder.
    """
    source_dir, target_dir = (
        os.path.join("/content", name) for name in (original_folder, duplicate_folder)
    )

    # copytree requires that the destination does not exist yet.
    if os.path.exists(target_dir):
        shutil.rmtree(target_dir)
    shutil.copytree(source_dir, target_dir)

# Update JSON Files with Retrieved Chunks
def _format_retrieved_chunk(chunk, prefix_to_remove):
    """Build the record stored in the benchmark file for one retrieved chunk."""
    return {
        "chunk_id": chunk["chunk_id"],
        "filepath": chunk["filepath"].replace(prefix_to_remove, ""),
        "span": chunk["span"],
        "text": chunk["text"],
    }


def add_retrieved_chunks(duplicate_folder, chunked_data_regular, embed_model, model, similarity_score_threshold):
    """Add retrieved chunks to every query of the duplicated benchmark files.

    Each ``.json`` file in ``/content/<duplicate_folder>`` is loaded, every
    entry receives a ``"retrieved_chunks_unranked"`` list, and the file is
    written back in place.

    For each entry:
      * ``top_k`` is 5 for complexity "verbose" and 10 for "vague".
        NOTE(review): any other complexity value now falls back to 10 instead
        of failing or silently reusing the previous entry's value -- confirm
        this default is the intended one.
      * If the query rewriter's similarity score exceeds
        `similarity_score_threshold`, the rewritten question is searched only
        within the chunk file of its ``best_file_path`` (resolved under
        ``BASE_PATH`` with ``.txt`` replaced by ``.json``).
      * Otherwise the original query is searched across `chunked_data_regular`.

    Args:
        duplicate_folder: Folder name under /content holding the benchmark copies.
        chunked_data_regular: All chunk records of the corpus.
        embed_model: Embedding model passed to the similarity search.
        model: Search method name passed to `similarity_search_model`.
        similarity_score_threshold: Score above which the per-document search
            is used.
    """
    duplicate_folder_path = os.path.join("/content", duplicate_folder)
    prefix_to_remove = "/content/sample_corpus/"
    top_k_by_complexity = {"verbose": 5, "vague": 10}
    default_top_k = 10

    for filename in os.listdir(duplicate_folder_path):
        if not filename.endswith(".json"):
            continue

        file_path = os.path.join(duplicate_folder_path, filename)
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        for test in data:
            rewriter = test["query_rewriter"][0]
            similarity_score = rewriter["similarity_score"]

            complexity = test["feature_extraction"][0]["complexity"]
            top_k = top_k_by_complexity.get(complexity, default_top_k)

            if similarity_score > similarity_score_threshold:
                query = rewriter["only_question"]

                # Original filename (could have many dots): strip only a
                # trailing ".txt" and append ".json".
                best_file_path = rewriter["best_file_path"]
                if best_file_path.endswith(".txt"):
                    best_file_path = best_file_path[:-4]

                # Resolve against the chunked-corpus root rather than the
                # current working directory, like every other path used here.
                chunk_file = os.path.join(BASE_PATH, best_file_path + ".json")
                candidates = load_specific_chunked_file(chunk_file)
            else:
                query = test["query"]
                candidates = chunked_data_regular

            retrieved = similarity_search_model(model, embed_model, query, candidates, top_k)
            test["retrieved_chunks_unranked"] = [
                _format_retrieved_chunk(chunk, prefix_to_remove) for chunk in retrieved
            ]

        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)


# Main Execution
if __name__ == "__main__":
    from sentence_transformers import SentenceTransformer

    # Query encoder used by the cosine search.
    embed_model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2")

    # Benchmark folder and the working copy that receives the retrieved chunks.
    original_folder_ = "sample_benchmarks"
    duplicate_folder_ = "sample_benchmarks_unranked"

    print("Loading chunked data...")
    BASE_PATH = "/content/sample_corpus_chunked"
    directory = BASE_PATH
    chunked_data = load_chunked_data(BASE_PATH)
    print(f"Loaded {len(chunked_data)} chunks.")

    # Similarity search method.
    model = "cosine"    #'cosine' or 'BM25'

    print("Duplicating JSON folder...")
    duplicate_json_file(original_folder_, duplicate_folder_)

    print("Adding retrieved chunks...")
    add_retrieved_chunks(
        duplicate_folder_,
        chunked_data,
        embed_model,
        model,
        similarity_score_threshold=0.3,
    )

    print("Processing complete.")

Collecting rank_bm25
  Downloading rank_bm25-0.2.2-py3-none-any.whl.metadata (3.2 kB)
Downloading rank_bm25-0.2.2-py3-none-any.whl (8.6 kB)
Installing collected packages: rank_bm25
Successfully installed rank_bm25-0.2.2


[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


Loading chunked data...
Loaded 21681 chunks.
Duplicating JSON folder...
Adding retrieved chunks...
Processing complete.


## Downloading the files

In [None]:
# Archive the unranked benchmark folder (path is relative to the working directory).
# NOTE(review): the recorded output shows a .DS_Store file being added to the
# archive as well -- consider excluding such files.
!zip -r sample_benchmarks_unranked.zip sample_benchmarks_unranked/
# Optional: archive the ranked benchmarks and the chunked corpus too.
# !zip -r sample_benchmarks_ranked.zip sample_benchmarks_ranked/
# !zip -r sample_corpus_chunked.zip sample_corpus_chunked/

from google.colab import files

# Download the archive created above through the Colab files helper.
files.download("sample_benchmarks_unranked.zip")
# files.download("sample_benchmarks_ranked.zip")
# files.download("sample_corpus_chunked.zip")  # Change filename accordingly.

  adding: sample_benchmarks_unranked/ (stored 0%)
  adding: sample_benchmarks_unranked/privacy_qa.json (deflated 92%)
  adding: sample_benchmarks_unranked/.DS_Store (deflated 92%)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>