# Analyze Unusual API Activity

There are various signals that indicate unusual API activity in AWS CloudTrail logs.

One kind of unusual activity is a high rate of failed requests.

Sometimes these failures can be caused by threat actors who are attempting to perform actions that are prohibited. A large number of failed requests from a given user may indicate threat behavior, eg. an enumeration attack.

Install libraries. May take up to 30 seconds on first run.

In [None]:
%pip install seaborn matplotlib

In [None]:
%pip install https://scanner-dev-public.s3.us-west-2.amazonaws.com/sdks/python/scanner_client-0.0.1-py3-none-any.whl

Import library dependencies. May take up to 15 seconds on first run.

In [None]:
from scanner_client import Scanner
import seaborn as sns
from matplotlib import pyplot as plt
from datetime import datetime, timezone, timedelta
import pandas as pd
import os

Change display config for plot and table libraries

In [None]:
plt.style.use("dark_background")
sns.set_style("darkgrid")
pd.set_option('max_colwidth', None)

In [None]:
def convert_results_to_data_frame(results):
    rows = [row.columns.to_dict() for row in results.rows]
    column_tags = results.column_tags.to_dict()
    if len(column_tags) > 0:
        # If this is a table, use the column ordering in the data frame
        return pd.DataFrame(data=rows, columns=results.column_ordering)
    else:
        # Otherwise, this is a list of log events, so use pandas JSON
        # normalization to set the table columns to the union of all keys.
        return pd.json_normalize(rows)

Initialize Scanner API client:

In [None]:
scanner = Scanner(
    api_url=os.environ["SCANNER_API_URL"],
    api_key=os.environ["SCANNER_API_KEY"],
)

Set analyzed time range to be the last 7 days.

In [None]:
end_time = datetime.now(tz=timezone.utc)
start_time = end_time - timedelta(days=7)

Run a Scanner query to look for AWS CloudTrail events showing failed API calls.

In [None]:
response = scanner.query.blocking_query(
    start_time=start_time.isoformat(),
    end_time=end_time.isoformat(),
    query_text="""
        %ingest.source_type: 'aws:cloudtrail'
        errorCode: *
        | stats 
          min(eventTime) as firstTime,
          max(eventTime) as lastTime
          by 
          eventSource,
          eventName,
          errorCode,
          userIdentity.accountId,
          userIdentity.arn
    """,
)

Convert search results to a `pandas` data frame, and preview the top rows:

In [None]:
error_events_df = convert_results_to_data_frame(response.results)
error_events_df.head()

Compute a new column called `userIdentityName` which is simply the second part of the ARN path, i.e. the user name. Helps us simplify the grouping.

In [None]:
error_events_df['userIdentityName'] = error_events_df["userIdentity.arn"]\
    .str.split("/")\
    .str[1]

Group results by the event source, name, error code, and the name of the user who made the errors. Sum the total count, sort descending.

In [None]:
top_errors_by_user_identity_name_df = error_events_df.groupby(["eventSource", "eventName", "errorCode", "userIdentityName"])["@q.count"]\
    .sum()\
    .sort_values(ascending=False)\
    .reset_index()
top_errors_by_user_identity_name_df.head()

Compute a new column to serve as the label in a chart.

In [None]:
top_errors_by_user_identity_name_df["userErrorLabel"] = top_errors_by_user_identity_name_df["userIdentityName"] + " - "\
  + top_errors_by_user_identity_name_df["errorCode"] + " - "\
  + top_errors_by_user_identity_name_df["eventSource"] + "/"\
  + top_errors_by_user_identity_name_df["eventName"]
  
group_by_label_df = top_errors_by_user_identity_name_df.groupby("userErrorLabel")["@q.count"]\
    .sum()\
    .sort_values(ascending=False)\
    .reset_index()

Render the top failed API calls in a chart.

A large number of errors for a given user and API call type may indicate attempts at enumeration attacks.

In [None]:
plt.figure(figsize=(12, 8))
sns.barplot(data=group_by_label_df[:10], y='userErrorLabel', x='@q.count')
plt.title('Top 10 Failed API Calls')
plt.xlabel('Frequency')
plt.ylabel('API Call Type')
plt.show()

# View Activity for User With Highest Failures

Given the user who has made the top most failed requests, query for any CloudTrail requests containing their user name, and visualize these requests.

In [None]:
user_identity_name = top_errors_by_user_identity_name_df.iloc[0]['userIdentityName']

In [None]:
response = scanner.query.blocking_query(
    start_time=start_time.isoformat(),
    end_time=end_time.isoformat(),
    query_text=f"""
        %ingest.source_type: "aws:cloudtrail"
        "{user_identity_name}"
        | rename
          userIdentity.sessionContext.sessionIssuer.arn as userArn
        | stats
          min(eventTime) as firstTime,
          max(eventTime) as lastTime
          by
          userArn,
          eventSource,
          eventName,
          errorCode
    """,
)

