In [1]:
%config IPCompleter.greedy=True
import re
import json
from collections import defaultdict
from tqdm import tqdm_notebook as tqdm
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk
from pymystem3 import Mystem
from sklearn.feature_extraction.text import CountVectorizer
import requests
from time import time

In [2]:
es = Elasticsearch([{'host': 'localhost', 'port': 9200, 'timeout': 360, 'maxsize': 25}])

In [3]:
settings = {
    'mappings': {
        'properties': {
            'content': {
                'type': 'text'
            }
        }
    }
}

In [4]:
def recreate_index():
    es.indices.delete(index='hw2index')
    es.indices.create(index='hw2index', body=settings)

In [5]:
recreate_index()

In [6]:
def create_es_action(index, doc_id, document):
    return {
        '_index': index,
        '_id': doc_id,
        '_source': document
    }

In [7]:
class Document:
    def __init__(self, doc_url, doc_id, sz_bytes, sz_words):
        self.url = doc_url       # document url
        self.id = doc_id         # unique document id (str)
        self.sz_bytes = sz_bytes # document size in bytes before deleting html markup
        self.sz_words = sz_words # number of words in document before deleting html markup
        self.words = []          # list of words in document after deleting html markup
        self.links = []          # lisk of links in document

In [8]:
import os

In [9]:
from tqdm import tqdm
from tqdm import tqdm_notebook
import pickle

class BaseDocumentProcessor:
    def process(self, document):
        pass
    def result(self):
        pass

def process_file(d, f, processor, pbar):
    print("processing", os.path.join(d, f))
    with open(os.path.join(d, f), "rb") as fin:
        while True:
            pbar.update(1)
            try:
                document = pickle.load(fin)
            except:
                break
            processor.process(document)

def process_collection(directory, processor):
    pbar = tqdm(total = 200000)
    for file in os.listdir(directory):
        if (file.endswith(".out")):
            process_file(directory, file, processor, pbar)

In [10]:
COLLECTION_DIRECTORY = "byweb" # directory with .out files to process

class IndexDocs(BaseDocumentProcessor):
    def __init__(self):
        """ do all initialization here """
        self.actions = []
    
    def process(self, document):
        """ document: Document (see first cell)
            process each document here """
        #print(json.dumps({'content' : document.words}))
        self.actions.append(create_es_action('hw2index', document.id, json.dumps({'content' : document.words})))
        
    def result(self):
        return self.actions
        


In [11]:
def es_actions_generator():
    processor = IndexDocs()        
    process_collection(COLLECTION_DIRECTORY, processor)
    return processor.result()

In [12]:
import time

In [13]:
start = time.time()
for ok, result in tqdm_notebook(parallel_bulk(es, es_actions_generator(), queue_size=4, thread_count=4, chunk_size=1000)):
    if not ok:
        print(result)
end = time.time()
print('Time=' + str(end - start))


  0%|          | 230/200000 [00:00<03:15, 1022.83it/s]

processing byweb/byweb.5.out


 10%|█         | 20250/200000 [00:18<02:26, 1228.25it/s]

processing byweb/byweb.4.out


 20%|██        | 40147/200000 [00:34<02:00, 1330.84it/s]

processing byweb/byweb.6.out


 30%|███       | 60168/200000 [00:51<01:36, 1452.38it/s]

processing byweb/byweb.7.out


 40%|████      | 80018/200000 [01:08<02:05, 953.31it/s] 

processing byweb/byweb.3.out


 50%|█████     | 100149/200000 [01:29<01:24, 1177.95it/s]

processing byweb/byweb.2.out


 60%|██████    | 120106/200000 [01:46<00:54, 1467.51it/s]

processing byweb/byweb.0.out


 70%|███████   | 140135/200000 [02:07<01:05, 919.56it/s] 

processing byweb/byweb.1.out


 80%|████████  | 160148/200000 [02:28<00:42, 945.39it/s] 

processing byweb/byweb.9.out


 90%|█████████ | 180101/200000 [02:50<00:31, 623.70it/s] 

processing byweb/byweb.8.out


100%|█████████▉| 199983/200000 [03:12<00:00, 1479.79it/s]

HBox(children=(IntProgress(value=1, bar_style='info', max=1), HTML(value='')))

200010it [03:30, 1479.79it/s]                            


Time=526.7886590957642


In [14]:
import requests
param = (('v', ''),) # '-v' is for --verbose

# call the class's method to get an HTTP response model
resp = requests.get('http://localhost:9200/_cat/indices', params=param)


In [15]:
resp.text

'health status index    uuid                   pri rep docs.count docs.deleted store.size pri.store.size\nyellow open   hw2index zhD-IERTSaO-K6Z-Ta7X4w   1   1     196928            0      2.9gb          2.9gb\n'