# AWS with boto3
## CloudWatch
## S3

In [1]:
import logging
import datetime

import boto3
from botocore.exceptions import ClientError

## CloudWatch

In [2]:
def get_unix_ts_int(ts):
    return (int(ts.timestamp() * 1000))

class AWSCloudWatchLogHandler:
    def __init__(self, group_name, stream_name):
        self.group_name = group_name
        self.stream_name = stream_name
        return

    def get_log_events(self, s_ts, e_ts):
        unix_s_ts_int = get_unix_ts_int(s_ts)
        unix_e_ts_int = get_unix_ts_int(e_ts)
        client = boto3.client('logs')
        try:
            resp = client.get_log_events(
                logGroupName=self.group_name,
                logStreamName=self.stream_name,
                startTime=unix_s_ts_int,
                endTime=unix_e_ts_int,
                limit=100,
                startFromHead=False
            )
            return resp.get('events')
        except ClientError as e:
            logging.error(e)
            return []
    
    def put_log_events(self, message_list):
        """send events to AWS CloudWatch log stream
        :param message_list, a list of message string to be sent
        :return: True if sent successfully, else False
        """
        unix_ts_int = get_unix_ts_int(datetime.datetime.now())
        event_list = [{
            'timestamp': unix_ts_int,
            'message': m
        } for m in message_list]
        client = boto3.client('logs')
        # Get upload sequence token first
        try:
            resp = client.describe_log_streams(
                logGroupName=self.group_name,
                logStreamNamePrefix=self.stream_name,
                orderBy='LogStreamName',
                descending=False,
            )
        except ClientError as e:
            logging.error(e)
            return False

        stream_list = resp.get('logStreams')
        upload_seq_token = None
        for stream in stream_list:
            if stream.get('logStreamName') == self.stream_name:
                upload_seq_token = stream.get('uploadSequenceToken')
                break
        if upload_seq_token == None:
            logging.error(f"Invalid upload sequence token: {upload_seq_token}")
            return False

        try:
            resp = client.put_log_events(
                logGroupName=self.group_name,
                logStreamName=self.stream_name,
                logEvents=event_list,
                sequenceToken=upload_seq_token
            )
        except ClientError as e:
            logging.error(e)
            return False
        return True

In [3]:
test_cw_group_name = "Customized"
test_cw_stream_name = "feed-data-quality"

In [4]:
test_cw_log_handler_cls = AWSCloudWatchLogHandler(
    group_name=test_cw_group_name, stream_name=test_cw_stream_name
)

In [5]:
test_log_msg_list = ["test_msg_5", "test_msg_6"]

In [6]:
test_cw_log_handler_cls.put_log_events(test_log_msg_list)

True

In [7]:
test_cw_log_handler_cls.get_log_events(
    s_ts=datetime.datetime.strptime("2020-05-01", '%Y-%m-%d'),
    e_ts=datetime.datetime.strptime("2020-05-31", '%Y-%m-%d')
)

[{'timestamp': 1590895489990,
  'message': 'test_msg_1',
  'ingestionTime': 1590895490205},
 {'timestamp': 1590895489990,
  'message': 'test_msg_2',
  'ingestionTime': 1590895490205},
 {'timestamp': 1590897169642,
  'message': 'test_msg_3',
  'ingestionTime': 1590897169849},
 {'timestamp': 1590897169642,
  'message': 'test_msg_4',
  'ingestionTime': 1590897169849},
 {'timestamp': 1590897264954,
  'message': 'test_msg_5',
  'ingestionTime': 1590897265236},
 {'timestamp': 1590897264954,
  'message': 'test_msg_6',
  'ingestionTime': 1590897265236}]

In [8]:
del test_cw_log_handler_cls

## S3

In [2]:
class AWSS3ObjHandler:
    def __init__(self, bucket_name):
        self.bucket_name = bucket_name
        return

    def get_obj_blob(self, obj_name):
        """Upload a file to an S3 bucket
        :param obj_name: S3 object name
        :return: file blob if success, otherwise None
        """
        s3_client = boto3.client('s3')

        try:
            resp = s3_client.get_object(
                Bucket=self.bucket_name,
                Key=obj_name
            )
            blob = resp['Body'].read()
        except ClientError as e:
            logging.error(e)
            return None
        return blob

    @staticmethod
    def dump_obj_blob(blob, file_path):
        """Dump obj blob to local file
        :param blob: content to be dumped
        :param file_path: file path for the dumped blob
        :return: True if file was dumped, else False
        """
        return True

    def put_obj_by_file_path(self, file_path, obj_name=None):
        """Upload a file to an S3 bucket
        :param file_path: File to upload
        :param obj_name: S3 object name. If not specified then file_path is used
        :return: True if file was uploaded, else False
        """
        # If S3 object_name was not specified, use file_name
        if obj_name is None:
            obj_name = file_path
    
        # Upload the file
        s3_client = boto3.client('s3')
        try:
            resp = s3_client.upload_file(
                file_path, self.bucket_name, obj_name
            )
        except ClientError as e:
            logging.error(e)
            return False
        return True

    def delete_obj(self, obj_name):
        return

In [3]:
test_file_name = "altDailyFactorBankDQReport_2020_2020-05-29.pdf"
test_file_path = f"/Users/huawei/Downloads/{test_file_name}"
test_bucket_name = "feed-data-quality"
test_s3_obj_name = f"report/{test_file_name}"

In [4]:
test_s3_obj_handler_cls = AWSS3ObjHandler(
    bucket_name=test_bucket_name
)

In [5]:
test_s3_obj_handler_cls.put_obj_by_file_path(
    file_path=test_file_path,
    obj_name=test_s3_obj_name
)

True

In [5]:
test_blob = test_s3_obj_handler_cls.get_obj_blob(
    obj_name=test_s3_obj_name
)

In [6]:
del test_s3_obj_handler_cls