In [12]:
import boto3, logging, botocore
from botocore.config import Config
import pandas as pd
import logging, io
import re
from datetime import datetime

In [4]:
# Timestamp of this run formatted as YYYYMMDD_HHMMSS.
# NOTE(review): not referenced by any of the cells shown below - confirm whether it is still needed.
current_ts = datetime.now().strftime('%Y%m%d_%H%M%S')
# Prerequisite: upload the input csv to the S3 bucket manually through the AWS web console.
# Data: https://github.com/erkansirin78/datasets/raw/master/dirty_store_transactions.csv

In [23]:
def get_s3_client():
    ''' Return a boto3 client for the S3 service, created with no extra arguments'''
    return boto3.client('s3')

def get_s3_resource():
    ''' Return a boto3 resource for the S3 service, created with no extra arguments'''
    return boto3.resource('s3')

def load_df_from_s3(bucket, key, sep=",", index_col=None, usecols=None, s3_client=None):
    ''' Read a csv object from an s3 bucket & load it into a pandas dataframe.

    Args:
        bucket: Name of the S3 bucket holding the object.
        key: Object key of the csv file inside the bucket.
        sep: Field delimiter, forwarded to pandas.read_csv.
        index_col: Forwarded to pandas.read_csv.
        usecols: Forwarded to pandas.read_csv.
        s3_client: Optional object exposing get_object(Bucket=..., Key=...).
            When omitted, a client is created with get_s3_client(). Passing one
            allows the client to be reused or replaced by a stub.

    Returns:
        The parsed DataFrame, or None when the S3 request fails with a botocore
        ClientError. In that case the failure is logged and NOT raised, so
        callers must check the result before using it.
    '''
    s3 = s3_client if s3_client is not None else get_s3_client()
    try:
        # Lazy %-style arguments, like the other log calls in this function.
        logging.info("Loading s3://%s/%s", bucket, key)
        obj = s3.get_object(Bucket=bucket, Key=key)
        return pd.read_csv(obj['Body'], sep=sep, index_col=index_col, usecols=usecols, low_memory=False)
    except botocore.exceptions.ClientError as err:
        status = err.response["ResponseMetadata"]["HTTPStatusCode"]
        errcode = err.response["Error"]["Code"]
        if status == 404:
            logging.warning("Missing object, %s", errcode)
        elif status == 403:
            logging.error("Access denied, %s", errcode)
        else:
            # Unexpected status: include the traceback in the log record.
            logging.exception("Error in request, %s", errcode)
    # Reached only after a logged ClientError.
    return None


def save_df_to_s3(df, bucket, prefix, key=None, s3_resource=None, index_col=None, usecols=None, sep=';'):
    ''' Serialise df as csv into an in-memory buffer, then upload the buffer to s3.

    Args:
        df: DataFrame to upload; it is written without its index.
        bucket: Name of the destination S3 bucket.
        prefix: Key prefix ("folder"). When empty or None the object is stored
            directly under `key`; otherwise under `prefix + '/' + key`.
        key: Object name appended to the prefix.
        s3_resource: Object exposing Object(bucket, key).put(Body=...). When
            omitted, one is created with get_s3_resource().
        index_col, usecols, sep: Accepted for backward compatibility but NOT
            applied: the csv is always written with pandas' default separator
            and every column. (Honouring the ';' default would change the
            output format for existing callers.)

    Raises:
        Exception: any error raised while serialising or uploading is logged
            with its traceback and then re-raised unchanged.
    '''
    try:
        if s3_resource is None:
            s3_resource = get_s3_resource()
        csv_buffer = io.StringIO()
        df.to_csv(csv_buffer, index=False)
        # Without a prefix the object key is just `key`; the previous code
        # replaced the key with the (empty) prefix in that case.
        key = key if not prefix else prefix + '/' + key
        s3_resource.Object(bucket, key).put(Body=csv_buffer.getvalue())
        logging.info('%s saved to s3 bucket %s', key, bucket)
    except Exception as e:
        # logging.exception() returns None, so `raise logging.exception(e)`
        # raised a TypeError that hid the real error. Log first, then re-raise
        # the original exception.
        logging.exception(e)
        raise



