In [5]:
# !pip install elasticsearch7

# Environment

In [3]:
import requests
import json
import time
import pprint

# Header passed to every requests call in this notebook.
headers = {'Content-Type': 'application/json'}

# Base address used both for raw HTTP calls and for the Python client.
HOST = 'localhost'
URL_PREFIX = f'http://{HOST}:9200'

# Index names that are spliced into the request URLs further down.
INDEX_NAME_CONDUCTOR_WORKFLOW = 'workflow'
INDEX_NAME_CONDUCTOR_TASK = 'task'

# Requests
# https://docs.python-requests.org/en/latest/api
def print_request(pr, max_body_len=1000, preview_len=100):
    """Print the method, URL, headers and body of a prepared request.

    A body longer than ``max_body_len`` is cut down to its first
    ``preview_len`` items followed by ``'...'`` so large payloads do not
    flood the cell output.

    Args:
        pr: Object exposing ``method``, ``url``, ``headers`` and ``body``
            attributes, such as the ``r.request`` passed by ``print_response``.
        max_body_len: Bodies longer than this are abbreviated.
        preview_len: How much of an abbreviated body is shown.
    """
    body = pr.body
    try:
        too_long = body is not None and len(body) > max_body_len
    except TypeError:
        # Bodies without a length (e.g. a generator) are printed untouched
        # instead of aborting the whole dump.
        too_long = False
    if too_long:
        body = str(body[:preview_len]) + '...'
    print("""=== >
{method} {url}
{headers}
{body}""".format(method=pr.method, url=pr.url, headers=pr.headers, body=body))

def print_response(r):
    """Print a response: status, the request that produced it, headers, body.

    A body whose ``content-type`` header mentions ``application/json`` is
    parsed and pretty-printed. Any other body, or a JSON-labelled body that
    cannot be parsed (for instance an empty one), is printed raw; raw content
    longer than 1000 bytes is cut to its first 100 bytes.

    Args:
        r: Response object exposing ``status_code``, ``request``, ``headers``,
            ``content`` and ``json()``, as returned by the ``requests`` calls
            in this notebook.
    """
    print("====== Response\n", r.status_code)
    print_request(r.request)
    print("=== <")
    print(r.headers)

    if 'application/json' in r.headers.get('content-type', ''):
        try:
            payload = r.json()
        except ValueError:
            # The JSON decode errors raised by requests derive from
            # ValueError; fall through and show the raw bytes instead of
            # losing the dump.
            pass
        else:
            pprint.pprint(payload)
            return

    if len(r.content) > 1000:
        print(r.content[:100], '...')
    else:
        print(r.content)

# Python Client

- https://elasticsearch-py.readthedocs.io/en/latest/

In [8]:
from elasticsearch7 import Elasticsearch

# Point the Python client at the node behind URL_PREFIX and pretty-print
# what the cluster reports about itself.
client = Elasticsearch(hosts=URL_PREFIX)
resp = client.info()
pprint.pprint(resp)

