In [None]:
# This script is designed to do some simple tests for Enterprise Search using Elastic Cloud
# It will use the Elastic Cloud CLI (ecctl). Please set it up as per the docs
# https://www.elastic.co/guide/en/ecctl/current/index.html
# on Mac use
# brew tap elastic/tap
# brew install elastic/tap/ecctl
# ecctl init 
# Need to add the App Search specific python client
# python -m pip install elastic-app-search


In [None]:

# Name used for both the App Search engine and the Elasticsearch index
engine_name = "sizing"

# Number of random words sampled for the body of each generated document
doc_size = 200
# Number of documents passed to index_batch() on each iteration of the main loop
batch_size = 100
# Number of iterations of the main indexing loop
number_of_batches = 1000

# JSON file from which the deployment name, id, cloud_id and password are read
cluster_info_path= "./cluster-info.json"


In [None]:
import os
import json
import time
import requests
import subprocess
from requests.auth import HTTPBasicAuth
from urllib.parse import urljoin
from elastic_app_search import Client

from elasticsearch import Elasticsearch

from nltk import download as nltk_download
from nltk.corpus import words
from random import sample
nltk_download('words')

In [None]:
# First, let's set up the environment and do a smoke test

In [None]:
# So there is currently a bug in ecctl which should be fixed soon.
# until then please run the command directly in the terminal to create the file.
# when the issue is fixed we can reuse this code
#
# ecctl deployment create -f ./deployment-config.json > cluster-info.json

#print("Checking if the cluster is already created")
#if not(os.path.exists(cluster_info_path) and os.path.getsize(cluster_info_path)>0):
#    print("No cluster info file so lets create a new cluster in cloud")
#    info_file=open(cluster_info_path,"w+")
#    out = subprocess.run(args=['ecctl', 'deployment', 'create', '-f ./deployment-config.json'], 
#           stdout=info_file, 
#           stderr=subprocess.STDOUT)
#    stdout,stderr = out.communicate()
#    print(stdout,stderr)
        

In [None]:

# Load the deployment description that `ecctl deployment create` was redirected into
print("Reading cluster info file: ",cluster_info_path)
with open(cluster_info_path) as json_file:
    cluster_info = json.load(json_file)

# Identity of the deployment
deployment_name, deployment_id = cluster_info['name'], cluster_info['id']

# Connection details are read from the first entry of the resources list
first_resource = cluster_info['resources'][0]
cloud_id = first_resource['cloud_id']
elastic_password = first_resource['credentials']['password']

cluster_exists = True

In [None]:
# To get the endpoints for App Search, Elasticsearch etc. we need to run a second command.
# `ecctl deployment show <id>` prints all the details of the deployment as JSON.
# stderr is captured separately so that warnings cannot corrupt the JSON on stdout,
# and a failed command is reported with ecctl's own error text instead of a JSON parse error.
out = subprocess.run(['ecctl', 'deployment', 'show', deployment_id],
           stdout=subprocess.PIPE,
           stderr=subprocess.PIPE)
if out.returncode != 0:
    raise RuntimeError(
        "ecctl deployment show failed (exit code {}): {}".format(
            out.returncode,
            out.stderr.decode('utf-8', errors='replace').strip()))
cluster_details = json.loads(out.stdout)

In [None]:
# Pull the endpoints we need out of the deployment details
def _first_endpoint(resource_kind):
    """Return the metadata endpoint of the first resource listed under `resource_kind`."""
    first = cluster_details['resources'][resource_kind][0]
    return first['info']['metadata']['endpoint']

as_endpoint = _first_endpoint('enterprise_search')
es_endpoint = _first_endpoint('elasticsearch')



In [None]:
# The App Search Client needs an API key, so we call the API directly using basic auth to look one up

# The only way to get this atm is through an undocumented API
create_key_url = urljoin("https://"+as_endpoint, '/as/credentials/collection?page[current]=1')
# A timeout keeps the cell from hanging forever if the endpoint is unreachable
response = requests.get(create_key_url, auth=('elastic', elastic_password), timeout=30)
# Fail here with the HTTP status rather than later with a confusing JSON or KeyError
response.raise_for_status()
api_key_info = response.json()

# Start from None so a stale key from an earlier run of this cell is never silently reused
private_key = None
search_key = None
for key in api_key_info['results']:
    if key['type'] == "private":
        private_key = key['key']
    elif key['type'] == "search":
        search_key = key['key']

