In [1]:
import boto3
import os
import keys

# Copy the AWS credential and region settings from the local `keys` module
# into the process environment, one variable at a time and in this order.
for env_name in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_DEFAULT_REGION'):
    os.environ[env_name] = getattr(keys, env_name)

In [2]:
import logging
from pprint import pprint
import sys
import threading
import time
import boto3

from kinesis.streams.kinesis_stream import KinesisStream
from kinesis.analytics.analytics_application import KinesisAnalyticsApplication

# Extend the import search path before importing the demo_tools helpers below.
# The entry is a relative path, so it is resolved against the kernel's current
# working directory.
# NOTE(review): presumably the notebook has to be started from its own folder
# for '../..' to reach the directory that contains demo_tools — confirm.
sys.path.append('../..')
from demo_tools.custom_waiter import CustomWaiter, WaitState
from demo_tools.retries import exponential_retry

# Logger named after this module (the notebook's __name__).
logger = logging.getLogger(__name__)

In [3]:
class ApplicationRunningWaiter(CustomWaiter):
    """
    Waiter named 'ApplicationRunning' built on the DescribeApplication
    operation. It watches ApplicationDetail.ApplicationStatus: RUNNING is
    mapped to success and STOPPING is mapped to failure.
    """
    def __init__(self, client):
        """
        :param client: Client object handed through to CustomWaiter.
        """
        # Status value -> outcome reported by the waiter.
        status_outcomes = {
            'RUNNING': WaitState.SUCCESS,
            'STOPPING': WaitState.FAILURE,
        }
        super().__init__(
            'ApplicationRunning',
            'DescribeApplication',
            'ApplicationDetail.ApplicationStatus',
            status_outcomes,
            client)

    def wait(self, app_name):
        """
        Run the waiter for a single application.

        :param app_name: Value passed as the ApplicationName argument.
        """
        describe_kwargs = {'ApplicationName': app_name}
        self._wait(**describe_kwargs)

In [4]:
# Tunable settings for this cell, named so they are not buried as bare numbers.
BANNER_WIDTH = 88             # number of dashes in the separator lines
ROLE_READY_WAIT_SECONDS = 10  # pause after creating the role before it is used

print('-'*BANNER_WIDTH)
print("Setting up Data Streams and Continuous Query.")
print('-'*BANNER_WIDTH)

logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

# AWS clients/resources plus the demo wrappers built on top of them.
kinesis_client = boto3.client('kinesis')
iam_resource = boto3.resource('iam')
analytics_client = boto3.client('kinesisanalytics')
application = KinesisAnalyticsApplication(analytics_client)
app_running_waiter = ApplicationRunningWaiter(analytics_client)

# Names shared by the rest of the notebook.
input_stream_name = 'canaal-input'
input_prefix = 'SOURCE_SQL_STREAM'
output_stream_name = 'canaal-output'
app_name = 'canaal-app'
role_name = 'kinesis-read-write'

print(f"Creating input stream {input_stream_name} and output stream {output_stream_name}.")
input_stream = KinesisStream(kinesis_client)
input_stream.create(input_stream_name)
output_stream = KinesisStream(kinesis_client)
output_stream.create(output_stream_name)

print(f"Creating role {role_name} to let Kinesis Analytics read from the input stream and write to the output stream.")
role = application.create_read_write_role(role_name, input_stream.arn(), output_stream.arn(), iam_resource)
print("Waiting for role to be ready.")
# Fixed delay rather than a readiness check: the role was only just created.
# NOTE(review): presumably this covers IAM propagation delay — confirm the
# duration is sufficient for the target account.
time.sleep(ROLE_READY_WAIT_SECONDS)

INFO: Found credentials in environment variables.


----------------------------------------------------------------------------------------
Setting up Data Streams and Continuous Query.
----------------------------------------------------------------------------------------
Creating input stream canaal-input and output stream canaal-output.


INFO: Created stream canaal-input.
INFO: Waiting until exists.
INFO: Got stream canaal-input.
INFO: Created stream canaal-output.
INFO: Waiting until exists.
INFO: Got stream canaal-output.


Creating role kinesis-read-write to let Kinesis Analytics read from the input stream and write to the output stream.


INFO: Created role kinesis-read-write-role and attached policy kinesis-read-write-policy to allow read from stream arn:aws:kinesis:us-east-2:333152911718:stream/canaal-input and write to stream arn:aws:kinesis:us-east-2:333152911718:stream/canaal-output.


Waiting for role to be ready.


