# Ingest streaming data into S3 using Kinesis Firehose

When interacting with AWS from a Jupyter notebook or python code, it is a good practise to store relevant data that allow to communicate with the cloud in a separate config file. In this tutorial, that file is called "dl.cfg" and is store in the same location as the current jupyter notebook. The file contains three sections:
- AWS credentials (access key ID and secret access key) needed to programmatically access AWS
- IAM role and IAM policy names
- settings of the Kinesis Firehose delivery stream
- S3 destination bucket name
As a first step, let's extract some of the above mentioned parameters from "dl.cfg" file.

In [14]:
import configparser
import time
import boto3
import json
import random

# Read AWS credentials from the config file
cfg_data = configparser.ConfigParser()
cfg_data.read('dl.cfg')   

# Save AWS credentials
access_key_id     = cfg_data["AWS"]["access_key_id"]
secret_access_key = cfg_data["AWS"]["secret_access_key"]

# Save IAM role and IAM policy settings
RoleName = cfg_data["IAM"]["role_name"]
RoleARN = 'arn:aws:iam::341370630698:role/{}'.format(RoleName)
PolicyName = cfg_data["IAM"]["policy_name"]

account_id = boto3.client(
    'sts',
    aws_access_key_id = access_key_id,
    aws_secret_access_key = secret_access_key).get_caller_identity().get('Account')

# Kinesis Firehose
Region = cfg_data["Firehose"]["region"]
DeliveryStreamName = cfg_data["Firehose"]["stream_name"]
DeliveryStreamType = cfg_data["Firehose"]["delivery_stream_type"]
SizeInMBs = cfg_data["Firehose"]["size_in_mb"]
IntervalInSeconds = cfg_data["Firehose"]["interval_in_seconds"]
CompressionFormat = cfg_data["Firehose"]["compression_format"]
StreamARN = 'arn:aws:firehose:{}:{}:deliverystream/'.format(Region, account_id, DeliveryStreamName)

# S3 Bucket
Bucket = cfg_data['S3']['bucket_name']
BucketARN = 'arn:aws:s3:::{}'.format(Bucket)


The first step consists of creating a S3 bucket that will serve as a destination for the Kinesis Firehose delivery stream. This can be achieved through the following steps:
- Define Boto3 S3 client to programmatically access S3
- Define a function to create a new bucket
- Run the function

In [15]:
# Create S3 client feeding AWS credentials extracted from the config.json file
s3 = boto3.client(
    's3',
    aws_access_key_id=access_key_id,
    aws_secret_access_key=secret_access_key)

# Define a function to create a S3 bucket
def create_bucket(Bucket):
    """
    Create a S3 bucket named 'Bucket'
    """
    
    # Check if a S3 bucket with the same name already exists
    try:
        s3.head_bucket(Bucket=Bucket)
        print('Bucket {} already exists'.format(Bucket))
    except:
        print('Bucket {} does not exist or you have no access'.format(Bucket))
        
        print('Creating bucket {}...'.format(Bucket))

        # Create a new bucket
        response = s3.create_bucket(Bucket=Bucket)
    
        # Loop until the bucket has been created succesfully
        created = False
        while not created:

            for bucket in s3.list_buckets()['Buckets']:
                if bucket['Name'] == Bucket:
                    created = True
                    break
        print('Bucket {} successfully created'.format(Bucket))
        return response
    
# Run the function defined above to create a new S3 bucket
create_bucket(Bucket)
    

Bucket receive-streaming-data already exists


Check if a IAM Role with the same name already exists and delete it if it does. Then create a new IAM Role to enable Kinesis Firehose to write to S3

In order to allow Kinesis Firehose writing data into a S3 bucket, an Identity Access Management (IAM) role should be created. This role will allow AWS services to be called on behalf of the user. Similarly to S3 service, AWS IAM service can be accessed by python SDK Boto3 using a specific client. In the code below, the following operations will be executed:
- define client to control IAM
- check if any role with the name defined in the config file already exists and (if it does) delete it
- create a new role destined to Kinesis Firehose.

In [16]:
# Create IAM client feeding AWS credentials extracted from the config.json file
iam = boto3.client(
    "iam",
    aws_access_key_id = access_key_id,
    aws_secret_access_key = secret_access_key
)

