In [1]:
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es = Elasticsearch("localhost:9200")

## Создадим индекс в Elasticsearch 

In [2]:
%%time

INDEX_NAME = "item_details_full"
request_body = {
    "settings" : {
        "number_of_shards": 4,
        "number_of_replicas": 1
    }
}
print("creating index...")
es.indices.create(index = INDEX_NAME, body = request_body)

creating index...
CPU times: user 8.71 ms, sys: 716 µs, total: 9.43 ms
Wall time: 158 ms


## Проверим содержимое файла с данными

In [3]:
FILE_NAME = '/home/de_makhover_anton/lab1/downloads/item_details_full.json'

lines = []
N = 1000
with open(FILE_NAME) as f:
    lines.extend(f.readline() for i in range(N))
print(lines[:1])

['{"attr0":"<B> Прижизненное издание.</B><BR> С.-Петербург, 1900 год. Издание А.Ф.Девриена. <BR> С 23 рисунками. <BR> Типографский переплет. Сохранность хорошая. <BR> Наиболее известное произведение американской писательницы Ф.Бернетт для детей - \\"Маленький лорд Фаунтлерой\\" (\\"Little Lord Fauntleroy\\") появилось в 1886 и имело огромный успех. Книга переведена на множество языков (в русском переводе имеется несколько вариантов названия, в т.ч. \\"История маленького лорда\\" и т.п.). <BR> К маленькому лорду Фаунтлерою жизнь не была сурова, но на его долю выпало серьезное испытание - испытание богатством, властью. Маленький лорд сумел сохранить доброту и благородство в самых тяжелых условиях.  ","attr1":"История маленького лорда  ","itemid":"12808031","attr32":"Русский  ","attr18":"Издание А. Ф. Девриена   ","attr2":"Ф. Бернет  ","attr3":"Фрэнсис Элиза Ходгстон   Бернетт   ","parent_id":"17969540"}\n']


## Запишем данные построчно в созданный ранее индекс

In [4]:
%%time
import json

N = 1000

with open(FILE_NAME) as f:
    for i in range(N):
        line = json.loads(f.readline())
        doc = {
            "title": line.get("attr1", ""), 
            "summary_processed": line.get("attr0", ""), 
            "timestamp": datetime.now()
        }
        es.index(index=INDEX_NAME, doc_type='items', body=doc)
        if i % 100 == 0:
            print("%s rows inserted" % i)

res = es.search(index=INDEX_NAME, body={"query": {"match_all": {}}}) 
print("index rows: %s" % res['hits']['total'])

0 rows inserted
100 rows inserted
200 rows inserted
300 rows inserted
400 rows inserted
500 rows inserted
600 rows inserted
700 rows inserted
800 rows inserted
900 rows inserted
index rows: 962
CPU times: user 1.19 s, sys: 114 ms, total: 1.3 s
Wall time: 15.1 s


## Воспользуемся массовой вставкой данных 
[Bulk insert documentation](https://elasticsearch-py.readthedocs.io/en/master/helpers.html#bulk-helpers)

In [5]:
%%time

N = 100002

def gendata(file_name, N=10000000000000):
    with open(file_name) as f:
        for i in range(N):
            line = json.loads(f.readline())
            yield {
                "_index": INDEX_NAME,
                "_type": "items",
                "_id": i,
                "_source":  {
                    "title": line.get("attr1", ""), 
                    "summary_processed": line.get("attr0", ""), 
                    "timestamp": datetime.now()
                }
            }
        
success, _ = helpers.bulk(es, gendata(FILE_NAME, N), chunk_size=5000)
print(success)

100002
CPU times: user 6.89 s, sys: 494 ms, total: 7.39 s
Wall time: 15 s


## Попробуем распараллелить

In [6]:
%%time

N = 100001

for success, info in helpers.parallel_bulk(es, gendata(FILE_NAME, N), thread_count=4, chunk_size=5000):
    if not success: print('loading failed', info)

CPU times: user 8.8 s, sys: 787 ms, total: 9.59 s
Wall time: 10.3 s


In [7]:
res = es.search(index=INDEX_NAME, body={"query": {"match_all": {}}}) 
print("index rows: %s" % res['hits']['total'])

index rows: 101002


## Последний метод наиоблее подошёл для загрузки данных

In [None]:
for success, info in helpers.parallel_bulk(es, gendata(FILE_NAME), thread_count=4, chunk_size=5000):
    if not success: print('loading failed', info)