In [1]:
import pandas as pd
import numpy as np
from pyspark.sql import Row
from pyspark.sql.functions import col,pandas_udf, PandasUDFType,count
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
import nltk

# Load the tagged dataset from the Databricks table "allban" and cache it;
# the train/test splits used by every model below are derived from it.
multiban = spark.read.table("allban")
multiban.cache()

In [2]:
# Discard every row containing a null, then show the class balance per label.
multiban = multiban.dropna()
label_counts = multiban.groupBy('label').count()
display(label_counts)

label,count
1,6367
0,29281


In [3]:
from pyspark.sql.types import IntegerType
# Store the label as an integer column so the classifiers and evaluators
# receive a numeric target.
multiban = multiban.withColumn("label", col("label").cast("int"))

In [4]:
# 65% / 35% train/test split; the fixed seed keeps the split reproducible.
split_weights = [0.65, 0.35]
train_df, test_df = multiban.randomSplit(split_weights, seed=2018)

# Report the size of each split.
train_rows = train_df.count()
test_rows = test_df.count()
print("Training Dataset Count: " + str(train_rows))
print("Test Dataset Count: " + str(test_rows))

In [5]:
# Cache both splits: each one is re-read by the model fits and evaluations
# in the cells below. cache() returns the same DataFrame it was called on.
train_df, test_df = train_df.cache(), test_df.cache()

In [6]:
from pyspark.ml.feature import HashingTF, IDF, RegexTokenizer,StopWordsRemover,VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression,NaiveBayes,LinearSVC,GBTClassifier
from mleap.sklearn.preprocessing.data import FeatureExtractor, LabelEncoder, ReshapeArrayToN1
from pyspark.ml.evaluation import RegressionEvaluator,MulticlassClassificationEvaluator,BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit,CrossValidator
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import *

In [7]:
# Print Spark's built-in Turkish stop-word list for reference. The result is
# not stored for later use; the pipeline below uses its own custom list.
turkish_stop_words = StopWordsRemover.loadDefaultStopWords("turkish")
print(turkish_stop_words)

In [8]:
# Emoji / emoticon tokens removed after tokenization.
# NOTE(review): entries such as ':)' or ':(' can never match, because the
# tokenizer below already splits on ':', '(' and ')'.
stopWordstr = ['❤️', '◻', '😑', '🙏🏻', '🌸', '🙌🏻',  '😇',  '😊', '😢', '⭐️', '🌼', '🙈', '🍀', '💗', '💕', '😊', '🌺', '😅', '💵', '🙈', '🙏🙏🙏', '🙏🙏', '✔️', '😌🙏🏻', '😍', '👍', '👍👍', '👍👍👍', ':)', '👏🏼', '👏🏼👏🏼', '👏🏼👏🏼👏🏼', '🤔', '☺️', '😑😑', '😑', ':(', '😊', '👌👌', '👌', '💃🏻', '✌🏻', ':))', ':)))', '🎃','😉','😄','🤗']

# Split the lower-cased text on whitespace and punctuation.
# The delimiter set is written as one raw-string character class. The previous
# alternation was a non-raw string in which `\|\"` and `\|\#` collapsed into
# the two-character sequences `|"` and `|#`, so a lone `"` or `#` was never
# treated as a delimiter. `|`, `"` and `#` are now each delimiters on their own.
TOKEN_DELIMITERS = r"""[ ,;\-_*\t!.:()|"&$#{}\[\]/'<>]"""
regexTokenizer = RegexTokenizer(inputCol="Text", outputCol="words", pattern=TOKEN_DELIMITERS, toLowercase=True)

# Remove the emoji tokens listed above.
remover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords=stopWordstr)

# Hashed term frequencies, then IDF re-weighting of those frequencies.
hashtf = HashingTF(inputCol="filtered", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="tffeatures")

# Concatenate raw TF and TF-IDF vectors into the "features" column, which is
# the column every classifier below reads by default.
va = VectorAssembler(inputCols=["tf", "tffeatures"], outputCol="features")

