In [1]:
import datetime
import gzip
import json
import logging
import os
import random

import boto3
import pandas as pd
from sagemaker import get_execution_role
from sagemaker.session import Session
from sagemaker.tensorflow import TensorFlow

import config

In [2]:
def copy_object(src_bucket_name, src_object_name,
                dest_bucket_name, dest_object_name=None):
    """Copy an Amazon S3 object from one bucket/key to another.

    :param src_bucket_name: string
    :param src_object_name: string
    :param dest_bucket_name: string. Must already exist.
    :param dest_object_name: string. If dest bucket/object exists, it is
        overwritten. Default: src_object_name
    :return: True if the object was copied, otherwise False (the S3 error is
        logged via ``logging.error``)
    """
    # Default the destination key to the source key.
    if dest_object_name is None:
        dest_object_name = src_object_name
    copy_source = {'Bucket': src_bucket_name, 'Key': src_object_name}

    s3_client = boto3.client('s3')
    try:
        s3_client.copy_object(CopySource=copy_source, Bucket=dest_bucket_name,
                              Key=dest_object_name)
    # boto3 clients expose botocore's ClientError as `client.exceptions.ClientError`,
    # so no separate botocore import is required to catch S3 service errors.
    except s3_client.exceptions.ClientError as e:
        logging.error(e)
        return False
    return True

def delete_files(bucket, prefix):
    """Delete every object in S3 bucket `bucket` whose key starts with `prefix`.

    Objects are deleted one request at a time, in listing order.
    """
    # NOTE: S3 prefix matching is plain string matching, so a prefix without a
    # trailing '/' also matches sibling keys (e.g. '<prefix>_old/...'), and an
    # empty prefix matches the whole bucket.
    matching_objects = boto3.resource('s3').Bucket(bucket).objects.filter(Prefix=prefix)
    for s3_object in matching_objects.all():
        s3_object.delete()

def copy_files(src_bucket, dst_bucket, files, dst_prefix):
    """Copy each key in `files` from `src_bucket` into `dst_bucket` under `dst_prefix`.

    Only the basename of each source key is kept: the destination key is
    ``<dst_prefix>/<basename>``. The True/False result of `copy_object` is ignored.
    """
    # NOTE(review): keys from different d=/h= partitions that share a basename map
    # to the same destination key and overwrite each other; confirm upstream file
    # names are unique.
    for src_key in files:
        dst_key = '{}/{}'.format(dst_prefix, os.path.basename(src_key))
        copy_object(src_bucket, src_key, dst_bucket, dst_key)
        
def get_files(bucket, prefix, days=(), hours=(), ext='.csv'):
    """List S3 keys under ``<prefix>/d=<day>/h=<hour>/`` that end with `ext`.

    :param bucket: S3 bucket name.
    :param prefix: key prefix that precedes the ``d=``/``h=`` partition directories.
    :param days: iterable of day values formatted into ``d=<day>``.
    :param hours: iterable of hour values formatted into ``h=<hour>``.
    :param ext: only keys ending with this suffix are returned.
    :return: list of matching keys, ordered by day, then hour, then S3 listing order.
    """
    # Tuple defaults avoid the shared-mutable-default pitfall; the Bucket handle
    # is loop-invariant, so it is created once.
    s3_bucket = boto3.resource('s3').Bucket(bucket)
    all_files = []
    for day in days:
        for hour in hours:
            partition_prefix = '{}/d={}/h={}/'.format(prefix, day, hour)
            # extend() appends in place; `list + list` copied the accumulator on
            # every partition, which is quadratic in the number of keys.
            all_files.extend(
                obj.key
                for obj in s3_bucket.objects.filter(Prefix=partition_prefix).all()
                if obj.key.endswith(ext)
            )
    return all_files
             

