In [None]:
# Enable current type hints for older Python version (<3.10) 
from __future__ import annotations
import boto3
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from datetime import datetime
import time
from pathlib import Path
import pickle
import logging
from dotenv import dotenv_values

In [None]:
# Module-level logger for this notebook
logger = logging.getLogger(__name__)

# Settings are read from a local .env file (kept out of version control)
config = dotenv_values(".env")

# Name of the S3 bucket holding the train/validation data and the training results.
# dotenv yields None for a key without a value and '' for `BUCKET=`, so fail fast here
# rather than silently building paths like 's3://None/train' further down.
BUCKET = config.get('BUCKET')
if not BUCKET:
    raise KeyError("BUCKET is missing or empty in .env")

In [None]:
import sagemaker

# Execution role and region the notebook is running with
role = sagemaker.get_execution_role()
region_name = boto3.Session().region_name

# Container image of the built-in BlazingText algorithm for this region.
# `image_uris.retrieve` is the SDK v2 replacement for the deprecated
# `sagemaker.amazon.amazon_estimator.get_image_uri`.
image = sagemaker.image_uris.retrieve(framework="blazingtext", region=region_name, version="1")
print(f"Using SageMaker BlazingText image: {image} ({region_name})")

In [None]:
# Both channels are plain-text files under an S3 prefix and share the same input settings
_channel_settings = {
    "distribution": "FullyReplicated",
    "content_type": "text/plain",
    "s3_data_type": "S3Prefix",
}

train_data = sagemaker.inputs.TrainingInput(f's3://{BUCKET}/train', **_channel_settings)
validation_data = sagemaker.inputs.TrainingInput(f's3://{BUCKET}/validation', **_channel_settings)

# Channel name -> input definition, as passed to `fit`
data_channels = {"train": train_data, "validation": validation_data}

# Set up Hyperparameter tuning job

In [None]:
# Search ranges follow the values AWS recommends for BlazingText
# ( https://docs.aws.amazon.com/sagemaker/latest/dg/blazingtext-tuning.html )

from sagemaker.tuner import (
    ContinuousParameter,
    HyperparameterTuner,
    IntegerParameter,
)

INSTANCE_TYPE_TRAIN = "ml.c4.4xlarge"

# Metric the tuner optimises, and in which direction
objective_metric_name = "validation:accuracy"
objective_type = "Maximize"

# Hyperparameters explored by the tuner
hyperparameter_ranges = {
    "learning_rate": ContinuousParameter(0.005, 0.01),
    "vector_dim": IntegerParameter(32, 300),
    "buckets": IntegerParameter(1_000_000, 10_000_000),
    "epochs": IntegerParameter(5, 15),
    "min_count": IntegerParameter(0, 100),
    "word_ngrams": IntegerParameter(1, 3),
}

# Hyperparameters held *constant* across all training jobs
constant_hyperparameters = {
    "mode": "supervised",
    "early_stopping": "True",
    "patience": "4",
}

# Template estimator that every tuning trial is launched from
estimator = sagemaker.estimator.Estimator(
    image_uri=image,
    role=role,
    instance_count=1,
    instance_type=INSTANCE_TYPE_TRAIN,
    volume_size=30,
    max_run=360000,
    input_mode="File",
    output_path=f's3://{BUCKET}/training_results',
    hyperparameters=constant_hyperparameters,
)

# Ten trials, one at a time
tuner = HyperparameterTuner(
    estimator=estimator,
    objective_metric_name=objective_metric_name,
    objective_type=objective_type,
    hyperparameter_ranges=hyperparameter_ranges,
    max_jobs=10,
    max_parallel_jobs=1,
)
tuner.fit(inputs=data_channels, logs=True)

# Inspect results of hyperparameter tuning

In [None]:
# Name of the tuning job started by `tuner.fit` above
tuning_job_name = tuner.latest_tuning_job.name

# Low-level SageMaker client; it was previously used without ever being created,
# which raised a NameError on a fresh kernel.
sm_client = boto3.client("sagemaker")

# Full description (status, counters, best training job, ...) of the tuning job
tuning_job_result = sm_client.describe_hyper_parameter_tuning_job(
    HyperParameterTuningJobName=tuning_job_name
)
# Uncomment to inspect the raw response:
# tuning_job_result

In [None]:
# One row per training job of the tuning run, shown best objective value first
tuning_analytics = sagemaker.HyperparameterTuningJobAnalytics(tuning_job_name)
hp_results = tuning_analytics.dataframe()
hp_results.sort_values(by='FinalObjectiveValue', ascending=False)

In [None]:
# Label each bar with the last dash-separated part of the training job name
job_suffix = hp_results.TrainingJobName.str.split('-').str.get(-1)
sns.barplot(x=hp_results.FinalObjectiveValue, y=job_suffix)

## Evaluate model
### Make predictions
Let's deploy the best model found by the tuning job to a SageMaker endpoint, load the test data, and get the associated predictions from that endpoint.

In [None]:
from sagemaker.serializers import JSONSerializer

# Deploy best model
INSTANCE_TYPE_PREDICT = "ml.m5.xlarge"

# Single-instance deployment; requests are sent as JSON
endpoint_options = {
    "initial_instance_count": 1,
    "instance_type": INSTANCE_TYPE_PREDICT,
    "serializer": JSONSerializer(),
}
deployed_model = tuner.deploy(**endpoint_options)

In [None]:
# Load test data
# Todo: Remove this once using feature store

import json

# NOTE: unpickling can execute arbitrary code, so only load pickle files you trust
with open('df_test.pickle', 'rb') as pickle_file:
    df_test: pd.DataFrame = pickle.load(pickle_file)

