# CloudWatch to Elasticsearch Links

----
### [Elasticsearch](http://localhost:9200)
### [Kibana](http://localhost:5601)
### [Jupyter](http://localhost:8888)
----
### [Chrome Extension (Elasticsearch Head)](https://chrome.google.com/webstore/detail/elasticsearch-head/ffmkiejjmecolpfloofpjologoblkegm)
----

In [None]:
###############################################
# Initialize
###############################################
from datetime import datetime,timedelta
import time
from boto3.session import Session
from elasticsearch import Elasticsearch, RequestsHttpConnection,helpers

##################
# Constants
##################
# Elasticsearch endpoint and target index. ES_PORT is kept as a string;
# it is converted with int() where the client is created.
ES_HOST = "es01"
ES_PORT = "9200"
ES_INDEX = "cwlogs"

##################
# Parameters
##################

# AWS Connection: named profile handed to the boto3 Session below.
aws_profile = 'YOUR-PROFILE-NAME'

# CloudWatch
# Fields requested in the Logs Insights query; the same list decides which
# result fields are copied into each Elasticsearch document.
cw_logs_fields = [
    '@timestamp',
    '@log',
    '@message',
]
# Log groups the query runs against.
cw_logs_groups = [
    '/aws/lambda/your-function-a',
    '/aws/lambda/your-function-b',
]
# Query window as ISO-8601 strings. They carry no UTC offset, so
# datetime.fromisoformat(...).timestamp() interprets them in the local
# time zone of the machine running the notebook.
cw_logs_start = '2020-08-16T00:00:00'
cw_logs_end = '2020-08-17T00:00:00'
# Logs Insights query text: a comma-separated `fields` clause built from
# cw_logs_fields, capped at 100 rows.
cw_logs_query = f"""
fields {', '.join(cw_logs_fields)}
| limit 100
"""

##################
# Initialization
##################
session = Session(profile_name=aws_profile)
# Index template applied to every index whose name starts with ES_INDEX.
# It maps '@timestamp' as a date using the 'yyyy-MM-dd HH:mm:ss.SSS' format.
es_mapping_template = {
    # The template API documents index_patterns as an array of patterns.
    'index_patterns': [f"{ES_INDEX}*"],
    'mappings': {
        'properties': {
            '@timestamp': {
                'type': 'date',
                'format': 'yyyy-MM-dd HH:mm:ss.SSS',
            },
        }
    }
}

In [None]:
###############################################
# Load Data
###############################################
def gen_es_data_from_cwlogs(res, fields=None, index=None):
    """Yield one Elasticsearch bulk action per CloudWatch Logs Insights row.

    Args:
        res: Response dict whose ``'results'`` entry is a list of rows; each
            row is a list of ``{'field': ..., 'value': ...}`` cells.
        fields: Names of the result fields to copy into the document.
            Defaults to the module-level ``cw_logs_fields``.
        index: Target index name. Defaults to the module-level ``ES_INDEX``.

    Yields:
        A dict with ``_index`` and ``_source`` keys. ``_id`` is set to the
        row's ``@ptr`` value when the row has one; otherwise ``_id`` is left
        out rather than being sent as an empty string.
    """
    if fields is None:
        fields = cw_logs_fields
    if index is None:
        index = ES_INDEX
    # One set lookup per cell instead of re-scanning the field list.
    wanted = set(fields)

    for row in res['results']:
        entry = {}
        doc_id = ""
        for cell in row:
            name = cell['field']
            if name == '@ptr':
                # '@ptr' is used only as the document id, never stored.
                doc_id = cell['value']
            elif name in wanted:
                entry[name] = cell['value']

        action = {"_index": index}
        if doc_id:
            action["_id"] = doc_id
        action["_source"] = entry
        yield action
        
# CloudWatch Logs client created from the session configured above.
cwlogs = session.client('logs')

# Start the Logs Insights query once. The start/end strings are converted
# to integer epoch seconds for the API.
res_start = cwlogs.start_query(
    logGroupNames=cw_logs_groups,
    startTime=int(datetime.fromisoformat(cw_logs_start).timestamp()),
    endTime=int(datetime.fromisoformat(cw_logs_end).timestamp()),
    queryString=cw_logs_query,
)
query_id = res_start['queryId']
print(f"Query ID -> {query_id}")
res = cwlogs.get_query_results(queryId=query_id)

# Poll every 3 seconds until the query completes. Stop with an error on a
# terminal non-success status instead of polling forever.
failed_statuses = ('failed', 'cancelled', 'timeout')
while res['status'].lower() != 'complete':
    if res['status'].lower() in failed_statuses:
        raise RuntimeError(
            f"CloudWatch Logs query {query_id} ended with status {res['status']}"
        )
    print(res['status'] + "...")
    time.sleep(3)
    res = cwlogs.get_query_results(queryId=query_id)
print(res['status'])

########
# Create Elasticsearch client
es = Elasticsearch(hosts=[{"host": ES_HOST, "port": int(ES_PORT)}],
                   use_ssl=False,
                   verify_certs=False,
                   connection_class=RequestsHttpConnection
                   )
# Register the index template so '@timestamp' is mapped as a date before
# any document is indexed.
es.indices.put_template("cwlogs-template",es_mapping_template)
# Bulk Insert to Elasticsearch
es_bulk_res = helpers.bulk(es, gen_es_data_from_cwlogs(res))
# The first element of the bulk result is printed as the item count.
print(f"{es_bulk_res[0]} items")