## 1: Dependencies

In [2]:
import os
import pandas as pd

import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.estimator import SKLearn

import boto3

## 2: AWS Setup

In [3]:
s3_bucket = 'project-thunderbirds-storage'
prefix = 'Scikit-LinearLearner-pipeline-disaster-tweets'
DATA_DIR = 'data'
prefix = 'sagemaker-template-test'
import sagemaker
from sagemaker import get_execution_role
sagemaker_session = sagemaker.Session()
# Get a SageMaker-compatible role used by this Notebook Instance.
role = get_execution_role()

## 3: Data

You may want to be able to do a few different things with datasets:
 - Use local data (either before uploading to S3 or for local sessions)
 - Upload the local data to S3
 - Use data already uploaded on S3

### 3.1: Local Data : Train

In [4]:
train_dataset = pd.read_csv('data/clean_train.csv')
train_dataset.sample(5)

Unnamed: 0,data,label
11905,screwed,0
25206,wondering earthquake haiti would put unsalted ...,1
6543,crazy ideas album im soooo tired,0
15267,approximately persons camping outside roadside,1
27053,rawalpindi nov app governor punjab lt gen r kh...,1


 ### Local Data: Validation

In [5]:
batch_dataset = pd.read_csv('data/validation_dataset_utf8_location_and_message.csv')
batch_dataset.sample(5)

Unnamed: 0,location,data
3279,"Essex, Massachusetts",bane le babylonian breaking back one chain bru...
2434,"Atlantic City, New Jersey",folks big storm key food http co
4096,"Monmouth County, New Jersey",cowanjacob hate jumped front car would hit bra...
7530,"Cape May, New Jersey",done single hurricane sandy joke till might lo...
8963,"Cape May, New Jersey",every season praise jah barbados usual path hu...


### 3.2: Upload Data

In [6]:
train_dataset.to_csv("data/clean_train.csv", index=False)

# We uploading the /data folder to a bucket
# data = sagemaker_session.upload_data(DATA_DIR, key_prefix="{}/{}".format(prefix, DATA_DIR))

### UPLOAD DATA TO S3 FOR PASSING TO MODEL LATER ON ###
WORK_DIRECTORY = 'data'

train_data = sagemaker_session.upload_data(
    path='{}/{}'.format(WORK_DIRECTORY, 'clean_train.csv'), 
    bucket=s3_bucket,
    key_prefix='{}/{}'.format(prefix, 'train'))

print ("Bucket: " + sagemaker_session.default_bucket())
print ("Data: " + train_data)

Bucket: sagemaker-eu-west-2-663466646878
Data: s3://project-thunderbirds-storage/sagemaker-template-test/train/clean_train.csv


In [7]:
batch_dataset.to_csv("data/batch.csv", index=False)

# We uploading the /data folder to a bucket
# data = sagemaker_session.upload_data(DATA_DIR, key_prefix="{}/{}".format(prefix, DATA_DIR))

### UPLOAD DATA TO S3 FOR PASSING TO MODEL LATER ON ###
WORK_DIRECTORY = 'data'

batch_data = sagemaker_session.upload_data(
    path='{}/{}'.format(WORK_DIRECTORY, 'batch.csv'), 
    bucket=s3_bucket,
    key_prefix='{}/{}'.format(prefix, 'batch_input'))

print ("Bucket: " + sagemaker_session.default_bucket())
print ("Data: " + batch_data)

Bucket: sagemaker-eu-west-2-663466646878
Data: s3://project-thunderbirds-storage/sagemaker-template-test/batch_input/batch.csv


### 3.3: Test Data Downloaded Successfully from S3

In [8]:
# Sanity Testing batch_input
pd.read_csv(batch_data)

Unnamed: 0,location,data
0,"New York City, New York",got enough candles supply mexican family
1,"New York City, New York",sandy soooo mad shattering doors shiet hurrica...
2,"New York City, New York",ibexgirl thankfully hurricane waugh played coo...
3,"New York City, New York",taos never got magnificent case burgundy sent ...
4,"New York City, New York",mad river bar amp grille new york ny http co v...
...,...,...
10003,"New York City, New York",sandy weak name hurricane
10004,"New York City, New York",seaoftime freaking excited know plans hurricane
10005,"New York City, New York",rt find hurricane jokes funny itsnotajoke
10006,"New York City, New York",best wishes friends northeast stay safe hurric...


