# Hybrid search with Cohere reranking

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/elastic/elasticsearch-labs/blob/demjened/cohere-reranking/notebooks/integrations/cohere/cohere-reranking.ipynb)

## 1. Install dependencies

In [1]:
%%capture

%pip install cohere elasticsearch datasets

## 2. Read credentials

In [2]:
from getpass import getpass

ELASTIC_CLOUD_ID = getpass("Elastic Cloud ID: ")
ELASTIC_USERNAME = getpass("Elastic username: ")
ELASTIC_PASSWORD = getpass("Elastic password: ")
COHERE_API_KEY = getpass("Cohere API key: ")
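
When the notebook is run non-interactively, prompting with `getpass` can be inconvenient. The following is a minimal sketch of one possible alternative that checks an environment variable first and only prompts as a fallback. The helper `read_secret` and the variable name `DEMO_COHERE_API_KEY` are hypothetical and are not defined by Elastic or Cohere.

```python
import os
from getpass import getpass


def read_secret(env_var, prompt):
    """Return the value of env_var if it is set and non-empty, else prompt for it."""
    value = os.environ.get(env_var)
    if value:
        return value
    return getpass(prompt)


# Illustration only: the variable name and value below are made up.
os.environ["DEMO_COHERE_API_KEY"] = "dummy-key"
demo_key = read_secret("DEMO_COHERE_API_KEY", "Cohere API key: ")
print(len(demo_key))
```

Because the environment variable is set in this example, no prompt is shown.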

## 3. Connect to Elastic Cloud

Note: this notebook requires an Elastic Cloud deployment with the `.elser_model_2_linux-x86_64` model deployed.

In [3]:
from elasticsearch import Elasticsearch, helpers

es_client = Elasticsearch(cloud_id=ELASTIC_CLOUD_ID, basic_auth=(ELASTIC_USERNAME, ELASTIC_PASSWORD), request_timeout=3600)
es_client.info()

ObjectApiResponse({'name': 'instance-0000000000', 'cluster_name': 'a5e67b93dad1495db96cb15d86779b87', 'cluster_uuid': 'Qxr1n6rWRDWQ6DsAM5-SwA', 'version': {'number': '8.12.0', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': '1665f706fd9354802c02146c1e6b5c0fbcddfbc9', 'build_date': '2024-01-11T10:05:27.953830042Z', 'build_snapshot': False, 'lucene_version': '9.9.1', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'})

## 4. Initialize Cohere

In [4]:
import cohere

co = cohere.Client(COHERE_API_KEY)

## 5. Create Elasticsearch index and inference pipeline

In [5]:
index = "rerank_poc"
model_id = ".elser_model_2_linux-x86_64"
pipeline_id = f"ml-inference-{index}-elser"

if not es_client.indices.exists(index=index):
    es_client.indices.create(
        index=index,
        mappings={
            "properties": {
                "title": {"type": "text"},
                "text": {"type": "text"},
                "ml": {
                    "type": "object",
                    "properties": {
                        "inference": {
                            "type": "object",
                            "properties": {
                                "text_expanded": {
                                    "type": "object",
                                    "properties": {
                                        "predicted_value": {"type": "sparse_vector"},
                                        "model_id": {"type": "keyword"},
                                    },
                                }
                            },
                        }
                    },
                },
            }
        },
    )

    es_client.ingest.put_pipeline(
        id=pipeline_id,
        processors=[
            {"remove": {"field": "ml.inference.text_expanded", "ignore_missing": True}},
            {
                "inference": {
                    "field_map": {"text": "text_field"},
                    "model_id": model_id,
                    "on_failure": [
                        {
                            "append": {
                                "field": "_source._ingest.inference_errors",
                                "allow_duplicates": False,
                                "value": [
                                    {
                                        # Quadruple braces are needed in an f-string to emit the Mustache tag {{ ... }}.
                                        "message": f"Processor 'inference' in pipeline '{pipeline_id}' failed for field 'text' with message '{{{{ _ingest.on_failure_message }}}}'",
                                        "pipeline": pipeline_id,
                                        "timestamp": "{{{ _ingest.timestamp }}}",
                                    }
                                ],
                            }
                        }
                    ],
                    "target_field": "ml.inference.text_expanded",
                }
            },
            {
                "append": {
                    "field": "_source._ingest.processors",
                    "value": [
                        {
                            "model_version": "12.0.0",
                            "pipeline": pipeline_id,
                            "processed_timestamp": "{{{ _ingest.timestamp }}}",
                            "types": ["pytorch", "text_expansion"],
                        }
                    ],
                }
            },
        ],
        version=1,
    )

    print("Index and pipeline created.")

Index and pipeline created.
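
The pipeline definition mixes Python f-strings with Mustache placeholders, and the two use the same brace characters. The short sketch below shows how Python collapses doubled braces inside an f-string; the message text is shortened for illustration and is not the exact string sent to Elasticsearch.

```python
pipeline_id = "ml-inference-rerank_poc-elser"

# Two braces in an f-string collapse to one, which is not a Mustache tag.
single = f"failed in '{pipeline_id}': '{{ _ingest.on_failure_message }}'"

# Four braces collapse to two, which is the form a Mustache template expects.
double = f"failed in '{pipeline_id}': '{{{{ _ingest.on_failure_message }}}}'"

print(single)
print(double)
```

Plain (non-f) strings such as `"{{{ _ingest.timestamp }}}"` are passed through unchanged, so they need no extra escaping.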


## 6. Index documents with ELSER text expansion inference

In [6]:
from datasets import load_dataset

# Load the first 500 passages from the Simple English Wikipedia dataset and index them
data = load_dataset(
    "Cohere/wikipedia-22-12",
    "simple",
    split="train[:500]",
    trust_remote_code=True,
)

all_docs = map(
    lambda doc: {
        "_index": index,
        "_id": doc["id"],
        "pipeline": pipeline_id,
        "_source": {
            "title": doc["title"],
            "text": doc["text"],
        },
    },
    data,
)

print("Start indexing docs. This might take a few minutes.")

bulk_index_result = helpers.bulk(es_client, all_docs, raise_on_error=False)
print("Indexing result", bulk_index_result)

Start indexing docs. This might take a few minutes.
Indexing result (500, [])
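
The bulk actions and the `(500, [])` result tuple can be examined without a cluster. This is a minimal sketch, assuming the same action shape as the cell above; `to_bulk_actions` and `summarize_bulk_result` are hypothetical helper names, and the sample document is made up.

```python
def to_bulk_actions(docs, index, pipeline_id):
    """Yield one bulk action per document, routed through the ingest pipeline."""
    for doc in docs:
        yield {
            "_index": index,
            "_id": doc["id"],
            "pipeline": pipeline_id,
            "_source": {"title": doc["title"], "text": doc["text"]},
        }


def summarize_bulk_result(result):
    """Split the (success_count, errors) tuple into readable counts."""
    success_count, errors = result
    return {"indexed": success_count, "failed": len(errors)}


# Made-up sample document, standing in for a dataset row.
sample_docs = [{"id": 1, "title": "Example", "text": "Example passage."}]
actions = list(to_bulk_actions(sample_docs, "rerank_poc", "ml-inference-rerank_poc-elser"))
summary = summarize_bulk_result((500, []))
print(actions[0]["_id"], summary)
```

With `raise_on_error=False`, failed documents appear in the second element of the tuple instead of raising an exception, so an empty list means every document was indexed.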


## 7. Run hybrid query with linear combination

In [7]:
query = "What's the story with Schrodinger's cat?"

print("QUERY", query, sep="\n")

resp = es_client.search(
    index=index,
    size=100,
    query={
        "bool": {
            "should": [
                {"multi_match": {"query": query, "fields": ["text"], "boost": 1}},
                {
                    "text_expansion": {
                        "ml.inference.text_expanded.predicted_value": {
                            "model_text": query,
                            "model_id": model_id,
                            "boost": 0.5,
                        }
                    }
                },
            ],
            "minimum_should_match": 1,
        }
    },
)

resp_docs = list(
    map(
        lambda doc: {
            "_id": doc["_id"],
            "_score": doc["_score"],
            "title": doc["_source"]["title"],
            "text": doc["_source"]["text"],
        },
        resp["hits"]["hits"],
    )
)

texts = [hit["_source"]["text"] for hit in resp["hits"]["hits"]]

print("ORIGINAL DOCS", *resp_docs[:10], sep="\n")

QUERY
What's the story with Schrodinger's cat?
ORIGINAL DOCS
{'_id': '104', '_score': 17.17821, 'title': "Schrödinger's cat", 'text': 'The question now is: at the end of the hour, is the cat alive or dead? Schrödinger says that according to the Copenhagen Interpretation, as long as the door is closed, the cat is dead and alive. There is no way to know until the door is opened. But by opening the door, the person is interfering with the experiment. The person and the experiment have to be described with reference to each other.'}
{'_id': '96', '_score': 15.896298, 'title': "Schrödinger's cat", 'text': 'In simple terms, Schrödinger stated that if you place a cat and something that could kill the cat (a radioactive atom) in a box and sealed it, you would not know if the cat was dead or alive until you opened the box, so that until the box was opened, the cat was (in a sense) "both" "dead and alive". This is used to represent how scientific theory works. No one knows if any scientific theo
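
The `bool` query sums the scores of the matching `should` clauses, each scaled by its boost, which is what makes this a linear combination. The sketch below reproduces that arithmetic in plain Python. The per-clause scores are made-up numbers, not values returned by Elasticsearch, and `linear_combination` is a hypothetical helper.

```python
def linear_combination(lexical_score, sparse_score, lexical_boost=1.0, sparse_boost=0.5):
    """Combine a lexical (BM25) score and an ELSER score as a weighted sum."""
    return lexical_boost * lexical_score + sparse_boost * sparse_score


# Made-up per-clause scores: (lexical, sparse). A clause that does not match contributes 0.
clause_scores = {
    "doc_a": (8.0, 10.0),
    "doc_b": (9.0, 4.0),
    "doc_c": (0.0, 12.0),
}

combined = {doc_id: linear_combination(*scores) for doc_id, scores in clause_scores.items()}
ranking = sorted(combined, key=combined.get, reverse=True)
print(combined)
print(ranking)
```

Here `doc_a` outranks `doc_b` despite its lower lexical score, because the sparse-vector clause adds more to its total.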

## 8. Rerank search results with Cohere

In [8]:
rerank_hits = co.rerank(
    query=query,
    documents=texts,
    max_chunks_per_doc=1,
    top_n=3,
    model="rerank-english-v2.0",
)

rerank_docs = list(
    map(
        lambda ranked: {
            **resp_docs[ranked.index],
            "_reranked_score": ranked.relevance_score,
        },
        rerank_hits,
    )
)

print("RERANKED DOCS", *rerank_docs, sep="\n")

RERANKED DOCS
{'_id': '95', '_score': 15.726769, 'title': "Schrödinger's cat", 'text': "Schrödinger's cat is a thought experiment about quantum physics. Erwin Schrödinger suggested it in 1935, in reaction to the Copenhagen interpretation of quantum physics.", '_reranked_score': 0.9953182}
{'_id': '96', '_score': 15.896298, 'title': "Schrödinger's cat", 'text': 'In simple terms, Schrödinger stated that if you place a cat and something that could kill the cat (a radioactive atom) in a box and sealed it, you would not know if the cat was dead or alive until you opened the box, so that until the box was opened, the cat was (in a sense) "both" "dead and alive". This is used to represent how scientific theory works. No one knows if any scientific theory is right or wrong until said theory can be tested and proved.', '_reranked_score': 0.987375}
{'_id': '100', '_score': 5.830132, 'title': "Schrödinger's cat", 'text': 'Schrödinger wanted to show that this way of thinking about quantum mechanic
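
Each rerank result refers to its source document by position in the `documents` list, which is why the cell above looks up `resp_docs[ranked.index]`. The following sketch shows that merge step in isolation. `RankedResult` is a hypothetical stand-in for the objects returned by the Cohere client, and all scores are made up.

```python
from dataclasses import dataclass


@dataclass
class RankedResult:
    """Stand-in for one rerank result: position in the input list plus a score."""
    index: int
    relevance_score: float


def merge_reranked(original_docs, ranked_results):
    """Attach each rerank score to the original document it refers to."""
    return [
        {**original_docs[r.index], "_reranked_score": r.relevance_score}
        for r in ranked_results
    ]


# Made-up documents in their original search order.
docs = [
    {"_id": "104", "_score": 17.2},
    {"_id": "96", "_score": 15.9},
    {"_id": "95", "_score": 15.7},
]
# Made-up rerank output: the third document is judged most relevant.
ranked = [RankedResult(index=2, relevance_score=0.99), RankedResult(index=1, relevance_score=0.98)]

merged = merge_reranked(docs, ranked)
print(*merged, sep="\n")
```

The original `_score` is kept alongside `_reranked_score`, so both orderings can be compared.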

## 9. Clean up resources

In [9]:
es_client.ingest.delete_pipeline(id=pipeline_id)
es_client.indices.delete(index=index)

ObjectApiResponse({'acknowledged': True})