# Kinesis Data Analytics for SQL Applications

To get started with Kinesis Data Analytics, you create a Kinesis data analytics application that continuously reads and processes streaming data.

<img src="img/use_case_1_analytics.png" width="80%" align="left">

In [1]:
import boto3
import sagemaker
import pandas as pd
import json

sess   = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

sts = boto3.Session().client(service_name='sts', region_name=region)
account_id = sts.get_caller_identity()['Account']

sm = boto3.Session().client(service_name='sagemaker', region_name=region)
firehose = boto3.Session().client(service_name='firehose', region_name=region)
kinesis_analytics = boto3.Session().client(service_name='kinesisanalytics', region_name=region)

In [2]:
%store -r firehose_name

In [3]:
try:
    firehose_name
except NameError:
    print('+++++++++++++++++++++++++++++++')
    print('[ERROR] Please run all previous notebooks in this section before you continue.')
    print('+++++++++++++++++++++++++++++++')

In [4]:
print(firehose_name)

dsoaws-kinesis-data-firehose


In [5]:
%store -r firehose_arn

In [6]:
try:
    firehose_arn
except NameError:
    print('+++++++++++++++++++++++++++++++')
    print('[ERROR] Please run all previous notebooks in this section before you continue.')
    print('+++++++++++++++++++++++++++++++')

In [7]:
print(firehose_arn)

arn:aws:firehose:us-west-2:085964654406:deliverystream/dsoaws-kinesis-data-firehose


In [8]:
%store -r iam_role_kinesis_arn

In [9]:
try:
    iam_role_kinesis_arn
except NameError:
    print('+++++++++++++++++++++++++++++++')
    print('[ERROR] Please run all previous notebooks in this section before you continue.')
    print('+++++++++++++++++++++++++++++++')

In [10]:
print(iam_role_kinesis_arn)

arn:aws:iam::085964654406:role/DSOAWS_Kinesis


In [11]:
%store -r stream_arn

In [12]:
try:
    stream_arn
except NameError:
    print('+++++++++++++++++++++++++++++++')
    print('[ERROR] Please run all previous notebooks in this section before you continue.')
    print('+++++++++++++++++++++++++++++++')

In [13]:
print(stream_arn)

arn:aws:kinesis:us-west-2:085964654406:stream/dsoaws-kinesis-data-stream


In [14]:
%store -r lambda_fn_arn

In [15]:
try:
    lambda_fn_arn
except NameError:
    print('+++++++++++++++++++++++++++++++')
    print('[ERROR] Please run all previous notebooks in this section before you continue.')
    print('+++++++++++++++++++++++++++++++')

In [16]:
print(lambda_fn_arn)

arn:aws:lambda:us-west-2:085964654406:function:DeliverKinesisAnalyticsToCloudWatch


In [17]:
%store -r iam_role_lambda_arn

In [18]:
try:
    iam_role_lambda_arn
except NameError:
    print('+++++++++++++++++++++++++++++++')
    print('[ERROR] Please run all previous notebooks in this section before you continue.')
    print('+++++++++++++++++++++++++++++++')

In [19]:
print(iam_role_lambda_arn)

arn:aws:iam::085964654406:role/DSOAWS_Lambda


# Create a Kinesis Data Analytics for SQL Application


## Define the Kinesis Analytics Application Name

In [20]:
kinesis_data_analytics_app_name = 'dsoaws-kinesis-data-analytics-sql-app'

In [21]:
in_app_stream_name = 'firehose_001' # Default

## Create Application

In [22]:
window_seconds = 5

In [23]:
sql_code = ''' \
        CREATE OR REPLACE STREAM "AVG_STAR_RATING_SQL_STREAM" ( \
            avg_star_rating DOUBLE); \
        CREATE OR REPLACE PUMP "AVG_STAR_RATING_STREAM_PUMP" AS \
            INSERT INTO "AVG_STAR_RATING_SQL_STREAM" \
                SELECT STREAM AVG(CAST("star_rating" AS DOUBLE)) AS avg_star_rating \
                FROM "{}" \
                GROUP BY \
                STEP("{}".ROWTIME BY INTERVAL '{}' SECOND); \
         \
        CREATE OR REPLACE STREAM "ANOMALY_SCORE_SQL_STREAM" (anomaly_score DOUBLE); \
        CREATE OR REPLACE PUMP "ANOMALY_SCORE_STREAM_PUMP" AS \
            INSERT INTO "ANOMALY_SCORE_SQL_STREAM" \
            SELECT STREAM anomaly_score \
            FROM TABLE(RANDOM_CUT_FOREST( \
                CURSOR(SELECT STREAM "star_rating" \
                    FROM "{}" \
            ) \
          ) \
        ); \
         \
        CREATE OR REPLACE STREAM "APPROXIMATE_COUNT_SQL_STREAM" (number_of_distinct_items BIGINT); \
        CREATE OR REPLACE PUMP "APPROXIMATE_COUNT_STREAM_PUMP" AS \
            INSERT INTO "APPROXIMATE_COUNT_SQL_STREAM" \
            SELECT STREAM number_of_distinct_items \
            FROM TABLE(COUNT_DISTINCT_ITEMS_TUMBLING( \
                CURSOR(SELECT STREAM "review_id" FROM "{}"), \
                'review_id', \
                {} \
              ) \
        ); \
    '''.format(in_app_stream_name, 
             in_app_stream_name, 
             window_seconds,
             in_app_stream_name, 
             in_app_stream_name, 
             window_seconds)

