# This code Explains how to connect to elastic search and index in python

** Environment **

Language: Pyhton 3.5

Elasticsearch: version 2.x

Note: 
    - To run the script, elasticsearch(es) should be running in the backend
    - To check es status go to http://localhost:9200/ (if default settings)
    
Current elasticsearch settings:
- Host: localhost
- port: 9200
- Index: pdf_indexing_docwise_temp
- type: documents

### Connecting to ES

In [3]:
#Import required libraries
from elasticsearch import Elasticsearch

from elasticsearch import helpers

#set variables
ES_HOST = {"host" : "localhost", "port" : 9200}
INDEX_NAME = 'my_index'
TYPE_NAME = 'documents'

#create connection
es = Elasticsearch(hosts = [ES_HOST])

### Create new index

This consist of two important steps,
- Defining index setting
- Defining indexing mapping

Note: If settings and mapping not defined, default one's will be created based on input data. Which is not advisable.

In [6]:
#Check if index already exist. if yes, delete and then create new index

if es.indices.exists(INDEX_NAME):
    print("Deleting '%s' index..." % (INDEX_NAME))
    res = es.indices.delete(index = INDEX_NAME)
    print("response: '%s'" % (res))
    
# Define settings 
# Explore more on setting in official ES site

request_body = {
        "settings" : {
                "number_of_shards": 1,
                "number_of_replicas": 0
                }
        }

#Define mapping
# Explore more on mapping in official ES site
# Each elements in mapping is called fields

mapping = {
        "documents": {
          "properties": {
            "content": {"type": "string"},
            "title" : {"type": "string"},
            "date" : {"type": "string"},
            }
          }
        }

# create indexing

print("creating '%s' index..." % (INDEX_NAME))
res = es.indices.create(index = INDEX_NAME, body = request_body)
print(" response: '%s'" % (res))
        
# put mapping to created index

print("Mapping.......")
res = es.indices.put_mapping(
    index = INDEX_NAME,
    doc_type = TYPE_NAME,
    body = mapping)

Deleting 'pdf_indexing_docwise_temp' index...
response: '{'acknowledged': True}'
creating 'pdf_indexing_docwise_temp' index...
 response: '{'acknowledged': True}'
Mapping.......


### create sample document for indexing  in required format

Note: 
- Here manually creating document to show how data need to be formated according to mapping for proper indexing 

Some important points:
- If mapping is not defined for a fields, defaults field derived based on first input
- While indexing it is not necessary that data should contain all the fields. If not given, field will be updated with empyt/'na'.

In [7]:
data = {"content": "blah blah blah..",
       "title":"tile_name",
       "data":"13/12/2016"
       }

# Try to link mapping defined and document above

###  Push documents to index

Note: It is always good to have manually defined ids for each document/data point. If not ES will create its own ids 

In [8]:
doc_id = "0"
res = es.index(index = INDEX_NAME, body = data ,doc_type = TYPE_NAME ,id = doc_id, refresh = True)

print(" response: '%s'" % (res))

 response: '{'_version': 1, 'created': True, '_index': 'pdf_indexing_docwise_temp', '_id': '0', '_shards': {'failed': 0, 'total': 1, 'successful': 1}, '_type': 'documents'}'


### Update index

Code to update existing document

Note:
- While updating a document, document need to be appended with "doc" as shown below

In [9]:
# document content need to be updated

#data with updating exsisting content
data = {"doc":
        {"content": "blah blah blah blah....",
         "date": "14/12/2016"
        }
       }

# data with updating and adding new field
# Note: As mensioned earlier, for new field, mapping is derived by default. In this case, "string"
data2 = {"doc":
         {"new_field": "new feild content"}
        }
         
# update data

doc_id = "0"
res = es.update(index = INDEX_NAME, body = data ,doc_type = TYPE_NAME ,id = doc_id, refresh = True)
print(" response: '%s'" % (res))

# update data2

res = es.update(index = INDEX_NAME, body = data2 ,doc_type = TYPE_NAME ,id = doc_id, refresh = True)
print(" response: '%s'" % (res))

 response: '{'_index': 'pdf_indexing_docwise_temp', '_id': '0', '_version': 2, '_type': 'documents', '_shards': {'failed': 0, 'total': 1, 'successful': 1}}'
 response: '{'_index': 'pdf_indexing_docwise_temp', '_id': '0', '_version': 3, '_type': 'documents', '_shards': {'failed': 0, 'total': 1, 'successful': 1}}'


### Sample query

Simple query to select all the results and subset of fields in the data. Please refer official documentation of complete understanding for queries.

In [10]:
# query to select all the entries and only title and content fields
my_query = """{
  "fields": [
     "title","content"
  ], 
  "query": {"match_all":{}}
  }
  """

### Search in a index

Given an query, search through the index

In [16]:
res = es.search(index=INDEX_NAME, doc_type=TYPE_NAME, body=my_query)
print("%d documents found" % res['hits']['total'])

# print the content
for doc in res['hits']['hits']:
    print("%s : %s" % (doc['_id'], doc['fields']['title']))

1 documents found
0 : ['tile_name']


### Scrolling through the index to get complete content

This is very useful when complete data needed to analysis and transform for selected/all fields.

Below is a scroll function which preforms n_gram tokenization scrolling through complete index data

In [None]:
def es_scroll():

    # Function scroll through all the documents in current index and returns content

    scroll = helpers.scan(
            es,
            index=INDEX_NAME,
            doc_type=TYPE_NAME,
            size = 1000,
            scroll = '5m', 
            query={
            "fields": ["title","content"],
            "query": {"match_all":{}}
            }
        )
    
    
    top_keywords = {}
    count = 0
    for doc in scroll:
        count += 1
        print(count)
        
    return count

### Bulk indexing 

If indexing many documnets together, always use bulk indexing. It will reduce time significantly

In [4]:
from elasticsearch import helpers

actions = []

for text in [{"content":"dac"},{"content":"dsafa"},{"content":"adsf"}]:
    doc_id = text["id"]
    action = {
        "_index": INDEX_NAME,
        "_type": TYPE_NAME,
        "_id": doc_id,
        "_source": text
        }

    actions.append(action)

print(time.time() - start)
start = time.time()

if len(actions) > 0:
    helpers.bulk(es, actions)

indexed 5 docs


### Bulk indexing for steaming data

There are two functions, streaming_bulk and parallel_bulk for indexing streaming data

Below code only indicate how to use above api's

In [None]:
def generate_actions():
    for text in wr.records(min_len=100,limit=5000):
        doc_id = text["page_id"]
        yield {
            "_index": INDEX_NAME,
            "_type": TYPE_NAME,
            "_id": doc_id,
            "_source": text
            }

for success, info in helpers.parallel_bulk(es, generate_actions(),thread_count=4):
    if not success: print('Doc failed', info)