In [0]:
###INFO7374: Digital Marketing Analytics:
###LAB: AWS Mobile Gaming Analytics Pipeline
###Date: 06/28/2019
###Instructor: Prof. Srikanth Krishnamurthy
###TA: Mr. Pramod Nagare and Mr. Syed Haroon

In [0]:
# Architectural Components:
# S3 Bucket: https://aws.amazon.com/s3/
# Kinesis Data Stream: https://aws.amazon.com/kinesis/data-streams/
# Kinesis Delivery Stream: https://aws.amazon.com/kinesis/data-firehose/
# Redshift Cluster: https://aws.amazon.com/redshift/
# IAM Policy: https://aws.amazon.com/iam/
# IAM Role: https://aws.amazon.com/iam/

<img src="AWS_GAP_Architecture_New.jpg">

In [664]:
#Importing all required libraries:
#pip install boto3
#pip install psycopg2

import boto3
import json
import psycopg2
import datetime

In [665]:
#Setting up the AWS Access keys:
#NOTE: DO NOT EXPOSE THE ACCESS & SECRET KEYS

config_file = open(r'Config.json')
config_file = json.loads(config_file.read())

access_key = config_file['access_key']
secret_access_key = config_file['secret_access_key']


In [666]:
#Configuration Parameters:

bucket_name = "info7374s3team93"#------------------------------------S3: Name of the creating AWS S3 Bucket

policy_name = "info7374policyteam93"#----------------------------------IAM: Access policy name
iam_role_name = "info7374roleteam93"#----------------------------------IAM: IAM Role for the architecural access

db_name = "info7374dbteam93"#-------------------------------------Redshift: Database Name for gaming data
cluster_identifier = "info7374clusterteam93"#---------------------Redshift: Redshift Cluster Name
master_username = "root"#----------------------------------------Redshift: Admin Username
master_password = "Info7374gap"#---------------------------------Redshift: Admin Password
node_type = "dc2.large"#-----------------------------------------Redshift: Cluster configuration
cluster_type = "single-node"#------------------------------------Redshift: Cluster Type
availability_zone = "us-east-1a"#--------------------------------Redshift: Cluster Availability Zone
table_name = "flac"#------------------------------------Redshift: Database table name
kinesis_data_stream_name = "info7374datastreamteam93"#-------------Kinesis: Data Stream Name
shard_count = 100#------------------------------------------------Kinesis: Data Stream Shard Count

log_group_name = "info7374loggroupteam93"#------------------------CloudWatch: Log Group Name
log_stream_name = "info7374logstreamteam93"#------------------------CloudWatch: Log Group Name
delivery_stream_name = "info7374deliverystreamteam93"#--------------Kinesis: Delivery Stream Name
stream_type = "KinesisStreamAsSource"#-----------------------------Kinesis: Delivery Stream Type


In [667]:
#Creating AWS S3 Bucket:

s3_client = boto3.client('s3', aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)

response_s3 = s3_client.create_bucket(ACL="private", Bucket=bucket_name)
print(response_s3)

{'ResponseMetadata': {'RequestId': '02EA85B87CABEA4E', 'HostId': 'tvX/CSgp2XPuOpn7/QdFlVuayfrff4WJ70Co1UjdnpQcpCVV36H1hcELpU6/FekSKf7ieTmv0zY=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'tvX/CSgp2XPuOpn7/QdFlVuayfrff4WJ70Co1UjdnpQcpCVV36H1hcELpU6/FekSKf7ieTmv0zY=', 'x-amz-request-id': '02EA85B87CABEA4E', 'date': 'Fri, 12 Jul 2019 16:24:47 GMT', 'location': '/info7374s3team93', 'content-length': '0', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'Location': '/info7374s3team93'}


In [668]:
bucket_arn = "arn:aws:s3:::"+bucket_name#---------------------------S3: Bucker ARN
print(bucket_arn)

arn:aws:s3:::info7374s3team93


In [669]:
#Creating IAM Policy for Architecural access:

iam_client = boto3.client('iam', aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)


In [670]:

policy_details = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "glue:GetTableVersions"
            ],
            "Resource": "*"
        },
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject"
            ],
            "Resource": "*"
        },
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction",
                "lambda:GetFunctionConfiguration"
            ],
            "Resource": "*"
        },
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        },
        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt"
            ],
            "Resource": "*"
        }
    ]
}