print(sql_code)

         CREATE OR REPLACE STREAM "AVG_STAR_RATING_SQL_STREAM" (             avg_star_rating DOUBLE);         CREATE OR REPLACE PUMP "AVG_STAR_RATING_STREAM_PUMP" AS             INSERT INTO "AVG_STAR_RATING_SQL_STREAM"                 SELECT STREAM AVG(CAST("star_rating" AS DOUBLE)) AS avg_star_rating                 FROM "firehose_001"                 GROUP BY                 STEP("firehose_001".ROWTIME BY INTERVAL '5' SECOND);                  CREATE OR REPLACE STREAM "ANOMALY_SCORE_SQL_STREAM" (anomaly_score DOUBLE);         CREATE OR REPLACE PUMP "ANOMALY_SCORE_STREAM_PUMP" AS             INSERT INTO "ANOMALY_SCORE_SQL_STREAM"             SELECT STREAM anomaly_score             FROM TABLE(RANDOM_CUT_FOREST(                 CURSOR(SELECT STREAM "star_rating"                     FROM "firehose_001"             )           )         );                  CREATE OR REPLACE STREAM "APPROXIMATE_COUNT_SQL_STREAM" (number_of_distinct_items BIGINT);         CREATE OR REPLACE PUMP "APPROXIMATE

In [24]:
from botocore.exceptions import ClientError

try: 
    response = kinesis_analytics.create_application(
        ApplicationName=kinesis_data_analytics_app_name,
        Inputs=[
            {
                'NamePrefix': 'firehose',
                'KinesisFirehoseInput': {
                    'ResourceARN': '{}'.format(firehose_arn),
                    'RoleARN': '{}'.format(iam_role_kinesis_arn)
                },
                'InputSchema': {
                    'RecordFormat': {
                        'RecordFormatType': 'CSV',
                        'MappingParameters': {
                            'CSVMappingParameters': {
                                'RecordRowDelimiter': '\n',
                                'RecordColumnDelimiter': '\t'
                            }
                        }
                    },
                    'RecordColumns': [
                        {
                            'Name': 'review_id',
                            'Mapping': 'review_id',
                            'SqlType': 'VARCHAR(14)'
                        },                    
                        {
                            'Name': 'star_rating',
                            'Mapping': 'star_rating',
                            'SqlType': 'INTEGER'
                        },
                        {
                            'Name': 'product_category',
                            'Mapping': 'product_category',
                            'SqlType': 'VARCHAR(24)'
                        },                    
                        {
                            'Name': 'review_body',
                            'Mapping': 'review_body',
                            'SqlType': 'VARCHAR(65535)'
                        }                    
                    ]
                }
            },
        ],
        Outputs=[
            {
                'Name': 'AVG_STAR_RATING_STREAM',            
                'KinesisStreamsOutput': {
                    'ResourceARN': '{}'.format(stream_arn),
                    'RoleARN': '{}'.format(iam_role_kinesis_arn)
                },
                'DestinationSchema': {
                    'RecordFormatType': 'CSV'
                }
            },
            {
                'Name': 'AVG_STAR_RATING_SQL_STREAM',
                'LambdaOutput': {
                    'ResourceARN': '{}'.format(lambda_fn_arn),
                    'RoleARN': '{}'.format(iam_role_lambda_arn)
                },
                'DestinationSchema': {
                    'RecordFormatType': 'CSV'
                }
            }
        ],
        ApplicationCode=sql_code
    )
    print('SQL application {} successfully created.'.format(kinesis_data_analytics_app_name))
    print(json.dumps(response, indent=4, sort_keys=True, default=str))
except ClientError as e:
    if e.response['Error']['Code'] == 'ResourceInUseException':
        print('SQL App {} already exists.'.format(kinesis_data_analytics_app_name))
    else:
        print('Unexpected error: %s' % e)
    

SQL application dsoaws-kinesis-data-analytics-sql-app successfully created.
{
    "ApplicationSummary": {
        "ApplicationARN": "arn:aws:kinesisanalytics:us-west-2:085964654406:application/dsoaws-kinesis-data-analytics-sql-app",
        "ApplicationName": "dsoaws-kinesis-data-analytics-sql-app",
        "ApplicationStatus": "READY"
    },
    "ResponseMetadata": {
        "HTTPHeaders": {
            "content-length": "227",
            "content-type": "application/x-amz-json-1.1",
            "date": "Sat, 26 Sep 2020 20:52:28 GMT",
            "x-amzn-requestid": "795a6d7b-f0bb-42d3-ab0d-32e6462fa29a"
        },
        "HTTPStatusCode": 200,
        "RequestId": "795a6d7b-f0bb-42d3-ab0d-32e6462fa29a",
        "RetryAttempts": 0
    }
}