In [3]:
def calculate_stats_and_upload_to_s3(all_files):
    """Compute summary statistics over `all_files` and upload them to S3 as JSON.

    Every path in `all_files` is read with ``pd.read_csv`` as a gzip-compressed
    CSV (separator and dtypes from `config`) and the frames are concatenated.
    The JSON document, written to ``CONFIG['DATA_STATS_FILE_KEY']`` in
    ``CONFIG['S3_BUCKET']``, has three sections:

    * ``columns`` -- all column names, plus the object (categorical) and numeric subsets;
    * ``stats``   -- count/mean/std/min/max per numeric column and count/unique per
      categorical column;
    * ``weights`` -- non-null ``deliveryid`` counts grouped by
      ``CONFIG['PREDICT_IMP_LABELS']``.

    :param all_files: paths/URIs readable by ``pd.read_csv``.
    """
    CONFIG = config.get_config()
    dtypes = config.get_types_of_attributes()
    read_opts = dict(sep=CONFIG['CSV_SEPARATOR'], compression='gzip',
                     na_values=["null", "\\N"], dtype=dtypes)
    df = pd.concat(pd.read_csv(path, **read_opts) for path in all_files)

    categorical_summary = df.describe(include='O').loc[['count', 'unique']]
    numeric_summary = df.describe().loc[['count', 'mean', 'std', 'min', 'max']]
    stats_categorical = json.loads(categorical_summary.to_json())
    stats_numeric = json.loads(numeric_summary.to_json())

    label_columns = [df[label] for label in CONFIG['PREDICT_IMP_LABELS']]
    label_counts = df['deliveryid'].groupby(label_columns).agg(['count'])
    weights = json.loads(label_counts.to_json())

    # Categorical entries take precedence on a key clash, as with {**num, **cat}.
    merged_stats = dict(stats_numeric)
    merged_stats.update(stats_categorical)
    payload = {
        'columns': {
            'all': df.columns.values.tolist(),
            'categorical': list(stats_categorical),
            'numeric': list(stats_numeric),
        },
        'stats': merged_stats,
        'weights': dict(weights),
    }
    STATS = json.dumps(obj=payload, indent=4)

    target_bucket = boto3.resource('s3').Bucket(CONFIG['S3_BUCKET'])
    target_bucket.put_object(Key=CONFIG['DATA_STATS_FILE_KEY'], Body=STATS)

In [4]:
def add_weight_column_to_dataset(all_files):
    """Add a class-balancing ``weight`` column to every file and overwrite it in S3.

    Each entry of `all_files` is an S3 key (not a URI) of a gzip-compressed CSV
    in ``CONFIG['S3_BUCKET']``. The function works in two passes:

    1. All files are loaded together to count non-null ``deliveryid`` rows per
       combination of ``CONFIG['PREDICT_IMP_LABELS']``, plus the overall total.
    2. Each file is re-read, given a ``weight`` column, gzip-compressed and
       written back to the SAME key, so the input objects are modified in place.

    A row whose label combination occurs ``freq`` times gets the weight
    ``total / (2 ** n_labels * freq)``. This equals ``target_prob / prob``, where
    ``prob = freq / total`` and ``target_prob = 1 / 2 ** n_labels``. The latter
    presumably assumes binary labels (TODO confirm). Combinations missing from the
    counts fall back to ``freq = 1``.

    :param all_files: list of S3 keys. An empty list is a no-op.
    """
    if not all_files:
        # Nothing to weight or rewrite; pd.concat would raise on an empty input.
        return

    CONFIG = config.get_config()
    dtypes = config.get_types_of_attributes()
    labels = CONFIG['PREDICT_IMP_LABELS']
    # NOTE(review): keys are resolved against CONFIG['S3_BUCKET'], not the bucket
    # the caller listed them from; assumed to be the same bucket.
    bucket_name = CONFIG['S3_BUCKET']
    read_opts = dict(sep=CONFIG['CSV_SEPARATOR'], compression='gzip',
                     na_values=["null", "\\N"], dtype=dtypes)

    def read_file(key):
        # Single place for the CSV read options used by both passes.
        return pd.read_csv('s3://{}/{}'.format(bucket_name, key), **read_opts)

    # Pass 1: label-combination counts over all files.
    combined = pd.concat(read_file(f) for f in all_files)
    counts = json.loads(
        combined['deliveryid']
        .groupby([combined[label] for label in labels])
        .agg(['count'])
        .to_json()
    )['count']
    total = float(combined['deliveryid'].count())
    del combined  # only the counts are needed from here on

    # Loop-invariant parts of the per-row computation.
    scale = 2.0 ** len(labels)
    multi_label = len(labels) > 1

    def get_weight(row):
        # Rebuild the key in the form the code expects `to_json` to produce:
        # "v" for one label, "[v1,v2,...]" for several.
        key = ','.join([str(row[label]) for label in labels])
        if multi_label:
            key = '[{}]'.format(key)
        freq = counts.get(key, 1.0)
        return total / (scale * freq)

    # Pass 2: add the column and overwrite each object in place.
    s3_bucket = boto3.resource('s3').Bucket(bucket_name)
    for f in all_files:
        df = read_file(f)
        df['weight'] = df.apply(get_weight, axis=1)
        csv_text = df.to_csv(sep=CONFIG['CSV_SEPARATOR'], index=False, na_rep="null")
        s3_bucket.put_object(Key=f, Body=gzip.compress(csv_text.encode('utf-8')))

