## 1. Kinesis Analytics를 이용한 실시간 이상감지

The algorithm starts developing the machine learning model using current records in the stream when you start the application. The algorithm does not use older records in the stream for machine learning, nor does it use statistics from previous executions of the application.

In [1]:
import sys

In [2]:
!{sys.executable} -m pip install --upgrade pip
!{sys.executable} -m pip install sklearn-pandas
!{sys.executable} -m pip install awswrangler

Requirement already up-to-date: pip in /home/ec2-user/anaconda3/envs/pytorch_p36/lib/python3.6/site-packages (20.2.1)


In [3]:
import json
import random
import boto3
import os
import time
import numpy as np
import pandas as pd
import awswrangler as wr

In [4]:
from sklearn.impute import SimpleImputer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn_pandas import DataFrameMapper
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import LabelEncoder

In [5]:
kinesis_client = boto3.client('kinesis')
kinesis_analytics = boto3.client('kinesisanalytics')

In [6]:
sess = boto3.Session()

# create a s3 bucket to hold data, note that your account might already created a bucket with the same name
account_id = sess.client('sts').get_caller_identity()["Account"]
job_bucket = 's3://sagemaker-experiments-{}-{}'.format(sess.region_name, account_id)
job_bucket

's3://sagemaker-experiments-us-east-2-322537213286'

### 1.1 Input/Output Create_stream 생성

In [7]:
data_stream = ['clickstream_input', 'clickstream_output']

try:
    for stream in data_stream:
        kinesis_client.create_stream(
            StreamName=stream,
            ShardCount=1
        )
except Exception as e:
    if e.response['Error']['Code'] == 'ResourceInUseException':
        print(e.response['message'])
    else:
        print(e.response['Error']['Code'])
    

Stream clickstream_input under account 322537213286 already exists.


In [8]:
# Wait until all streams are created
result = {}
waiter = kinesis_client.get_waiter('stream_exists')
for stream in data_stream:
    waiter.wait(StreamName=stream)
    response = kinesis_client.describe_stream(StreamName=stream)
    result[stream] = response["StreamDescription"]["StreamARN"]

### 1.2 IAM 생성

In [9]:
region = boto3.Session().region_name

iam = boto3.client('iam')
sts = boto3.client('sts')

In [10]:
kinesis_role_name = 'kinesis-analytics-{}'.format(region)
kinesis_analytics_policy_name=kinesis_role_name + '_policy'
kinesis_role_name

'kinesis-analytics-us-east-2'

In [11]:
kinesis_assume_role={
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kinesisanalytics.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

In [12]:
kinesis_analytics_policy = {
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ReadInputKinesis",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords"
      ],
      "Resource": [
        result['clickstream_input']
      ]
    },
    {
      "Sid": "WriteOutputKinesis",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": [
        result['clickstream_output']
      ]
    }
  ]
}

In [13]:
try:
    role_res = iam.create_role(
        Path='/service-role/',
        RoleName=kinesis_role_name,
        AssumeRolePolicyDocument=json.dumps(kinesis_assume_role),
    )
except Exception as e:
    if e.response['Error']['Code'] == 'EntityAlreadyExists':
        role_res = iam.get_role(
            RoleName=kinesis_role_name
        )
        print("User already exists")
    else:
        print("Unexpected error: %s" % e)

User already exists


In [14]:
try:
    policy_res = iam.create_policy(
        PolicyName=kinesis_analytics_policy_name,
        Path='/',
        PolicyDocument=json.dumps(kinesis_analytics_policy)
    )
except Exception as e:
    if e.response['Error']['Code'] == 'EntityAlreadyExists':
        PolicyArn ='arn:aws:iam::{}:policy/{}'.format(account_id, kinesis_analytics_policy_name)
        policy_res = iam.get_policy(
            PolicyArn=PolicyArn
        )
        print("User already exists")
    else:
        print("Unexpected error: %s" % e)

User already exists


In [15]:
from botocore.exceptions import ClientError

try:
    attach_res = iam.attach_role_policy(
        PolicyArn=policy_res['Policy']['Arn'],
        RoleName=kinesis_role_name
    )
    print("Policy has been succesfully attached to role: %s" % kinesis_role_name)
except Exception as e:
    print("Unexpected error: %s" % e)

Policy has been succesfully attached to role: kinesis-analytics-us-east-2


### 1.3 Create Application

In [16]:
sql_query = open('random_cut_forest_with_explanation.sql', "r").read()

