In [0]:
file_location=r'/FileStore/tables/spam.csv'

In [0]:
# !pip install nltk
# import nltk
# nltk.download('stopwords')
# nltk.download('wordnet')

In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, LinearSVC, DecisionTreeClassifier
from pyspark.sql.functions import col, udf
from pyspark.sql.types import  IntegerType,StringType
from pyspark.ml.feature import HashingTF, IDF, Tokenizer,StopWordsRemover
from nltk.corpus import stopwords
from pyspark.ml.evaluation import MulticlassClassificationEvaluator,BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from nltk.stem import WordNetLemmatizer

In [0]:
wnl = WordNetLemmatizer()

In [0]:
spark = SparkSession.builder.appName(
	'Text_Classification').getOrCreate()


data = spark.read.csv(file_location, sep=',',
						inferSchema=True, header=True)

In [0]:
type(data)

pyspark.sql.dataframe.DataFrame

In [0]:
def spam_ham(st):
    if st=='ham':
        return 1
    else:
        return 0

In [0]:
spam_hamUDF = udf(lambda x:spam_ham(x),IntegerType()) 

In [0]:
data=data.withColumnRenamed("v1","label").withColumnRenamed("v2","Content")

In [0]:
data=data.withColumn("label", spam_hamUDF(col("label")))

In [0]:
data=data.select(['label','Content']) 
data=data.na.drop()

In [0]:
data.groupBy('label').count().show() 

+-----+-----+
|label|count|
+-----+-----+
|    1| 4825|
|    0|  748|
+-----+-----+



In [0]:
data.show(5)

+-----+--------------------+
|label|             Content|
+-----+--------------------+
|    1|Go until jurong p...|
|    1|Ok lar... Joking ...|
|    0|Free entry in 2 a...|
|    1|U dun say so earl...|
|    1|Nah I don't think...|
+-----+--------------------+
only showing top 5 rows



In [0]:
def text_processing(st):
    st=st.lower()
    st=st.split(' ')
    st=' '.join([wnl.lemmatize(w) for w in st if (w!='' and w.isalpha())])
    return st

In [0]:
text_processingUDF = udf(lambda x:text_processing(x),StringType()) 

In [0]:
data=data.withColumn("Content", text_processingUDF(col("Content")))

In [0]:
data.show(3)

+-----+--------------------+
|label|             Content|
+-----+--------------------+
|    1|go until jurong a...|
|    1|     ok joking wif u|
|    0|free entry in a w...|
+-----+--------------------+
only showing top 3 rows



In [0]:
(trainingData, testData) = data.randomSplit([0.75, 0.25], seed = 623)

In [0]:
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 4178
Test Dataset Count: 1395


In [0]:
trainingData.groupBy('label').count().show() 

+-----+-----+
|label|count|
+-----+-----+
|    1| 3610|
|    0|  568|
+-----+-----+



In [0]:
testData.groupBy('label').count().show() 

+-----+-----+
|label|count|
+-----+-----+
|    1| 1215|
|    0|  180|
+-----+-----+



In [0]:
tokenizer = Tokenizer(inputCol="Content", outputCol="words")
wordsData = tokenizer.transform(trainingData)

add_stopwords = stopwords.words('english')

stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered",stopWords=add_stopwords)
stopwordsRemoverData = stopwordsRemover.transform(wordsData)

hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=100)
featurizedData = hashingTF.transform(stopwordsRemoverData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features",minDocFreq=3)
idfModel = idf.fit(featurizedData)
trainingData = idfModel.transform(featurizedData)


In [0]:
wordsData = tokenizer.transform(testData)
stopwordsRemoverData = stopwordsRemover.transform(wordsData)
featurizedData = hashingTF.transform(stopwordsRemoverData)
testData = idfModel.transform(featurizedData)


In [0]:
lr = LogisticRegression(featuresCol='features', labelCol='label')
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0,0.1, 0.01,0.02]) \
    .addGrid(lr.elasticNetParam, [0, 0.1, 0.01,0.02]) \
    .build()

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(predictionCol="prediction",metricName="recallByLabel"),
                          numFolds=5) 
cvModel = crossval.fit(trainingData)
prediction = cvModel.transform(testData)

evaluator_accuracy = MulticlassClassificationEvaluator(predictionCol="prediction",metricName="accuracy")
evaluator_recall = MulticlassClassificationEvaluator(predictionCol="prediction",metricName="recallByLabel")
evaluator_precision = MulticlassClassificationEvaluator(predictionCol="prediction",metricName="precisionByLabel")

