In [None]:
import nltk
import pyspark
import pandas as pd
from bs4 import BeautifulSoup
from pyspark.ml import Pipeline
from pyspark.sql.types import *
from pyspark import keyword_only
from nltk.corpus import stopwords
from pyspark.sql import SQLContext
from pyspark.ml import Transformer
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.classification import LogisticRegression, OneVsRest, RandomForestClassifier
from pyspark.ml.feature import IDF, StringIndexer, StopWordsRemover, CountVectorizer, RegexTokenizer, IndexToString


In [None]:
# Two nullable string columns: the raw post body and its tag.
sparkschema = StructType([
    StructField('post', StringType(), True),
    StructField('tags', StringType(), True),
])
dataframe = pd.read_csv('postquestions.csv')

# getOrCreate keeps this cell re-runnable: building a second SparkContext in
# the same kernel raises "Cannot run multiple SparkContexts at once".
conf = SparkConf().setAppName("test").setMaster("local")
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

# pandas represents missing cells as float NaN, which is not a string; replace
# them with None so they arrive in Spark as real nulls under the StringType
# schema (depending on the Spark version, NaN is otherwise rejected by schema
# verification or slips past the null filter below).
pandas_rows = dataframe.astype(object).where(dataframe.notnull(), None)

# Keep only rows that have both a tag (the label) and a post body (the text
# the HTML-cleaning UDF parses; it cannot parse a null).
sparkdataframe = (
    sqlContext.createDataFrame(pandas_rows, sparkschema)
    .filter("tags IS NOT NULL AND post IS NOT NULL")
)

In [None]:
# 80/20 train/test split; the fixed seed is passed so the split is repeatable.
train, test = sparkdataframe.randomSplit(
    (0.80, 0.20),
    seed=100,
)

In [None]:
class HTMLTAGREMOVER(Transformer, HasInputCol, HasOutputCol):
  """Pipeline stage that strips HTML markup from a string column.

  The text of ``inputCol`` is parsed with BeautifulSoup and the plain text
  (tags removed) is written to ``outputCol``. Null inputs stay null.

  Parameters
  ----------
  inputCol : str, optional
      Name of the column holding the raw HTML string.
  outputCol : str, optional
      Name of the column that receives the cleaned text.
  """

  @keyword_only
  def __init__(self, inputCol=None, outputCol=None):
    super(HTMLTAGREMOVER, self).__init__()
    # keyword_only stashes the explicitly passed keyword arguments here.
    kwargs = self._input_kwargs
    self.setParams(**kwargs)

  @keyword_only
  def setParams(self, inputCol=None, outputCol=None):
    """Set ``inputCol`` / ``outputCol`` from keyword arguments; returns self."""
    kwargs = self._input_kwargs
    return self._set(**kwargs)

  def _transform(self, dataset):
    """Return ``dataset`` with the cleaned-text column added."""
    def _strip_html(raw):
      # BeautifulSoup cannot parse None; pass nulls through untouched.
      if raw is None:
        return None
      # Name the parser explicitly so the result does not depend on which
      # optional parsers happen to be installed (and no warning is emitted).
      return BeautifulSoup(raw, "html.parser").text

    strip_html_udf = udf(_strip_html, StringType())
    source_column = dataset[self.getInputCol()]
    return dataset.withColumn(self.getOutputCol(), strip_html_udf(source_column))
# Fetch the NLTK stop-word corpus read by stopwords.words('english').
nltk.download('stopwords')

In [None]:
# English stop words from the NLTK corpus, de-duplicated (sorted so the list
# is the same on every run).
sw = sorted(set(stopwords.words('english')))

# Map each tag string to a numeric label index; fitted on the training split.
lI = StringIndexer(inputCol="tags", outputCol="label").fit(train)

# Text preparation: strip HTML -> tokenize -> drop stop words.
# Each stage reads the output column of the stage instance before it.
html_tag_remover = HTMLTAGREMOVER(inputCol="post", outputCol="untagged_post")
RT = RegexTokenizer(inputCol=html_tag_remover.getOutputCol(), outputCol="words", pattern="[^0-9a-z#+_]+")
SR = StopWordsRemover(inputCol=RT.getOutputCol(), outputCol="filtered_words").setStopWords(sw)

# Features: term counts (terms must appear in at least 5 documents) reweighted by IDF.
CV = CountVectorizer(inputCol=SR.getOutputCol(), outputCol="countFeatures", minDF=5)
idf = IDF(inputCol=CV.getOutputCol(), outputCol="features")

# Classifier trained on the IDF features against the indexed label.
model = RandomForestClassifier(labelCol="label", featuresCol=idf.getOutputCol(), numTrees=80, maxDepth=7)

# Translate numeric predictions back to tag strings using the indexer's labels.
i = IndexToString(inputCol="prediction", outputCol="predictedValue")
i.setLabels(lI.labels)


In [None]:
# Chain every stage in execution order into a single pipeline.
datapipe = Pipeline(
    stages=[
        lI,                # tag string -> label index
        html_tag_remover,  # post -> untagged_post
        RT,                # -> words
        SR,                # -> filtered_words
        CV,                # -> countFeatures
        idf,               # -> features
        model,             # random forest classifier
        i,                 # prediction -> predictedValue
    ]
)

In [None]:
# Fit all pipeline stages on the training split. The result is bound to
# `randomforestmodel`, the name the prediction cell reads.
randomforestmodel = datapipe.fit(train)
# The misspelled name this cell used to bind, kept as an alias for any
# existing references.
randomforesttmodel = randomforestmodel

In [None]:
# Run the fitted pipeline over the held-out split.
prediction = randomforestmodel.transform(test)

In [None]:
# Collect the scored rows into a pandas DataFrame and print them.
topd = prediction.toPandas()
print(
    "the Predictions are: ",
    topd,
)


In [None]:
# Bring the evaluator class into scope. This cell previously rebound the name
# to the integer 0, which made the constructor call in the evaluation cell
# fail with "TypeError: 'int' object is not callable".
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Score the predictions with the F1 metric; the value is the cell's output.
evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    labelCol="label",
    predictionCol="prediction",
)
evaluator.evaluate(prediction)