In [17]:
kinesis_ApplicationName='clickstream-analytics-rcf-explanation-{}'.format(int(time.time()))

In [18]:
try:
    response = kinesis_analytics.create_application(
        ApplicationName=kinesis_ApplicationName,
        ApplicationCode=sql_query,
        Inputs=[
            {
                'NamePrefix': 'SOURCE_SQL_STREAM',
                'KinesisStreamsInput': {
                    'ResourceARN': result[data_stream[0]],
                    'RoleARN': role_res['Role']['Arn']
                },
                'InputSchema': {
                    'RecordFormat': {
                        'RecordFormatType': 'JSON',
                        'MappingParameters': {
                            'JSONMappingParameters': {
                                'RecordRowPath': '$'
                            },
                        }
                    },
                    'RecordEncoding': 'UTF-8',
                    'RecordColumns': [
                                {
                                    "SqlType": "TIMESTAMP",
                                    "Name": "TIMESTAMPS",
                                    "Mapping": "$.timestamp"
                                },
                                {
                                    "SqlType": "INTEGER",
                                    "Name": "CLICKSTREAM_ID",
                                    "Mapping": "$.clickstream_id"
                                },
                                {
                                    "SqlType": "INTEGER",
                                    "Name": "URL",
                                    "Mapping": "$.url"
                                },
                                {
                                    "SqlType": "INTEGER",
                                    "Name": "IS_PURCHASED",
                                    "Mapping": "$.is_purchased"
                                },
                                {
                                    "SqlType": "DOUBLE",
                                    "Name": "IS_PAGE_ERRORED",
                                    "Mapping": "$.is_page_errored"
                                },
                                {
                                    "SqlType": "INTEGER",
                                    "Name": "USER_SESSION_ID",
                                    "Mapping": "$.user_session_id"
                                },
                                {
                                    "SqlType": "INTEGER",
                                    "Name": "CITY",
                                    "Mapping": "$.city"
                                },
                                {
                                    "SqlType": "INTEGER",
                                    "Name": "STATE",
                                    "Mapping": "$.state"
                                },
                                {
                                    "SqlType": "INTEGER",
                                    "Name": "COUNTRY",
                                    "Mapping": "$.country"
                                },
                                {
                                    "SqlType": "INTEGER",
                                    "Name": "BIRTH_DT",
                                    "Mapping": "$.BIRTH_DT"
                                },
                                {
                                    "SqlType": "INTEGER",
                                    "Name": "GENDER_CD",
                                    "Mapping": "$.GENDER_CD"
                                }
                    ],
                    "RecordFormat": {
                        "MappingParameters": {
                            'JSONMappingParameters': {
                                'RecordRowPath': '$'
                            },
                        },
                        "RecordFormatType": "JSON"
                      }
                }
            },
        ],
        Outputs=[
            {
                'Name': 'DESTINATION_SQL_STREAM',
                'DestinationSchema': {
                    'RecordFormatType': 'JSON'
                },
                "KinesisStreamsOutput": {
                    "ResourceARN": result[data_stream[1]],
                    "RoleARN": role_res['Role']['Arn']
                }
            },
        ],
    )
except Exception as e:
    if e.response['Error']['Code'] == 'ResourceInUseException':
        print(e.response['message'])
    else:
        print(e)

### 1.5 Start Application

In [19]:
response = kinesis_analytics.describe_application(
    ApplicationName=kinesis_ApplicationName
)

application = response["ApplicationDetail"]
inputId = application['InputDescriptions'][0]['InputId']

try:
    kinesis_analytics.start_application(ApplicationName=kinesis_ApplicationName,
                                     InputConfigurations=[{
                                       "Id": inputId,
                                       "InputStartingPositionConfiguration": {
                                         "InputStartingPosition": "NOW"
                                       }
                                     }])
except Exception as e:
    if e.response['Error']['Code'] == 'ResourceInUseException':
        print(e.response['message'])
    else:
        print(e.response['Error']['Code'])

In [20]:
# Wait until application starts running
response = kinesis_analytics.describe_application(
    ApplicationName=kinesis_ApplicationName
)
status = response["ApplicationDetail"]["ApplicationStatus"]

sys.stdout.write('Starting ')
while status != "RUNNING":
    sys.stdout.write('.')
    sys.stdout.flush()
    time.sleep(1)
    response = kinesis_analytics.describe_application(
      ApplicationName=kinesis_ApplicationName
    )
    status = response["ApplicationDetail"]["ApplicationStatus"]
