In [1]:
#!pip install opensearch-py

In [2]:
import os
import re
from datetime import datetime
from getpass import getpass

import boto3
from opensearchpy import OpenSearch
from opensearchpy.helpers import bulk

In [3]:
# initialize sqs
# The queue URL and OpenSearch host can be overridden through environment
# variables so the notebook can target another queue/domain without edits.
sqs_url = os.environ.get(
    "SLURM_MONITOR_SQS_URL",
    "https://sqs.us-east-1.amazonaws.com/874526227640/slurm-monitor-capture-job-01")
sqs_batch_size = 2   # MaxNumberOfMessages per receive call
sqs_wait_time = 20   # long-poll WaitTimeSeconds per receive call

sqs = boto3.resource("sqs")
queue = sqs.Queue(sqs_url)

# openSearch domain
host = os.environ.get(
    "SLURM_MONITOR_OPENSEARCH_HOST",
    "search-slurm-monitor-utfpp72lqvdxwatjsnmdlmyqye.us-east-1.es.amazonaws.com")
port = 443

# Credentials must not be stored in the notebook: read them from the
# environment and fall back to an interactive (non-echoing) prompt.
auth = (
    os.environ.get("OPENSEARCH_USER", "admin"),
    os.environ.get("OPENSEARCH_PASSWORD") or getpass("OpenSearch password: "),
)

# Verify the server certificate by default: the connection carries basic-auth
# credentials, so skipping verification exposes them to interception.
# Set OPENSEARCH_VERIFY_CERTS=false only if the endpoint's certificate cannot
# be validated (opensearch-py will then warn about the insecure connection).
verify_certs = os.environ.get("OPENSEARCH_VERIFY_CERTS", "true").strip().lower() not in ("0", "false", "no")

openSearch = OpenSearch(
    hosts=[{'host': host, 'port': port}],
    http_compress=True,
    http_auth=auth,
    use_ssl=True,
    verify_certs=verify_certs)

In [4]:
# receive one batch of messages from sqs
def receive_sqs_messages():
    """Receive one batch of messages from the SQS queue and index their jobs.

    For each message, the capture time comes from the ``SentTimestamp`` system
    attribute. It selects the ``YYYYMM`` index suffix and is stamped on every
    parsed job. The ``capture_src`` message attribute selects the index name.

    A message is deleted only after ``do_indexing`` returns. If indexing
    raises, the message stays on the queue for redelivery.

    Messages missing ``SentTimestamp`` or a non-empty ``capture_src`` are
    skipped and left on the queue. Values from a previously processed message
    are never reused.
    """
    messages = queue.receive_messages(
        MaxNumberOfMessages=sqs_batch_size,
        AttributeNames=["All"],
        MessageAttributeNames=["All"],
        WaitTimeSeconds=sqs_wait_time
    )

    for m in messages:
        metadata = _message_metadata(m)
        if metadata is None:
            print('{} skipping message [{}]: missing SentTimestamp or capture_src'.format(
                datetime.now(), getattr(m, 'message_id', None)))
            continue

        capture_ts, index_suffix, capture_src = metadata
        do_indexing(parse(m.body, capture_ts), capture_src, index_suffix)
        m.delete()


def _message_metadata(m):
    """Return ``(capture_ts, index_suffix, capture_src)`` for message ``m``, or None if incomplete."""
    # Either attribute mapping may be None when the message carries none.
    sent_ts = (m.attributes or {}).get('SentTimestamp')
    src_attr = (m.message_attributes or {}).get('capture_src') or {}
    capture_src = src_attr.get('StringValue')
    if sent_ts is None or not capture_src:
        return None

    # SentTimestamp is epoch milliseconds; fromtimestamp() converts to local time.
    capture_ts = datetime.fromtimestamp(int(sent_ts) / 1000)
    return capture_ts, capture_ts.strftime('%Y%m'), capture_src

In [5]:
# One "Key=Value" field. The key starts at a non-space character and runs to
# its first "="; the value runs to the next whitespace run or the end of the
# body. Ending on `$` as well as whitespace keeps the final field of a body
# that has no trailing newline.
_FIELD_PATTERN = re.compile(r"(\S.*?=.*?)(?:\s+|$)")


def parse(body, capture_ts):
    """Split a whitespace-separated ``Key=Value`` text into one dict per job.

    A new record starts at every ``JobId`` key. Each record also receives a
    ``capture_ts`` entry set to ``capture_ts``. Presumably ``body`` is
    ``scontrol show job``-style output (TODO confirm against the producer).

    Values are kept as strings, possibly empty, and may themselves contain
    "=". A value cannot contain whitespace. Words without "=" are folded into
    the key of the following field. A repeated key within one record keeps
    its last value.

    Args:
        body: the message text to parse.
        capture_ts: value stored under ``capture_ts`` in every returned job.

    Returns:
        A list of job dicts, in input order (empty if no fields are found).
    """
    jobs = []
    job = {}
    for match in _FIELD_PATTERN.finditer(body):
        key, _, value = match.group(1).partition("=")
        # A JobId field begins the next record: flush the one collected so far.
        if key == 'JobId' and job:
            job['capture_ts'] = capture_ts
            jobs.append(job)
            job = {}
        job[key] = value

    if job:
        job['capture_ts'] = capture_ts
        jobs.append(job)

    return jobs

In [6]:
def do_indexing(jobs, capture_src, index_suffix):
    """Bulk-index job documents into the index ``sm-<capture_src>-<index_suffix>``.

    The index is created through ``do_create_index`` first if it does not
    exist yet. An empty ``jobs`` list is a no-op, so no empty index is created.

    Args:
        jobs: list of document dicts (as produced by ``parse``).
        capture_src: source label embedded in the index name.
        index_suffix: suffix embedded in the index name (``YYYYMM`` in this notebook).
    """
    if not jobs:
        return

    indexName = 'sm-{}-{}'.format(capture_src, index_suffix)
    if not openSearch.indices.exists(index=indexName):
        do_create_index(indexName)

    actions = [
        {
            "_op_type": "index",
            "_index": indexName,
            "_source": document,
        }
        for document in jobs
    ]

    # With the helper's default raise_on_error=True, item failures raise
    # BulkIndexError instead of being returned here. The exception
    # propagates to the caller, which then does not delete the SQS message.
    success, failed = bulk(openSearch, actions)
    if failed:
        print('{} bulk: success={} | failed={}'.format(datetime.now(), success, failed))

In [7]:
# create index
def do_create_index(indexName):
    """Create the OpenSearch index ``indexName``, then print a timestamped notice."""
    indices_api = openSearch.indices
    indices_api.create(index=indexName)
    created_at = datetime.now()
    print(f'{created_at} index [{indexName}] created')

In [8]:
# Process one batch of messages. Always close the OpenSearch client, even if
# receiving or indexing raises.
try:
    receive_sqs_messages()
finally:
    openSearch.close()