# Executing AWS CloudWatch Queries

In [12]:
import boto3
import subprocess
import json
import os
import time
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed

## Step 1: Create session and client

In [13]:
def set_aws_credentials(profile, region_name='us-east-1'):
    result = subprocess.run(f"aws-vault exec {profile} --json", shell=True, capture_output=True)
    credentials = json.loads(result.stdout)

    # Create a session with the retrieved credentials
    session = boto3.session.Session(
        aws_access_key_id=credentials['AccessKeyId'],
        aws_secret_access_key=credentials['SecretAccessKey'],
        aws_session_token=credentials['SessionToken'],
        region_name=region_name        
    )

    return session

# Use the function with your profile to get a session
aws_session = set_aws_credentials('acl-production', 'ca-central-1')

# Create clients using the session
sts_client = aws_session.client('sts')
logs_client = aws_session.client('logs')

# Example usage of the clients
account_id = sts_client.get_caller_identity()["Account"]
print("Current AWS Account ID:", account_id)

Current AWS Account ID: 707785685172


In [14]:
def current_timestamp():
    return int(time.time() * 1000)

def hours_ago(hours):
    return int(time.time() - 60 * 60 * hours)

def days_ago(days):
    return int(time.time() - 60 * 60 * 24 * days)

In [15]:
# Define your query
def execute_query(log_group, query, start_time, end_time):
    response = logs_client.start_query(
        logGroupName=log_group,
        startTime=start_time,
        endTime=end_time,
        queryString=query,
    )
    return response['queryId']

def wait_for_query(query_id):
    done = False
    while not done:
        stats = logs_client.get_query_results(queryId=query_id)
        status = stats['status']
        if status == 'Complete':
            print("Query completed.")
            done = True
        elif status == 'Running':
            print("Query still running...")
        elif status == 'Scheduled':
            print("Query scheduled...")
        else:
            print(f"Query status: {status}")
        time.sleep(1)

def get_query_results(query_id):
    response = logs_client.get_query_results(queryId=query_id)
    return response['results'], int(response['statistics']['recordsScanned']), int(response['statistics']['recordsMatched'])


def cloudwatch_query(log_group, query, start_time=current_timestamp(), end_time=hours_ago(1)):
    query_id = execute_query(log_group, query, start_time, end_time)
    wait_for_query(query_id)
    response, records_scanned, records_matched = get_query_results(query_id)
    print (f'records_scanned: {records_scanned}')
    print (f'records_matched: {records_matched}')

    # Flatten the data
    data = []
    for entry in response:
        row = {item['field']: item['value'] for item in entry}
        data.append(row)
    return data

In [16]:
def parse_messages(data):
    # Parse the @message field in each entry
    for entry in data:
        message_json = entry["@message"]
        
        # Parse the JSON string in @message field
        try:
            message_data = json.loads(message_json)
        except:
            continue
        
        entry["@message"] = message_data


## Collect the request ids

In [17]:
def collect_request_ids(data):
    request_ids = []

    # Iterate over the entries and extract the request_id and x_request_id
    for entry in data:
        message = entry["@message"]
        x_request_id = message.get("x_request_id", None)
        x_nginx_id = message.get("x_nginx_id", None)

        if x_request_id or x_nginx_id:
            if x_request_id:
                request_ids.append(x_request_id)
            if x_nginx_id != x_request_id:
                request_ids.append(x_nginx_id)

    return request_ids


## Collect slow queries and corresponding request ids

In [18]:
log_group = 'projects-main'
#log_group =  'api_proxy-main'
min_duration = 5

query = f"fields @timestamp, @message \
    | filter request_time > {min_duration} \
    | sort @timestamp desc \
    | limit 100"

print (f'Querying log group: {log_group}. Looking for slow requests... (> {min_duration}s)')
data = cloudwatch_query(log_group, query, start_time=days_ago(3))
parse_messages(data)
print (json.dumps(data, indent=4))

Querying log group: projects-main. Looking for slow requests... (> 5s)
Query still running...
Query still running...
Query still running...
Query still running...
Query completed.
records_scanned: 2437573
records_matched: 174
[
    {
        "@timestamp": "2023-12-18 08:07:57.872",
        "@message": {
            "time_local": "18/Dec/2023:08:07:57 +0000",
            "client": "10.184.255.38",
            "ACL_Cloudfront": "",
            "method": "GET",
            "scheme": "http",
            "host": "fortunasilverminesinc.projects-ca.highbond.com",
            "request": "GET /task_tracker/request_items?page%5Bnumber%5D=1&filter%5Bstatus%5D=false&filter%5Baudit_id%5D=10194&sort%5Bcolumn%5D=request_items.due_date&sort%5Border%5D=asc HTTP/1.1",
            "request_length": "4315",
            "status": "200",
            "bytes_sent": "11551",
            "body_bytes_sent": "10070",
            "referrer": "https://fortunasilverminesinc.projects-ca.highbond.com/task_tracker/requ

In [19]:
request_ids = collect_request_ids(data)

print (json.dumps(request_ids, indent=4))

[
    "5ff612d4f1b967426df61592c380fa87",
    "fa7feda4e5fe96243fd29d32bb778fe0",
    "18711f3266e352f171cc4d9a90e5a465",
    "04fde56b07690521d3e825b4766fbbc6",
    "c5bd92df0a8261d6fba773be9344fefb",
    "e2c0a789057c573d80e83464bf91dddc",
    "1e4c2f86776c191211d0944d9f64cf41",
    "5d3df10e84a918e3d67f43e7ea586eb1",
    "f493df70b217fd6f3a5ae19060c3acbb",
    "3a3c44dfd95a30942329a030ce08a56f",
    "6e398aeb42a7e58b438e70e1063823e0",
    "5bcf8b39806f52acff144045e456fd9b",
    "c54ad4b95294e239c5873c6a34710da3",
    "d2857278e2445ca2aa64af4fc312430b",
    "145de1392b937829534ef9ed88ec8c34",
    "0db3df0a50b27388d71e6447b928544f",
    "ed6647caa5c43bd6b90c4f4b96fed340",
    "3ab21791cc5f417b57b5418144b3c5d7",
    "cade8a11cfb3526e926ca6ea95ac7d54",
    "76d9e58c7950adc8045067265748f8a6",
    "b25021cb88d65df645e4e6e74eaa1e86",
    "bfab9411896f7823fbd87cb3a01b763b",
    "fea579df70261ffbbe29dc1be360d27a",
    "ec3cf9b7f544695e6d5676720129a06c",
    "593a1284a8583c4fc0517ca9e6973845"

In [26]:
log_group = 'projects-main'
#log_group =  'api_proxy-main'

def process_request(request_id):
    query = f"fields @timestamp, @message \
        | filter @message like /{request_id}/ \
        | sort @timestamp desc \
        | limit 100"

    result = cloudwatch_query(log_group, query, start_time=days_ago(3))
    parse_messages(result)
    return result


data = []

# Use ThreadPoolExecutor to run the loop in parallel
with ThreadPoolExecutor(max_workers=8) as executor:
    # Map the function to your request_ids
    results = executor.map(process_request, request_ids[:24])

    # Extend the data list with results from the threads
    for result in results:
        data.extend(result)


print (json.dumps(data, indent=4))

Query scheduled...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query scheduled...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running...
Query still running