In [1]:
import boto3
import os
import keys

os.environ['AWS_ACCESS_KEY_ID'] = keys.AWS_ACCESS_KEY_ID
os.environ['AWS_SECRET_ACCESS_KEY'] = keys.AWS_SECRET_ACCESS_KEY
os.environ['AWS_DEFAULT_REGION'] = keys.AWS_DEFAULT_REGION

In [2]:
import logging
from pprint import pprint
import sys
import threading
import time
import boto3

from kinesis.streams.kinesis_stream import KinesisStream
from kinesis.analytics.analytics_application import KinesisAnalyticsApplication

sys.path.append('../..')
from demo_tools.custom_waiter import CustomWaiter, WaitState
from demo_tools.retries import exponential_retry

logger = logging.getLogger(__name__)

In [3]:
class ApplicationRunningWaiter(CustomWaiter):
    """
    Waits for the application to be in a running state.
    """
    def __init__(self, client):
        super().__init__(
            'ApplicationRunning', 'DescribeApplication',
            'ApplicationDetail.ApplicationStatus',
            {'RUNNING': WaitState.SUCCESS, 'STOPPING': WaitState.FAILURE},
            client)

    def wait(self, app_name):
        self._wait(ApplicationName=app_name)

In [4]:
print('-'*88)
print("Setting up Data Streams and Continuous Query.")
print('-'*88)

logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

kinesis_client = boto3.client('kinesis')
iam_resource = boto3.resource('iam')
analytics_client = boto3.client('kinesisanalytics')
application = KinesisAnalyticsApplication(analytics_client)
app_running_waiter = ApplicationRunningWaiter(analytics_client)

input_stream_name = 'moralis-input'
input_prefix = 'SOURCE_SQL_STREAM'
output_stream_name = 'moralis-output'
app_name = 'moralis-app'
role_name = 'kinesis-read-write'

print(f"Creating input stream {input_stream_name} and output stream {output_stream_name}.")
input_stream = KinesisStream(kinesis_client)
input_stream.create(input_stream_name)
output_stream = KinesisStream(kinesis_client)
output_stream.create(output_stream_name)

print(f"Creating role {role_name} to let Kinesis Analytics read from the input stream and write to the output stream.")
role = application.create_read_write_role(role_name, input_stream.arn(), output_stream.arn(), iam_resource)
print("Waiting for role to be ready.")
time.sleep(10)

INFO: Found credentials in environment variables.


----------------------------------------------------------------------------------------
Setting up Data Streams and Continuous Query.
----------------------------------------------------------------------------------------
Creating input stream moralis-input and output stream moralis-output.


INFO: Created stream moralis-input.
INFO: Waiting until exists.
INFO: Got stream moralis-input.
INFO: Created stream moralis-output.
INFO: Waiting until exists.
INFO: Got stream moralis-output.


Creating role kinesis-read-write to let Kinesis Analytics read from the input stream and write to the output stream.


INFO: Created role kinesis-read-write-role and attached policy kinesis-read-write-policy to allow read from stream arn:aws:kinesis:us-east-2:333152911718:stream/moralis-input and write to stream arn:aws:kinesis:us-east-2:333152911718:stream/moralis-output.


Waiting for role to be ready.


In [5]:
response = kinesis_client.list_streams()
response['StreamNames']

['moralis-input', 'moralis-output']

In [7]:
response = kinesis_client.describe_stream_summary(StreamName='moralis-input')
response['StreamDescriptionSummary']

{'StreamName': 'moralis-input',
 'StreamARN': 'arn:aws:kinesis:us-east-2:333152911718:stream/moralis-input',
 'StreamStatus': 'ACTIVE',
 'StreamModeDetails': {'StreamMode': 'PROVISIONED'},
 'RetentionPeriodHours': 24,
 'StreamCreationTimestamp': datetime.datetime(2022, 11, 11, 16, 23, 32, tzinfo=tzlocal()),
 'EnhancedMonitoring': [{'ShardLevelMetrics': []}],
 'EncryptionType': 'NONE',
 'OpenShardCount': 1,
 'ConsumerCount': 0}

In [8]:
print(f"Creating application {app_name}.")
app_data = exponential_retry('InvalidArgumentException')(application.create)(app_name, role.arn)
application.create_timestamp = analytics_client.describe_application(ApplicationName=app_name)['ApplicationDetail']['CreateTimestamp']
application.version_id = analytics_client.describe_application(ApplicationName=app_name)['ApplicationDetail']['ApplicationVersionId']

pprint(app_data)
print(f"Discovering schema of input stream {input_stream.name}.")
input_schema = application.discover_input_schema(input_stream.arn(), role.arn)
application.version_id = analytics_client.describe_application(ApplicationName=app_name)['ApplicationDetail']['ApplicationVersionId']
pprint(input_schema)

Creating application moralis-app.


INFO: Application moralis-app created.


{'ApplicationARN': 'arn:aws:kinesisanalytics:us-east-2:333152911718:application/moralis-app',
 'ApplicationName': 'moralis-app',
 'ApplicationStatus': 'READY'}
Discovering schema of input stream moralis-input.


INFO: Discovered input schema for stream arn:aws:kinesis:us-east-2:333152911718:stream/moralis-input.