sys.stdout.write('RUNNING')
sys.stdout.write(os.linesep)

Starting ...............................RUNNING


###  1.7 Sending Input Stream

In [21]:
users = pd.read_csv('users.tsv', delimiter='\t')
users.head(5)

Unnamed: 0,SWID,BIRTH_DT,GENDER_CD
0,0001BDD9-EABF-4D0D-81BD-D9EABFCD0D7D,8-Apr-84,F
1,00071AA7-86D2-4EB9-871A-A786D27EB9BA,7-Feb-88,F
2,00071B7D-31AF-4D85-871B-7D31AFFD852E,22-Oct-64,F
3,0007967E-F188-4598-9C7C-E64390482CFB,1-Jun-66,M
4,000B90B2-92DC-4A7A-8B90-B292DC9A7A71,13-Jun-84,M


In [22]:
cs = pd.read_csv('clickstream-feed-generated.tsv', header=None, delimiter='|')
cs.columns = pd.read_csv('clickstream-data-schema.txt', delimiter='|').columns
cs['timestamp'] = cs['timestamp'].astype('datetime64')
cs.set_index('timestamp', inplace=True)
cs.head(5)

Unnamed: 0_level_0,clickstream_id,IP address,url,is_purchased?,is_page_errored?,user_session_id,city,state,country
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2012-03-01 00:00:19,1330588819,147.222.227.200,http://www.RL.com/,0,0.0,AF8A0FDF-B1F8-474C-8CD7-8CA06A8E435B,spokane,wa,usa
2012-03-01 00:00:25,1330588825,99.49.96.163,http://www.RL.com/product/4004,0,1.0,C9183A22-6E1D-4147-BDC9-D634FC957098,detroit,mi,usa
2012-03-01 00:00:27,1330588827,147.222.227.200,http://www.RL.com/,0,0.0,AF8A0FDF-B1F8-474C-8CD7-8CA06A8E435B,spokane,wa,usa
2012-03-01 00:00:57,1330588857,69.114.3.205,http://www.RL.com/product/4004,0,0.0,F761B842-9DDA-42CC-9F28-A6359B6C7219,east northport,ny,usa
2012-03-01 00:00:59,1330588859,71.217.29.209,http://www.RL.com/review/3004,0,0.59596,6FE1CB72-95C9-47F9-A1CB-7295C927F916,tacoma,wa,usa


In [23]:
r_users = users.rename({'SWID':'user_session_id'}, axis='columns')
r_cs = cs.rename({'IP address':'ip_address','is_purchased?':'is_purchased'
                ,'is_page_errored?':'is_page_errored', }, axis='columns')
r_cs = r_cs.reset_index()

In [24]:
## Data merge
r_css = pd.merge(r_cs, r_users, how='left')

## Delete NaN and dup-data for timestamp
r_css = r_css.dropna(axis=0)
# r_css = r_css.drop_duplicates(['timestamp'])
r_css_sorted = r_css.sort_values(by=['timestamp'])
# r_css_sorted = r_css_sorted.set_index('timestamp')

In [25]:
# These features can be parsed as numeric.
timestamp = ['timestamp']
# These features contain a relatively small number of unique items.
clickstream_id = ['clickstream_id']
url = ['url']
is_purchased = ['is_purchased']
is_page_errored = ['is_page_errored']
user_session_id = ['user_session_id']
city = ['city']
state = ['state']
country = ['country']
birth_dt = ['BIRTH_DT']
gender_cd = ['GENDER_CD']

In [26]:
mapper = DataFrameMapper([
        ( clickstream_id, LabelEncoder()), 
        ( url, LabelEncoder()),
        (is_purchased, LabelEncoder()),
        ( is_page_errored, [SimpleImputer(strategy='constant', missing_values=np.nan)]), 
        ( user_session_id, LabelEncoder()),
        ( city, LabelEncoder()), 
        ( state, LabelEncoder()), 
        ( country, LabelEncoder()), 
        ( birth_dt, LabelEncoder()), 
        ( gender_cd, LabelEncoder()),

    ], 
    df_out=True
)

In [27]:
import time, datetime, sched

In [28]:
def data_list(index=0, duration=0, check_data=[]):
    check_data.append(list(r_css_sorted.iloc[index]))
    print("{}    {}      {}".format(index, duration, str(list(r_css_sorted.iloc[index]))))
