## Introduction

This notebook solves the problem of classifying the job title into a broad category. The training data is a set of job titles scraped from various job categories such as sales, human resource, nurse, etc. This notebook uses Multinomial Naive Bayes algorithm to perform supervised multi class with single label text classification. 

The multinomial Naive Bayes classifier is suitable for classification with discrete features (e.g., word counts for text classification). In this document, two methods to create discrete features is used - Count Vectorizing the document ( counting the frequency of the words in each corpus) then running a tf-idf to get the weightage of the word in the whole corpus. The final model is pipelined with CV(), TF-IDF() and MultinomialNB().

### Setup

The S3 bucket and prefix that you want to use for training and model data. This is currently written in US-EAST-2 region as the Notebook Instance, training, and hosting.

The IAM role ARN is used to give SageMaker access to the data. It can be fetched using the get_execution_role method from sagemaker python SDK.

In [1]:

import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor


region = boto3.session.Session().region_name

role = sagemaker.get_execution_role()
sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
                                     role=role,
                                     instance_type='ml.m5.large',
                                     instance_count=1)

#### The data is read from the S3 bucket and then first few rows are checked to confirm the right training data set

In [3]:
import pandas as pd
bucket = 'My-Fav-Bucket'
prefix = 'Text-Classification'

input_data = 's3://{}/{}/{}'.format(bucket, prefix, 'train.csv')
df = pd.read_csv(input_data, names=['Job Title','Label'])
df.head()

Unnamed: 0,Job Title,Label
0,merchandiser,Merchandiser
1,fraud specialist train work home,Operations
2,family therapist,Therapist
3,human resource director,Human Resources
4,respiratory therapist,Therapist


### Data Preprocessing
We need to preprocess the training data to make it ready to be ingested by the training model. Primarily the nltk library is used to free it from alpha numeric, punctuation, etc

In [4]:



%%writefile preprocessing.py

import argparse
import os
import warnings 
import pandas as pd
import numpy as np
import re
import nltk
from nltk.tokenize import word_tokenize
from nltk.stem.porter import PorterStemmer
from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer
from nltk.stem.wordnet import WordNetLemmatizer
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd
from glob import glob
nltk.download('wordnet')
nltk.download('punkt')
import json 
from pandas.io.json import json_normalize
import os
import sagemaker
from sklearn.exceptions import DataConversionWarning
warnings.filterwarnings(action='ignore', category=DataConversionWarning)

bucket = 'altru-comprehend-input'
prefix = 'Data_Sagemaker'

data = 's3://{}/{}/{}'.format(bucket, prefix, 'train.csv')
test_data = 's3://{}/{}/{}'.format(bucket, prefix, 'validation.csv')

if __name__=='__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--train-test-split-ratio', type=float, default=0.2)
    args, _ = parser.parse_known_args()
    
    input_data_path = os.path.join('/opt/ml/input/data', 'train.csv')
    print('Reading input data from {}'.format(input_data_path))

    df = pd.read_csv(data, names=['Job Title','Label'])
    df.dropna(inplace=True)
    df.drop_duplicates(inplace=True)
    df_test = pd.read_csv(test_data, names=['Job Title','Label'])
    
    print('df shape {}'.format(df.shape))
    print('df_test shape {}'.format(df_test.shape))
    


    def clean_column(data):
        if data is not None:
            data =  data.lower()
            data = re.sub('&nbsp', '', data)
            data = re.sub(';', '', data)
            data = re.sub('_', '', data)
            data =re.sub("\[[^]]*\]", " ", data)
            data = re.sub('<[^<]+?>', '', data)
            data = re.sub(r'\n',' ',data)
            data = re.sub(r'[0-9]+','',data)
            data = re.sub(r'@type','',data)
            data = re.findall('[\w]+',data)
            return data
        return 'Sorry'

    df['Job Title'] = df['Job Title'].apply(clean_column)
    df['Job Title'] = df['Job Title'].apply(str)
    
    #Processing test data
    df_test['Job Title'] = df_test['Job Title'].apply(clean_column)
    df_test['Job Title'] = df_test['Job Title'].apply(str)
    
    print('Data after cleaning : {}, number of categories: {}'.format(df.shape, df.Label.unique()))
    split_ratio = 0.2
    print('Splitting data into train and test sets with ratio {}'.format(split_ratio))
    
 
    X_train = df['Job Title']
    X_test = df_test['Job Title']
    print('X shape {}'.format(X_train.shape))

    y_train = df['Label']
    y_test = df_test['Label']
    print('y_train shape {}'.format(y_train.shape))


    print('Train data shape after pre-processing: {}'.format(X_train.shape))


    train_features_output_path = os.path.join('/opt/ml/processing/output/train', 'train_features.csv')
    train_labels_output_path = os.path.join('/opt/ml/processing/output/train', 'train_labels.csv')

    test_features_output_path = os.path.join('/opt/ml/processing/output/test', 'test_features.csv')
    test_labels_output_path = os.path.join('/opt/ml/processing/output/test', 'test_labels.csv')

    print('Saving training features to {}'.format(train_features_output_path))
    pd.DataFrame(X_train).to_csv(train_features_output_path, header=False, index=False)

    print('Saving test features to {}'.format(test_features_output_path))
    pd.DataFrame(X_test).to_csv(test_features_output_path, header=False, index=False)

    print('Saving training labels to {}'.format(train_labels_output_path))
    y_train.to_csv(train_labels_output_path, header=False, index=False)

    print('Saving test labels to {}'.format(test_labels_output_path))
    y_test.to_csv(test_labels_output_path, header=False, index=False)

    