def data_cleaner(df):
    ''' Return a cleaned copy of the store-transactions dataframe.

    Cleaning rules:
      * STORE_LOCATION: drop every character that is neither a word character
        nor whitespace, then strip surrounding whitespace.
      * PRODUCT_ID: keep the first run of digits; values without digits are
        left unchanged.
      * MRP, CP, DISCOUNT, SP: remove '$' and convert to float.

    The input frame is not modified. Missing values and values that are
    already numeric pass through without raising, so the function can safely
    be applied to its own output (e.g. when a notebook cell is re-run).

    Args:
        df: DataFrame containing the columns listed above.

    Returns:
        A new DataFrame with the cleaned columns.
    '''
    def clean_store_location(st_loc):
        # Non-string cells (e.g. NaN) carry nothing to strip.
        if not isinstance(st_loc, str):
            return st_loc
        return re.sub(r'[^\w\s]', '', st_loc).strip()

    def clean_product_id(pd_id):
        if pd.isna(pd_id):
            return pd_id
        matches = re.findall(r'\d+', str(pd_id))
        if matches:
            return matches[0]
        return pd_id

    def remove_dollar(amount):
        if isinstance(amount, str):
            return float(amount.replace('$', ''))
        if pd.isna(amount):
            return float('nan')
        # Already numeric (e.g. the frame was cleaned before).
        return float(amount)

    # Work on a copy so the caller's raw frame stays available and unchanged.
    cleaned = df.copy()

    cleaned['STORE_LOCATION'] = cleaned['STORE_LOCATION'].map(clean_store_location)
    cleaned['PRODUCT_ID'] = cleaned['PRODUCT_ID'].map(clean_product_id)

    for to_clean in ['MRP', 'CP', 'DISCOUNT', 'SP']:
        cleaned[to_clean] = cleaned[to_clean].map(remove_dollar)

    return cleaned

In [9]:
# S3 location (bucket + object key) of the raw, uncleaned transactions csv.
source_bucket = "vbo-de-input"
source_key = "transactions/dirty-input/dirty_store_transactions_1k.csv"

In [15]:
# Fetch the raw csv from S3. On a botocore ClientError the loader logs the problem and
# returns None instead of raising, so check the log output if the following cells fail.
df = load_df_from_s3(bucket=source_bucket, key=source_key)

In [16]:
# Preview the first rows of the raw data (note the '$' prefixes and stray symbols).
df.head()

Unnamed: 0,STORE_ID,STORE_LOCATION,PRODUCT_CATEGORY,PRODUCT_ID,MRP,CP,DISCOUNT,SP,Date
0,YR1475,New York*,Electronics,90175476,$38,$25.84,$3.42,$34.58,2019-11-26
1,OJ2470,Denver,Kitchen,75536193,$96,$70.08,$4.8,$91.2,2019-11-26
2,XV5488,Washington,Kitchen,58647460,$68,$51,$2.04,$65.96,2019-11-26
3,OJ9422,Denver,Fashion,60402526,$74,$51.8,$6.66,$67.34,2019-11-26
4,XV7687,Washington%,Kitchen,68167114,$78,$56.16,$5.46,$72.54,2019-11-26


In [18]:
# Apply the cleaning rules (store location, product id, money columns) to the loaded frame.
clean_df = data_cleaner(df)

In [19]:
# Preview the cleaned rows to confirm the symbols are gone and amounts are numeric.
clean_df.head()

Unnamed: 0,STORE_ID,STORE_LOCATION,PRODUCT_CATEGORY,PRODUCT_ID,MRP,CP,DISCOUNT,SP,Date
0,YR1475,New York,Electronics,90175476,38.0,25.84,3.42,34.58,2019-11-26
1,OJ2470,Denver,Kitchen,75536193,96.0,70.08,4.8,91.2,2019-11-26
2,XV5488,Washington,Kitchen,58647460,68.0,51.0,2.04,65.96,2019-11-26
3,OJ9422,Denver,Fashion,60402526,74.0,51.8,6.66,67.34,2019-11-26
4,XV7687,Washington,Kitchen,68167114,78.0,56.16,5.46,72.54,2019-11-26


In [24]:
# boto3 S3 resource handed to save_df_to_s3 for the upload.
s3_res = get_s3_resource()

In [25]:
# Upload the cleaned frame; prefix and key are joined with '/' to form the object key
# "clean_data/clean_store_transactions_1k.csv" in the output bucket.
save_df_to_s3(df=clean_df, bucket="vbo-de-output", prefix="clean_data", key="clean_store_transactions_1k.csv",
             s3_resource=s3_res)

In [26]:
# Read the uploaded object back from the output bucket to verify the round trip.
df_from_clean_s3 = load_df_from_s3(bucket="vbo-de-output", key="clean_data/clean_store_transactions_1k.csv")

In [27]:
# Preview the re-loaded data; it should match the cleaned frame shown earlier.
df_from_clean_s3.head()

Unnamed: 0,STORE_ID,STORE_LOCATION,PRODUCT_CATEGORY,PRODUCT_ID,MRP,CP,DISCOUNT,SP,Date
0,YR1475,New York,Electronics,90175476,38.0,25.84,3.42,34.58,2019-11-26
1,OJ2470,Denver,Kitchen,75536193,96.0,70.08,4.8,91.2,2019-11-26
2,XV5488,Washington,Kitchen,58647460,68.0,51.0,2.04,65.96,2019-11-26
3,OJ9422,Denver,Fashion,60402526,74.0,51.8,6.66,67.34,2019-11-26
4,XV7687,Washington,Kitchen,68167114,78.0,56.16,5.46,72.54,2019-11-26