In [671]:
response_iam_policy = iam_client.create_policy(PolicyName=policy_name, PolicyDocument=json.dumps(policy_details))
print(response_iam_policy)

{'Policy': {'PolicyName': 'info7374policyteam93', 'PolicyId': 'ANPAVAHHQWNGYOZTMXXNY', 'Arn': 'arn:aws:iam::344082920269:policy/info7374policyteam93', 'Path': '/', 'DefaultVersionId': 'v1', 'AttachmentCount': 0, 'PermissionsBoundaryUsageCount': 0, 'IsAttachable': True, 'CreateDate': datetime.datetime(2019, 7, 12, 16, 24, 51, tzinfo=tzutc()), 'UpdateDate': datetime.datetime(2019, 7, 12, 16, 24, 51, tzinfo=tzutc())}, 'ResponseMetadata': {'RequestId': '93180971-a4c1-11e9-8ca2-478827fdfe1c', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '93180971-a4c1-11e9-8ca2-478827fdfe1c', 'content-type': 'text/xml', 'content-length': '775', 'date': 'Fri, 12 Jul 2019 16:24:51 GMT'}, 'RetryAttempts': 0}}


In [672]:
policy_arn = response_iam_policy['Policy']['Arn']#------------------------IAM: Policy ID for assignment
print(policy_arn)

arn:aws:iam::344082920269:policy/info7374policyteam93


In [673]:
#Creating IAM Role for Architectural access:

assume_role_policy_doc = {
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "firehose.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "344082920269"
        }
      }
    }
  ]
}

response_iam_role = iam_client.create_role(RoleName=iam_role_name, AssumeRolePolicyDocument=json.dumps(assume_role_policy_doc))

print(response_iam_role)

{'Role': {'Path': '/', 'RoleName': 'info7374roleteam93', 'RoleId': 'AROAVAHHQWNG2SYLDDBOC', 'Arn': 'arn:aws:iam::344082920269:role/info7374roleteam93', 'CreateDate': datetime.datetime(2019, 7, 12, 16, 24, 55, tzinfo=tzutc()), 'AssumeRolePolicyDocument': {'Version': '2012-10-17', 'Statement': [{'Sid': '', 'Effect': 'Allow', 'Principal': {'Service': 'firehose.amazonaws.com'}, 'Action': 'sts:AssumeRole', 'Condition': {'StringEquals': {'sts:ExternalId': '344082920269'}}}]}}, 'ResponseMetadata': {'RequestId': '95833ced-a4c1-11e9-8ca2-478827fdfe1c', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '95833ced-a4c1-11e9-8ca2-478827fdfe1c', 'content-type': 'text/xml', 'content-length': '928', 'date': 'Fri, 12 Jul 2019 16:24:55 GMT'}, 'RetryAttempts': 0}}


In [674]:
role_arn = response_iam_role['Role']['Arn']#----------------------------IAM: Role ARN

In [675]:
#Attaching a Policy to a Role:

response_iam_role_policy_attach = iam_client.attach_role_policy(PolicyArn=policy_arn,RoleName=iam_role_name)
print(response_iam_role_policy_attach)

{'ResponseMetadata': {'RequestId': '96adb2b2-a4c1-11e9-8ca2-478827fdfe1c', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '96adb2b2-a4c1-11e9-8ca2-478827fdfe1c', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Fri, 12 Jul 2019 16:24:57 GMT'}, 'RetryAttempts': 0}}


In [676]:
#Creating AWS Redshift Cluster:

redshift_client = boto3.client('redshift',region_name="us-east-1", aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)

response_redshift = redshift_client.create_cluster(
    DBName=db_name,
    ClusterIdentifier=cluster_identifier,
    ClusterType=cluster_type,
    NodeType=node_type,
    MasterUsername=master_username,
    MasterUserPassword=master_password,
    AvailabilityZone=availability_zone,
    IamRoles=[role_arn],
    PubliclyAccessible=True)

print(response_redshift)

{'Cluster': {'ClusterIdentifier': 'info7374clusterteam93', 'NodeType': 'dc2.large', 'ClusterStatus': 'creating', 'MasterUsername': 'root', 'DBName': 'info7374dbteam93', 'AutomatedSnapshotRetentionPeriod': 1, 'ManualSnapshotRetentionPeriod': -1, 'ClusterSecurityGroups': [], 'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-f1cf91aa', 'Status': 'active'}], 'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0', 'ParameterApplyStatus': 'in-sync'}], 'ClusterSubnetGroupName': 'default', 'VpcId': 'vpc-7c9bfb06', 'AvailabilityZone': 'us-east-1a', 'PreferredMaintenanceWindow': 'sun:03:00-sun:03:30', 'PendingModifiedValues': {'MasterUserPassword': '****'}, 'ClusterVersion': '1.0', 'AllowVersionUpgrade': True, 'NumberOfNodes': 1, 'PubliclyAccessible': True, 'Encrypted': False, 'Tags': [], 'EnhancedVpcRouting': False, 'IamRoles': [{'IamRoleArn': 'arn:aws:iam::344082920269:role/info7374roleteam93', 'ApplyStatus': 'adding'}], 'MaintenanceTrackName': 'current', 'DeferredMaintenanceW