# Try to delete the existing role with the same name, if it exists
try:
    role = iam.get_role(RoleName = RoleName)
    
    print("Role named '{}' already exists".format(RoleName))

    # Extract all the attached policies to the existing role
    attached_policies = iam.list_attached_role_policies(RoleName = RoleName)["AttachedPolicies"]

    # Iterate over all attached policies
    for attached_policy in attached_policies:

        # Extract attached policy ARN
        attached_policy_arn = attached_policy["PolicyArn"]

        # Detach policy from role
        iam.detach_role_policy(
            RoleName = RoleName,
            PolicyArn = attached_policy_arn
        )

    # Delete role
    try:
        delete_role = iam.delete_role(RoleName = RoleName)
        print("Role named '{}' has been deleted".format(RoleName))

    except Exception as e:
        print(str(e))
        
except Exception as e:
    print(str(e))

# Create new IAM role
try:
    role = iam.create_role(
        RoleName = RoleName,
        Description = "Allows Kinesis Firehose Stream to write to S3",
        AssumeRolePolicyDocument = json.dumps(
            {
             "Version": "2012-10-17",
             "Statement": {
               "Effect": "Allow",
               "Principal": {"Service": "firehose.amazonaws.com"},
               "Action": "sts:AssumeRole"
              }
            } 

        )
    )
    print("Role '{}' has been created".format(RoleName))

except Exception as e:
    print(str(e))
 
# Extract role ARN
RoleARN = iam.get_role(RoleName=RoleName)["Role"]["Arn"]
print("Role '{}'s ARN is: '{}'".format(RoleName, RoleARN))


Role named 'KinesisFirehoseWritesToS3' already exists
Role named 'KinesisFirehoseWritesToS3' has been deleted
Role 'KinesisFirehoseWritesToS3' has been created
Role 'KinesisFirehoseWritesToS3's ARN is: 'arn:aws:iam::341370630698:role/KinesisFirehoseWritesToS3'


An IAM role does not grant by default any permission to access specific AWS services. What determines which specific services are accessible is defined by an IAM policy. IAM policies are written in JSON and consist of a list of statements; each statement defines one or more actions, an effect (Allow or Deny), and a resource which the statement is applied to.
In the code below, the following operations will be executed:
- check if a policy with the name defined in the config file already exists
- if a policy already exists, detach the policy from all the role it is attached to
- delete all versions of the policy (including the default version)
- create a new policy allowing Kinesis Firehose specific permissions for the destination S3 bucket
- attach the policy to the role created above.

In [17]:
# Check if policy with the wanted name already exists
try:
    policies = iam.list_policies()["Policies"]
    policy_exists = False
    for policy in policies:
        if policy["PolicyName"] == PolicyName:
            existing_policy_arn = policy["Arn"]
            policy_exists = True
            break          
except:
    None

# If a policy with the same name already exists, delete it
if policy_exists:
    print("Policy named '{}' already exists".format(PolicyName))
    
    # Extract all roles
    roles = iam.list_roles()["Roles"]
    
    # Iterate over all the roles
    for role in roles:
        
        # Extract role name
        existing_role_name = role["RoleName"]
        
        # Extract all the attached policy to the role
        attached_policies = iam.list_attached_role_policies(
            RoleName = existing_role_name
        )["AttachedPolicies"]
        
        # Iterate over all the attached policies
        for attached_policy in attached_policies:

            # Extract attached policy ARN
            attached_policy_arn = attached_policy["PolicyArn"]

            # Checking if the policy correspond to the wanted one
            if attached_policy_arn == existing_policy_arn:
                
                # Detach policy from role
                iam.detach_role_policy(
                    RoleName = existing_role_name,
                    PolicyArn = attached_policy_arn
                )
                
                print("Policy with ARN '{}' detached from role '{}'".format(PolicyArn, existing_role_name))
    
    # Extract all the policy versions
    policy_versions = iam.list_policy_versions(
        PolicyArn = existing_policy_arn
    )["Versions"]
    
    # Iterate over all the policy versions
    for policy_version in policy_versions:
        
        # Skip the version if it is a default version
        if policy_version["IsDefaultVersion"]:
            continue
          
        # Extract policy ID
        version_id = policy_version["VersionId"]
        
        # Delete policy version
        iam.delete_policy_version(
            PolicyArn = existing_policy_arn,
            VersionId = version_id
        )
        print("Policy with ARN '{}', version_ID '{}' deleted".format(existing_policy_arn, version_id))
    
    # Delete default version of the policy
    iam.delete_policy(
        PolicyArn = existing_policy_arn
    )
    print("Policy with ARN '{}' deleted".format(existing_policy_arn))
    
else:
    print("Policy named '{}' does not exists".format(PolicyName))
 
