#### Resources

- [Amazon CloudWatch concepts](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_concepts.html): Namespaces, Metrics, Dimensions, Resolution, Alarms, etc.

- [CloudWatch examples using SDK for Python (Boto3)](https://docs.aws.amazon.com/code-library/latest/ug/python_3_cloudwatch_code_examples.html)

- [Embedding metrics within logs](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format.html)

- [AWS Logs, Query Logs, Query  Syntax Details](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/AnalyzingLogData.html)
  - [Useful Insights queries](https://docs.aws.amazon.com/lambda/latest/operatorguide/useful-queries.html)

In [1]:
! pip install --quiet --upgrade pip
! pip install --quiet \
    boto3 \
    'boto3-stubs[cloudwatch]' \
    'moto[cloudwatch]' \
    rich \
    pandas \
    numpy \
    localstack

## Using `boto3`

In [2]:
! localstack start -d


     __                     _______ __             __
    / /   ____  _________ _/ / ___// /_____ ______/ /__
   / /   / __ \/ ___/ __ `/ /\__ \/ __/ __ `/ ___/ //_/
  / /___/ /_/ / /__/ /_/ / /___/ / /_/ /_/ / /__/ ,<
 /_____/\____/\___/\__,_/_//____/\__/\__,_/\___/_/|_|

 💻 [1mLocalStack CLI[0m [1;34m3.6[0m[34m.[0m[1;34m0[0m
 👤 [1mProfile:[0m [34mdefault[0m

[2;36m[19:41:18][0m[2;36m [0mstarting LocalStack in Docker mode 🐳               ]8;id=797688;file:///Users/eric/repos/python-aws-course/python-on-aws-course/section-4--observability/metrics/venv/lib/python3.11/site-packages/localstack/cli/localstack.py\[2mlocalstack.py[0m]8;;\[2m:[0m]8;id=498564;file:///Users/eric/repos/python-aws-course/python-on-aws-course/section-4--observability/metrics/venv/lib/python3.11/site-packages/localstack/cli/localstack.py#503\[2m503[0m]8;;\
[2;36m          [0m[2;36m [0mpreparing environment                               ]8;id=740166;file:///Users/eric/repos/python

In [3]:
import pandas as pd
import numpy as np
import boto3
import time
import datetime
from moto import mock_aws
import os
from rich import print

## Mock AWS

Calls in this notebook will not actually reach out to AWS

In [4]:
os.environ["AWS_REGION"] = "mock"
os.environ["AWS_SECRET_ACCESS_KEY"] = "mock"
os.environ["AWS_ACCESS_KEY_ID"] = "mock"
os.environ["AWS_ENDPOINT_URL"] = "http://localhost:4566/"

# mock_aws().start()

In [5]:
os.environ["AWS_PROFILE"] = "cloud-course"
os.environ["AWS_REGION"] = "us-west-2"
os.environ.pop("AWS_SECRET_ACCESS_KEY", None)
os.environ.pop("AWS_ACCESS_KEY_ID", None)
os.environ.pop("AWS_ENDPOINT_URL", None)

'http://localhost:4566/'

In [6]:
# Step 1: Generate a Pandas DataFrame with Random Values
np.random.seed(42)
data = np.random.randint(50, 150, size=10)  # Generate 10 random values between 50 and 150
simulated_response_latencies = pd.DataFrame(data, columns=["response latency (ms)"])
simulated_response_latencies

Unnamed: 0,response latency (ms)
0,101
1,142
2,64
3,121
4,110
5,70
6,132
7,136
8,124
9,124


In [7]:
simulated_response_latencies.describe()

Unnamed: 0,response latency (ms)
count,10.0
mean,112.4
std,26.742392
min,64.0
25%,103.25
50%,122.5
75%,130.0
max,142.0


In [20]:
METRICS_NAMESPACE = "example-namespace"

# Step 2: Initialize the CloudWatch client
cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')    

# Namespace and metric name
for value in simulated_response_latencies["response latency (ms)"]:
    cloudwatch.put_metric_data(
        Namespace=METRICS_NAMESPACE,
        MetricData=[
            {
                'MetricName': "ResponseLatency",
                'Dimensions': [{'Name': 'API', 'Value': 'MyAPI'}],
                'Timestamp': time.time(),
                'Value': value,
                'Unit': "Milliseconds"
            },
        ]
    )

# Wait for a few seconds to ensure the metrics are available in CloudWatch
# time.sleep(20)  # Sleep for 60 seconds

response = cloudwatch.get_metric_statistics(
    Namespace=METRICS_NAMESPACE,
    MetricName="ResponseLatency",
    Dimensions=[{'Name': 'API', 'Value': 'MyAPI'}],
    StartTime=datetime.datetime.now(tz=datetime.timezone.utc) - datetime.timedelta(minutes=15),
    EndTime=datetime.datetime.now(tz=datetime.timezone.utc),
    Period=60,
    Statistics=['Average', 'Sum', 'Minimum', 'Maximum', 'SampleCount', 'p25', 'p50', 'p75']
)
print("Statistics queried from cloudwatch", response)


comparison_df = None
# Check if there are datapoints in the response
if response['Datapoints']:
    # Mapping describe() to the CloudWatch statistics
    cloudwatch_stats = {
        "mean":  response['Datapoints'][0]['Average'],
        "sum":   response['Datapoints'][0]['Sum'],
        "min":   response['Datapoints'][0]['Minimum'],
        "max":   response['Datapoints'][0]['Maximum'],
        "count": response['Datapoints'][0]['SampleCount'],
        "25%":   response['Datapoints'][0]['p25'],
        "50%":   response['Datapoints'][0]['p50'],
        "75%":   response['Datapoints'][0]['p75']
    }

    describe = simulated_response_latencies.describe()
    describe["response latency (ms)"][-1] = {"sum": describe["response latency (ms)"].sum()}
    
    # Showing the comparison
    comparison_df = pd.DataFrame({
        "pd.describe()": describe["response latency (ms)"],
        "CloudWatch": pd.Series(cloudwatch_stats)
    })
    comparison_df["pd.describe()"]["sum"] = simulated_response_latencies.sum().values[0]

comparison_df

InvalidParameterValueException: An error occurred (InvalidParameterValue) when calling the GetMetricStatistics operation: The collection Statistics must not have a size greater than 5.

In [9]:
import time
import random

try:
    from mypy_boto3_cloudwatch.type_defs import ListMetricsOutputTypeDef
except ImportError:
    pass


def process_request(path) -> dict:
    """Simulate processing a request and return response data"""

    start_time = time.time()
    # Simulate processing time
    time.sleep(random.uniform(0.1, 0.9))
    process_time = time.time() - start_time

    # Simulate response status code
    status_code = 200

    # Create metric data
    response_time_metric: list[dict] = create_metric_data(
        metric_name="ResponseTime", value=process_time, unit="Seconds", dimensions=[{"Name": "Path", "Value": path}]
    )
    status_code_metric: list[dict] = create_metric_data(
        metric_name="StatusCode", value=status_code, unit="Count", dimensions=[{"Name": "Path", "Value": path}]
    )

    # Put metric data
    put_metric_data(
        namespace=NAMESPACE,
        metric_data=response_time_metric + status_code_metric,
    )

    # Retrieve and display metrics to verify
    metrics: "ListMetricsOutputTypeDef" = list_metrics(namespace=NAMESPACE)
    return {
        "message": "Hello World",
        "response_time": process_time,
        "status_code": status_code,
        "metrics": metrics,
    }

In [10]:
from moto import mock_aws
from rich import print


@mock_aws
def main() -> None:
    response = process_request(path="/process")
    print(response)


if __name__ == "__main__":
    main()

NameError: name 'create_metric_data' is not defined

## Using [`aws-embedded-metrics`](https://github.com/awslabs/aws-embedded-metrics-python/tree/master) Python SDK

- [`put_metric(key: str, value: float, unit: str = "None", storage_resolution: int = 60)`](https://github.com/awslabs/aws-embedded-metrics-python/tree/master?tab=readme-ov-file#metricslogger)
  - Adds a new metric to the current logger context. 
  - Multiple metrics using the same key will be appended to an array of values. Multiple metrics cannot have same key and different storage resolution. 
  - The Embedded Metric Format supports a maximum of 100 values per key.

- `set_dimensions(*dimensions: Dict[str, str], use_default: bool = False)`
  - Explicitly override all dimensions. By default, this will disable the default dimensions, but can be configured using the keyword-only parameter `use_default`.

- `set_namespace(value: str)`
  - Sets the CloudWatch namespace that extracted metrics should be published to.

In [None]:
# import boto3
# from typing import Optional
# from mypy_boto3_logs import CloudWatchLogsClient


# def get_logs_data(log_group_name: str, client: Optional["CloudWatchLogsClient"] = None) -> list[str]:
#     """
#     Get logs data from a log group

#     Args:
#         log_group_name (str): The name of the log group
#         client (CloudWatchLogsClient, optional): The CloudWatch Logs client to use. Defaults to None.

#     Returns:
#         list[str]: The log data
#     """
#     client = client or boto3.client("logs")
#     logs = client.describe_log_streams(logGroupName=log_group_name)
#     log_data = []

#     for log_stream in logs["logStreams"]:
#         log_events = client.get_log_events(
#             logGroupName=log_group_name, logStreamName=log_stream["logStreamName"], limit=10
#         )
#         print(log_events)
#         # for event in log_events["events"]:
#         #     log_data.append(event["message"])

#     return log_data


# # def get_metrics_from_log_group(log_group_name: str, client) -> list:
# #     """Retrieve metrics from a specified CloudWatch log group"""
# #     logs_client = boto3.client('logs')

# #     # Get log streams in the log group
# #     log_streams = logs_client.describe_log_streams(logGroupName=log_group_name)

# #     metrics = []

# #     for log_stream in log_streams['logStreams']:
# #         log_events = logs_client.get_log_events(
# #             logGroupName=log_group_name,
# #             logStreamName=log_stream['logStreamName'],
# #             limit=10  # Limit to the last 10 log events for simplicity
# #         )

# #         for event in log_events['events']:
# #             metrics.append(event['message'])

# #     return metrics

In [None]:
import time
import json
import random


def print(*objects, **kwargs):
    from rich import print as pprint

    try:
        for obj in objects:
            pprint(obj)
            pprint(json.loads(obj))
    except Exception as e:
        pprint(e)
        pprint(*objects, **kwargs)


from aws_embedded_metrics import metric_scope
from aws_embedded_metrics.config import get_config
from aws_embedded_metrics import MetricsLogger
from aws_embedded_metrics.config.configuration import Configuration
from aws_embedded_metrics.storage_resolution import StorageResolution
from aws_embedded_metrics.logger.metrics_context import MetricsContext
from aws_embedded_metrics.environment.local_environment import LocalEnvironment

import nest_asyncio

# Allow the current event loop to be re-entered
nest_asyncio.apply()


NAMESPACE: str = "FastAPIExample"


config: Configuration = get_config()
config.service_name = "FastAPIExampleService"
config.log_group_name = "FastAPIExampleLogGroup"
config.log_stream_name = "FastAPIExampleLogStream"
config.service_type = "API"
config.namespace = NAMESPACE
config.environment = "local"


@metric_scope
def process_request(path: str, metrics: MetricsLogger) -> dict:
    """Simulate processing a request and return response data"""
    start_time = time.time()
    # Simulate processing time
    time.sleep(random.uniform(0.1, 0.9))
    process_time = time.time() - start_time

    # Simulate response status code
    status_code = 200

    # Log metrics
    metrics.put_dimensions({"Path": path})
    metrics.put_metric(
        key="ResponseTime", value=process_time, unit="Seconds", storage_resolution=StorageResolution.STANDARD
    )
    metrics.put_metric("StatusCode", status_code, "Count", StorageResolution.STANDARD)

    # Retrieve and display metrics from log group
    return {
        "message": "Hello World",
        "response_time": process_time,
        "status_code": status_code,
        # "metrics": ""
    }


response = process_request(path="/process")
print(response)

{"LogGroup": "FastAPIExampleLogGroup", "ServiceName": "FastAPIExampleService", "ServiceType": "API", "Path": "/process", "_aws": {"Timestamp": 1724082899385, "CloudWatchMetrics": [{"Dimensions": [["LogGroup", "ServiceName", "ServiceType", "Path"]], "Metrics": [{"Name": "ResponseTime", "Unit": "Seconds"}, {"Name": "StatusCode", "Unit": "Count"}], "Namespace": "FastAPIExample"}]}, "ResponseTime": 0.2578089237213135, "StatusCode": 200}


In [None]:
from aws_embedded_metrics.sinks import Sink
from aws_embedded_metrics.logger.metrics_context import MetricsContext
from aws_embedded_metrics.serializers import Serializer


class CustomSink(Sink):
    def __init__(self, serializer: Serializer):
        self.serializer = serializer

    def accept(self, context: MetricsContext) -> None:
        serialized_data = self.serializer.serialize(context)
        self.write(serialized_data)

    def write(self, serialized_data: str) -> None:
        # Customize the output format here
        print(f"Custom Metrics Log:\n{serialized_data}")

In [None]:
# from aws_embedded_metrics.logger.metrics_logger
from aws_embedded_metrics.serializers.log_serializer import LogSerializer
from aws_embedded_metrics.environment.local_environment import LocalEnvironment

# Create the custom sink
serializer = LogSerializer()
custom_sink = CustomSink(serializer=serializer)


# Set up a custom environment to use the custom sink
class CustomLocalEnvironment(LocalEnvironment):
    def get_sink(self):
        return custom_sink


# Set the environment to use the custom sink
config.environment = CustomLocalEnvironment()


# Your metric logging code continues as before
@metric_scope
def process_request(path: str, metrics: MetricsLogger) -> dict:
    """Simulate processing a request and return response data"""
    start_time = time.time()
    # Simulate processing time
    time.sleep(random.uniform(0.1, 0.9))
    process_time = time.time() - start_time

    # Simulate response status code
    status_code = 200

    # Log metrics
    metrics.put_dimensions({"Path": path})
    metrics.put_metric(
        key="ResponseTime", value=process_time, unit="Seconds", storage_resolution=StorageResolution.STANDARD
    )
    metrics.put_metric("StatusCode", status_code, "Count", StorageResolution.STANDARD)

    # Retrieve and display metrics from log group
    return {
        "message": "Hello World",
        "response_time": process_time,
        "status_code": status_code,
    }


# Invoke the function to see custom log output
response = process_request(path="/process")
print(response)

TypeError: Can't instantiate abstract class CustomSink with abstract method name

In [None]:
# from moto import mock_aws
# from rich import print


# @mock_aws
# def main() -> None:
#     response = process_request(path="/process")
#     print(response)


# if __name__ == "__main__":
#     main()

## Using `aws_lambda_powertools`

In [None]:
from aws_lambda_powertools import Metrics
from aws_lambda_powertools.metrics import MetricUnit
from moto import mock_aws
import boto3
import time
from rich import print


# Mock CloudWatch using moto
# @mock_cloudwatch
@mock_aws
def process_request(path):
    # Create a Boto3 CloudWatch client within moto context
    cloudwatch_client = boto3.client("cloudwatch")

    metrics = Metrics(namespace="FastAPIExample", service="FastAPIService")

    start_time = time.time()
    # Simulate processing time
    time.sleep(0.3)
    process_time = time.time() - start_time

    # Simulate response status code
    status_code = 200

    # Record custom metrics
    metrics.add_metric(name="ResponseTime", unit=MetricUnit.Seconds, value=process_time)
    metrics.add_metric(name="StatusCode", unit=MetricUnit.Count, value=status_code)
    metrics.add_dimension(name="Path", value=path)

    metrics.flush_metrics()

    # Retrieve and display metrics to verify
    metrics_list = cloudwatch_client.list_metrics(Namespace="FastAPIExample")
    return {
        "message": "Hello World",
        "response_time": process_time,
        "status_code": status_code,
        "metrics": metrics_list,
    }


# Simulate a request
response = process_request("/process")
print(response)

{"_aws":{"Timestamp":1723808451112,"CloudWatchMetrics":[{"Namespace":"FastAPIExample","Dimensions":[["Path","service"]],"Metrics":[{"Name":"ResponseTime","Unit":"Seconds"},{"Name":"StatusCode","Unit":"Count"}]}]},"Path":"/process","service":"FastAPIService","ResponseTime":[0.3001108169555664],"StatusCode":[200.0]}
