In [81]:
from IPython.core.display import display, HTML 
display(HTML("<style>.container { width:100% !important; }</style>")) 

In [82]:
import json
import pprint 
import pandas as pd
import matplotlib.pyplot as plt
import datetime
import os
import boto3
import time

# Rekognitionの結果の読み込みと整形

In [83]:
filename = "data/shibuya2.json"

In [84]:
def parse_video_label_detection(json_filename, start_time=None):
    
    body, ext = os.path.splitext(json_filename)
    assert ext == '.json'
    
    # jsonファイルを開く
    with open(json_filename) as f:
        input_json = json.load(f)
    #物体検出の結果のみを抽出
    ext_obj = [f for f in input_json['Labels'] if not f['Label']['Instances'] == []]   # 物体検出の結果はInstancesに格納されている
    
    # 物体検出の結果だけを含むjson文字列（dict）を作成
    obj_json = input_json.copy()    # 大元のjsonファイルをコピー
    obj_json['Labels'] = ext_obj # 抽出した物体検出結果で置き換え
    
    out_json = "{}_detected_object.json".format(body)
    with open(out_json, 'w') as f:
        json.dump(obj_json, f)
    print('Create json file to "{}"'.format(out_json))
    
    # 物体検出結果をDataFrameに整形
    # ネストしているラベル (Label-Instances)をリスト形式でrecord_pathで指定
    obj_df = pd.json_normalize(obj_json['Labels'], record_path=['Label', 'Instances'], \
                          meta=['Timestamp', ['Label', 'Name'], ])
    obj_df = obj_df[['Timestamp', 'Label.Name', 'Confidence', 'BoundingBox.Width', 'BoundingBox.Height',
       'BoundingBox.Left', 'BoundingBox.Top',]]
    obj_df.columns= ['timedelta', 'name', 'confidence', 'width', 'height', 'left', 'top',]

    
    # 物体領域の中心点と面積を算出しDataFrameに追加
    obj_df['center_x'] = obj_df['left'] + obj_df['width']/2
    obj_df['center_y'] = obj_df['top'] - obj_df['height']/2
    obj_df['area'] = obj_df['width'] * obj_df['height']
    
    # jsonのタイムスタンプをTimeDeltra形式に変換
    timedelta = pd.to_timedelta(obj_df['timedelta'] * 10**6)
    
    # 開始時間を設定。starttime引数を指定しなかった場合は現在時間にする
    if start_time == None:
        start_time = datetime.datetime.now() #"2021-03-19 00:00:00"
    obj_df['timestamp'] = pd.to_datetime(start_time) + timedelta
    
    # 識別子（インデックス番号）入りのオブジェクト名を作成
    obj_df['name_with_id'] = obj_df['name'] + obj_df.index.map(lambda x: '_' + str(x))
    
    out_csv = "{}_detected_object.csv".format(body)
    obj_df.to_csv(out_csv, index=None)
    print('Create csv file to "{}"'.format(out_csv))
    
    return obj_df


In [85]:
obj_df = parse_video_label_detection(filename)

Create json file to "data/shibuya2_detected_object.json"
Create csv file to "data/shibuya2_detected_object.csv"


In [93]:
obj_df.tail(30)

Unnamed: 0,timedelta,name,confidence,width,height,left,top,center_x,center_y,area,timestamp,name_with_id
181,72334,Person,98.836418,0.076321,0.690025,0.252816,0.295426,0.290976,-0.049587,0.052664,2021-03-22 15:13:03.632214,Person_181
182,72334,Person,95.226372,0.054548,0.376388,0.161504,0.024529,0.188777,-0.163665,0.020531,2021-03-22 15:13:03.632214,Person_182
183,72334,Person,86.624107,0.074142,0.531362,0.042515,0.133873,0.079586,-0.131808,0.039396,2021-03-22 15:13:03.632214,Person_183
184,72334,Person,79.589737,0.016886,0.078045,0.592874,0.0,0.601317,-0.039023,0.001318,2021-03-22 15:13:03.632214,Person_184
185,72334,Person,75.930618,0.04674,0.356704,0.251268,0.103087,0.274638,-0.075265,0.016673,2021-03-22 15:13:03.632214,Person_185
186,72334,Person,67.607376,0.024603,0.0969,0.853182,0.0,0.865484,-0.04845,0.002384,2021-03-22 15:13:03.632214,Person_186
187,72334,Person,60.730618,0.06424,0.326397,0.261297,0.186289,0.293417,0.023091,0.020968,2021-03-22 15:13:03.632214,Person_187
188,72808,Person,99.723846,0.077624,0.642879,0.271719,0.341869,0.310531,0.02043,0.049903,2021-03-22 15:13:04.106214,Person_188
189,72808,Person,99.71212,0.056117,0.603548,0.335282,0.141925,0.36334,-0.159849,0.033869,2021-03-22 15:13:04.106214,Person_189
190,72808,Person,99.639206,0.056072,0.52755,0.123435,0.138001,0.151471,-0.125774,0.029581,2021-03-22 15:13:04.106214,Person_190


