## Building a spam classifier:
### - Locally, with PySpark and a variety of Spark MLlib classification algorithms
### - On Amazon SageMaker, with the XGBoost algorithm

In [None]:
from pyspark import SparkConf, SparkContext
import sagemaker_pyspark

# Put the SageMaker Spark connector JARs on the driver classpath before the
# SparkContext is created. SparkConf.set mutates and returns the same object.
conf = SparkConf()
conf.set("spark.driver.extraClassPath", ":".join(sagemaker_pyspark.classpath_jars()))
sc = SparkContext(conf=conf)

In [None]:
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD, LogisticRegressionWithLBFGS, SVMWithSGD, NaiveBayes
from pyspark.mllib.tree import DecisionTree, GradientBoostedTrees, RandomForest
from pyspark.mllib.evaluation import MulticlassMetrics

### Load two types of emails from text files: spam and ham (non-spam).
Each line contains the text of one email.

In [None]:
# One RDD element per line of each input, i.e. one email per element.
spam, ham = sc.textFile("spam"), sc.textFile("ham")

In [None]:
# Whitespace-tokenize every email into a list of words (str.split() with no
# argument splits on any run of whitespace).
spam_words, ham_words = (
    lines.map(lambda text: text.split()) for lines in (spam, ham)
)

for tokenized in (spam_words, ham_words):
    print(tokenized.take(1))

### Create a HashingTF instance that hashes each email's words into a 200-dimensional term-frequency feature vector.

In [None]:
# Hash each word list into a term-frequency vector with 200 buckets.
tf = HashingTF(numFeatures=200)
spam_features, ham_features = tf.transform(spam_words), tf.transform(ham_words)

print(spam_features.take(1))
print(ham_features.take(1))

### Create LabeledPoint datasets for positive (spam) and negative (ham) examples.

In [None]:
# Spam is the positive class (label 1); ham is the negative class (label 0).
spam_samples = spam_features.map(lambda vec: LabeledPoint(1, vec))
ham_samples = ham_features.map(lambda vec: LabeledPoint(0, vec))

for labelled in (spam_samples, ham_samples):
    print(labelled.take(1))

### Split the dataset 80/20 into training and test sets

In [None]:
# Pool both classes, then hold out roughly 20% of the samples for evaluation
# (randomSplit weights are approximate proportions, not exact counts).
samples = spam_samples.union(ham_samples)
training_data, test_data = samples.randomSplit([0.8, 0.2])

# Every model below trains on / is scored against these, so keep them cached.
training_data.cache()
test_data.cache()

In [None]:
def score(model, data=None):
    """Return the accuracy of *model* on a labelled RDD.

    Parameters
    ----------
    model : trained MLlib model
        Any object whose ``predict`` accepts an RDD of feature vectors and
        returns an RDD of predicted labels (all models trained in this notebook).
    data : RDD of LabeledPoint, optional
        Examples to evaluate on. Defaults to the global ``test_data`` split.

    Returns
    -------
    float
        Fraction of examples whose predicted label equals the true label.

    Raises
    ------
    ValueError
        If *data* is empty, since accuracy is undefined in that case.
    """
    if data is None:
        data = test_data
    total = data.count()
    if total == 0:
        raise ValueError("cannot score a model on an empty dataset")
    predictions = model.predict(data.map(lambda point: point.features))
    # zip() pairs elements positionally; predictions is derived from data by a
    # map, so both RDDs keep the same partitioning and element order.
    labels_and_preds = data.map(lambda point: point.label).zip(predictions)
    correct = labels_and_preds.filter(lambda pair: pair[0] == pair[1]).count()
    return correct / float(total)

### Create a Logistic Regression model with SGD optimization
<https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.classification.LogisticRegressionWithSGD>

In [None]:
# Logistic regression optimized with stochastic gradient descent, using the
# MLlib default hyperparameters (none are passed here).
algo = LogisticRegressionWithSGD()
model = algo.train(training_data)

In [None]:
# Accuracy of the model just trained, on the held-out test split.
score(model)

### Create a Logistic Regression model with LBFGS optimization
<https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.classification.LogisticRegressionWithLBFGS>

In [None]:
# Logistic regression optimized with L-BFGS, using the MLlib defaults.
algo = LogisticRegressionWithLBFGS()
model = algo.train(training_data)

In [None]:
# Accuracy of the model just trained, on the held-out test split.
score(model)

### Train an SVM model
<https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.classification.SVMWithSGD>

In [None]:
# Linear SVM optimized with stochastic gradient descent, using the MLlib defaults.
algo = SVMWithSGD()
model = algo.train(training_data)

In [None]:
# Accuracy of the model just trained, on the held-out test split.
score(model)