print('+++++++++++++ LogisticRegression +++++++++++++')
print('accuracy : ',evaluator_accuracy.evaluate(prediction)*100,' recall : ',evaluator_recall.evaluate(prediction)*100,' precision : ',evaluator_precision.evaluate(prediction)*100)

print(cvModel.bestModel.extractParamMap())

+++++++++++++ LogisticRegression +++++++++++++
accuracy :  93.26164874551971  recall :  65.0  precision :  79.05405405405406
{Param(parent='LogisticRegression_5e80cc68976e', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2, Param(parent='LogisticRegression_5e80cc68976e', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_5e80cc68976e', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto', Param(parent='LogisticRegression_5e80cc68976e', name='featuresCol', doc='features column name.'): 'features', Param(parent='LogisticRegression_5e80cc68976e', name='fitIntercept', doc='whether to fit an intercept term.'): True, Param(parent='LogisticRegression_5e80cc68976e', name='labelCol', doc='label column nam

In [0]:
rf = RandomForestClassifier(featuresCol='features', labelCol='label')

paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [5,10,15]) \
    .addGrid(rf.maxBins, [32,42,52]) \
    .addGrid(rf.minInstancesPerNode, [1,2,3,4,5]) \
    .build()

crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(predictionCol="prediction",metricName="recallByLabel"),
                          numFolds=5) 
cvModel = crossval.fit(trainingData)
prediction = cvModel.transform(testData)

evaluator_accuracy = MulticlassClassificationEvaluator(predictionCol="prediction",metricName="accuracy")
evaluator_recall = MulticlassClassificationEvaluator(predictionCol="prediction",metricName="recallByLabel")
evaluator_precision = MulticlassClassificationEvaluator(predictionCol="prediction",metricName="precisionByLabel")

print('+++++++++++++ RandomForestClassifier +++++++++++++')
print('accuracy : ',evaluator_accuracy.evaluate(prediction)*100,' recall : ',evaluator_recall.evaluate(prediction)*100,' precision : ',evaluator_precision.evaluate(prediction)*100)

print(cvModel.bestModel.extractParamMap())

+++++++++++++ RandomForestClassifier +++++++++++++
accuracy :  94.91039426523298  recall :  65.0  precision :  93.60000000000001
{Param(parent='RandomForestClassifier_31ec6d06d350', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'): True, Param(parent='RandomForestClassifier_31ec6d06d350', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False, Param(parent='RandomForestClassifier_31ec6d06d350', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10, Param(parent='RandomForestClas

In [0]:
svc = LinearSVC(featuresCol='features', labelCol='label')

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0,0.1, 0.01,0.02]) \
    .build()

crossval = CrossValidator(estimator=svc,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(predictionCol="prediction",metricName="recallByLabel"),
                          numFolds=5) 
cvModel = crossval.fit(trainingData)
prediction = cvModel.transform(testData)

evaluator_accuracy = MulticlassClassificationEvaluator(predictionCol="prediction",metricName="accuracy")
evaluator_recall = MulticlassClassificationEvaluator(predictionCol="prediction",metricName="recallByLabel")
evaluator_precision = MulticlassClassificationEvaluator(predictionCol="prediction",metricName="precisionByLabel")

print('+++++++++++++ LinearSVC +++++++++++++')
print('accuracy : ',evaluator_accuracy.evaluate(prediction)*100,' recall : ',evaluator_recall.evaluate(prediction)*100,' precision : ',evaluator_precision.evaluate(prediction)*100)

print(cvModel.bestModel.extractParamMap())

+++++++++++++ LinearSVC +++++++++++++
accuracy :  93.11827956989247  recall :  66.11111111111111  precision :  77.27272727272727
{Param(parent='LinearSVC_95255ffeece3', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2, Param(parent='LinearSVC_95255ffeece3', name='featuresCol', doc='features column name.'): 'features', Param(parent='LinearSVC_95255ffeece3', name='fitIntercept', doc='whether to fit an intercept term.'): True, Param(parent='LinearSVC_95255ffeece3', name='labelCol', doc='label column name.'): 'label', Param(parent='LinearSVC_95255ffeece3', name='maxBlockSizeInMB', doc='maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. Default 0.0 represents choosing optimal value, depends on specific algorithm. Must be >= 0.'): 0.0, Param(parent='LinearSVC_95255ffeece3', name='maxIter', doc='max number of iterations (>= 0).'): 100, Par