lr = LogisticRegression()
nv = NaiveBayes()
lsvc = LinearSVC()
gbt = GBTClassifier()

# Every pipeline shares the same feature stages, including the assembler.
# Without `va` no stage produces "features", so the NaiveBayes, LinearSVC and
# GBT pipelines had no input column to train on.
feature_stages = [regexTokenizer, remover, hashtf, idf, va]

pipelinelr = Pipeline(stages=feature_stages + [lr])
pipelinenv = Pipeline(stages=feature_stages + [nv])
pipelinelsvc = Pipeline(stages=feature_stages + [lsvc])
pipelinegbt = Pipeline(stages=feature_stages + [gbt])


In [9]:

# Exhaustive search space for the logistic-regression pipeline.
# 5 * 3 * 2 * 2 * 2 * 2 * 3 * 2 * 2 * 1 * 2 = 5,760 parameter combinations,
# each of which is fitted once per cross-validation fold.
lr_search_space = [
    (lr.regParam, [0.0001, 0.006, 0.003, 0.01, 0.03]),
    (idf.minDocFreq, [2, 3, 4]),
    (hashtf.numFeatures, [2**3, 2**18]),
    (hashtf.binary, [True, False]),
    (lr.fitIntercept, [True, False]),
    (lr.standardization, [True, False]),
    (lr.elasticNetParam, [0.01, 0.05, 0.1]),
    (lr.aggregationDepth, [2, 3]),
    (lr.maxIter, [5, 1000]),
    (lr.family, ['binomial']),
    (lr.tol, [1e-06, 1e-01]),
]

# addGrid registers the candidates on the builder itself, so the grids are
# added in the same order as listed above.
grid_builder = ParamGridBuilder()
for grid_param, candidates in lr_search_space:
    grid_builder.addGrid(grid_param, candidates)
paramGrid = grid_builder.build()

# Cross-validate the whole pipeline with the evaluator's default metric.
cvlr = CrossValidator(
    estimator=pipelinelr,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(),
)

cvModel = cvlr.fit(train_df)
modellr = cvModel.bestModel

In [10]:
# Show the parameters of the fitted classifier, which is the last stage of the
# pipeline. Index 4 of pipelinelr is the VectorAssembler, whose parameters are
# fixed and tell nothing about the outcome of the grid search.
modellr.stages[-1].extractParamMap()

In [11]:
# Narrowed grid with a single candidate per pinned parameter. Only regParam
# and numFeatures are set here; the other parameters (minDocFreq, binary,
# fitIntercept, standardization, elasticNetParam, aggregationDepth, maxIter,
# tol) are no longer part of the grid and keep the values on the estimators.
pinned_lr_grid = ParamGridBuilder()
pinned_lr_grid.addGrid(lr.regParam, [0.006])
pinned_lr_grid.addGrid(hashtf.numFeatures, [2**18])
paramGrid = pinned_lr_grid.build()

cvlr = CrossValidator(
    estimator=pipelinelr,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(),
)

# Refit and keep the best (here: the only) pipeline model.
cvModel = cvlr.fit(train_df)
modellr = cvModel.bestModel

In [12]:
# Score the held-out split with the tuned logistic-regression pipeline.
# rawPrediction is kept because the area-under-PR evaluator below reads it.
predictions = modellr.transform(test_df).select(
    col("label").cast("Float"), col("prediction"), col("rawPrediction")
)

# One evaluator per metric, all reading the same label / prediction columns.
metric_names = ("accuracy", "f1", "weightedPrecision", "weightedRecall")
evaluator, evaluatorf1, evaluatorwp, evaluatorwr = [
    MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName=metric_name)
    for metric_name in metric_names
]

# Each metric is computed exactly once; accuracy feeds both of the first two lines.
accuracy, f1, wp, wr = [
    metric_evaluator.evaluate(predictions)
    for metric_evaluator in (evaluator, evaluatorf1, evaluatorwp, evaluatorwr)
]

