In [None]:
%pip install opensearch-py

In [None]:
import concurrent.futures
import logging
import os
import uuid
from datetime import datetime

import boto3
from opensearchpy import OpenSearch
from opensearchpy.helpers import bulk

In [None]:
# setup logging: INFO and above to stderr; force=True replaces handlers left
# over from a previous run of this cell in the same kernel.
_LOG_FORMAT = '%(asctime)s [%(levelname)s] %(name)s - %(message)s'
logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    # log to file
    # filename='example.log', encoding='utf-8',
    force=True,
)

# quiet the HTTP connection-pool and OpenSearch client loggers to errors only
for _noisy_logger in ("urllib3.connectionpool", "opensearch"):
    logging.getLogger(_noisy_logger).setLevel(logging.ERROR)

In [None]:
# Environment variables
create_template = False
create_index = False
index_prefix = "s3-catalog"

# None means use default profile
profile = None
defaultProfileName = 'covid'

# Filter = { bucket: [prefix, ...], bucket: [prefix, ...], ... }
# None means no filter; an empty prefix list means "the whole bucket".
# NOTE: `filter` shadows the Python builtin of the same name; later cells read
# it under this name, so it is kept.
# filter = None
# filter = {
#    'dev-ncis-cov19-jpss': ['VIIRS_SDR_Test/Cloud_LUTs/SDRs/GITCO'],
#    'dev-ncis-cov19-jpss-result': ['EDR-VIIRS-AOD-MATCHUP/NOAA20-MAN']
# }
filter = {
   'dev-ncis-cov19-snpp-gitco-reprocessed-v2': ['2013', '2012']
}

# Storage classes listed here are indexed object by object; objects of any
# other storage class are rolled up into one document per prefix.
# None (or an empty list) means do not roll up: every object is indexed.
# exclude_rollup = None
exclude_rollup = ['STANDARD']

# openSearch domain (override with the OPENSEARCH_HOST environment variable)
host = os.environ.get(
    'OPENSEARCH_HOST',
    "search-s3-catalog-01-t7kxo5axijkjjfolddhnaqb3gq.us-east-1.es.amazonaws.com")
# host = "search-s3-catalog-02-6qotxfn5xtynachla53iczd5qq.us-east-1.es.amazonaws.com"
port = 443
# Credentials come from the environment so they are never committed with the
# notebook (the previous hard-coded admin password should be rotated).
auth = (os.environ.get('OPENSEARCH_USER', 'admin'),
        os.environ.get('OPENSEARCH_PASSWORD', ''))
if not auth[1]:
    logging.warning('OPENSEARCH_PASSWORD is not set; OpenSearch requests will likely be rejected')

# S3 ListObjectsV2 returns at most 1000 keys per page, so a larger pageSize is
# effectively capped by the service.
pageSize = 1021
batchSize = 1021

paginatorThreads = 10
indexingThreads = 5

In [None]:
# Derived
lastIndexed = datetime.now()

if profile is None:
    session = boto3.session.Session()
    profileName = defaultProfileName
else:
    session = boto3.session.Session(profile_name=profile)
    profileName = profile

s3_client = session.client('s3')

openSearch = OpenSearch(
    hosts=[{'host': host, 'port': port}],
    http_compress=True,
    http_auth=auth,
    use_ssl=True,
    verify_certs=False,
    ssl_assert_hostname=False,
    ssl_show_warn=False)

In [None]:
# create index template
def do_create_template():
    """Register the index template applied to every `<index_prefix>-*` index.

    The template sets one shard and zero replicas, maps every dynamically
    detected string field to `keyword`, and maps `Key` as `text` with a
    `keyword` sub-field (ignore_above 256) for exact matching.
    """
    template_name = index_prefix + '-templates'

    # any string field not listed under `properties` becomes a keyword
    strings_as_keywords = {
        "strings": {
            "mapping": {"type": "keyword"},
            "match_mapping_type": "string",
        }
    }
    # object keys: full-text searchable, plus an exact-match sub-field
    key_mapping = {
        "fields": {
            "keyword": {"ignore_above": 256, "type": "keyword"}
        },
        "type": "text",
    }
    template_body = {
        'index_patterns': [index_prefix + '-*'],
        'template': {
            'settings': {
                'index': {'number_of_shards': 1, 'number_of_replicas': 0}
            },
            'mappings': {
                "dynamic_templates": [strings_as_keywords],
                'properties': {"Key": key_mapping},
            },
        },
    }

    openSearch.indices.put_index_template(name=template_name, body=template_body)
    logging.info('index template [{}] created'.format(template_name))