PolicyContent = {
                "Version": "2012-10-17",  
                "Statement":
                [    
                    {      
                        "Effect": "Allow",      
                        "Action": [
                            "s3:AbortMultipartUpload",
                            "s3:GetBucketLocation",
                            "s3:GetObject",
                            "s3:ListBucket",
                            "s3:ListBucketMultipartUploads",
                            "s3:PutObject"
                        ],      
                        "Resource": [        
                            "arn:aws:s3:::{}".format(Bucket),
                            "arn:aws:s3:::{}/*".format(Bucket)
                        ]    
                    },        
                    {
                        "Effect": "Allow",
                        "Action": [
                            "kinesis:DescribeStream",
                            "kinesis:GetShardIterator",
                            "kinesis:GetRecords",
                            "kinesis:ListShards"
                        ],
                        "Resource": "arn:aws:kinesis:{}:{}:stream/{}".format(Region, account_id, DeliveryStreamName)
                    },
                ]
            }

# Create policy 
try:
    policy = iam.create_policy(
        PolicyName = PolicyName,
        Description = "Allow to list and access content of the target bucket 'receive-streaming-data'",
        PolicyDocument = json.dumps(PolicyContent)        
    )
    print("Policy named '{}' created".format(PolicyName))
    PolicyArn = policy["Policy"]["Arn"]
    print("Policy named '{}' has ARN '{}'".format(PolicyName, PolicyArn))
except Exception as e:
    print(str(e))

# Attach policy to IAM role
try:
    attachment = iam.attach_role_policy(
        RoleName = RoleName,
        PolicyArn = PolicyArn
    )
    print("Policy named '{}' attached to role '{}'".format(PolicyName, RoleName))
except Exception as e:
    print(str(e))


Policy named 'KinesisWritesToS3receive-streaming-data' already exists
Policy with ARN 'arn:aws:iam::341370630698:policy/KinesisWritesToS3receive-streaming-data' deleted
Policy named 'KinesisWritesToS3receive-streaming-data' created
Policy named 'KinesisWritesToS3receive-streaming-data' has ARN 'arn:aws:iam::341370630698:policy/KinesisWritesToS3receive-streaming-data'
Policy named 'KinesisWritesToS3receive-streaming-data' attached to role 'KinesisFirehoseWritesToS3'


Using the boto3 client for Kinesis Firehose, the following functions are created:
- a delete_stream function whose goal is to identify if a delivery stream with the same name exists and delete it
- a create_stream function whose goal is to create a new delivery stream; for this tutorial, the stream operates with a "Put Records" delivery method and the buffer limits are setup as 5MB/s and 60 seconds a a GZIP compression is used. These parameters can be easily replaced in the "dl.cfg" file.

In [18]:
# Create Kinesis Firehose client feeding AWS credentials extracted from the config.json file
firehose = boto3.client(
    'firehose',
    aws_access_key_id=access_key_id,
    aws_secret_access_key=secret_access_key)

def delete_stream(DeliveryStreamName):
    """
    The function deletes an existing stream named 'DeliveryStreamName'
    """
    
    # Delete the current stream with the same name
    response = firehose.delete_delivery_stream(
        DeliveryStreamName=DeliveryStreamName,
        AllowForceDelete=True
    )

    # Get status of the stream 
    status = firehose.describe_delivery_stream(
    DeliveryStreamName=DeliveryStreamName)[
        'DeliveryStreamDescription']['DeliveryStreamStatus']
    print('{} stream "{}" ...'.format(status, DeliveryStreamName))

    # Wait until the stream is deleted
    i = 0
    while status == 'DELETING':
        time.sleep(10)
        print('Stream "{}" is being deleted, {} seconds elapsed...'.format(DeliveryStreamName, 10*(i+1)))
        try:
            status = firehose.describe_delivery_stream(
                DeliveryStreamName=DeliveryStreamName)['DeliveryStreamDescription']['DeliveryStreamStatus']
            i += 1
        except:
            status = 'DELETED'
    print('Stream "{}" has been succesfully deleted'.format(DeliveryStreamName))

    return status

