# Text Classification using PySpark

## Importing fundamental lib 

In [1]:
from pyspark.sql import SQLContext
from pyspark import SparkContext

In [2]:
sc=SparkContext()

In [3]:
sqlcontext = SQLContext(sc)

## Loading data using sqlcontext

In [4]:
data = sqlcontext.read.format('com.databricks.spark.csv').options(header='true',inferschema='true').load('/home/DEADPOOL/Downloads/kauvery.csv')
data.show(5)

+--------------------+------------+
|               query|      labels|
+--------------------+------------+
|can i see gastroe...|Appointment |
|Now I am at Qatar...|Appointment |
|We need dr. g. ve...|Appointment |
|i want to book ap...|Appointment |
|book appointment ...|Appointment |
+--------------------+------------+
only showing top 5 rows



## Printing schema 

In [5]:
data.printSchema()

root
 |-- query: string (nullable = true)
 |-- labels: string (nullable = true)



## Loading lib for processing data

In [6]:
from pyspark.ml.feature import RegexTokenizer,StopWordsRemover,CountVectorizer
from pyspark.ml.classification import LogisticRegression

In [7]:
#regular expressin tokenizer
regexTokenizer = RegexTokenizer(inputCol="query", outputCol="words", pattern="\\W")

In [8]:
# stop words
add_stopwords = ["http","https","amp","rt","t","c","the"] 

In [9]:
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

In [10]:
# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

## Init pipeline  

In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "labels", outputCol = "label")
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)