In [None]:
# create index 
def do_create_index():
    """Create the per-profile index `<index_prefix>-<profileName>`.

    If the index already exists it is left untouched and a log line is
    written, so the notebook can be re-run with `create_index = True`
    without failing on an "already exists" error.
    """
    indexName = index_prefix + '-' + profileName
    if openSearch.indices.exists(index=indexName):
        logging.info('index [{}] already exists, skipping'.format(indexName))
        return
    openSearch.indices.create(index=indexName)
    logging.info('index [{}] created'.format(indexName))

In [None]:
# To get list of buckets present in AWS using S3 client
def get_buckets():
    """Return the bucket names from one `list_buckets` call on `s3_client`."""
    listing = s3_client.list_buckets()
    return [entry["Name"] for entry in listing['Buckets']]

In [None]:
def do_indexing(Batch):
    """Bulk-index a batch of enriched S3 object documents into OpenSearch.

    Each document is written to `<index_prefix>-<document['Profile']>` with a
    deterministic `_id` (UUIDv5 of `document['Key']`). Re-indexing the same
    key therefore overwrites the earlier document instead of duplicating it.

    Per-document rejections are collected by the bulk helper
    (`raise_on_error=False`) and logged at ERROR level. Transport-level
    exceptions still propagate to the caller.
    """
    # NOTE(review): the id ignores document['Bucket'], so identical keys in two
    # buckets of the same profile overwrite each other in the shared index.
    # Changing it would re-key documents that are already indexed; confirm first.
    actions = [
        {
            "_op_type": "index",
            "_index": index_prefix + '-{}'.format(document['Profile']),
            "_id": str(uuid.uuid5(uuid.NAMESPACE_X500, document['Key'])),
            "_source": document,
        }
        for document in Batch
    ]

    # With the default raise_on_error=True the error list is always empty and
    # failures surface only as an exception inside an unchecked future.
    success, failed = bulk(openSearch, actions, raise_on_error=False)
    if failed:
        logging.error('bulk: success={} | failed={} | first error={}'.format(
            success, len(failed), failed[0]))

In [None]:
# enrich content
def do_enrich(Bucket, Aggregate, Content):
    """Add catalog fields to one S3 listing entry (in place) and roll it up.

    Fields added to Content:
      Prefix1..PrefixN  cumulative '/'-joined directory prefixes of the key
      Prefix            the deepest of those prefixes ('' if the key has no '/')
      FileName          the last '/'-separated segment of the key
      LastIndexed, Bucket, Profile

    If exclude_rollup is falsy, or Content['StorageClass'] is listed in it,
    the entry gets ObjectCount = 1 and is left for the caller to index.
    Otherwise it is folded into Aggregate[prefix]. The first entry seen for a
    prefix becomes the aggregate document (ObjectCount = 1), and later entries
    add to its ObjectCount and Size.
    """
    segments = Content['Key'].split('/')

    # every segment except the last is a directory level
    prefix = ''
    for depth, segment in enumerate(segments[:-1], start=1):
        prefix = segment if depth == 1 else prefix + '/' + segment
        Content['Prefix{}'.format(depth)] = prefix

    Content['Prefix'] = prefix
    Content['FileName'] = segments[-1]
    Content['LastIndexed'] = lastIndexed
    Content['Bucket'] = Bucket
    Content['Profile'] = profileName

    rolled_up = bool(exclude_rollup) and Content['StorageClass'] not in exclude_rollup
    if not rolled_up:
        Content['ObjectCount'] = 1
        return

    # NOTE(review): the aggregate is keyed by prefix only, so rolled-up objects
    # of different storage classes under one prefix share a single document,
    # which keeps the first object's StorageClass/Key/FileName. Confirm intended.
    summary = Aggregate.get(prefix)
    if summary is None:
        Content['ObjectCount'] = 1
        Aggregate[prefix] = Content
    else:
        summary['ObjectCount'] += 1
        summary['Size'] += Content['Size']

In [None]:
def _log_indexing_errors(Bucket, Futures):
    """Log the exception of every finished indexing future that raised."""
    for future in Futures:
        error = future.exception()
        if error is not None:
            logging.error('[{}] indexing batch failed: {}'.format(Bucket, error),
                          exc_info=error)