# Split the message texts by true label; kept as np arrays for easy slicing later
is_reply = df_test.target == '__label__reply'
is_no_reply = df_test.target == '__label__no_reply'
replied_tos = df_test.loc[is_reply, 'feature'].values
no_reply = df_test.loc[is_no_reply, 'feature'].values

In [None]:
# Make predictions and extract probability of reply
def _prediction_to_df_row(pred: dict) -> dict:
    """Turn one prediction into a ``{label: probability}`` mapping.

    Args:
        pred: A single prediction holding parallel ``'label'`` and ``'prob'`` lists.

    Returns:
        Dict mapping each label to its probability (one DataFrame row).
    """
    return dict(zip(pred['label'], pred['prob']))


def get_probs(text: np.ndarray, deployed_model) -> pd.Series:
    """Return the predicted probability of ``__label__reply`` for each text.

    Args:
        text: Array-like of message texts; all of them are sent in one request.
        deployed_model: Object with a ``predict(payload)`` method that returns the
            predictions either as a JSON document (bytes/str) or already parsed.

    Returns:
        Series named ``'__label__reply'`` with one probability per prediction,
        in the order the predictions were returned.

    Raises:
        KeyError: If no prediction contains the ``'__label__reply'`` label
            (e.g. the response was empty).
    """
    payload = {
        # A plain list keeps the payload JSON-serialisable for any array-like input
        "instances": np.asarray(text).tolist(),
        "configuration": {"k": 2},  # get probs for top-k (both) classes
    }
    response = deployed_model.predict(payload)

    # Parse the response when it arrives as raw JSON; a predictor configured with a
    # deserializer already hands back Python objects.
    if isinstance(response, (bytes, bytearray, str)):
        predictions = json.loads(response)
    else:
        predictions = response

    all_probs = [_prediction_to_df_row(pred) for pred in predictions]
    return pd.DataFrame(all_probs)['__label__reply']

# Predicted reply probability for the messages that actually received a reply
p_reply = get_probs(text=replied_tos, deployed_model=deployed_model)
p_reply.describe()

In [None]:
# Probability of reply for messages that did NOT receive a reply.
# These do not fit into a single request, so they are sent in chunks. Chunking by a
# fixed size (instead of two hard-coded slices) avoids an empty second request when
# there are at most 2000 messages and an oversized one when there are more than 4000.
MAX_INSTANCES_PER_REQUEST = 2000
p_no_reply = pd.concat(
    [
        get_probs(
            no_reply[start:start + MAX_INSTANCES_PER_REQUEST],
            deployed_model=deployed_model,
        )
        for start in range(0, len(no_reply), MAX_INSTANCES_PER_REQUEST)
    ],
    axis=0
)
p_no_reply.describe()

### Model performance

In [None]:
# Vectors of true labels (1 = replied to, 0 = no reply) and predicted reply scores,
# both ordered as: replied-to messages first, then messages without a reply
y_true = [1] * len(replied_tos) + [0] * len(no_reply)

# `Series.append` was removed in pandas 2.0; `pd.concat` is the replacement.
# `ignore_index=True` gives a clean positional index that lines up with `y_true`.
y_score = pd.concat([p_reply, p_no_reply], ignore_index=True)

# Every message must have exactly one score, otherwise the metrics below are meaningless
assert len(y_true) == len(y_score), "number of scores does not match number of labels"

y_score.describe()

In [None]:
# AUC
from sklearn.metrics import (
    ConfusionMatrixDisplay,
    classification_report,
    confusion_matrix,
    roc_auc_score,
)

roc_auc_score(y_true, y_score)

The AUC of 0.9 is pretty good, but it can be misleading for an imbalanced classification problem (remember, we only oversampled the training and validation data, so the test data is still imbalanced). Thus, let's look at the confusion matrix for a more detailed view.

In [None]:
# Confusion matrix when a message is flagged as soon as its reply score exceeds 0.5
predicted_reply = y_score > 0.5
cm = confusion_matrix(y_true, predicted_reply)
disp = ConfusionMatrixDisplay(cm, display_labels=['no reply', 'replied to']).plot()
plt.show()

We see that if we just predict the most probable class, we can correctly predict about three quarters of replies. This is pretty good for predicting a rare event. Depending on what we care about, we could adjust the prediction threshold. For example, if we care about identifying messages that could potentially require a reply, we could flag each message that has a probability of reply greater than 20%. Below, we first print the detailed classification report for the default 50% threshold and then repeat the analysis with the 20% threshold:

In [None]:
# Per-class precision / recall / F1 at the 0.5 threshold
report_050 = classification_report(
    y_true,
    y_score > 0.5,
    target_names=['no reply', 'replied to'],
)
print(report_050)

In [None]:
# Confusion matrix with the lower 0.2 threshold, which flags more messages as likely replies
flagged_reply = y_score > 0.2
cm = confusion_matrix(y_true, flagged_reply)
disp = ConfusionMatrixDisplay(cm, display_labels=['no reply', 'replied to']).plot()
plt.show()

In [None]:
# Per-class precision / recall / F1 at the 0.2 threshold
report_020 = classification_report(
    y_true,
    y_score > 0.2,
    target_names=['no reply', 'replied to'],
)
print(report_020)

Now we are able to identify almost all emails eliciting a reply, at the cost of more false positives.

# Cleanup

In [None]:
# Clean up the predictor returned by `tuner.deploy` earlier in the notebook.
# NOTE(review): presumably this removes the hosted endpoint; `delete_predictor` may not
# exist in older SageMaker SDK releases (which expose `delete_endpoint()`) — confirm
# against the installed version.
deployed_model.delete_predictor()

In [None]:
# Record when the notebook run completed
finished_at = datetime.now()
print(f'Finished at {finished_at}')