#     print(str(list(r_css_sorted.iloc[index]))+'\n')

In [29]:
def make_kinesis_data(df, i, col_names):
    kinesis_data = {}
    kinesis_data['timestamp']=str(df['timestamp'][i])
    for col_name in col_names:
        if col_name not in ['timestamp']:
            if col_name in ['is_page_errored']:
                kinesis_data[col_name]=float(df.iloc[i][col_name])
            else:
                kinesis_data[col_name]=int(df.iloc[i][col_name])

    return kinesis_data

In [33]:
def s3_sink(shardIterator):
    rows = []
    response = kinesis_client.get_records(ShardIterator=shardIterator)
    records = response["Records"]

    try:
        while len(records) > 0:
            parsed_records = []
            for rec in records:
                parsed_record = json.loads(rec["Data"])
                parsed_records.append(parsed_record)
            rows.extend(parsed_records)
            time.sleep(1)
            shardIterator = response["NextShardIterator"]
            response = kinesis_client.get_records(ShardIterator=shardIterator)
            records = response["Records"]
        flag = rows[0]['TIMESTAMPS'].replace(":","-").replace(" ","-").replace(".","-")
        s3_save_path = '{}/{}/{}'.format(job_bucket, 'kinesis-analytics', flag)
        wr.s3.to_csv(
            df=pd.DataFrame(rows),
            path=s3_save_path
        )
    except:
        sys.stdout.write("[ExpiredIteratorException] Iterator expired.")
        pass
    return 

In [34]:
total_cnt = 500

In [None]:
check_data = []
sch = sched.scheduler(time.time, time.sleep)
for i in range(r_css_sorted.shape[0]):
    duration = int((r_css_sorted.iloc[i]['timestamp'] - r_css_sorted.iloc[0]['timestamp']).seconds)
    duration = round(duration/100)
    sch.enter(duration, 1, data_list, kwargs={'index': i, 'duration' : duration, 'check_data' : check_data})

    if (i+1) % 10 == 0:
        ts_data = pd.DataFrame(check_data, columns=['timestamp', 'clickstream_id', 'ip_address', 'url', 'is_purchased',
       'is_page_errored', 'user_session_id', 'city', 'state', 'country',
       'BIRTH_DT', 'GENDER_CD'])
        input_df = np.round(mapper.fit_transform(ts_data),2)
        input_df = pd.concat([ts_data['timestamp'], input_df], axis=1)
        response = kinesis_client.describe_stream(StreamName=data_stream[1])
        shardId = response["StreamDescription"]["Shards"][0]["ShardId"]
        response = kinesis_client.get_shard_iterator(StreamName=data_stream[1],
                                            ShardId=shardId,
                                            ShardIteratorType="LATEST")
        shardIterator = response["ShardIterator"]
        
        kinesis_data = {}
        rnd=random.random()
        col_names = input_df.keys().to_list()
        for i, idx in enumerate(input_df.index):
            kinesis_data = json.dumps(make_kinesis_data(input_df, idx, col_names))
            kinesis_client.put_record(
                StreamName=data_stream[0],
                Data=kinesis_data,
                PartitionKey=str(rnd)
            )
        check_data = []
        time.sleep(3)
        s3_sink(shardIterator)
        
    if i == total_cnt:
        break
    sch.run()