def _submit_batch(Bucket, Executor, Futures, Batch):
    """Submit a copy of Batch to do_indexing, clear Batch, and throttle.

    While indexingThreads batches are in flight, block until at least one
    finishes, log any failures among the finished ones, and keep only the
    still-pending futures in Futures (modified in place).
    """
    Futures.append(Executor.submit(do_indexing, Batch.copy()))
    Batch.clear()

    if len(Futures) >= indexingThreads:
        done, not_done = concurrent.futures.wait(
            Futures, return_when=concurrent.futures.FIRST_COMPLETED)
        _log_indexing_errors(Bucket, done)
        Futures[:] = not_done


# Process one paginator
def do_paginator(Bucket, Paginator):
    """Enrich and index every object yielded by one list_objects_v2 paginator.

    Objects that are not rolled up (see exclude_rollup) are indexed
    individually in batches of batchSize while the listing is paged. The
    per-prefix aggregates built by do_enrich are indexed after the listing
    ends. At most indexingThreads batches run at once, and exceptions raised
    by do_indexing are logged instead of being silently dropped.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=indexingThreads) as indexingExecutor:
        aggregate = {}
        indexingFutures = []
        batch = []
        count = 0

        # index objects that are not rolled up
        for page in Paginator:
            # pages for an empty bucket/prefix carry no 'Contents' key
            for content in page.get("Contents", []):
                count += 1
                do_enrich(Bucket, aggregate, content)
                if not exclude_rollup or content['StorageClass'] in exclude_rollup:
                    batch.append(content)
                    if len(batch) >= batchSize:
                        _submit_batch(Bucket, indexingExecutor, indexingFutures, batch)

        logging.info('[{}] has {} objects'.format(Bucket, count))

        # index the rolled-up per-prefix aggregates
        for value in aggregate.values():
            batch.append(value)
            if len(batch) >= batchSize:
                _submit_batch(Bucket, indexingExecutor, indexingFutures, batch)

        # submit last left over batch
        if batch:
            _submit_batch(Bucket, indexingExecutor, indexingFutures, batch)

        # wait for the outstanding batches and surface their failures; leaving
        # the with-block then shuts the executor down
        done, _ = concurrent.futures.wait(indexingFutures)
        _log_indexing_errors(Bucket, done)

In [None]:
# main logic
def _report_paginator_failures(done, labels):
    """Log failures among finished paginator futures and drop them from labels.

    labels maps each pending future to a "bucket" or "bucket/prefix" string.
    """
    for future in done:
        label = labels.pop(future)
        error = future.exception()
        if error is not None:
            logging.error('paginator [{}] failed: {}'.format(label, error),
                          exc_info=error)


logging.info('started')

if create_template:
    do_create_template()

if create_index:
    do_create_index()

paginator = s3_client.get_paginator('list_objects_v2')

try:
    with concurrent.futures.ThreadPoolExecutor(max_workers=paginatorThreads) as paginatorExecutor:
        # pending future -> "bucket" or "bucket/prefix", used in error logs
        pendingPaginators = {}

        buckets = get_buckets()
        logging.info('[{}] has {} buckets'.format(profileName, len(buckets)))

        for bucket in buckets:
            # None stands for "the whole bucket" (no Prefix argument is sent)
            if filter is None:
                prefixes = [None]
            elif bucket in filter:
                prefixes = list(filter[bucket]) or [None]
            else:
                prefixes = []

            # one thread per paginator
            for prefix in prefixes:
                params = {'Bucket': bucket, 'PaginationConfig': {"PageSize": pageSize}}
                if prefix is not None:
                    params['Prefix'] = prefix
                pages = paginator.paginate(**params)

                future = paginatorExecutor.submit(do_paginator, bucket, pages)
                pendingPaginators[future] = (
                    bucket if prefix is None else '{}/{}'.format(bucket, prefix))

                # if all worker threads are busy, wait til one finish
                if len(pendingPaginators) >= paginatorThreads:
                    done, _ = concurrent.futures.wait(
                        pendingPaginators,
                        return_when=concurrent.futures.FIRST_COMPLETED)
                    _report_paginator_failures(done, pendingPaginators)

        # wait for outstanding futures to finish
        done, _ = concurrent.futures.wait(pendingPaginators)
        _report_paginator_failures(done, pendingPaginators)
finally:
    # release the HTTP connection pools even if listing or indexing raised
    openSearch.close()
    s3_client.close()

logging.info('finished')