{'cluster_name': 'elasticsearch',
 'cluster_uuid': 'jLVWo8NlREGkEDmFjv4dJQ',
 'name': '',
 'tagline': 'You Know, for Search',
 'version': {'build_date': 'unknown',
             'build_flavor': 'unknown',
             'build_hash': 'unknown',
             'build_snapshot': True,
             'build_type': 'unknown',
             'lucene_version': '8.11.3',
             'minimum_index_compatibility_version': '6.0.0-beta1',
             'minimum_wire_compatibility_version': '6.8.0',
             'number': '7.17.23'}}
{'cluster_name': 'elasticsearch',
 'cluster_uuid': 'jLVWo8NlREGkEDmFjv4dJQ',
 'name': '',
 'tagline': 'You Know, for Search',
 'version': {'build_date': 'unknown',
             'build_flavor': 'unknown',
             'build_hash': 'unknown',
             'build_snapshot': True,
             'build_type': 'unknown',
             'lucene_version': '8.11.3',
             'minimum_index_compatibility_version': '6.0.0-beta1',
             'minimum_wire_compatibility_version': '6.8

# Mapping

## Explicit mapping

In [2]:
# https://www.elastic.co/guide/en/elasticsearch/reference/7.17/explicit-mapping.html

# MetadataCreateIndexService

# Create the workflow index with an explicit mapping: PUT /<index> with the
# field definitions nested under "mappings".
url = f'{URL_PREFIX}/{INDEX_NAME_CONDUCTOR_WORKFLOW}'
params = {}

# One entry per document field.
mapping_data = {
    "properties": {
        "correlationId": {"type": "keyword", "index": True, "doc_values": True},
        "endTime": {"type": "date", "format": "strict_date_optional_time||epoch_millis", "doc_values": True},
        "executionTime": {"type": "long", "doc_values": True},
        "failedReferenceTaskNames": {"type": "text", "index": False},
        "input": {"type": "text", "index": True},
        "output": {"type": "text", "index": True},
        "reasonForIncompletion": {"type": "keyword", "index": True, "doc_values": True},
        "startTime": {"type": "date", "format": "strict_date_optional_time||epoch_millis", "doc_values": True},
        "status": {"type": "keyword", "index": True, "doc_values": True},
        "updateTime": {"type": "date", "format": "strict_date_optional_time||epoch_millis", "doc_values": True},
        "version": {"type": "long", "doc_values": True},
        "workflowId": {"type": "keyword", "index": True, "doc_values": True},
        "workflowType": {"type": "keyword", "index": True, "doc_values": True},
        "rawJSON": {"type": "text", "index": False},
        "event": {"type": "keyword", "index": True},
    }
}
data = {"mappings": mapping_data}

# Echo the request inputs before sending.
print(url)
pprint.pprint(headers)
pprint.pprint(params)

r = requests.put(url, headers=headers, params=params, data=json.dumps(data))
print_response(r)

http://localhost:9200/workflow
{'Content-Type': 'application/json'}
{}
 200
=== >
PUT http://localhost:9200/workflow
{'User-Agent': 'python-requests/2.31.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive', 'Content-Type': 'application/json', 'Content-Length': '1044'}
{"mappings": {"properties": {"correlationId": {"type": "keyword", "index": true, "doc_values": true}...
=== <
{'X-elastic-product': 'Elasticsearch', 'content-type': 'application/json; charset=UTF-8', 'content-encoding': 'gzip', 'content-length': '75'}
{'acknowledged': True, 'index': 'workflow', 'shards_acknowledged': True}


In [5]:
# Read back the mapping of the workflow index via GET /<index>/_mapping.
url = f'{URL_PREFIX}/{INDEX_NAME_CONDUCTOR_WORKFLOW}/_mapping'
params = {}

# Echo the request inputs before sending.
print(url)
pprint.pprint(headers)
pprint.pprint(params)

r = requests.get(url, headers=headers, params=params)
print_response(r)

http://localhost:9200/workflow/_mapping
{'Content-Type': 'application/json'}
{}
 200
=== >
GET http://localhost:9200/workflow/_mapping
{'User-Agent': 'python-requests/2.31.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive', 'Content-Type': 'application/json'}
None
=== <
{'X-elastic-product': 'Elasticsearch', 'content-type': 'application/json; charset=UTF-8', 'content-encoding': 'gzip', 'content-length': '239'}
{'workflow': {'mappings': {'properties': {'correlationId': {'type': 'keyword'},
                                          'endTime': {'type': 'date'},
                                          'event': {'type': 'keyword'},
                                          'executionTime': {'type': 'long'},
                                          'failedReferenceTaskNames': {'index': False,
                                                                       'type': 'text'},
                                          'input': {'type': 'text'},
         

In [4]:
# https://www.elastic.co/guide/en/elasticsearch/reference/7.17/explicit-mapping.html

# MetadataCreateIndexService

# Create the task index with an explicit mapping: PUT /<index> with the
# field definitions nested under "mappings".
url = f'{URL_PREFIX}/{INDEX_NAME_CONDUCTOR_TASK}'
params = {}

# One entry per document field.
mapping_data = {
    "properties": {
        "correlationId": {"type": "keyword", "index": True},
        "endTime": {"type": "date", "format": "strict_date_optional_time||epoch_millis"},
        "executionTime": {"type": "long"},
        "input": {"type": "text", "index": True},
        "output": {"type": "text", "index": True},
        "queueWaitTime": {"type": "long"},
        "reasonForIncompletion": {"type": "keyword", "index": True},
        "scheduledTime": {"type": "date", "format": "strict_date_optional_time||epoch_millis"},
        "startTime": {"type": "date", "format": "strict_date_optional_time||epoch_millis"},
        "status": {"type": "keyword", "index": True},
        "taskDefName": {"type": "keyword", "index": True},
        "taskId": {"type": "keyword", "index": True},
        "taskType": {"type": "keyword", "index": True},
        "updateTime": {"type": "date", "format": "strict_date_optional_time||epoch_millis"},
        "workflowId": {"type": "keyword", "index": True},
        "workflowType": {"type": "keyword", "index": True},
    }
}
data = {"mappings": mapping_data}

# Echo the request inputs before sending.
print(url)
pprint.pprint(headers)
pprint.pprint(params)

r = requests.put(url, headers=headers, params=params, data=json.dumps(data))
print_response(r)

http://localhost:9200/task
{'Content-Type': 'application/json'}
{}
 200
=== >
PUT http://localhost:9200/task
{'User-Agent': 'python-requests/2.31.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive', 'Content-Type': 'application/json', 'Content-Length': '931'}
{"mappings": {"properties": {"correlationId": {"type": "keyword", "index": true}, "endTime": {"type": "date", "format": "strict_date_optional_time||epoch_millis"}, "executionTime": {"type": "long"}, "input": {"type": "text", "index": true}, "output": {"type": "text", "index": true}, "queueWaitTime": {"type": "long"}, "reasonForIncompletion": {"type": "keyword", "index": true}, "scheduledTime": {"type": "date", "format": "strict_date_optional_time||epoch_millis"}, "startTime": {"type": "date", "format": "strict_date_optional_time||epoch_millis"}, "status": {"type": "keyword", "index": true}, "taskDefName": {"type": "keyword", "index": true}, "taskId": {"type": "keyword", "index": true}, "taskType": {

In [6]:
# Read back the mapping of the task index via GET /<index>/_mapping.
url = f'{URL_PREFIX}/{INDEX_NAME_CONDUCTOR_TASK}/_mapping'
params = {}

# Echo the request inputs before sending.
print(url)
pprint.pprint(headers)
pprint.pprint(params)

r = requests.get(url, headers=headers, params=params)
print_response(r)

http://localhost:9200/task/_mapping
{'Content-Type': 'application/json'}
{}
 200
=== >
GET http://localhost:9200/task/_mapping
{'User-Agent': 'python-requests/2.31.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive', 'Content-Type': 'application/json'}
None
=== <
{'X-elastic-product': 'Elasticsearch', 'content-type': 'application/json; charset=UTF-8', 'content-encoding': 'gzip', 'content-length': '218'}
{'task': {'mappings': {'properties': {'correlationId': {'type': 'keyword'},
                                      'endTime': {'type': 'date'},
                                      'executionTime': {'type': 'long'},
                                      'input': {'type': 'text'},
                                      'output': {'type': 'text'},
                                      'queueWaitTime': {'type': 'long'},
                                      'reasonForIncompletion': {'type': 'keyword'},
                                      'scheduledTime': {'

## Index template

In [10]:
# https://www.elastic.co/guide/en/elasticsearch/reference/7.17/index-templates.html
# https://www.elastic.co/guide/en/elasticsearch/reference/7.17/indices-put-template.html

# MetadataIndexTemplateService

# Register the index template "template_event", applied to index names
# matching *event*.
url = f'{URL_PREFIX}/_index_template/template_event'
params = {}

data = {
    "index_patterns": ["*event*"],
    "template": {
        "settings": {"refresh_interval": "1s"},
        "mappings": {
            "properties": {
                "action": {"type": "keyword", "index": True},
                "created": {"type": "long"},
                "event": {"type": "keyword", "index": True},
                "id": {"type": "keyword", "index": True},
                "messageId": {"type": "keyword", "index": True},
                "name": {"type": "keyword", "index": True},
                # "output" is an object field with a single mapped sub-field.
                "output": {
                    "properties": {
                        "workflowId": {"type": "keyword", "index": True},
                    }
                },
                "status": {"type": "keyword", "index": True},
            }
        },
        "aliases": {},
    },
}

# Echo the request inputs before sending.
print(url)
pprint.pprint(headers)
pprint.pprint(params)

r = requests.put(url, headers=headers, params=params, data=json.dumps(data))
print_response(r)

http://localhost:9200/_index_template/template_event
{'Content-Type': 'application/json'}
{}
 200
=== >
PUT http://localhost:9200/_index_template/template_event
{'User-Agent': 'python-requests/2.31.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive', 'Content-Type': 'application/json', 'Content-Length': '509'}
{"index_patterns": ["*event*"], "template": {"settings": {"refresh_interval": "1s"}, "mappings": {"properties": {"action": {"type": "keyword", "index": true}, "created": {"type": "long"}, "event": {"type": "keyword", "index": true}, "id": {"type": "keyword", "index": true}, "messageId": {"type": "keyword", "index": true}, "name": {"type": "keyword", "index": true}, "output": {"properties": {"workflowId": {"type": "keyword", "index": true}}}, "status": {"type": "keyword", "index": true}}}, "aliases": {}}}
=== <
{'X-elastic-product': 'Elasticsearch', 'content-type': 'application/json; charset=UTF-8', 'content-encoding': 'gzip', 'content-length': '47'

In [13]:
# Register the index template "template_message" (priority 1), applied to
# index names matching *message*.
url = f'{URL_PREFIX}/_index_template/template_message'
params = {}

data = {
    "index_patterns": ["*message*"],
    "priority": 1,
    "template": {
        "settings": {"refresh_interval": "1s"},
        "mappings": {
            "properties": {
                "created": {"type": "long"},
                "messageId": {"type": "keyword", "index": True},
                "payload": {"type": "keyword", "index": True},
                "queue": {"type": "keyword", "index": True},
            }
        },
        "aliases": {},
    },
}

# Echo the request inputs before sending.
print(url)
pprint.pprint(headers)
pprint.pprint(params)

r = requests.put(url, headers=headers, params=params, data=json.dumps(data))
print_response(r)

http://localhost:9200/_index_template/template_message
{'Content-Type': 'application/json'}
{}
 200
=== >
PUT http://localhost:9200/_index_template/template_message
{'User-Agent': 'python-requests/2.31.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive', 'Content-Type': 'application/json', 'Content-Length': '317'}
{"index_patterns": ["*message*"], "priority": 1, "template": {"settings": {"refresh_interval": "1s"}, "mappings": {"properties": {"created": {"type": "long"}, "messageId": {"type": "keyword", "index": true}, "payload": {"type": "keyword", "index": true}, "queue": {"type": "keyword", "index": true}}}, "aliases": {}}}
=== <
{'X-elastic-product': 'Elasticsearch', 'content-type': 'application/json; charset=UTF-8', 'content-encoding': 'gzip', 'content-length': '47'}
{'acknowledged': True}


In [16]:
# Register the index template "template_task_log" (priority 2), applied to
# index names matching *task*log*.
url = f'{URL_PREFIX}/_index_template/template_task_log'
params = {}

data = {
    "index_patterns": ["*task*log*"],
    "priority": 2,
    "template": {
        "settings": {"refresh_interval": "1s"},
        "mappings": {
            "properties": {
                "createdTime": {"type": "long"},
                "log": {"type": "keyword", "index": True},
                "taskId": {"type": "keyword", "index": True},
            }
        },
        "aliases": {},
    },
}

# Echo the request inputs before sending.
print(url)
pprint.pprint(headers)
pprint.pprint(params)

r = requests.put(url, headers=headers, params=params, data=json.dumps(data))
print_response(r)

http://localhost:9200/_index_template/template_task_log
{'Content-Type': 'application/json'}
{}
 200
=== >
PUT http://localhost:9200/_index_template/template_task_log
{'User-Agent': 'python-requests/2.31.0', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive', 'Content-Type': 'application/json', 'Content-Length': '270'}
{"index_patterns": ["*task*log*"], "priority": 2, "template": {"settings": {"refresh_interval": "1s"}, "mappings": {"properties": {"createdTime": {"type": "long"}, "log": {"type": "keyword", "index": true}, "taskId": {"type": "keyword", "index": true}}}, "aliases": {}}}
=== <
{'X-elastic-product': 'Elasticsearch', 'content-type': 'application/json; charset=UTF-8', 'content-encoding': 'gzip', 'content-length': '47'}
{'acknowledged': True}