0    0      [Timestamp('2012-03-01 00:01:30'), 1330588890, '68.5.184.133', 'http://www.RL.com/', 0, 0.0, '60C8049D-C1A2-41C2-B503-6C1200424C49', 'mission viejo', 'ca', 'usa', '13-Jul-85', 'M']
1    0      [Timestamp('2012-03-01 00:01:31'), 1330588891, '68.5.184.133', 'http://www.RL.com/reco/2001', 0, 0.919192, '60C8049D-C1A2-41C2-B503-6C1200424C49', 'mission viejo', 'ca', 'usa', '13-Jul-85', 'M']
2    0      [Timestamp('2012-03-01 00:02:19'), 1330588939, '68.5.184.133', 'http://www.RL.com/', 0, 0.0, '60C8049D-C1A2-41C2-B503-6C1200424C49', 'mission viejo', 'ca', 'usa', '13-Jul-85', 'M']
3    1      [Timestamp('2012-03-01 00:02:28'), 1330588948, '50.8.107.4', 'http://www.RL.com/', 0, 0.0, 'A24EA63A-843F-4290-8B17-B7D16A204C96', 'denver', 'co', 'usa', '23-Feb-80', 'U']
4    1      [Timestamp('2012-03-01 00:03:33'), 1330589013, '67.2.109.46', 'http://www.RL.com/', 0, 0.0, '6AF05C56-1AB3-4CB7-9584-7CDCB27BACFE', 'south jordan', 'ut', 'usa', '23-Jul-68', 'M']
5    2      [Timestamp('2012-03-

  return f(**kwargs)


9    2      [Timestamp('2012-03-01 00:04:45'), 1330589085, '24.9.63.79', 'http://www.RL.com/product/4001', 0, 0.0, 'DEADBDB9-274E-4119-8184-DA17A059D6CE', 'boulder', 'co', 'usa', '1-Nov-95', 'M']
10    2      [Timestamp('2012-03-01 00:04:55'), 1330589095, '24.9.63.79', 'http://www.RL.com/', 0, 0.0, 'DEADBDB9-274E-4119-8184-DA17A059D6CE', 'boulder', 'co', 'usa', '1-Nov-95', 'M']
11    2      [Timestamp('2012-03-01 00:04:57'), 1330589097, '24.9.63.79', 'http://www.RL.com/product/4004', 1, 0.0, 'DEADBDB9-274E-4119-8184-DA17A059D6CE', 'boulder', 'co', 'usa', '1-Nov-95', 'M']
12    2      [Timestamp('2012-03-01 00:05:12'), 1330589112, '24.9.63.79', 'http://www.RL.com/', 0, 0.0, 'DEADBDB9-274E-4119-8184-DA17A059D6CE', 'boulder', 'co', 'usa', '1-Nov-95', 'M']
13    2      [Timestamp('2012-03-01 00:05:22'), 1330589122, '99.102.31.167', 'http://www.RL.com/', 0, 0.0, '7A7EC1B7-BAFE-431A-A4B5-0378A4CDE8CB', 'sacramento', 'ca', 'usa', '28-Feb-89', 'M']
14    2      [Timestamp('2012-03-01 00:05:32'

  return f(**kwargs)


19    4      [Timestamp('2012-03-01 00:08:44'), 1330589324, '68.5.184.133', 'http://www.RL.com/review/3004', 0, 0.24242399999999997, '60C8049D-C1A2-41C2-B503-6C1200424C49', 'mission viejo', 'ca', 'usa', '13-Jul-85', 'M']
20    4      [Timestamp('2012-03-01 00:08:52'), 1330589332, '50.8.107.4', 'http://www.RL.com/review/3004', 0, 0.323232, 'A24EA63A-843F-4290-8B17-B7D16A204C96', 'denver', 'co', 'usa', '23-Feb-80', 'U']
21    5      [Timestamp('2012-03-01 00:09:26'), 1330589366, '98.228.34.12', 'http://www.RL.com/review/3004', 0, 0.666667, 'D956528D-3CAC-4035-BB1C-79A8D4F4996A', 'bloomington', 'in', 'usa', '3-Feb-86', 'F']
22    5      [Timestamp('2012-03-01 00:09:26'), 1330589366, '98.228.34.12', 'http://www.RL.com/video/1001', 0, 0.666667, 'D956528D-3CAC-4035-BB1C-79A8D4F4996A', 'bloomington', 'in', 'usa', '3-Feb-86', 'F']
23    5      [Timestamp('2012-03-01 00:09:34'), 1330589374, '98.228.34.12', 'http://www.RL.com/video/1001', 0, 0.747475, 'D956528D-3CAC-4035-BB1C-79A8D4F4996A', 'blo

  return f(**kwargs)


29    6      [Timestamp('2012-03-01 00:11:48'), 1330589508, '207.62.158.62', 'http://www.RL.com/video/1003', 0, 0.08080810000000001, 'E1D2DE32-6F67-4A50-ABC1-102555658531', 'san luis obispo', 'ca', 'usa', '15-Nov-92', 'M']
30    6      [Timestamp('2012-03-01 00:12:05'), 1330589525, '99.102.31.167', 'http://www.RL.com/product/4003', 0, 0.0, '7A7EC1B7-BAFE-431A-A4B5-0378A4CDE8CB', 'sacramento', 'ca', 'usa', '28-Feb-89', 'M']
31    6      [Timestamp('2012-03-01 00:12:07'), 1330589527, '207.62.158.62', 'http://www.RL.com/reco/2002', 0, 0.272727, 'E1D2DE32-6F67-4A50-ABC1-102555658531', 'san luis obispo', 'ca', 'usa', '15-Nov-92', 'M']
32    6      [Timestamp('2012-03-01 00:12:17'), 1330589537, '207.62.158.62', 'http://www.RL.com/video/1002', 0, 0.373737, 'E1D2DE32-6F67-4A50-ABC1-102555658531', 'san luis obispo', 'ca', 'usa', '15-Nov-92', 'M']
33    6      [Timestamp('2012-03-01 00:12:19'), 1330589539, '207.62.158.62', 'http://www.RL.com/reco/2001', 0, 0.393939, 'E1D2DE32-6F67-4A50-ABC1-1025

  return f(**kwargs)


39    7      [Timestamp('2012-03-01 00:13:36'), 1330589616, '67.2.109.46', 'http://www.RL.com/reco/2002', 0, 0.161616, '6AF05C56-1AB3-4CB7-9584-7CDCB27BACFE', 'south jordan', 'ut', 'usa', '23-Jul-68', 'M']
40    7      [Timestamp('2012-03-01 00:13:39'), 1330589619, '24.9.63.79', 'http://www.RL.com/', 0, 0.0, 'DEADBDB9-274E-4119-8184-DA17A059D6CE', 'boulder', 'co', 'usa', '1-Nov-95', 'M']
41    8      [Timestamp('2012-03-01 00:14:32'), 1330589672, '24.9.63.79', 'http://www.RL.com/video/1001', 0, 0.727273, 'DEADBDB9-274E-4119-8184-DA17A059D6CE', 'boulder', 'co', 'usa', '1-Nov-95', 'M']
42    8      [Timestamp('2012-03-01 00:14:32'), 1330589672, '24.9.63.79', 'http://www.RL.com/video/1003', 0, 0.727273, 'DEADBDB9-274E-4119-8184-DA17A059D6CE', 'boulder', 'co', 'usa', '1-Nov-95', 'M']
43    8      [Timestamp('2012-03-01 00:14:39'), 1330589679, '67.2.109.46', 'http://www.RL.com/product/4004', 0, 0.0, '6AF05C56-1AB3-4CB7-9584-7CDCB27BACFE', 'south jordan', 'ut', 'usa', '23-Jul-68', 'M']
44   

  return f(**kwargs)


49    9      [Timestamp('2012-03-01 00:16:40'), 1330589800, '68.52.198.113', 'http://www.RL.com/product/4003', 0, 0.0, 'DA55421E-31B5-45E5-9542-1E31B575E5CC', 'nashville', 'tn', 'usa', '1-Jan-79', 'U']
50    9      [Timestamp('2012-03-01 00:16:40'), 1330589800, '68.52.198.113', 'http://www.RL.com/product/4001', 1, 0.0, 'DA55421E-31B5-45E5-9542-1E31B575E5CC', 'nashville', 'tn', 'usa', '1-Jan-79', 'U']
51    9      [Timestamp('2012-03-01 00:16:41'), 1330589801, '98.228.34.12', 'http://www.RL.com/reco/2002', 0, 0.010101, 'D956528D-3CAC-4035-BB1C-79A8D4F4996A', 'bloomington', 'in', 'usa', '3-Feb-86', 'F']
52    9      [Timestamp('2012-03-01 00:16:58'), 1330589818, '24.9.63.79', 'http://www.RL.com/reco/2001', 0, 0.181818, 'DEADBDB9-274E-4119-8184-DA17A059D6CE', 'boulder', 'co', 'usa', '1-Nov-95', 'M']
53    10      [Timestamp('2012-03-01 00:18:06'), 1330589886, '98.228.34.12', 'http://www.RL.com/review/3001', 0, 0.8686870000000001, 'D956528D-3CAC-4035-BB1C-79A8D4F4996A', 'bloomington', 'in'

In [None]:
kinesis_analytics.stop_application(ApplicationName=kinesis_ApplicationName)

# Wait until application stops running
response = kinesis_analytics.describe_application(
    ApplicationName=kinesis_ApplicationName
)
status = response["ApplicationDetail"]["ApplicationStatus"]
sys.stdout.write('Stopping ')

while status != "READY":
    sys.stdout.write('.')
    sys.stdout.flush()
    time.sleep(1)
    response = kinesis_analytics.describe_application(
        ApplicationName=kinesis_ApplicationName
    )
    status = response["ApplicationDetail"]["ApplicationStatus"]

sys.stdout.write(os.linesep)