### Access S3 Data (optional)

In [9]:
# s3 = session.client('s3')
# # [Bucket] [Object/Folder/File-on-s3] [local-file-name-to-be-saved]
# s3.download_file('sagemaker-us-west-2-991775080983', 'sagemaker-template-test/data/reviews.csv', 's3_reviews.csv')

## 4: Entry Point File

This cell will produce a python file, which is the source code for our 'Estimator' and will be passed in as a parameter below. This is where all the AI code should go. This means it need its own imports - even if they are duplicates :/

We are required to define four methods:

 - model_fn: Return the model from S3 location and prep it for inference
 - input_fn: Take the HTTP request and translate for the prediction code
 - predict_fn: Make predictions using the formatted input and model
 - output_fn: Take the prediciton and get it ready to return as the response

In [10]:
%%writefile EstimatorScript.py
import argparse
import pandas as pd
import numpy as np
import os
import json
from io import StringIO

import joblib 

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neural_network import MLPClassifier
from sklearn.linear_model import LogisticRegression

from sklearn.pipeline import Pipeline

from sagemaker_containers.beta.framework import (worker, encoders)

# method to train and save the model
def training():
    
    print("TRAINING FUNCTION START")
    # Take the set of files and read them all into a single pandas dataframe
    input_files = [ os.path.join(args.train, file) for file in os.listdir(args.train) ]
    
    if len(input_files) == 0:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(args.train, "train"))

    raw_data = [ pd.read_csv(file, engine="python") for file in input_files ]
    train_data = pd.concat(raw_data)
    
    # labels are in the first column
    train_y = train_data['label']
    print(str(train_y.sample(5)))
    train_X = train_data['data']
    print(str(train_X.sample(5)))
    
    pipeline = Pipeline([
    ('tfidf', TfidfVectorizer()),
    ('clf', LogisticRegression()),
    ])

    pipeline.fit(train_X.values.astype('U').tolist(), train_y.tolist())

    # Print the coefficients of the trained classifier, and save the coefficients
    joblib.dump(pipeline, os.path.join(args.model_dir, "model.joblib"))

if __name__ == '__main__':
    
    print("MAIN FUNCTION START")
    parser = argparse.ArgumentParser()
    
    # Sagemaker specific arguments. Defaults are set in the environment variables.
    parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])

    args = parser.parse_args()
    training()

# [REQUIRED]: method to load model
def model_fn(model_dir):
    
    print("MODEL FUNCTION START")
    """Deserialized and return fitted model

    Note that this should have the same name as the serialized model in the main method
    """
    clf = joblib.load(os.path.join(model_dir, "model.joblib"))
    
    return clf

# [REQUIRED]: method to handle input of requests....
def input_fn(request_body, request_content_type):
    
    print("INPUT FUNCTION START") 
    print(str(request_content_type))
    """An input_fn that loads a pickled numpy array"""
    
    if request_content_type == 'text/csv':
        # Read the raw input data as CSV.
        print('CSV FILE INPUT RECEIVED')
        df = pd.read_csv(StringIO(request_body), dtype={"data": "string"})
        print(str(df.head(3)))
        print(df.dtypes)
        print(df.shape)       
        return df
    elif request_content_type == "application/python-pickle":
        array = np.load(BytesIO((request_body)))
        return array
    elif request_content_type == 'application/json':
        jsondata = json.load(StringIO(request_body))
        #processed_data = process_data(jsondata)
        return [jsondata['instances'][0]['features'][0]]
    else:
        # Handle other content-types here or raise an Exception
        # if the content type is not supported.
        raise ValueError("{} not supported by script!".format(request_content_type))

# [REQUIRED]: method to run prediction on the model
def predict_fn(input_data, model):
    
    print("PREDICT FUNCTION START")
    print(str(input_data.head(3)))
    print(type(input_data))
    print(input_data.dtypes)
    print(input_data.shape) 
    predictions = []
    
    for index, row in input_data.iterrows():
        message=[row['data']]
#       print(type(message)) statement returns <class 'list'>
        prediction = model.predict(message)
        predictions.append(prediction[0])
        
    print(predictions[:10])
    nrow = len(predictions)
    print('Predictions list number of rows:')
    print(str(nrow))
    print(type(predictions)) # statement returns <class 'list'>
    predictions.insert(0, 0)
    
    return predictions 

