## Documentation

To read more about collapsing search results, visit the [docs](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/collapse-search-results).



## Connect to Elasticsearch

In [None]:
from pprint import pprint
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")
client_info = es.info()
print("Connected to Elasticsearch!")
pprint(client_info.body)

## Index documents

This notebook uses the `APOD` (NASA's Astronomy Picture of the Day) dataset.

In [None]:
import json

with open("../data/apod.json") as f:
    documents = json.load(f)

Delete the `apod` index if it already exists, then create it.

In [None]:
es.indices.delete(index="apod", ignore_unavailable=True)
es.indices.create(index="apod")
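The field used for collapsing must be a single-valued `keyword` or numeric field with doc values. The cell above relies on dynamic mapping to infer `year` as a number. If you prefer to declare it explicitly, a minimal sketch could look like this. The field types for `title`, `date`, and `year` are assumptions (including the `yyyy-MM-dd` date format), any other APOD fields are left to dynamic mapping, and `build_apod_mappings` is a hypothetical helper.

```python
def build_apod_mappings() -> dict:
    """Hypothetical explicit mapping for the APOD index (field types are assumptions)."""
    return {
        "properties": {
            "title": {"type": "text"},
            # Assumes APOD dates look like 2024-01-15
            "date": {"type": "date", "format": "yyyy-MM-dd"},
            # Collapse needs a single-valued keyword or numeric field with doc values
            "year": {"type": "integer"},
        }
    }


# Usage (requires a running cluster):
# es.indices.create(index="apod", mappings=build_apod_mappings())
```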

Use the `bulk` API to index the documents in the `apod` index.

In [None]:
from tqdm import tqdm

operations = []
index_name = "apod"
for document in tqdm(documents, total=len(documents), desc="Indexing documents"):
    year = document["date"].split("-")[0]
    document["year"] = int(year)

    operations.append({"index": {"_index": index_name}})
    operations.append(document)

# refresh=True makes the new documents searchable before the next cells run
response = es.bulk(operations=operations, refresh=True)
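The cell above derives `year` by splitting the `date` string on `-`, which already assumes ISO dates (`YYYY-MM-DD`). Under that same assumption, parsing with `datetime.date.fromisoformat` is a slightly stricter alternative that raises a `ValueError` on malformed dates instead of silently producing a wrong year. `add_year` is a hypothetical helper name.

```python
from datetime import date


def add_year(document: dict) -> dict:
    """Return a copy of the document with an integer `year` parsed from its ISO `date`."""
    parsed = date.fromisoformat(document["date"])  # raises ValueError on malformed dates
    return {**document, "year": parsed.year}


print(add_year({"title": "Andromeda", "date": "2024-01-15"})["year"])  # 2024
```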

If the indexing is successful, you should see `response["errors"]` as `False`.

In [None]:
response["errors"]
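`response["errors"]` only tells you whether *any* operation failed. When it is `True`, each entry in `response["items"]` holds the per-document result under its action name (`index` here), and failed entries carry an `error` object. A small sketch to collect those failures; `failed_bulk_items` is a hypothetical helper.

```python
def failed_bulk_items(bulk_response) -> list:
    """Collect the per-operation results that contain an `error` object."""
    failures = []
    for item in bulk_response["items"]:
        # Each item is keyed by its action name, e.g. {"index": {...}}
        for action, result in item.items():
            if "error" in result:
                failures.append({"action": action, **result})
    return failures


# Usage: pprint(failed_bulk_items(response)[:5])
```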

## Collapse search results

Without collapsing, the search returns every document that matches the query (up to the requested `size` of 10,000).

In [None]:
response_no_collapsing = es.search(
    index="apod",
    body={
        "query": {"match": {"title": "Andromeda galaxy"}},
        "size": 10_000,
    },
)
total_hits = response_no_collapsing["hits"]["total"]["value"]
print(f"Total hits before collapsing: {total_hits}")
total_returned_hits = len(response_no_collapsing["hits"]["hits"])
print(f"Total returned hits before collapsing: {total_returned_hits}")

Let's count how many matching documents each year has in the `apod` index. You should see that many years contain more than one matching document. What happens if we collapse the search results by year?

In [None]:
from collections import Counter
from typing import Any, Mapping


def get_hits_per_year(response: Mapping[str, Any]) -> dict:
    """Count hits per year in a search response or in an inner_hits section."""
    return dict(Counter(hit["_source"]["year"] for hit in response["hits"]["hits"]))


print("Hits per year count:")
pprint(get_hits_per_year(response_no_collapsing))

Collapsing search results by year will return only one document per year that matches the query. That returned document will be the one with the highest `_score` for that year.
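To make the behaviour concrete, here is a pure-Python sketch that mimics what `collapse` does to a list of already-scored hits: keep the best-scoring hit per group, then order the groups by that hit's score. It is only an illustration, not how Elasticsearch implements collapsing (which happens during the search itself), and `collapse_hits` is a hypothetical name.

```python
def collapse_hits(hits: list, field: str) -> list:
    """Keep the highest-scoring hit for each distinct value of `field`."""
    best = {}
    for hit in hits:
        key = hit["_source"][field]
        if key not in best or hit["_score"] > best[key]["_score"]:
            best[key] = hit
    # Groups are ordered by the score of their top hit, like the default sort
    return sorted(best.values(), key=lambda h: h["_score"], reverse=True)


hits = [
    {"_score": 7.8, "_source": {"year": 2024, "title": "a"}},
    {"_score": 5.1, "_source": {"year": 2024, "title": "b"}},
    {"_score": 6.0, "_source": {"year": 2010, "title": "c"}},
]
print([h["_source"]["title"] for h in collapse_hits(hits, "year")])  # ['a', 'c']
```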

In [None]:
response_collapsing = es.search(
    index="apod",
    body={
        "query": {"match": {"title": "Andromeda galaxy"}},
        "collapse": {"field": "year"},
        "size": 10_000,
    },
)
total_hits = response_collapsing["hits"]["total"]["value"]
print(f"Total hits before collapsing: {total_hits}")
total_returned_hits = len(response_collapsing["hits"]["hits"])
print(f"Total returned hits after collapsing: {total_returned_hits}")

As you can see, now we have only one document per year that matches the query.

In [None]:
print("Hits per year count:")
pprint(get_hits_per_year(response_collapsing))

Let's verify whether the document from 2024 is the one with the highest `_score` for that year.

From the response with collapsing, we can see that the document in year 2024 has a `_score` of `7.789091`.

In [None]:
for hit in response_collapsing["hits"]["hits"]:
    year = hit["_source"]["year"]
    if year == 2024:
        score = hit["_score"]
        print(f"Document with a score of {score} for year {year}:")
        pprint(hit["_source"])
        break

In the response without collapsing, we can confirm that the first hit from 2024 also has a `_score` of `7.789091`, the same as in the response with collapsing.

In [None]:
for hit in response_no_collapsing["hits"]["hits"]:
    year = hit["_source"]["year"]
    if year == 2024:
        score = hit["_score"]
        print(f"Score {score}:")
        pprint(hit["_source"])
        print("-" * 50)
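Instead of checking a single year by eye, you could check every year at once: each collapsed hit's score should equal the maximum score among that year's hits in the uncollapsed response. A sketch, assuming both responses come from the same query and the uncollapsed search returned every match (its total hits did not exceed `size`); `collapsed_scores_match` is a hypothetical helper.

```python
import math
from collections import defaultdict


def collapsed_scores_match(collapsed, uncollapsed, field: str = "year") -> bool:
    """Check that each collapsed hit carries the best score of its group."""
    best_per_group = defaultdict(lambda: float("-inf"))
    for hit in uncollapsed["hits"]["hits"]:
        key = hit["_source"][field]
        best_per_group[key] = max(best_per_group[key], hit["_score"])
    # A group missing from the uncollapsed hits compares against -inf and fails
    return all(
        math.isclose(hit["_score"], best_per_group[hit["_source"][field]])
        for hit in collapsed["hits"]["hits"]
    )


# Usage: collapsed_scores_match(response_collapsing, response_no_collapsing)
```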

## Expand collapsed results

Expanding collapsed results with `inner_hits` lets you retrieve more than one matching document per year. You can also control how documents are sorted within each collapsed group, among other options.
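Besides `size`, `inner_hits` accepts options such as `sort` and `from`. For example, to get the three most recent matching documents per year rather than the three best-scoring ones, a request body could sort the inner hits by `date`. This sketch assumes `date` is mapped as a `date` field (dynamic mapping normally detects `YYYY-MM-DD` strings as dates); `build_collapse_body` is a hypothetical helper.

```python
def build_collapse_body(query_text: str, inner_size: int = 3) -> dict:
    """Collapse by year and expand each group with its most recent matches."""
    return {
        "query": {"match": {"title": query_text}},
        "collapse": {
            "field": "year",
            "inner_hits": {
                "name": "most_recent",
                "size": inner_size,
                # Sort the documents inside each group by date, newest first
                "sort": [{"date": "desc"}],
            },
        },
        "size": 10_000,
    }


# Usage: es.search(index="apod", body=build_collapse_body("Andromeda galaxy"))
```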

In [None]:
response_collapsing = es.search(
    index="apod",
    body={
        "query": {"match": {"title": "Andromeda galaxy"}},
        "collapse": {
            "field": "year",
            "inner_hits": {
                # No sort is given, so inner hits are ordered by _score
                "name": "top_scoring",
                "size": 3,  # Number of documents to return per collapsed group
            },
        },
        "size": 10_000,
    },
)
total_hits = response_collapsing["hits"]["total"]["value"]
print(f"Total hits before collapsing: {total_hits}")
total_returned_hits = len(response_collapsing["hits"]["hits"])
print(f"Total returned hits after collapsing: {total_returned_hits}")
# Inner hits of the first collapsed group (the year of the best-scoring document)
inner_hits = response_collapsing["hits"]["hits"][0]["inner_hits"]["top_scoring"]
total_returned_hits_after_expanding = len(inner_hits["hits"]["hits"])
print(f"Inner hits returned for the first group: {total_returned_hits_after_expanding}")

After expanding, the first collapsed group holds up to three documents (the `size` set in `inner_hits`), all from the same year, instead of just one.

In [None]:
print("Hits per year count:")
pprint(get_hits_per_year(inner_hits))
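The cell above only inspects the first group's inner hits. To see how many inner hits every year received, you can walk all collapsed groups. A sketch; `inner_hits_per_group` is a hypothetical helper, and `name` must match the `name` given in the `inner_hits` request above.

```python
def inner_hits_per_group(response, name: str, field: str = "year") -> dict:
    """Map each collapsed group's key to the number of inner hits it returned."""
    counts = {}
    for group in response["hits"]["hits"]:
        key = group["_source"][field]
        counts[key] = len(group["inner_hits"][name]["hits"]["hits"])
    return counts
```

Each count is at most the inner `size` (3 here), and smaller only for years with fewer matching documents.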

The documents are sorted by `_score` within each collapsed group. They also match the scores in the response without collapsing.

In [None]:
for hit in inner_hits["hits"]["hits"]:
    score = hit["_score"]
    print(f"Score: {score}")
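To check programmatically that the inner-hit scores match the uncollapsed response, you can look each inner hit up by `_id` in `response_no_collapsing`. The sketch below also checks that the inner hits come back in descending score order; `inner_scores_match` is a hypothetical name.

```python
import math


def inner_scores_match(inner, uncollapsed) -> bool:
    """Compare inner-hit scores with the same documents' scores without collapsing."""
    scores_by_id = {hit["_id"]: hit["_score"] for hit in uncollapsed["hits"]["hits"]}
    inner_scores = [hit["_score"] for hit in inner["hits"]["hits"]]
    # Inner hits should be sorted by _score, highest first
    in_order = all(a >= b for a, b in zip(inner_scores, inner_scores[1:]))
    same_scores = all(
        hit["_id"] in scores_by_id and math.isclose(hit["_score"], scores_by_id[hit["_id"]])
        for hit in inner["hits"]["hits"]
    )
    return in_order and same_scores


# Usage: inner_scores_match(inner_hits, response_no_collapsing)
```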

## Collapsing with search_after

When collapsing on a field with many unique values, you can use the `search_after` parameter to paginate through the collapsed results and retrieve every group without missing any. This requires sorting and collapsing on the same field (here `user_id`).

> Note: You can't use the `scroll` API with collapsing. Use `search_after` instead.
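Conceptually, `search_after` pagination over a collapsed field works because the results are sorted by the collapse key itself: each page starts strictly after the last key seen, so no group is skipped or repeated. The pure-Python sketch below mimics that on in-memory documents; it is an illustration with a hypothetical `collapsed_page` function, not Elasticsearch's implementation.

```python
def collapsed_page(docs: list, field: str, size: int, search_after=None) -> list:
    """Return one page of collapsed results sorted ascending by `field`."""
    first_per_key = {}
    for doc in docs:
        # Keep one representative document per distinct key
        first_per_key.setdefault(doc[field], doc)
    keys = sorted(k for k in first_per_key if search_after is None or k > search_after)
    return [first_per_key[k] for k in keys[:size]]


docs = [{"user_id": u, "n": i} for u in range(5) for i in range(2)]
after, seen = None, []
while page := collapsed_page(docs, "user_id", size=2, search_after=after):
    seen.extend(d["user_id"] for d in page)
    after = page[-1]["user_id"]
print(seen)  # [0, 1, 2, 3, 4]
```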

In [None]:
documents = []
number_of_unique_user_ids = 20_000
for user_id in range(number_of_unique_user_ids):
    for i in range(2):
        documents.append(
            {
                "user_id": user_id,
                "title": f"Document {i} for user {user_id}",
                "content": f"This is the content of document {i} for user {user_id}.",
            }
        )

es.indices.delete(index="my_index", ignore_unavailable=True)
es.indices.create(index="my_index")

operations = []
for document in tqdm(documents, total=len(documents), desc="Indexing documents"):
    operations.append({"index": {"_index": "my_index"}})
    operations.append(document)

# refresh=True makes the documents visible to the count and search cells below
response = es.bulk(operations=operations, refresh=True)
response["errors"]
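The cell above sends all 40,000 documents (80,000 action and source lines) in a single bulk request. That works for these small synthetic documents, but for larger datasets it is common to split the work into smaller requests. A sketch that chunks the operations list while keeping each action line next to its document; the default of 1,000 documents per chunk is an arbitrary assumption, and `chunk_operations` is a hypothetical helper. The client's `elasticsearch.helpers.bulk` function can also do this chunking for you.

```python
def chunk_operations(operations: list, docs_per_chunk: int = 1_000):
    """Yield slices of a bulk operations list without splitting action/source pairs."""
    step = 2 * docs_per_chunk  # each document uses one action line and one source line
    for start in range(0, len(operations), step):
        yield operations[start:start + step]


# Usage (requires a running cluster):
# for chunk in chunk_operations(operations):
#     es.bulk(operations=chunk)
# es.indices.refresh(index="my_index")
```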

We indexed 40,000 documents: 2 for each of 20,000 user IDs. Collapsing on `user_id` should therefore produce 20,000 collapsed results. Let's first confirm the document count.

In [None]:
document_count = es.count(index="my_index")
print(f"Total documents indexed: {document_count['count']}")

Now paginate through the collapsed results with `search_after`. The last user ID printed should be `19999` and the number of collapsed hits `20000`, as expected.

In [None]:
collapsed_hits = []
search_after = None

while True:
    body = {
        "query": {"match": {"content": "document"}},
        "collapse": {"field": "user_id"},
        "sort": ["user_id"],  # must be the same field as the collapse field
        "size": 10_000,
    }

    if search_after is not None:
        body["search_after"] = search_after

    response_collapsing = es.search(index="my_index", body=body)
    hits = response_collapsing["hits"]["hits"]

    if not hits:
        break

    # The last hit's sort values mark where the next page starts
    search_after = hits[-1]["sort"]
    print(f"Last user ID: {search_after[0]}")

    collapsed_hits.extend(hits)

print(f"Total collapsed hits: {len(collapsed_hits)}")
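If you need this pattern more than once, the loop can be wrapped in a generator that yields collapsed hits page by page, resuming from the last hit's `sort` values. This sketch takes the search as a callable so it can be tried without a cluster; `iter_collapsed_hits` and the `search` callable are hypothetical names.

```python
from typing import Callable, Iterator, Optional


def iter_collapsed_hits(search: Callable[[Optional[list]], list]) -> Iterator[dict]:
    """Yield hits from successive pages, resuming after the last hit's sort values."""
    search_after = None
    while True:
        hits = search(search_after)
        if not hits:
            return
        yield from hits
        search_after = hits[-1]["sort"]


# Usage with the client (requires a running cluster):
# def search(after):
#     body = {
#         "query": {"match": {"content": "document"}},
#         "collapse": {"field": "user_id"},
#         "sort": ["user_id"],
#         "size": 10_000,
#     }
#     if after is not None:
#         body["search_after"] = after
#     return es.search(index="my_index", body=body)["hits"]["hits"]
#
# print(sum(1 for _ in iter_collapsed_hits(search)))
```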