In [5]:
# Ask Kinesis for its stream listing and display just the stream names.
streams_response = kinesis_client.list_streams()
streams_response['StreamNames']

['canaal-input', 'canaal-output']

In [6]:
# Show the summary of the input stream. The name comes from the shared
# `input_stream_name` setting instead of a repeated string literal, so this
# cell follows the setup cell if the stream is ever renamed.
response = kinesis_client.describe_stream_summary(StreamName=input_stream_name)
response['StreamDescriptionSummary']

{'StreamName': 'canaal-input',
 'StreamARN': 'arn:aws:kinesis:us-east-2:333152911718:stream/canaal-input',
 'StreamStatus': 'ACTIVE',
 'StreamModeDetails': {'StreamMode': 'PROVISIONED'},
 'RetentionPeriodHours': 24,
 'StreamCreationTimestamp': datetime.datetime(2022, 11, 14, 17, 18, 17, tzinfo=tzlocal()),
 'EnhancedMonitoring': [{'ShardLevelMetrics': []}],
 'EncryptionType': 'NONE',
 'OpenShardCount': 1,
 'ConsumerCount': 0}

In [9]:
print(f"Creating application {app_name}.")
# Wrap application.create with the retry helper, keyed on the
# 'InvalidArgumentException' error code.
# NOTE(review): presumably this retries while the new role is not yet usable
# by the service — confirm against demo_tools.retries.
create_application = exponential_retry('InvalidArgumentException')(application.create)
app_data = create_application(app_name, role.arn)

# Read the application detail once and take both bookkeeping fields from the
# same response instead of issuing two identical DescribeApplication calls.
app_detail = analytics_client.describe_application(ApplicationName=app_name)['ApplicationDetail']
application.create_timestamp = app_detail['CreateTimestamp']
application.version_id = app_detail['ApplicationVersionId']

pprint(app_data)
print(f"Discovering schema of input stream {input_stream.name}.")
input_schema = application.discover_input_schema(input_stream.arn(), role.arn)
# Re-read the version id reported by the service after the discovery call.
application.version_id = analytics_client.describe_application(ApplicationName=app_name)['ApplicationDetail']['ApplicationVersionId']
pprint(input_schema)

{'ApplicationARN': 'arn:aws:kinesisanalytics:us-east-2:333152911718:application/canaal-app',
 'ApplicationName': 'canaal-app',
 'ApplicationStatus': 'READY'}
Discovering schema of input stream canaal-input.


INFO: Discovered input schema for stream arn:aws:kinesis:us-east-2:333152911718:stream/canaal-input.


{'RecordColumns': [{'Mapping': '$.tweet',
                    'Name': 'tweet',
                    'SqlType': 'VARCHAR(128)'},
                   {'Mapping': '$.pos',
                    'Name': 'pos',
                    'SqlType': 'DECIMAL(4,3)'},
                   {'Mapping': '$.neu',
                    'Name': 'neu',
                    'SqlType': 'DECIMAL(4,3)'},
                   {'Mapping': '$.neg',
                    'Name': 'neg',
                    'SqlType': 'DECIMAL(1,1)'},
                   {'Mapping': '$.block',
                    'Name': 'block',
                    'SqlType': 'INTEGER'}],
 'RecordEncoding': 'UTF-8',
 'RecordFormat': {'MappingParameters': {'JSONMappingParameters': {'RecordRowPath': '$'}},
                  'RecordFormatType': 'JSON'}}


In [10]:
def _latest_version_id():
    """Return the ApplicationVersionId currently reported for `app_name`."""
    detail = analytics_client.describe_application(ApplicationName=app_name)['ApplicationDetail']
    return detail['ApplicationVersionId']


# Step 1: attach the input stream, then pick up the new version id.
print("Adding input stream to the application.")
input_details = application.add_input(
    input_prefix, input_stream.arn(), role.arn, input_schema)
application.version_id = _latest_version_id()
print("Input details:")
pprint(input_details)

# Step 2: upload the SQL from example.sql, then pick up the new version id.
print("Uploading SQL code to the application to process the input stream.")
with open('example.sql') as sql_file:
    code = sql_file.read()
application.update_code(code)
application.version_id = _latest_version_id()

# Step 3: attach the output stream.
print("Adding output stream to the application.")
application.add_output('DESTINATION_SQL_STREAM', output_stream.arn(), role.arn)

INFO: Add input stream arn:aws:kinesis:us-east-2:333152911718:stream/canaal-input to application canaal-app.


