# The definitive guide
## 02 - Data

In [4]:
from pprint import pprint as pp
import elasticsearch as es
e = es.Elasticsearch([{ 'host': 'localhost', 'port': 9200 }])
e.ping()

True

In [7]:
idx = { 'index': 'blog', 'doc_type': 'nope' }

def proxy(fn, *args, **kwargs):
    return fn(*args, **{**idx, **kwargs})

if not e.indices.exists(index=idx['index']):
    ret = e.indices.create(index=idx['index'])
    print('creating index: ', ret)

# test proxy
pp(proxy(e.search))

creating index:  {'acknowledged': True, 'shards_acknowledged': True, 'index': 'blog'}
{'_shards': {'failed': 0, 'skipped': 0, 'successful': 5, 'total': 5},
 'hits': {'hits': [], 'max_score': None, 'total': 0},
 'timed_out': False,
 'took': 1}


In [38]:
# purge!
e.indices.delete(index=idx['index'])

{'acknowledged': True}

In [39]:
entries = [{
    'title': 'The first blog post',
    'text': 'Just trying this out...',
    'date': '2018/27/04',
}, {
    'title': 'The second blog post',
    'text': 'Still trying this out...',
    'date': '2018/05/03',
}]

for i, entry in enumerate(entries, start=1):
    if not proxy(e.exists, id=i):
        print('creating document', i)
        proxy(e.create, id=i, body=entry)

# sometimes not indexed, run this cell again...
pp(proxy(e.search))

creating document 1
creating document 2
{'_shards': {'failed': 0, 'skipped': 0, 'successful': 5, 'total': 5},
 'hits': {'hits': [], 'max_score': None, 'total': 0},
 'timed_out': False,
 'took': 0}


## Retrieve subsets of document

In [20]:
print('only metadata')
pp(proxy(e.get, id=1, _source=False))

print('\nonly title')
pp(proxy(e.get, id=1, _source=['title'])['_source'])

only metadata
{'_id': '1', '_index': 'blog', '_type': 'nope', '_version': 1, 'found': True}

only title
{'title': 'The first blog post'}


## Updating a document

In [44]:
ID = 1

res = proxy(e.get, id=ID)
print('document version: ', res['_version'])
print('re-creating document')

proxy(e.delete, id=ID)
assert not proxy(e.exists, id=ID)

proxy(e.create, id=ID, body=res['_source'])
res = proxy(e.get, id=ID)
print('document version', res['_version'])

print('changing document text')
doc = res['_source']
doc['text'] = 'This is now another text'
    
print('re-indexing document')
proxy(e.index, id=ID, body=doc)
    
res = proxy(e.get, id=ID)
print('new document version', res['_version'])
print('new document text', res['_source']['text'])

document version:  9
re-creating document
document version 11
changing document text
re-indexing document
new document version 12
new document text This is now another text


deleted documents do not get their version number re-setted.

## Concurrency control

In [50]:
import os
import multiprocessing as mp

### Experiment 1 - Counting from multiple processes

Two processes re-index the same document while stealing increments from each other.
The expected result would be a count of 200, as two processes increment 100 times each.

In [59]:
_conc_ctx = {'id': 1, 'index': 'conc', 'doc_type': 'nop'}
e.index(body={'count': 0}, **_conc_ctx)
print('indexed count document')

def loop():
    # the elasticsearch client is not thread-safe
    cli = es.Elasticsearch([{ 'host': 'localhost', 'port': 9200 }])
    
    pid = os.getpid()
    print(pid, 'starting')
    
    skipped = 0
    for i in range(100):
        res = cli.get(**_conc_ctx)
        if not '_source' in res:
            skipped += 1
            continue
        
        count = res['_source']['count']
        cli.index(body={'count': count + 1}, **_conc_ctx)
        
    print(pid, 'finished, skipped', skipped)

p1 = mp.Process(target=loop)
p2 = mp.Process(target=loop)

p1.start()
p2.start()

p1.join()
p2.join()

print('final count:', e.get(**_conc_ctx)['_source']['count'])
    

indexed count document
29463 starting
29464 starting
29463 finished, skipped 0
29464 finished, skipped 0
final count: 102


