In [None]:
# Display the `sc` object as this cell's output.
# NOTE(review): `sc` is not defined anywhere in this notebook — presumably the
# SparkContext injected by the PySpark kernel/shell; confirm the launch setup.
sc

In [None]:
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType, DoubleType 

In [None]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer, StopWordsRemover
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# NOTE(review): machine-specific absolute path. The following cell reassigns
# `path` to a relative glob, so this value only takes effect if that cell is skipped.
path = 'C:/Users/tinhc/Desktop/data/myoutput-*'

In [None]:
# Relative glob pattern selecting the input files that are read as JSON below.
path = 'data/myoutput-*'

In [None]:
# Load every file matched by `path` as JSON into a single DataFrame.
df = spark.read.format("json").load(path)

In [None]:
from difflib import unified_diff

def make_diff(df):
    """Return the added/removed lines between ``df.text_old`` and ``df.text_new``.

    Both texts are split on newlines and compared with ``difflib.unified_diff``.
    Only the change lines are kept (``-`` prefix for removed lines, ``+`` prefix
    for added lines); context lines, hunk markers and the file headers are dropped.

    Args:
        df: Any object exposing ``text_old`` and ``text_new`` attributes (here,
            the struct row handed over by the UDF). Either attribute may be
            ``None``, which is treated as empty text.

    Returns:
        str: The change lines joined by newlines; an empty string when the two
        texts do not differ.
    """
    # Both columns are nullable in the schema; treat a missing revision as empty
    # text instead of failing with AttributeError on None.split.
    old_lines = (df.text_old or '').split('\n')
    new_lines = (df.text_new or '').split('\n')

    # Whenever there is any difference, unified_diff emits the '---' / '+++' file
    # headers as its first two items. Skip them by position: filtering on their
    # prefix would also drop genuine changes whose text starts with '--' / '++'.
    changes = list(unified_diff(old_lines, new_lines))[2:]
    return '\n'.join(line for line in changes if line.startswith(('+', '-')))

# Wrap make_diff as a Spark UDF that returns a string column.
diff_udf = udf(f=make_diff, returnType=StringType())

In [None]:
# Add a `diff` column holding the lines added/removed between text_old and text_new.
# Only the two columns that make_diff actually reads are packed into the struct,
# so the remaining columns of each row are not serialized into the Python UDF.
df = df.withColumn("diff", diff_udf(struct("text_old", "text_new")))


In [None]:
# Print the schema to confirm that the `diff` column was added.
df.printSchema()

root
 |-- comment: string (nullable = true)
 |-- label: string (nullable = true)
 |-- name_user: string (nullable = true)
 |-- text_new: string (nullable = true)
 |-- text_old: string (nullable = true)
 |-- title_page: string (nullable = true)
 |-- url_page: string (nullable = true)
 |-- diff: string (nullable = true)



In [None]:
# Number of rows in the DataFrame.
df.count()

9677

In [None]:
# Row count per value of `label`.
# The recorded output shows a heavily skewed distribution (safe >> unsafe >> vandal).
df.groupBy("label").count().show()

+------+-----+
| label|count|
+------+-----+
|  safe| 8235|
|unsafe| 1339|
|vandal|  103|
+------+-----+



In [None]:
# Disabled: manual mapping of the string labels to the numbers 0, 1, 2.
# (The pipelines below index the label column with StringIndexer instead.)
#def labelTrans(df):
#    if df.label == "safe":
#        return 0
#    elif df.label == "unsafe":
#        return 1
#    else:
#        return 2
#trans_udf = udf(labelTrans, DoubleType())

In [None]:
#df = df.withColumn("type", trans_udf(
#    struct([df[x] for x in df.columns])
# ))

In [None]:
# Print the current schema.
# NOTE(review): the saved output below lists a `type` column, but the only code
# that would add it is commented out above, so that output is stale — re-run to refresh.
df.printSchema()

root
 |-- comment: string (nullable = true)
 |-- label: string (nullable = true)
 |-- name_user: string (nullable = true)
 |-- text_new: string (nullable = true)
 |-- text_old: string (nullable = true)
 |-- title_page: string (nullable = true)
 |-- url_page: string (nullable = true)
 |-- diff: string (nullable = true)
 |-- type: double (nullable = true)



In [None]:
# Free up the name "label": the StringIndexer stage writes its numeric output
# to a column called "label", so the original string column becomes "category".
df_new = df.withColumnRenamed(existing="label", new="category")

In [None]:
# Keep only the target column and the text column used to build features.
df_select = df_new.select(["category", "diff"])

In [None]:
# 75% / 25% train/test split with a fixed seed.
train, test = df_select.randomSplit(weights=[0.75, 0.25], seed=100)

# Creating Pipelines

In [None]:
# Stages for the hashed TF-IDF features, plus the label indexer shared by all pipelines.

# Encode the string `category` column as a numeric `label` column.
label_stringIdx = StringIndexer(inputCol="category", outputCol="label")

# diff text -> tokens -> tokens without stop words
tokenizer = Tokenizer(inputCol="diff", outputCol="words")
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered")

# Hash the filtered tokens into a 10000-dimensional term-frequency vector.
hashingTF = HashingTF(
    inputCol="filtered",
    outputCol="rawFeatures",
    numFeatures=10000,
)

# Rescale by inverse document frequency; minDocFreq=5 filters out rare terms.
idf = IDF(
    inputCol="rawFeatures",
    outputCol="features",
    minDocFreq=5,
)

