# Kinesis Data Stream
* https://github.com/aws-samples/aws-ml-data-lake-workshop
* https://aws.amazon.com/blogs/big-data/snakes-in-the-stream-feeding-and-eating-amazon-kinesis-streams-with-python/

In [1]:
import boto3
import sagemaker
import pandas as pd

# SageMaker session, its default S3 bucket, and the notebook's execution role.
sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()

# One boto3 session supplies the region and every service client below.
boto_session = boto3.Session()
region = boto_session.region_name

sm = boto_session.client(service_name='sagemaker', region_name=region)
kinesis = boto_session.client(service_name='kinesis', region_name=region)
sts = boto_session.client(service_name='sts', region_name=region)

# Create a Kinesis Data Stream

![Kinesis Data Stream](img/kinesis_data_stream_docs.png)

In [2]:
# Settings for the Kinesis data stream created in the next cell.
stream_name = 'dsoaws-data-stream'  # passed as StreamName
shard_count = 2  # passed as ShardCount

In [3]:
# Create the data stream. Re-running this cell (or Restart & Run All) after the
# stream already exists would otherwise fail with ResourceInUseException, so that
# case is treated as success -- the same way the IAM role cell below tolerates
# EntityAlreadyExists.
try:
    response = kinesis.create_stream(
        StreamName=stream_name,
        ShardCount=shard_count
    )
except kinesis.exceptions.ResourceInUseException as e:
    # Keep `response` bound so the later print(response) cell still has something to show.
    response = e.response
    print('Stream {} already exists'.format(stream_name))

In [4]:
import time

# Block until the stream reports ACTIVE, using boto3's built-in `stream_exists`
# waiter (it polls DescribeStream and succeeds once StreamStatus is ACTIVE).
# Unlike a hand-rolled `while status != 'ACTIVE'` loop, the waiter is bounded:
# it raises botocore.exceptions.WaiterError after MaxAttempts polls instead of
# spinning forever if the stream never becomes active.
stream_active_waiter = kinesis.get_waiter('stream_exists')
stream_active_waiter.wait(
    StreamName=stream_name,
    WaiterConfig={'Delay': 5, 'MaxAttempts': 60}  # poll every 5s, give up after ~5 minutes
)

print('Stream {} is active'.format(stream_name))

Stream dsoaws-data-stream is active


In [5]:
# Show the value bound to `response` by the create_stream cell above.
print(response)