### Experiment 2 - Resolving conflicts by _version

Everytime a version conflict occurs, the document is re-read and the increment is attempted again. Note that it fails around every second time.

In [69]:
e.index(body={'count': 0}, **_conc_ctx)
print('indexed count document')


def loop():
    # the elasticsearch client is not thread-safe
    cli = es.Elasticsearch([{ 'host': 'localhost', 'port': 9200 }])
    
    pid = '[{}]'.format(os.getpid())
    print(pid, 'starting')
    
    retries = 0
    for i in range(100):
        worked = False
        while not worked:
            res = cli.get(**_conc_ctx)     
            version = res['_version']
            count = res['_source']['count']
            
            try:
                cli.index(body={'count': count + 1}, version=version, **_conc_ctx);
                worked = True
            except es.exceptions.ConflictError:
                retries += 1
        
    print(pid, 'finished, retries:', retries)

p1 = mp.Process(target=loop)
p2 = mp.Process(target=loop)

print('initial count', e.get(**_conc_ctx)['_source']['count'])

p1.start()
p2.start()

p1.join()
p2.join()

print('final count:', e.get(**_conc_ctx)['_source']['count'])

indexed count document
initial count 0
[30792] starting
[30793] starting


PUT http://localhost:9200/conc/nop/1?version=2960 [status:409 request:0.002s]
PUT http://localhost:9200/conc/nop/1?version=2962 [status:409 request:0.002s]
PUT http://localhost:9200/conc/nop/1?version=2964 [status:409 request:0.002s]
PUT http://localhost:9200/conc/nop/1?version=2966 [status:409 request:0.001s]
PUT http://localhost:9200/conc/nop/1?version=2968 [status:409 request:0.002s]
PUT http://localhost:9200/conc/nop/1?version=2970 [status:409 request:0.002s]
PUT http://localhost:9200/conc/nop/1?version=2972 [status:409 request:0.002s]
PUT http://localhost:9200/conc/nop/1?version=2974 [status:409 request:0.002s]
PUT http://localhost:9200/conc/nop/1?version=2976 [status:409 request:0.001s]
PUT http://localhost:9200/conc/nop/1?version=2979 [status:409 request:0.002s]
PUT http://localhost:9200/conc/nop/1?version=2981 [status:409 request:0.002s]
PUT http://localhost:9200/conc/nop/1?version=2983 [status:409 request:0.001s]
PUT http://localhost:9200/conc/nop/1?version=2985 [status:409 re

[30792] finished, retries: 47
[30793] finished, retries: 50
final count: 200


## Partial Document Updates

### Using a doc body

In [78]:
# adding "tags" and "views" to blog posts
print('updating id=1 document')
pp(proxy(e.update, id=1, body={'doc': {'tags': ['test'], 'views': 0}}))

print('new document')
pp(proxy(e.get, id=1)['_source'])


updating id=1 document
{'_id': '1',
 '_index': 'blog',
 '_primary_term': 1,
 '_seq_no': 14,
 '_shards': {'failed': 0, 'successful': 1, 'total': 2},
 '_type': 'nope',
 '_version': 15,
 'result': 'updated'}
new document
{'date': '2018/27/04',
 'tags': ['test'],
 'text': 'This is now another text',
 'title': 'The first blog post',
 'views': 0}


### Using scripts

In [88]:
print('incrementing views on id=1')
body = {'script': 'ctx._source.views += 1'}
pp(proxy(e.update, id=1, body=body, retry_on_conflict=1))

print('new document state')
pp(proxy(e.get, id=1)['_source'])

incrementing views on id=1
{'_id': '1',
 '_index': 'blog',
 '_primary_term': 1,
 '_seq_no': 18,
 '_shards': {'failed': 0, 'successful': 1, 'total': 2},
 '_type': 'nope',
 '_version': 19,
 'result': 'updated'}
new document state
{'date': '2018/27/04',
 'tags': ['test'],
 'text': 'This is now another text',
 'title': 'The first blog post',
 'views': 4}


### Set default values using upsert

In [94]:
pp(proxy(e.get, id=2))

body = {
    'script': 'ctx._source.views += 1',
    'params': {
        'views': 10000,}}

