# Exploring AWS Streams

This section demonstrates AWS streaming using Kinesis Data Firehose, a variant of Kinesis that adds built-in delivery to destinations such as an S3 bucket or Redshift on top of the basic Kinesis streaming functionality.

### Objectives
* Set up Kinesis Firehose streams (Terraform) for price and trade data
* Use the AWS CLI and Python to demonstrate data loaded to S3
* Generate random price and trade data using the Kinesis Data Generator
* Validate the data loaded to S3

## Section-1: Set up Kinesis Firehose streams

### Set up Roles and Role Policy

Services such as Kinesis Firehose run without human intervention, yet they still need an identity to run under. In AWS, that identity is a role, which a service principal assumes. Roles are associated with policies that grant the relevant permissions.

**kinesis_role can be assumed by three services: Firehose, Redshift and Glue**
[Code](../terraform/kinesis/01_iam_role.tf)

The role and role policy use a separate provider (`aws.iam`), which is associated with a user who has the `IAMFullAccess` permission.

```hcl
resource "aws_iam_role" "kinesis_role" {
    name = "firehose_role"
    provider = aws.iam
    # Ensure the opening brace is the first character of a new line:
    # the JSON should not have whitespace before the opening curly brace
    assume_role_policy = <<EOF
{
            "Version": "2012-10-17",
            "Statement": [{
                "Action": "sts:AssumeRole",
                "Principal": {
                    "Service": [
                        "firehose.amazonaws.com",
                        "redshift.amazonaws.com",
                        "glue.amazonaws.com"
                    ]
                },
                "Effect": "Allow"
            }]
        }
    EOF
    tags = {
        Group = var.resource_group
    }
}
```
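To check which services a trust policy actually lets assume the role, the policy can be parsed and its `Allow` statements for `sts:AssumeRole` inspected. A minimal Python sketch, assuming the policy text is available as a string (here a copy of the heredoc above); `trusted_services` is a hypothetical helper, not part of Terraform or boto3, and it ignores wildcard principals and `Condition` blocks.

```python
import json

# Copy of the assume_role_policy heredoc from the Terraform block above
TRUST_POLICY = """{
    "Version": "2012-10-17",
    "Statement": [{
        "Action": "sts:AssumeRole",
        "Principal": {
            "Service": [
                "firehose.amazonaws.com",
                "redshift.amazonaws.com",
                "glue.amazonaws.com"
            ]
        },
        "Effect": "Allow"
    }]
}"""

def trusted_services(policy_text):
    """Return the set of service principals allowed to call sts:AssumeRole."""
    policy = json.loads(policy_text)
    statements = policy["Statement"]
    # IAM accepts either a single statement object or a list of statements
    if isinstance(statements, dict):
        statements = [statements]
    services = set()
    for stmt in statements:
        actions = stmt.get("Action", [])
        # Action and Principal.Service may each be a string or a list
        if isinstance(actions, str):
            actions = [actions]
        if stmt.get("Effect") != "Allow" or "sts:AssumeRole" not in actions:
            continue
        svc = stmt.get("Principal", {}).get("Service", [])
        services.update([svc] if isinstance(svc, str) else svc)
    return services

print(sorted(trusted_services(TRUST_POLICY)))
# ['firehose.amazonaws.com', 'glue.amazonaws.com', 'redshift.amazonaws.com']
```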

**kinesis_role_policy grants access to perform operations on the S3 buckets**
[Code](../terraform/kinesis/01_iam_role_policy.tf)

```hcl
resource "aws_iam_role_policy" "kinesis_role_policy" {
    name = "kinesis_role_policy"
    provider = aws.iam
    role = aws_iam_role.kinesis_role.id
    # Ensure the opening brace is the first character of a new line:
    # the JSON should not have whitespace before the opening curly brace
    policy = <<EOF
{
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject"
            ],
            "Resource": [
                "${aws_s3_bucket.price_bucket.arn}",
                "${aws_s3_bucket.price_bucket.arn}/*",
                "${aws_s3_bucket.transaction_bucket.arn}",
                "${aws_s3_bucket.transaction_bucket.arn}/*"
            ]
        }]
    }
    EOF
}
```
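The policy lists each bucket twice because S3 actions apply at two levels: bucket-level actions such as `s3:ListBucket` and `s3:GetBucketLocation` are authorised against the bucket ARN, while object-level actions such as `s3:GetObject` and `s3:PutObject` are authorised against object ARNs, matched here by `<bucket-arn>/*`. A minimal Python sketch of that expansion, assuming the standard `arn:aws:s3:::<bucket>` ARN format; `bucket_arn` and `policy_resources` are hypothetical helpers, and the example bucket name is the one used later in this notebook.

```python
def bucket_arn(bucket_name):
    """S3 bucket ARNs carry no region or account ID: arn:aws:s3:::<bucket>."""
    return f"arn:aws:s3:::{bucket_name}"

def policy_resources(bucket_names):
    """For each bucket, return the bucket ARN (bucket-level actions) followed by
    the bucket ARN + '/*' (object-level actions), in the same order as the policy."""
    resources = []
    for name in bucket_names:
        arn = bucket_arn(name)
        resources += [arn, arn + "/*"]
    return resources

print(policy_resources(["rsdg-fin-demo-price-eu-west-2"]))
# ['arn:aws:s3:::rsdg-fin-demo-price-eu-west-2', 'arn:aws:s3:::rsdg-fin-demo-price-eu-west-2/*']
```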
_**Note**:_ The policy references the S3 buckets. Terraform works out the order in which to create resources, even when they are spread across multiple files, from the dependencies between them, so the S3 buckets are created before the policy.
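Conceptually, Terraform builds a dependency graph from the references between resources and creates them in topological order, so a resource is only created after everything it references. A minimal sketch of that ordering with Python's standard `graphlib` module (Python 3.9+); the graph is written by hand from the references in the blocks of this notebook and only illustrates the idea, it is not how Terraform is implemented.

```python
from graphlib import TopologicalSorter

# Each resource maps to the resources it references (its dependencies),
# written by hand from the Terraform blocks in this notebook.
deps = {
    "aws_iam_role.kinesis_role": set(),
    "aws_s3_bucket.price_bucket": set(),
    "aws_s3_bucket.transaction_bucket": set(),
    "aws_iam_role_policy.kinesis_role_policy": {
        "aws_iam_role.kinesis_role",
        "aws_s3_bucket.price_bucket",
        "aws_s3_bucket.transaction_bucket",
    },
    "aws_kinesis_firehose_delivery_stream.price_hose": {
        "aws_iam_role.kinesis_role",
        "aws_s3_bucket.price_bucket",
    },
}

# static_order() yields every node after all of its dependencies
order = list(TopologicalSorter(deps).static_order())
print(order)
```

Resources with no path between them (for example the two buckets) have no required order relative to each other, which is what lets Terraform create them in parallel.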

### Set up S3 buckets to receive data from the Kinesis Firehose streams

[Code](../terraform/kinesis/01_s3.tf)

**Bucket for unloading data from the price stream**

```hcl
resource "aws_s3_bucket" "price_bucket" {
  bucket        = "${var.bucket_name}-price-${var.region}"
  acl           = "private"
  
  # delete all data from this bucket before destroy
  force_destroy = true   

  server_side_encryption_configuration {
    rule {
      apply_server_side_encryption_by_default {
        sse_algorithm = "AES256"
      }
    }
  }

  tags = {
    Description = "Bucket for storing price data"
    Group       = var.resource_group
  }
}
```

**Bucket for unloading data from the transaction stream**

```hcl
resource "aws_s3_bucket" "transaction_bucket" {
  bucket        = "${var.bucket_name}-transaction-${var.region}"
  acl           = "private"
  
  # delete all data from this bucket before destroy
  force_destroy = true   

  server_side_encryption_configuration {
    rule {
      apply_server_side_encryption_by_default {
        sse_algorithm = "AES256"
      }
    }
  }

  tags = {
    Description = "Bucket for storing transaction data"
    Group       = var.resource_group
  }
}
```

**Bucket for uploading reference data**

```hcl
resource "aws_s3_bucket" "reference_bucket" {
  bucket        = "${var.bucket_name}-reference-${var.region}"
  acl           = "private"
  
  # delete all data from this bucket before destroy
  force_destroy = true   

  server_side_encryption_configuration {
    rule {
      apply_server_side_encryption_by_default {
        sse_algorithm = "AES256"
      }
    }
  }

  tags = {
    Description = "Bucket for storing reference data"
    Group       = var.resource_group
  }
}
```
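Each bucket name is built from `var.bucket_name`, a suffix and `var.region`. S3 bucket names are globally unique and must be 3 to 63 characters of lowercase letters, digits, dots and hyphens, starting and ending with a letter or digit, so it can help to check the generated names before `terraform apply`. A minimal Python sketch, assuming `bucket_name = "rsdg-fin-demo"` and `region = "eu-west-2"` (inferred from the bucket name used later in this notebook); `bucket_names` is a hypothetical helper, and the regex covers only these basic rules, not every S3 restriction.

```python
import re

# Basic S3 naming rules only: 3-63 chars of lowercase letters, digits, dots and
# hyphens, starting and ending with a letter or digit.
BUCKET_RE = re.compile(r"^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$")

def bucket_names(bucket_name, region, suffixes=("price", "transaction", "reference")):
    """Mirror the "${var.bucket_name}-<suffix>-${var.region}" naming pattern."""
    return {s: f"{bucket_name}-{s}-{region}" for s in suffixes}

names = bucket_names("rsdg-fin-demo", "eu-west-2")
for suffix, name in names.items():
    print(suffix, name, bool(BUCKET_RE.match(name)))
```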

**Bucket objects for copying reference data from the local machine to the S3 bucket**
```hcl
locals {
  upload_directory = "${path.module}/data/"
}

# Use for_each to upload every CSV file in the data directory to the reference bucket
resource "aws_s3_bucket_object" "reference_data" {
    for_each = fileset(local.upload_directory, "*.csv")
    bucket = aws_s3_bucket.reference_bucket.id
    key = each.value
    source = "${local.upload_directory}${each.value}"
    acl = "private"
    //etag = filemd5("${local.upload_directory}${each.value}")
    tags = {
        Group    = var.resource_group
        modified = "v0.0.002"
    }
}
```
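`fileset(local.upload_directory, "*.csv")` returns the paths, relative to the given directory, of the files matching the pattern, and `for_each` creates one `aws_s3_bucket_object` per path, using it as the object key and (appended to the directory) as the source file. A rough Python equivalent for previewing which keys would be uploaded; `preview_uploads` is a hypothetical helper, it uses `pathlib` globbing, which is close to but not identical to Terraform's pattern rules, and the demo file names are made up.

```python
import tempfile
from pathlib import Path

def preview_uploads(upload_directory, pattern="*.csv"):
    """Return {s3_key: local_source_path}, like the for_each over fileset() above."""
    base = Path(upload_directory)
    uploads = {}
    for path in sorted(base.glob(pattern)):
        if path.is_file():
            key = path.relative_to(base).as_posix()   # corresponds to each.value
            uploads[key] = str(base / key)            # "${local.upload_directory}${each.value}"
    return uploads

# Demo with hypothetical files in a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    for name in ("instruments.csv", "traders.csv", "notes.txt"):
        (Path(tmp) / name).write_text("id\n1\n")
    demo = preview_uploads(tmp)
    print(sorted(demo))  # ['instruments.csv', 'traders.csv']
```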
### Set up Kinesis Firehose streams
[Code](../terraform/kinesis/02_firehose.tf)

**Firehose stream for Price**

```hcl
resource "aws_kinesis_firehose_delivery_stream" "price_hose" {
    name        = "price_hose"
    tags = {
        Group = var.resource_group
    }

    destination = "s3"
    s3_configuration {
        role_arn   = aws_iam_role.kinesis_role.arn
        bucket_arn = aws_s3_bucket.price_bucket.arn
        # Deliver buffered records to S3 after at most 60 seconds
        buffer_interval = 60
    }
}
```

**Firehose stream for Transactions**

```hcl
resource "aws_kinesis_firehose_delivery_stream" "transaction_hose" {
    name        = "transaction_hose"
    tags = {
        Group = var.resource_group
    }

    destination = "s3"

    s3_configuration {
        role_arn   = aws_iam_role.kinesis_role.arn
        bucket_arn = aws_s3_bucket.transaction_bucket.arn
        # Deliver buffered records to S3 after at most 60 seconds
        buffer_interval = 60
    }
}
```
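Firehose buffers incoming records and writes one S3 object each time a buffering hint is reached: either the buffer size (5 MB by default for S3) or the buffer interval (`buffer_interval = 60` seconds here, 300 by default), whichever comes first. A simplified Python simulation of that rule, assuming records arrive as `(timestamp_seconds, size_bytes)` pairs; `simulate_flushes` is a hypothetical helper and the real service behaviour is more nuanced, so treat it only as an illustration of why a steady stream produces roughly one object per minute.

```python
def simulate_flushes(records, buffer_interval=60, buffer_size=5 * 1024 * 1024):
    """Group (timestamp_seconds, size_bytes) records into delivered objects.

    The buffer is flushed when a record arrives buffer_interval seconds or more
    after the buffer was opened, or when adding the record would exceed
    buffer_size. Returns a list of (record_count, total_bytes) per object.
    """
    flushes = []
    count = size = 0
    opened = None
    for ts, nbytes in records:
        if count and (ts - opened >= buffer_interval or size + nbytes > buffer_size):
            flushes.append((count, size))
            count = size = 0
        if count == 0:
            opened = ts          # a new buffer starts with this record
        count += 1
        size += nbytes
    if count:
        flushes.append((count, size))   # deliver whatever is left at the end
    return flushes

# 1000 records/s of 50 bytes for 3 minutes: the interval, not the size, triggers each flush
steady = [(i / 1000, 50) for i in range(180_000)]
print(simulate_flushes(steady))  # [(60000, 3000000), (60000, 3000000), (60000, 3000000)]
```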

## Section-2: Test the Kinesis streams (AWS CLI)

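### Use the command line to put a dummy record into Firehose
The `watch` command re-runs `put-record` every 2 seconds (its default interval). With AWS CLI v2, add `--cli-binary-format raw-in-base64-out` so that `Data=blob1` is sent as raw text rather than decoded as base64.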

```
!watch aws firehose put-record --delivery-stream-name price_hose --record Data=blob1 --profile fin-demo
```

```
Every 2.0s: aws firehose put-reco...  sganesan-desktop: Fri Sep 13 11:53:05 2019

{
    "RecordId": "R/5Q/Gdb47t+p/0rEtrO80DS1HCkNH2uQCnTPycxhKiw/Wzbyw7mtT5EcFPZHptd7g29h9aW395lHOrIJBJ6X8D8Cno1c/XnzNbclijQndklHGOTEftnPpg51QvEggTyxgCrTYrplelmXd4Hol9kF6qiunC2fDQT81jk1jPxz/yMpzdO/N7IqkUy4I7pwnMIwKkq+695UnDc3PCudbTEZ7hUQq0kLhvc",
    "Encrypted": false
}
```

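The same test can be scripted from Python with boto3's Firehose `put_record` call. Firehose does not add a delimiter between records, which is why the S3 object inspected in the next cell contains `blob1blob1...` with no separators; appending a newline to each record keeps them separable. A minimal sketch; `put_test_records` and its defaults (`count=5`) are hypothetical, the client is passed in so it can be replaced by a stub, and the `fin-demo` profile and `price_hose` stream come from this notebook.

```python
import time

def put_test_records(firehose, stream_name, payload="blob1", count=5,
                     interval=2.0, sleep=time.sleep):
    """Send `count` newline-terminated records, one every `interval` seconds,
    roughly what `watch aws firehose put-record ...` does. Returns the RecordIds."""
    record_ids = []
    for i in range(count):
        resp = firehose.put_record(
            DeliveryStreamName=stream_name,
            # Data is sent as bytes; the trailing newline separates records in S3
            Record={"Data": (payload + "\n").encode("utf-8")},
        )
        record_ids.append(resp["RecordId"])
        if i < count - 1:
            sleep(interval)
    return record_ids

# Usage against AWS (needs credentials for the fin-demo profile):
#   import boto3
#   firehose = boto3.Session(profile_name="fin-demo").client("firehose")
#   put_test_records(firehose, "price_hose")
```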
### Display the contents of the S3 files where Firehose has unloaded data

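```python
import boto3
import smart_open as so

bucket = 'rsdg-fin-demo-price-eu-west-2'

def get_s3_keys(pS3Client, pBucket, pSubKey):
    """Return a dict of {key: size} for keys in an S3 bucket that contain pSubKey."""
    _keys = {}
    # list_objects_v2 returns at most 1000 keys per call, so page through the results
    paginator = pS3Client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=pBucket):
        # 'Contents' is absent when the bucket (or page) is empty
        for obj in page.get('Contents', []):
            _key = obj['Key']
            if pSubKey in _key:
                _keys[_key] = obj['Size']
    return _keys

_session = boto3.Session(profile_name='fin-demo')
s3 = _session.client('s3')

def print_s3_keys_sizes(stream):
    """Print the keys and sizes for a stream, then the first line of each object."""
    keysAndSizes = get_s3_keys(s3, bucket, stream)
    print(keysAndSizes)
    for key in keysAndSizes:
        object_key = "s3://" + bucket + "/" + key
        # smart_open streams the S3 object; recent releases take a boto3 client,
        # older releases accepted session=_session instead
        for line in so.open(object_key, transport_params=dict(client=s3)):
            print(object_key + ":" + repr(line))
            break

print_s3_keys_sizes('price_hose')
```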

```
{'2019/09/13/10/price_hose-1-2019-09-13-10-49-56-cc80e41b-fffb-47cf-98c5-4d9b834506b1': 95}
s3://rsdg-fin-demo-price-eu-west-2/2019/09/13/10/price_hose-1-2019-09-13-10-49-56-cc80e41b-fffb-47cf-98c5-4d9b834506b1:'blob1blob1blob1blob1blob1blob1blob1blob1blob1blob1blob1blob1blob1blob1blob1blob1blob1blob1blob1'
```
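With no custom prefix configured, Firehose names each object `YYYY/MM/DD/HH/<stream>-<version>-YYYY-MM-DD-HH-MM-SS-<id>`, with the date prefix in UTC. That appears to be why this object sits under `2019/09/13/10/` although the `aws s3 ls` listing in Section-4, which prints local time, shows it as last modified at 11:50:59. A minimal Python parser for such keys; `parse_firehose_key` is hypothetical and its regex is an assumption based on the keys listed in this notebook, not a published format contract.

```python
import re
from datetime import datetime, timezone

# YYYY/MM/DD/HH/<stream>-<version>-YYYY-MM-DD-HH-MM-SS-<random id>
KEY_RE = re.compile(
    r"^(?P<prefix>\d{4}/\d{2}/\d{2}/\d{2})/"
    r"(?P<stream>[^/]+?)-(?P<version>\d+)-"
    r"(?P<ts>\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-\d{2})-"
    r"(?P<id>[0-9a-f-]+)$"
)

def parse_firehose_key(key):
    """Split a default Firehose S3 key into stream, version, UTC timestamp and id."""
    m = KEY_RE.match(key)
    if m is None:
        raise ValueError(f"not a default Firehose key: {key}")
    ts = datetime.strptime(m["ts"], "%Y-%m-%d-%H-%M-%S").replace(tzinfo=timezone.utc)
    return {"stream": m["stream"], "version": int(m["version"]),
            "timestamp": ts, "id": m["id"]}

info = parse_firehose_key(
    "2019/09/13/10/price_hose-1-2019-09-13-10-49-56-cc80e41b-fffb-47cf-98c5-4d9b834506b1")
print(info["stream"], info["version"], info["timestamp"].isoformat())
# price_hose 1 2019-09-13T10:49:56+00:00
```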


## Section-3: Load data with Kinesis Data Generator

[Kinesis Data Generator (KDG)](https://awslabs.github.io/amazon-kinesis-data-generator)

The KDG extends faker.js, an open source random data generator. For full documentation of the items that can be "faked" by faker.js, see the [faker.js documentation.](https://github.com/Marak/faker.js/blob/master/Readme.md)

### Script for generating Price
```json
{
    "instrument_id": {{random.number(50)}},
    "price": {{random.number(
        {
            "min":1,
            "max":15
        }
    )}},
    "ts": "{{date.now}}"
}
```
[KDG Screenshot - Price](./images/price_hose.png)

### Script for generating Transactions
```json
{
    "book_id": {{random.number(
        {
            "min":1,
            "max":10
        })}},
    "trader_id": {{random.number(
        {
            "min":1,
            "max":20
        })}},
    "instrument_id": {{random.number(
        {
            "min":1,
            "max":50
        })}},
    "qty": {{random.number(
        {
            "min":1000,
            "max":5000
        })}},
    "price": {{random.number(
        {
            "min":1,
            "max":15
        })}},
    "ts": "{{date.now}}"
}
```
[KDG Screenshot - Transactions](./images/price_hose.png)
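For testing without the KDG, records with the same fields and ranges as the two templates can be generated in Python. A minimal sketch using the standard `random` module, assuming faker.js `random.number(n)` yields an integer from 0 to n inclusive and that `min`/`max` bounds are inclusive, and using an ISO-8601 UTC string for `ts` (the format KDG's `{{date.now}}` produces may differ). The helper names are hypothetical; records are serialised as newline-terminated JSON so they stay separable once Firehose concatenates them.

```python
import json
import random
from datetime import datetime, timezone

def _now():
    # Assumption: ISO-8601 UTC timestamp; KDG's {{date.now}} format may differ
    return datetime.now(timezone.utc).isoformat()

def price_record(rng=random):
    """Mirror of the price template: instrument_id 0-50, price 1-15."""
    return {"instrument_id": rng.randint(0, 50),
            "price": rng.randint(1, 15),
            "ts": _now()}

def transaction_record(rng=random):
    """Mirror of the transaction template's fields and (inclusive) ranges."""
    return {"book_id": rng.randint(1, 10),
            "trader_id": rng.randint(1, 20),
            "instrument_id": rng.randint(1, 50),
            "qty": rng.randint(1000, 5000),
            "price": rng.randint(1, 15),
            "ts": _now()}

def to_firehose_data(record):
    """Serialise one record as newline-terminated JSON bytes for put_record."""
    return (json.dumps(record) + "\n").encode("utf-8")

rng = random.Random(42)   # seeded so the sample is reproducible
print(to_firehose_data(transaction_record(rng)))
```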

## Section-4: Testing the S3 buckets

Let the KDG run for about 10 seconds:
* Configure the price template to send 1000 records per second to the `price_hose` stream
* Configure the transaction template to send 100 records per second to the `transaction_hose` stream

```
!aws s3 ls s3://rsdg-fin-demo-price-eu-west-2/2019/09/ --recursive --profile fin-demo
```

```
2019-09-13 11:50:59         95 2019/09/13/10/price_hose-1-2019-09-13-10-49-56-cc80e41b-fffb-47cf-98c5-4d9b834506b1
2019-09-13 11:54:08         30 2019/09/13/10/price_hose-1-2019-09-13-10-53-06-974d3f14-0d49-48ee-ae32-beab01097a84
2019-09-13 12:05:09      39565 2019/09/13/11/price_hose-1-2019-09-13-11-04-07-569efc24-e66e-413e-bedb-87a525af5ebd
2019-09-13 12:14:09    2297051 2019/09/13/11/price_hose-1-2019-09-13-11-13-07-f63566fd-f647-4cd9-a068-ea37e2ef7f0a
2019-09-13 12:15:15    3148391 2019/09/13/11/price_hose-1-2019-09-13-11-14-13-f9c7a8c3-2cfd-41c9-b9ee-190c317e6d76
2019-09-13 12:16:15     603992 2019/09/13/11/price_hose-1-2019-09-13-11-15-13-ad20aeb1-c684-4b17-8fa8-e08283391e19
```


```python
keysAndSizes = get_s3_keys(s3, bucket, 'price_hose')
print(keysAndSizes)
```

```
{'2019/09/13/10/price_hose-1-2019-09-13-10-49-56-cc80e41b-fffb-47cf-98c5-4d9b834506b1': 95, '2019/09/13/10/price_hose-1-2019-09-13-10-53-06-974d3f14-0d49-48ee-ae32-beab01097a84': 30, '2019/09/13/11/price_hose-1-2019-09-13-11-04-07-569efc24-e66e-413e-bedb-87a525af5ebd': 39565, '2019/09/13/11/price_hose-1-2019-09-13-11-13-07-f63566fd-f647-4cd9-a068-ea37e2ef7f0a': 2297051, '2019/09/13/11/price_hose-1-2019-09-13-11-14-13-f9c7a8c3-2cfd-41c9-b9ee-190c317e6d76': 3148391, '2019/09/13/11/price_hose-1-2019-09-13-11-15-13-ad20aeb1-c684-4b17-8fa8-e08283391e19': 603992}
```
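Because Firehose partitions keys by UTC hour, the sizes returned by `get_s3_keys` can be rolled up per hour to see how much data each period produced. A minimal sketch; `sizes_by_hour` is a hypothetical helper, and `sample` is a copy of three entries from the `keysAndSizes` output above.

```python
from collections import defaultdict

def sizes_by_hour(keys_and_sizes):
    """Sum object sizes per YYYY/MM/DD/HH prefix (the first four key segments)."""
    totals = defaultdict(int)
    for key, size in keys_and_sizes.items():
        hour_prefix = "/".join(key.split("/")[:4])
        totals[hour_prefix] += size
    return dict(totals)

sample = {
    "2019/09/13/10/price_hose-1-2019-09-13-10-49-56-cc80e41b-fffb-47cf-98c5-4d9b834506b1": 95,
    "2019/09/13/10/price_hose-1-2019-09-13-10-53-06-974d3f14-0d49-48ee-ae32-beab01097a84": 30,
    "2019/09/13/11/price_hose-1-2019-09-13-11-04-07-569efc24-e66e-413e-bedb-87a525af5ebd": 39565,
}
print(sizes_by_hour(sample))
# {'2019/09/13/10': 125, '2019/09/13/11': 39565}
```

In the notebook, `sizes_by_hour(keysAndSizes)` would apply the same roll-up to the full listing.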


```
!aws s3 ls s3://rsdg-fin-demo-transaction-eu-west-2/2019/09/ --recursive --profile fin-demo
```

```
2019-09-13 12:15:32     804268 2019/09/13/11/transaction_hose-1-2019-09-13-11-14-30-87c9069e-1503-4809-889b-32eb2ee6d4bf
2019-09-13 12:16:33     395612 2019/09/13/11/transaction_hose-1-2019-09-13-11-15-31-74e7b57a-8245-4042-9e29-5c49a7d8c897
```
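Firehose writes records back to back, so an object holds one JSON document after another with no delimiter unless the producer added one. `json.JSONDecoder.raw_decode` can walk such a stream either way. A minimal sketch; `iter_json_records` is a hypothetical helper, the sample string is made up, and in the notebook the text could come from reading an object with smart_open as in `print_s3_keys_sizes`.

```python
import json

def iter_json_records(text):
    """Yield each JSON object from a string of concatenated JSON documents."""
    decoder = json.JSONDecoder()
    pos, end = 0, len(text)
    while pos < end:
        # raw_decode does not skip leading whitespace, so skip it here
        while pos < end and text[pos].isspace():
            pos += 1
        if pos == end:
            break
        obj, pos = decoder.raw_decode(text, pos)
        yield obj

sample = '{"book_id": 3, "qty": 1200}{"book_id": 7, "qty": 4100}\n{"book_id": 1, "qty": 2500}'
print([r["book_id"] for r in iter_json_records(sample)])  # [3, 7, 1]
```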



## Next Steps
* Configure Amazon Athena to query the data in S3
* Build a QuickSight dashboard

### Related notebooks
* [00-Setup](./00_setup.ipynb)
* [01-Process S3 using python](./01_Process_s3_files.ipynb)
* [02-Visualization and Analytics](./02_Visualization_and_Analytics.ipynb)
* [03-Risk Analytics](./03_Risk_Analytics.ipynb)
* [04-Exploring Firehose, Athena and QuickSight](./04_Exploring_Kinesis_Firehose.ipynb)
* [05-Athena and QuickSight](./05_Athena_Quicksight.ipynb)
* [06-SageMaker to run the notebooks](./06_Sagemaker_jupyterlab.ipynb)
* [07_Transform stream data using Lambda](./07_Transform_lambda.ipynb)
* [08_Move data to Redshift using Glue](./08_Glue_Redshift.ipynb)
* [09_CI/CD Terraform with Travis CI](./09_Integrating_terraform_travisci.ipynb)