# TimeStreamへのデータ登録

## データベース、テーブルの作成

In [87]:
db_name = "sampleDB8"
table_name = "sampleTable8"

In [88]:
ts_write = boto3.client("timestream-write")

データベース作成

In [89]:
_response = ts_write.create_database(
    DatabaseName=db_name,
    Tags=[
    ]
)

In [90]:
_response

{'Database': {'Arn': 'arn:aws:timestream:us-east-1:820974724107:database/sampleDB8',
  'DatabaseName': 'sampleDB8',
  'TableCount': 0,
  'KmsKeyId': 'arn:aws:kms:us-east-1:820974724107:key/b73e0ee6-00f6-451e-93ea-aa88d8a82139',
  'CreationTime': datetime.datetime(2021, 3, 22, 15, 12, 14, 216000, tzinfo=tzlocal()),
  'LastUpdatedTime': datetime.datetime(2021, 3, 22, 15, 12, 14, 216000, tzinfo=tzlocal())},
 'ResponseMetadata': {'RequestId': '5O3N735VHO5NSQM5GLGKMUQ7V4',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '5O3N735VHO5NSQM5GLGKMUQ7V4',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '281',
   'date': 'Mon, 22 Mar 2021 06:12:13 GMT'},
  'RetryAttempts': 0}}

テーブル作成

In [91]:
_response = ts_write.create_table(
    DatabaseName=db_name,
    TableName=table_name,
    RetentionProperties={
        'MemoryStoreRetentionPeriodInHours': 123,
        'MagneticStoreRetentionPeriodInDays': 123
    },
    Tags=[
    ]
)
print(_response)

{'Table': {'Arn': 'arn:aws:timestream:us-east-1:820974724107:database/sampleDB8/table/sampleTable8', 'TableName': 'sampleTable8', 'DatabaseName': 'sampleDB8', 'TableStatus': 'ACTIVE', 'RetentionProperties': {'MemoryStoreRetentionPeriodInHours': 123, 'MagneticStoreRetentionPeriodInDays': 123}, 'CreationTime': datetime.datetime(2021, 3, 22, 15, 12, 16, 188000, tzinfo=tzlocal()), 'LastUpdatedTime': datetime.datetime(2021, 3, 22, 15, 12, 16, 188000, tzinfo=tzlocal())}, 'ResponseMetadata': {'RequestId': 'SAEJVAELHSIAXGRJDTLBZNBAHM', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'SAEJVAELHSIAXGRJDTLBZNBAHM', 'content-type': 'application/x-amz-json-1.0', 'content-length': '348', 'date': 'Mon, 22 Mar 2021 06:12:15 GMT'}, 'RetryAttempts': 0}}


テーブルにレコードを登録

In [92]:
dim_columns = ['name', 'name_with_id']
mes_columns = ['timedelta', 'confidence', 'width',  'height', 'left', 'top', 'center_x', 'center_y', 'area']


records = []
write_freq = 10

for index, row in obj_df.iterrows():
    print(index)
    
    dimensions = []
    for col in dim_columns:
        dimensions.append({
            'Name': col,
            'Value': str(row[col])
        })
    #print(dimensions)
    
    '''
    common_attributes = {
        'Dimensions': dimensions,
        'MeasureValueType': 'DOUBLE',
        'Time': str(round(time.time() * 1000))
        }
    '''
    
    for col in mes_columns:
        records.append({
            'Dimensions': dimensions,
            'MeasureName': col,
            'MeasureValue': str(row[col]),
            'MeasureValueType': 'DOUBLE',
            'Time': str(round(row['timestamp'].to_pydatetime().timestamp() * 10**6)),
            #'Time': str(round(time.time() * 10**6)),

            'TimeUnit': 'MICROSECONDS',

        })
    
    #print(records)
        
    if index % write_freq == 0:
        try:
            result = ts_write.write_records(DatabaseName=db_name, TableName=table_name,
                                               Records=records, CommonAttributes={})
            #print("WriteRecords Status: [%s]" % result['ResponseMetadata']['HTTPStatusCode'])
        except ts_write.exceptions.RejectedRecordsException as err:
            print("RejectedRecords: ", err)
            for rr in err.response["RejectedRecords"]:
                print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"])
            print("Other records were written successfully. ")
        except Exception as err:
            print("Error:", err)
        records = []
        print("recoreds set to zero")

