In [1]:
import pandas as pd

df = pd.read_csv('../dataset/spam/train.csv',index_col=False)
df.reset_index(drop=True, inplace=True)
df.drop(df.tail(20000).index, inplace = True)
df

Unnamed: 0,Subject,Message,Spam/Ham
0,transfers from ees,attached is the latest version of the cost cen...,spam
1,fw : re ivanhoe e . s . d,"fyi , kim .\n- - - - - original message - - - ...",spam
2,re : enerfin meter 980439 for 10 / 00,it did but tetco prorated the flow between the...,ham
3,meoh plant status,the methanol plant has determined extensive re...,ham
4,re : tenaska iv,i tried calling you this am but your phone rol...,spam
...,...,...,...
10339,transwestern open season,( see attached file : twopenseason . doc )\n- ...,ham
10340,valero 8018 and 1394,"gary ,\nwhat is the status of this ?\nhc\n- - ...",ham
10341,tenaska iv 4 / 01,we need to change the demand fee on deal 38425...,ham
10342,re : releases,"louise ,\nthanks so much for your speedy reply...",spam


In [2]:
js = df.to_json(orient ='records')
f = open('ddd.json','w')
f.write(js)
f.close()

In [3]:
type(js)

str

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
x = spark.read.json('ddd.json')

In [6]:
x.show()

+--------------------+--------+--------------------+
|             Message|Spam/Ham|             Subject|
+--------------------+--------+--------------------+
|attached is the l...|    spam|  transfers from ees|
|fyi , kim .\n- - ...|    spam|fw : re ivanhoe e...|
|it did but tetco ...|     ham|re : enerfin mete...|
|the methanol plan...|     ham|   meoh plant status|
|i tried calling y...|    spam|     re : tenaska iv|
|fyi , kim .\n- - ...|    spam|fw : re ivanhoe e...|
|hi ,\ni am forwar...|    spam|fw : memo : re : ...|
|enron replaces fa...|     ham|      enron mentions|
|attached is the l...|    spam|  transfers from ees|
|start date : 2 / ...|    spam|start date : 2 / ...|
|start date : 2 / ...|    spam|start date : 2 / ...|
|start date : 12 /...|     ham|start date : 12 /...|
|fyi , kim .\n- - ...|    spam|fw : re ivanhoe e...|
|this is a complet...|     ham|re : priority cus...|
|enron tiger team ...|     ham|new consultants a...|
|business highligh...|     ham|  entouch newsl

In [105]:
from pyspark.sql.functions import length, regexp_replace

data = x.withColumn('length',length(x['Message'])).withColumnRenamed("Spam/Ham","class").withColumnRenamed("Message","text")
data = data.drop('Subject')
data = data.withColumn("text", regexp_replace("text", ",", ""))\
    .withColumn("text", regexp_replace("text", ",", ""))\
    .withColumn("text", regexp_replace("text", ":", ""))\
    .withColumn("text", regexp_replace("text", "-", ""))\
    .withColumn("text", regexp_replace("text", "/", ""))\
    .withColumn("text", regexp_replace("text", "\n", ""))\
    .withColumn("text", regexp_replace("text", "  ", ""))\
    .withColumn("text", regexp_replace("text", ";", ""))
data= data.where(data['length']<20000)
data.show()

+--------------------+-----+------+
|                text|class|length|
+--------------------+-----+------+
|attached is the l...| spam|   868|
|fyikim . original...| spam|  1180|
|it did but tetco ...|  ham|  4512|
|the methanol plan...|  ham|   360|
|i tried calling y...| spam|  2284|
|fyikim . original...| spam|  1180|
|hi i am forwardin...| spam|  1674|
|attached is the l...| spam|   868|
|start date2602  h...| spam|   632|
|start date2602  h...| spam|   632|
|start date123001 ...|  ham|   536|
|fyikim . original...| spam|  1180|
|this is a complet...|  ham|  1937|
|enron tiger team ...|  ham|   553|
|business highligh...|  ham|  8074|
|attached is the w...|  ham|   140|
|attached is the l...| spam|   868|
|attached is the l...| spam|   868|
|start date2602  h...| spam|   632|
|start date2602  h...| spam|   632|
+--------------------+-----+------+
only showing top 20 rows



In [106]:
data.groupby('class').mean().show()
data.printSchema()

+-----+------------------+
|class|       avg(length)|
+-----+------------------+
|  ham|1295.9862741197533|
| spam|1566.7767653758542|
+-----+------------------+