In [None]:
api_calls_for_user_df = convert_results_to_data_frame(response.results)

In [None]:
api_calls_for_user_df['apiCallLabel'] = api_calls_for_user_df['userArn'] + ' - '\
  + api_calls_for_user_df['eventSource'] + '/'\
  + api_calls_for_user_df['eventName']

successful_api_calls_df = api_calls_for_user_df[api_calls_for_user_df["errorCode"].isna()]
failed_api_calls_df = api_calls_for_user_df[api_calls_for_user_df["errorCode"].notna()]

Render a chart of the top successful API calls for the user.

In [None]:
group_by_label_df = successful_api_calls_df.groupby("apiCallLabel")["@q.count"]\
    .sum()\
    .sort_values(ascending=False)\
    .reset_index()

plt.figure(figsize=(12, 8))
sns.barplot(data=group_by_label_df[:10], y='apiCallLabel', x='@q.count')
plt.title(f"Top 10 Successful API Calls for {user_identity_name}")
plt.xlabel('Frequency')
plt.ylabel('API Call Type')
plt.show()

Render a chart of the top failed API calls for the user.

In [None]:
group_by_label_df = failed_api_calls_df.groupby("apiCallLabel")["@q.count"]\
    .sum()\
    .sort_values(ascending=False)\
    .reset_index()

plt.figure(figsize=(12, 8))
sns.barplot(data=group_by_label_df[:10], y='apiCallLabel', x='@q.count', color='lightcoral')
plt.title(f"Top 10 Failed API Calls for {user_identity_name}")
plt.xlabel('Frequency')
plt.ylabel('API Call Type')
plt.show()

## Check for sensitive activity from this user

See if this user succeeded at executive any API calls that match a list of sensitive API calls:
- `CreateUser`: Creates a new IAM user.
- `DeleteUser`: Deletes an IAM user.
- `CreateAccessKey`: Creates a new access key for an IAM user.
- `DeleteAccessKey`: Deletes an access key associated with an IAM user.
- `PutUserPolicy`: Attaches an inline policy to an IAM user.
- `DeleteUserPolicy`: Deletes an inline policy from an IAM user.
- `CreateRole`: Creates a new IAM role.
- `DeleteRole`: Deletes an IAM role.
- `AttachRolePolicy`: Attaches a managed policy to an IAM role.
- `DetachRolePolicy`: Detaches a managed policy from an IAM role.
- `PutRolePolicy`: Adds or updates an inline policy for an IAM role.
- `DeleteRolePolicy`: Deletes an inline policy attached to an IAM role.
- `UpdateAssumeRolePolicy`: Updates the policy that grants an entity permission to assume a role.
- `CreatePolicy`: Creates a new IAM policy.
- `DeletePolicy`: Deletes an IAM policy.
- `PutBucketPolicy`: Attaches a policy to an S3 bucket.
- `DeleteBucketPolicy`: Deletes the policy from an S3 bucket.
- `AuthorizeSecurityGroupIngress`: Adds one or more ingress rules to a security group.
- `RevokeSecurityGroupIngress`: Removes one or more ingress rules from a security guarded group.
- `UpdateAccountPasswordPolicy`: The account password policy is updated. This policy defines the password requirements for IAM users within the account, such as minimum length, required characters, and password expiration.
- `DeleteAccountPasswordPolicy`: The account password policy is deleted.

In [None]:
sensitive_event_names = [
  "CreateUser", "DeleteUser", "CreateAccessKey", "DeleteAccessKey",
  "PutUserPolicy", "DeleteUserPolicy", "CreateRole",
  "DeleteRole", "AttachRolePolicy", "DetachRolePolicy", "PutRolePolicy",
  "DeleteRolePolicy", "UpdateAssumeRolePolicy", "CreatePolicy", "DeletePolicy",
  "PutBucketPolicy", "DeleteBucketPolicy", "AuthorizeSecurityGroupIngress",
  "RevokeSecurityGroupIngress", "UpdateAccountPasswordPolicy", 
  "DeleteAccountPasswordPolicy",
]

successful_sensitive_df = successful_api_calls_df[successful_api_calls_df['eventName'].isin(sensitive_event_names)]
successful_sensitive_df

Render a chart of successful API calls that are potentially sensitive, if any exist.

In [None]:
group_by_label_df = successful_sensitive_df.groupby("apiCallLabel")["@q.count"]\
    .sum()\
    .sort_values(ascending=False)\
    .reset_index()

if group_by_label_df.shape[0] > 0:
    plt.figure(figsize=(12, 8))
    sns.barplot(data=group_by_label_df[:10], y='apiCallLabel', x='@q.count')
    plt.title(f"Top 10 Successful Sensitive API Calls for {user_identity_name}")
    plt.xlabel('Frequency')
    plt.ylabel('API Call Type')
    plt.show()