Overwriting preprocessing.py


In [5]:
# The ScriptProcessor class lets you run a command inside this container, which you can use to run your own script.

In [6]:
from sagemaker.processing import ScriptProcessor

script_processor = ScriptProcessor(command=['python3'],
                image_uri= '589274001472.dkr.ecr.us-east-2.amazonaws.com/altru-recommend-development:c5b83c7beadc6388a19cd328e1008f97741fd749',
                role=role,
                instance_count=1,
                instance_type='ml.m5.xlarge')

In [7]:
from sagemaker.processing import ProcessingInput, ProcessingOutput

script_processor.run(code='preprocessing.py',
                      inputs=[ProcessingInput(
                        source=input_data,
                        destination='/opt/ml/processing/input')],
                      outputs=[ProcessingOutput(output_name='train_data',
                                                source='/opt/ml/processing/output/train'),
                               ProcessingOutput(output_name='test_data',
                                                source='/opt/ml/processing/output/test')]
                     )
script_processor_job_description = script_processor.jobs[-1].describe()
print(script_processor_job_description)

Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.



Job Name:  altru-recommend-development-2020-11-09-16-00-52-746
Inputs:  [{'InputName': 'input-1', 'S3Input': {'S3Uri': 's3://altru-comprehend-input/Data_Sagemaker/train.csv', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-2-589274001472/altru-recommend-development-2020-11-09-16-00-52-746/input/code/preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train_data', 'S3Output': {'S3Uri': 's3://sagemaker-us-east-2-589274001472/altru-recommend-development-2020-11-09-16-00-52-746/output/train_data', 'LocalPath': '/opt/ml/processing/output/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'test_data', 'S3Output': {'S3Uri': 's3://sagemaker-us-eas

In [8]:
output_config = script_processor_job_description['ProcessingOutputConfig']
output_config
for output in output_config['Outputs']:
    if output['OutputName'] == 'train_data':
        preprocessed_training_data = output['S3Output']['S3Uri']
    if output['OutputName'] == 'test_data':
        preprocessed_test_data = output['S3Output']['S3Uri']

In [16]:
from sagemaker.sklearn.estimator import SKLearn

sklearn = SKLearn(
    entry_point='train.py',
    train_instance_type="ml.m5.large",
    role=role)

This is not the latest supported version. If you would like to use version 0.23-1, please add framework_version=0.23-1 to your constructor.


In [17]:
%%writefile train.py
#trying Decision Tree with experimental transformer
import os

import numpy
import pandas as pd
from io import StringIO
from sagemaker_containers.beta.framework import (
    content_types, encoders, env, modules, transformer, worker)
from sklearn.externals import joblib
import json

if __name__=="__main__":
    training_data_directory = '/opt/ml/input/data/train' #check this
    train_features_data = os.path.join(training_data_directory, 'train_features.csv')
    train_labels_data = os.path.join(training_data_directory, 'train_labels.csv')
    print('Reading input data')
    X_train = pd.read_csv(train_features_data, index_col = False, names = ["Job Title"], header = 0)#header=None,
    print('X_train shape', X_train.shape)
    print('Done reading the training features')
    y_train = pd.read_csv(train_labels_data, index_col = False, names = ["Labels"], header = 0)# header=None,
    print('y_train after changing to str', y_train.shape)
    from sklearn.feature_extraction.text import TfidfTransformer
    from sklearn.pipeline import Pipeline
    
    from sklearn.feature_extraction.text import CountVectorizer
    from sklearn.feature_extraction.text import TfidfTransformer
    import pandas as pd
    
    
    import re


    from sklearn.pipeline import Pipeline
    
    from sklearn.feature_extraction.text import CountVectorizer
    from sklearn.feature_extraction.text import TfidfTransformer
    #from sklearn.tree import DecisionTreeClassifier
    import pandas as pd
    from sklearn.naive_bayes import MultinomialNB
    
    text_clf = Pipeline([
            ('vect', CountVectorizer(lowercase = False)),
        ('tfidf', TfidfTransformer()),
        ('clf', MultinomialNB()),
        ])

    
    model = text_clf.fit(X_train['Job Title'].astype(str),y_train)

    model_output_directory = os.path.join('/opt/ml/model', "model.joblib")
    print('Saving model to {}'.format(model_output_directory))
    joblib.dump(model, model_output_directory)

def model_fn(model_dir):
    """Deserialized and return fitted model

    Note that this should have the same name as the serialized model in the main method
    """
    clf = joblib.load(os.path.join(model_dir, "model.joblib"))
    return clf

def input_fn(input_data, content_type):
    """Parse input data payload

    We currently only take csv input. Since we need to process both labelled
    and unlabelled data we first determine whether the label column is present
    by looking at how many columns were provided.
    """
    if content_type == 'text/csv':
        # Read the raw input data as CSV.
        df = pd.read_csv(StringIO(input_data), 
                         header=None, names = ["Job Title"])

        return df['Job Title'].astype(str)
    else:
        raise ValueError("{} not supported by script!".format(content_type))


def output_fn(prediction, accept):
    """Format prediction output

    The default accept/content-type between containers for serial inference is JSON.
    We also want to set the ContentType or mimetype as the same value as accept so the next
    container can read the response payload correctly.
    """
    if accept == "application/json":
        json_output = {"instances": prediction.tolist()}
        return worker.Response(json.dumps(json_output), mimetype=accept)
    elif accept == 'text/csv':
        return worker.Response(encoders.encode(prediction, accept), mimetype=accept)
    else:
        raise RuntimeException("{} accept type is not supported by this script.".format(accept))



Overwriting train.py


In [95]:
%%writefile train.py
#trying Decision Tree with experimental transformer
import os

import numpy
import pandas as pd
from io import StringIO
from sagemaker_containers.beta.framework import (
    content_types, encoders, env, modules, transformer, worker)
from sklearn.externals import joblib
import json

if __name__=="__main__":
    training_data_directory = '/opt/ml/input/data/train' #check this
    train_features_data = os.path.join(training_data_directory, 'train_features.csv')
    train_labels_data = os.path.join(training_data_directory, 'train_labels.csv')
    print('Reading input data')
    X_train = pd.read_csv(train_features_data, index_col = False, names = ["Job Title"], header = 0)#header=None,
    print('X_train shape', X_train.shape)
    print('Done reading the training features')
    y_train = pd.read_csv(train_labels_data, index_col = False, names = ["Labels"], header = 0)# header=None,
    print('y_train after changing to str', y_train.shape)
    from sklearn.feature_extraction.text import TfidfTransformer
    from sklearn.pipeline import Pipeline
    
    from sklearn.feature_extraction.text import CountVectorizer
    from sklearn.feature_extraction.text import TfidfTransformer
    from sklearn.tree import DecisionTreeClassifier
    import pandas as pd
    
    
    import re


    from sklearn.pipeline import Pipeline
    
    from sklearn.feature_extraction.text import CountVectorizer
    from sklearn.feature_extraction.text import TfidfTransformer
    from sklearn.tree import DecisionTreeClassifier
    import pandas as pd
    #from sklearn.naive_bayes import MultinomialNB
    
    text_clf = Pipeline([
            ('vect', CountVectorizer(lowercase = False)),
        ('tfidf', TfidfTransformer()),
        ('clf', DecisionTreeClassifier()),
        ])

    
    model = text_clf.fit(X_train['Job Title'].astype(str),y_train)

    model_output_directory = os.path.join('/opt/ml/model', "model.joblib")
    print('Saving model to {}'.format(model_output_directory))
    joblib.dump(model, model_output_directory)

def model_fn(model_dir):
    """Deserialized and return fitted model

    Note that this should have the same name as the serialized model in the main method
    """
    clf = joblib.load(os.path.join(model_dir, "model.joblib"))
    return clf

def input_fn(input_data, content_type):
    """Parse input data payload

    We currently only take csv input. Since we need to process both labelled
    and unlabelled data we first determine whether the label column is present
    by looking at how many columns were provided.
    """
    if content_type == 'text/csv':
        # Read the raw input data as CSV.
        df = pd.read_csv(StringIO(input_data), 
                         header=None, names = ["Job Title"])

        return df['Job Title'].astype(str)
    else:
        raise ValueError("{} not supported by script!".format(content_type))


def output_fn(prediction, accept):
    """Format prediction output

    The default accept/content-type between containers for serial inference is JSON.
    We also want to set the ContentType or mimetype as the same value as accept so the next
    container can read the response payload correctly.
    """
    if accept == "application/json":
        json_output = {"instances": prediction.tolist()}
        return worker.Response(json.dumps(json_output), mimetype=accept)
    elif accept == 'text/csv':
        return worker.Response(encoders.encode(prediction, accept), mimetype=accept)
    else:
        raise RuntimeException("{} accept type is not supported by this script.".format(accept))



Overwriting train.py


In [None]:


sklearn.fit({'train': preprocessed_training_data})
training_job_description = sklearn.jobs[-1].describe()
model_data_s3_uri = '{}{}/{}'.format(
    training_job_description['OutputDataConfig']['S3OutputPath'],
    training_job_description['TrainingJobName'],
    'output/model.tar.gz')

's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.


2020-11-09 17:24:23 Starting - Starting the training job...
2020-11-09 17:24:25 Starting - Launching requested ML instances......
2020-11-09 17:25:30 Starting - Preparing the instances for training...
2020-11-09 17:26:09 Downloading - Downloading input data...

In [15]:
model_data_s3_uri = '{}{}/{}'.format(
    training_job_description['OutputDataConfig']['S3OutputPath'],
    training_job_description['TrainingJobName'],
    'output/model.tar.gz')



In [16]:
model_data_s3_uri

's3://sagemaker-us-east-2-589274001472/sagemaker-scikit-learn-2020-11-07-16-13-21-597/output/model.tar.gz'

### Evaluating the model with scores

In [17]:
%%writefile evaluation.py

import json
import os
import tarfile
import pandas as pd
from sklearn.externals import joblib
from sklearn.metrics import classification_report, roc_auc_score, accuracy_score

if __name__=="__main__":
    model_path = os.path.join('/opt/ml/processing/model', 'model.tar.gz')
    print('Extracting model from path: {}'.format(model_path))
    with tarfile.open(model_path) as tar:
        tar.extractall(path='.')
    print('Loading model')
    model = joblib.load('model.joblib')
        

    
    #vect_path = os.path.join('/opt/ml/processing/model/vect', 'vectorizer.pkl')
    #vectorizer = joblib.load('vectorizer.pkl')
    
    #Reading the test data
    test_data_directory = 'opt/ml/processing/output/test'#'/opt/ml/input/data/test'
    print('Loading test input data')
    test_features_data = os.path.join(test_data_directory, 'test_features.csv')
    test_labels_data = os.path.join(test_data_directory, 'test_labels.csv')

    X_test = pd.read_csv(test_features_data, index_col = False, names = ["Job Title"], header = 0)
    y_test = pd.read_csv(test_labels_data, index_col = False, names = ["Labels"], header = 0)
    #X_test['Job Title'] = str(X_test['Job Title'])
    
    from sklearn.feature_extraction.text import TfidfVectorizer
    #from sklearn import metrics
    #from sklearn.feature_extraction.text import CountVectorizer
    #cv = CountVectorizer(lowercase = False, decode_error = 'ignore', encoding = 'string')
    #vectorizer = TfidfVectorizer(lowercase = False)
    #vectorised_X_test = vectorizer.transform(X_test['Job Title'].astype(str))
    print('Vectorised X_test shape', X_test.shape)
    print('y_test shape', y_test.shape)
    
     
    predictions = model.predict(X_test['Job Title'].astype(str))

    print('Creating classification evaluation report')
    report_dict = classification_report(y_test, predictions, output_dict=True)
    report_dict['accuracy'] = accuracy_score(y_test, predictions)
    #report_dict['roc_auc'] = roc_auc_score(y_test, predictions)

    print('Classification report:\n{}'.format(report_dict))

    evaluation_output_path = os.path.join('/opt/ml/processing/evaluation', 'evaluation.json')
    print('Saving classification report to {}'.format(evaluation_output_path))

    with open(evaluation_output_path, 'w') as f:
        f.write(json.dumps(report_dict))

Overwriting evaluation.py


In [18]:
import json
import pandas as pd
from sagemaker.s3 import S3Downloader

sklearn_processor.run(code='evaluation.py',
                      inputs=[ProcessingInput(
                                  source=model_data_s3_uri,
                                  destination='/opt/ml/processing/model'),
                              ProcessingInput(
                                  source=preprocessed_test_data,
                                  destination='/opt/ml/processing/output/test')],
                      outputs=[ProcessingOutput(output_name='evaluation',
                                  source='/opt/ml/processing/evaluation')]
                     )                    
evaluation_job_description = sklearn_processor.jobs[-1].describe()

Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.



Job Name:  sagemaker-scikit-learn-2020-11-07-16-18-34-866
Inputs:  [{'InputName': 'input-1', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-2-589274001472/sagemaker-scikit-learn-2020-11-07-16-13-21-597/output/model.tar.gz', 'LocalPath': '/opt/ml/processing/model', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'input-2', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-2-589274001472/altru-recommend-development-2020-11-07-16-07-51-058/output/test_data', 'LocalPath': '/opt/ml/processing/output/test', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-2-589274001472/sagemaker-scikit-learn-2020-11-07-16-18-34-866/input/code/evaluation.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'Fully

In [19]:
evaluation_output_config = evaluation_job_description['ProcessingOutputConfig']
for output in evaluation_output_config['Outputs']:
    if output['OutputName'] == 'evaluation':
        evaluation_s3_uri = output['S3Output']['S3Uri'] + '/evaluation.json'
        break

evaluation_output = S3Downloader.read_file(evaluation_s3_uri)
evaluation_output_dict = json.loads(evaluation_output)
print(json.dumps(evaluation_output_dict, sort_keys=True, indent=4))

{
    "Accounting": {
        "f1-score": 0.8416988416988417,
        "precision": 0.8790322580645161,
        "recall": 0.8074074074074075,
        "support": 135
    },
    "Counselor": {
        "f1-score": 0.5956416464891041,
        "precision": 0.4392857142857143,
        "recall": 0.924812030075188,
        "support": 133
    },
    "Customer Service": {
        "f1-score": 0.8516129032258064,
        "precision": 0.8198757763975155,
        "recall": 0.8859060402684564,
        "support": 149
    },
    "Doctor": {
        "f1-score": 0.9432624113475178,
        "precision": 0.9851851851851852,
        "recall": 0.9047619047619048,
        "support": 147
    },
    "Finance": {
        "f1-score": 0.7859237536656892,
        "precision": 0.7745664739884393,
        "recall": 0.7976190476190477,
        "support": 168
    },
    "Human Resources": {
        "f1-score": 0.9235880398671097,
        "precision": 0.9652777777777778,
        "recall": 0.8853503184713376,
        "sup

### Creating EndPoint for real time prediction

In [22]:
# calling predictor function
endpoint_name = "Altru-JobTitleClassifier-Endpoint"
predictor = sklearn.deploy(initial_instance_count=1, instance_type="ml.t2.medium", endpoint_name = endpoint_name)

Parameter image will be renamed to image_uri in SageMaker Python SDK v2.


-------------!

In [23]:
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON

predictor = sagemaker.predictor.RealTimePredictor(endpoint=endpoint_name,   #create predictor to send serialized data to sagemaker
                                                serializer=sagemaker.predictor.csv_serializer,
                                                content_type='text/csv', accept=CONTENT_TYPE_JSON)

In [30]:
payload = 'sales consultant'
print(predictor.predict(payload))

b'{"instances": ["Sales"]}'


In [31]:

test_data = pd.read_csv('validate_BT.csv', names = ['Job Role', 'Label'])
test_data.head()
test_X = test_data.iloc[1:5,0]
test_X

1    VP, Americas NFA/CFTC ex-trading, Cash Solicit...
2    508 Compliance Expert\n\nPerforms 508 complian...
3    Compliance Officer\n\nEvaluate business activi...
4    Engineering Intern- Summer 2021\n\nEngineering...
Name: Job Role, dtype: object

In [34]:

predictor.delete_endpoint()