if index % write_freq != 0:
    try:
        result = ts_write.write_records(DatabaseName=db_name, TableName=table_name,
                                           Records=records, CommonAttributes={})
        #print("WriteRecords Status: [%s]" % result['ResponseMetadata']['HTTPStatusCode'])
    except ts_write.exceptions.RejectedRecordsException as err:
        print("RejectedRecords: ", err)
        for rr in err.response["RejectedRecords"]:
            print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"])
        print("Other records were written successfully. ")
    except Exception as err:
        print("Error:", err)
    records = []
    print("recoreds set to zero")

0
recoreds set to zero
1
2
3
4
5
6
7
8
9
10
recoreds set to zero
11
12
13
14
15
16
17
18
19
20
recoreds set to zero
21
22
23
24
25
26
27
28
29
30
recoreds set to zero
31
32
33
34
35
36
37
38
39
40
recoreds set to zero
41
42
43
44
45
46
47
48
49
50
recoreds set to zero
51
52
53
54
55
56
57
58
59
60
recoreds set to zero
61
62
63
64
65
66
67
68
69
70
recoreds set to zero
71
72
73
74
75
76
77
78
79
80
recoreds set to zero
81
82
83
84
85
86
87
88
89
90
recoreds set to zero
91
92
93
94
95
96
97
98
99
100
recoreds set to zero
101
102
103
104
105
106
107
108
109
110
recoreds set to zero
111
112
113
114
115
116
117
118
119
120
recoreds set to zero
121
122
123
124
125
126
127
128
129
130
recoreds set to zero
131
132
133
134
135
136
137
138
139
140
recoreds set to zero
141
142
143
144
145
146
147
148
149
150
recoreds set to zero
151
152
153
154
155
156
157
158
159
160
recoreds set to zero
161
162
163
164
165
166
167
168
169
170
recoreds set to zero
171
172
173
174
175
176
177
178
179
180
recoreds

In [75]:
dimensions

[{'Name': 'name', 'Value': 'Person'},
 {'Name': 'name_with_id', 'Value': 'Person_582'}]

In [None]:
CommonAttributes={
        'Dimensions': dimensions
        'MeasureName': 'string',
        'MeasureValue': 'string',
        'MeasureValueType': 'DOUBLE'|'BIGINT'|'VARCHAR'|'BOOLEAN',
        'Time': 'string',
        'TimeUnit': 'MILLISECONDS'|'SECONDS'|'MICROSECONDS'|'NANOSECONDS',
        'Version': 123
    },

In [None]:
_response = ts_write.write_records(
    DatabaseName=db_name,
    TableName=table_name,
    CommonAttributes={
        'Dimensions': [
            {
                'Name': 'string',
                'Value': 'string',
                'DimensionValueType': 'VARCHAR'
            },
        ],
        'MeasureName': 'string',
        'MeasureValue': 'string',
        'MeasureValueType': 'DOUBLE'|'BIGINT'|'VARCHAR'|'BOOLEAN',
        'Time': 'string',
        'TimeUnit': 'MILLISECONDS'|'SECONDS'|'MICROSECONDS'|'NANOSECONDS',
        'Version': 123
    },
    Records=[
        {
            'Dimensions': [
                {
                    'Name': 'string',
                    'Value': 'string',
                    'DimensionValueType': 'VARCHAR'
                },
            ],
            'MeasureName': 'string',
            'MeasureValue': 'string',
            'MeasureValueType': 'DOUBLE'|'BIGINT'|'VARCHAR'|'BOOLEAN',
            'Time': 'string',
            'TimeUnit': 'MILLISECONDS'|'SECONDS'|'MICROSECONDS'|'NANOSECONDS',
            'Version': 123
        },
    ]
)

In [None]:
!pip freeze | grep pandas

In [None]:
!pip install awswrangler

In [None]:
rejected_records = wr.timestream.write(
    df=obj_df,
    database=db_name,
    table= table_name,
    time_col="TimeStamp",
    #measure_col= ["Confidence", "Width", "Height", "Left", "Top", "Center_X", "Center_Y", "Area"],
    measure_col= "Confidence",

    dimensions_cols=["Name"],
)

print(f"Number of rejected records: {len(rejected_records)}")


In [None]:
rejected_records