Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 44 additions & 9 deletions python2.7/kinesis_stream_put_to_s3_athena_partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

S3_BUCKET = os.environ['S3_BUCKET']
S3_PATH = os.environ['S3_PATH']
DEBUG = os.environ['DEBUG']

print('Loading function')

Expand All @@ -17,21 +18,55 @@ def lambda_handler(event, context):
#s3 client init
s3 = boto3.client('s3')


i=-1
ii=-1
s3Puts = 0

lastDate = None
thisDate = None
dataToPut = ''
recordsCount=0
records = []

for record in event['Records']:
# Kinesis data is base64 encoded so decode here
str_payload = base64.b64decode(record['kinesis']['data'])
recordsCount+=1
json_payload = json.loads(base64.b64decode(record['kinesis']['data']))
records.append(json_payload)

records = sorted(records, key=lambda x: int(x['d']['sec']))

json_payload = json.loads(str_payload)
for json_payload in records:
i+=1
ii+=1


str_payload = json.dumps(json_payload)
date = datetime.fromtimestamp(json_payload['d']['sec'])

filePath = S3_PATH + "/year=" + str(date.year) + "/month=" + str(date.month) + "/day=" + str(date.day) + "/hour=" + str(date.hour) + "/minute=" + str(date.minute) + "/" + str(json_payload['d']['sec']) + "_" + str(json_payload['_id']) + ".json"

#print("path: " + filePath)

s3.put_object(ContentType="application/json", Bucket=S3_BUCKET, Key=filePath, Body=str_payload)
thisDate = str(date.year) + str(date.month) + str(date.day) + str(date.hour) + str(date.minute)

if(i==recordsCount or (lastDate!=None and thisDate!=lastDate)):
#if is the last iteration OR this date is not in the same minute of the last record but is not the fisrt iteration
filePath = S3_PATH + "/" + "year=" + str(date.year) + "/month=" + str(date.month) + "/day=" + str(date.day) + "/hour=" + str(date.hour) + "/minute=" + str(date.minute) + "/" + str(json_payload['d']['sec']) + "_" + str(json_payload['_id']) + ".json"

if(DEBUG):
print("path: " + filePath)
print("data: " + dataToPut)
print("\n")

r = s3.put_object(ContentType="application/json", Bucket=S3_BUCKET, Key=filePath, Body=dataToPut)
s3Puts+=1
dataToPut = ''
ii=0

return 'Successfully processed {} records.'.format(len(event['Records']))

if(ii!=0):
str_payload = '\n' + str_payload

dataToPut = dataToPut + str_payload

lastDate = thisDate

return 'Successfully processed {} records. {} put to S3.'.format(recordsCount).s3Puts