In [None]:
## This is a demonstration of a Chalice app that processes email log files.

from chalice import Chalice
import boto3
import logging
import json
import ast
from pprint import pprint
import re, datetime
from copy import deepcopy

# Chalice application object; the S3 event handler and the HTTP route below
# are registered on it through its decorators.
app = Chalice(app_name='email_parser')

# Emit everything from DEBUG upwards on the application logger.
app.log.setLevel(logging.DEBUG)

# S3 configuration: one shared client and the bucket that is both read from
# and written to by the parsers.
s3 = boto3.client('s3')
bucket_name = 'mango-email'

# Key prefix listed by the test endpoint when looking for raw log files.
email_folder_path = 'mango-email-test/'
# Key prefix under which parsed records are stored as JSON objects.
# NOTE(review): this prefix starts with the bucket name, so objects land under
# 'mango-email/processed-data/' inside the 'mango-email' bucket - confirm intended.
email_parsed_data_folder_path = 'mango-email/processed-data/'

In [None]:
# Module-level timestamp (local time, formatted YYYYMMDDHH) used as the prefix
# of generated output object names.
# NOTE(review): this is evaluated once, when the module is imported, so a
# long-lived process keeps using the hour at which it was loaded - confirm
# that this is acceptable for the output naming scheme.
curr_ymdh = datetime.datetime.now().strftime("%Y%m%d%H")

In [None]:
# email_to can be a list with 1 or more element, or string
def clean_email_to(email_to):
    """Normalise the recipient field of an email log record.

    The internal address ``admin@mango.com`` is removed, surrounding
    whitespace is stripped, and multiple recipients are collapsed into one
    ``'|'``-separated string.

    Args:
        email_to: A single address as ``str``, a list/tuple of addresses,
            or ``None``.

    Returns:
        The cleaned address (or ``'|'``-joined addresses) as ``str``; an
        empty string when nothing remains after filtering.

    Raises:
        AttributeError: If ``email_to`` is neither ``None``, a string nor a
            list/tuple (it has no ``strip`` method).
    """
    admin_address = 'admin@mango.com'

    # A missing recipient is treated like an empty one instead of crashing.
    if email_to is None:
        return ''

    if isinstance(email_to, (list, tuple)):
        # Non-string and blank entries are dropped so they can neither raise
        # nor leave dangling '|' separators in the joined result.
        stripped = (
            addr.strip()
            for addr in email_to
            if isinstance(addr, str) and addr.strip()
        )
        # join() yields '' for no entries and the bare address for exactly one.
        return '|'.join(addr for addr in stripped if addr != admin_address)

    # Single address: return the stripped form so the string branch behaves
    # the same way as the list branch.
    email_to = email_to.strip()
    return '' if email_to == admin_address else email_to

In [None]:
## Email logs can come from different vendors, such as Amazon SES, Mailgun or Mailjet.