# The client created later needs the private key, so stop now if none was returned
if private_key is None:
    raise RuntimeError("No private API key was returned by " + create_key_url)

In [None]:
# The App Search client needs a base endpoint which includes the API path
# NOTE(review): as_endpoint is passed here without a scheme, whereas the key lookup
# prefixes "https://" — presumably the client adds the scheme itself; confirm
base_endpoint = as_endpoint+'/api/as/v1'
as_client = Client(
    base_endpoint=base_endpoint,
    api_key=private_key
)

In [None]:
# Create the engine used by the smoke test below
# ('en' is presumably the engine language — confirm against the client docs)
as_client.create_engine(engine_name, 'en')

In [None]:
# Smoke test: ingest a couple of documents (queried next) before we delete the engine

documents = [
    {
        "id": "INscMGmhmX4",
        "url": "https://www.youtube.com/watch?v=INscMGmhmX4",
        "title": "The Original Grumpy Cat",
        "body": "A wonderful video of a magnificent cat.",
    },
    {
        "id": "JNDFojsd02",
        "url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
        "title": "Another Grumpy Cat",
        "body": "A great video of another cool cat.",
    },
]

as_client.index_documents(engine_name, documents)

In [None]:
# Let's search for something; the empty dict passes no extra search options
as_client.search(engine_name, 'grumpy cat', {})

In [None]:
# Now let's destroy the engine so the experiment below starts from an empty one
as_client.destroy_engine(engine_name)

In [None]:
# So, for our experiment, let's create a document generator to be able to load the engine

def _word_list():
    """Return the word list from the NLTK `words` corpus, fetching it only once.

    The list is cached on the function object so that generating many documents
    does not call words.words() again for every title, category and body.
    """
    vocabulary = getattr(_word_list, "_cached", None)
    if vocabulary is None:
        vocabulary = words.words()
        _word_list._cached = vocabulary
    return vocabulary


def generate_doc():
    """Generate a single example text document for indexing.

    The document is a dict with:
      * title    - 3 randomly sampled words
      * category - 2 randomly sampled words
      * body     - doc_size randomly sampled words (doc_size is set at the top of the file)
      * str_size - number of UTF-8 bytes of the JSON encoding of the three text fields

    Returns:
        dict: the generated document.

    Raises:
        ValueError: from random.sample if doc_size is larger than the word list.
    """
    vocabulary = _word_list()
    title = ' '.join(sample(vocabulary, 3))
    category = ' '.join(sample(vocabulary, 2))
    body = ' '.join(sample(vocabulary, doc_size))
    doc = {"title": title, "category": category, "body": body}
    # Let's calculate a rough number of bytes for the text.
    # We should just be able to use _size in Elasticsearch, but this is a backup and will be very close.
    # It is measured before str_size itself is added to the document.
    doc['str_size'] = len(json.dumps(doc).encode('utf-8'))
    return doc
                    

In [None]:
# Let's create an engine to index in to (the smoke-test engine of the same name was destroyed above)
as_client.create_engine(engine_name, 'en')



In [None]:
# Let's also initialise the connection to Elasticsearch directly.
# Only the Cloud connection is created: the previous bare `Elasticsearch()` client
# was never used.
es_client = Elasticsearch(
    cloud_id=cloud_id,
    http_auth=("elastic", elastic_password),
)

# We need to create the index with the mapper size plugin
# and it turned on to map the size of the docs

# We can modify this mapping later to see the effect of different analysers, mappings etc
index_body="""{
  "mappings": {
    "_size": {
      "enabled": true
    }
  }
}"""

# The index name is passed by keyword so the call does not depend on positional argument order
es_client.indices.create(index=engine_name, body=index_body)



In [None]:

