In [1]:
import pandas as pd
import numpy as np
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import Tokenizer, RegexTokenizer, NGram, HashingTF, IDF
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from pyspark.sql import SparkSession

In [2]:
# spark setup
from pyspark import SparkContext, SparkConf
sc = SparkContext(master = 'local')
spark = SparkSession.builder.appName('pySpark word-count').config('spark.some.config.option', 'some-value')             .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/24 16:09:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# data import
data = spark.read.format("csv").option("header",True).load("/Users/iris/spam_data.csv") \
    .withColumnRenamed("Category", "label") \
    .withColumnRenamed("Message", "text")
data_df = data.toDF("label", "text")

In [4]:
data.groupBy('label').count().show()

+--------------------+-----+
|               label|count|
+--------------------+-----+
|ham\tHI BABE UAWA...|    1|
|                 ham| 4825|
|                spam|  747|
|           ham\tYeah|    1|
+--------------------+-----+



In [5]:
data_df.show(3)
print(data_df.count(), data_df.columns)
type(data_df)

+-----+--------------------+
|label|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
+-----+--------------------+
only showing top 3 rows

5574 ['label', 'text']


pyspark.sql.dataframe.DataFrame

In [6]:
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType
# replace label spam = 0 and ham = 1
data_df = data_df.withColumn('label', regexp_replace('label', 'spam', '0'))
data_df = data_df.withColumn('label', regexp_replace('label', 'ham', '1'))

# cast those string categories as int dtype
data_df = data_df.withColumn('label', data_df.label.cast(IntegerType()))

print(data_df.printSchema())

root
 |-- label: integer (nullable = true)
 |-- text: string (nullable = true)

None


In [7]:
# # pre-processing
# post_data = Tokenizer(inputCol='text', outputCol='words').transform(data_df)

# #test_post_data = RegexTokenizer(inputCol='text', outputCol='token', pattern='\\W+')

# #countTokens = udf(lambda x: len(x), IntegerType())
# post_data.show(4)
# post_data.columns
# from pyspark.sql.functions import col 
# post_data.select('label', 'text', 'words').withColumn('token', countTokens(col('words'))).show(5)

In [8]:
data_df.show(4)


+-----+--------------------+
|label|                text|
+-----+--------------------+
|    1|Go until jurong p...|
|    1|Ok lar... Joking ...|
|    0|Free entry in 2 a...|
|    1|U dun say so earl...|
+-----+--------------------+
only showing top 4 rows



In [9]:
#data_df.filter(data_df.label.isNull()).show(5)

In [10]:
# dropNULL we need to delete empty cells
data_df = data_df.dropna()
data_df.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1| 4825|
|    0|  747|
+-----+-----+



In [11]:
# 1. tokenize and clean data sentences using Tokenizer
tokenizer = Tokenizer(inputCol='text', outputCol='words')
tokenized_df = tokenizer.transform(data_df)

# 2. BiGram the words
bigrams = NGram(n=2, inputCol='words', outputCol='bigrams')
bigramed_df = bigrams.transform(tokenized_df)

# 3. hashTF the data
hasherTF = HashingTF(inputCol='bigrams', outputCol='hasher', numFeatures=6000)
hashed_df = hasherTF.transform(bigramed_df)

# 4. IDF
idf = IDF(inputCol='hasher', outputCol='features')
idf_df = idf.fit(hashed_df)
output_df = idf_df.transform(hashed_df)

                                                                                

In [12]:
output_df = output_df.select('features', 'label')
output_df.show(5)


