# Decode all arguments in AWS CloudWatch auditd logs

In [None]:
import boto3
from binascii import a2b_hex
from datetime import datetime
import re

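Get logs in this date-time interval. The datetimes below are naive, so they are interpreted in the local time zone of the machine running the notebook.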

In [None]:
# Bounds of the review window. These are naive datetimes, so the later
# .timestamp() conversion interprets them in the machine's local time zone.
dt_start = datetime(year=2021, month=11, day=10, hour=17, minute=30)
dt_end = datetime(year=2021, month=11, day=10, hour=18)

In [None]:
# (start, end) of the review window as integer epoch milliseconds, the unit
# passed to CloudWatch Logs as startTime/endTime further down.
ts = tuple(int(moment.timestamp() * 1000) for moment in (dt_start, dt_end))

def overlap(a, b):
    """Return True when the closed intervals *a* and *b* share a point.

    Each argument is an indexable ``(start, end)`` pair. Two intervals
    overlap exactly when the start of one lies inside the other, so both
    orderings are checked.
    """
    if a[0] <= b[0] <= a[1]:
        # b starts inside a.
        return True
    # Otherwise they overlap only if a starts inside b.
    return b[0] <= a[0] <= b[1]

def parse_execve(m):
    """Parse one auditd ``EXECVE`` record into a dict of its fields.

    The message is split on whitespace into ``key=value`` tokens. ``argc``
    is converted to ``int``. Argument fields (``aN`` and chunked ``aN[i]``)
    are decoded to text: double-quoted values have their quotes stripped,
    anything else is treated as hex-encoded bytes.

    Args:
        m: The raw log message string.

    Returns:
        A dict mapping each field name to its (possibly decoded) value.
        Fields that are not ``argc`` or an argument keep their raw string.

    Raises:
        ValueError: If ``argc`` is present but is not an integer.
    """
    fields = {}
    for token in m.split():
        key, sep, value = token.partition("=")
        if sep:  # skip stray tokens that are not key=value pairs
            fields[key] = value

    # Only values are replaced below (never keys), so mutating the dict
    # while iterating over it is safe.
    for key, value in fields.items():
        if key == "argc":
            fields[key] = int(value)
        # fullmatch so that bookkeeping fields such as ``a0_len`` are not
        # mistaken for hex-encoded arguments.
        elif re.fullmatch(r"a\d+(\[\d+\])?", key):
            if value.startswith('"') and value.endswith('"'):
                fields[key] = value[1:-1]
                continue
            try:
                raw = a2b_hex(value)
            except ValueError:
                # binascii.Error is a ValueError: the value is not valid
                # hex (e.g. "(null)"), so keep it verbatim for review.
                continue
            # Arguments are arbitrary bytes; escape undecodable ones rather
            # than aborting the whole run.
            fields[key] = raw.decode(errors="backslashreplace")
    return fields

Optionally set the AWS `profile_name`

In [None]:
# Set this to a named AWS profile if needed; None is passed through to boto3
# unchanged (presumably selecting its default credential lookup).
aws_profile = None
session = boto3.session.Session(profile_name=aws_profile)
# CloudWatch Logs client used by the cells below.
logs = session.client(service_name="logs")

Fetch all logs in the requested date-time interval

In [None]:
# describe_log_streams returns results one page at a time, so walk every page;
# a single call would silently miss streams beyond the first page.
log_streams = {"logStreams": []}
stream_pages = logs.get_paginator("describe_log_streams")
for page in stream_pages.paginate(logGroupName="audit.log"):
    log_streams["logStreams"].extend(page["logStreams"])

# Keep the names of streams whose event time span overlaps the window.
relevant_streams = []
for s in log_streams["logStreams"]:
    first = s.get("firstEventTimestamp")
    last = s.get("lastEventTimestamp")
    # A stream description may lack event timestamps; include such streams
    # rather than risk skipping data (the time-bounded fetch later simply
    # returns nothing for them).
    if first is None or last is None or overlap((first, last), ts):
        relevant_streams.append(s["logStreamName"])

In [None]:
# Download the events of every relevant stream into one flat list.
events = []
for stream in relevant_streams:
    print(f"Getting logs for {stream}")
    request = dict(
        logGroupName="audit.log",
        logStreamName=stream,
        startTime=ts[0],
        endTime=ts[1],
        startFromHead=True,
    )
    page = logs.get_log_events(**request)
    events += page["events"]

    # Follow the forward token until the service hands back the same token
    # that was just used (or none at all), which marks the end of the stream.
    last_token = ""
    while True:
        token = page["nextForwardToken"]
        if not token or token == last_token:
            break
        print(token)
        last_token = token
        page = logs.get_log_events(**request, nextToken=token)
        events += page["events"]


Look for `EXECVE` messages, parse arguments, print out unique arguments.

Manually review these for sensitive identifiers

In [None]:
all_args = set()

# Field names that hold an argument: ``aN``, or ``aN[i]`` for an argument
# logged in chunks.
arg_key = re.compile(r"a\d+(\[\d+\])?")

for e in events:
    if "type=EXECVE" not in e["message"]:
        continue
    m = parse_execve(e["message"])
    # Collect every argument field present instead of indexing a0..a(argc-1):
    # a record that lacks ``argc`` or one of the plain ``aN`` keys would
    # otherwise raise KeyError and abort the review.
    all_args.update(value for key, value in m.items() if arg_key.fullmatch(key))

# Print the unique arguments in sorted order for manual review.
for arg in sorted(all_args):
    print(arg)