### Train a Decision Tree model
<https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.tree.DecisionTree>

In [None]:
# Single decision tree for a two-class problem; an empty
# categoricalFeaturesInfo means every feature is treated as continuous.
algo = DecisionTree()
model = algo.trainClassifier(training_data,numClasses=2,categoricalFeaturesInfo={})

In [None]:
# Accuracy of the model just trained, on the held-out test split.
score(model)

### Train a Gradient-Boosted Trees model
<https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.tree.GradientBoostedTrees>

In [None]:
# Gradient-boosted trees with 10 boosting iterations; all features treated as
# continuous. Unlike the other tree models, no numClasses argument is passed.
algo = GradientBoostedTrees()
model = algo.trainClassifier(training_data,categoricalFeaturesInfo={},numIterations=10)

In [None]:
# Accuracy of the model just trained, on the held-out test split.
score(model)

### Train a Random Forest model
<https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.tree.RandomForest>

In [None]:
# Random forest of 16 trees for a two-class problem; all features continuous.
algo = RandomForest()
model = algo.trainClassifier(training_data,numClasses=2,categoricalFeaturesInfo={},numTrees=16)

In [None]:
# Accuracy of the model just trained, on the held-out test split.
score(model)

### Train a Naive Bayes model
<https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.classification.NaiveBayes>

In [None]:
# Naive Bayes with the default smoothing parameter (none is passed here).
algo = NaiveBayes()
model = algo.train(training_data)

In [None]:
# Accuracy of the model just trained, on the held-out test split.
score(model)

In [None]:
# Classify two hand-written messages with `model` (the Naive Bayes model when
# the cells above are run in order).
# Tokenize with str.split() -- exactly as the training emails were -- so that
# runs of spaces or tabs cannot produce empty tokens that hash into features
# the model never saw during training.
spamExample = tf.transform("You have won $1,000,000. Please fly to Nigeria ASAP".split())
hamExample = tf.transform("Spark is really good at data processing".split())

print(model.predict(spamExample))
print(model.predict(hamExample))

### Train an XGBoost model with Amazon SageMaker 
<http://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html>

In [None]:
# NOTE: bucket_path points at the original author's bucket; change it to an
# S3 bucket you can write to (in the region you run SageMaker in).
bucket_path = "s3://jsimon-sagemaker-euwest1/"
prefix = "spam-classifier/"

# Local directories that receive the LibSVM part files before upload.
training_set_directory = "spam-classifier-training-data-svm"
test_set_directory = "spam-classifier-test-data-svm"

# S3 prefixes used as the "train" and "validation" channels.
training_set_path = "{}{}train/".format(bucket_path, prefix)
test_set_path = "{}{}test/".format(bucket_path, prefix)

In [None]:
# Remove output directories from a previous run: Spark's text output refuses
# to write into a directory that already exists.
!rm -rf {training_set_directory} {test_set_directory}

In [None]:
# MLUtils is not among the pyspark.mllib imports at the top of the notebook;
# without this import the next two lines raise NameError.
from pyspark.mllib.util import MLUtils

# Write each RDD of LabeledPoint as a directory of LIBSVM-format text part
# files, matching the "libsvm" ContentType of the SageMaker input channels.
MLUtils.saveAsLibSVMFile(training_data, training_set_directory)
MLUtils.saveAsLibSVMFile(test_data, test_set_directory)

In [None]:
# Delete the .crc checksum files and the _SUCCESS marker written next to the
# part files, so only data files are uploaded to the S3 channels.
!rm -f {training_set_directory}/.*.crc {training_set_directory}/_SUCCESS 
!rm -f {test_set_directory}/.*.crc {test_set_directory}/_SUCCESS

In [None]:
# Upload the part files to the S3 prefixes used as the "train" and
# "validation" channels of the training job.
!aws s3 cp --recursive {training_set_directory} {training_set_path}
!aws s3 cp --recursive {test_set_directory} {test_set_path}

In [None]:
import boto3
from sagemaker import get_execution_role

# Built-in XGBoost training/inference image for each region this notebook supports.
containers = {'us-west-2': '433757028032.dkr.ecr.us-west-2.amazonaws.com/xgboost:latest',
              'us-east-1': '811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest',
              'us-east-2': '825641698319.dkr.ecr.us-east-2.amazonaws.com/xgboost:latest',
              'eu-west-1': '685385470294.dkr.ecr.eu-west-1.amazonaws.com/xgboost:latest'}

# region_name is None when no default region is configured. Fail with an
# actionable message instead of a bare KeyError (e.g. "KeyError: None").
region = boto3.Session().region_name
if region not in containers:
    raise ValueError(
        'No XGBoost container image listed for region {!r}; supported regions: {}'.format(
            region, ', '.join(sorted(containers))))