In [5]:
def get_hours(startHour=None, numberOfHours=6):
    """Return zero-padded hour-of-day strings counting backwards from `startHour`.

    :param startHour: first hour in the list; defaults to the current local hour
        (``datetime.datetime.now().hour``). Values wrap modulo 24.
    :param numberOfHours: how many consecutive hours to return, clamped to 0..24
        (more than 24 would only repeat hours).
    :return: list such as ``['02', '01', '00', '23']``, wrapping past midnight.

    >>> get_hours(2, 4)
    ['02', '01', '00', '23']
    """
    # Clamp rather than wrap: asking for more than 24 hours yields all 24 hours,
    # and a negative count yields none.
    numberOfHours = max(0, min(numberOfHours, 24))
    if startHour is None:
        startHour = datetime.datetime.now().hour
    # Python's % with a positive modulus is never negative, so hours before
    # midnight wrap to 23, 22, ... without an extra offset.
    return ["{:02d}".format(hour % 24)
            for hour in range(startHour, startHour - numberOfHours, -1)]

# print(get_hours(23, 24))

In [6]:
SRC_BUCKET = 'wsbidder'
DST_BUCKET = 'wsbidder'

SRC_PREFIX = 'tsv/etl/imp-pred-service-v1/imppredservice_training_data'
DST_PREFIX = 'trainer_predict_imp/data'

TRAIN_DAYS = 7  # number of days before the eval day used for training

# Evaluate on yesterday; train on the TRAIN_DAYS days before it.
# `today` is computed once so every derived date agrees, even across midnight.
today = datetime.date.today()
eval_files_date = str(today - datetime.timedelta(1))
train_days = [str(today - datetime.timedelta(i + 2)) for i in range(TRAIN_DAYS)]
# The eval day must be listed as well; otherwise no key can contain
# d=<eval_files_date> and the eval split is always empty.
filter_days = [eval_files_date] + train_days
filter_hours = get_hours(23, 24)

all_files = get_files(SRC_BUCKET, SRC_PREFIX, filter_days, filter_hours, ext='.gz')

eval_marker = 'd={}'.format(eval_files_date)
train_files = [file_path for file_path in all_files if eval_marker not in file_path]
eval_files = [file_path for file_path in all_files if eval_marker in file_path]

# Fail before anything is rewritten or deleted.
assert train_files, 'no training files found under s3://{}/{}'.format(SRC_BUCKET, SRC_PREFIX)
assert eval_files, 'no eval files found for {}'.format(eval_marker)
print('Files separated')

# NOTE(review): this rewrites the listed source objects in place (adding a
# `weight` column), and the weights are counted over train AND eval files.
add_weight_column_to_dataset(all_files)
print('Added weight')

# Replace the previous split only once the new one is known to be non-empty.
delete_files(DST_BUCKET, DST_PREFIX + '/train')
delete_files(DST_BUCKET, DST_PREFIX + '/eval')
copy_files(SRC_BUCKET, DST_BUCKET, train_files, DST_PREFIX + '/train')
copy_files(SRC_BUCKET, DST_BUCKET, eval_files, DST_PREFIX + '/eval')

Files separated
Added weight


In [7]:
# Statistics are computed from the copied training files, addressed the same way
# copy_files names them: s3://<DST_BUCKET>/<DST_PREFIX>/train/<basename>.
train_uris = [
    's3://{}/{}/train/{}'.format(DST_BUCKET, DST_PREFIX, os.path.basename(src_key))
    for src_key in train_files
]
calculate_stats_and_upload_to_s3(train_uris)