print('incrementing views on id=2')
pp(proxy(e.update, id=2, body=body, retry_on_conflict=1))

print('new document state')
pp(proxy(e.get, id=2)['_source'])

POST http://localhost:9200/blog/nope/2/_update?retry_on_conflict=1 [status:400 request:0.002s]


{'_id': '2',
 '_index': 'blog',
 '_source': {'date': '2018/05/03',
             'text': 'Still trying this out...',
             'title': 'The second blog post'},
 '_type': 'nope',
 '_version': 1,
 'found': True}
incrementing views on id=2


RequestError: TransportError(400, 'illegal_argument_exception', '[DbF5o3U][172.17.0.2:9300][indices:data/write/update[s]]')

## Obtain multiple documents at once

In [95]:
pp(proxy(e.mget, body={"ids": [1, 2]}))

{'docs': [{'_id': '1',
           '_index': 'blog',
           '_source': {'date': '2018/27/04',
                       'tags': ['test'],
                       'text': 'This is now another text',
                       'title': 'The first blog post',
                       'views': 4},
           '_type': 'nope',
           '_version': 19,
           'found': True},
          {'_id': '2',
           '_index': 'blog',
           '_source': {'date': '2018/05/03',
                       'text': 'Still trying this out...',
                       'title': 'The second blog post'},
           '_type': 'nope',
           '_version': 1,
           'found': True}]}


## Bulk Operations

In [110]:
bulk = """{{"delete": {{"_index": "{index}", "_type": "{doc_type}", "_id": "2"}}}}
{{"create": {{"_index": "{index}", "_type": "{doc_type}", "_id": "2"}}}}
{{"title": "The first bulk blog post"}}
{{"update": {{"_index": "{index}", "_type": "{doc_type}", "_id": "2"}}}}
{{"doc": {{"text": "Aleph! Pape satan, pape satan!"}}}}
{{"index": {{"_index": "{index}", "_type": "{doc_type}"}}}}
{{"title": "The second bulk blog post", "text": "should have an autogenerated ID"}}
""".format(**idx)

pp(bulk)

print('execute')
pp(proxy(e.bulk, body=bulk))

('{"delete": {"_index": "blog", "_type": "nope", "_id": "2"}}\n'
 '{"create": {"_index": "blog", "_type": "nope", "_id": "2"}}\n'
 '{"title": "The first bulk blog post"}\n'
 '{"update": {"_index": "blog", "_type": "nope", "_id": "2"}}\n'
 '{"doc": {"text": "Aleph! Pape satan, pape satan!"}}\n'
 '{"index": {"_index": "blog", "_type": "nope"}}\n'
 '{"title": "The second bulk blog post", "text": "should have an autogenerated '
 'ID"}\n')
execute
{'errors': False,
 'items': [{'delete': {'_id': '2',
                       '_index': 'blog',
                       '_primary_term': 1,
                       '_seq_no': 4,
                       '_shards': {'failed': 0, 'successful': 1, 'total': 2},
                       '_type': 'nope',
                       '_version': 5,
                       'result': 'deleted',
                       'status': 200}},
           {'create': {'_id': '2',
                       '_index': 'blog',
                       '_primary_term': 1,
                    

In [111]:
pp(proxy(e.search))

{'_shards': {'failed': 0, 'skipped': 0, 'successful': 5, 'total': 5},
 'hits': {'hits': [{'_id': 'o9N1T2MBiyfvdTN_bC1Z',
                    '_index': 'blog',
                    '_score': 1.0,
                    '_source': {'text': 'should have an autogenerated ID',
                                'title': 'The second bulk blog post'},
                    '_type': 'nope'},
                   {'_id': '2',
                    '_index': 'blog',
                    '_score': 1.0,
                    '_source': {'text': 'Aleph! Pape satan, pape satan!',
                                'title': 'The first bulk blog post'},
                    '_type': 'nope'},
                   {'_id': '1',
                    '_index': 'blog',
                    '_score': 1.0,
                    '_source': {'date': '2018/27/04',
                                'tags': ['test'],
                                'text': 'This is now another text',
                                'title': 'The first blog p