+--------------------+------------+--------------------+--------------------+--------------------+-----+
|               query|      labels|               words|            filtered|            features|label|
+--------------------+------------+--------------------+--------------------+--------------------+-----+
|can i see gastroe...|Appointment |[can, i, see, gas...|[can, i, see, gas...|(229,[1,5,10,16,3...|  7.0|
|Now I am at Qatar...|Appointment |[now, i, am, at, ...|[now, i, am, at, ...|(229,[0,1,3,9,16,...|  7.0|
|We need dr. g. ve...|Appointment |[we, need, dr, g,...|[we, need, dr, g,...|(229,[4,31,33,61,...|  7.0|
|i want to book ap...|Appointment |[i, want, to, boo...|[i, want, to, boo...|(229,[1,3,10,19,3...|  7.0|
|book appointment ...|Appointment |[book, appointmen...|[book, appointmen...|(229,[10,31,57,79...|  7.0|
+--------------------+------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



## Dividing data into training and test

In [13]:
# set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 1193
Test Dataset Count: 520


In [14]:
trainingData.show(10)

+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|               query|              labels|               words|            filtered|            features|label|
+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
| Hi, this is vija...|  Disease Inquiries |[hi, this, is, vi...|[hi, this, is, vi...|(229,[6,8,10,58,8...| 35.0|
|         I hate you |   system_agent_bad |      [i, hate, you]|      [i, hate, you]|(229,[0,1],[1.0,1...| 15.0|
|     bye good night |system_greetings_...|  [bye, good, night]|  [bye, good, night]|(229,[7,56,88],[1...| 22.0|
|                BRB |system_user_will_...|               [brb]|               [brb]|         (229,[],[])| 75.0|
|Can I make a rese...|        Appointment |[can, i, make, a,...|[can, i, make, a,...|(229,[1,10,15,16,...|  7.0|
|Can i have Dr.Rag...|        Appointment |[can, i, have, dr...|[can, i, have, dr...|(229,[1,4,1

## Logistic Regression using Count Vector Features

In [15]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("query","labels","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+---------------------+----------------------+------------------------------+-----+----------+
|                query|                labels|                   probability|label|prediction|
+---------------------+----------------------+------------------------------+-----+----------+
|that was pretty good |system_appraisal_good |[0.3275237132516771,0.03274...|  0.0|       0.0|
|  that's much better |system_appraisal_good |[0.2997635435289294,0.03933...|  0.0|       0.0|
|  that was very good |system_appraisal_good |[0.27629257922695577,0.0359...|  0.0|       0.0|
|  that's really nice |system_appraisal_good |[0.27373215400968837,0.0425...|  0.0|       0.0|
|      that's amazing |system_appraisal_good |[0.2688603734722277,0.04522...|  0.0|       0.0|
|    that's very good |system_appraisal_good |[0.26509744592293744,0.0440...|  0.0|       0.0|
|   that is wonderful |system_appraisal_good |[0.25510376779754373,0.0448...|  0.0|       0.0|
|    that was awesome |system_appraisal_good |[0.2

In [16]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.5196933448904881

## Logistic Regression using TF-IDF Features

In [17]:
from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("query","labels","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+---------------------+----------------------+------------------------------+-----+----------+
|                query|                labels|                   probability|label|prediction|
+---------------------+----------------------+------------------------------+-----+----------+
|that was pretty good |system_appraisal_good |[0.31434005167237633,0.0329...|  0.0|       0.0|
|  that's much better |system_appraisal_good |[0.3006612596363839,0.03913...|  0.0|       0.0|
|  that's really nice |system_appraisal_good |[0.26979213170913524,0.0425...|  0.0|       0.0|
|    that's very good |system_appraisal_good |[0.2665079330845293,0.04380...|  0.0|       0.0|
|  that was very good |system_appraisal_good |[0.2655567916586436,0.03613...|  0.0|       0.0|
|   that is wonderful |system_appraisal_good |[0.2551214410323665,0.04446...|  0.0|       0.0|
|      that's amazing |system_appraisal_good |[0.24968306394704187,0.0460...|  0.0|       0.0|
|    that was awesome |system_appraisal_good |[0.2

In [18]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.512120070862926

## Cross-Validation with hyper parameter

In [19]:
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0.8)
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)#
#             .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations
#             .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features
             .build())
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=5)
cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)
# Evaluate best model
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.6157652135213346

## Naive Bayes

In [20]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("query","labels","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+---------------------+----------------------+------------------------------+-----+----------+
|                query|                labels|                   probability|label|prediction|
+---------------------+----------------------+------------------------------+-----+----------+
|    that's very good |system_appraisal_good |[0.9770296971060535,0.00357...|  0.0|       0.0|
|  that's really nice |system_appraisal_good |[0.960007974205793,0.004547...|  0.0|       0.0|
|  that was very good |system_appraisal_good |[0.955311984259193,0.001911...|  0.0|       0.0|
|      it's very good |system_appraisal_good |[0.9348725920923162,0.00488...|  0.0|       0.0|
|      it's very good |system_appraisal_good |[0.9348725920923162,0.00488...|  0.0|       0.0|
|that was pretty good |system_appraisal_good |[0.915807342919953,0.004276...|  0.0|       0.0|
|  that's a good idea |system_appraisal_good |[0.8930461816264039,0.01142...|  0.0|       0.0|
|      that's amazing |system_appraisal_good |[0.8

In [21]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.4003601818816952

## Random Forest

In [22]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)
# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("query","labels","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+-----------------------+----------------------+------------------------------+-----+----------+
|                  query|                labels|                   probability|label|prediction|
+-----------------------+----------------------+------------------------------+-----+----------+
|    that's a good idea |system_appraisal_good |[0.09237279657549714,0.0622...|  0.0|       0.0|
|      that's very good |system_appraisal_good |[0.09237279657549714,0.0622...|  0.0|       0.0|
|    oh that's not good | system_appraisal_bad |[0.09154682958387059,0.0622...|  6.0|       0.0|
|       that's not good | system_appraisal_bad |[0.09154682958387059,0.0622...|  6.0|       0.0|
|that's not good enough | system_appraisal_bad |[0.09154682958387059,0.0622...|  6.0|       0.0|
|        that's awesome |system_appraisal_good |[0.09029608846671203,0.0625...|  0.0|       0.0|
|           that's lame | system_appraisal_bad |[0.08983543417847151,0.0630...|  6.0|       0.0|
|        that's amazing |syste

In [23]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.32805372269693417