Adding input stream to the application.
Input details:
{'ResponseMetadata': {'HTTPHeaders': {'content-length': '2',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Mon, 14 Nov 2022 23:19:39 GMT',
                                      'x-amzn-requestid': '22581c8a-5332-4672-833b-33269d7b53d3'},
                      'HTTPStatusCode': 200,
                      'RequestId': '22581c8a-5332-4672-833b-33269d7b53d3',
                      'RetryAttempts': 0}}
Uploading SQL code to the application to process the input stream.


INFO: Update code for application canaal-app.
INFO: Added output arn:aws:kinesis:us-east-2:333152911718:stream/canaal-output to canaal-app.


Adding output stream to the application.


In [11]:
print("Starting the application.")
# Look up the id of the first input configured on the application; that id is
# what application.start is given.
app_detail = analytics_client.describe_application(ApplicationName=app_name)['ApplicationDetail']
first_input = app_detail['InputDescriptions'][0]
application.start(first_input['InputId'])
print("Waiting for the application to start (this may take a minute or two).")
app_running_waiter.wait(application.name)

INFO: Started application canaal-app.


Starting the application.
Waiting for the application to start (this may take a minute or two).


INFO: Waiter ApplicationRunning called DescribeApplication, got STARTING.
INFO: Waiter ApplicationRunning called DescribeApplication, got STARTING.
INFO: Waiter ApplicationRunning called DescribeApplication, got STARTING.
INFO: Waiter ApplicationRunning called DescribeApplication, got STARTING.
INFO: Waiter ApplicationRunning called DescribeApplication, got RUNNING.


In [12]:
print("Application started. Getting records from the output stream.")
# Suppress log records at INFO level and below while polling, so the cell
# output contains only the decoded record payloads.
logging.disable(level=logging.INFO)
try:
    for records in output_stream.get_records(50):
        # Skip empty batches; print each record's decoded Data on its own line.
        if records:
            print(*[rec['Data'].decode() for rec in records], sep='\n')
finally:
    # logging.disable applies to the whole process. Undo it even if the loop
    # is interrupted, so later cells keep their INFO output.
    logging.disable(logging.NOTSET)

Application started. Getting records from the output stream.
{"block":29098772,"AVG_POS":0.172,"AVG_NEU":0.828,"AVG_NEG":0,"TOTAL_TWT":1}
{"block":29098777,"AVG_POS":0.172,"AVG_NEU":0.828,"AVG_NEG":0,"TOTAL_TWT":1}
{"block":29098779,"AVG_POS":0.172,"AVG_NEU":0.828,"AVG_NEG":0,"TOTAL_TWT":1}
{"block":29098783,"AVG_POS":0,"AVG_NEU":1,"AVG_NEG":0,"TOTAL_TWT":1}
{"block":29098784,"AVG_POS":0,"AVG_NEU":1,"AVG_NEG":0,"TOTAL_TWT":1}
{"block":29098789,"AVG_POS":0.351,"AVG_NEU":0.649,"AVG_NEG":0,"TOTAL_TWT":1}
{"block":29098790,"AVG_POS":0,"AVG_NEU":1,"AVG_NEG":0,"TOTAL_TWT":1}
{"block":29098793,"AVG_POS":0,"AVG_NEU":1,"AVG_NEG":0,"TOTAL_TWT":1}
{"block":29098795,"AVG_POS":0,"AVG_NEU":1,"AVG_NEG":0,"TOTAL_TWT":1}
{"block":29098801,"AVG_POS":0.172,"AVG_NEU":0.828,"AVG_NEG":0,"TOTAL_TWT":1}
{"block":29098802,"AVG_POS":0.243,"AVG_NEU":0.757,"AVG_NEG":0,"TOTAL_TWT":2}
{"block":29098803,"AVG_POS":0,"AVG_NEU":0.861,"AVG_NEG":0.100,"TOTAL_TWT":1}
{"block":29098804,"AVG_POS":0,"AVG_NEU":0.769,"AVG_NEG"

In [13]:
# Tear down everything the notebook created: the application first, then both
# streams, then the IAM role.
print("Cleaning up...")
application.delete()
input_stream.delete()
output_stream.delete()
print("Deleting read/write role.")
# Each attached policy is detached from the role and then deleted; the role
# itself is deleted only after the loop has finished.
for policy in role.attached_policies.all():
    role.detach_policy(PolicyArn=policy.arn)
    policy.delete()
role.delete()
print('-'*88)

Cleaning up...
Deleting read/write role.
----------------------------------------------------------------------------------------