print("Test Error = %g" % (1.0 - accuracy))
print("Accuracy = %g" % accuracy)
print("f1 = %g" % f1)
print("weightedPrecision = %g" % wp)
print("weightedRecall = %g" % wr)

# BinaryClassificationEvaluator takes `rawPredictionCol`, not `predictionCol`;
# passing the latter raises a TypeError when the evaluator is constructed.
evaluatorpr = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderPR")
pr = evaluatorpr.evaluate(predictions)
print("areaUnderPR = %g" % pr)



In [13]:
# Area under the precision-recall curve for the tuned LR pipeline.
# The evaluator's score column is configured through `rawPredictionCol`
# (it has no `predictionCol` argument), and the test split is re-scored here
# so that the rawPrediction column is guaranteed to be present.
evaluatorpr = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderPR")
pr = evaluatorpr.evaluate(modellr.transform(test_df))
print("areaUnderPR = %g" % pr)

In [14]:
# Single-candidate grid for the LinearSVC pipeline: only the hashing width is
# pinned (an lsvc.regParam candidate of 0.1 was tried earlier and disabled).
lsvc_grid = ParamGridBuilder()
lsvc_grid.addGrid(hashtf.numFeatures, [2**16])
paramGrid = lsvc_grid.build()

cvlsvc = CrossValidator(
    estimator=pipelinelsvc,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(),
)

# Fit and keep the best pipeline model.
cvModel = cvlsvc.fit(train_df)
modellsvc = cvModel.bestModel



In [15]:
# Show the parameters of the fitted classifier. It is addressed as the last
# stage rather than by a hard-coded position, which goes out of range (or
# points at the wrong stage) whenever the pipeline's stage list changes.
modellsvc.stages[-1].extractParamMap()

In [16]:

# Single-candidate grid for the NaiveBayes pipeline: hashing width and
# smoothing are both pinned to one value.
nb_grid = ParamGridBuilder()
nb_grid.addGrid(hashtf.numFeatures, [2**16])
nb_grid.addGrid(nv.smoothing, [0.065])
paramGrid = nb_grid.build()

cvnv = CrossValidator(
    estimator=pipelinenv,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(),
)

# Fit and keep the best pipeline model.
cvModel = cvnv.fit(train_df)
modelnv = cvModel.bestModel



# Score the held-out split with the NaiveBayes pipeline.
predictions = modelnv.transform(test_df).select(col("label").cast("Float"), col("prediction"))

# One evaluator per metric, all reading the same label / prediction columns.
metric_names = ("accuracy", "f1", "weightedPrecision", "weightedRecall")
evaluator, evaluatorf1, evaluatorwp, evaluatorwr = [
    MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName=metric_name)
    for metric_name in metric_names
]

# Each metric is computed exactly once; accuracy feeds both of the first two
# lines instead of being re-evaluated by an identical second evaluator.
accuracy, f1, wp, wr = [
    metric_evaluator.evaluate(predictions)
    for metric_evaluator in (evaluator, evaluatorf1, evaluatorwp, evaluatorwr)
]

print("Test Error = %g" % (1.0 - accuracy))
print("Accuracy = %g" % accuracy)
print("f1 = %g" % f1)
print("weightedPrecision = %g" % wp)
print("weightedRecall = %g" % wr)

In [17]:
# Score the held-out split with the LinearSVC pipeline.
predictions = modellsvc.transform(test_df).select(col("label").cast("Float"), col("prediction"))

# One evaluator per metric, all reading the same label / prediction columns.
metric_names = ("accuracy", "f1", "weightedPrecision", "weightedRecall")
evaluator, evaluatorf1, evaluatorwp, evaluatorwr = [
    MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName=metric_name)
    for metric_name in metric_names
]