In [25]:
response = kinesis_analytics.describe_application(ApplicationName=kinesis_data_analytics_app_name)
print(json.dumps(response, indent=4, sort_keys=True, default=str))

{
    "ApplicationDetail": {
        "ApplicationARN": "arn:aws:kinesisanalytics:us-west-2:085964654406:application/dsoaws-kinesis-data-analytics-sql-app",
        "ApplicationCode": "CREATE OR REPLACE STREAM \"AVG_STAR_RATING_SQL_STREAM\" (             avg_star_rating DOUBLE);         CREATE OR REPLACE PUMP \"AVG_STAR_RATING_STREAM_PUMP\" AS             INSERT INTO \"AVG_STAR_RATING_SQL_STREAM\"                 SELECT STREAM AVG(CAST(\"star_rating\" AS DOUBLE)) AS avg_star_rating                 FROM \"firehose_001\"                 GROUP BY                 STEP(\"firehose_001\".ROWTIME BY INTERVAL '5' SECOND);                  CREATE OR REPLACE STREAM \"ANOMALY_SCORE_SQL_STREAM\" (anomaly_score DOUBLE);         CREATE OR REPLACE PUMP \"ANOMALY_SCORE_STREAM_PUMP\" AS             INSERT INTO \"ANOMALY_SCORE_SQL_STREAM\"             SELECT STREAM anomaly_score             FROM TABLE(RANDOM_CUT_FOREST(                 CURSOR(SELECT STREAM \"star_rating\"                     FROM \"fireho

In [26]:
input_id = response['ApplicationDetail']['InputDescriptions'][0]['InputId']
print(input_id)

1.1


# Start the Kinesis Data Analytics App

In [27]:
try: 
    response = kinesis_analytics.start_application(
        ApplicationName=kinesis_data_analytics_app_name,
        InputConfigurations=[
            {
                'Id': input_id,
                'InputStartingPositionConfiguration': {
                    'InputStartingPosition': 'NOW'
                }
            }
        ]
    )
    print(json.dumps(response, indent=4, sort_keys=True, default=str))
except ClientError as e:
    if e.response['Error']['Code'] == 'ResourceInUseException':
        print('Application {} is already starting.'.format(kinesis_data_analytics_app_name))
    else:
        print('Error: {}'.format(e))

{
    "ResponseMetadata": {
        "HTTPHeaders": {
            "content-length": "2",
            "content-type": "application/x-amz-json-1.1",
            "date": "Sat, 26 Sep 2020 20:52:28 GMT",
            "x-amzn-requestid": "23608d5a-95c6-4978-a3e8-e0e2c8aad044"
        },
        "HTTPStatusCode": 200,
        "RequestId": "23608d5a-95c6-4978-a3e8-e0e2c8aad044",
        "RetryAttempts": 0
    }
}


In [28]:
%store kinesis_data_analytics_app_name

Stored 'kinesis_data_analytics_app_name' (str)


# Explore Kinesis Data Analytics App

In [29]:
from IPython.core.display import display, HTML
        
display(HTML('<b>Review <a target="blank" href="https://console.aws.amazon.com/kinesisanalytics/home?region={}#/wizard/hub?applicationName={}"> Kinesis Data Analytics App</a></b>'.format(region, kinesis_data_analytics_app_name)))


In [30]:
response = kinesis_analytics.describe_application(ApplicationName=kinesis_data_analytics_app_name)

In [31]:
%%time

import time

app_status = response['ApplicationDetail']['ApplicationStatus']
print('Application status {}'.format(app_status))

while app_status != 'RUNNING':
    time.sleep(5)
    response = kinesis_analytics.describe_application(
        ApplicationName=kinesis_data_analytics_app_name)
    app_status = response['ApplicationDetail']['ApplicationStatus']
    print('Application status {}'.format(app_status))

print('Application status {}'.format(app_status))

Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status STARTING
Application status S

# _Please be patient.  ^^ This may take a few minutes. ^^_

# Store Variables for Next Notebooks

In [32]:
%store

Stored variables and their in-db values:
auto_ml_job_name                                      -> 'automl-dm-26-16-00-25'
autopilot_endpoint_name                               -> 'automl-dm-ep-26-16-21-49'
autopilot_train_s3_uri                                -> 's3://sagemaker-us-west-2-085964654406/data/amazon
balance_dataset                                       -> True
experiment_name                                       -> 'Amazon-Customer-Reviews-BERT-Experiment-160114585
firehose_arn                                          -> 'arn:aws:firehose:us-west-2:085964654406:deliverys
firehose_name                                         -> 'dsoaws-kinesis-data-firehose'
iam_kinesis_role_name                                 -> 'DSOAWS_Kinesis'
iam_kinesis_role_passed                               -> True
iam_lambda_role_name                                  -> 'DSOAWS_Lambda'
iam_lambda_role_passed                                -> True
iam_role_kinesis_arn                             

In [None]:
%%javascript
Jupyter.notebook.save_checkpoint();
Jupyter.notebook.session.delete();