## Group 4: Reranking (Yet Another Performance Boost!)

## 1. Create Elastic Cloud deployment






## Installing packages



In [1]:
!pip install -U elasticsearch sentence-transformers datasets pytrec_eval

Collecting elasticsearch
  Downloading elasticsearch-8.16.0-py3-none-any.whl.metadata (8.8 kB)
Collecting sentence-transformers
  Downloading sentence_transformers-3.3.1-py3-none-any.whl.metadata (10 kB)
Collecting datasets
  Downloading datasets-3.1.0-py3-none-any.whl.metadata (20 kB)
Collecting pytrec_eval
  Downloading pytrec_eval-0.5.tar.gz (15 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting elastic-transport<9,>=8.15.1 (from elasticsearch)
  Downloading elastic_transport-8.15.1-py3-none-any.whl.metadata (3.7 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.9.0,>=2023.1.0 (from fsspec[http]<=2024.9.0,>=2023.1.0->datasets)
  Dow

### Importing necessary dependencies

In [None]:
from collections import defaultdict
from getpass import getpass
from typing import Any, Union

from datasets.arrow_dataset import Dataset
from datasets.dataset_dict import DatasetDict, IterableDatasetDict
from datasets.iterable_dataset import IterableDataset
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from sentence_transformers import CrossEncoder
from tqdm import tqdm
import datasets
import numpy as np
import pytrec_eval

### Global Variables for dataset and Elastic Cloud Setup

In [7]:
# Which BEIR/MTEB dataset to evaluate, and the index that will host its corpus
DATASET = "trec-covid"
INDEX_NAME = "reranking-test-" + DATASET

In [8]:
# Prompt for credentials interactively so they are never stored in the notebook
ELASTIC_CLOUD_ID, ELASTIC_API_KEY = (
    getpass("Elastic Cloud ID: "),
    getpass("Elastic Api Key: "),
)

Elastic Cloud ID: ··········
Elastic Api Key: ··········


Initialize the Elasticsearch Python client

In [9]:
# Connect to the Elastic Cloud deployment using API-key authentication
client = Elasticsearch(api_key=ELASTIC_API_KEY, cloud_id=ELASTIC_CLOUD_ID)

### Test the Elastic Cloud client



In [10]:
client_info = client.info()

# Shown as the cell output: confirms the credentials work and which cluster/version we reached
"Successfully connected to cluster {} (version {})".format(
    client_info["cluster_name"], client_info["version"]["number"]
)

'Successfully connected to cluster 4d8234b1d2f04059853b4ef370d46520 (version 8.16.1)'

---

In [None]:
def create_index(es_client: Elasticsearch, name: str, analyzer: str = "english"):
    """
    Create the index that will host the corpus, unless it already exists.

    Args:
        `es_client`: An instance of a Python Elasticsearch client
        `name`: The name of the index to create
        `analyzer`: A string identifier of the language analyzer to be used. By default we use `english`
            (more details at https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-lang-analyzer.html)
    Returns:
        None
    Raises:
        RuntimeError: if Elasticsearch rejects the request with a 400 for any reason other than
            the index already existing (e.g. an unknown analyzer name)
    """

    # we store `title` & `text` into separate fields (`title` and `txt`) so that
    # they can be matched separately at query time
    _mappings = {
        "properties": {
            "title": {"type": "text", "analyzer": analyzer},
            "txt": {"type": "text", "analyzer": analyzer},
        }
    }

    # create an index with the specified name; a 400 is tolerated so that re-running
    # the notebook against an already existing index is a no-op
    response = es_client.options(ignore_status=[400]).indices.create(
        index=name,
        settings={"number_of_shards": 1},
        mappings=_mappings,
    )

    # ...but only the "already exists" flavour of 400 is benign. Any other 400 (bad analyzer,
    # bad mapping) leaves no index behind; indexing afterwards could then silently run against
    # an index without the intended analyzer (if automatic index creation is enabled).
    if response.meta.status == 400:
        body = response.body
        error = body.get("error") if isinstance(body, dict) else None
        error_type = error.get("type") if isinstance(error, dict) else None
        if error_type != "resource_already_exists_exception":
            raise RuntimeError(f"Failed to create index {name!r}: {body}")


def index_corpus(
    corpus: Union[DatasetDict, Dataset, IterableDatasetDict, IterableDataset],
    index_name: str,
    es_client: Elasticsearch,
):
    """
    Push the documents of `corpus` over to our index and refresh it.

    Args:
        `corpus`: The corpus of the dataset we have selected. It's a Huggingface dataset whose rows carry
            the three fields (`_id`, `title`, `text`). It is iterated row by row, so both a `Dataset`
            and a streaming `IterableDataset` work.
        `index_name`: The name of the Elasticsearch index
        `es_client`: An instance of a Python Elasticsearch client
    Returns:
        None
    """

    # `num_rows` is only available on map-style datasets; for streaming ones the
    # progress bar simply runs without a known total
    total = getattr(corpus, "num_rows", None)

    def get_iterable():
        # iterate rows lazily instead of materializing whole columns
        # (`corpus["text"]` would load every document body into memory at once)
        for doc in tqdm(corpus, total=total):
            yield {
                "_id": doc["_id"],
                "_op_type": "index",
                "title": doc["title"],
                "txt": doc["text"],
            }

    # and bulk index them
    bulk(client=es_client, index=index_name, actions=get_iterable(), max_retries=3)

    # making sure that the index has been refreshed
    es_client.indices.refresh(index=index_name)

In [12]:
def retrieve(
    queries: Union[DatasetDict, Dataset, IterableDatasetDict, IterableDataset],
    es_client: Elasticsearch,
    index_name: str,
    size: int = 10,
    batch_size: int = 32,
):
    """
    Retrieve docs from the index with a `best_fields` multi-match over `title` and `txt`
    Args:
        `queries`: The queries of the dataset we have selected. It's a Huggingface dataset with the two fields (`_id`, `text`)
        `es_client`: An instance of a Python Elasticsearch client
        `index_name`: The name of the Elasticsearch index
        `size`: The (maximum) number of documents that we will retrieve per query
        `batch_size`: The number of queries sent per `msearch` request

    Returns:
        A nested dictionary where the outer key is the "query id" that points to (<doc_id>, <BM25-score>) key-value pairs e.g.
        {"my_query_id_1": {"my_doc_1": 23.5, "my_doc_2": 11.33}, "my_query_id_22": {"my_doc_3": 20.5, "my_doc_4": 4.3}, ...}
        Every query id is present; a query whose search failed maps to an empty dict and its error is printed.

    """

    def generate_request(query_text: str):
        """Create the request body for the ES requests"""
        return {
            "_source": False,
            "query": {
                "multi_match": {
                    "query": query_text,
                    "type": "best_fields",
                    "fields": ["title", "txt"],
                    "tie_breaker": 0.5,
                }
            },
            "size": size,
        }

    def retrieve_batch(query_ids, es_requests):
        """Get docs for a mini-batch of requests"""
        # pre-seed every query so a failure never makes a query silently disappear
        batch_dict = {qid: dict() for qid in query_ids}
        kwargs: dict[str, Any] = {
            "index": index_name,
            "search_type": "dfs_query_then_fetch",
        }
        try:
            es_response = es_client.msearch(searches=es_requests, **kwargs)
        except Exception as e:
            # best-effort: report the failed request and carry on with the next batches
            print(str(e))
            return batch_dict

        for qid, resp in zip(query_ids, es_response["responses"]):
            # msearch reports per-search failures inline (an `error` entry without `hits`);
            # handle them here so one bad query does not discard the rest of the batch
            if "error" in resp:
                print(f"Query {qid} failed: {resp['error']}")
                continue
            batch_dict[qid] = {
                hit["_id"]: hit["_score"] for hit in resp["hits"]["hits"]
            }
        return batch_dict

    qids, requests = [], []
    es_responses = dict()

    for query in queries:
        qids.append(query["_id"])
        # msearch takes alternating header/body entries; the header is left empty,
        # so the request-level `index` passed in `kwargs` applies
        requests.append({})
        requests.append(generate_request(query["text"]))

        # retrieve in batches
        if len(qids) == batch_size:
            es_responses.update(retrieve_batch(qids, requests))
            qids, requests = [], []

    # check for leftovers
    if qids:
        es_responses.update(retrieve_batch(qids, requests))

    return es_responses

In [None]:
def download_corpus(
    dataset_name: str,
) -> Union[DatasetDict, Dataset, IterableDatasetDict, IterableDataset]:
    """
    Fetch the `corpus` split of a BEIR dataset from its `mteb/` mirror on Huggingface
    Args:
        `dataset_name`: The name of the BEIR dataset that we have selected
    Returns:
        An instance of a Huggingface dataset holding the corpus documents
    """
    return datasets.load_dataset(f"mteb/{dataset_name}", "corpus", split="corpus")


def download_queries_and_qrels(dataset_name: str):
    """
    Fetch the evaluation queries and their relevance judgments (qrels) from Huggingface
    Args:
        `dataset_name`: The name of the BEIR dataset that we have selected
    Returns:
        A tuple of: (<an instance of a Huggingface dataset>, <a dictionary holding the qrels information>).
        Only queries with at least one qrels entry are kept.
    """

    hf_name = f"mteb/{dataset_name}"
    # for msmarco the judgments are read from the `dev` split, for every other dataset from `test`
    qrels_split = "dev" if dataset_name == "msmarco" else "test"
    qrels_rows = datasets.load_dataset(hf_name, "default", split=qrels_split)

    # {query_id: {doc_id: relevance}} -- the nested layout pytrec_eval consumes
    qrels = defaultdict(dict)
    for row in qrels_rows:
        qrels[row["query-id"]][row["corpus-id"]] = int(row["score"])

    queries = datasets.load_dataset(hf_name, "queries", split="queries")
    queries = queries.filter(lambda row: row["_id"] in qrels)

    return queries, dict(qrels)

---

## Running the pipeline

Create the index that will host the corpus

In [14]:
create_index(client, INDEX_NAME)

## Download the dataset

In [15]:
corpus = download_corpus(DATASET)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/1.20k [00:00<?, ?B/s]

corpus.jsonl:   0%|          | 0.00/200M [00:00<?, ?B/s]

Generating corpus split:   0%|          | 0/171332 [00:00<?, ? examples/s]

In [16]:
index_corpus(corpus, INDEX_NAME, client)

100%|██████████| 171332/171332 [01:09<00:00, 2477.72it/s]


### 1st stage retrieval with BM25
Download the `test` split of the dataset we have selected

In [17]:
queries, qrels = download_queries_and_qrels(DATASET)

qrels/test.jsonl:   0%|          | 0.00/3.83M [00:00<?, ?B/s]

Generating test split:   0%|          | 0/66336 [00:00<?, ? examples/s]

queries.jsonl:   0%|          | 0.00/4.75k [00:00<?, ?B/s]

Generating queries split:   0%|          | 0/50 [00:00<?, ? examples/s]

Filter:   0%|          | 0/50 [00:00<?, ? examples/s]

* The `queries` object is a Hugging Face dataset with two fields ['_id', 'text'].
* The `qrels` object holds the relationships between a `query_id` and a list of documents. We have transformed it into a `pytrec_eval`-compatible format, i.e. a nested dictionary where the outer key is the query id, which points to a dictionary of (`doc_id`, `score`) key-value pairs (a score > 0 denotes relevance).

In [18]:
queries.num_rows

50

Retrieve the **top-100** documents per query using BM25

In [19]:
# top-100 BM25 candidates per query: this is the pool the reranker reorders later
bm25_responses = retrieve(queries, client, INDEX_NAME, size=100)

Compute the performance of BM25 on this dataset. We are using `nDCG@10` as our metric.

In [20]:
# specify evaluator: nDCG@10 against the qrels
METRICS_TO_EVALUATE = {"ndcg_cut_10"}
evaluator = pytrec_eval.RelevanceEvaluator(qrels, METRICS_TO_EVALUATE)

# per-query scores, shaped {query_id: {metric: value}}
eval_per_query = evaluator.evaluate(bm25_responses)

# gather each metric across all queries, then report its mean
eval_scores = defaultdict(list)
for vals in eval_per_query.values():
    for metric, metric_score in vals.items():
        eval_scores[metric].append(metric_score)

for metric, _scores in eval_scores.items():
    print(f"{metric}: {np.mean(_scores)}")

ndcg_cut_10: 0.6880298232606303


## 2nd stage reranking

We rerank the BM25 candidates with a small cross-encoder model to improve their ordering. The `sentence-transformers` library is used to load the model and do the scoring.

In [21]:
# `max_length` caps the tokenized length of each (query, document) pair fed to the model
reranking_model = CrossEncoder(
    "cross-encoder/ms-marco-MiniLM-L-6-v2",
    max_length=512,
)

config.json:   0%|          | 0.00/794 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/316 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

In [22]:
# id -> text lookups used to build the (query, document) pairs for the reranker
queries_dict = dict((q["_id"], q["text"]) for q in queries)
corpus_dict = {
    doc["_id"]: "{} {}".format(doc["title"], doc["text"]) for doc in corpus
}

In [23]:
results_after_reranking = {}

for qid, bm25_res in tqdm(bm25_responses.items(), total=len(bm25_responses)):
    query_text = queries_dict[qid]
    doc_ids = list(bm25_res)
    if not doc_ids:
        # nothing was retrieved for this query, so there is nothing to rerank
        results_after_reranking[qid] = {}
        continue

    doc_texts = [corpus_dict[doc_id] for doc_id in doc_ids]
    pairs = [(query_text, doc_text) for doc_text in doc_texts]

    # the cross-encoder scores replace the BM25 scores for the same candidate set
    scores = reranking_model.predict(pairs)
    results_after_reranking[qid] = {
        doc_id: float(score) for doc_id, score in zip(doc_ids, scores)
    }

100%|██████████| 50/50 [17:56<00:00, 21.52s/it]


Calculate the metric scores for the reranked results

In [24]:
# per-query scores for the reranked runs, shaped {query_id: {metric: value}}
post_reranking_eval_scores_per_query = evaluator.evaluate(results_after_reranking)

post_reranking_eval_scores = defaultdict(list)

for vals in post_reranking_eval_scores_per_query.values():
    for metric, metric_score in vals.items():
        post_reranking_eval_scores[metric].append(metric_score)

# named `metric_scores` (not `scores`) so the reranker output from the previous cell isn't overwritten
for metric, metric_scores in post_reranking_eval_scores.items():
    print(f"{metric}: {np.mean(metric_scores)}")

ndcg_cut_10: 0.7576350874801974




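### Judge rate

The judge rate is the fraction of the top-10 documents retrieved for a query that have a relevance judgment in `qrels`, averaged over all queries. Unjudged documents count as non-relevant during evaluation, so a low judge rate means nDCG@10 may under-estimate the true quality of a ranking.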


In [25]:
# only the first TOP_K results of each query are inspected
TOP_K = 10

judge_rate_per_query = []

for qid, doc_scores in bm25_responses.items():
    # doc ids of the TOP_K highest-scoring BM25 results
    top_k_doc_ids = sorted(doc_scores, key=doc_scores.get, reverse=True)[:TOP_K]
    if not top_k_doc_ids:
        continue

    # fraction of those results that have any relevance judgment in qrels
    judged = qrels.get(qid, {})
    nr_labeled_docs = sum(1 for doc_id in top_k_doc_ids if doc_id in judged)
    judge_rate_per_query.append(nr_labeled_docs / len(top_k_doc_ids))

# `.1f` rather than `.3`: the latter switches to scientific notation at 100% ("1e+02%")
print(f'"Judge rate" for {DATASET} is {np.mean(judge_rate_per_query) * 100.0:.1f}%')

"Judge rate" for trec-covid is 92.4%


The judge rate for the reranked documents is:

In [26]:
judge_rate_per_query = []

for qid, doc_scores in results_after_reranking.items():
    # doc ids of the TOP_K highest-scoring results after reranking
    top_k_doc_ids = sorted(doc_scores, key=doc_scores.get, reverse=True)[:TOP_K]
    if not top_k_doc_ids:
        continue

    # fraction of those results that have any relevance judgment in qrels
    judged = qrels.get(qid, {})
    nr_labeled_docs = sum(1 for doc_id in top_k_doc_ids if doc_id in judged)
    judge_rate_per_query.append(nr_labeled_docs / len(top_k_doc_ids))

# `.1f` rather than `.3`: the latter switches to scientific notation at 100% ("1e+02%")
print(
    f'"Judge rate" for {DATASET} (reranked) is {np.mean(judge_rate_per_query) * 100.0:.1f}%'
)

"Judge rate" for trec-covid (reranked) is 97.4%


---