# Each metric is computed exactly once; accuracy feeds both of the first two
# lines instead of being re-evaluated by an identical second evaluator.
accuracy, f1, wp, wr = [
    metric_evaluator.evaluate(predictions)
    for metric_evaluator in (evaluator, evaluatorf1, evaluatorwp, evaluatorwr)
]

print("Test Error = %g" % (1.0 - accuracy))
print("Accuracy = %g" % accuracy)
print("f1 = %g" % f1)
print("weightedPrecision = %g" % wp)
print("weightedRecall = %g" % wr)

In [18]:
# Single-candidate grid for the gradient-boosted-trees pipeline: only the
# hashing width is pinned.
gbt_grid = ParamGridBuilder()
gbt_grid.addGrid(hashtf.numFeatures, [2**12])
paramGrid = gbt_grid.build()

cvgbt = CrossValidator(
    estimator=pipelinegbt,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(),
)

# Fit and keep the best pipeline model.
cvModel = cvgbt.fit(train_df)
modelgbt = cvModel.bestModel


In [19]:
# Score the held-out split with the gradient-boosted-trees pipeline.
predictions = modelgbt.transform(test_df).select(col("label").cast("Float"), col("prediction"))

# One evaluator per metric, all reading the same label / prediction columns.
metric_names = ("accuracy", "f1", "weightedPrecision", "weightedRecall")
evaluator, evaluatorf1, evaluatorwp, evaluatorwr = [
    MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName=metric_name)
    for metric_name in metric_names
]

# Each metric is computed exactly once; accuracy feeds both of the first two
# lines instead of being re-evaluated by an identical second evaluator.
accuracy, f1, wp, wr = [
    metric_evaluator.evaluate(predictions)
    for metric_evaluator in (evaluator, evaluatorf1, evaluatorwp, evaluatorwr)
]

print("Test Error = %g" % (1.0 - accuracy))
print("Accuracy = %g" % accuracy)
print("f1 = %g" % f1)
print("weightedPrecision = %g" % wp)
print("weightedRecall = %g" % wr)

In [20]:
# `predictionslr` is not assigned by any earlier cell, so build it here from
# the tuned LR pipeline before displaying it.
predictionslr = modellr.transform(test_df)

# Preview five scored rows.
# NOTE(review): the original passed 5 as a second argument to display();
# limiting the DataFrame is assumed to be the intent -- confirm.
display(predictionslr.limit(5))

In [21]:
%sh
# Start from an empty export directory for the serialized model bundle.
EXPORT_DIR=/tmp/model_export
rm -rf "$EXPORT_DIR"
mkdir "$EXPORT_DIR"

In [22]:
import sys

# Make the MLeap Python bindings importable. The membership check keeps
# sys.path from growing by one duplicate entry each time the cell is re-run.
MLEAP_PYTHON_PATH = '/opt/libs/mleap/python'
if MLEAP_PYTHON_PATH not in sys.path:
    sys.path.append(MLEAP_PYTHON_PATH)

# NOTE(review): these imports are presumably what makes serializeToBundle
# available on fitted pipeline models -- confirm against the MLeap docs.
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer

# Write the tuned LR pipeline as an MLeap bundle. The second argument is a
# DataFrame already transformed by the model; it is computed inline because
# no earlier cell assigns `predictionslr`.
modellr.serializeToBundle(
    "jar:file:/tmp/model_export/LR_model_vsX-json.zip",
    modellr.transform(test_df),
)

In [23]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Score the test split with the fitted pipeline on the driver side. A
# PipelineModel has no `predict` method and cannot be called from inside an
# RDD lambda, and test_df has no `features` column before it is transformed.
scored_test = modellr.transform(test_df)

# BinaryClassificationMetrics expects an RDD of (score, label) pairs of
# floats. The score used here is the predicted probability of class 1.
predictionAndLabels = scored_test.select("probability", "label").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["label"]))
)

# Instantiate metrics object
metrics = BinaryClassificationMetrics(predictionAndLabels)

# Area under precision-recall curve
print("Area under PR = %s" % metrics.areaUnderPR)

# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)