This repo was created for my blog article. I'm also posting that article here.
Being new to Elasticsearch, I spent a significant amount of time figuring out how to iterate through a large data set. At first it seemed strange to me that Elasticsearch does not return a cursor when you make a query. Instead, it only returns one page of the full result set (with the page size defaulting to 10). As someone more familiar with Python's SQL and MongoDB drivers than with Elasticsearch's, I felt at a loss when a query did not hand me a cursor. Although I had heard that the Elasticsearch helpers might be designed for exactly this purpose, I didn't come across them right away. Instead, my attention was caught by the "from" and "size" parameters in the query body.
"Hmm. It seems I can retrieve the query results in batches. With some knowledge of Python's iterator class protocol, I should be able to wrap the returned data into a cursor myself!"
And I wrote one. Although in the end it didn't work well for large data sets, I think it's worth writing down. It will benefit readers who:
- want to see a real-world example of Python's iterator class protocol,
- want to know the limitation of Elasticsearch's "from" and "size" parameters, or
- simply want to know the right way to iterate through query results (jump to the last section for the scroll API, and in particular, the last code block for Python's `elasticsearch.helpers.scan` function).
I will first give a brief introduction on how to make a Python iterator class. Then we will go ahead and write the `ESCursor` class and give it some tests. After that, I'll show how it crashes on a large dataset. In the end, I'll discuss the scroll API and the Python helper called `scan`.
Let's get started.
How can we make our own class an iterator? If the class supports iteration, we should be able to write the following code:

```python
foo = Foo()  # an instance of a class supporting iteration
for x in foo:
    process(x)  # process x
```
Indeed, the cursors returned by a SQL query or a MongoDB query are nothing but such objects.
A good short discussion about the iterator class interface can be found here. To be brief, any Python class that implements the methods `__iter__` and `__next__` is an iterator class. The first method initializes the iterator, and the second retrieves the next item. If the iteration should terminate at some point, the `__next__` method should raise a `StopIteration` exception. So what a for loop basically does is call `__iter__` to initialize the iterator, then repeatedly call `__next__` to obtain the next item. When it catches the `StopIteration` exception, it breaks the loop.
In other words, the following two pieces of code are equivalent.

The for-loop version:

```python
foo = Foo()
for x in foo:
    process(x)  # do something with x
```

The generic version using a while loop:

```python
foo = Foo()
it = iter(foo)  # equivalent to foo.__iter__()
try:
    while True:
        x = next(it)  # alternatively, x = it.__next__()
        process(x)
except StopIteration:
    pass
```
This not only demystifies how for loops work with iterators, but also gives some hints on how the `__iter__` and `__next__` methods should be implemented. The `__iter__` method does whatever is needed for initialization and (not demonstrated in the code above) returns the iterator object (usually `self`). The `__next__` method keeps track of the current state and returns the next item.
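To make this concrete, here is a minimal (made-up) iterator class that counts down from a given number; the `ESCursor` class below follows exactly the same pattern, just with Elasticsearch queries behind the scenes.

```python
class Countdown:
    """A minimal iterator class: counts down from `start` to 1."""

    def __init__(self, start):
        self.start = start

    def __iter__(self):
        # initialize (or reset) the iteration state and return the iterator
        self.current = self.start
        return self

    def __next__(self):
        # return the next item, or signal that the iteration is over
        if self.current <= 0:
            raise StopIteration
        value = self.current
        self.current -= 1
        return value


for n in Countdown(3):
    print(n)  # prints 3, 2, 1
```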
A few words for readers new to Elasticsearch. Elasticsearch is an excellent database that supports searching in various ways. You can check here for a simple but interesting (especially to Star Wars fans) tutorial.
Elasticsearch provides a RESTful API for communicating with the database server. For example, to match all documents in an index (think of an "index" as a "database") and a doc_type (think of it as a "table" in SQL jargon, or a "collection" in MongoDB jargon), we simply send a POST request to the URL `http://localhost:9200/<index>/<doc_type>/_search`. The request body describes the query. In this "match all" scenario, it looks like the following:
```
# written in simplified HTTP style
POST /<index>/<doc_type>/_search
{
    "query": {"match_all": {}}
}
```
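If you prefer to stay in Python, the same request can be sent with the official `elasticsearch` client. This is just a sketch: it assumes a server running on localhost:9200, and it borrows the "bank" index and "customer" doc_type from the example further below.

```python
from elasticsearch import Elasticsearch

es = Elasticsearch()  # connects to localhost:9200 by default

res = es.search(index="bank", doc_type="customer",
                body={"query": {"match_all": {}}})

print(res["hits"]["total"])      # total number of matching documents
print(len(res["hits"]["hits"]))  # documents actually returned (10 by default)
```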
If you have a large dataset, you'll immediately notice something strange. Look at the "hits" field in the response data. It's a dictionary of the following format:

```
{
    "total": <total number of documents matched>,
    "max_score": <maximum matching score>,
    "hits": [{...}, ...]  # list of all matched documents
}
```
Since I have a big database, the `total` field gives more than 20 million (since I ran a "match all" query, that basically means there were 20 million documents in my database). However, the inner `hits` field, which is supposed to contain all matched documents, does not contain 20 million documents (it would be a nightmare if it did). Instead, only 10 documents were included.
Our task now is to figure out how to retrieve all data iteratively from the Elasticsearch server.
The upshot of the last section is that instead of returning all matched documents in one query, Elasticsearch divides them into pages. As you can guess by now, the default page size is 10 documents. We can easily specify which page to get and the size of each page by using "from" and "size" in the query body. Say we want to keep the default 10-documents-per-page size, but we want the data on the 9th page; we do the following.
```
POST /<index>/<doc_type>/_search
{
    "query": {"match_all": {}},
    "size": 10,
    "from": 80  # note that the first page starts at "from": 0
}
```
With the help of `size` and `from`, we should be able to take the documents page by page, from cover to cover. To iterate over documents, all we have to do is take a batch, enumerate each document in that batch, and then take another batch, until we exhaust all batches; see the sketch below.
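Before wrapping this into a class, here is a rough sketch of the raw paging loop, assuming the `es` connection from before and the "bank"/"customer" index and doc_type used in the example further below; printing the `_id` stands in for whatever processing you actually need.

```python
offset, batch_size = 0, 1000
while True:
    res = es.search(index="bank", doc_type="customer",
                    body={"query": {"match_all": {}},
                          "from": offset, "size": batch_size})
    hits = res["hits"]["hits"]
    if not hits:  # no documents left on this "page"
        break
    for doc in hits:
        print(doc["_id"])  # stand-in for your own processing
    offset += len(hits)  # move on to the next page
```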
Now let's go ahead and construct this cursor class.
The module can be found in my GitHub repo. The main class is the `ESCursor` class, shown below.
```python
class ESCursor:
    # class constant, can be used directly as ESCursor.default_query_body
    default_query_body = {"query": {"match_all": {}}}

    def __init__(self, es_conn, index, doc_type="_doc", body=default_query_body,
                 total_number=None, batch_size=1000):
        self.es_conn = es_conn
        self.index = index
        self.doc_type = doc_type
        self.body = body
        # total_number is the maximum number of docs the user wants.
        # If it's larger than the actual total number,
        # use the actual total number instead.
        actual_total_number = get_total_hits(es_conn, index, doc_type, body)
        if total_number is None or total_number > actual_total_number:
            total_number = actual_total_number
        self.total_number = total_number
        if batch_size < total_number:
            self.batch_size = batch_size  # parameter to "size" when fetching each batch
        else:
            self.batch_size = total_number
        self.offset = 0              # parameter to "from" when fetching each batch
        self.inbatch_idx = 0         # index within the current batch
        self.current_batch_size = 0  # the actual batch size;
                                     # may be smaller than "batch_size"

    def __iter__(self):
        """
        Initialize the iteration.
        Set all parameters to their initial state and fetch the first batch.
        """
        self.offset = 0
        self.inbatch_idx = 0
        self.current_batch_size = 0
        self._fetch_next_batch()
        return self

    def _fetch_next_batch(self):
        """
        Fetch the next page.
        Increment the offset, retrieve the data on the next page,
        compute the batch size, and move inbatch_idx back to the
        beginning of the batch.
        """
        self.offset += self.current_batch_size
        if self.offset >= self.total_number:
            raise StopIteration
        body_batch = {**self.body, "from": self.offset, "size": self.batch_size}
        res_data = self.es_conn.search(index=self.index,
                                       doc_type=self.doc_type,
                                       body=body_batch)
        self.hits = res_data["hits"]["hits"]
        self.current_batch_size = len(self.hits)
        self.inbatch_idx = 0

    def __next__(self):
        """
        Obtain the next document. If the current batch has been
        exhausted, call "_fetch_next_batch" to retrieve documents
        from the next page. If all pages have been covered, or if
        it goes beyond the limit set by the "total_number" parameter
        (see the constructor), raise the "StopIteration" exception
        to end the iteration.
        """
        if self.inbatch_idx >= self.current_batch_size:
            self._fetch_next_batch()
            if self.inbatch_idx == self.current_batch_size:
                raise StopIteration
        r = self.hits[self.inbatch_idx]
        self.inbatch_idx += 1
        return r
```
Hopefully the comments in the code are descriptive enough. The "private" method `_fetch_next_batch` fetches the data on the next page by adding `{"from": self.offset, "size": self.batch_size}` to the query body.
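The constructor also calls a small helper, `get_total_hits`, which is defined in the repo but not shown above. A minimal sketch of what it does might look like this (a size-0 search, so only the hit count travels over the wire):

```python
def get_total_hits(es_conn, index, doc_type, body):
    """Return the total number of documents matching the query body."""
    res = es_conn.search(index=index, doc_type=doc_type,
                         body={**body, "size": 0})
    return res["hits"]["total"]
```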
Now the `ESCursor` object will really behave like a cursor. Let's say we want to retrieve the "_id" fields of the first 200 documents. We can simply do the following.
```python
es = Elasticsearch()  # Elasticsearch connection to localhost:9200
body = {**ESCursor.default_query_body}  # copy of the class constant defined above
body["stored_fields"] = []  # ask ES not to return any stored fields;
                            # we just need _id and don't want to waste network resources
es_cursor = ESCursor(es, "bank", "customer", body=body, total_number=200, batch_size=10)
# here index="bank", doc_type="customer"

# iterating over the cursor
for doc in es_cursor:
    print(doc["_id"])
```
This closely imitates the cursor interface of SQL drivers or pymongo. Note that in the constructor, I set `batch_size=10`. If you leave this parameter unspecified, it defaults to 1000. This number determines how many documents are on one page, but it's invisible from the user's point of view. It does affect performance a bit: too small a `batch_size` means frequent round trips to the server, increasing network latency, while too big a `batch_size` means the server has to return a huge chunk of data in a single HTTP response. Here `batch_size=10` is only set for pedagogical reasons.
Delighted to see the above code working, I happily applied it to my database containing 20 million documents. I guess you can already smell the smoke of it crashing and burning.
The error message is as follows.

```
raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
elasticsearch.exceptions.TransportError: TransportError(500, 'search_phase_execution_exception', 'Result window is too large, from + size must be less than or equal to: [10000] but was [11000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting.')
```
It mentions two important things.
- from + size must be less than or equal to [index.max_result_window], which defaults to 10000 (the setting can be raised, as sketched right after this list, but that only pushes the problem back).
- The scroll API provides a more efficient way to request large data sets.
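For completeness, the `index.max_result_window` setting can indeed be raised per index; below is only a sketch of doing so from Python (the 50,000 figure is arbitrary). It costs the server more memory and merely postpones the problem, so the scroll API remains the better answer.

```python
# raise the from/size window for the "bank" index to 50,000 documents
es.indices.put_settings(index="bank",
                        body={"index": {"max_result_window": 50000}})
```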
You can check the official documentation for a detailed discussion of the scroll API. The upshot is that when you make a query which potentially returns a large data set, you should add a URL parameter called `scroll` and specify a lifespan. For example, for our "match all" query, we would write:
```
POST /<index>/<doc_type>/_search?scroll=5m
{
    "query": {"match_all": {}},
    "size": 100  # we don't specify "from" now
}
```
Here we created a server-side scrolling cursor which will be kept alive for 5 minutes. The response to this query will contain a `_scroll_id` field. We can then go to the `/_search/scroll` endpoint to retrieve the data batch by batch:
```
POST /_search/scroll
{
    "scroll": "5m",  # prolong the cursor's life by another 5 minutes
    "scroll_id": <_scroll_id given above>
}
```
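Translated to Python, the raw scroll dance looks roughly like the sketch below (again assuming the `es` connection and index names from the earlier examples, and printing `_id` as a stand-in for real processing); this is essentially what the `scan` helper shown next does for you.

```python
res = es.search(index="bank", doc_type="customer",
                body={"query": {"match_all": {}}, "size": 1000},
                scroll="5m")  # open a scroll context that lives for 5 minutes
scroll_id = res["_scroll_id"]

while res["hits"]["hits"]:
    for doc in res["hits"]["hits"]:
        print(doc["_id"])  # stand-in for your own processing
    # ask for the next batch and extend the context by another 5 minutes
    res = es.scroll(scroll_id=scroll_id, scroll="5m")
    scroll_id = res["_scroll_id"]

es.clear_scroll(scroll_id=scroll_id)  # free the server-side context
```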
Not surprisingly, Python's `elasticsearch` module has a ready-made cursor-like generator function called `scan`. Here is the shortest answer to the question posed at the beginning.
```python
from elasticsearch.helpers import scan

it = scan(es, query=query_body, index=index, doc_type=doc_type)
for doc in it:
    process(doc)
```
To retrieve "_id" only, we can wrap this iterator up as follows.

```python
def get_id_iter(es, index, doc_type, query_body):  # yield the _id of each document
    query_body["stored_fields"] = []
    it = scan(es, query=query_body, index=index, doc_type=doc_type)
    for rec in it:
        yield rec["_id"]
```
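For example, with the index and doc_type used earlier, the wrapper can be used like this:

```python
for doc_id in get_id_iter(es, "bank", "customer",
                          {"query": {"match_all": {}}}):
    print(doc_id)
```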
Conclusion: In this article, we wrote a DIY cursor for the Elasticsearch Python driver, using `size` and `from` in the query body. While it doesn't work well for large datasets, we used this example to demonstrate what database cursor objects are made of and how the Python iterator protocol works. It's always fun to build something from scratch, even if it doesn't work as well as its "official" counterpart.
Happy coding!