In [0]:
#Note: Wait for 5 min to get AWS Redshift cluster to setup as we need to create a Database table.

In [648]:
#Describe AWS Redshift Cluster to get an endpoint:

response_redshift_desc = redshift_client.describe_clusters(ClusterIdentifier=cluster_identifier)
print(response_redshift_desc)

{'Clusters': [{'ClusterIdentifier': 'info7374clusterteam93', 'NodeType': 'dc2.large', 'ClusterStatus': 'available', 'MasterUsername': 'root', 'DBName': 'info7374dbteam93', 'Endpoint': {'Address': 'info7374clusterteam93.crpojqzpsevq.us-east-1.redshift.amazonaws.com', 'Port': 5439}, 'ClusterCreateTime': datetime.datetime(2019, 7, 12, 0, 20, 21, 702000, tzinfo=tzutc()), 'AutomatedSnapshotRetentionPeriod': 1, 'ManualSnapshotRetentionPeriod': -1, 'ClusterSecurityGroups': [], 'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-f1cf91aa', 'Status': 'active'}], 'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0', 'ParameterApplyStatus': 'in-sync'}], 'ClusterSubnetGroupName': 'default', 'VpcId': 'vpc-7c9bfb06', 'AvailabilityZone': 'us-east-1a', 'PreferredMaintenanceWindow': 'sun:03:30-sun:04:00', 'PendingModifiedValues': {}, 'ClusterVersion': '1.0', 'AllowVersionUpgrade': True, 'NumberOfNodes': 1, 'PubliclyAccessible': True, 'Encrypted': False, 'ClusterPublicKey': 'ssh-rsa AAA

In [677]:
hostname = response_redshift_desc['Clusters'][0]['Endpoint']['Address']#----------------Redshift: Hostname for database
port_number = response_redshift_desc['Clusters'][0]['Endpoint']['Port']#----------------Redshift: Port Number for databse

In [678]:
#Creating Database table on Redshift:

con = psycopg2.connect(dbname= db_name, host= hostname, port= port_number, user= master_username, password= master_password)

con.set_session(autocommit=True)
cur = con.cursor()

In [496]:
#Example: https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_examples.html

query = "create table info7374_game (\n"
query+= "player_id integer not null,\n"
query+= "player_name varchar(20) not null,\n"
query+= "event_time varchar(20) not null);"

print(query)

create table info7374_game (
player_id integer not null,
player_name varchar(20) not null,
event_time varchar(20) not null);


In [394]:
#Example: https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_examples.html

query = "create table flac (\n"
query+= "id varchar(50) not null,\n"
query+= "artists varchar(50) not null,\n"
query+= "beforeFLAC varchar(50) not null,\n"
query+= "afterFLAC varchar(50) not null);"

print(query)

create table flac (
id varchar(50) not null,
artists varchar(50) not null,
beforeFLAC varchar(50) not null,
afterFLAC varchar(50) not null);


In [330]:
query2 = "create table customerdata (\n"
query2+= "name varchar(20) not null,\n"
query2+= "email varchar(20) not null,\n"
query2+= "subscriptionplan varchar(20) not null,\n"
query2+= "ispremium varchar(20) not null, \n"
query2+= "profitperuser varchar(20) not null, \n"
query2+= "usage varchar(20) not null, \n"
query2+= "flacusage varchar(20) not null);"

print(query2)

