https://venukanaparthy.wordpress.com/2015/07/04/spam-classification-with-naive-bayes-using-spark-mllib/

https://tomaxent.com/2017/04/25/Machine-Learning-with-MLlib-of-Spark/

https://medium.com/@julsimon/building-a-spam-classifier-pyspark-mllib-vs-sagemaker-xgboost-1980158a900f


In [5]:
import findspark
findspark.init('/home/raghav/spark-2.4.0-bin-hadoop2.7/')
from pyspark import SparkContext
sc = SparkContext('local[*]', 'pyspark spam filter')

In [10]:
spam = sc.textFile("../Data/sms_spam/spam")
ham = sc.textFile("../Data/sms_spam/ham") 


spam_words = spam.map(lambda email: email.split())
ham_words = ham.map(lambda email: email.split())

print(spam_words.take(1))
print(ham_words.take(1))

[['2,Free', 'entry', 'in', '2', 'a', 'wkly', 'comp', 'to', 'win', 'FA', 'Cup', 'final', 'tkts', '21st', 'May', '2005.', 'Text', 'FA', 'to', '87121', 'to', 'receive', 'entry', 'question(std', 'txt', "rate)T&C's", 'apply', "08452810075over18's"]]
[['0,"Go', 'until', 'jurong', 'point,', 'crazy..', 'Available', 'only', 'in', 'bugis', 'n', 'great', 'world', 'la', 'e', 'buffet...', 'Cine', 'there', 'got', 'amore', 'wat..."']]


In [14]:
from pyspark.mllib.feature import HashingTF, IDF

tf = HashingTF(numFeatures = 1000)
spam_features = tf.transform(spam_words)
ham_features = tf.transform(ham_words)

print(spam_features.take(1))
print(ham_features.take(1))

[SparseVector(1000, {4: 1.0, 52: 1.0, 162: 1.0, 261: 1.0, 289: 1.0, 309: 2.0, 359: 1.0, 365: 3.0, 368: 1.0, 389: 1.0, 408: 1.0, 505: 1.0, 524: 2.0, 530: 1.0, 542: 1.0, 547: 1.0, 569: 1.0, 571: 1.0, 588: 1.0, 627: 1.0, 633: 1.0, 655: 1.0, 665: 1.0, 783: 1.0})]
[SparseVector(1000, {14: 1.0, 17: 1.0, 41: 1.0, 52: 1.0, 66: 1.0, 84: 1.0, 97: 1.0, 125: 1.0, 381: 1.0, 407: 1.0, 501: 1.0, 604: 1.0, 657: 1.0, 668: 1.0, 683: 1.0, 708: 1.0, 802: 1.0, 914: 1.0, 932: 1.0, 993: 1.0})]


In [16]:
from pyspark.mllib.regression import LabeledPoint
spam_samples = spam_features.map(lambda features:LabeledPoint(1, features))
ham_samples = ham_features.map(lambda features:LabeledPoint(0, features))

print(spam_samples.take(1))
print(ham_samples.take(1))

[LabeledPoint(1.0, (1000,[4,52,162,261,289,309,359,365,368,389,408,505,524,530,542,547,569,571,588,627,633,655,665,783],[1.0,1.0,1.0,1.0,1.0,2.0,1.0,3.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))]
[LabeledPoint(0.0, (1000,[14,17,41,52,66,84,97,125,381,407,501,604,657,668,683,708,802,914,932,993],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))]


In [17]:
samples = spam_samples.union(ham_samples)
[training_data, test_data] = samples.randomSplit([0.8, 0.2])
training_data.cache()
test_data.cache()

PythonRDD[32] at RDD at PythonRDD.scala:53

In [19]:
def score(model):
    predictions = model.predict(test_data.map(lambda x: x.features))
    labels_and_preds = test_data.map(lambda x: x.label).zip(predictions)
    accuracy = labels_and_preds.filter(lambda x: x[0] == x[1]).count() / float(test_data.count())
    return accuracy

In [21]:
from pyspark.mllib.classification import LogisticRegressionWithSGD, LogisticRegressionWithLBFGS

algo = LogisticRegressionWithSGD()
model = algo.train(training_data)
print(score(model))


algo = LogisticRegressionWithLBFGS()
model = algo.train(training_data)
print(score(model))

0.8960645812310797
0.8748738647830474
