In [28]:
import logging
import boto3
import os
import pandas as pd
import sqlalchemy as sa
from datetime import datetime
from botocore.exceptions import ClientError
from sqlalchemy.schema import CreateTable

"""
S3 folder structure
 files  : received
 folder : processed
 folder : rejected  

Sucessfully processedfiles moved to process folder otherwise reject folder
TODO : Add multithreding for scaling and perfomance
"""

def download_s3_files(bucket_name,ACCESS_KEY,SECRET_KEY,REGION_NAME,SESSION_TOKEN,folder_name):
    """ List the objects in an Amazon S3 bucket
    :param bucket_name: string
    :return : List of bucket objects. If error, return None.
    """
    # Retrive the list of the bucket objects
    session = boto3.Session(aws_access_key_id=ACCESS_KEY,aws_secret_access_key=SECRET_KEY,)
    s3_resource = session.resource('s3')
    s3_bucket = s3_resource.Bucket(bucket_name)
    prefix_received = 'received'
    prefix_rejected = 'rejected'
    
    try:
        logging.info('Name of the folder: ' + folder_name)
    
#         for file_object in s3_bucket_received.objects.filter(Prefix=bucket_name):
        for file_object in s3_bucket.objects.filter(Prefix=prefix_received):
            logging.info('File Name : ' + file_object.key)
            s3_path,s3_filename = get_s3_path_filename(file_object.key)
            local_folder_path = os.path.join(*[os.curdir,folder_name, s3_path])
            local_full_path = os.path.join(*[local_folder_path,s3_filename])
            logging.info('Folder Path: '+ local_folder_path)
            logging.info('Full Path : '+ local_full_path)
            
            # Create the path before download
            makeDir(local_folder_path)
            
            # Download the file now and skip the root folder
            if not file_object.key == prefix_received+'/':
                s3_bucket.download_file(file_object.key, local_full_path)
            
            if valid_parquet_file(file_object.key, local_full_path):
                try:
                    process_parquet_to_redshift(file_object.key,local_full_path)
                    moveto_s3file_processed(s3_path,s3_filename)
                except Exception as e:
                    logging.error(e)
                    moveto_s3file_rejected(s3_path,s3_filename)
            else:
                moveto_s3file_rejected(s3_path,s3_filename)
              
    except ClientError as e:
        #AllAccessDisabled error == bucket not found
        logging.error(e)
        return None
    return local_full_path


def process_parquet_to_redshift(file_name,local_full_path):
    """
     Process parquet file and moved to resshift
    """
    try:
        connstr = 'redshift+psycopg2://awsuser:Dummy143@redshift-cluster-1.c8gf2sevvj4b.us-east-1.redshift.amazonaws.com:5439/dev_sfdc'
        logging.info('Processing file ' +file_name+ ' started.')
        df = pd.read_parquet(local_full_path,engine='pyarrow')
        df_filtered = df[['id','first_name','last_name']]
        connstr = 'redshift+psycopg2://awsuser:Dummy143@redshift-cluster-1.c8gf2sevvj4b.us-east-1.redshift.amazonaws.com:5439/dev_sfdc'
        engine = sa.create_engine(connstr)
        df_indexed = df_filtered.set_index('id')
        print(df_indexed)
#         TODO : Create Table or Add to the existing table
        df_indexed.to_sql('users',engine, index = False, if_exists='replace', schema = 'public')
    except ClientError as e:
        logging.error(e)
    return None    

    
def moveto_s3file_processed(s3_path,s3_filename):
    """
     Move S3 file to S3 processed location
    """
    logging.info('TODO : file moved to processed folder')
    
    
def moveto_s3file_rejected(s3_path,s3_filename):
    """
    Move S3 file to S3 rejected location
    """
    logging.info('TODO : file moved to rejected folder')
    
    
def valid_parquet_file(file_name, local_full_path):
    """
    TODO check if the received parquet file is valie
    """
    return True

