In [3]:
# basic packages
import os
import subprocess
import logging
from tqdm import tqdm
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from time import localtime, strftime

In [4]:
# sagemaker parameters
import sagemaker
from sagemaker import get_execution_role
from sagemaker.amazon.amazon_estimator import get_image_uri
import boto3

# Clients/handles shared by the rest of the notebook.
s3 = boto3.client('s3')
session = sagemaker.Session()
role = get_execution_role()

# S3 location for all artifacts of this project.
bucket = session.default_bucket()
prefix = 'kaggle/tweetSentiment'  # Key prefix inside the bucket; must NOT end with '/'.

In [5]:
# directories
# Root of the Kaggle data. Override with the KAGGLE_WORKING_DIR environment
# variable when running somewhere other than the SageMaker notebook instance.
working_dir = os.environ.get('KAGGLE_WORKING_DIR', '/home/ec2-user/SageMaker/kaggle_data')
data_dir = os.path.join(working_dir, 'processed_data/')
output_dir = os.path.join(working_dir, 'output/')
# Create output_dir (and any missing parents) directly instead of shelling out
# to `mkdir`. The shell form breaks on paths containing spaces or shell
# metacharacters, and it fails when a parent directory is missing.
# exist_ok makes re-running this cell a no-op.
os.makedirs(output_dir, exist_ok=True)

In [6]:
# data upload to S3
# Ensure every regular file in data_dir also exists in S3 under `prefix`, and
# upload whatever is missing.
#
# list_objects_v2 omits 'Contents' when nothing matches the prefix, and it
# returns at most 1000 keys per call. So page through the listing and default
# to an empty list, rather than wrapping everything in `except KeyError`
# (which also swallowed unrelated KeyErrors raised by the pandas code below).
s3_paginator = s3.get_paginator('list_objects_v2')
s3_object_set = {
    content['Key']
    for page in s3_paginator.paginate(Bucket=bucket, Prefix=prefix)
    for content in page.get('Contents', [])
}


def _missing_local_files():
    """Return the names of regular files in data_dir whose S3 key under prefix is absent."""
    return sorted(
        name for name in os.listdir(data_dir)
        # Skip sub-directories (e.g. notebook checkpoints): they are never
        # S3 objects, so including them would make the check always fail.
        if os.path.isfile(os.path.join(data_dir, name))
        and os.path.join(prefix, name) not in s3_object_set
    )


test_location = os.path.join('s3://', bucket, prefix, 'test_processed.csv')

if not _missing_local_files():
    print("input_data already present in S3.")
else:
    train_path = os.path.join(data_dir, 'train_processed.csv')
    if os.path.exists(train_path):
        # Split the training data into train and validation data, stratified on
        # column 0 of the headerless CSV.
        # NOTE(review): train_test_split (presumably sklearn.model_selection) is
        # not imported in any visible cell; confirm it is imported earlier.
        train = pd.read_csv(train_path, header=None)
        train, validation = train_test_split(
            train,
            stratify=train[0].values,
            test_size=0.25,
            random_state=0
        )

        # Save them locally so the upload loop below picks them up.
        train.to_csv(os.path.join(data_dir, 'train_processed_split.csv'), header=None, index=None)
        validation.to_csv(os.path.join(data_dir, 'validation.csv'), header=None, index=None)

        # Release the frames; the CSVs on disk are what gets uploaded.
        del train, validation

    # Recompute the missing set, because the split above may have created new
    # files. Then upload each missing file under the same prefix.
    for name in _missing_local_files():
        session.upload_data(
            path=os.path.join(data_dir, name),
            bucket=bucket,
            key_prefix=prefix
        )

    print("train and validation data uploaded to S3.")

input_data already present in S3.


In [7]:
# tf_model.txt holds the training-job name as its first whitespace-separated token.
with open('tf_model.txt', 'r') as model_file:
    model_tokens = model_file.read().split()
tf_model = model_tokens[0]
print(tf_model)


tensorflow-training-200604-2204-006-ca546ec3


In [8]:
from sagemaker.tensorflow.model import TensorFlowModel

In [9]:
from sagemaker.tensorflow import TensorFlow


# Estimator definition, presumably mirroring the one used to launch the
# training job (verify against the training notebook).
# NOTE(review): `attach` is a classmethod in the SageMaker SDK, so these
# settings do not carry over to the estimator returned by `attach`.
tf_estimator = TensorFlow(
    # training script
    entry_point='train.py',
    source_dir='source',
    framework_version='2.1.0',
    py_version='py3',
    # compute
    role=role,
    train_instance_type='ml.p2.xlarge',
    train_instance_count=1,
    distributions={'parameter_server': {'enabled': True}},
)

Parameter distribution will be renamed to {'parameter_server': {'enabled': True}} in SageMaker Python SDK v2.


In [10]:
# `attach` is a classmethod: it builds a fresh estimator from the completed
# training job named in tf_model. Call it on the class (not on a placeholder
# instance whose configuration it ignores) and reuse the notebook's session.
tf_estimator = TensorFlow.attach(tf_model, sagemaker_session=session)