def index_batch(batch_size):
    """
    Generates a batch of documents
    and then indexes them in to App Search and Elasticsearch directly.

    Args:
        batch_size: number of documents to generate with generate_doc().
            A value of zero or less generates nothing and returns immediately.

    Raises:
        Exception: if App Search reports errors for any document, or if the
            Elasticsearch bulk response reports that any item failed.
    """
    # Generate docs
    documents = [generate_doc() for _ in range(batch_size)]
    if not documents:
        # Nothing to send; skip both services rather than posting an empty batch
        return

    # Index in to AS and then check the responses
    results = as_client.index_documents(engine_name, documents)
    for doc_result in results:
        if len(doc_result['errors'])>0:
            raise Exception("Got errors back from App search: "+" ".join(doc_result['errors']))

    # Index in to Elasticsearch
    # Let's build a _bulk body: one action line followed by one source line per document,
    # each terminated by a newline
    action_line = json.dumps({ "index":{} })
    bulk_lines = []
    for doc in documents:
        bulk_lines.append(action_line)
        bulk_lines.append(json.dumps(doc))
    bulk_json = "\n".join(bulk_lines)+"\n"
    bulk_response = es_client.bulk(body=bulk_json,index=engine_name)

    # The bulk call can succeed as a whole while individual items fail, so check the
    # "errors" flag to keep both stores in step (App Search failures already raise above)
    if bulk_response.get('errors'):
        failures = [
            item['index'].get('error')
            for item in bulk_response.get('items', [])
            if 'error' in item.get('index', {})
        ]
        # Only the first few failures are included to keep the message readable
        raise Exception("Got errors back from Elasticsearch bulk: "+json.dumps(failures[:5]))


In [None]:
def _cat_int(value):
    """Convert a _cat/indices column value to int, counting a missing value as 0.

    Values that are None or an empty string would make int() raise, so they are
    mapped to 0 instead of aborting the whole stats run.
    """
    if value is None or value == '':
        return 0
    return int(value)


def get_es_stats():
    """Gather stats on the size of the source and the indexes so that we can track them.

    Returns:
        dict with:
          * total_size / total_str_size - sums of the `_size` and `str_size` fields
            over the engine_name index
          * <index>_primary_size, <index>_store_size, <index>_docs_count for the
            engine_name index and for every index matching '.ent*' (dots at either
            end of the index name are stripped)
          * total_ent_search_primary_size, total_ent_search_store_size and
            total_ent_search_doc_count - totals over the '.ent*' indices
    """

    stats={}

    # We can run an aggregation to sum the _size field across the docs
    # This should give us the approximate size of the source data
    size_agg_body = """{
  "size": 0, 
  "aggs": {
    "total_size": {
      "sum": {
        "field": "_size"
      }
    },
    "total_str_size":{
      "sum": {
        "field": "str_size"
      }
    }
  }
}"""
    # We can aggregate the size of the doc directly, as measured at ingest
    agg_response=es_client.search(index=engine_name, body=size_agg_body)
    aggregations = agg_response['aggregations']
    stats['total_size'] = aggregations['total_size']['value']
    stats['total_str_size'] = aggregations['total_str_size']['value']

    # We can loop through the indexes and record their size but we probably also want a total for Ent Search
    # It would be good in the future to also split Ent Search in to logs/analytics and everything else
    for index_name in [engine_name,'.ent*']:
        # bytes='b' asks for sizes as plain byte counts so they can be parsed as integers
        cat_response = es_client.cat.indices(index=index_name,format="json",bytes='b')
        is_ent_search = index_name == '.ent*'
        if is_ent_search:
            stats['total_ent_search_primary_size']=0
            stats['total_ent_search_store_size']=0
            stats['total_ent_search_doc_count']=0
        for index in cat_response:
            name= index['index'].strip('.')
            primary_size = _cat_int(index['pri.store.size'])
            store_size = _cat_int(index['store.size'])
            docs_count = _cat_int(index['docs.count'])
            stats[name+'_primary_size']=primary_size
            stats[name+'_store_size']=store_size
            stats[name+'_docs_count']=docs_count
            if is_ent_search:
                stats['total_ent_search_primary_size']+=primary_size
                stats['total_ent_search_store_size']+=store_size
                stats['total_ent_search_doc_count']+=docs_count

    return stats

    


In [None]:
# Let's put it all together
# Run one batch at a time, indexing in to both App Search and Elasticsearch
# Get stats after each batch and then index those stats back in to Elasticsearch
for batch in range(number_of_batches):
    index_batch(batch_size)
    # Refresh and flush every index before measuring
    # (presumably so the reported counts and on-disk sizes include this batch — confirm)
    es_client.indices.refresh(index="*")
    es_client.indices.flush(index="*")
    stats = get_es_stats()
    #print(stats)
    # Let's index the stats in to Elasticsearch, in a separate 'stats' index
    es_client.index(index='stats',body=json.dumps(stats))

In [None]:
# If we want to reset things let's delete what we have created
# Now let's destroy the engine
#try: 
#    as_client.destroy_engine(engine_name)
#except:
#    pass
        
#es_client.indices.delete(index=engine_name)
#es_client.indices.delete(index='stats')