## Documentation

To read more about collapsing search results, visit the [docs](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/collapse-search-results).

![collapse_search_results](../images/collapse_search_results.png)

## Connect to Elasticsearch

In [7]:
from pprint import pprint
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")
client_info = es.info()
print("Connected to Elasticsearch!")
pprint(client_info.body)

Connected to Elasticsearch!
{'cluster_name': 'docker-cluster',
 'cluster_uuid': 'DlYG5m9gR3upn7qgaYyAJA',
 'name': 'df0334cb3063',
 'tagline': 'You Know, for Search',
 'version': {'build_date': '2024-08-05T10:05:34.233336849Z',
             'build_flavor': 'default',
             'build_hash': '1a77947f34deddb41af25e6f0ddb8e830159c179',
             'build_snapshot': False,
             'build_type': 'docker',
             'lucene_version': '9.11.1',
             'minimum_index_compatibility_version': '7.0.0',
             'minimum_wire_compatibility_version': '7.17.0',
             'number': '8.15.0'}}


## Index documents

Let's use the `APOD` dataset in this notebook.

In [8]:
import json

with open("../data/apod.json") as f:
    documents = json.load(f)

Create the `apod` index, deleting any existing index with the same name first.

In [9]:
es.indices.delete(index="apod", ignore_unavailable=True)
es.indices.create(index="apod")

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'apod'})
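Field collapsing requires the collapse field to be a single-valued `keyword` or numeric field with doc values enabled. The notebook relies on dynamic mapping for this. As a minimal sketch, an explicit mapping could pin the types down instead. The field types chosen below are assumptions about the dataset, and `is_collapsible` is a hypothetical helper, not part of the client.

```python
# Hypothetical explicit mapping for the `apod` index.
# The notebook itself relies on dynamic mapping; these types are assumptions.
apod_mappings = {
    "properties": {
        "year": {"type": "integer"},  # collapse field: single-valued numeric
        "date": {"type": "date"},     # assumes ISO dates such as "2024-09-08"
        "title": {"type": "text"},    # full-text field used by the match query
    }
}

# Illustrative subset of field types that can be used for collapsing.
COLLAPSIBLE_TYPES = {"keyword", "byte", "short", "integer", "long", "float", "double"}


def is_collapsible(mappings, field):
    """Return True if `field` has a type from the illustrative subset above."""
    field_type = mappings["properties"].get(field, {}).get("type")
    return field_type in COLLAPSIBLE_TYPES


print(is_collapsible(apod_mappings, "year"))
print(is_collapsible(apod_mappings, "title"))
```

With a live client, the mapping could be passed as `es.indices.create(index="apod", mappings=apod_mappings)`.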

Use the `bulk` API to index the documents in the `apod` index.

In [10]:
from tqdm import tqdm

operations = []
index_name = "apod"
for document in tqdm(documents, total=len(documents), desc="Indexing documents"):
    year = document["date"].split("-")[0]
    document["year"] = int(year)

    operations.append({"index": {"_index": index_name}})
    operations.append(document)

response = es.bulk(operations=operations)

Indexing documents: 100%|██████████| 3333/3333 [00:00<00:00, 1067309.15it/s]


If indexing succeeded, `response["errors"]` is `False`.

In [11]:
response["errors"]

False
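When `response["errors"]` is `True`, the per-document details are in the `items` list of the bulk response. The following is a minimal sketch of how the failures could be collected. `collect_bulk_failures` is a hypothetical helper and `sample_response` is made-up data shaped like a bulk response.

```python
def collect_bulk_failures(response):
    """Return one summary dict per failed action in a bulk response."""
    failures = []
    for item in response.get("items", []):
        # Each item has a single key naming the action ("index", "create", ...).
        for action, result in item.items():
            if "error" in result:
                failures.append(
                    {
                        "action": action,
                        "status": result.get("status"),
                        "reason": result["error"].get("reason"),
                    }
                )
    return failures


# Made-up response for illustration: one success, one failure.
sample_response = {
    "errors": True,
    "items": [
        {"index": {"_index": "apod", "status": 201}},
        {
            "index": {
                "_index": "apod",
                "status": 400,
                "error": {"type": "mapper_parsing_exception", "reason": "failed to parse"},
            }
        },
    ],
}

failures = collect_bulk_failures(sample_response)
print(failures)
```

With the real client, passing `response.body` gives the helper a plain dictionary.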

## Collapse search results

Without collapsing, a search returns every document that matches the query, up to `size`.

In [14]:
response_no_collapsing = es.search(
    index="apod",
    body={
        "query": {"match": {"title": "Andromeda galaxy"}},
        "size": 10_000,
    },
)
total_hits = response_no_collapsing["hits"]["total"]["value"]
print(f"Total hits before collapsing: {total_hits}")
total_returned_hits = len(response_no_collapsing["hits"]["hits"])
print(f"Total returned hits before collapsing: {total_returned_hits}")

Total hits before collapsing: 270
Total returned hits before collapsing: 270


Let's count the documents that matched the query per year. Every year has many matches. What happens if we collapse the search results by year?

In [15]:
from elastic_transport import ObjectApiResponse


def get_hits_per_year(response: ObjectApiResponse) -> dict:
    hits_per_year_count = {}
    for hit in response["hits"]["hits"]:
        year = hit["_source"]["year"]
        if year not in hits_per_year_count:
            hits_per_year_count[year] = 0
        hits_per_year_count[year] += 1
    return hits_per_year_count


print("Hits per year count:")
pprint(get_hits_per_year(response_no_collapsing))

Hits per year count:
{2015: 28,
 2016: 29,
 2017: 31,
 2018: 19,
 2019: 32,
 2020: 25,
 2021: 24,
 2022: 30,
 2023: 32,
 2024: 20}


Collapsing search results by year returns only one matching document per year. Because hits are ranked by `_score` by default, that document is the one with the highest `_score` for its year.

In [16]:
response_collapsing = es.search(
    index="apod",
    body={
        "query": {"match": {"title": "Andromeda galaxy"}},
        "collapse": {"field": "year"},
        "size": 10_000,
    },
)
total_hits = response_collapsing["hits"]["total"]["value"]
print(f"Total hits before collapsing: {total_hits}")
total_returned_hits = len(response_collapsing["hits"]["hits"])
print(f"Total returned hits after collapsing: {total_returned_hits}")

Total hits before collapsing: 270
Total returned hits after collapsing: 10


As you can see, we now get only one document per year. Note that `hits.total` still reports all 270 matching documents: collapsing only affects which hits are returned.

In [17]:
print("Hits per year count:")
pprint(get_hits_per_year(response_collapsing))

Hits per year count:
{2015: 1,
 2016: 1,
 2017: 1,
 2018: 1,
 2019: 1,
 2020: 1,
 2021: 1,
 2022: 1,
 2023: 1,
 2024: 1}
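To make the behavior concrete, here is a minimal pure-Python sketch of what collapsing does conceptually when results are ranked by `_score`. It is an illustration on made-up hits, not how Elasticsearch implements collapsing internally. `collapse_by_field` and `toy_hits` are hypothetical names.

```python
def collapse_by_field(hits, field):
    """Keep only the best-scoring hit for each distinct value of `field`."""
    ranked = sorted(hits, key=lambda hit: hit["_score"], reverse=True)
    seen = set()
    collapsed = []
    for hit in ranked:
        value = hit["_source"][field]
        if value not in seen:
            # First time we see this value: it is the top hit of its group.
            seen.add(value)
            collapsed.append(hit)
    return collapsed


# Made-up hits shaped like entries of response["hits"]["hits"].
toy_hits = [
    {"_score": 2.5, "_source": {"year": 2023, "title": "a"}},
    {"_score": 7.1, "_source": {"year": 2024, "title": "b"}},
    {"_score": 3.0, "_source": {"year": 2024, "title": "c"}},
    {"_score": 4.2, "_source": {"year": 2023, "title": "d"}},
]

collapsed = collapse_by_field(toy_hits, "year")
print([(hit["_source"]["year"], hit["_score"]) for hit in collapsed])
# [(2024, 7.1), (2023, 4.2)]
```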


Let's verify that the document returned for 2024 is the one with the highest `_score`.

From the response with collapsing, we can see that the document in year 2024 has a `_score` of `7.789091`.

In [18]:
for hit in response_collapsing["hits"]["hits"]:
    year = hit["_source"]["year"]
    if year == 2024:
        score = hit["_score"]
        print(f"Document with a score of {score} for year {year}:")
        pprint(hit["_source"])
        break

Document with a score of 7.789091 for year 2024:
{'authors': 'Subaru, Hubble, Mayall, R. Gendler, R. Croman\n',
 'date': '2024-09-08',
 'explanation': 'Explanation: The most distant object easily visible to the '
                'unaided eye is M31, the great Andromeda Galaxy. Even at some '
                'two and a half million light-years distant, this immense '
                'spiral galaxy -- spanning over 200,000 light years -- is '
                'visible, although as a faint, nebulous cloud in the '
                'constellation Andromeda. A bright yellow nucleus, dark '
                'winding dust lanes, and expansive spiral arms dotted with '
                'blue star clusters and red nebulae, are recorded in this '
                'stunning telescopic image which combines data from orbiting '
                'Hubble with ground-based images from Subaru and Mayall. In '
                'only about 5 billion years, the Andromeda galaxy may be even '
                'eas

In the response without collapsing, the first hit from 2024 also has a `_score` of `7.789091`, the same as in the response with collapsing.

In [19]:
for hit in response_no_collapsing["hits"]["hits"]:
    year = hit["_source"]["year"]
    if year == 2024:
        score = hit["_score"]
        print(f"Score {score}:")
        pprint(hit["_source"])
        print("-" * 50)

Score 7.789091:
{'authors': 'Subaru, Hubble, Mayall, R. Gendler, R. Croman\n',
 'date': '2024-09-08',
 'explanation': 'Explanation: The most distant object easily visible to the '
                'unaided eye is M31, the great Andromeda Galaxy. Even at some '
                'two and a half million light-years distant, this immense '
                'spiral galaxy -- spanning over 200,000 light years -- is '
                'visible, although as a faint, nebulous cloud in the '
                'constellation Andromeda. A bright yellow nucleus, dark '
                'winding dust lanes, and expansive spiral arms dotted with '
                'blue star clusters and red nebulae, are recorded in this '
                'stunning telescopic image which combines data from orbiting '
                'Hubble with ground-based images from Subaru and Mayall. In '
                'only about 5 billion years, the Andromeda galaxy may be even '
                'easier to see -- as it will likely s
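The check above covers only 2024. As a sketch, the same comparison could be run for every year at once by computing the best score per year from the uncollapsed hits and comparing it with each collapsed hit. `find_collapse_mismatches` is a hypothetical helper and the sample lists are made-up data.

```python
def find_collapse_mismatches(collapsed_hits, all_hits, field="year"):
    """Return {value: (collapsed_score, best_score)} for groups that disagree."""
    best_scores = {}
    for hit in all_hits:
        value = hit["_source"][field]
        best_scores[value] = max(hit["_score"], best_scores.get(value, float("-inf")))

    mismatches = {}
    for hit in collapsed_hits:
        value = hit["_source"][field]
        if hit["_score"] != best_scores.get(value):
            mismatches[value] = (hit["_score"], best_scores.get(value))
    return mismatches


# Made-up hits for illustration.
sample_all = [
    {"_score": 7.0, "_source": {"year": 2024}},
    {"_score": 3.0, "_source": {"year": 2024}},
    {"_score": 5.0, "_source": {"year": 2023}},
]
sample_collapsed = [
    {"_score": 7.0, "_source": {"year": 2024}},
    {"_score": 5.0, "_source": {"year": 2023}},
]

print(find_collapse_mismatches(sample_collapsed, sample_all))
```

Against the notebook's data, calling it with `response_collapsing["hits"]["hits"]` and `response_no_collapsing["hits"]["hits"]` would be expected to return an empty dictionary.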

## Expand collapsed results

Expanding collapsed results with `inner_hits` lets you retrieve more than one matching document per year. You can also control how the documents within each collapsed group are sorted.

In [23]:
response_collapsing = es.search(
    index="apod",
    body={
        "query": {"match": {"title": "Andromeda galaxy"}},
        "collapse": {
            "field": "year",
            "inner_hits": {
                "name": "most_recent",
                "size": 3,  # Number of documents to return per collapsed group
            },
        },
        "size": 10_000,
    },
)
total_hits = response_collapsing["hits"]["total"]["value"]
print(f"Total hits before collapsing: {total_hits}")
total_returned_hits = len(response_collapsing["hits"]["hits"])
print(f"Total returned hits after collapsing: {total_returned_hits}")
inner_hits = response_collapsing["hits"]["hits"][0]["inner_hits"]["most_recent"]
total_returned_hits_after_expanding = len(inner_hits["hits"]["hits"])
print(f"Total returned hits after expanding: {total_returned_hits_after_expanding}")

Total hits before collapsing: 270
Total returned hits after collapsing: 10
Total returned hits after expanding: 3


After expanding, each collapsed group carries up to three documents under `inner_hits`. Here we inspect the first group, which is the one for 2024.

In [24]:
print("Hits per year count:")
pprint(get_hits_per_year(inner_hits))

Hits per year count:
{2024: 3}
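The cell above looks only at the first collapsed group. A minimal sketch for counting the inner hits of every group could look like this. `inner_hits_per_group` is a hypothetical helper and `sample_response` is made-up data with the same shape as the search response.

```python
def inner_hits_per_group(response, name, field="year"):
    """Map each collapsed group's field value to its number of inner hits."""
    counts = {}
    for hit in response["hits"]["hits"]:
        group_value = hit["_source"][field]
        group_hits = hit["inner_hits"][name]["hits"]["hits"]
        counts[group_value] = len(group_hits)
    return counts


# Made-up response: two collapsed groups with 3 and 2 inner hits.
sample_response = {
    "hits": {
        "hits": [
            {
                "_source": {"year": 2024},
                "inner_hits": {"most_recent": {"hits": {"hits": [{}, {}, {}]}}},
            },
            {
                "_source": {"year": 2023},
                "inner_hits": {"most_recent": {"hits": {"hits": [{}, {}]}}},
            },
        ]
    }
}

print(inner_hits_per_group(sample_response, "most_recent"))
# {2024: 3, 2023: 2}
```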


The documents are sorted by `_score` within each collapsed group. They also match the scores in the response without collapsing.

In [25]:
for hit in inner_hits["hits"]["hits"]:
    score = hit["_score"]
    print(f"Score: {score}")

Score: 7.789091
Score: 3.0710938
Score: 2.792822
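To change the order within each group, `inner_hits` accepts a `sort` clause. The sketch below builds a request body that returns the most recent documents per year instead of the highest-scoring ones. It assumes the `date` field is mapped as a date (which dynamic mapping would typically do for ISO date strings). `build_collapse_body` is a hypothetical helper.

```python
def build_collapse_body(query_text, inner_hits_size=3):
    """Build a search body that collapses on `year` and sorts inner hits by date."""
    return {
        "query": {"match": {"title": query_text}},
        "collapse": {
            "field": "year",
            "inner_hits": {
                "name": "most_recent",
                "size": inner_hits_size,
                # Assumption: `date` is mapped as a date field.
                "sort": [{"date": "desc"}],
            },
        },
        "size": 10_000,
    }


body = build_collapse_body("Andromeda galaxy")
print(body["collapse"]["inner_hits"])
```

The resulting dictionary could be passed as `body=body` to `es.search`, in the same way as the request above.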


## Collapsing with search_after

When collapsing on a field with many unique values, use the `search_after` parameter to paginate through the results and retrieve every collapsed hit. This is only supported when sorting and collapsing on the same field.

> Note: You can't use the `scroll` API with collapsing. Use `search_after` instead.

In [26]:
documents = []
number_of_unique_user_ids = 20_000
for user_id in range(number_of_unique_user_ids):
    for i in range(2):
        documents.append(
            {
                "user_id": user_id,
                "title": f"Document {i} for user {user_id}",
                "content": f"This is the content of document {i} for user {user_id}.",
            }
        )

es.indices.delete(index="my_index", ignore_unavailable=True)
es.indices.create(index="my_index")

operations = []
for document in tqdm(documents, total=len(documents), desc="Indexing documents"):
    operations.append({"index": {"_index": "my_index"}})
    operations.append(document)

response = es.bulk(operations=operations)
response["errors"]

Indexing documents: 100%|██████████| 40000/40000 [00:00<00:00, 1809508.07it/s]


False
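The cell above sends all 40000 documents in a single bulk request. For larger datasets it may be safer to split the operations into several requests. The sketch below shows one way to do that in plain Python. `chunk_operations` is a hypothetical helper and the default of 5,000 documents per request is an arbitrary assumption, not a recommended value.

```python
from itertools import islice


def chunk_operations(documents, index_name, docs_per_request=5_000):
    """Yield bulk `operations` lists holding at most `docs_per_request` documents."""
    iterator = iter(documents)
    while True:
        batch = list(islice(iterator, docs_per_request))
        if not batch:
            return
        operations = []
        for document in batch:
            # Each document needs an action line followed by its source.
            operations.append({"index": {"_index": index_name}})
            operations.append(document)
        yield operations


sample_documents = [{"user_id": n} for n in range(12)]
chunks = list(chunk_operations(sample_documents, "my_index", docs_per_request=5))
print([len(chunk) // 2 for chunk in chunks])  # documents per request
# [5, 5, 2]
```

Each yielded list could then be sent with `es.bulk(operations=operations)`, checking `response["errors"]` after every request.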

We indexed 40000 documents and are now ready to use `search_after` to paginate through the collapsed results. Since there are 2 documents per user, we expect 20000 collapsed results.

In [28]:
document_count = es.count(index="my_index")
print(f"Total documents indexed: {document_count['count']}")

Total documents indexed: 40000


The loop below pages through the collapsed results. The last user ID is `19999` and the total number of collapsed hits is `20000`, as expected.

In [29]:
collapsed_hits = []
search_after = None

while True:
    body = {
        "query": {"match": {"content": "document"}},
        "collapse": {"field": "user_id"},
        "sort": ["user_id"],
        "size": 10_000,
    }

    if search_after is not None:
        body["search_after"] = [search_after]

    response_collapsing = es.search(index="my_index", body=body)
    hits = response_collapsing["hits"]["hits"]

    if not hits:
        break

    search_after = hits[-1]["_source"]["user_id"]
    print(f"Last user ID: {search_after}")

    collapsed_hits.extend(hits)

print(f"Total collapsed hits: {len(collapsed_hits)}")

Last user ID: 9999
Last user ID: 19999
Total collapsed hits: 20000
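The loop above reads the next `search_after` value from `_source`. Each hit of a sorted search also carries its sort values under `sort`, which can be passed back directly. The following sketch wraps the pagination in a generator. `iter_collapsed_hits` is a hypothetical helper, and `fake_search` is a stand-in for Elasticsearch so that the example runs without a cluster.

```python
def iter_collapsed_hits(search, body, page_size=10_000):
    """Yield every collapsed hit, following `search_after` from page to page.

    `search` is any callable that takes a request body and returns a
    search-response-like dict.
    """
    page_body = dict(body, size=page_size)
    while True:
        hits = search(page_body)["hits"]["hits"]
        if not hits:
            return
        yield from hits
        # Reuse the sort values of the last hit as the next page's cursor.
        page_body = dict(page_body, search_after=hits[-1]["sort"])


def fake_search(body):
    """Stand-in for Elasticsearch: 25 users, sorted and collapsed on user_id."""
    start = body.get("search_after", [-1])[0] + 1
    stop = min(start + body["size"], 25)
    hits = [{"_source": {"user_id": u}, "sort": [u]} for u in range(start, stop)]
    return {"hits": {"hits": hits}}


body = {"collapse": {"field": "user_id"}, "sort": ["user_id"]}
all_hits = list(iter_collapsed_hits(fake_search, body, page_size=10))
print(len(all_hits), all_hits[-1]["sort"])
# 25 [24]
```

With a live client, `search` could be `lambda b: es.search(index="my_index", body=b)`.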
