Here are the instructions to create an AWS Lambda Function that transforms data into the Parquet file format. The output follows a folder hierarchy of year, month, and then day of month. Glue Catalog or Spark can interpret this folder structure as partitions.

* Make sure to add a Lambda Layer with pandas, pyarrow, s3fs, fsspec, etc. We will use Python 3.7, as that is the version available in CloudShell.
* Create a Lambda Function from a blueprint to understand how metadata is passed when the function is triggered by an S3 Event Notification.
* Once the Lambda Function is created, configure the Lambda Trigger based on the S3 Event Notification.
* Copy a file into the relevant location in S3 to validate whether the Lambda Function is triggered.
* Update the Lambda Function with the required Pandas-based processing logic. It will be integrated with the metadata passed when the Lambda Function is triggered by an object being placed in S3.
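
The trigger configuration step can also be scripted instead of done in the console. The following is a minimal sketch, assuming a boto3 S3 client is passed in. The function names, the ARN, and the `topics/retail_logs/` prefix are illustrative assumptions, not values from these instructions. S3 also needs permission to invoke the function. The console adds that permission automatically when the trigger is created there, while a scripted setup has to grant it separately.

```python
def build_notification_config(lambda_arn, prefix):
    """Build the payload that tells S3 to invoke a Lambda Function
    whenever an object is created under the given key prefix."""
    return {
        'LambdaFunctionConfigurations': [
            {
                'LambdaFunctionArn': lambda_arn,
                'Events': ['s3:ObjectCreated:*'],
                'Filter': {
                    'Key': {
                        'FilterRules': [
                            {'Name': 'prefix', 'Value': prefix}
                        ]
                    }
                }
            }
        ]
    }


def configure_trigger(s3_client, bucket_name, lambda_arn, prefix):
    # Note: this call replaces the bucket's existing notification configuration.
    return s3_client.put_bucket_notification_configuration(
        Bucket=bucket_name,
        NotificationConfiguration=build_notification_config(lambda_arn, prefix)
    )


# Hypothetical ARN and prefix, used only to show the shape of the payload
config = build_notification_config(
    'arn:aws:lambda:us-east-1:123456789012:function:retail-logs-to-parquet',
    'topics/retail_logs/'
)
print(config)
```

Restricting the trigger to a prefix keeps the function from being invoked for objects written elsewhere in the same bucket.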


In [None]:
# Here is the Lambda Function which acts as a blueprint
# We will use Python 3.7 as the version in CloudShell is Python 3.7

import urllib.parse
import json
import boto3

# Create the client once so it is reused across warm invocations
s3_client = boto3.client('s3')

def lambda_handler(event, context):
    for rec in event['Records']:
        bucket_name = rec['s3']['bucket']['name']
        # Object keys in S3 event notifications are URL-encoded
        key = urllib.parse.unquote_plus(rec['s3']['object']['key'])

        # Read the object and split it into one JSON message per line
        body = s3_client.get_object(Bucket=bucket_name, Key=key)['Body'].read()
        log_messages = body.decode('utf-8').splitlines()
        print(json.loads(log_messages[0]))
        print(f'Number of messages in the file {key} is {len(log_messages)}')

    return {
        'statusCode': 200,
        'statusMessage': 'Successfully processed!!!'
    }
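
To see what metadata the handler receives, it can help to run the key extraction locally against a hand-written event. The sketch below is a heavily trimmed sample, not a captured event. It keeps only the fields the handler reads, and `extract_objects` is a hypothetical helper name. It assumes the key arrives URL-encoded, with `=` as `%3D` and `+` as `%2B`.

```python
import urllib.parse

# Trimmed, hand-written sample of an S3 Event Notification payload
sample_event = {
    'Records': [
        {
            'eventSource': 'aws:s3',
            'eventName': 'ObjectCreated:Put',
            's3': {
                'bucket': {'name': 'airetail'},
                'object': {
                    'key': 'topics/retail_logs/partition%3D1/retail_logs%2B1%2B0000011280.bin',
                    'size': 29233
                }
            }
        }
    ]
}


def extract_objects(event):
    """Return (bucket, decoded key) pairs for every record in the event."""
    return [
        (
            rec['s3']['bucket']['name'],
            urllib.parse.unquote_plus(rec['s3']['object']['key'])
        )
        for rec in event['Records']
    ]


for bucket_name, key in extract_objects(sample_event):
    print(f's3://{bucket_name}/{key}')
```

Decoding with `unquote_plus` matters for this dataset because the file names contain literal `+` characters and the folder names contain `=`.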

In [3]:
!aws s3 ls s3://airetail/topics/retail_logs_backup/partition=1/

2022-08-10 00:40:49      29233 retail_logs+1+0000011280.bin
2022-08-10 00:40:49      28978 retail_logs+1+0000011400.bin
2022-08-10 00:40:49      29048 retail_logs+1+0000011520.bin
2022-08-10 00:40:49      29734 retail_logs+1+0000011640.bin
2022-08-10 00:40:49      29439 retail_logs+1+0000011760.bin
2022-08-10 00:40:49      29007 retail_logs+1+0000011880.bin
2022-08-10 00:40:49      29025 retail_logs+1+0000012000.bin
2022-08-10 00:40:49      29611 retail_logs+1+0000012120.bin
2022-08-10 00:40:49      29323 retail_logs+1+0000012240.bin
2022-08-10 00:40:49      30258 retail_logs+1+0000012360.bin
2022-08-10 00:40:49      29398 retail_logs+1+0000012480.bin
2022-08-10 00:40:49      28936 retail_logs+1+0000012600.bin
2022-08-10 00:40:49      29523 retail_logs+1+0000012720.bin
2022-08-10 00:40:49      29239 retail_logs+1+0000012840.bin
2022-08-10 00:40:49      29321 retail_logs+1+0000012960.bin
2022-08-10 00:40:49      29246 retail_logs+1+0000013080.bin
2022-08-10 00:40:49      29286 retail_lo

In [4]:
import boto3

In [5]:
s3_client = boto3.client('s3')

In [8]:
object_body = s3_client.get_object(
    Bucket='airetail',
    Key='topics/retail_logs_backup/partition=1/retail_logs+1+0000011280.bin'
)['Body'].read()

In [9]:
s3_client.put_object(
    Body=object_body,
    Bucket='airetail',
    Key='topics/retail_logs/partition=1/retail_logs+1+0000011280.bin'
)

{'ResponseMetadata': {'RequestId': 'B12Q4JS450M1DQXN',
  'HostId': 'VXFStNt+I0yMOortPibcTxEYFCtt8AXOGQ+umiolyuOpn4Yyo6wfKnHBU5Ul3Zoj6Q4YOs9lef0=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'VXFStNt+I0yMOortPibcTxEYFCtt8AXOGQ+umiolyuOpn4Yyo6wfKnHBU5Ul3Zoj6Q4YOs9lef0=',
   'x-amz-request-id': 'B12Q4JS450M1DQXN',
   'date': 'Thu, 11 Aug 2022 01:14:08 GMT',
   'etag': '"9cc1854dd974d3e0e51be5c8f17f1f83"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"9cc1854dd974d3e0e51be5c8f17f1f83"'}

In [10]:
!aws s3 ls s3://airetail/topics/retail_logs/partition=1/

2022-08-11 01:14:08      29233 retail_logs+1+0000011280.bin
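
The download and re-upload above can also be done as a single server-side copy, which avoids pulling the object body into the notebook. This is a minimal sketch, assuming a boto3 S3 client is passed in. `copy_within_bucket` is a hypothetical helper name.

```python
def copy_within_bucket(s3_client, bucket_name, source_key, target_key):
    """Copy an object to a new key in the same bucket without downloading it."""
    return s3_client.copy_object(
        Bucket=bucket_name,
        Key=target_key,
        CopySource={'Bucket': bucket_name, 'Key': source_key}
    )
```

With `s3_client = boto3.client('s3')`, calling `copy_within_bucket(s3_client, 'airetail', 'topics/retail_logs_backup/partition=1/retail_logs+1+0000011280.bin', 'topics/retail_logs/partition=1/retail_logs+1+0000011280.bin')` would place the same file in the location watched by the trigger.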


In [None]:
import urllib.parse
import pandas as pd

def lambda_handler(event, context):
    print('Processing Started')
    for rec in event['Records']:
        bucket_name = rec['s3']['bucket']['name']
        # Object keys in S3 event notifications are URL-encoded
        object_key = urllib.parse.unquote_plus(rec['s3']['object']['key'])

        # Each line of the source file is one JSON document
        df = pd.read_json(
            f's3://{bucket_name}/{object_key}',
            lines=True
        )

        # Derive the partition columns from the leading YYYY-MM-DD of the date
        date_str = df['date'].astype(str)
        df['year'] = date_str.str[:4]
        df['month'] = date_str.str[5:7]
        df['dayofmonth'] = date_str.str[8:10]

        partition_cols = ['year', 'month', 'dayofmonth']
        orig_file_name = object_key.split('/')[-1].split('.')[0]
        file_name = f'{orig_file_name}.snappy.parquet'

        # Write one Parquet file per (year, month, dayofmonth) combination
        for (year, month, dayofmonth), df_by_key in df.groupby(partition_cols):
            target_dir = f's3://{bucket_name}/silver/retail_logs/year={year}/month={month}/dayofmonth={dayofmonth}'
            # The partition values live in the folder names, so drop the columns
            df_by_key.drop(columns=partition_cols).to_parquet(f'{target_dir}/{file_name}')
        print(f'{df.shape[0]} records in s3://{bucket_name}/{object_key} are converted to Parquet Format')
    print('Processing is successfully done')
    return {
        'statusCode': 200,
        'statusMessage': 'Successfully processed!!!'
    }
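
The `year=/month=/dayofmonth=` folder naming used in the handler can be checked without touching S3. The sketch below is plain Python with two hypothetical helpers. `build_target_key` mirrors the string slicing used in the handler, and `parse_partitions` shows one way the `name=value` folders can be read back as partition columns. It is only an illustration of the convention, not how Glue Catalog or Spark are implemented.

```python
def build_target_key(date_value, orig_file_name, base_prefix='silver/retail_logs'):
    """Build the partitioned object key for a record date such as '2022-08-10 00:40:49'."""
    date_str = str(date_value)
    year, month, dayofmonth = date_str[:4], date_str[5:7], date_str[8:10]
    return (
        f'{base_prefix}/year={year}/month={month}/dayofmonth={dayofmonth}/'
        f'{orig_file_name}.snappy.parquet'
    )


def parse_partitions(key):
    """Read the name=value folder segments of a key back into a dict."""
    return dict(
        segment.split('=', 1)
        for segment in key.split('/')[:-1]  # skip the file name itself
        if '=' in segment
    )


target_key = build_target_key('2022-08-10 00:40:49', 'retail_logs+1+0000011280')
print(target_key)
print(parse_partitions(target_key))
```

Because the partition values are encoded in the path, the `year`, `month`, and `dayofmonth` columns do not need to be stored inside the Parquet files.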