In [None]:
# Alternative features: CountVectorizer term counts followed by IDF.
# Like `idf`, `idf_cv` writes to the "features" column.
countVectors = CountVectorizer(
    inputCol="filtered",
    outputCol="cv",
    vocabSize=10000,
    minDF=5,
)
idf_cv = IDF(
    inputCol="cv",
    outputCol="features",
    minDocFreq=5,
)

In [None]:
# Pipeline 1: hashed TF-IDF features feeding a logistic regression classifier.
lr = LogisticRegression(
    maxIter=10,
    regParam=0.01,
    elasticNetParam=0,
)
lr_idf = Pipeline(
    stages=[
        tokenizer,
        stopwordsRemover,
        hashingTF,
        idf,
        label_stringIdx,
        lr,
    ]
)

# Train pipeline 1 on the training split.
model1 = lr_idf.fit(train)

In [None]:
# Estimators for the Naive Bayes and random-forest pipelines.
# NOTE(review): the "Pipeline 2" and "Pipeline 3" cells re-create `nb` and `rf`
# with these same arguments before using them, so this cell duplicates them.
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=50,
    maxDepth=4,
    maxBins=32,
)
nb = NaiveBayes(smoothing=1, modelType="multinomial")


In [None]:
# Pipeline 2: hashed TF-IDF features feeding a multinomial Naive Bayes classifier.
nb = NaiveBayes(smoothing=1, modelType="multinomial")
nb_idf = Pipeline(
    stages=[
        tokenizer,
        stopwordsRemover,
        hashingTF,
        idf,
        label_stringIdx,
        nb,
    ]
)

# Train pipeline 2 on the training split.
model2 = nb_idf.fit(train)

In [None]:
# Pipeline 3: hashed TF-IDF features feeding a random forest (50 trees, depth 4).
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=50,
    maxDepth=4,
    maxBins=32,
)
rf_idf = Pipeline(
    stages=[
        tokenizer,
        stopwordsRemover,
        hashingTF,
        idf,
        label_stringIdx,
        rf,
    ]
)

# Train pipeline 3 on the training split.
model3 = rf_idf.fit(train)

In [None]:
# Pipeline 4: CountVectorizer + IDF features feeding the logistic regression
# estimator `lr` defined for pipeline 1.
lr_vector = Pipeline(
    stages=[
        tokenizer,
        stopwordsRemover,
        countVectors,
        idf_cv,
        label_stringIdx,
        lr,
    ]
)

# Train pipeline 4 on the training split.
model4 = lr_vector.fit(train)

In [None]:
# Pipeline 5: CountVectorizer + IDF features feeding the Naive Bayes estimator `nb`.
nb_vector = Pipeline(
    stages=[
        tokenizer,
        stopwordsRemover,
        countVectors,
        idf_cv,
        label_stringIdx,
        nb,
    ]
)

# Train pipeline 5 on the training split.
model5 = nb_vector.fit(train)

In [None]:
# Pipeline 6: CountVectorizer + IDF features feeding the random-forest estimator `rf`.
rf_vector = Pipeline(
    stages=[
        tokenizer,
        stopwordsRemover,
        countVectors,
        idf_cv,
        label_stringIdx,
        rf,
    ]
)

# Train pipeline 6 on the training split.
model6 = rf_vector.fit(train)

# Measuring Model Performance 

In [None]:
# Score the three hashed TF-IDF pipelines (models 1-3) on the held-out test split.
# NOTE(review): the label counts recorded earlier in this notebook are heavily
# skewed towards `safe` (8235 of 9677 rows), so accuracy alone may mostly reflect
# the majority class; consider also reporting f1 or per-class recall.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

prediction1 = model1.transform(test)
prediction2 = model2.transform(test)
prediction3 = model3.transform(test)

# Keep just the predicted label and the true label for each model.
selected1 = prediction1.select("prediction", "label")
selected2 = prediction2.select("prediction", "label")
selected3 = prediction3.select("prediction", "label")

for message, scored in (
    ("Model 1's performance is = ", selected1),
    ("Model 2's performance is = ", selected2),
    ("Model 3's performance is = ", selected3),
):
    print(message + str(evaluator.evaluate(scored)))

Model 1's performance is = 0.8304872969596002
Model 2's performance is = 0.7417742607246981
Model 3's performance is = 0.8546438983756768


In [None]:
# Score the three CountVectorizer + IDF pipelines (models 4-6) on the held-out test split.
# NOTE(review): the label counts recorded earlier in this notebook are heavily
# skewed towards `safe` (8235 of 9677 rows), so accuracy alone may mostly reflect
# the majority class; consider also reporting f1 or per-class recall.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

prediction4 = model4.transform(test)
prediction5 = model5.transform(test)
prediction6 = model6.transform(test)

# Keep just the predicted label and the true label for each model.
selected4 = prediction4.select("prediction", "label")
selected5 = prediction5.select("prediction", "label")
selected6 = prediction6.select("prediction", "label")

for message, scored in (
    ("Model 4's performance is = ", selected4),
    ("Model 5's performance is = ", selected5),
    ("Model 6's performance is = ", selected6),
):
    print(message + str(evaluator.evaluate(scored)))

Model 4's performance is = 0.8354852144939608
Model 5's performance is = 0.7238650562265723
Model 6's performance is = 0.8546438983756768


In [None]:
# Persist the fitted pipeline model `model3` to the "bestModel" directory.
# write().overwrite() replaces an existing "bestModel" directory, whereas a plain
# save() raises when the path already exists, which breaks re-running the notebook.
model3.write().overwrite().save("bestModel")