diff --git a/python2.7/kinesis_stream_put_to_s3_athena_partitioning.py b/python2.7/kinesis_stream_put_to_s3_athena_partitioning.py index ed92e09..c1003b0 100644 --- a/python2.7/kinesis_stream_put_to_s3_athena_partitioning.py +++ b/python2.7/kinesis_stream_put_to_s3_athena_partitioning.py @@ -8,6 +8,7 @@ S3_BUCKET = os.environ['S3_BUCKET'] S3_PATH = os.environ['S3_PATH'] +DEBUG = os.environ['DEBUG'] print('Loading function') @@ -17,21 +18,55 @@ def lambda_handler(event, context): #s3 client init s3 = boto3.client('s3') + + i=-1 + ii=-1 + s3Puts = 0 + + lastDate = None + thisDate = None + dataToPut = '' + recordsCount=0 + records = [] + for record in event['Records']: - # Kinesis data is base64 encoded so decode here - str_payload = base64.b64decode(record['kinesis']['data']) + recordsCount+=1 + json_payload = json.loads(base64.b64decode(record['kinesis']['data'])) + records.append(json_payload) + records = sorted(records, key=lambda x: int(x['d']['sec'])) - json_payload = json.loads(str_payload) + for json_payload in records: + i+=1 + ii+=1 - + str_payload = json.dumps(json_payload) date = datetime.fromtimestamp(json_payload['d']['sec']) - - filePath = S3_PATH + "/year=" + str(date.year) + "/month=" + str(date.month) + "/day=" + str(date.day) + "/hour=" + str(date.hour) + "/minute=" + str(date.minute) + "/" + str(json_payload['d']['sec']) + "_" + str(json_payload['_id']) + ".json" - #print("path: " + filePath) + - s3.put_object(ContentType="application/json", Bucket=S3_BUCKET, Key=filePath, Body=str_payload) + thisDate = str(date.year) + str(date.month) + str(date.day) + str(date.hour) + str(date.minute) + if(i==recordsCount or (lastDate!=None and thisDate!=lastDate)): + #if is the last iteration OR this date is not in the same minute of the last record but is not the fisrt iteration + filePath = S3_PATH + "/" + "year=" + str(date.year) + "/month=" + str(date.month) + "/day=" + str(date.day) + "/hour=" + str(date.hour) + "/minute=" + str(date.minute) + "/" + str(json_payload['d']['sec']) + "_" + str(json_payload['_id']) + ".json" + + if(DEBUG): + print("path: " + filePath) + print("data: " + dataToPut) + print("\n") + + r = s3.put_object(ContentType="application/json", Bucket=S3_BUCKET, Key=filePath, Body=dataToPut) + s3Puts+=1 + dataToPut = '' + ii=0 - return 'Successfully processed {} records.'.format(len(event['Records'])) \ No newline at end of file + + if(ii!=0): + str_payload = '\n' + str_payload + + dataToPut = dataToPut + str_payload + + lastDate = thisDate + + return 'Successfully processed {} records. {} put to S3.'.format(recordsCount).s3Puts \ No newline at end of file