In [None]:
def x_provider_parser(x_provider):
    """Parse one x-provider log object and store each record as JSON in S3.

    The object is downloaded from ``bucket_name``, decoded as UTF-8 and read
    line by line, each non-blank line being parsed as one JSON document.
    Records whose recipient is empty after ``clean_email_to`` are skipped.
    Every remaining record is written to ``email_parsed_data_folder_path``
    under a name built from the module timestamp, the source file name, the
    recipient and the line index.

    A failure on one line is logged through ``app.log`` and does not stop
    the processing of the remaining lines.

    Args:
        x_provider: S3 key of the log object to parse.
    """
    obj = s3.get_object(Bucket=bucket_name, Key=x_provider)
    file_data = obj['Body'].read().decode('utf-8')

    # Last path component of the key. Index -1 (rather than 1) also works for
    # keys without a '/', so neither the output name nor the error handler
    # below can raise IndexError.
    source_name = x_provider.rsplit('/', 1)[-1]

    # idx stays the position of the line in the file, blank lines included.
    for idx, line in enumerate(file_data.strip().split('\n')):
        # Blank lines (or an empty file) are not records; skip them instead of
        # logging a JSON decode error for each.
        if not line.strip():
            continue

        try:
            data = json.loads(line)
            parsed_dict = {}
            parsed_dict['email_to'] = data.get('mail', {'destination': ''}).get('destination', '')
            parsed_dict['email_to'] = clean_email_to(parsed_dict['email_to'])
            if parsed_dict['email_to'] == '':
                continue

            # some other key extraction ...

            parsed_dict['key1'] = {
                # this data is in nested dict
                'key2': data.get('key2', {'key3': ''}).get('key4', ''),
                'key5': data.get('key6', {'key7': ''}).get('key8', {'key9': ''}).get('key10', '')
            }

            # NOTE(review): 'key11' is not set in the code shown here; it is
            # presumably filled in by the elided extraction step above.
            output_dict_key = f"{curr_ymdh}_{source_name}_{parsed_dict['email_to']}_{parsed_dict['key11']}_{idx}.json"

            s3.put_object(Bucket=bucket_name,
                    Key=f"{email_parsed_data_folder_path}{output_dict_key}",
                    Body=json.dumps(parsed_dict, indent=4).encode('utf-8'))

        except Exception as e:
            # Deliberately broad: one malformed record must not abort the file.
            error = f"x_provider_{source_name}_{idx}_{e}"
            app.log.error(error)

In [None]:
## The log structure differs between providers, so each provider gets its own parser function.
## Interestingly, one provider used two different structures, which was not mentioned in its documentation.
## Thanks to the error logging, we found this issue and now handle both cases correctly.

In [None]:
# This function will be triggered on s3 events, like upload or modify of a file
@app.on_s3_event(bucket=bucket_name)
def email_parser(event):
    """Dispatch an S3 event on ``bucket_name`` to the matching provider parser.

    Events whose key matches neither the ``mango-email-test/<a>/<b>`` nor the
    ``raw_logs/<a>/<b>`` layout are ignored. The provider is then picked from
    the key's path segment.

    Args:
        event: The S3 event passed in by Chalice; only ``event.key`` is read.
    """
    object_key = event.key
    search_flags = re.MULTILINE | re.IGNORECASE
    accepted_layouts = (
        r'mango-email-test/(.+?)/(.+?)$',
        r'raw_logs/(.+?)/(.+?)$',
    )

    # Guard clause: nothing to do unless the key follows a known layout.
    if not any(re.search(layout, object_key, search_flags) for layout in accepted_layouts):
        return

    if '/x-provider/' in object_key:
        # call the respective function and process the data
        pass
    elif '/y-provider/' in object_key:
        # call the respective function and process the data
        pass


In [None]:
# Chalice also offers an HTTP API; this endpoint exists for testing purposes only.
@app.route('/')
def tester_call():
    """Test endpoint: run the provider dispatch over objects already in S3.

    Lists the objects under ``email_folder_path`` in ``bucket_name`` and
    applies to each key the same layout filter and provider dispatch as the
    S3 event handler. Start and end times are printed.

    Only the single response of one ``list_objects_v2`` call is processed;
    continuation tokens are not followed.
    """
    response = s3.list_objects_v2(
            Bucket=bucket_name,
            Prefix=email_folder_path)

    print("Test started at " + datetime.datetime.now().strftime("%Y%m%d%H") + ".")

    search_flags = re.MULTILINE | re.IGNORECASE

    # 'Contents' is absent from the response when nothing matches the prefix.
    for s3_object in response.get('Contents', []):
        object_key = s3_object['Key']

        # Skip (rather than abort on) keys that follow neither known layout,
        # so one stray object does not end the whole test run.
        if (re.search(r'mango-email-test/(.+?)/(.+?)$', object_key, search_flags) is None
                and re.search(r'raw_logs/(.+?)/(.+?)$', object_key, search_flags) is None):
            continue

        if '/x-provider/' in object_key:
            # call the respective function and process the data
            pass

        elif '/y-provider/' in object_key:
            # call the respective function and process the data
            pass

    print("Test ended at " + datetime.datetime.now().strftime("%Y%m%d%H") + ".")