create table customerdata (
name varchar(20) not null,
email varchar(20) not null,
subscriptionplan varchar(20) not null,
ispremium varchar(20) not null, 
profitperuser varchar(20) not null, 
usage varchar(20) not null, 
flacusage varchar(20) not null);


In [503]:
query2 = "create table flac (\n"
query2+= "id varchar(200) not null,\n"
query2+= "artists varchar(200) not null,\n"
query2+= "beforeflac varchar(200) not null,\n"
query2+= "afterflac varchar(200) not null, \n"
query2+= "dance decimal(10,2) not null);"

print(query2)

create table flac (
id varchar(200) not null,
artists varchar(200) not null,
beforeflac varchar(200) not null,
afterflac varchar(200) not null, 
dance decimal(10,2) not null);


In [395]:
cur.execute(query)

In [504]:
cur.execute(query2)

In [679]:
#Creating Kinesis Stream:

kinesis_client = boto3.client('kinesis',region_name="us-east-1", aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)

response_kinesis_data_stream = kinesis_client.create_stream(StreamName=kinesis_data_stream_name,ShardCount=shard_count)
print(response_kinesis_data_stream)

{'ResponseMetadata': {'RequestId': 'f3edf654-4cd5-c559-a64f-41ce9afb61e3', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'f3edf654-4cd5-c559-a64f-41ce9afb61e3', 'x-amz-id-2': 'pI2Kv5fEmHcZyJdjxGhIHcnsF43GB/4LM4jI/Cldb7DP3RGSK+3fpTx00+Knyo9Jrj9Wu9hE6JWoBJ7x4n/hdAR5/SgA152Q', 'date': 'Fri, 12 Jul 2019 16:29:14 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '0'}, 'RetryAttempts': 0}}


In [680]:
response_kinesis_data_stream_desc = kinesis_client.describe_stream(StreamName=kinesis_data_stream_name)
print(response_kinesis_data_stream_desc)

{'StreamDescription': {'StreamName': 'info7374datastreamteam93', 'StreamARN': 'arn:aws:kinesis:us-east-1:344082920269:stream/info7374datastreamteam93', 'StreamStatus': 'ACTIVE', 'Shards': [{'ShardId': 'shardId-000000000000', 'HashKeyRange': {'StartingHashKey': '0', 'EndingHashKey': '3402823669209384634633746074317682113'}, 'SequenceNumberRange': {'StartingSequenceNumber': '49597536128027807208001158134733027616202192224664944642'}}, {'ShardId': 'shardId-000000000001', 'HashKeyRange': {'StartingHashKey': '3402823669209384634633746074317682114', 'EndingHashKey': '6805647338418769269267492148635364228'}, 'SequenceNumberRange': {'StartingSequenceNumber': '49597536128050107953199688757874563334474840586170925074'}}, {'ShardId': 'shardId-000000000002', 'HashKeyRange': {'StartingHashKey': '6805647338418769269267492148635364229', 'EndingHashKey': '10208471007628153903901238222953046342'}, 'SequenceNumberRange': {'StartingSequenceNumber': '4959753612807240869839821938101609905274748894767690550

In [681]:
kinesis_stream_arn = response_kinesis_data_stream_desc['StreamDescription']['StreamARN']#-----------------------Kinesis: Datastream ARN
print(kinesis_stream_arn)

arn:aws:kinesis:us-east-1:344082920269:stream/info7374datastreamteam93


In [682]:
#Creating Kinesis Delivery Stream: Firehose

firehose_client = boto3.client('firehose',region_name="us-east-1", aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)

cluster_jdbc_url = "jdbc:redshift://"+hostname+":"+str(port_number)+"/"+db_name
print(cluster_jdbc_url)

jdbc:redshift://info7374clusterteam93.crpojqzpsevq.us-east-1.redshift.amazonaws.com:5439/info7374dbteam93


In [683]:
response_firehose = firehose_client.create_delivery_stream(
    DeliveryStreamName = delivery_stream_name,
    DeliveryStreamType = stream_type,
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': kinesis_stream_arn,
        'RoleARN': role_arn
    },
    RedshiftDestinationConfiguration={
        'RoleARN': role_arn,
        'ClusterJDBCURL': cluster_jdbc_url,
        'CopyCommand': {
            'DataTableName': table_name,
            'DataTableColumns': 'id,name',
            'CopyOptions': "json 'auto'"
        },
        'Username': master_username,
        'Password': master_password,
        'S3Configuration': {
            'RoleARN': role_arn,
            'BucketARN': bucket_arn,
            'BufferingHints': {
                'IntervalInSeconds': 60
            }
        }
    })

print(response_firehose)

{'DeliveryStreamARN': 'arn:aws:firehose:us-east-1:344082920269:deliverystream/info7374deliverystreamteam93', 'ResponseMetadata': {'RequestId': 'e8b41e3f-211e-df7b-bd16-a989972a097b', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'e8b41e3f-211e-df7b-bd16-a989972a097b', 'x-amz-id-2': '5ROT/xJjVy6eGHEDKZ+Sg/2CInuXUDiYQHHaATzD1HBmizfQZ7cr/rEaJomBjxhi8QF2UomDWQ9cf2ddbBpD/EdemenuVckl', 'content-type': 'application/x-amz-json-1.1', 'content-length': '107', 'date': 'Fri, 12 Jul 2019 16:29:41 GMT'}, 'RetryAttempts': 0}}


In [157]:
#Congratulations!!! At this stage we have successfully created our Architecture

In [684]:
#Real-time data streaming:

kinesis_delivery_client = boto3.client('kinesis', region_name="us-east-1", aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)

In [159]:
#import boto3
s3 = boto3.resource('s3', aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)
#s3_client = boto3.client('s3'
#s3.create_bucket(Bucket= 'info7374s3team93')
s3.Object('info7374s3team93','CustomerData.csv').upload_file(Filename='C:\\Users\\suhas\\Downloads\\LAB1_Data\\CustomerData.csv')

In [78]:
sql="""
copy info7374_game from 's3://info7374s3team93/FLAC.csv' 
CREDENTIALS aws_access_key_id=AKIAJ3K7C6OSTJGYO2ZA,aws_secret_access_key=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
DELIMITER ',' ;"""

In [79]:
cur.execute(sql)

SyntaxError: syntax error at or near "aws_access_key_id"
LINE 3: CREDENTIALS aws_access_key_id=AKIAJ3K7C6OSTJGYO2ZA,aws_secre...
                    ^


In [None]:
COPY info7374_game
FROM 's3://info7374s3team93/FLAC.csv'
CREDENTIALS 'aws_iam_role=arn:aws:iam::986997421585:role/info7374roleteam92'
REGION 'us-east-1'

In [160]:
import pandas as pd
datacust = pd.read_csv("CustomerData.csv") 
# Preview the first 5 lines of the loaded data 
datacust.head()

Unnamed: 0,Name,Email,Subscription Plan,Premium or not,ProfitperUser,Usage,Flac Usage
0,Adam Kehoe,kehoe@widgetcorp.com,1 year subscription,Yes,3113.978,Android,Yes
1,Al Broumand,broumand@smithandco.com,6 month subscription,No,275.914,Windows,No
2,Alan Prussia,prussia@foobars.com,Family Plan subscription,Yes,562.646,MAC,No
3,Alison Propeller,propeller@abctelecom.com,Student Plan subscription,Yes,1368.676,UNIX,No
4,Alix Rowan,rowan@fakebrothers.com,1 month subscription,Yes,1117.137,Android,No


In [None]:
for i in range(0,20):
    data = {"player_id":i,"player_name":"player_"+str(i),"event_time":now.strftime("%Y-%m-%d %H:%M")}
    data = json.dumps(data)
    data = bytes(data, 'utf-8')
    formatted_records = [{'PartitionKey': "pk",'Data': data}]
    print(formatted_records)
    delivery_stream_name = kinesis_data_stream_name
    response = kinesis_delivery_client.put_records(StreamName=delivery_stream_name, Records=formatted_records)

In [421]:
for i in range(0,20):
    now = datetime.datetime.now()
    data = {"player_id":i,"player_name":"player_"+str(i),"event_time":now.strftime("%Y-%m-%d %H:%M")}
    #print(data)
    data = json.dumps(data)
    #print(data)
    data = bytes(data, 'utf-8')
    formatted_records = [{'PartitionKey': "pk",'Data': data}]
    print(formatted_records)
    #print(formatted_records)
    delivery_stream_name = kinesis_data_stream_name
    response = kinesis_delivery_client.put_records(StreamName=delivery_stream_name, Records=formatted_records)

[{'PartitionKey': 'pk', 'Data': b'{"player_id": 0, "player_name": "player_0", "event_time": "2019-07-10 04:10"}'}]
[{'PartitionKey': 'pk', 'Data': b'{"player_id": 1, "player_name": "player_1", "event_time": "2019-07-10 04:10"}'}]
[{'PartitionKey': 'pk', 'Data': b'{"player_id": 2, "player_name": "player_2", "event_time": "2019-07-10 04:10"}'}]
[{'PartitionKey': 'pk', 'Data': b'{"player_id": 3, "player_name": "player_3", "event_time": "2019-07-10 04:10"}'}]
[{'PartitionKey': 'pk', 'Data': b'{"player_id": 4, "player_name": "player_4", "event_time": "2019-07-10 04:10"}'}]
[{'PartitionKey': 'pk', 'Data': b'{"player_id": 5, "player_name": "player_5", "event_time": "2019-07-10 04:10"}'}]
[{'PartitionKey': 'pk', 'Data': b'{"player_id": 6, "player_name": "player_6", "event_time": "2019-07-10 04:10"}'}]
[{'PartitionKey': 'pk', 'Data': b'{"player_id": 7, "player_name": "player_7", "event_time": "2019-07-10 04:10"}'}]
[{'PartitionKey': 'pk', 'Data': b'{"player_id": 8, "player_name": "player_8", "e

In [426]:
import json
with open(r'C:\Users\suhas\Downloads\LAB1_Data\FLAC.txt') as f:
    #data6 = bytes(data6, 'utf-8')
    data6 = json.load(f)
    #print(data6)
    #for i in range(0,10):
    data = json.dumps(data6)
    #print(data)
    data = bytes(data, 'utf-8')
    #print(data)
    formatted_records = [{'PartitionKey': "pk",'Data': data}]
    #print(data)
    print(formatted_records)
    delivery_stream_name = kinesis_data_stream_name
    response = kinesis_delivery_client.put_records(StreamName=delivery_stream_name, Records=formatted_records)
#print(data6)
    

[{'PartitionKey': 'pk', 'Data': b'[{"id": "6DCZcSspjsKoFjzjrWoCd", "artists": "Drake", "beforeFLAC": "No", "afterFLAC": "Yes"}]'}]


In [436]:
import json
with open(r'C:\Users\suhas\Downloads\LAB1_Data\FLAC.txt') as f:
    #data6 = bytes(data6, 'utf-8')
    data6 = json.load(f)
    #print(data6)
    #for i in range(0,10):
    data = json.dumps(data6)
    #print(data)
    data = bytes(data, 'utf-8')
    #print(data)
    formatted_records = [{'PartitionKey': "pk",'Data': data}]
    #print(data)
    #print(formatted_records)
    delivery_stream_name = kinesis_data_stream_name

    response = kinesis_delivery_client.put_records(
        StreamName=delivery_stream_name,
        Records=formatted_records     
    )

In [118]:
export5 = datacust.to_json (r'C:\Users\suhas\Downloads\LAB1_Data\Export_DataFrame5.json')
data5 = json.dumps(export5)
data5 = bytes(data5, 'utf-8')
print(data5)

b'null'


In [None]:
delivery_stream_name = kinesis_data_stream_name
response = kinesis_delivery_client.put_records(StreamName=delivery_stream_name, Records=datacust)

In [120]:
#readd= pd.read_json('custData.json')
data1 = json.dumps('custData.json')
data1 = bytes(data1, 'utf-8')
print(data1)

b'"custData.json"'


In [230]:
import json
with open(r'C:\Users\suhas\Downloads\LAB1_Data\custData.json') as f:
    #data6 = bytes(data6, 'utf-8')
    data6 = json.load(f)
    for i in range(0,1):
        data7 = json.dumps(data6)
        data7 = bytes(data7, 'utf-8')
        formatted_records = [{'PartitionKey': "pk",'Data': data7}]
        print(formatted_records)
        delivery_stream_name = kinesis_data_stream_name
        response = kinesis_delivery_client.put_records(StreamName=delivery_stream_name, Records=formatted_records)

[{'PartitionKey': 'pk', 'Data': b'[{"Name": "Adam Kehoe", "Email": "kehoe@widgetcorp.com", "Subscription Plan": "1 year subscription", "Premium or not": "Yes", "ProfitperUser": 3113.978, "Usage": "Android", "Flac Usage": "Yes"}, {"Name": "Al Broumand", "Email": "broumand@smithandco.com", "Subscription Plan": "6 month subscription", "Premium or not": "No", "ProfitperUser": 275.914, "Usage": "Windows", "Flac Usage": "No"}, {"Name": "Alan Prussia", "Email": "prussia@foobars.com", "Subscription Plan": "Family Plan subscription", "Premium or not": "Yes", "ProfitperUser": 562.646, "Usage": "MAC", "Flac Usage": "No"}, {"Name": "Alison Propeller", "Email": "propeller@abctelecom.com", "Subscription Plan": "Student Plan subscription", "Premium or not": "Yes", "ProfitperUser": 1368.676, "Usage": "UNIX", "Flac Usage": "No"}, {"Name": "Alix Rowan", "Email": "rowan@fakebrothers.com", "Subscription Plan": "1 month subscription", "Premium or not": "Yes", "ProfitperUser": 1117.137, "Usage": "Android", 

In [126]:
import json
with open(r'C:\Users\suhas\Downloads\LAB1_Data\custData.json') as f:
    data6 = json.load(f)
    #data6 = bytes(data6, 'utf-8')
print(data6)

[{'Name': 'Adam Kehoe', 'Email': 'kehoe@widgetcorp.com', 'Subscription Plan': '1 year subscription', 'Premium or not': 'Yes', 'ProfitperUser': 3113.978, 'Usage': 'Android', 'Flac Usage': 'Yes'}, {'Name': 'Al Broumand', 'Email': 'broumand@smithandco.com', 'Subscription Plan': '6 month subscription', 'Premium or not': 'No', 'ProfitperUser': 275.914, 'Usage': 'Windows', 'Flac Usage': 'No'}, {'Name': 'Alan Prussia', 'Email': 'prussia@foobars.com', 'Subscription Plan': 'Family Plan subscription', 'Premium or not': 'Yes', 'ProfitperUser': 562.646, 'Usage': 'MAC', 'Flac Usage': 'No'}, {'Name': 'Alison Propeller', 'Email': 'propeller@abctelecom.com', 'Subscription Plan': 'Student Plan subscription', 'Premium or not': 'Yes', 'ProfitperUser': 1368.676, 'Usage': 'UNIX', 'Flac Usage': 'No'}, {'Name': 'Alix Rowan', 'Email': 'rowan@fakebrothers.com', 'Subscription Plan': '1 month subscription', 'Premium or not': 'Yes', 'ProfitperUser': 1117.137, 'Usage': 'Android', 'Flac Usage': 'No'}, {'Name': 'All

In [427]:
response

{'FailedRecordCount': 0,
 'Records': [{'SequenceNumber': '49597463529178354377476705143383300373202759429610537138',
   'ShardId': 'shardId-000000000011'}],
 'ResponseMetadata': {'RequestId': 'fc3ccd30-592c-82f1-a9a3-6d92861f6ec2',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'fc3ccd30-592c-82f1-a9a3-6d92861f6ec2',
   'x-amz-id-2': 'bDAzaMEyW5fNKzZiLSspWxEYCbI8SYPWLfQ6N0MRpjZ29jEBjKm92Dp760DR5Jrvez3EZQ6srlrSBYFJdB+DgLzjP3GT1amUC0IzP0KVl/Q=',
   'date': 'Wed, 10 Jul 2019 08:14:26 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '146'},
  'RetryAttempts': 0}}

#### 

In [657]:
#Now we will demolish the complete architecture:

#Delete Redshift Cluster:
response_delete_redshift = redshift_client.delete_cluster(
    ClusterIdentifier=cluster_identifier,
    SkipFinalClusterSnapshot=True
)

print(response_delete_redshift)

{'Cluster': {'ClusterIdentifier': 'info7374clusterteam93', 'NodeType': 'dc2.large', 'ClusterStatus': 'deleting', 'MasterUsername': 'root', 'DBName': 'info7374dbteam93', 'Endpoint': {'Address': 'info7374clusterteam93.crpojqzpsevq.us-east-1.redshift.amazonaws.com', 'Port': 5439}, 'ClusterCreateTime': datetime.datetime(2019, 7, 12, 0, 20, 21, 702000, tzinfo=tzutc()), 'AutomatedSnapshotRetentionPeriod': 1, 'ManualSnapshotRetentionPeriod': -1, 'ClusterSecurityGroups': [], 'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-f1cf91aa', 'Status': 'active'}], 'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0', 'ParameterApplyStatus': 'in-sync'}], 'ClusterSubnetGroupName': 'default', 'VpcId': 'vpc-7c9bfb06', 'AvailabilityZone': 'us-east-1a', 'PreferredMaintenanceWindow': 'sun:03:30-sun:04:00', 'PendingModifiedValues': {}, 'ClusterVersion': '1.0', 'AllowVersionUpgrade': True, 'NumberOfNodes': 1, 'PubliclyAccessible': True, 'Encrypted': False, 'Tags': [], 'EnhancedVpcRouting': F

In [658]:
#Delete Kinesis Delivery stream:
response_delete_firehose = firehose_client.delete_delivery_stream(DeliveryStreamName="info7374deliverystreamteam93")
print(response_delete_firehose)

{'ResponseMetadata': {'RequestId': 'de1de5f5-b893-e925-8bbc-3b0a3a2d03e1', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'de1de5f5-b893-e925-8bbc-3b0a3a2d03e1', 'x-amz-id-2': 'JrGAIpub7y1jaMX50fUNis68oxDY0RU53E1l65fW4WGcIMeaQW5kJtDKyfvq2qcEYLRl8rFRDuCDZ+e/OEjz/ZAdQsBm9olx', 'content-type': 'application/x-amz-json-1.1', 'content-length': '2', 'date': 'Fri, 12 Jul 2019 01:05:03 GMT'}, 'RetryAttempts': 0}}


In [659]:
#Delete Kinesis Data stream:
response_delete_data_stream = kinesis_client.delete_stream(StreamName=kinesis_data_stream_name,EnforceConsumerDeletion=True)
print(response_delete_data_stream)

{'ResponseMetadata': {'RequestId': 'c80e98fd-5abc-7800-9daf-47fdb86d5249', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'c80e98fd-5abc-7800-9daf-47fdb86d5249', 'x-amz-id-2': 'uDVvnxjXMaEJmSDe46efC3WQyLD3yFXFYaHFo/HJBwJvI5/VWBRKbO800enlOVLAJaw0cpO3VRmIsSyCvVXb94YK5LZG6KHm', 'date': 'Fri, 12 Jul 2019 01:05:04 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '0'}, 'RetryAttempts': 0}}


In [660]:
#Delete S3 Bucket:

#All of the Objects in a bucket must be deleted before the bucket itself can be deleted:

s3 = boto3.resource('s3', aws_access_key_id = access_key, aws_secret_access_key = secret_access_key)
bucket = s3.Bucket(bucket_name)

for key in bucket.objects.all():
    key.delete()
bucket.delete()

print("Bucket deleted successfully!")

Bucket deleted successfully!


In [661]:
#Delete IAM Policy:



In [662]:
#Delete IAM Role:

response_detach_policy = iam_client.detach_role_policy(PolicyArn=policy_arn,RoleName=iam_role_name)
print(response_detach_policy)

response_delete_role = iam_client.delete_role(RoleName=iam_role_name)
print(response_delete_role)

{'ResponseMetadata': {'RequestId': '16e3fb83-a441-11e9-915f-6b1729499a84', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '16e3fb83-a441-11e9-915f-6b1729499a84', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Fri, 12 Jul 2019 01:05:06 GMT'}, 'RetryAttempts': 0}}
{'ResponseMetadata': {'RequestId': '16f5af1b-a441-11e9-915f-6b1729499a84', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '16f5af1b-a441-11e9-915f-6b1729499a84', 'content-type': 'text/xml', 'content-length': '200', 'date': 'Fri, 12 Jul 2019 01:05:06 GMT'}, 'RetryAttempts': 0}}


In [663]:
#Delete IAM Policy:

response_delete_policy = iam_client.delete_policy(PolicyArn=policy_arn)
print(response_delete_policy)

{'ResponseMetadata': {'RequestId': '1748b0d7-a441-11e9-915f-6b1729499a84', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '1748b0d7-a441-11e9-915f-6b1729499a84', 'content-type': 'text/xml', 'content-length': '204', 'date': 'Fri, 12 Jul 2019 01:05:07 GMT'}, 'RetryAttempts': 0}}