# [REQUIRED]: method to handle output of requests....
def output_fn(prediction, accept):
    
    print("OUTPUT FUNCTION START")
    print(prediction[:10])
    print(type(prediction)) 
    nrow = len(prediction)
    print('Prediction list number of rows:')
    print(str(nrow))
    
    if accept == "application/json":
        return worker.Response(json.dumps(prediction), accept, mimetype=accept)
    elif accept == 'text/csv':
        output = worker.Response(encoders.encode(prediction, accept), accept, mimetype=accept)
        print(type(output))
        print(str(output))
        return output
    else:
        raise ValueError("{} accept type is not supported by this script.".format(accept)) 

Overwriting EstimatorScript.py


## 5: Estimator

Sagemaker object that takes the above python file as a parameter

In [11]:
script_path = 'EstimatorScript.py'
sklearn_estimator = SKLearn(
    entry_point=script_path,
    instance_type="ml.m5.xlarge",
    role=role,
    framework_version='0.23-1',
    py_version='py3',
    sagemaker_session=sagemaker_session)

## 6: Training

In [19]:
sklearn_estimator.fit({'train': train_data})

2021-03-05 13:24:32 Starting - Starting the training job...
2021-03-05 13:24:57 Starting - Launching requested ML instancesProfilerReport-1614950672: InProgress
......
2021-03-05 13:25:58 Starting - Preparing the instances for training...
2021-03-05 13:26:18 Downloading - Downloading input data...
2021-03-05 13:26:59 Training - Training image download completed. Training in progress..[34m2021-03-05 13:26:59,496 sagemaker-containers INFO     Imported framework sagemaker_sklearn_container.training[0m
[34m2021-03-05 13:26:59,498 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2021-03-05 13:26:59,506 sagemaker_sklearn_container.training INFO     Invoking user training script.[0m
[34m2021-03-05 13:26:59,822 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2021-03-05 13:27:02,848 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2021-03-05 13:27:02,859 sagemak

### 7: Batch Transform

In [13]:
# %%time
# batch_file = 'batch.csv'
# sm_transformer = sklearn_estimator.transformer(1, 'ml.m5.xlarge', assemble_with = 'Line', accept = 'text/csv')
# ###  Batch Transformer that will call the model endpoint ###
# # start a transform job
# input_location = 's3://{}/{}/batch_input/{}'.format(s3_bucket, prefix, batch_file )# use input data with Location column
# sm_transformer.transform(input_location, split_type='Line', content_type='text/csv', 
#                          input_filter='$[1:]', join_source='Input', output_filter='$[0,-1]')
# sm_transformer.wait()

In [14]:
# print(sm_transformer.output_path)

### Testing Data returned from Batch Transform Job

In [15]:
### Function to return the data from the Batch Transform Job ###

# import json
# import io
# from urllib.parse import urlparse

# def get_csv_output_from_s3(s3uri, file_name):
#     parsed_url = urlparse(s3uri)
#     bucket_name = parsed_url.netloc
#     prefix = parsed_url.path[1:]
#     s3 = boto3.resource('s3')
#     obj = s3.Object(bucket_name, '{}/{}'.format(prefix, file_name))
#     return obj.get()["Body"].read().decode('utf-8')

# output = get_csv_output_from_s3(sm_transformer.output_path, '{}.out'.format(batch_file))
# output_df = pd.read_csv(io.StringIO(output), sep=",", header=0)
# output_df.shape
# print(output_df.sample(10))

In [16]:
### Test Logic to identify if a disaster is happening (will reside in lambda) ###

# import numpy as np
# predictionsArray = np.array([])
# for index, row in output_df.iterrows():
#         result=[row['0']]
# #       print(type(message)) statement returns <class 'list'>
#         predictionsArray = np.append(predictionsArray, result)
# disasterRelatedTweets = 0
# for prediction in predictionsArray:
#     if (prediction == 1):
#         disasterRelatedTweets += 1;
# if(disasterRelatedTweets >=3000):
#     print ("A disaster is currently happening! ")
#     print (disasterRelatedTweets)

In [17]:
#  output_df.to_csv("data/predictions.csv", index=False)

## 8: Deploy

In [20]:
predictor = sklearn_estimator.deploy(initial_instance_count=1, instance_type="ml.t2.medium", 
                                     endpoint_name='natural-disaster-detector')

-----------------------!

In [None]:
# sagemaker.delete_endpoint(EndpointName=natural-disaster-detector)