def create_stream(
    DeliveryStreamName,
    RoleARN,
    BucketARN,
    SizeInMBs=SizeInMBs,
    IntervalInSeconds=IntervalInSeconds,
):
    """
    The function creates a new stream named 'DeliveryStreamName'
    """
         
    # Create a new stream
    response_create = firehose.create_delivery_stream(
        DeliveryStreamName=DeliveryStreamName,
        DeliveryStreamType='DirectPut',
        S3DestinationConfiguration={
            'RoleARN': RoleARN,
            'BucketARN': BucketARN,
            'BufferingHints': {
                'SizeInMBs': SizeInMBs,
                'IntervalInSeconds': IntervalInSeconds
            },
        },
    )
    

    # Get the status of the new stream
    status = firehose.describe_delivery_stream(
        DeliveryStreamName=DeliveryStreamName)['DeliveryStreamDescription']['DeliveryStreamStatus']
    print('{} stream "{}" ...'.format(status, DeliveryStreamName))

    # Wait until the stream is active
    i = 0
    while status == 'CREATING':
        time.sleep(10)
        print('Stream "{}" is being created, {} seconds elapsed...'.format(DeliveryStreamName, 30*(i+1)))
        status = firehose.describe_delivery_stream(
        DeliveryStreamName=DeliveryStreamName)['DeliveryStreamDescription']['DeliveryStreamStatus']
        i += 1

    # Check that the stream is active
    if status == 'ACTIVE':
        print('Stream "{}" has been succesfully created'.format(DeliveryStreamName))
        stream_arn = response_create['DeliveryStreamARN']
        print('Stream "{}" ARN: {}'.format(DeliveryStreamName, stream_arn))
    elif status == 'CREATING_FAILED':
        print('Stream "{}" creation has failed'.format(DeliveryStreamName))

    return status



In the following code the two functions defined above are run in order to create a Kinesis Firehose delivery stream according to the parameters defined in the 'dl.cfg' file

In [19]:
# Check if there is an existing stream with the same name in the same region

try:
    list_stream = firehose.list_delivery_streams()
    
    replace = 'yes'
    
    # Check if the stream already exists
    if DeliveryStreamName in list_stream['DeliveryStreamNames']:

        
        # Ask the user if the stream should be replaced
        replace = input("Stream {} already exists. Do you want to replace it? Type 'yes' to replace, otherwise 'no'".format(DeliveryStreamName))
        print(replace)
        
        # If the user has chosen to replace the stream, delete it and create a new one
        if replace == 'yes':
            
            # Delete stream
            try:
                status = delete_stream(DeliveryStreamName)
            
            except Exception as e:
                print(str(e))
            
            
            # Create new stream
            try:
                status = create_stream(
                    DeliveryStreamName=DeliveryStreamName,
                    RoleARN=RoleARN,
                    BucketARN=BucketARN,
                    SizeInMBs=SizeInMBs,
                    IntervalInSeconds=IntervalInSeconds)
            
            except Exception as e:
                print(str(e))            

        # If the user has chosen not to replace the stream, do nothing
        elif replace == 'no':

            None
            
        else:
            print('input not valid')
            
    # If the stream does not exist, proceed and create a new one
    else:
        
        try:
            status = create_stream(
                DeliveryStreamName=DeliveryStreamName,
                RoleARN=RoleARN,
                BucketARN=BucketARN,
                SizeInMBs=SizeInMBs,
                IntervalInSeconds=int(IntervalInSeconds))

        except Exception as e:
            print(str(e)) 

        
except Exception as e:
    print(str(e))
        

CREATING stream "my_stream" ...
Stream "my_stream" is being created, 30 seconds elapsed...
Stream "my_stream" is being created, 60 seconds elapsed...
Stream "my_stream" is being created, 90 seconds elapsed...
Stream "my_stream" is being created, 120 seconds elapsed...
Stream "my_stream" is being created, 150 seconds elapsed...
Stream "my_stream" is being created, 180 seconds elapsed...
Stream "my_stream" is being created, 210 seconds elapsed...
Stream "my_stream" is being created, 240 seconds elapsed...
Stream "my_stream" is being created, 270 seconds elapsed...
Stream "my_stream" has been succesfully created
Stream "my_stream" ARN: arn:aws:firehose:us-east-1:341370630698:deliverystream/my_stream


After the delivery stream has been succesfully created, it can be tested by producing some sample records and streaming them to the delivery stream using the "Put Record" method

In [20]:
# Define a sample record
record = {
    "sensorId": random.randrange(1,3,1),
    "currentTemperature": random.randrange(0,35,1),
    "status": "OK"
    }

# Send each record to the delivery stream
for i in range(20):
    response = firehose.put_record(
        DeliveryStreamName=DeliveryStreamName,
        Record={
            'Data': json.dumps(record)
        }
    )
    print(response)
    
    