root
 |-- text: string (nullable = true)
 |-- class: string (nullable = true)
 |-- length: integer (nullable = true)



In [107]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
ham_spam_to_num = StringIndexer(inputCol='class',outputCol='label')
clean_up = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')

In [108]:
from pyspark.ml.classification import NaiveBayes
# Use defaults
nb = NaiveBayes()

In [109]:
from pyspark.ml import Pipeline
data_prep_pipe = Pipeline(stages=[ham_spam_to_num,tokenizer,stopremove,count_vec,idf,clean_up])
cleaner = data_prep_pipe.fit(data)
clean_data = cleaner.transform(data)

In [110]:
clean_data = clean_data.select(['label','features'])
clean_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(101720,[0,1,19,2...|
|  0.0|(101720,[0,2,4,5,...|
|  1.0|(101720,[0,1,4,5,...|
|  1.0|(101720,[0,79,91,...|
|  0.0|(101720,[0,1,5,6,...|
|  0.0|(101720,[0,2,4,5,...|
|  0.0|(101720,[0,4,5,6,...|
|  0.0|(101720,[0,1,19,2...|
|  0.0|(101720,[0,1,2,3,...|
|  0.0|(101720,[0,1,2,3,...|
|  1.0|(101720,[0,2,3,14...|
|  0.0|(101720,[0,2,4,5,...|
|  1.0|(101720,[0,1,2,4,...|
|  1.0|(101720,[0,4,6,8,...|
|  1.0|(101720,[0,1,5,6,...|
|  1.0|(101720,[35,255,2...|
|  0.0|(101720,[0,1,19,2...|
|  0.0|(101720,[0,1,19,2...|
|  0.0|(101720,[0,1,2,3,...|
|  0.0|(101720,[0,1,2,3,...|
+-----+--------------------+
only showing top 20 rows



In [111]:
(training,testing) = clean_data.randomSplit([0.7,0.3])
spam_predictor = nb.fit(training)
test_results = spam_predictor.transform(testing)
test_results.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(101720,[0,1,2,3,...|[-1286.7920745740...|[1.0,6.7080016980...|       0.0|
|  0.0|(101720,[0,1,2,3,...|[-1286.7920745740...|[1.0,6.7080016980...|       0.0|
|  0.0|(101720,[0,1,2,3,...|[-1286.7920745740...|[1.0,6.7080016980...|       0.0|
|  0.0|(101720,[0,1,2,3,...|[-1286.7920745740...|[1.0,6.7080016980...|       0.0|
|  0.0|(101720,[0,1,2,3,...|[-1286.7920745740...|[1.0,6.7080016980...|       0.0|
|  0.0|(101720,[0,1,2,3,...|[-1286.7920745740...|[1.0,6.7080016980...|       0.0|
|  0.0|(101720,[0,1,2,3,...|[-1286.7920745740...|[1.0,6.7080016980...|       0.0|
|  0.0|(101720,[0,1,2,3,...|[-1286.7920745740...|[1.0,6.7080016980...|       0.0|
|  0.0|(101720,[0,1,2,3,...|[-1286.7920745740...|[1.0,6.7080016980...|       0.0|
|  0.0|(101720,[

In [112]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting spam was: {}".format(acc))

Accuracy of model at predicting spam was: 0.9854758608382852


In [113]:
from pyspark.ml.regression import LinearRegression

In [115]:
lr = LinearRegression(featuresCol = 'features', labelCol='label', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(training)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: (101720,[58],[-0.027514528651162422])
Intercept: 0.5020358167192102


In [116]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 0.491239
r2: 0.034520


In [117]:
training.describe().show()

+-------+-------------------+
|summary|              label|
+-------+-------------------+
|  count|               7263|
|   mean|0.49249621368580476|
| stddev| 0.4999781107362106|
|    min|                0.0|
|    max|                1.0|
+-------+-------------------+



In [121]:
from pyspark.ml.clustering import KMeans
from pyspark.sql.types import DoubleType

In [128]:
kmeans = KMeans().setK(4).setSeed(1)
kmodel = kmeans.fit(training)
predictions=kmodel.transform(testing)
predictions = predictions.withColumn("prediction", predictions["prediction"].cast(DoubleType()))

In [129]:
evaluatorsvm = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluatorsvm.evaluate(predictions)

0.2889739696076238