2020-06-04 22:39:59 Starting - Preparing the instances for training
2020-06-04 22:39:59 Downloading - Downloading input data
2020-06-04 22:39:59 Training - Training image download completed. Training in progress.
2020-06-04 22:39:59 Uploading - Uploading generated training model
2020-06-04 22:39:59 Completed - Training job completed[34m2020-06-04 22:39:16,881 sagemaker-containers INFO     Imported framework sagemaker_tensorflow_container.training[0m
[34m2020-06-04 22:39:16,882 sagemaker-containers INFO     Failed to parse hyperparameter _tuning_objective_metric value loss to Json.[0m
[34mReturning the value itself[0m
[34m2020-06-04 22:39:16,910 sagemaker_tensorflow_container.training INFO     Appending the training job name to model_dir: s3://sagemaker-us-east-2-815596061983/kaggle/tweetSentiment/model/tensorflow-training-2020-06-04-22-04-41-432/model/tensorflow-training-200604-2204-006-ca546ec3/model[0m
[34m2020-06-04 22:39:17,258 sagemaker-containers INFO     Failed to parse

In [11]:
# Deploy the attached model to a single-instance real-time endpoint.
predictor = tf_estimator.deploy(
    instance_type='ml.m4.xlarge',
    initial_instance_count=1,
)

Parameter image will be renamed to image_uri in SageMaker Python SDK v2.
'create_image_uri' will be deprecated in favor of 'ImageURIProvider' class in SageMaker Python SDK v2.


-----------!

In [12]:
# Load the processed test features from data_dir (an absolute path, and the same
# file the upload cell uses), so this cell does not depend on the kernel's
# current working directory.
test = pd.read_csv(os.path.join(data_dir, 'test_processed.csv'), header=None).to_numpy()


In [13]:
# Sending all the test data at once returns a 'Broken Pipe' error,
# so send it to the endpoint in chunks instead.

def predict_by_chunks(test_arr, chunk_size, model=None):
    """Run endpoint predictions over ``test_arr`` in chunks of at most ``chunk_size`` rows.

    Parameters
    ----------
    test_arr : array-like supporting ``len`` and row slicing (e.g. np.ndarray)
        Rows to send to the endpoint.
    chunk_size : int
        Maximum number of rows per request; must be >= 1.
    model : object, optional
        Object with a ``predict(batch)`` method returning a dict that has a
        ``'predictions'`` entry. Defaults to the notebook-level ``predictor``.

    Returns
    -------
    np.ndarray
        1-D integer array of all returned prediction values, in input order.
        Empty if ``test_arr`` has no rows.

    Raises
    ------
    ValueError
        If ``chunk_size`` is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError('chunk_size must be at least 1, got {}'.format(chunk_size))
    if model is None:
        model = predictor  # the endpoint deployed earlier in the notebook

    n_rows = len(test_arr)
    chunk_results = []
    # Plain slicing guarantees every request has at most chunk_size rows.
    # np.array_split with a truncated float section count can exceed it, and it
    # raises when there are fewer rows than chunk_size.
    for start in range(0, n_rows, chunk_size):
        response = model.predict(test_arr[start:start + chunk_size])
        # ravel so a single-chunk run has the same 1-D shape as a multi-chunk run.
        chunk_results.append(np.asarray(response['predictions']).ravel())

    if not chunk_results:
        return np.array([], dtype=int)
    # Collect per-chunk arrays and concatenate once (np.append in a loop is quadratic).
    # NOTE(review): astype(int) truncates; this assumes the endpoint returns class
    # labels rather than probabilities. Confirm against the model's serving output.
    return np.concatenate(chunk_results).astype(int)

In [14]:
# Predict the whole test set, 100 rows per endpoint request.
y_pred = predict_by_chunks(test_arr=test, chunk_size=100)

In [15]:
# Reload the test CSV (presumably the raw competition file) for its `id` column.
# This rebinds `test`, replacing the processed feature array held in it.
# NOTE(review): the path is relative to the kernel's working directory, unlike data_dir.
test = pd.read_csv(filepath_or_buffer="test.csv")

In [16]:
# Assemble the submission frame (id, target). Building it from positional
# arrays after an explicit length check fails loudly on a mismatch. An
# index-aligned concat would instead silently pad the shorter side with NaN.
target = np.asarray(y_pred).ravel()
if len(target) != len(test):
    raise ValueError(
        'got {} predictions for {} test rows'.format(len(target), len(test))
    )
out = pd.DataFrame({'id': test['id'].to_numpy(), 'target': target})

In [17]:
# Write the submission under output_dir and upload that same file. Using the
# absolute path for both steps means the upload can never pick up a stale or
# missing copy when the kernel's working directory differs from working_dir.
# boto3's upload_file also avoids building a shell command from interpolated paths.
out_path = os.path.join(output_dir, 'tf_out.csv')
out.to_csv(out_path, header=True, index=False)
s3.upload_file(out_path, bucket, '{}/tf_out.csv'.format(prefix))

0

In [18]:
# Show the S3 URI of the uploaded submission. The URI is built with string
# formatting: os.path.join is a filesystem helper (its separator is
# OS-specific, and it discards earlier parts when a later one starts with '/').
's3://{}/{}/tf_out.csv'.format(bucket, prefix)

's3://sagemaker-us-east-2-815596061983/kaggle/tweetSentiment/tf_out.csv'

In [19]:
# Tear down the real-time endpoint created by deploy(), together with its endpoint config.
predictor.delete_endpoint(delete_endpoint_config=True)