container = containers[region]

# IAM role passed as RoleArn to the training job and ExecutionRoleArn to the model.
role = get_execution_role()

In [None]:
%%time
import boto3
from time import gmtime, strftime

job_name = 'spam-classifier-logistic-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print("Training job", job_name)

create_training_params = \
{
    "AlgorithmSpecification": {
        "TrainingImage": container,
        "TrainingInputMode": "File"
    },
    "RoleArn": role,
    "OutputDataConfig": {
        "S3OutputPath": bucket_path + prefix + "output"
    },
    "ResourceConfig": {
        "InstanceCount": 1,
        "InstanceType": "ml.m4.4xlarge",
        "VolumeSizeInGB": 5
    },
    "TrainingJobName": job_name,
    "HyperParameters": {
        "silent":"0",
        "objective":"binary:logistic",
        "num_round":"100"
    },
    "StoppingCondition": {
        "MaxRuntimeInSeconds": 3600
    },
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": training_set_path,
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "ContentType": "libsvm",
            "CompressionType": "None"
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": test_set_path,
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "ContentType": "libsvm",
            "CompressionType": "None"
        }
    ]
}


client = boto3.client('sagemaker')
client.create_training_job(**create_training_params)

import time

status = client.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
print(status)
while status !='Completed' and status!='Failed':
    time.sleep(60)
    status = client.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
    print(status)

In [None]:
%%time
import boto3
from time import gmtime, strftime

model_name=job_name+"-model"
print(model_name)

info = client.describe_training_job(TrainingJobName=job_name)
model_data = info['ModelArtifacts']['S3ModelArtifacts']
print(model_data)

primary_container = {
    'Image': container,
    'ModelDataUrl': model_data
}

create_model_response = client.create_model(
    ModelName = model_name,
    ExecutionRoleArn = role,
    PrimaryContainer = primary_container)

print(create_model_response['ModelArn'])

In [None]:
from time import gmtime, strftime

endpoint_config_name = 'spam-classifier-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_config_name)

# One production variant that sends all traffic to a single ml.m4.xlarge
# instance hosting the model created above.
production_variant = {
    'VariantName': 'AllTraffic',
    'ModelName': model_name,
    'InstanceType': 'ml.m4.xlarge',
    'InitialInstanceCount': 1,
    'InitialVariantWeight': 1,
}
create_endpoint_config_response = client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[production_variant],
)

print("Endpoint Config Arn: " + create_endpoint_config_response['EndpointConfigArn'])

In [None]:
%%time
import time

endpoint_name = 'spam-classifier-endpoint' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_name)
create_endpoint_response = client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name)
print(create_endpoint_response['EndpointArn'])

resp = client.describe_endpoint(EndpointName=endpoint_name)
status = resp['EndpointStatus']
print("Status: " + status)

while status=='Creating':
    time.sleep(60)
    resp = client.describe_endpoint(EndpointName=endpoint_name)
    status = resp['EndpointStatus']
    print("Status: " + status)

print("Arn: " + resp['EndpointArn'])
print("Status: " + status)

In [None]:
runtime_client = boto3.client('runtime.sagemaker')

# Sample spam feature vector as LibSVM "index:value" pairs, with no label column.
spam_message = "23:2.0 27:1.0 30:1.0 34:1.0 42:1.0 45:2.0 61:1.0 70:1.0 71:1.0 74:1.0 87:1.0 100:1.0 104:1.0 107:1.0 108:1.0 127:1.0 130:1.0 160:1.0 169:1.0 183:1.0 189:1.0"

response = runtime_client.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType='text/x-libsvm',
    Body=spam_message,
)

# The body is a byte stream. With objective binary:logistic, the score is
# expected to be the predicted probability of the positive (spam) class.
result = response['Body'].read().decode("utf-8")
print(result)

In [None]:
# Sample ham feature vector as LibSVM "index:value" pairs, with no label column.
ham_message = "2:1.0 10:1.0 13:1.0 18:1.0 23:1.0 29:1.0 44:1.0 67:1.0 72:1.0 74:1.0 82:1.0 89:1.0 96:1.0 107:1.0 115:1.0 117:1.0 118:1.0 119:1.0 122:3.0 124:1.0 148:3.0 164:4.0 166:4.0 171:1.0"

response = runtime_client.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType='text/x-libsvm',
    Body=ham_message,
)

# Read the streamed response body and decode it to text before printing.
result = response['Body'].read().decode("utf-8")
print(result)

In [None]:
# Uncomment and run when finished: the endpoint's instance stays provisioned
# until the endpoint is deleted.
#client.delete_endpoint(EndpointName=endpoint_name)