+--------------------+-----+
|            features|label|
+--------------------+-----+
|(6000,[531,745,10...|    1|
|(6000,[2226,2343,...|    1|
|(6000,[223,245,33...|    0|
|(6000,[1131,1650,...|    1|
|(6000,[556,2043,2...|    1|
+--------------------+-----+
only showing top 5 rows



In [13]:
train_1, test_1 = output_df.randomSplit([0.8, 0.2], seed=666)
train_2, test_2 = output_df.randomSplit([0.8, 0.2], seed=768)
train_3, test_3 = output_df.randomSplit([0.8, 0.2], seed=563)
train_4, test_4 = output_df.randomSplit([0.8, 0.2], seed=356)
train_5, test_5 = output_df.randomSplit([0.8, 0.2], seed=879)


In [14]:
# train test
dt_clf_1 = DecisionTreeClassifier(featuresCol='features', labelCol="label").fit(train_1)
dt_pred_1 = dt_clf_1.transform(test_1)
dt_pred_1.show(5)

                                                                                

+------------+-----+--------------+--------------------+----------+
|    features|label| rawPrediction|         probability|prediction|
+------------+-----+--------------+--------------------+----------+
|(6000,[],[])|    1|[439.0,3851.0]|[0.10233100233100...|       1.0|
|(6000,[],[])|    1|[439.0,3851.0]|[0.10233100233100...|       1.0|
|(6000,[],[])|    1|[439.0,3851.0]|[0.10233100233100...|       1.0|
|(6000,[],[])|    1|[439.0,3851.0]|[0.10233100233100...|       1.0|
|(6000,[],[])|    1|[439.0,3851.0]|[0.10233100233100...|       1.0|
+------------+-----+--------------+--------------------+----------+
only showing top 5 rows



In [15]:
check_accuracy = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy') \
                .evaluate(dt_pred_1)
check_precision = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='weightedPrecision')   \
                .evaluate(dt_pred_1)
check_recall = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='weightedRecall') \
                .evaluate(dt_pred_1)
check_f = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')  \
                .evaluate(dt_pred_1)

In [16]:
print('accuracy: %0.4f' % check_accuracy)
print('precision: %0.4f' % check_precision)
print('recall: %0.4f' % check_recall)
print('F1 score: %0.4f' % check_f)

accuracy: 0.8863
precision: 0.8780
recall: 0.8863
F1 score: 0.8571


In [17]:
dt_clf_1.featureImportances

SparseVector(6000, {80: 0.0071, 306: 0.0074, 366: 0.0074, 394: 0.0078, 477: 0.0077, 520: 0.0259, 536: 0.0083, 1600: 0.1145, 2289: 0.0094, 2623: 0.0224, 2852: 0.2077, 4454: 0.0157, 4622: 0.1134, 5774: 0.2344, 5984: 0.2108})

In [18]:
# # from pyspark.ml.classification import RandomForestClassifier
# # rf = RandomForestClassifier(labelCol='label', featuresCol='')
# eval = BinaryClassificationEvaluator()

# training_accuracy = eval.evaluate(training_predictions)
# # print('Training set accuracy: {:.4g}.'.format(training_accuracy))
# test_accuracy = eval.evaluate(test_predictions)
# print('Test accuracy: {:.4g}'.format(test_accuracy))

In [19]:
# create Cross-Validator and ParamGrid
# pipeline = Pipeline(stages = [tokenizer, bigrams, hasherTF, idf])
params = ParamGridBuilder().build()

# Run k-fold Cross-Validator 
cv = CrossValidator(estimator=DecisionTreeClassifier(),\
     estimatorParamMaps=params,\
     evaluator=MulticlassClassificationEvaluator(), \
     seed=645,numFolds=5)
cv_model_1 = cv.fit(train_1) 
cv_model_2 = cv.fit(train_2) 
cv_model_3 = cv.fit(train_3) 
cv_model_4 = cv.fit(train_4) 
cv_model_5 = cv.fit(train_5) 

print('CV_1: %0.4f' %cv_model_1.avgMetrics[0])
print('CV_2: %0.4f' %cv_model_2.avgMetrics[0])
print('CV_3: %0.4f' %cv_model_3.avgMetrics[0])
print('CV_4: %0.4f' %cv_model_4.avgMetrics[0])
print('CV_5: %0.4f' %cv_model_5.avgMetrics[0])

                                                                                

CV_1: 0.8648
CV_2: 0.8710
CV_3: 0.8621
CV_4: 0.8683
CV_5: 0.8658


In [24]:
# prediction model
cv_prediction_1 = cv_model_1.transform(test_1)
cv_prediction_1 = cv_prediction_1.select('label', 'prediction')

cv_prediction_2 = cv_model_2.transform(test_2)
cv_prediction_2 = cv_prediction_2.select('label', 'prediction')

cv_prediction_3 = cv_model_3.transform(test_3)
cv_prediction_3 = cv_prediction_3.select('label', 'prediction')

cv_prediction_4 = cv_model_3.transform(test_4)
cv_prediction_4 = cv_prediction_4.select('label', 'prediction')

cv_prediction_5 = cv_model_4.transform(test_5)
cv_prediction_5 = cv_prediction_5.select('label', 'prediction')


In [25]:
cv_prediction_1.groupBy('label','prediction').count().show()
pred_1_precision = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='weightedPrecision')
pred_1_recall = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='weightedRecall')
precision_1 = pred_1_precision.evaluate(cv_prediction_1)
recall_1 = pred_1_recall.evaluate(cv_prediction_1)

cv_prediction_2.groupBy('label','prediction').count().show()
pred_2_precision = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='weightedPrecision')
pred_2_recall = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='weightedRecall')
precision_2 = pred_2_precision.evaluate(cv_prediction_2)
recall_2 = pred_2_recall.evaluate(cv_prediction_2)

pred_3_precision = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='weightedPrecision')
pred_3_recall = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='weightedRecall')
precision_3 = pred_3_precision.evaluate(cv_prediction_3)
recall_3 = pred_3_recall.evaluate(cv_prediction_3)

pred_4_precision = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='weightedPrecision')
pred_4_recall = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='weightedRecall')
precision_4 = pred_2_precision.evaluate(cv_prediction_4)
recall_4 = pred_4_recall.evaluate(cv_prediction_4)

pred_5_precision = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='weightedPrecision')
pred_5_recall = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='weightedRecall')
precision_5 = pred_5_precision.evaluate(cv_prediction_5)
recall_5 = pred_5_recall.evaluate(cv_prediction_5)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|    8|
|    0|       0.0|   33|
|    1|       1.0|  934|
|    0|       1.0|  116|
+-----+----------+-----+

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|    5|
|    0|       0.0|   31|
|    1|       1.0|  982|
|    0|       1.0|  144|
+-----+----------+-----+



In [26]:
print('Precision_1: %0.4f' %precision_1)
print('Recall_1: %0.4f \n' %recall_1)

print('Precision_2: %0.4f' %precision_2)
print('Recall_1: %0.4f\n' %recall_2)

print('Precision_3: %0.4f' %precision_3)
print('Recall_1: %0.4f\n' %recall_3)

print('Precision_4: %0.4f' %precision_4)
print('Recall_1: %0.4f\n' %recall_4)

print('Precision_5: %0.4f' %precision_5)
print('Recall_1: %0.4f\n' %recall_5)

Precision_1: 0.8780
Recall_1: 0.8863 

Precision_2: 0.8705
Recall_1: 0.8718

Precision_3: 0.8999
Recall_1: 0.9026

Precision_4: 0.8954
Recall_1: 0.9031

Precision_5: 0.8977
Recall_1: 0.9015



In [27]:
# mean results for precision and recall for each split data
from statistics import mean
nums_precision = [precision_1, precision_2, precision_3, precision_4, precision_5]
avgP = mean(nums_precision)
print('The average for Precision is: %0.4f' % avgP)

nums_recall = [recall_1, recall_2, recall_3, recall_4, recall_5]
avgR = mean(nums_recall)
print('The average for Recall is: %0.4f' %avgR)

The average for Precision is: 0.8883
The average for Recall is: 0.8931