{'ResponseMetadata': {'RequestId': 'f539a7de-d15c-c460-a282-038c0b6cbe62', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'f539a7de-d15c-c460-a282-038c0b6cbe62', 'x-amz-id-2': 'iRfz4kjgOfQeBTpuMTwh4Re+Z6Qn6UY22vaPYwqJISiXzUFax56t6JEOPVt5la6DsFJ7cCL6rYJVtVZt0qKwNG1QJ0Q2VgJK', 'date': 'Sat, 22 Aug 2020 22:54:10 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '0'}, 'RetryAttempts': 0}}


In [6]:
# Fetch the stream's full description; the ARN is read from it in the next cell.
data_stream_response = kinesis.describe_stream(StreamName=stream_name)
print(data_stream_response)

{'StreamDescription': {'StreamName': 'dsoaws-data-stream', 'StreamARN': 'arn:aws:kinesis:us-west-2:250107111215:stream/dsoaws-data-stream', 'StreamStatus': 'ACTIVE', 'Shards': [{'ShardId': 'shardId-000000000000', 'HashKeyRange': {'StartingHashKey': '0', 'EndingHashKey': '170141183460469231731687303715884105727'}, 'SequenceNumberRange': {'StartingSequenceNumber': '49610091589228913449896360939155411114010545354140811266'}}, {'ShardId': 'shardId-000000000001', 'HashKeyRange': {'StartingHashKey': '170141183460469231731687303715884105728', 'EndingHashKey': '340282366920938463463374607431768211455'}, 'SequenceNumberRange': {'StartingSequenceNumber': '49610091589251214195094891562296946832283193715646791698'}}], 'HasMoreShards': False, 'RetentionPeriodHours': 24, 'StreamCreationTimestamp': datetime.datetime(2020, 8, 22, 22, 54, 9, tzinfo=tzlocal()), 'EnhancedMonitoring': [{'ShardLevelMetrics': []}], 'EncryptionType': 'NONE'}, 'ResponseMetadata': {'RequestId': 'ec4492ed-a3d2-81ba-bbff-368c79e

In [7]:
# Pull the stream ARN out of the DescribeStream response.
stream_description = data_stream_response['StreamDescription']
data_stream_arn = stream_description['StreamARN']
print(data_stream_arn)

arn:aws:kinesis:us-west-2:250107111215:stream/dsoaws-data-stream


In [8]:
# Name of the IAM role that is created (or looked up) below.
iam_kinesis_role_name = "DSOAWS_Kinesis"

In [9]:
# Trust policy for the role: one "Allow sts:AssumeRole" statement per service
# principal, in the order listed (Kinesis first, then Firehose).
assume_role_policy_doc = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": service_principal},
            "Action": "sts:AssumeRole",
        }
        for service_principal in ("kinesis.amazonaws.com", "firehose.amazonaws.com")
    ],
}

In [10]:
import json
import boto3
import time

from botocore.exceptions import ClientError

# Create the IAM client outside the try block so the except handler can always use it.
iam = boto3.client('iam')

# Create the role with the trust policy defined above. If a role with this name
# already exists, fetch it instead so `iam_role_kinesis` is bound either way.
try:
    iam_role_kinesis = iam.create_role(
        RoleName=iam_kinesis_role_name,
        AssumeRolePolicyDocument=json.dumps(assume_role_policy_doc),
        Description='DSOAWS Kinesis Role'
    )
except ClientError as e:
    if e.response['Error']['Code'] == 'EntityAlreadyExists':
        iam_role_kinesis = iam.get_role(RoleName=iam_kinesis_role_name)
        print("Role already exists")
    else:
        print("Unexpected error: %s" % e)
        # Re-raise: swallowing the error would leave `iam_role_kinesis` undefined
        # and make the next cell fail with a misleading NameError.
        raise

# Pause before the role is used -- presumably to let the new role propagate in IAM.
time.sleep(10)

In [11]:
# Read the ARN and the name from the role returned by create_role / get_role.
role_details = iam_role_kinesis['Role']

iam_role_kinesis_arn = role_details['Arn']
print(iam_role_kinesis_arn)

iam_role_kinesis_name = role_details['RoleName']
print(iam_role_kinesis_name)

arn:aws:iam::250107111215:role/DSOAWS_Kinesis
DSOAWS_Kinesis


In [12]:
# Account ID of the calling identity; used to build resource ARNs in the policy below.
caller_identity = sts.get_caller_identity()
account_id = caller_identity['Account']

In [13]:
# Resource ARNs the policy statements apply to.
# NOTE(review): the Firehose ARN is built from `stream_name`, i.e. the *data stream's*
# name -- confirm the Firehose delivery stream really shares that name.
firehose_resource_arn = "arn:aws:firehose:{}:{}:deliverystream/{}".format(region, account_id, stream_name)
kinesis_resource_arn = "arn:aws:kinesis:{}:{}:stream/{}".format(region, account_id, stream_name)

# Firehose actions allowed on the delivery stream.
firehose_statement = {
    "Effect": "Allow",
    "Action": [
        "firehose:DeleteDeliveryStream",
        "firehose:PutRecord",
        "firehose:PutRecordBatch",
        "firehose:UpdateDestination",
    ],
    "Resource": [firehose_resource_arn],
}

# Kinesis Data Streams actions allowed on the data stream.
kinesis_statement = {
    "Effect": "Allow",
    "Action": [
        "kinesis:Get*",
        "kinesis:DescribeStream",
        "kinesis:Put*",
        "kinesis:List*",
    ],
    "Resource": [kinesis_resource_arn],
}

kinesis_policy_doc = {
    "Version": "2012-10-17",
    "Statement": [firehose_statement, kinesis_statement],
}

print(kinesis_policy_doc)

{'Version': '2012-10-17', 'Statement': [{'Effect': 'Allow', 'Action': ['firehose:DeleteDeliveryStream', 'firehose:PutRecord', 'firehose:PutRecordBatch', 'firehose:UpdateDestination'], 'Resource': ['arn:aws:firehose:us-west-2:250107111215:deliverystream/dsoaws-data-stream']}, {'Effect': 'Allow', 'Action': ['kinesis:Get*', 'kinesis:DescribeStream', 'kinesis:Put*', 'kinesis:List*'], 'Resource': ['arn:aws:kinesis:us-west-2:250107111215:stream/dsoaws-data-stream']}]}


# Add or Update the Role's Inline Policy

In [14]:
import time

# Serialize the policy document and attach it to the role as an inline policy.
policy_json = json.dumps(kinesis_policy_doc)

response = iam.put_role_policy(
    PolicyDocument=policy_json,
    PolicyName='DSOAWS_KinesisPolicy',
    RoleName=iam_role_kinesis_name,
)

# Wait 10 seconds before moving on to the next cell.
time.sleep(10)

In [15]:
# Show the put_role_policy response (note: `response` was rebound by the cell above).
print(response)

{'ResponseMetadata': {'RequestId': '30c8d347-c514-429e-b518-29b9d8817889', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '30c8d347-c514-429e-b518-29b9d8817889', 'content-type': 'text/xml', 'content-length': '206', 'date': 'Sat, 22 Aug 2020 22:54:36 GMT'}, 'RetryAttempts': 0}}


# Create a Kinesis Firehose Stream with Source Data Stream

![Kinesis Firehose to S3](img/kinesis_firehose_s3_docs.png)

# Store Variables for the Next Notebooks

In [16]:
# Persist the data stream name with IPython's %store so later notebooks can restore it.
%store stream_name

Stored 'stream_name' (str)


In [17]:
# Persist the data stream ARN with IPython's %store so later notebooks can restore it.
%store data_stream_arn

Stored 'data_stream_arn' (str)


In [18]:
# Persist the IAM role ARN with IPython's %store so later notebooks can restore it.
%store iam_role_kinesis_arn

Stored 'iam_role_kinesis_arn' (str)


In [None]:
%%javascript
// Save a checkpoint of this notebook, then delete its session.
// NOTE(review): relies on the global `Jupyter` object -- presumably only present in the
// classic Notebook UI; confirm behavior under JupyterLab.
Jupyter.notebook.save_checkpoint();
Jupyter.notebook.session.delete();

<IPython.core.display.Javascript object>