In [1]:
import os
import sys
import pyspark
import findspark as fs
import pandas as pd
from bs4 import BeautifulSoup as bs

In [2]:
from pyspark.context import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.session import SparkSession

Creating Spark session 

In [3]:
# Reuse the running session if there is one: constructing SparkContext directly
# fails with "Cannot run multiple SparkContexts at once" when this cell is re-run.
spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

In [4]:
def cleanBody(df):
    """Add a ``new_body`` column with the plain text of each post's ``<p>`` tags.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with a ``_Body`` column of HTML strings.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object, modified in place, with ``new_body`` added.
        Each value is the text of every ``<p>`` element in document order,
        joined by single spaces; a body without ``<p>`` tags gives ``""``.
    """
    def paragraph_text(html):
        # Name the parser explicitly: without it bs4 picks whichever parser is
        # installed (and warns), so output could differ between machines.
        soup = bs(html, 'html.parser')
        # Query the tree once instead of re-running find_all per paragraph.
        return ' '.join(p.get_text() for p in soup.find_all('p'))

    # Whole-column assignment is positional, so it is safe with duplicate index
    # labels and still creates the column when the frame is empty.
    df['new_body'] = [paragraph_text(html) for html in df['_Body']]
    return df

In [5]:
# Load the labelled seed posts and attach the plain-text `new_body` column.
df = cleanBody(pd.read_csv("seed.csv"))

In [6]:
# List the columns to confirm that cleanBody added `new_body`.
df.columns

Index(['_Id', '_PostTypeId', '_CreationDate', '_Score', '_ViewCount', '_Body',
       '_OwnerUserId', '_LastActivityDate', '_Title', '_Tags', '_AnswerCount',
       '_CommentCount', '_FavoriteCount', '_LastEditorUserId',
       '_AcceptedAnswerId', '_LastEditDate', '_ParentId', '_Category',
       'new_body'],
      dtype='object')

In [7]:
# Only the cleaned text and its category are handed to Spark.
spark_df = spark.createDataFrame(df[['new_body','_Category']])

Splitting to training and validation data

In [8]:
# Random split with weights 0.7 / 0.3; the fixed seed makes the split repeatable.
training, validation = spark_df.randomSplit([0.7, 0.3], seed=7)
# Cache the training split; it is the input to crossval.fit further down.
training.cache()

DataFrame[new_body: string, _Category: string]

Pipeline Creation 

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer , IDF , StringIndexer ,StopWordsRemover
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Configure an ML pipeline:
# label indexing -> tokenising -> stop-word removal -> hashed TF -> IDF -> random forest
SEED = 7  # fixed seed so the forest and the fold assignment are reproducible across kernel restarts

indexer = StringIndexer(inputCol="_Category", outputCol="label")
tokenizer = Tokenizer(inputCol="new_body", outputCol="words")
stopwordsRemover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered")
hashingTF = HashingTF(inputCol=stopwordsRemover.getOutputCol(), outputCol="rawFeatures")
idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol="features")
rf = RandomForestClassifier(labelCol="label", featuresCol="features",
                            numTrees=100, maxDepth=20, seed=SEED)
pipeline = Pipeline(stages=[indexer, tokenizer, stopwordsRemover, hashingTF, idf, rf])

# The only tuned hyper-parameter is the size of the hashed feature space.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 500]) \
    .build()

# Model selection uses F1 (the evaluator's default, spelled out here so it is
# not mistaken for the accuracy reported at the end of this cell).
cv_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                                 metricName="f1")
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=cv_evaluator,
                          numFolds=4,
                          seed=SEED)

# Run cross-validation, and choose the best set of parameters.
cvModelRF = crossval.fit(training)

# Make predictions on the held-out validation rows. cvModelRF.transform applies
# the best pipeline found by cross-validation (a random-forest pipeline).
predictions = cvModelRF.transform(validation)
predictions.show(5)

# compute accuracy on the validation set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Validation set accuracy = " + str(accuracy))

+--------------------+-----------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|            new_body|  _Category|label|               words|            filtered|         rawFeatures|            features|       rawPrediction|         probability|prediction|
+--------------------+-----------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|+1, it's a good idea|        avp|  7.0|[+1,, it's, a, go...|   [+1,, good, idea]|(500,[168,391,458...|(500,[168,391,458...|[10.0051587301587...|[0.10005158730158...|       7.0|
|As discussed in t...|arabic.meta|  6.0|[as, discussed, i...|[discussed, propo...|(500,[0,4,5,6,12,...|(500,[0,4,5,6,12,...|[27.3333333333333...|[0.27333333333333...|       0.0|
|As far as the que...|  agur.meta|  4.0|[as, far, as, the...|[far, question, r...|(500,[41,94,109,1...|(500,[4

In [None]:
# Decode the model's numeric predictions back to category names.
# The original decoded the true `label` column, so "predictedLabel" was really
# the ground truth. The label list is passed explicitly, taken from the fitted
# StringIndexer (first stage of the best pipeline), rather than relying on
# column metadata being present on `prediction`.
index_labels = cvModelRF.bestModel.stages[0].labels
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=index_labels)
opdata = labelConverter.transform(predictions)
# Rename without inplace=True so re-running the cell always starts from a fresh frame.
op = (opdata.select('predictedLabel')
            .toPandas()
            .rename(columns={'predictedLabel': 'rf_prediction'}))

In [12]:
# Creates a mapping between the numeric label indices and the category names
# and stores it in the dictionary dct.
#
# The mapping is read from the fitted StringIndexer (first stage of the best
# pipeline): labels[i] is the category encoded as index i. This covers every
# category seen in training, not only those that happen to occur in the
# validation rows, so looking up a test prediction cannot miss a key.
index_labels = cvModelRF.bestModel.stages[0].labels
# Keys are floats to match the double-typed `label` / `prediction` columns.
dct = {float(idx): name for idx, name in enumerate(index_labels)}

In [13]:
# Show the label-index -> category mapping.
dct

{7.0: 'avp',
 6.0: 'arabic.meta',
 4.0: 'agur.meta',
 9.0: 'beer',
 2.0: 'bricks',
 8.0: 'arabic',
 1.0: 'ai',
 5.0: 'agur',
 3.0: 'bioinformatics',
 0.0: '3dprinting'}

Saving the Model 

In [206]:
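# To save the model:

# cvModelRF.save("classification_model")

# It can be reloaded later with CrossValidatorModel.load("classification_model")
# (CrossValidatorModel lives in pyspark.ml.tuning).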

Working with Test Data

In [14]:
# Load the posts to classify from input_data.csv.

test = pd.read_csv("input_data.csv")

In [15]:
# Peek at the first two raw rows.
test.head(2)

Unnamed: 0,_Id,_PostTypeId,_CreationDate,_Score,_ViewCount,_Body,_OwnerUserId,_LastActivityDate,_Title,_Tags,_AnswerCount,_CommentCount,_FavoriteCount,_LastEditorUserId,_AcceptedAnswerId,_LastEditDate,_ParentId,Unnamed: 17,Unnamed: 18,Unnamed: 19
0,1,1,2011-10-25T19:48:36.693,6,157.0,"<p>Are questions related to <a href=""http://ww...",6.0,2011-10-26T05:35:32.733,Are questions about non-LEGO brick toys on-topic?,<discussion><on-topic>,3.0,0,,8.0,,2011-10-26T05:35:32.733,,,,
1,2,1,2011-10-25T19:55:17.860,3,53.0,<p>What is a good tag for purchasing/acquiring...,13.0,2011-10-26T05:32:58.667,Nomenclature of [Purchasing] tag,<discussion><tagging>,1.0,1,,8.0,16.0,2011-10-26T05:32:58.667,,,,


In [16]:
# Keep only the HTML body and blank out missing bodies. The explicit copy means
# cleanBody adds its column to an independent frame rather than to a column
# slice of the original, which is what triggers pandas' SettingWithCopyWarning.
test = test[['_Body']].fillna("").copy()
test = cleanBody(test)

In [17]:
# Convert the cleaned pandas frame to a Spark DataFrame for the fitted pipeline.
testDf = spark.createDataFrame(test)

In [106]:
# Preview the raw HTML body next to its cleaned text.
testDf.show(5)

+--------------------+--------------------+
|               _Body|            new_body|
+--------------------+--------------------+
|<p>Are questions ...|Are questions rel...|
|<p>What is a good...|What is a good ta...|
|<p>I've asked one...|I've asked one, s...|
|<p>Lego Mindstorm...|Lego Mindstorms a...|
|<p>I suspect that...|I suspect that Mi...|
+--------------------+--------------------+
only showing top 5 rows



Use the cross-validated model trained above (`cvModelRF`, still in memory) to predict categories for the test data

In [18]:

# Run the fitted cross-validated pipeline on the test posts and preview the result.
predictions_test = cvModelRF.transform(testDf)
predictions_test.show(5)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|               _Body|            new_body|               words|            filtered|         rawFeatures|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|<p>Are questions ...|Are questions rel...|[are, questions, ...|[questions, relat...|(500,[22,114,182,...|(500,[22,114,182,...|[4.80119047619047...|[0.04801190476190...|       2.0|
|<p>What is a good...|What is a good ta...|[what, is, a, goo...|[good, tag, purch...|(500,[39,76,85,88...|(500,[39,76,85,88...|[5.40277777777777...|[0.05402777777777...|       2.0|
|<p>I've asked one...|I've asked one, s...|[i've, asked, one...|[asked, one,, fin...|(500,[7,10

In [19]:
# Pull the body and its prediction to pandas in one Spark job. Selecting both
# columns together keeps each prediction on the same row as its body, instead
# of collecting two frames separately and pairing them by position.
df5 = predictions_test.select('_Body', 'prediction').toPandas()
df5.head(5)

Unnamed: 0,_Body,prediction
0,"<p>Are questions related to <a href=""http://ww...",2.0
1,<p>What is a good tag for purchasing/acquiring...,2.0
2,"<p>I've asked one, so <a href=""https://bricks....",2.0
3,<p>Lego Mindstorms allows one to write embedde...,2.0
4,<p>I suspect that Mindstorms by itself is not ...,2.0


In [20]:
def fetchval(key):
    """Return the category name stored in the notebook-level ``dct`` for ``key``.

    Raises ``KeyError`` when ``key`` is not present in ``dct``.
    """
    category_name = dct[key]
    return category_name

In [21]:
# Map each numeric prediction to its category name. Series.map works on the one
# column directly (no per-row Series as with DataFrame.apply(axis=1)) and
# returns an empty Series, not a DataFrame, when df5 has no rows.
df5['predicted_label'] = df5['prediction'].map(fetchval)

In [25]:
# Inspect the last ten test rows with their decoded category.
df5.tail(10)

Unnamed: 0,_Body,prediction,predicted_label
491,<p>I think that hardware recommendations are o...,0.0,3dprinting
492,<p><strong>Yes - in some cases.</strong></p>\n...,0.0,3dprinting
493,<p><strong>Yes!</strong></p>\n\n<p>Absolutely....,2.0,bricks
494,<p>I believe this question is asked on every p...,7.0,avp
495,"<p>When asking my first question, I noticed we...",0.0,3dprinting
496,<p><strong>The Hotbed.</strong></p>\n\n<p>Coll...,0.0,3dprinting
497,"<p>""print-quality"" is a better (more specific)...",4.0,agur.meta
498,<p>I would say that <strong>they are on topic<...,0.0,3dprinting
499,"<p>I would like to nominate myself, <a href=""h...",0.0,3dprinting
500,<h1>Filibusters!</h1>\n\n<p>I've heard that fr...,7.0,avp


Save the predictions to the test_predictions.csv file

In [24]:
# Write the predictions to disk; with pandas' defaults the DataFrame index is
# written as the first, unnamed column.
df5.to_csv("test_predictions.csv")