The following code performs multi-class text classification using PySpark.
Article text is preprocessed by removing non-alphanumeric characters.
Tokenization, stop-word removal, and count vectorization are done with a pipeline.
The dataset is split into training and test sets to train the model and perform predictions.

Predictions are made in the following ways:
1. Logistic Regression using Count Vector Features
2. Logistic Regression using TF-IDF Features
3. Cross-Validation
4. Naive Bayes
5. Random Forest

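Sample output: evaluation score (MulticlassClassificationEvaluator's default metric, i.e. weighted F1 rather than plain accuracy)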
1. 0.73
2. 0.76
3. 0.76
4. 0.80
5. 0.78

In [3]:
import os, csv, re

def preProcessData(input_dir="/Users/sajidkhan/Desktop/DIC/Lab3/Input/",
                   output_path='ArticleData.csv',
                   key_words=('blockchain', 'business', 'sports', 'politics'),
                   files_per_category=50):
    """Build the labelled CSV of article text used by the classifiers.

    For every category in ``key_words`` the files
    ``<input_dir>/<category>/<category><i>.txt`` with
    ``i = 0 .. files_per_category - 1`` are read line by line. Every
    character that is not an ASCII letter, digit or whitespace is removed
    from each line, and the line is written as one CSV row with the columns
    ``text`` and ``category``.

    Called with no arguments, the function reads and writes the same
    locations as before; the parameters only make those locations
    overridable.

    Args:
        input_dir: Directory holding one sub-directory per category.
        output_path: CSV file to create (overwritten if it exists).
        key_words: Category names; each is both the sub-directory name and
            the file-name prefix.
        files_per_category: Number of numbered files to read per category.

    Raises:
        OSError: If the output file or any expected input file cannot be
            opened.
    """
    # Raw string so that ``\s`` is passed to the regex engine unchanged;
    # compiled once instead of being looked up again for every line.
    non_alnum = re.compile(r'[^A-Za-z0-9\s]+')
    fieldnames = ['text', 'category']

    # newline='' is what the csv module asks for: without it the writer's
    # '\r\n' row terminator is translated again on platforms whose line
    # separator is not '\n'.
    with open(output_path, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()

        for key_word in key_words:
            category_dir = os.path.join(input_dir, key_word)
            for i in range(files_per_category):
                article_path = os.path.join(category_dir, key_word + str(i) + ".txt")
                # The context manager closes the file even if a read or a
                # write fails part-way through.
                with open(article_path, "r") as text_file:
                    for line in text_file:
                        writer.writerow({'text': non_alnum.sub('', line),
                                         'category': key_word})

In [7]:
# Model Training and Evaluation
# Logistic Regression using Count Vector Features
#
# This cell sets up Spark, loads the article CSV into the DataFrame `data`,
# prints the number of rows per category and creates the tokenizer stage.

from pyspark.sql import SQLContext
from pyspark import SparkContext
import pyspark
import pandas as pd

# Reuse the SparkContext that is already running, or start one if none exists.
sc = pyspark.SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
#preProcessData() run only once..
# NOTE(review): preProcessData() writes 'ArticleData.csv' relative to the
# current working directory, whereas the path below is absolute - confirm that
# both refer to the same file.
dataFile = "/Users/sajidkhan/Desktop/DIC/Lab3/DataAnalysisCode/ArticleData.csv"
# Read the CSV using its first row as the header and with schema inference on.
data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load(dataFile)

#data.show(5)

#data.printSchema()

from pyspark.sql.functions import col

# Row count per category, largest count first.
data.groupBy("category").count().orderBy(col("count").desc()).show()

from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# regular expression tokenizer: reads the "text" column, writes "words",
# using the regex \W (any non-word character) as its pattern
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")

# stop words
# Custom stop-word list handed to StopWordsRemover via setStopWords() below.
# Contractions are spelled without apostrophes ("aint", "arent", "cant", ...);
# presumably this matches the article text after preProcessData() has stripped
# punctuation - verify against the CSV contents.
# A few entries occur more than once (e.g. "an", "as", "would").
add_stopwords = ["a", "an", "as", "able", "about", "above", "according", "accordingly", "across", "actually", "after", "afterwards", "again", "against", "aint", "all", "allow", "allows", "almost", "alone", "along", "already", "also", "although", "always", "am", "among", "amongst", "an", "and", "another", "any", "anybody", "anyhow", "anyone", "anything", "anyway", "anyways", "anywhere", "apart", "appear", "appreciate", "appropriate", "are", "arent", "around", "as", "aside", "ask", "asking", "associated", "at", "available", "away", "awfully", "be", "became", "because", "become", "becomes", "becoming", "been", "before", "beforehand", "behind", "being", "believe", "below", "beside", "besides", "best", "better", "between", "beyond", "both", "brief", "but", "by", "cmon", "cs", "came", "can", "cant", "cannot", "cant", "cause", "causes", "certain", "certainly", "changes", "clearly", "co", "com", "come", "comes", "concerning", "consequently", "consider", "considering", "contain", "containing", "contains", "corresponding", "could", "couldnt", "course", "currently", "definitely", "described", "despite", "did", "didnt", "different", "do", "does", "doesnt", "doing", "dont", "done", "down", "downwards", "during", "each", "edu", "eg", "eight", "either", "else", "elsewhere", "enough", "entirely", "especially", "et", "etc", "even", "ever", "every", "everybody", "everyone", "everything", "everywhere", "ex", "exactly", "example", "except", "far", "few", "ff", "fifth", "first", "five", "followed", "following", "follows", "for", "former", "formerly", "forth", "four", "from", "further", "furthermore", "get", "gets", "getting", "given", "gives", "go", "goes", "going", "gone", "got", "gotten", "greetings", "had", "hadnt", "happens", "hardly", "has", "hasnt", "have", "havent", "having", "he", "hes", "hello", "help", "hence", "her", "here", "heres", "hereafter", "hereby", "herein", "hereupon", "hers", "herself", "hi", "him", "himself", "his", "hither", "hopefully", "how", "howbeit", 
"however", "i", "id", "ill", "im", "ive", "ie", "if", "ignored", "immediate", "in", "inasmuch", "inc", "indeed", "indicate", "indicated", "indicates", "inner", "insofar", "instead", "into", "inward", "is", "isnt", "it", "itd", "itll", "its", "its", "itself", "just", "keep", "keeps", "kept", "know", "knows", "known", "last", "lately", "later", "latter", "latterly", "least", "less", "lest", "let", "lets", "like", "liked", "likely", "little", "look", "looking", "looks", "ltd", "mainly", "many", "may", "maybe", "me", "mean", "meanwhile", "merely", "might", "more", "moreover", "most", "mostly", "much", "must", "my", "myself", "name", "namely", "nd", "near", "nearly", "necessary", "need", "needs", "neither", "never", "nevertheless", "new", "next", "nine", "no", "nobody", "non", "none", "noone", "nor", "normally", "not", "nothing", "novel", "now", "nowhere", "obviously", "of", "off", "often", "oh", "ok", "okay", "old", "on", "once", "one", "ones", "only", "onto", "or", "other", "others", "otherwise", "ought", "our", "ours", "ourselves", "out", "outside", "over", "overall", "own", "particular", "particularly", "per", "perhaps", "placed", "please", "plus", "possible", "presumably", "probably", "provides", "que", "quite", "qv", "rather", "rd", "re", "really", "reasonably", "regarding", "regardless", "regards", "relatively", "respectively", "right", "said", "same", "saw", "say", "saying", "says", "second", "secondly", "see", "seeing", "seem", "seemed", "seeming", "seems", "seen", "self", "selves", "sensible", "sent", "serious", "seriously", "seven", "several", "shall", "she", "should", "shouldnt", "since", "six", "so", "some", "somebody", "somehow", "someone", "something", "sometime", "sometimes", "somewhat", "somewhere", "soon", "sorry", "specified", "specify", "specifying", "still", "sub", "such", "sup", "sure", "ts", "take", "taken", "tell", "tends", "th", "than", "thank", "thanks", "thanx", "that", "thats", "thats", "the", "their", "theirs", "them", "themselves", "then", 
"thence", "there", "theres", "thereafter", "thereby", "therefore", "therein", "theres", "thereupon", "these", "they", "theyd", "theyll", "theyre", "theyve", "think", "third", "this", "thorough", "thoroughly", "those", "though", "three", "through", "throughout", "thru", "thus", "to", "together", "too", "took", "toward", "towards", "tried", "tries", "truly", "try", "trying", "twice", "two", "un", "under", "unfortunately", "unless", "unlikely", "until", "unto", "up", "upon", "us", "use", "used", "useful", "uses", "using", "usually", "value", "various", "very", "via", "viz", "vs", "want", "wants", "was", "wasnt", "way", "we", "wed", "well", "were", "weve", "welcome", "well", "went", "were", "werent", "what", "whats", "whatever", "when", "whence", "whenever", "where", "wheres", "whereafter", "whereas", "whereby", "wherein", "whereupon", "wherever", "whether", "which", "while", "whither", "who", "whos", "whoever", "whole", "whom", "whose", "why", "will", "willing", "wish", "with", "within", "without", "wont", "wonder", "would", "would", "wouldnt", "yes", "yet", "you", "youd", "youll", "youre", "youve", "your", "yours", "yourself", "yourselves", "zero"]
# Reads the tokenizer's "words" column and writes the surviving tokens to
# "filtered", using the list above as the stop-word set.
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

# Bag-of-words term counts over the stop-word-filtered tokens
# (vocabulary capped at 10000 terms, minDF=5).
countVectors = CountVectorizer(
    inputCol="filtered",
    outputCol="features",
    vocabSize=10000,
    minDF=5,
)

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Turn the category strings into a numeric "label" column.
label_stringIdx = StringIndexer(inputCol="category", outputCol="label")
featureStages = [regexTokenizer, stopwordsRemover, countVectors, label_stringIdx]
pipeline = Pipeline(stages=featureStages)

# The feature pipeline is fitted on, and applied to, the complete data set
# before the train/test split below.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
#dataset.show(200)

# 80/20 split with a fixed seed so the split is repeatable.
trainingData, testData = dataset.randomSplit([0.8, 0.2], seed=100)
print("Training Dataset Count: {}".format(trainingData.count()))
print("Test Dataset Count: {}".format(testData.count()))

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)

# Display up to 10 test rows that were predicted as class 0, ordered by the
# probability column in descending order.
predictedClassZero = predictions.filter(predictions['prediction'] == 0)
shownColumns = predictedClassZero.select("text", "category", "probability", "label", "prediction")
shownColumns.orderBy("probability", ascending=False).show(n=10, truncate=30)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# No metricName is passed, so the evaluator's default metric is reported
# (F1 according to the PySpark documentation).
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

+----------+-----+
|  category|count|
+----------+-----+
|blockchain|   50|
|  politics|   50|
|    sports|   50|
|  business|   50|
+----------+-----+

Training Dataset Count: 161
Test Dataset Count: 39
+------------------------------+----------+------------------------------+-----+----------+
|                          text|  category|                   probability|label|prediction|
+------------------------------+----------+------------------------------+-----+----------+
|AdvertisementSupported byBE...|blockchain|[0.9794388072609735,0.00956...|  0.0|       0.0|
|AdvertisementSupported byJA...|blockchain|[0.9717340764936586,0.01016...|  0.0|       0.0|
|AdvertisementSupported byBr...|blockchain|[0.6992683853363467,0.14656...|  0.0|       0.0|
|AdvertisementSupported byLe...|blockchain|[0.6864717614051721,0.07115...|  0.0|       0.0|
|AdvertisementSupported byPA...|blockchain|[0.5258271593659443,0.20536...|  0.0|       0.0|
|AdvertisementSupported byWe...|    sports|[0.47502211871510

0.7315664727429434

In [8]:
# Logistic Regression using TF-IDF Features
#
# Same flow as the count-vector model, but the "features" column is produced
# by HashingTF followed by IDF.

from pyspark.ml.feature import HashingTF, IDF

hashingTF = HashingTF(
    inputCol="filtered",
    outputCol="rawFeatures",
    numFeatures=10000,
)
# minDocFreq: remove sparse terms
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)

tfidfStages = [regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx]
pipeline = Pipeline(stages=tfidfStages)
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

# Same 80/20 split and seed as the count-vector model.
trainingData, testData = dataset.randomSplit([0.8, 0.2], seed=100)

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)

# Display up to 10 test rows predicted as class 0, ordered by the probability
# column in descending order.
predictedClassZero = predictions.filter(predictions['prediction'] == 0)
shownColumns = predictedClassZero.select("text", "category", "probability", "label", "prediction")
shownColumns.orderBy("probability", ascending=False).show(n=10, truncate=30)

# Evaluator left at its default metric (no metricName given).
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)


+------------------------------+----------+------------------------------+-----+----------+
|                          text|  category|                   probability|label|prediction|
+------------------------------+----------+------------------------------+-----+----------+
|AdvertisementSupported byBE...|blockchain|[0.9774984551184397,0.01178...|  0.0|       0.0|
|AdvertisementSupported byJA...|blockchain|[0.9724146117374186,0.01193...|  0.0|       0.0|
|AdvertisementSupported byBr...|blockchain|[0.6603851970428619,0.16771...|  0.0|       0.0|
|AdvertisementSupported byLe...|blockchain|[0.5224822642505726,0.08110...|  0.0|       0.0|
|AdvertisementSupported byPA...|blockchain|[0.5016705322088901,0.14088...|  0.0|       0.0|
|AdvertisementSupported byOp...|blockchain|[0.47397423413169515,0.0783...|  0.0|       0.0|
|AdvertisementSupported byPA...|  business|[0.43361068525024304,0.2497...|  1.0|       0.0|
|AdvertisementSupported byWe...|    sports|[0.4001291260953632,0.15227...|  3.0|

0.7641025641025642

In [9]:
# Cross-Validation
#
# Tunes the logistic-regression hyper-parameters with 5-fold cross-validation
# on the training split of the count-vector features, then scores the best
# model on the held-out test split.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed = 100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

# Create the evaluator before it is handed to CrossValidator, so this cell
# does not depend on an `evaluator` object left behind by an earlier cell.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

# Create ParamGrid for Cross Validation: 3 x 3 = 9 candidate settings.
# Further parameters of `lr` (for example lr.maxIter) can be added with
# another .addGrid(...) call.
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
             .build())

# Create 5-fold CrossValidator; the seed fixes the fold assignment, in line
# with the seeded train/test split above.
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5, seed=100)
cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)
# Evaluate best model
evaluator.evaluate(predictions)


0.7573001508295625

In [10]:
# Naive Bayes
#
# Trains on whatever `trainingData` / `testData` currently hold (the split
# made by the most recently executed cell that assigned them).
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)

# Display up to 10 test rows predicted as class 0, ordered by the probability
# column in descending order.
predictedClassZero = predictions.filter(predictions['prediction'] == 0)
shownColumns = predictedClassZero.select("text", "category", "probability", "label", "prediction")
shownColumns.orderBy("probability", ascending=False).show(n=10, truncate=30)

# Evaluator left at its default metric (no metricName given).
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

+------------------------------+----------+------------------------------+-----+----------+
|                          text|  category|                   probability|label|prediction|
+------------------------------+----------+------------------------------+-----+----------+
|AdvertisementSupported byBr...|blockchain|[1.0,3.891326381433641E-25,...|  0.0|       0.0|
|AdvertisementSupported byPA...|blockchain|[1.0,7.034308272942E-49,1.8...|  0.0|       0.0|
|AdvertisementSupported byJA...|blockchain|[1.0,5.937137900977191E-83,...|  0.0|       0.0|
|AdvertisementSupported byBE...|blockchain|[1.0,3.021747213129572E-127...|  0.0|       0.0|
|AdvertisementSupported byLe...|blockchain|[0.9999791333172087,6.54399...|  0.0|       0.0|
|AdvertisementSupported byTh...|blockchain|[0.9996263473745822,6.98917...|  0.0|       0.0|
|AdvertisementSupported byco...|blockchain|[0.9666104240012462,2.68553...|  0.0|       0.0|
+------------------------------+----------+------------------------------+-----+

0.7956083018002522

In [11]:
# Random Forest
#
# 100 trees of depth at most 4, trained on the current `trainingData` and
# scored on the current `testData`.

from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=100,
    maxDepth=4,
    maxBins=32,
)
# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)

# Display up to 10 test rows predicted as class 0, ordered by the probability
# column in descending order.
predictedClassZero = predictions.filter(predictions['prediction'] == 0)
shownColumns = predictedClassZero.select("text", "category", "probability", "label", "prediction")
shownColumns.orderBy("probability", ascending=False).show(n=10, truncate=30)

# Evaluator left at its default metric (no metricName given).
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

+------------------------------+----------+------------------------------+-----+----------+
|                          text|  category|                   probability|label|prediction|
+------------------------------+----------+------------------------------+-----+----------+
|AdvertisementSupported byBE...|blockchain|[0.5384341125599904,0.18089...|  0.0|       0.0|
|AdvertisementSupported byJA...|blockchain|[0.48177160890257,0.1976802...|  0.0|       0.0|
|AdvertisementSupported byPA...|blockchain|[0.4800704086492222,0.23724...|  0.0|       0.0|
|AdvertisementSupported byBr...|blockchain|[0.36741608460541303,0.3074...|  0.0|       0.0|
|AdvertisementSupported byco...|blockchain|[0.3308526107187717,0.23151...|  0.0|       0.0|
|AdvertisementSupported byLe...|blockchain|[0.3272008595451493,0.22196...|  0.0|       0.0|
|AdvertisementSupported byOp...|blockchain|[0.3128171015215361,0.22418...|  0.0|       0.0|
|AdvertisementSupported byTh...|blockchain|[0.3088612965950106,0.20622...|  0.0|

0.7782916740010563