{'RecordId': 'HEehmrTQmapq3mmF6usJJZEsCp0DbNTi+xRqZsA4jnvY0/zomHkq8AyIL4IaFr81cQLuWMFTunvpq9Gyw3fBO1yVOS6alav3HDFeUW302oue79qvJAQylF8ujLRDknPLUuuKXPigpNKS+OhX7byKUxG151ZirkTOxhUOZElu2qcq07uDxCSMnP1/Kc2vKpmF8iXly9P7i3yC8nRKSk0AoE8r7G6SXjMW', 'Encrypted': False, 'ResponseMetadata': {'RequestId': 'd741169c-060b-7bb0-8ce3-39241337e4bc', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd741169c-060b-7bb0-8ce3-39241337e4bc', 'x-amz-id-2': '1ZlqiP2v1eJS6TET7X2awxYMFIj26PqBUBc+Pn11PKe/ZGqy7tPOzb+ieRAKnrxutOxtjgszMOy5G9SWteaeqdR3HQos/6zP', 'content-type': 'application/x-amz-json-1.1', 'content-length': '257', 'date': 'Mon, 19 Sep 2022 08:51:04 GMT'}, 'RetryAttempts': 0}}
{'RecordId': 'KQ1KbXAhKaruFs4jZYa5eMgDxrauvNtpP3SoI11ZboSjJ5ZuJfR044m83tduEFc5gXRym3bsVDWgSZD6abvp9w+oZrlg8AM9sWw+YaDV3qaOZYyjSlSeATQB8ZOjwaL4/H/28IlSkLM0Lfg4alduN9onSUUXWEhn6wFx0yLr5ateIUMke2tsYqjhy02mbtmoMXbUp1+Lp7mFNLuT7Zb7HvbYc5p5L62d', 'Encrypted': False, 'ResponseMetadata': {'RequestId': 'cb22a65d-7d65-9901-908

{'RecordId': 'QjJ2uRI/rGmTd94TXz3a7kA6w0gc37ZYn1q1Vp4XXtetFj5b+LtqoXCROYZYfebNTROc6jhW8u/IsiWWvFrOZwbDhATzPOCHRY4+o/XXUmSkJBM1xowYmg+0OGuEjcKH0eH1Fs0m+ODhlm2giFeZAVNQapuATmRzH9Mb2NyPIFKguOvJO4BemYZEZdAvU/HOWd4XgUGRCHtaUwwbZtlZbhVwQwBkyydx', 'Encrypted': False, 'ResponseMetadata': {'RequestId': 'cebc81a5-faae-dc72-951e-ae19ef92437e', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'cebc81a5-faae-dc72-951e-ae19ef92437e', 'x-amz-id-2': 'lOt7Og8efyQozh2thBB1wX6fn9KXMcuK7VSbNtjoz1OBtKBqsLA9RwjNUTmbriuZ95c7Bq46Hh/2BD7STsQfGcqQuo3J+x91', 'content-type': 'application/x-amz-json-1.1', 'content-length': '257', 'date': 'Mon, 19 Sep 2022 08:51:07 GMT'}, 'RetryAttempts': 0}}
{'RecordId': '3+h+qtCMlRgOcYkNFO6suS391G+jchUBkioDCN7nokXkbbFAIGOpWYwRSrMq7ozWxjE77bUDYY53HNDkoaV5QxXVW5qbWxGe8JDCuWbraqrRT73mJ5W7nEIWPvHH7/mD3Ofq5gg3XMLJsMIillvi2a/I9Zs7RtS3mOA7w9w2IY/kXLSdA4d0K23TZRK63eKf0lT4o/IbLDCo8TS3FoKFlLSKk7jmK5bC', 'Encrypted': False, 'ResponseMetadata': {'RequestId': 'd681a0f1-0ef3-1aa6-8d2

Finally, the delivery stream can be deleted to avoid extra cost.

In [95]:
# Delete stream

try:
    status = delete_stream(DeliveryStreamName)
except:
    print('Delivery stream {} does not exist or you do not have access to it.'.format(DeliveryStreamName))


DELETING stream "my_stream" ...
Stream "my_stream" is being deleted, 10 seconds elapsed...
Stream "my_stream" is being deleted, 20 seconds elapsed...
Stream "my_stream" is being deleted, 30 seconds elapsed...
Stream "my_stream" is being deleted, 40 seconds elapsed...
Stream "my_stream" is being deleted, 50 seconds elapsed...
Stream "my_stream" is being deleted, 60 seconds elapsed...
Stream "my_stream" is being deleted, 70 seconds elapsed...
Stream "my_stream" is being deleted, 80 seconds elapsed...
Stream "my_stream" is being deleted, 90 seconds elapsed...
Stream "my_stream" is being deleted, 100 seconds elapsed...
Stream "my_stream" has been succesfully deleted
