# In [None]:
from datetime import datetime
from typing import Iterable
from open_connector.events.sns import EmailsS3SNSEvent
from open_connector.protocol.pipeline import OpenMessage, Metadata
from open_connector.pipeline import Pipeline
from source_s3 import OpenS3Source
from destination_s3 import OpenS3Destination
import boto3

# Module-level S3 client, built once at import time; lambda_handler uses it
# to delete the objects after the pipeline has run.
s3 = boto3.client(service_name='s3')


class EmailLandingSource(OpenS3Source):
    """Pipeline source emitting the emails referenced by a single Lambda event.

    The raw event is stored unchanged. It is only parsed (through
    ``EmailsS3SNSEvent``) once ``read`` starts being iterated, because
    ``read`` is a generator.
    """

    def __init__(self, event: dict):
        # Initialise the OpenS3Source base first. ``read`` uses ``self.s3``,
        # which is presumably created there (TODO confirm in OpenS3Source).
        super().__init__()
        self._event = event

    def read(self) -> Iterable[OpenMessage]:
        """Download each S3 object named in the event and yield it as a message.

        Every object is fetched whole with ``self.s3.get_object``. Its body
        bytes become the message ``data``. The metadata carries the read time
        (ISO-8601, from a naive ``datetime.now()``) and ``output_format='email'``.
        """
        for landed in EmailsS3SNSEvent(self._event):
            obj: dict = self.s3.get_object(Bucket=landed.bucket, Key=landed.key)
            payload: bytes = obj['Body'].read()

            # The timestamp is taken per message, after its download finished.
            yield OpenMessage(
                data=payload,
                metadata=Metadata(
                    ingestion_time=datetime.now().isoformat(),
                    output_format='email',
                ),
            )


def lambda_handler(event, context):
    """Lambda entry point: push the landed emails through the pipeline, then delete them.

    Args:
        event: The raw invocation event. ``EmailLandingSource`` and
            ``EmailsS3SNSEvent`` both parse it.
        context: The Lambda context object; not used here.

    Returns:
        ``{'statusCode': 200}`` once the pipeline has run and every referenced
        object has been deleted.
    """
    source = EmailLandingSource(event)
    destination = OpenS3Destination()
    Pipeline(source=source, destination=destination).execute()

    # This is reached only when execute() did not raise. A failed run therefore
    # leaves the landed objects in place instead of deleting unprocessed emails.
    for landed in EmailsS3SNSEvent(event):
        s3.delete_object(Bucket=landed.bucket, Key=landed.key)

    return {'statusCode': 200}