{'RecordColumns': [{'Mapping': '$.chain',
                    'Name': 'chain',
                    'SqlType': 'VARCHAR(4)'},
                   {'Mapping': '$.from',
                    'Name': 'COL_from',
                    'SqlType': 'VARCHAR(64)'},
                   {'Mapping': '$.to',
                    'Name': 'COL_to',
                    'SqlType': 'VARCHAR(64)'},
                   {'Mapping': '$.amount',
                    'Name': 'amount',
                    'SqlType': 'DOUBLE'},
                   {'Mapping': '$.symbol',
                    'Name': 'symbol',
                    'SqlType': 'VARCHAR(8)'}],
 'RecordEncoding': 'UTF-8',
 'RecordFormat': {'MappingParameters': {'JSONMappingParameters': {'RecordRowPath': '$'}},
                  'RecordFormatType': 'JSON'}}


In [9]:
print("Adding input stream to the application.")
input_details = application.add_input(
    input_prefix, input_stream.arn(), role.arn, input_schema)
application.version_id = analytics_client.describe_application(ApplicationName=app_name)['ApplicationDetail']['ApplicationVersionId']
print("Input details:")
pprint(input_details)

print("Uploading SQL code to the application to process the input stream.")
with open('example.sql') as code_file:
    code = code_file.read()
application.update_code(code)
application.version_id = analytics_client.describe_application(ApplicationName=app_name)['ApplicationDetail']['ApplicationVersionId']

print("Adding output stream to the application.")
application.add_output('DESTINATION_SQL_STREAM', output_stream.arn(), role.arn)

INFO: Add input stream arn:aws:kinesis:us-east-2:333152911718:stream/moralis-input to application moralis-app.


Adding input stream to the application.
Input details:
{'ResponseMetadata': {'HTTPHeaders': {'content-length': '2',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Fri, 11 Nov 2022 22:24:18 GMT',
                                      'x-amzn-requestid': 'fe87a02f-fe0a-45c1-b2f3-d55709ccc877'},
                      'HTTPStatusCode': 200,
                      'RequestId': 'fe87a02f-fe0a-45c1-b2f3-d55709ccc877',
                      'RetryAttempts': 0}}
Uploading SQL code to the application to process the input stream.


INFO: Update code for application moralis-app.
INFO: Added output arn:aws:kinesis:us-east-2:333152911718:stream/moralis-output to moralis-app.


Adding output stream to the application.


In [10]:
print("Starting the application.")
application.start(analytics_client.describe_application(ApplicationName=app_name)['ApplicationDetail']['InputDescriptions'][0]['InputId'])
print("Waiting for the application to start (this may take a minute or two).")
app_running_waiter.wait(application.name)

INFO: Started application moralis-app.
INFO: Waiter ApplicationRunning called DescribeApplication, got STARTING.


Starting the application.
Waiting for the application to start (this may take a minute or two).


INFO: Waiter ApplicationRunning called DescribeApplication, got STARTING.
INFO: Waiter ApplicationRunning called DescribeApplication, got STARTING.
INFO: Waiter ApplicationRunning called DescribeApplication, got STARTING.
INFO: Waiter ApplicationRunning called DescribeApplication, got STARTING.
INFO: Waiter ApplicationRunning called DescribeApplication, got STARTING.
INFO: Waiter ApplicationRunning called DescribeApplication, got RUNNING.


In [11]:
print("Application started. Getting records from the output stream.")
logging.disable(level=logging.INFO)
for records in output_stream.get_records(50):
    if records != []:
        print(*[rec['Data'].decode() for rec in records], sep='\n')

Application started. Getting records from the output stream.
{"TIME_INTERVAL":"2022-11-11 22:25:20.000","CHAIN":"0x1","SYMBOL":"MATIC","SUM_AMOUNT":3.3000000000000003}
{"TIME_INTERVAL":"2022-11-11 22:25:30.000","CHAIN":"0x1","SYMBOL":"MATIC","SUM_AMOUNT":2869855.9322621212}
{"TIME_INTERVAL":"2022-11-11 22:25:50.000","CHAIN":"0x1","SYMBOL":"MATIC","SUM_AMOUNT":954.5200196100001}
{"TIME_INTERVAL":"2022-11-11 22:26:00.000","CHAIN":"0x1","SYMBOL":"MATIC","SUM_AMOUNT":114671.69018404001}
{"TIME_INTERVAL":"2022-11-11 22:26:10.000","CHAIN":"0x1","SYMBOL":"MATIC","SUM_AMOUNT":47990.41425921}
{"TIME_INTERVAL":"2022-11-11 22:26:20.000","CHAIN":"0x1","SYMBOL":"MATIC","SUM_AMOUNT":1310.5237633320323}
{"TIME_INTERVAL":"2022-11-11 22:26:30.000","CHAIN":"0x1","SYMBOL":"MATIC","SUM_AMOUNT":54580.757089934676}
{"TIME_INTERVAL":"2022-11-11 22:26:40.000","CHAIN":"0x1","SYMBOL":"MATIC","SUM_AMOUNT":377.38608778}
{"TIME_INTERVAL":"2022-11-11 22:26:50.000","CHAIN":"0x1","SYMBOL":"MATIC","SUM_AMOUNT":4233.85

In [None]:
print("Cleaning up...")
application.delete()
input_stream.delete()
output_stream.delete()
print("Deleting read/write role.")
for policy in role.attached_policies.all():
    role.detach_policy(PolicyArn=policy.arn)
    policy.delete()
role.delete()
print('-'*88)