# Real-time Anomaly Detection: How Amazon Kinesis Can Help You Stay Ahead of the Game

Anomaly detection is a critical task in data processing, especially when it comes to streaming data. As more and more data is generated in real-time, it's important to detect any unusual patterns or outliers that may indicate a problem or opportunity.

> The traditional approach of batch processing cannot keep up with the volume, velocity, and variety of streaming data generated by modern applications. This is where continuous analytics comes into play. 

Continuous analytics is an essential part of modern data processing, especially when it comes to analysing streaming data in real-time. With the growth of the internet of things (IoT), the need for continuous analytics has become even more critical, as the data generated by these devices needs to be analysed in real-time to detect any anomalies or trends.

<font color="blue">In this notebook we set up Amazon Kinesis Data Firehose and an Amazon Kinesis Data Analytics (KDA) application for near real-time anomaly detection, and push notifications through Amazon SNS.</font>

## Imports

In [3]:
import json
import time

import boto3
import pandas as pd
import sagemaker
from botocore.exceptions import ClientError

sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

sts = boto3.Session().client(service_name="sts", region_name=region)
iam = boto3.Session().client(service_name="iam", region_name=region)
lam = boto3.Session().client(service_name="lambda", region_name=region)
firehose = boto3.Session().client(service_name="firehose", region_name=region)
sns = boto3.Session().client(service_name="sns", region_name=region)
kinesis_analytics = boto3.Session().client(
    service_name="kinesisanalytics", region_name=region
)

## Setting up IAM and Permissions

In [None]:
iam_kinesis_role_name = "ADOAWS_Kinesis"

assume_role_policy_doc = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "kinesis.amazonaws.com"},
            "Action": "sts:AssumeRole",
        },
        {
            "Effect": "Allow",
            "Principal": {"Service": "firehose.amazonaws.com"},
            "Action": "sts:AssumeRole",
        },
        {
            "Effect": "Allow",
            "Principal": {"Service": "kinesisanalytics.amazonaws.com"},
            "Action": "sts:AssumeRole",
        },
    ],
}

try:
    iam_role_kinesis = iam.create_role(
        RoleName=iam_kinesis_role_name,
        AssumeRolePolicyDocument=json.dumps(assume_role_policy_doc),
        Description="ADOAWS Kinesis Role",
    )
    print("Role successfully created.")
except ClientError as e:
    if e.response["Error"]["Code"] == "EntityAlreadyExists":
        iam_role_kinesis = iam.get_role(RoleName=iam_kinesis_role_name)
        print("Role already exists.")
    else:
        print("Unexpected error: %s" % e)
        raise  # iam_role_kinesis is undefined past this point, so stop here

# time.sleep(30)

iam_role_kinesis_name = iam_role_kinesis["Role"]["RoleName"]
print(f"Role Name: {iam_role_kinesis_name}")

iam_role_kinesis_arn = iam_role_kinesis["Role"]["Arn"]
print(f"Role ARN: {iam_role_kinesis_arn}")

account_id = sts.get_caller_identity()["Account"]

In [5]:
stream_name = "adows-kinesis-data-stream"

In [6]:
firehose_name = "adows-kinesis-data-firehose"

In [7]:
lambda_fn_name_sns = "PushNotificationToSNS"

In [8]:
kinesis_policy_doc = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject",
            ],
            "Resource": [f"arn:aws:s3:::{bucket}", f"arn:aws:s3:::{bucket}/*"],
        },
        {
            "Effect": "Allow",
            "Action": ["logs:PutLogEvents"],
            "Resource": [f"arn:aws:logs:{region}:{account_id}:log-group:/*"],
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:*",
            ],
            "Resource": [f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"],
        },
        {
            "Effect": "Allow",
            "Action": [
                "firehose:*",
            ],
            "Resource": [
                f"arn:aws:firehose:{region}:{account_id}:deliverystream/{firehose_name}"
            ],
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesisanalytics:*",
            ],
            "Resource": ["*"],
        },
        {
            "Sid": "UseLambdaFunction",
            "Effect": "Allow",
            "Action": ["lambda:InvokeFunction", "lambda:GetFunctionConfiguration"],
            "Resource": ["*"],
        },
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": ["arn:aws:iam::*:role/service-role/kinesis*"],
        },
    ],
}

response = iam.put_role_policy(
    RoleName=iam_role_kinesis_name,
    PolicyName="ADOAWS_KinesisPolicy",
    PolicyDocument=json.dumps(kinesis_policy_doc),
)

# time.sleep(30)

print(json.dumps(response, indent=4, sort_keys=True, default=str))

{
    "ResponseMetadata": {
        "HTTPHeaders": {
            "content-length": "206",
            "content-type": "text/xml",
            "date": "Mon, 24 Apr 2023 08:23:55 GMT",
            "x-amzn-requestid": "0b2a00c4-d8ba-46a8-8eab-14ef2236446a"
        },
        "HTTPStatusCode": 200,
        "RequestId": "0b2a00c4-d8ba-46a8-8eab-14ef2236446a",
        "RetryAttempts": 0
    }
}


In [None]:
iam_lambda_role_name = "ADOAWS_Lambda"

assume_role_policy_doc = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        },
        {
            "Effect": "Allow",
            "Principal": {"Service": "kinesisanalytics.amazonaws.com"},
            "Action": "sts:AssumeRole",
        },
    ],
}

try:
    iam_role_lambda = iam.create_role(
        RoleName=iam_lambda_role_name,
        AssumeRolePolicyDocument=json.dumps(assume_role_policy_doc),
        Description="ADOAWS Lambda Role",
    )
    print("Role successfully created.")
except ClientError as e:
    if e.response["Error"]["Code"] == "EntityAlreadyExists":
        iam_role_lambda = iam.get_role(RoleName=iam_lambda_role_name)
        print("Role already exists.")
    else:
        print("Unexpected error: %s" % e)
        raise  # iam_role_lambda is undefined past this point, so stop here

# time.sleep(30)

iam_role_lambda_name = iam_role_lambda["Role"]["RoleName"]
print(f"Role Name: {iam_role_lambda_name}")

iam_role_lambda_arn = iam_role_lambda["Role"]["Arn"]
print(f"Role ARN: {iam_role_lambda_arn}")

In [10]:
lambda_policy_doc = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "UseLambdaFunction",
            "Effect": "Allow",
            "Action": ["lambda:InvokeFunction", "lambda:GetFunctionConfiguration"],
            "Resource": f"arn:aws:lambda:{region}:{account_id}:function:*",
        },
        {"Effect": "Allow", "Action": "cloudwatch:*", "Resource": "*"},
        {"Effect": "Allow", "Action": "sns:*", "Resource": "*"},
        {
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": f"arn:aws:logs:{region}:{account_id}:*",
        },
        {"Effect": "Allow", "Action": "sagemaker:InvokeEndpoint", "Resource": "*"},
        {
            "Effect": "Allow",
            "Action": ["logs:CreateLogStream", "logs:PutLogEvents"],
            "Resource": f"arn:aws:logs:{region}:{account_id}:log-group:/aws/lambda/*",
        },
    ],
}

response = iam.put_role_policy(
    RoleName=iam_role_lambda_name,
    PolicyName="ADOAWS_LambdaPolicy",
    PolicyDocument=json.dumps(lambda_policy_doc),
)

# time.sleep(30)

print(json.dumps(response, indent=4, sort_keys=True, default=str))

{
    "ResponseMetadata": {
        "HTTPHeaders": {
            "content-length": "206",
            "content-type": "text/xml",
            "date": "Mon, 24 Apr 2023 08:24:15 GMT",
            "x-amzn-requestid": "9d075c70-1b51-4052-abf0-1145903f0713"
        },
        "HTTPStatusCode": 200,
        "RequestId": "9d075c70-1b51-4052-abf0-1145903f0713",
        "RetryAttempts": 0
    }
}


## Create Lambda to Return Prediction

### Create a .zip file for the Python dependencies (Lambda Layer)

In [11]:
!mkdir src
!touch src/get_pred_from_kinesis.py

**If you do not have `zip` in your environment, use one of the commands below to install it.**

In [None]:
# !conda install -c conda-forge zip -y
# !pip install zip

A Python Lambda layer expects its packages under a top-level `python/` directory, so we install the dependencies into `layer/python`.

Note: This may take 5-10 minutes. Please be patient.

In [32]:
!rm -rf layer/python
!mkdir -p layer/python
!pip install -q --target layer/python transformers
!cd layer && zip -q --recurse-paths layer.zip .

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
pytest-astropy 0.8.0 requires pytest-cov>=2.0, which is not installed.
pytest-astropy 0.8.0 requires pytest-filter-subpackage>=0.1, which is not installed.
conda 22.9.0 requires ruamel_yaml_conda>=0.11.14, which is not installed.
sparkmagic 0.20.4 requires nest-asyncio==1.5.5, but you have nest-asyncio 1.5.6 which is incompatible.
sagemaker 2.145.0 requires importlib-metadata<5.0,>=1.4.0, but you have importlib-metadata 6.6.0 which is incompatible.
sagemaker 2.145.0 requires PyYAML==5.4.1, but you have pyyaml 6.0 which is incompatible.
docker-compose 1.29.2 requires PyYAML<6,>=3.10, but you have pyyaml 6.0 which is incompatible.
awscli 1.27.111 requires PyYAML<5.5,>=3.10, but you have pyyaml 6.0 which is incompatible.
awscli 1.27.111 requires rsa<4.8,>=3.1.2, but you have rsa 4.9 which is incompatible.
aiobot

**Load the layer as binary**

In [33]:
with open("layer/layer.zip", "rb") as f:
    layer = f.read()

In [34]:
!ls -l layer/

total 44084
-rw-r--r--  1 root root 45137090 Apr 24 08:59 layer.zip
drwxr-xr-x 40 root root     6144 Apr 24 08:58 python


**Publish layer**

In [None]:
transformers_lambda_layer_name = "transformers-python-sdk-layer"
layer_response = lam.publish_layer_version(
    LayerName=transformers_lambda_layer_name,
    Content={"ZipFile": layer},
    Description="Layer with 'pip install transformers'",
    CompatibleRuntimes=["python3.9"],
)

layer_version_arn = layer_response["LayerVersionArn"]

print(
    f"Lambda layer {transformers_lambda_layer_name} successfully created with LayerVersionArn {layer_version_arn}."
)

<font color="red">If this fails because of the upload size limit, create the layer manually, upload the zip file, and set the variable `layer_version_arn` to your layer's ARN.</font>
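One way to avoid the manual step is to reference the archive from S3 when it is too large to send inline. The sketch below only decides which `Content` argument to pass to `publish_layer_version`; the helper name `layer_content`, the constant `DIRECT_UPLOAD_LIMIT_BYTES`, and the 50 MB threshold are assumptions for illustration, and the effective limit for your account may be lower.

```python
import os

# Assumption: 50 MB is used here as the ceiling for sending a zipped package inline.
# Check the current Lambda quotas before relying on this value.
DIRECT_UPLOAD_LIMIT_BYTES = 50 * 1024 * 1024


def layer_content(zip_path, s3_bucket, s3_key, limit=DIRECT_UPLOAD_LIMIT_BYTES):
    """Return the `Content` argument for `publish_layer_version`.

    Small archives are sent inline as bytes. Larger ones are referenced by
    their S3 location and must already have been uploaded to that bucket and key.
    """
    size = os.path.getsize(zip_path)
    if size <= limit:
        with open(zip_path, "rb") as f:
            return {"ZipFile": f.read()}
    return {"S3Bucket": s3_bucket, "S3Key": s3_key}
```

In the notebook this could be used as `Content=layer_content("layer/layer.zip", bucket, "lambda-layers/layer.zip")`, where the S3 key is a hypothetical name and the upload itself still has to be done beforehand.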

## Create a .zip file for our Python code (Lambda Function)

**Contents of `src/get_pred_from_kinesis.py` file**

```
"""Python Lambda Handler function."""
import base64
import json
import random

from transformers import pipeline

sent_pipeline = pipeline(
    "sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english"
)

def lambda_handler(event, context) -> dict:
    """Lambda handler function."""
    outputs = []

    for record in event["records"]:
        payload = base64.b64decode(record["data"])
        text = payload.decode("utf-8")
        print(f"text: {text}")
        # Do custom processing on the payload here
        split_inputs = text.split("\t")
        review_body = split_inputs[2]
        print(f"Review body: {review_body}")
        
        predictions = sent_pipeline(review_body)
        inputs = [{"features": [review_body]}]

        for pred, input_data in zip(predictions, inputs):
            # Build output_record
            review_id = random.randint(0, 1000)
            # review_id, star_rating, review_body
            output_data = "{}\t{}\t{}".format(
                split_inputs[0], 0 if pred["label"] == "NEGATIVE" else 1, input_data["features"]
            )
            output_data_encoded = output_data.encode("utf-8")
            output_record = {
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(output_data_encoded).decode("utf-8"),
            }
            outputs.append(output_record)

    return {"records": outputs}
```

If you run into problems with AWS Lambda, you can:
1. Hardcode some predictions instead of using the `transformers` layer, to check that the rest of the pipeline works
1. Use another sentiment analysis library (upload it as a layer via the `lam.publish_layer_version` API)
1. Use HuggingFace Lambda Serverless
1. Or use a SageMaker endpoint
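The first option, hardcoding predictions, can be sketched as a handler that keeps the same Firehose record contract (`recordId`, `result`, base64-encoded `data`) but replaces the model with a keyword lookup. The function `fake_sentiment`, its word list, and the `classify` parameter are hypothetical stand-ins, and unlike the handler above this sketch writes the plain review text followed by a newline.

```python
import base64


def fake_sentiment(text):
    """Hypothetical stand-in for the transformers pipeline: keyword lookup only."""
    negative_words = ("awful", "waste", "terrible")
    label = "NEGATIVE" if any(w in text.lower() for w in negative_words) else "POSITIVE"
    return [{"label": label}]


def lambda_handler(event, context, classify=fake_sentiment):
    """Transform Firehose records that use the review_id, star_rating, review_body TSV layout."""
    outputs = []
    for record in event["records"]:
        text = base64.b64decode(record["data"]).decode("utf-8")
        review_id, _, review_body = text.rstrip("\n").split("\t")[:3]
        pred = classify(review_body)[0]
        star_rating = 0 if pred["label"] == "NEGATIVE" else 1
        output_data = f"{review_id}\t{star_rating}\t{review_body}\n"
        outputs.append(
            {
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(output_data.encode("utf-8")).decode("utf-8"),
            }
        )
    return {"records": outputs}


# Local smoke test with a hand-built event (no AWS call involved).
sample = base64.b64encode("12\t\tThis is an awful waste of time.\n".encode("utf-8"))
event = {"records": [{"recordId": "1", "data": sample.decode("utf-8")}]}
result = lambda_handler(event, None)
decoded = base64.b64decode(result["records"][0]["data"]).decode("utf-8")
print(repr(decoded))
```

Running the handler locally like this separates problems in the record handling from problems caused by the size or cold-start time of the `transformers` layer.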

In [26]:
!zip src/GetPredFromKinesis.zip src/get_pred_from_kinesis.py

  adding: src/get_pred_from_kinesis.py (deflated 37%)


**Load file as binary**

In [28]:
with open("src/GetPredFromKinesis.zip", "rb") as f:
    code = f.read()

<font color="blue">You can also use Lambda to invoke a SageMaker endpoint of your trained model. The Lambda function code will change for that, and you will have to give the Lambda function access to the endpoint.</font>

## Create The Lambda Function

In [29]:
pred_lambda_fn_name = "GetPredFromKinesis"

In [36]:
try:
    response = lam.create_function(
        FunctionName=f"{pred_lambda_fn_name}",
        Runtime="python3.9",
        Role=f"{iam_role_lambda_arn}",
        Handler="src/get_pred_from_kinesis.lambda_handler",
        Code={"ZipFile": code},
        Layers=[layer_version_arn],
        Description="Get sentiment prediction using transformers on review input text.",
        # max timeout supported by Firehose is 5min
        Timeout=300,
        MemorySize=128,
        Publish=True,
    )
    print(f"Lambda Function {pred_lambda_fn_name} successfully created.")
except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceConflictException":
        response = lam.update_function_code(
            FunctionName=f"{pred_lambda_fn_name}",
            ZipFile=code,
            Publish=True,
            DryRun=False,
        )
        print(f"Updating existing Lambda Function {pred_lambda_fn_name}.  This is OK.")
    else:
        print(f"Error: {e}")

Lambda Function GetPredFromKinesis successfully created.


In [None]:
response = lam.get_function(FunctionName=pred_lambda_fn_name)

pred_lambda_fn_arn = response["Configuration"]["FunctionArn"]
print(pred_lambda_fn_arn)

## Create a Kinesis Data Firehose Delivery Stream

In [None]:
try:
    response = firehose.create_delivery_stream(
        DeliveryStreamName=firehose_name,
        DeliveryStreamType="DirectPut",
        ExtendedS3DestinationConfiguration={
            "RoleARN": iam_role_kinesis_arn,
            "BucketARN": f"arn:aws:s3:::{bucket}",
            "Prefix": "kinesis-data-firehose/",
            "ErrorOutputPrefix": "kinesis-data-firehose-error/",
            "BufferingHints": {"SizeInMBs": 1, "IntervalInSeconds": 60},
            "CompressionFormat": "UNCOMPRESSED",
            # "CloudWatchLoggingOptions" is set once, at the end of this configuration,
            # where logging is disabled. To enable it, set "Enabled": True there and add
            # "LogGroupName" and "LogStreamName" entries.
            "ProcessingConfiguration": {
                "Enabled": True,
                "Processors": [
                    {
                        "Type": "Lambda",
                        "Parameters": [
                            {
                                "ParameterName": "LambdaArn",
                                "ParameterValue": f"{pred_lambda_fn_arn}:$LATEST",
                            },
                            {"ParameterName": "BufferSizeInMBs", "ParameterValue": "1"},
                            {
                                "ParameterName": "BufferIntervalInSeconds",
                                "ParameterValue": "60",
                            },
                        ],
                    }
                ],
            },
            "S3BackupMode": "Enabled",
            "S3BackupConfiguration": {
                "RoleARN": iam_role_kinesis_arn,
                "BucketARN": f"arn:aws:s3:::{bucket}",
                "Prefix": "kinesis-data-firehose-source-record/",
                "ErrorOutputPrefix": "!{firehose:error-output-type}/",
                "BufferingHints": {"SizeInMBs": 1, "IntervalInSeconds": 60},
                "CompressionFormat": "UNCOMPRESSED",
            },
            "CloudWatchLoggingOptions": {
                "Enabled": False,
            },
        },
    )
    print(f"Delivery stream {firehose_name} successfully created.")
    print(json.dumps(response, indent=4, sort_keys=True, default=str))
except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceInUseException":
        print(f"Delivery stream {firehose_name} already exists.")
    else:
        print("Unexpected error: %s" % e)

In [41]:
# This will take a while
status = ""
while status != "ACTIVE":
    r = firehose.describe_delivery_stream(DeliveryStreamName=firehose_name)
    description = r.get("DeliveryStreamDescription")
    status = description.get("DeliveryStreamStatus")
    time.sleep(5)

print(f"Delivery Stream {firehose_name} is active")

Delivery Stream adows-kinesis-data-firehose is active
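The polling loop above never exits if the delivery stream fails to create. A minimal sketch of a bounded wait is shown below; the helper `wait_for_status` and its parameters are hypothetical, and it takes any zero-argument callable that returns the current status string so it can be tried without AWS access.

```python
import time


def wait_for_status(describe, target="ACTIVE", failed=("CREATING_FAILED",),
                    timeout=600, interval=5, sleep=time.sleep):
    """Poll `describe()` until it returns `target`.

    Raises RuntimeError when a failure status is seen and TimeoutError once
    `timeout` seconds of waiting have been used up.
    """
    waited = 0
    while True:
        status = describe()
        if status == target:
            return status
        if status in failed:
            raise RuntimeError(f"Resource entered status {status}")
        if waited >= timeout:
            raise TimeoutError(f"Still {status} after {waited} seconds")
        sleep(interval)
        waited += interval


# Possible use with the notebook's client (not executed here):
# wait_for_status(
#     lambda: firehose.describe_delivery_stream(DeliveryStreamName=firehose_name)[
#         "DeliveryStreamDescription"
#     ]["DeliveryStreamStatus"]
# )
```

Passing the status lookup as a callable keeps the helper reusable for other resources that expose a status field, such as the Kinesis Data Analytics application.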


In [None]:
firehose_arn = r["DeliveryStreamDescription"]["DeliveryStreamARN"]
print(firehose_arn)

## Create Lambda Destination SNS

### Create SNS Topic

In [None]:
topics = sns.list_topics()
print(topics)

In [None]:
response = sns.create_topic(
    Name="review_anomaly_scores",
)
print(response)

In [None]:
sns_topic_arn = response["TopicArn"]
print(sns_topic_arn)

### Lambda Function Code

In [48]:
!touch src/push_notification_to_sns.py

Contents of our `src/push_notification_to_sns.py` file:

```
"""Python Lambda SNS handler"""

import base64
import os

import boto3

SNS_TOPIC_ARN = os.environ["SNS_TOPIC_ARN"]

sns = boto3.client("sns")


def lambda_handler(event, context):
    output = []
    success = 0
    failure = 0
    highest_score = 0

    print(f"event: {event}")
    r = event["records"]
    print(f"records: {r}")
    print(f"type_records: {type(r)}")

    for record in event["records"]:
        try:
            payload = base64.b64decode(record["data"])
            print(f"payload: {payload}")
            text = payload.decode("utf-8")
            print(f"text: {text}")
            score = float(text)
            if (score != 0) and (score > highest_score):
                highest_score = score
                print(f"New highest_score: {highest_score}")
                # Uncomment the line below to publish every new highest score to the SNS topic.
                # sns.publish(TopicArn=SNS_TOPIC_ARN, Message='New anomaly score: {}'.format(text), Subject='New Reviews Anomaly Score Detected')
            # Acknowledge every record that was parsed, not only new highest scores.
            output.append({"recordId": record["recordId"], "result": "Ok"})
            success += 1
        except Exception as e:
            print(e)
            output.append({"recordId": record["recordId"], "result": "DeliveryFailed"})
            failure += 1
    if highest_score != 0:
        sns.publish(
            TopicArn=SNS_TOPIC_ARN,
            Message=f"New anomaly score: {str(highest_score)}",
            Subject="New Reviews Anomaly Score Detected",
        )
    print(
        f"Successfully delivered {success} records, failed to deliver {failure} records"
    )
    return {"records": output}
```
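The score handling can be checked locally before deploying. The sketch below isolates that logic in a hypothetical helper, `summarize_scores`, which acknowledges every record and returns the highest score; the sample records are hand-built and only mimic the `recordId` and base64 `data` fields the handler reads.

```python
import base64


def summarize_scores(records):
    """Acknowledge every record and return the highest positive anomaly score seen."""
    output = []
    highest_score = 0.0
    for record in records:
        try:
            score = float(base64.b64decode(record["data"]).decode("utf-8"))
            highest_score = max(highest_score, score)
            output.append({"recordId": record["recordId"], "result": "Ok"})
        except ValueError:
            # The payload was not valid base64, not valid UTF-8, or not a number.
            output.append({"recordId": record["recordId"], "result": "DeliveryFailed"})
    return output, highest_score


def encode(value):
    """Base64-encode a value the way the handler expects to receive it."""
    return base64.b64encode(str(value).encode("utf-8")).decode("utf-8")


records = [
    {"recordId": "a", "data": encode(0.0)},
    {"recordId": "b", "data": encode(2.5)},
    {"recordId": "c", "data": encode("not-a-number")},
    {"recordId": "d", "data": encode(1.2)},
]
output, highest = summarize_scores(records)
print(highest, [item["result"] for item in output])
```

Keeping the parsing separate from `sns.publish` means the notification is sent at most once per invocation, and the parsing can be exercised without an SNS topic.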

### Zip the code

In [49]:
!zip src/PushNotificationToSNS.zip src/push_notification_to_sns.py

  adding: src/push_notification_to_sns.py (deflated 62%)


### Load the zip as binary

In [50]:
with open("src/PushNotificationToSNS.zip", "rb") as f:
    code = f.read()

### Create the Lambda Function

In [51]:
try:
    response = lam.create_function(
        FunctionName=f"{lambda_fn_name_sns}",
        Runtime="python3.9",
        Role=f"{iam_role_lambda_arn}",
        Handler="src/push_notification_to_sns.lambda_handler",
        Code={"ZipFile": code},
        Description="Deliver output records from Kinesis Analytics application to SNS.",
        Timeout=300,
        MemorySize=128,
        Publish=True,
    )
    print(f"Lambda Function {lambda_fn_name_sns} successfully created.")

except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceConflictException":
        response = lam.update_function_code(
            FunctionName=f"{lambda_fn_name_sns}",
            ZipFile=code,
            Publish=True,
            DryRun=False,
        )
        print(f"Updating existing Lambda Function {lambda_fn_name_sns}.  This is OK.")
    else:
        print(f"Error: {e}")

Lambda Function PushNotificationToSNS successfully created.


In [None]:
response = lam.get_function(FunctionName=lambda_fn_name_sns)

lambda_fn_arn_sns = response["Configuration"]["FunctionArn"]
print(lambda_fn_arn_sns)

### Update Lambda Function with SNS Topic ARN

In [53]:
response = lam.update_function_configuration(
    FunctionName=lambda_fn_name_sns,
    Environment={"Variables": {"SNS_TOPIC_ARN": sns_topic_arn}},
)

## Create Kinesis Data Analytics App

In [55]:
kinesis_data_analytics_app_name = "adoaws-kinesis-data-analytics-sql-app"
in_app_stream_name = "SOURCE_SQL_STREAM_001"  # Default
print(in_app_stream_name)

SOURCE_SQL_STREAM_001


### Create Application

In [56]:
sql_code = """ \
        CREATE OR REPLACE STREAM "ANOMALY_SCORE_SQL_STREAM" (anomaly_score DOUBLE); \
        CREATE OR REPLACE PUMP "ANOMALY_SCORE_STREAM_PUMP" AS \
            INSERT INTO "ANOMALY_SCORE_SQL_STREAM" \
            SELECT STREAM anomaly_score \
            FROM TABLE(RANDOM_CUT_FOREST( \
                CURSOR(SELECT STREAM "star_rating" \
                    FROM "{}" \
            ) \
          ) \
        ); \
    """.format(
    in_app_stream_name
)

In [None]:
try:
    response = kinesis_analytics.create_application(
        ApplicationName=kinesis_data_analytics_app_name,
        Inputs=[
            {
                "NamePrefix": "SOURCE_SQL_STREAM",
                "KinesisFirehoseInput": {
                    "ResourceARN": f"{firehose_arn}",
                    "RoleARN": f"{iam_role_kinesis_arn}",
                },
                "InputProcessingConfiguration": {
                    "InputLambdaProcessor": {
                        "ResourceARN": f"{pred_lambda_fn_arn}",
                        "RoleARN": f"{iam_role_lambda_arn}",
                    }
                },
                "InputSchema": {
                    "RecordFormat": {
                        "RecordFormatType": "CSV",
                        "MappingParameters": {
                            "CSVMappingParameters": {
                                "RecordRowDelimiter": "\n",
                                "RecordColumnDelimiter": "\t",
                            }
                        },
                    },
                    "RecordColumns": [
                        {
                            "Name": "review_id",
                            "Mapping": "review_id",
                            "SqlType": "VARCHAR(14)",
                        },
                        {
                            "Name": "star_rating",
                            "Mapping": "star_rating",
                            "SqlType": "INTEGER",
                        },
                        {
                            "Name": "review_body",
                            "Mapping": "review_body",
                            "SqlType": "VARCHAR(65535)",
                        },
                    ],
                },
            },
        ],
        Outputs=[
            {
                "Name": "ANOMALY_SCORE_SQL_STREAM",
                "LambdaOutput": {
                    "ResourceARN": f"{lambda_fn_arn_sns}",
                    "RoleARN": f"{iam_role_kinesis_arn}",
                },
                "DestinationSchema": {"RecordFormatType": "CSV"},
            },
        ],
        ApplicationCode=sql_code,
    )
    print(f"SQL application {kinesis_data_analytics_app_name} successfully created.")
    print(json.dumps(response, indent=4, sort_keys=True, default=str))
except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceInUseException":
        print(f"SQL App {kinesis_data_analytics_app_name} already exists.")
    else:
        print("Unexpected error: %s" % e)

In [None]:
response = kinesis_analytics.describe_application(
    ApplicationName=kinesis_data_analytics_app_name
)
print(json.dumps(response, indent=4, sort_keys=True, default=str))

In [65]:
input_id = response["ApplicationDetail"]["InputDescriptions"][0]["InputId"]
print(input_id)

1.1


## Start the Kinesis Data Analytics App

In [66]:
try:
    response = kinesis_analytics.start_application(
        ApplicationName=kinesis_data_analytics_app_name,
        InputConfigurations=[
            {
                "Id": input_id,
                "InputStartingPositionConfiguration": {"InputStartingPosition": "NOW"},
            }
        ],
    )
    print(json.dumps(response, indent=4, sort_keys=True, default=str))
except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceInUseException":
        print(f"Application {kinesis_data_analytics_app_name} is already starting.")
    else:
        print(f"Error: {e}")

{
    "ResponseMetadata": {
        "HTTPHeaders": {
            "content-length": "2",
            "content-type": "application/x-amz-json-1.1",
            "date": "Mon, 24 Apr 2023 10:13:54 GMT",
            "x-amzn-requestid": "46423607-a054-4937-85c0-9b7e7aec7fca"
        },
        "HTTPStatusCode": 200,
        "RequestId": "46423607-a054-4937-85c0-9b7e7aec7fca",
        "RetryAttempts": 0
    }
}


## Put Reviews on Kinesis Data Firehose

In [67]:
firehoses = firehose.list_delivery_streams(DeliveryStreamType="DirectPut")

print(json.dumps(firehoses, indent=4, sort_keys=True, default=str))

{
    "DeliveryStreamNames": [
        "adows-kinesis-data-firehose"
    ],
    "HasMoreDeliveryStreams": false,
    "ResponseMetadata": {
        "HTTPHeaders": {
            "content-length": "86",
            "content-type": "application/x-amz-json-1.1",
            "date": "Mon, 24 Apr 2023 10:14:09 GMT",
            "x-amz-id-2": "pTNqUk5OTYrgq/OchP7yD11y7AUtw2P9bUX0ijvq0cD+XzrT1EDu5N/+OxWcFVJHa8GZCdhK9vJVWPZUAqxliDmjSKll4uqC",
            "x-amzn-requestid": "f7413e6b-e874-8e39-ab81-66da231fb815"
        },
        "HTTPStatusCode": 200,
        "RequestId": "f7413e6b-e874-8e39-ab81-66da231fb815",
        "RetryAttempts": 0
    }
}


### Simulate Producer Application Writing Records to the Stream

### Put Records into Firehose

In [None]:
firehose_response = firehose.describe_delivery_stream(DeliveryStreamName=firehose_name)

print(json.dumps(firehose_response, indent=4, sort_keys=True, default=str))

In [76]:
pd.DataFrame(
    [
        {
            "review_id": 12,
            "review_body": "This is an awful waste of time.",
        },
    ],
    columns=["review_id", "star_rating", "review_body"],
).to_csv(sep="\t", header=None, index=False).encode("utf-8")

b'12\t\tThis is an awful waste of time.\n'

In [79]:
%%time

step = 1

for _ in range(0, 10, step):

    timestamp = int(time.time())

    df_anomalies = pd.DataFrame(
        [
            {
                "review_id": str(timestamp),
                "review_body": "This is an awful waste of time.",
            },
        ],
        columns=["review_id", "star_rating", "review_body"],
    )

    reviews_tsv_anomalies = df_anomalies.to_csv(sep="\t", header=None, index=False)

    response = firehose.put_record(
        Record={"Data": reviews_tsv_anomalies.encode("utf-8")},
        DeliveryStreamName=firehose_name,
    )

CPU times: user 45.1 ms, sys: 5.47 ms, total: 50.6 ms
Wall time: 198 ms
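The loop above issues one `put_record` call per review. For larger volumes, a batched variant using `put_record_batch` might look like the sketch below. The helper names are hypothetical, the default batch size of 500 is an assumption based on the documented per-call record limit, and the rows are joined by hand, so a review body containing a tab or newline would need escaping first.

```python
def chunked(items, size):
    """Yield consecutive slices of `items` with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def put_reviews_in_batches(firehose_client, stream_name, rows, batch_size=500):
    """Send TSV rows with `put_record_batch` and return the number of failed records.

    `rows` is a list of (review_id, star_rating, review_body) tuples.
    """
    failed = 0
    for batch in chunked(rows, batch_size):
        records = [
            {"Data": "\t".join(str(field) for field in row).encode("utf-8") + b"\n"}
            for row in batch
        ]
        response = firehose_client.put_record_batch(
            DeliveryStreamName=stream_name, Records=records
        )
        # A batch can partially fail, so the count has to be read from the response.
        failed += response.get("FailedPutCount", 0)
    return failed
```

With the notebook's variables this would be called as `put_reviews_in_batches(firehose, firehose_name, rows)`. A non-zero return value means some records were rejected and should be retried.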


## Release Resources

- Delete Kinesis Data Analytics App

In [None]:
response = kinesis_analytics.stop_application(
    ApplicationName=kinesis_data_analytics_app_name
)
print(json.dumps(response, indent=4, sort_keys=True, default=str))

# Stopping is asynchronous: poll until the application is back in the READY state.
app_status = ""
while app_status != "READY":
    response = kinesis_analytics.describe_application(
        ApplicationName=kinesis_data_analytics_app_name
    )
    app_status = response["ApplicationDetail"]["ApplicationStatus"]
    time.sleep(5)

# delete_application needs the creation timestamp reported by describe_application.
response = kinesis_analytics.delete_application(
    ApplicationName=kinesis_data_analytics_app_name,
    CreateTimestamp=response["ApplicationDetail"]["CreateTimestamp"],
)
print(json.dumps(response, indent=4, sort_keys=True, default=str))

try:
    response = kinesis_analytics.describe_application(
        ApplicationName=kinesis_data_analytics_app_name
    )
    print(json.dumps(response, indent=4, sort_keys=True, default=str))
except kinesis_analytics.exceptions.ResourceNotFoundException:
    print("App deleted successfully")

- Delete Lambda Functions
- Delete SNS Topic
- Delete Kinesis Data Firehose Delivery Stream
- Delete other resources that I missed :)
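The remaining deletions in the list above could be scripted along these lines. This is a sketch, not a complete teardown: `release_resources` is a hypothetical helper, it takes the boto3 clients as arguments, and it does not cover the IAM roles and policies, the Lambda layer, or the objects written to S3.

```python
def release_resources(lam, sns, firehose, function_names, topic_arn, delivery_stream_name):
    """Delete the Lambda functions, SNS topic, and delivery stream; return any errors.

    Each deletion is attempted independently so one failure does not block the rest.
    """
    steps = [
        (f"lambda:{name}", lambda name=name: lam.delete_function(FunctionName=name))
        for name in function_names
    ]
    steps.append(("sns", lambda: sns.delete_topic(TopicArn=topic_arn)))
    steps.append(
        (
            "firehose",
            lambda: firehose.delete_delivery_stream(
                DeliveryStreamName=delivery_stream_name
            ),
        )
    )
    errors = {}
    for label, step in steps:
        try:
            step()
        except Exception as exc:  # sketch only: record the error and continue
            errors[label] = str(exc)
    return errors
```

With the notebook's variables the call would be `release_resources(lam, sns, firehose, [pred_lambda_fn_name, lambda_fn_name_sns], sns_topic_arn, firehose_name)`. An empty dictionary means every deletion request was accepted.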

***