def makeDir(path):
    """
    Create local temparory directory
    """
    try:
        if not os.path.exists(path):
            os.makedirs(path)
        
    except ClientError as e:
        logging.error(e)
        return None
    return None
    
    
def get_s3_path_filename(key):
    """ Get the linux acceptable file name 
    """ 
    key = str(key)
    return key.replace(key.split('/')[-1],""),  key.split('/')[-1]

def create_folder_name():
    """ Current date and time to create the folder name for this run
    """ 
    folder_name_temp = str(datetime.now())
    folder_name_temp = folder_name_temp.translate({ord(c): " " for c in "!@#$%^&*()[]{};:,./<>?\|`~-=_+"})
    folder_name = folder_name_temp.replace(' ','')
    return folder_name

def main():
    """List of the objects for S3 Bucket
    """
    # Assign this value before running the program
    s3_bucket_name = 'sdfc-parquet-files'
    
    ACCESS_KEY = 'AKIARSF242L6P5APOFWP'
    SECRET_KEY = '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'
    REGION_NAME = 'us-east-1'
    local_folder_path = create_folder_name()
    
    # Set up logging
    logging.basicConfig(level=logging.DEBUG,
                       format='%(levelname)s: %(asctime)s: %(message)s')
    # Download the bucket's objects
    file_path = download_s3_files(s3_bucket_name,
                                  ACCESS_KEY,
                                  SECRET_KEY,
                                  REGION_NAME,
                                  None, create_folder_name())
    
    logging.info('Files created at location : ' + file_path )
    # Read the Parquet files from today's file folder and store to redshift
    
    # Move process all S3 files to Processed folder
    
   
if __name__ == '__main__':
    main()

DEBUG: 2019-08-26 02:04:14,615: Changing event name from creating-client-class.iot-data to creating-client-class.iot-data-plane
DEBUG: 2019-08-26 02:04:14,621: Changing event name from before-call.apigateway to before-call.api-gateway
DEBUG: 2019-08-26 02:04:14,622: Changing event name from request-created.machinelearning.Predict to request-created.machine-learning.Predict
DEBUG: 2019-08-26 02:04:14,625: Changing event name from before-parameter-build.autoscaling.CreateLaunchConfiguration to before-parameter-build.auto-scaling.CreateLaunchConfiguration
DEBUG: 2019-08-26 02:04:14,626: Changing event name from before-parameter-build.route53 to before-parameter-build.route-53
DEBUG: 2019-08-26 02:04:14,627: Changing event name from request-created.cloudsearchdomain.Search to request-created.cloudsearch-domain.Search
DEBUG: 2019-08-26 02:04:14,629: Changing event name from docs.*.autoscaling.CreateLaunchConfiguration.complete-section to docs.*.auto-scaling.CreateLaunchConfiguration.complet

      first_name  last_name
id                         
1         Amanda     Jordan
2         Albert    Freeman
3         Evelyn     Morgan
4         Denise      Riley
5         Carlos      Burns
6        Kathryn      White
7         Samuel     Holmes
8          Harry     Howell
9           Jose     Foster
10         Emily    Stewart
11         Susan    Perkins
12         Alice      Berry
13        Justin      Berry
14         Kathy   Reynolds
15       Dorothy     Hudson
16         Bruce     Willis
17         Emily    Andrews
18       Stephen    Wallace
19      Clarence     Lawson
20       Rebecca       Bell
21         Diane    Stevens
22      Lawrence      Ramos
23       Gregory     Barnes
24      Michelle      Ellis
25        Rachel    Perkins
26       Anthony   Lawrence
27         Henry      Henry
28        Samuel     Hunter
29    Jacqueline     Holmes
30         Annie     Torres
...          ...        ...
971        Karen    Garrett
972         Alan     Hudson
973        Bobby    

INFO: 2019-08-26 02:05:17,221: TODO : file moved to processed folder
INFO: 2019-08-26 02:05:17,226: Files created at location : ./20190826020414613825/received/userdata1.parquet
