In [None]:
from pyspark import SparkContext, SparkConf
import sagemaker_pyspark

# Put the SageMaker Spark JARs on the driver classpath, then obtain a SparkContext.
# getOrCreate() reuses a context that is already running, so re-running this cell does
# not fail with a "multiple SparkContexts" ValueError. Note: if a context already exists,
# its original configuration is kept and `conf` is not re-applied.
conf = SparkConf().set("spark.driver.extraClassPath", ":".join(sagemaker_pyspark.classpath_jars()))
sc = SparkContext.getOrCreate(conf=conf)

In [None]:
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD, LogisticRegressionWithLBFGS, SVMWithSGD, NaiveBayes
from pyspark.mllib.tree import DecisionTree, GradientBoostedTrees, RandomForest
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark import ml

## Load 2 types of emails from text files: spam and ham (non-spam).

### Each line has text from one email.

In [None]:
# One RDD per class; textFile yields one element per line, i.e. one email per element.
spam, ham = (sc.textFile(path) for path in ("spam", "ham"))

In [None]:
def _tokenize(text):
    """Split one email's text into words on runs of whitespace."""
    return text.split()

spam_words = spam.map(_tokenize)
ham_words = ham.map(_tokenize)

In [None]:
# print() on an RDD only shows its lineage repr (e.g. "PythonRDD[..]"), not its contents.
# take(1) brings one tokenized spam email back to the driver so the tokenization can be inspected.
spam_words.take(1)

## Create a HashingTF instance to map email text to vectors of features.

In [None]:
# Hash every word into one of NUM_FEATURES buckets, producing a term-frequency vector per email.
NUM_FEATURES = 200
tf = HashingTF(numFeatures=NUM_FEATURES)
spam_features, ham_features = tf.transform(spam_words), tf.transform(ham_words)

## Create LabeledPoint datasets for positive (spam) and negative (ham) examples.

In [None]:
def _labeler(label):
    """Return a function that wraps a feature vector in a LabeledPoint carrying ``label``."""
    return lambda vec: LabeledPoint(label, vec)

# Spam is the positive class (1); ham is the negative class (0).
spam_samples = spam_features.map(_labeler(1))
ham_samples = ham_features.map(_labeler(0))

## Split the data set 80/20 into training and test sets

In [None]:
# Combine both classes and split them 80/20 into training and test sets. A fixed seed makes
# the split, and therefore every accuracy reported below, reproducible across runs.
SPLIT_SEED = 42
samples = spam_samples.union(ham_samples)
training_data, test_data = samples.randomSplit([0.8, 0.2], seed=SPLIT_SEED)
# Every model below reuses both splits, so keep them cached instead of recomputing the lineage.
training_data.cache()
test_data.cache();

In [None]:
def score(model, data=None):
    """Return the fraction of examples in ``data`` that ``model`` classifies correctly.

    Args:
        model: a trained classifier whose ``predict`` accepts an RDD of feature vectors
            and returns an RDD of predicted labels.
        data: RDD of ``LabeledPoint``. Defaults to the held-out ``test_data`` split.

    Returns:
        Accuracy as a float in [0, 1].

    Raises:
        ValueError: if ``data`` is empty, because accuracy is undefined.
    """
    if data is None:
        data = test_data
    total = data.count()
    if total == 0:
        raise ValueError("cannot score a model on an empty dataset")
    predictions = model.predict(data.map(lambda point: point.features))
    # zip() requires identically partitioned RDDs. predictions is derived from a map over
    # `data`, which is the label/prediction pairing pattern used in the MLlib guide.
    labels_and_preds = data.map(lambda point: point.label).zip(predictions)
    correct = labels_and_preds.filter(lambda pair: pair[0] == pair[1]).count()
    return correct / float(total)

## Create a Logistic Regression model with SGD optimization

In [None]:
# Bug fix: pyspark.ml.classification.LogisticRegression is a DataFrame-based estimator with
# fit(), not train(), so it cannot consume this RDD of LabeledPoint. Use the RDD-based MLlib
# SGD trainer that this section describes (deprecated since Spark 2.0 in favour of the
# DataFrame API or LogisticRegressionWithLBFGS, but still available).
algo = LogisticRegressionWithSGD()
model = algo.train(training_data)

In [None]:
# Accuracy of the model trained in the previous cell on the held-out test split.
score(model=model)

## Create a Logistic Regression model with LBFGS optimization 

In [None]:
# Logistic regression fitted with the L-BFGS optimizer.
algo = LogisticRegressionWithLBFGS()
model = algo.train(data=training_data)

In [None]:
# Accuracy of the L-BFGS logistic regression model on the held-out test split.
score(model=model)

## Train a SVM model

In [None]:
# Linear SVM trained with stochastic gradient descent.
algo = SVMWithSGD()
model = algo.train(data=training_data)

In [None]:
# Accuracy of the SVM model on the held-out test split.
score(model=model)

## Train a Decision Tree model

In [None]:
# Binary decision tree. Per the MLlib docs, an empty categoricalFeaturesInfo means no
# feature is declared categorical.
algo = DecisionTree()
model = algo.trainClassifier(
    training_data,
    numClasses=2,
    categoricalFeaturesInfo={},
)

In [None]:
# Accuracy of the decision tree on the held-out test split.
score(model=model)

## Train a Gradient Boosted Trees model

In [None]:
# Gradient-boosted trees, limited to 10 boosting iterations.
algo = GradientBoostedTrees()
model = algo.trainClassifier(
    training_data,
    categoricalFeaturesInfo={},
    numIterations=10,
)

In [None]:
# Accuracy of the gradient-boosted trees model on the held-out test split.
score(model=model)

## Train a Random Forest model

In [None]:
# Random forest of 16 trees for a binary problem. Random forests randomly sample rows and
# features for each tree, so fixing `seed` makes the trained forest and its score
# reproducible across runs.
algo = RandomForest()
model = algo.trainClassifier(
    training_data,
    numClasses=2,
    categoricalFeaturesInfo={},
    numTrees=16,
    seed=42,
)

In [None]:
# Accuracy of the random forest on the held-out test split.
score(model=model)

## Train a Naive Bayes model

In [None]:
# Naive Bayes classifier over the hashed term-frequency vectors.
algo = NaiveBayes()
model = algo.train(data=training_data)

In [None]:
# Accuracy of the Naive Bayes model on the held-out test split.
score(model=model)

## Classify two new example emails with the most recently trained model

In [None]:
# Tokenize with split(), the same whitespace tokenization applied to the training emails,
# so the hashed features line up with what the model was trained on.
# `model` is whichever model was trained most recently (Naive Bayes when run top to bottom).
# Expected output: 1 for spam and 0 for ham, matching the LabeledPoint labels.
spamExample = tf.transform("You have won $1,000,000. Please fly to Nigeria ASAP".split())
hamExample = tf.transform("Spark is really good at data processing".split())

print(model.predict(spamExample))
print(model.predict(hamExample))