# Advanced Analytics - Assignment 3

This assignment aims to predict the validity of Wikipedia Edits. The goal is to build a model which can classify incoming (stream) Wikipedia edits as Safe, Unsafe or Vandal. This notebook starts off by giving an overview of the contents. 

## Overview of the Notebook

> **Part 0.** Problem Description <br>
> **Part 1.** Data Loading and Filtering <br>
> **Part 2.** Preprocessing Data <br>
> **Part 3.** Computing Ancillary Features <br>
> **Part 4.** Training Models <br>
> **Part 5.** Evaluating Models <br>
> **Part 6.** Comparing Model Performance <br>
> **Part 7.** Employing Models in Streaming Setup<br>

# Part 0: Problem Description

The goal of this assignment is to classify incoming wikipedia edits. The classifier should be able to differentiate between three classes: *safe, unsafe, vandal*. The most challenging part in this assignment is the type of data which is used namely text (semi-structured data). This makes model implementation extra tough because preprocissing is very essential here. Another challenge is the way in which predictions must be made. The predictions must be such that incoming data can be immediatly classified i.s.o. a collected dataset without labels. 

To start, we will take a closer look at the data and how it is constructed. 
The data that has been streamed is text-based and formatted as a JSON dictionary with the following keys:

- <b>Title page:</b> Title of the Wikipedia page.

- <b>Text_new:</b> Text after the edit.

- <b>Text_old:</b> Text before the edit.

- <b>Name_user:</b> The user that edited the page. If the user is registered, their user name will show. In case of an anonymous edit, the user will be identified by his/her IP address at the time of editing.

- <b>Label:</b> Label is the target feature which we aim to predict. The possible values are 'safe', 'unsafe' or 'vandal'.

- <b>Comment:</b> The editor is asked to summarize the changes that have been made to the page. It is considered good practice to provide a summary about the edit, however, the user is free to leave this field blank, therefore empty values can occur.

- <b>URL_page:</b> The URL of the Wikipedia page that was edited.

# Part I: Data Loading and Filtering

##  Import & Initialization


In [1]:
import pandas as pd
import numpy as np
import emoji
import re as re
import difflib
import shutil
import sys
import os

import profanity_check as pc
from fuzzywuzzy import fuzz

from IPython.core.display import display, HTML
display(HTML("<style>.container { width:95% !important; }</style>"))

from threading import Thread

class StreamingThread(Thread):
    def __init__(self, ssc):
        Thread.__init__(self)
        self.ssc = ssc
    def run(self):
        ssc.start()
        ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)
        
# from pyspark import SparkContext
# SparkContext.setSystemProperty('spark.executor.memory', '2g')

from pyspark.ml import Pipeline
        
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler,IndexToString

from pyspark.sql.types import IntegerType, StringType, ArrayType, FloatType
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit, monotonically_increasing_id

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField

from pyspark.ml.classification import NaiveBayes, NaiveBayesModel, RandomForestClassifier
from pyspark.ml.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.evaluation import MultilabelMetrics
from pyspark.mllib.util import MLUtils

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from nltk.stem.snowball import *

from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics import multilabel_confusion_matrix





In [2]:
sc

In [3]:
spark

## Loading the data into a single dataframe

First, data was captured from the streaming server. The data that will be used for training and testing was collected over a period of a couple of weeks (with some break in between due to spark time-out problems).

Secondly, the streamed data should be filtered. Many folders out of the streamed data do not contain any 'part files'. It is easiest to delete these empty folders before preprocessing the data and training the model, since they do not contain any useful data. This was done in a seperate python file (see python file: *'DeleteEmptyDirectory'*).

In [4]:
def load_rdd(base_directory):
    # Get all the directory names of the saved myoutput folders
    foldernames = os.listdir(base_directory)
    
    # Create list of full directorie names
    full_directories = []
    
    for i in range(len(foldernames)):
        
        if foldernames[i] == '.DS_Store':
            continue
        
        directory_temp = base_directory + "/" + foldernames[i]
        full_directories.append(directory_temp)
    
    print("Number of directories included: ", len(full_directories))
    
    df = spark.read.format('json').load(path=full_directories)
    return df


In [5]:
# base_directory = r'/Users/Simon/Documents/GitHub/adana_task3/Spark_Cleaned/myoutput-1586797640000/part-00003'
# base_directory = r'/Users/Simon/Documents/GitHub/adana_task3/Spark_Cleaned/myoutput-1586797640000'
# base_directory = r'/Users/Simon/Documents/GitHub/adana_task3/Spark_Data_Limited'
# base_directory = r'/Users/Simon/Documents/GitHub/adana_task3/Spark_Cleaned'
base_directory = r'/Users/Simon/Documents/GitHub/adana_task3/AllData'

df = load_rdd(base_directory)
print('total nr of instances =  ',len(df.toPandas()))
# df.show()
    

Number of directories included:  2845
total nr of instances =   7841


## Filtering the loaded data 

After loading all the data, the data is filtered to create a more balanced dataset. Because the vandal and unsafe instances are rather infrequent, all those instances are optained. Next triple the amount of safe instances are sampled as compared to vandal instances from the loaded data. For the unsafe instances only double the instances as compared to the vandal instances are sampled from the data. This is done so that the natural frequenty of labels occuring still remains the same (safe > unsafe > vandal) but this difference is more balanced. Finally the data is repartitioned because spark places a task limit of 100kb. This helps spark with work overloads.

In [6]:
def filter_data(dataframe): #input a spark dataframe
    
    df = dataframe.toPandas()
    
    df_vandal = df[df['label'] == 'vandal']
    df_unsafe = df[df['label'] == 'unsafe']
    
    max_label_unsafe = int(min(3*len(df_vandal),len(df_unsafe))) # + len(df_unsafe) 
    max_label_safe = 4*len(df_vandal)
    
    df_unsafe = df[df['label'] == 'unsafe'].sample(n = max_label_unsafe,random_state=42)
    df_safe = df[df['label'] == 'safe'].sample(n = max_label_safe,random_state=42)
    
    df = df_safe.append(df_vandal)
    df = df.append(df_unsafe)
    df = df.sample(frac=1).reset_index(drop=True)
    df = sqlContext.createDataFrame(df)
    
    return df

In [7]:
df_selection = filter_data(df)
print('total nr of instances =  ',len(df_selection.toPandas()))
df_selection.toPandas().head()

## To run faster we only take a selection of the data:
# df_selection = df.toPandas().tail(2500)
# df_selection = sqlContext.createDataFrame(df_selection)
df_selection = df_selection.repartition(200)
# df_selection.rdd.getNumPartitions()

total nr of instances =   728


## Checking the frequency of each label

It is useful to know how many instances of each type are included in the dataset. This amount is restricted by the number of vandal labels but can be adjusted to include more unsafe and vandal labels. Note that in a later process these labels will be converted to an index (0,1 and 2) ordered from most frequent to least frequent (to save memory). 

In [8]:
df_selection.groupBy("label") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

+------+-----+
| label|count|
+------+-----+
|  safe|  364|
|unsafe|  273|
|vandal|   91|
+------+-----+



# Part II: Preprocessing the Document Edits

For a better accuracy of the wikipedie text edit classification, our goal is to combine two approaches. The first approach is to use text vectorization and a naive bayes model to predict the type of edit. The second approach is to classify the text edit based on ancillary features. These features can be described as *meta-data* of the text. They do not check the actual words in the edit but rather look at more abstract features of the edit. These two approaches are compared and we define a way to combine these results in an ensemble.

## Step 1a: Tokenization & Normalization

The values for 'new_text' and 'old_text' are still plain text. Therefore, a tokenizer is needed which converts the text into (text_old, text_new) into a list of words (words_old, words_new). 
<br>

The regexTokenizer is used because of its additional functionality compared to the standard Tokenizer that is built into Spark. One of these additional functionalities is that the tokens will automatically be normalized (decapitalized).
<br>

An extra functionality included is to also calculate the number of tokens created. This could be used further in the ancillary feature computations.

In [9]:
def tokenize(dataframe):
    df = dataframe
    
    rt_old = RegexTokenizer(inputCol="text_old", outputCol="words_old", toLowercase=True, pattern=("\\W"))
    countTokens = udf(lambda words: len(words), IntegerType())

    regexTokenized_old = rt_old.transform(df)
    df = regexTokenized_old.withColumn("tokens_old", countTokens(col("words_old")))

    #########################################################################################

    rt_new = RegexTokenizer(inputCol="text_new", outputCol="words_new", toLowercase=True, pattern=("\\W"))

    regexTokenized_new = rt_new.transform(df)
    df = regexTokenized_new.withColumn("tokens_new", countTokens(col("words_new")))
    
    print("Step 1a Done")
    return df

## Step 1b: Delta Generator

The goal is to determine the difference between old_text and new_text, in order to find what has been modified on the Wikipedia webpage. The new feature 'diff_text' will be created. This feature shows the exact changes made to the text, omitting the part of the text that is the same before and after the edit (resp. old_text and new_text). 

Several types of changes could occur:
 - Spelling changes: <i> misteries --> mysteries</i>
 - Grammar changes: <i> On the country side, I ride my bike --> I ride my bike on the country side </i>
 - Drastic changes, such as completely new text, deleted text, ... <br>

So the essence of a change lies in either deleted words or newly added words. Replaced words are a combination of a deleted word and a newly added word.

### User defined function

The code calculated the difference between the input and output text. This is accomplished by defining a UDF and a seperate function arrayUdf(). The udf is called on two columns *'words_old'* and *'words_new'*. Next a lambda function is defined to iterate over each row of the two input columns. Within the udf is refered to another function arrayUdf() which requires two inputs: the two tokenized lists of words which will be used to compute the difference. The arrayUdf() function acts as an itermediary to call on a different function: text_difference(). The text_difference() function uses the unified_diff generator from the difflib package to return the deltas between two lists of strings. 

Through experimentation with the unified_diff generator, we found that it was much easier to first tokenize the input and output text and then compute the difference between the two tokenized lists of words. This in contrast to passing the two texts (*'text_old'* and *'text_new'*) of the rdd's as input directly and then tokenizing this *'diff_text'*. Although the latter method might create less computational overhead due to less tokenization, the former method proves to be much more reliable (and less complex) to determine which words have been deleted and which words are new.

### text_difference() function - extra essential functionality

As mentioned above this function computes the deleted and newly added words by comparing two tokenized lists of words of the text input and output. This function is essential for another reason as well. Through experimentation it was found that some vandal edits include the repitition of the same words millions of times. The difference in text, at first, did not identify repeated (or unique) words in the edit. This caused problems later on in the process when a vectorization of the difference was computed. Because of the large edit with repeated words, the vectorization broke down and the process halted. Therefore this function also identifies these truely huge edits and either returns a list of unique words or fills in the edit with two words: *'massive', 'edit'*. This was essential to prevent the vectorization from breaking down during training but also in the employment of the pipeline. 

In [10]:
def text_difference(text_old,text_new):

    new_words = []
    deleted_words = []

    for line in difflib.unified_diff(text_old, text_new, fromfile='before.txt', tofile='after.txt'):
    #     sys.stdout.write(line)
        if "-" in line and " " not in line:
            new_line = line.replace("-", "")
            deleted_words.append(new_line)
        elif "+" in line and " " not in line:
            new_line = line.replace("+", "")
            new_words.append(new_line)

    # print("Deleted words: ", deleted_words)
    # print("Inserted words: ", new_words)

    edited_words = deleted_words + new_words
    
    ## Need to built in a protection mechanism for some of the edits which are massive
    ## Some pranksters copy the same sentence millions of times which breaks the vectorizer
    
    threshold_editsize = 300
    massive_edit = 0.0
    
    if len(edited_words) >= threshold_editsize:
        
        massive_edit = 1.0
        
        if len(set(edited_words)) >= threshold_editsize:
            edited_words = ['massive', 'edit']
        
        else:
            edited_words = list(set(edited_words))
            
    
    
    return edited_words,massive_edit


In [11]:
def diff_text(dataframe):
    df = dataframe
    
    def arrayUdf1(text_old,text_new):
        edited_words = text_difference(text_old,text_new)[0]
        return edited_words
    
    def arrayUdf2(text_old,text_new):
        massive_edit = text_difference(text_old,text_new)[1]
        return massive_edit

    #calling udf function
    callArrayUdf1 = udf(lambda row: arrayUdf1(row[0],row[1]), ArrayType(StringType()))
    callArrayUdf2 = udf(lambda row: arrayUdf2(row[0],row[1]), FloatType())

    #registering udf function
    spark.udf.register("callArrayUdf1",callArrayUdf1)
    spark.udf.register("callArrayUdf2",callArrayUdf2)

    #results of udf function
    df = df.withColumn("diff_text", callArrayUdf1(struct('words_old','words_new')))
    df = df.withColumn("massive_edit", callArrayUdf2(struct('words_old','words_new')))
    
    print("Step 1b Done")
    return df

## Step 2: Stop Word Removal

Stop-words should be filtered out before any training can happen. These stop-words are words that are very common in natural language. nltk.org provides a useful package of different stemmers. Ontop of this, it is possible to add custom stopwords like: *http, web, com, www, ...* . This allows for a smaller set of words which should be considered by the vectorization. Furthermore, these words occur in many text edits and therefore have only limited power in predicting the class of the edit. 

<br>
This procedure will leave us with a new column namely '<i>words_clean</i>'.

In [12]:
def stop_words_removal(dataframe):

    df = dataframe
    
    locale = sc._jvm.java.util.Locale
    locale.setDefault(locale.forLanguageTag("en-US"))

    stopwords = StopWordsRemover.loadDefaultStopWords("english")
    extra_stopwords = ["http","https","ref","www","com","org","url","web"]
    stopwords = stopwords + extra_stopwords
    # print(stopwords)

    remover = StopWordsRemover(inputCol="diff_text", outputCol="words_clean",stopWords=stopwords)
    stopwords = remover.getStopWords()


    # df_step2 = remover.transform(df_step1c)

    # df_step2.select("words_clean").show(truncate=False)

    df = remover.transform(df)

    # (inputCol="words", outputCol="filtered",stopWords=StopWordsRemover.loadDefaultStopWords("english"))
    # remover.transform(df_tokenized).show(truncate=False)
    
    print("Step 2 Done")
    return df

## Step 3: Stemming

Several stemming algorithms exist, of which the Porter algorithm is one of the least aggressive ones. The Snowball stemmer is slightly more aggressive at stemming the tokenized words, while being less aggressive than the Lancaster algorithm. <br>
We started off by trying the Snowball stemmer since it seemed like a nice 'middle ground' between the other two stemming variants. However, after inspecting the results, this stemmer still turned out to be too aggressive, stemming words as 'country' to 'countr' or 'thing' to 'th'. Therefore, the chosen algorithm for stemming is the Porter algorithm.<br>

<br>
This will result in the dataset '<i>words_stemmed</i>'.

In [13]:
def stemming(dataframe):
    
    df = dataframe
    
    # stemmer = SnowballStemmer('english')
    stemmer = PorterStemmer()

    stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))

    # df_step3 = df_step2.withColumn("words_stemmed", stemmer_udf("words_clean"))

    df = df.withColumn("words_stemmed", stemmer_udf("words_clean"))

    # df_step3.select("words_stemmed").show(truncate=False)
    
    print("Step 3 Done")
    return df

## Step 4: Feature Vectorization (TF-IDF)

In this step, the list of stemmed words is converted to a vector representation. First the TF is calculated which is required as input to compute IDF. The result is a column named TF-IDF which is the input column for the Naive Bayes model.

In [14]:
def vectorization(dataframe,pred):
    
    df = dataframe
    
    tf = HashingTF(inputCol="words_stemmed", outputCol="tf")#, numFeatures=20)

    # df_step4a = tf.transform(df_step3)
    df = tf.transform(df)


    idf = IDF(inputCol="tf", outputCol="tf_idf")
    # idfModel = idf.fit(df_step4a)
    # df_step4b = idfModel.transform(df_step4a)

    if pred == False:
        globals()['idfModel'] = idf.fit(df)
        
#     idfModel = idf.fit(df)
    df = globals()['idfModel'].transform(df)

    # df_step4a.show(truncate=False)
    # df_step4b.select("words_stemmed","tf_idf").show(truncate=False)
    
    print("Step 4 Done")
    return df

## Step 5: String Indexer

In this final step the labels (*Safe, Unsafe and Vandal*) are encoded to label indices. The most frequent label gets index 0 while the least frequent label gets the last index depending on the number of indices. In this case the least frequent label gets index 2.

In our data 0 corresponds to *Safe*, 1 corresponds to *Unsafe*, and 2 corresponds to *Vandal*.

In [15]:
def string_indexer(dataframe,pred):
    
    df_train = dataframe
    
    label_indexer = StringIndexer(inputCol = "label", outputCol = "label_index")
    
#     indexToLabel = label_indexer.labels
    
    # df_step5a = label_indexer.fit(df_step4b).transform(df_step4b)
    # df_step5b = df_step5a.select("tf_idf","label_index")

    #  # Renaming the columns
    # df_final = df_step5b.withColumnRenamed("tf_idf","features")
    # df_final = df_final.withColumnRenamed("label_index","label")
    
    if pred == False:
        globals()['label_indexer_model'] = label_indexer.fit(df_train)
        
    df_train = globals()['label_indexer_model'].transform(df_train)

    ## Renaming the columns
    df_label = df_train.select("tf_idf","label_index")
    df_label = df_label.withColumnRenamed("label_index","actual_label")
    
    print("Step 5 Done")
    return df_train, df_label #, df_pred, df_label_pred #, indexToLabel

## Step 6: Assembly of the preprocessing steps

Finally this step combines all the steps so that they can be performed by calling a single function. The function returns two dataframes: a dataframe including all columns and a dataframe only including the columns *'tf_idf'* and *'label_index'*. Note the boolean entry as *False* which indicates that the input dataframe is being used as training data and not for prediction.

In [16]:
def preprocessing(dataframe,pred):
    
    df = dataframe
    
    df = tokenize(df)
    df = diff_text(df)
    df = stop_words_removal(df)
    df = stemming(df)
    df = vectorization(df,pred)
    
    df, df_label = string_indexer(df,pred)
    
    return df , df_label 



##### Preprocessing the training data #####
df_final , df_label = preprocessing(df_selection,False)



Step 1a Done
Step 1b Done
Step 2 Done
Step 3 Done
Step 4 Done
Step 5 Done


# Part III: Ancillary Features

In total 7 ancillary features are computed to be used in a random forest classifier. Below the features are briefly discussed.

- <b>Massive Edit:</b> This feature was computed when computing the difference in text. It return 0 for small edits and 1 for large edits according to a threshold value. This is saved in the column *'massive_edit'*.

- <b>Longest repeated character:</b> The function 'maxRepeat' identifies the length of the longest repeated character. Anything larger than 3 could be considered an unsafe or vandal edit. This is saved in the column *'longest_repeated_char'*.

- <b>Empty Edit:</b> The function checks if the edit was empty or not. This is saved in the column *'empty_edit'*.

- <b>Size Ratio:</b> This function computes the size difference between input and output text. It is important to note that this is already limited by the threshold used to identify massive edits. Results larger than 1 say that the number of words have increased. This is saved in the column *'size_ratio'*.

- <b>Alphanumeric Count:</b> This function returns a integer value. A value larger than 0 means that the edit includes at least one word which has both letters and numbers. These are usualy invalid words which could prove a vandal edit. This is saved in the column *'alpha_count'*.

- <b>Vulgar Language:</b> Vulgar language is a key indicator of vandalism on wikipedia. This function identifies the presence of these words. The function compares the vulgar words in the edit with the vulgar words in the original text and computes a ratio. If the number of words are the same than this could indicate that the page might just be about something vulgar assuming that an original text before the edit is always safe. If the number of vandal words increases then this could be considered as vandal. This is saved in the column *'vulgar_ratio'*.

- <b>Similarity:</b> Last but not least the similarity between old and new texts are compared. Very similar text might point to grammatical edits or small factual changes which are likely safe. This is saved in the column *'similarity'*.

All these functions are combined in a single function, *compute_ancillary_features()*, to make calling upon the functions easier. This again requires the use of user defined functions to pass an instance (a row of the dataframe) to each function. 

### Important Note:

Many more features could be computed which cover more types of vandalism. The first part of improving the scores for the random forest model would be to define extra ancillary features. Some examples are: the user name, whether a comment was filled in, vulgar words in the comment, etc.


In [17]:
def maxRepeat(diff_text):
    h = len(diff_text)
    count = 0
    for i in range(0, h):
        l = len(diff_text[i])
        #Find the maximum repeating character    
        for j in range(0, l):
            cur_count = 1
            for k in range(j + 1, l):
                if(diff_text[i][j] != diff_text[i][k]):
                    break
                cur_count += 1
                                
                #update result if required
                if cur_count > count:
                    count = cur_count
                            
    return count

In [18]:
#empty revision
##checks if edit is is empty or non empty

def empty(text_list):
    if len(text_list) == 0 or text_list[0] == "empty":
        empty = 1
    else:
        empty = 0

    return empty

In [19]:
#ratio between old text and new text; if  > 1 new text is longer than old text
#i would consider it vandal if there is a significant deviation from 1

def size_ratio(old_text_list, new_text_list):
    len_old_text = len(old_text_list)
    len_new_text = len(new_text_list)
    
    if len_old_text == 0:
        ratio = 0.0
    else:
        ratio = round(len_new_text / len_old_text,3)
    
    return ratio

In [20]:
#counts the alphanumberic strings in the diffrence list eg (dkfdj125kd,...) the strings with numbers and letters
## Since these strings are likely to be vandal
## Absolute count or ratio better?

def alphanumeric_count(difference_list):
    alpha_num = 0
    for element in difference_list:
        if element.isdigit():
            continue
        elif element.isalpha():
            continue
        else:
            alpha_num += 1
    
    return alpha_num

In [21]:
#ratio of vulgar words in the edit

def vulgar(old_text_list,difference_list):
    if len(old_text_list) == 0:
        old_text_list = ['empty']
    
    if len(difference_list) == 0:
        difference_list = ['empty']
    
    vulgar_list_edit = pc.predict_prob(difference_list)
    vulgar_list_old = pc.predict_prob(old_text_list)
    count1 = 0
    count2 = 0
    
    for i in vulgar_list_edit:
        count1 += i
        
    for k in vulgar_list_old:
        count2 += k
        
    ratio1 = count1 #/ len(difference_list)
    
    if count2 == 0:
        ratio2 = count1
    
    else:
        ratio2 = round(count1 / count2,3)
        
    return float(ratio2)

In [22]:
#Gives a similarity metric between original and new text
#How less similar the more suspicous

def similarity(old_text_list, new_text_list):
    old = ''.join(old_text_list)
    new = ''.join(new_text_list)
    ratio = round(fuzz.token_set_ratio(old, new)/100,3)
    
    return ratio

In [23]:
def compute_ancillary_features(dataframe):
    df = dataframe
    
    ## UDF for computing longest repeated character
    cafUdf1 = udf(lambda row: maxRepeat(row[2]), IntegerType())
    spark.udf.register("cafUdf1", cafUdf1)
    df = df.withColumn("longest_repeated_char", cafUdf1(struct('words_old','words_new','diff_text')))
    
    ## UDF for checking if edit is empty or not
    cafUdf2 = udf(lambda row: empty(row[2]), IntegerType())
    spark.udf.register("cafUdf2", cafUdf2)
    df = df.withColumn("empty_edit", cafUdf2(struct('words_old','words_new','diff_text')))
    
    ## UDF to determine size ratio between input and output text
    cafUdf3 = udf(lambda row: size_ratio(row[0],row[1]), FloatType())
    spark.udf.register("cafUdf3", cafUdf3)
    df = df.withColumn("size_ratio", cafUdf3(struct('words_old','words_new','diff_text')))
    
    ## UDF to determine number of alphanumeric words in an edit
    cafUdf4 = udf(lambda row: alphanumeric_count(row[2]), IntegerType())
    spark.udf.register("cafUdf4", cafUdf4)
    df = df.withColumn("alpha_count", cafUdf4(struct('words_old','words_new','diff_text')))
    
    ## UDF to determine the ratio of vulgar words in the text
    cafUdf5 = udf(lambda row: vulgar(row[0],row[2]), FloatType())
    spark.udf.register("cafUdf5", cafUdf5)
    df = df.withColumn("vulgar_ratio", cafUdf5(struct('words_old','words_new','diff_text')))
    
    ## UDF to determine the ratio of vulgar words in the text
    cafUdf6 = udf(lambda row: similarity(row[0],row[1]), FloatType())
    spark.udf.register("cafUdf6", cafUdf6)
    df = df.withColumn("similarity", cafUdf6(struct('words_old','words_new','diff_text')))
    
    print("Ancillary Features Done")
    return df

In [24]:
df_ancillary = compute_ancillary_features(df_final)

df_ancillary = df_ancillary.select('diff_text','massive_edit','longest_repeated_char','empty_edit','size_ratio','alpha_count',\
                                   'vulgar_ratio','similarity','label_index','label')
df_ancillary.show()


Ancillary Features Done
+--------------------+------------+---------------------+----------+----------+-----------+------------+----------+-----------+------+
|           diff_text|massive_edit|longest_repeated_char|empty_edit|size_ratio|alpha_count|vulgar_ratio|similarity|label_index| label|
+--------------------+------------+---------------------+----------+----------+-----------+------------+----------+-----------+------+
|[trying, to, kill...|         0.0|                    2|         0|     1.001|          0|       0.001|       1.0|        2.0|vandal|
|[https, www, wash...|         0.0|                    3|         0|     0.987|          6|       0.027|      0.99|        1.0|unsafe|
|[collapsible, sta...|         0.0|                    2|         0|     0.999|          0|       0.007|      0.99|        0.0|  safe|
|[new, super, luig...|         0.0|                    2|         0|     0.999|          0|       0.008|      0.99|        1.0|unsafe|
|                  []|         

# Part IV: Building Models

## Multinomial Naive Bayes Classifier

In this section a Naive Bayes Classifier is trained. The reason for using the naive bayes classifier is that this has been a typical choice in literature for text classification. Its accuracy is not necessarily the best but it does perform quite fast. A paper was found which described a logit boosted naive bayes model which proved very powerful for vandalism detection in text (see pdf included with this file). Due to the complexity of implementing this model in spark and time restrictions this goal was replaced by a simpler naive bayes classifier. 

To improve our accuracy of correctly predicting unsafe and vandal instances the threshold is of these probabilities are adapted. We place more importance on misclassifying a vandal instances as non-vandal than a safe instance as non-safe. There are two ways this can be communicated to the model. Either through a weights column which places weights on each instances or more easily by changing the probability threshold which was done here with promising results.

In [25]:
(training_data_nb, test_data_nb) = df_label.randomSplit([0.7, 0.3], seed = 8)

In [26]:
nb = NaiveBayes(smoothing=1.0, modelType="multinomial",featuresCol='tf_idf', labelCol='actual_label')#,thresholds = [0.90,0.90,0.1])
model_nb = nb.fit(training_data_nb)
predictions_nb = model_nb.transform(test_data_nb)

In [27]:
predictions_nb.show()

+--------------------+------------+--------------------+--------------------+----------+
|              tf_idf|actual_label|       rawPrediction|         probability|prediction|
+--------------------+------------+--------------------+--------------------+----------+
|(262144,[70684,92...|         0.0|[-332.52900393838...|[0.00585986948510...|       1.0|
|      (262144,[],[])|         0.0|[-0.6597780393853...|[0.51696606786427...|       0.0|
|(262144,[37834,41...|         0.0|[-112.78421136106...|[0.99969648172287...|       0.0|
|(262144,[56267,92...|         1.0|[-296.36196046527...|[0.52087504590657...|       0.0|
|(262144,[116287,1...|         1.0|[-85.501579696501...|[0.21160592435479...|       1.0|
|(262144,[103513,1...|         1.0|[-222.43641485880...|[0.52919615926166...|       0.0|
|(262144,[15554,16...|         0.0|[-1375.3708107105...|[3.90932356075996...|       1.0|
|(262144,[16657,17...|         0.0|[-1042.1645805325...|[6.26278142615098...|       1.0|
|(262144,[40782,11...

## Random Forest Classisfier

The random forest classifier is used on the ancillary features. For these features to work with a spark RFC, the ancillary features must be combined into a single feature vector. This is accomplished by the VectorAssembler built into spark. This returns a new dataframe with two columns: *'features'* and *'label_index'*. This dataframe is used to train the RFC. 

In [28]:
def vector_assembler(dataframe):
    
    df_ancillary = dataframe
    assembler = VectorAssembler(inputCols=['massive_edit','longest_repeated_char','empty_edit','size_ratio','alpha_count',\
                                           'vulgar_ratio','similarity'], outputCol='features')

    df_ancillary = assembler.transform(df_ancillary)
    df_ancillary_vector = df_ancillary.select('features','label_index')
    
    return df_ancillary_vector

df_ancillary_vector = vector_assembler(df_ancillary)

In [29]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = df_ancillary_vector.randomSplit([0.7, 0.3], seed = 8)

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="label_index", featuresCol="features", numTrees=50)

# Train model. 
model_rf = rf.fit(trainingData)

# Make predictions.
predictions_rf = model_rf.transform(testData)
predictions_rf.show()

+--------------------+-----------+--------------------+--------------------+----------+
|            features|label_index|       rawPrediction|         probability|prediction|
+--------------------+-----------+--------------------+--------------------+----------+
|[0.0,3.0,0.0,0.98...|        1.0|[9.77091537248531...|[0.19541830744970...|       1.0|
|(7,[1,3,6],[2.0,1...|        1.0|[21.2811061675233...|[0.42562212335046...|       1.0|
|[0.0,0.0,0.0,1.00...|        1.0|[32.6312673498640...|[0.65262534699728...|       0.0|
|[0.0,2.0,0.0,0.98...|        0.0|[12.0799333724671...|[0.24159866744934...|       1.0|
|[1.0,2.0,0.0,0.47...|        1.0|[9.70356686717197...|[0.19407133734343...|       1.0|
|(7,[1,3,6],[2.0,1...|        1.0|[21.2811061675233...|[0.42562212335046...|       1.0|
|[0.0,0.0,1.0,1.0,...|        0.0|[44.5113085202883...|[0.89022617040576...|       0.0|
|(7,[3,5,6],[1.001...|        1.0|[25.3860851896026...|[0.50772170379205...|       0.0|
|(7,[3,5,6],[1.080...|        2.

# Part V - A: Evaluation of the Naive Bayes Model

## Accuracy

The accuracy is an important metric to determine the number of mistakes made by the model. In our application, accuracy is much less important than some of the other metrics because of two reasons:

1. The Dataset, before filtering, was highly unbalanced. Predicting all labels as safe would already result in a high accuracy but does not mean the classifier is performing well. 

2. We are more interested in the classification of a vandal edit compared to the other labels.


In [30]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol='actual_label')
accuracy_nb = evaluator.evaluate(predictions_nb)
print(accuracy_nb)

0.5088142292490119


## Precision and Recall

For the precision and recall measures we fall back on Sklearn packages because their implementation is much more straight forward as compared to the mllib packages. We are mostly interested in the unsafe and vandal isntances!

**Binary Classification:**

- Precision = TruePositives / (TruePositives + FalsePositives)
- Recall = TruePositives / (TruePositives + FalseNegatives)

**Multilabel Classification:**

In an imbalanced classification problem with more than two classes, precision is calculated as the sum of true positives across all classes divided by the sum of true positives and false positives across all classes.

- Precision = Sum c in C TruePositives_c / Sum c in C (TruePositives_c + FalsePositives_c)


In an imbalanced classification problem with more than two classes, recall is calculated as the sum of true positives across all classes divided by the sum of true positives and false negatives across all classes.

- Recall = Sum c in C TruePositives_c / Sum c in C (TruePositives_c + FalseNegatives_c)

**Key Insight:**

In this classification problem, we are mostly interested in finding all vandal edits. Recall is therefore our most important metric since it shows the percentage of vandals that were identified. A higher recall means that less of the vandals were falsly labeled safe (less vandals *'slip through'*). The results below show that for the vandal edits, a recall of ~0.77 was reached. This is a good result. Note however that to reach this high recall, the thresholds of the naive bayes classifier were edited. This comes at a cost of predicting more safe labels as unsafe and vandal. The model can be tuned to the user's wishes. If all vandals should be identified than this will come at a cost of lower precision and lower recall for the other labels. 


In [31]:
pd_predictions_nb = predictions_nb.toPandas()

y_true = pd_predictions_nb['actual_label'].to_list()
y_pred = pd_predictions_nb['prediction'].to_list()

precision_recall_fscore_support(y_true, y_pred, average='weighted')


(0.5117742181540809, 0.5173913043478261, 0.5088142292490118, None)

## Confusion Matrices and Manual Calculations

In this section we manually compute a confusion matrix for the different types of labels. Below the confusion matrix for vandal labels is shown. The rows show the actual labels of each instance and the columns show the predicted labels. As a last step, the precision and recall are computed as if the classification were binary by comparing: safe vs (unsafe + vandal), unsafe vs (safe + vandal) and vandal vs (safe + unsafe).

In [32]:
cm_arr = multilabel_confusion_matrix(y_true, y_pred,labels=[0.0, 1.0, 2.0])
cm_arr_safe = cm_arr[0,:,:]
cm_arr_unsafe = cm_arr[1,:,:]
cm_arr_vandal = cm_arr[2,:,:]

cm_safe = pd.DataFrame({'Pred_Other': cm_arr_safe[:,0], 'Pred_0': cm_arr_safe[:,1]})
cm_safe = cm_safe.rename(index={0: 'Act_Other', 1: 'Act_0'})

cm_unsafe = pd.DataFrame({'Pred_Other': cm_arr_unsafe[:,0], 'Pred_1': cm_arr_unsafe[:,1]})
cm_unsafe = cm_unsafe.rename(index={0: 'Act_Other', 1: 'Act_1'})

cm_vandal = pd.DataFrame({'Pred_Other': cm_arr_vandal[:,0], 'Pred_2': cm_arr_vandal[:,1]})
cm_vandal = cm_vandal.rename(index={0: 'Act_Other', 1: 'Act_2'})

cm_vandal.head()

Unnamed: 0,Pred_Other,Pred_2
Act_Other,189,9
Act_2,25,7


In [33]:
precision_safe = cm_arr_safe[1,1]/(cm_arr_safe[1,1] + cm_arr_safe[0,1])
precision_unsafe = cm_arr_unsafe[1,1]/(cm_arr_unsafe[1,1] + cm_arr_unsafe[0,1])
precision_vandal = cm_arr_vandal[1,1]/(cm_arr_vandal[1,1] + cm_arr_vandal[0,1])

recall_safe = cm_arr_safe[1,1]/(cm_arr_safe[1,1] + cm_arr_safe[1,0])
recall_unsafe = cm_arr_unsafe[1,1]/(cm_arr_unsafe[1,1] + cm_arr_unsafe[1,0])
recall_vandal = cm_arr_vandal[1,1]/(cm_arr_vandal[1,1] + cm_arr_vandal[1,0])

print("Precision for Label 0 (safe): ", precision_safe)
print("Precision for Label 1 (unsafe): ", precision_unsafe)
print("Precision for Label 2 (vandal): ", precision_vandal)
print()
print("Recall for Label 0 (safe): ", recall_safe)
print("Recall for Label 1 (unsafe): ", recall_unsafe)
print("Recall for Label 2 (vandal): ", recall_vandal)
print()
tp_tot = cm_arr_safe[1,1]+cm_arr_unsafe[1,1]+cm_arr_vandal[1,1]
fp_tot = cm_arr_safe[0,1]+cm_arr_unsafe[0,1]+cm_arr_vandal[0,1]
fn_tot = cm_arr_safe[1,0]+cm_arr_unsafe[1,0]+cm_arr_vandal[1,0]

precision_tot = (tp_tot / (tp_tot + fp_tot))
recall_tot = ( tp_tot / (tp_tot + fn_tot ))

# print("Total Precision: ", precision_tot)
# print("Total Recall: ", recall_tot)
              

Precision for Label 0 (safe):  0.5964912280701754
Precision for Label 1 (unsafe):  0.44
Precision for Label 2 (vandal):  0.4375

Recall for Label 0 (safe):  0.6415094339622641
Recall for Label 1 (unsafe):  0.4782608695652174
Recall for Label 2 (vandal):  0.21875



# Part V - B: Evaluation of the Random Forest Model

*For a detailed explanation of the metrics see part V - A*

## Accuracy

In [34]:
evaluator2 = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol='label_index')
accuracy_rf = evaluator2.evaluate(predictions_rf)
print(accuracy_rf)

0.548993933985727


## Precision and Recall

In [35]:
pd_predictions_rf = predictions_rf.toPandas()

y_true = pd_predictions_rf['label_index'].to_list()
y_pred = pd_predictions_rf['prediction'].to_list()

precision_recall_fscore_support(y_true, y_pred, average='weighted')


(0.5515940573889762, 0.5869565217391305, 0.548993933985727, None)

## Confusion Matrices and Manual Calculations

In [36]:
cm_arr = multilabel_confusion_matrix(y_true, y_pred,labels=[0.0, 1.0, 2.0])
cm_arr_safe = cm_arr[0,:,:]
cm_arr_unsafe = cm_arr[1,:,:]
cm_arr_vandal = cm_arr[2,:,:]

cm_safe = pd.DataFrame({'Pred_Other': cm_arr_safe[:,0], 'Pred_0': cm_arr_safe[:,1]})
cm_safe = cm_safe.rename(index={0: 'Act_Other', 1: 'Act_0'})

cm_unsafe = pd.DataFrame({'Pred_Other': cm_arr_unsafe[:,0], 'Pred_1': cm_arr_unsafe[:,1]})
cm_unsafe = cm_unsafe.rename(index={0: 'Act_Other', 1: 'Act_1'})

cm_vandal = pd.DataFrame({'Pred_Other': cm_arr_vandal[:,0], 'Pred_2': cm_arr_vandal[:,1]})
cm_vandal = cm_vandal.rename(index={0: 'Act_Other', 1: 'Act_2'})

cm_vandal.head()

Unnamed: 0,Pred_Other,Pred_2
Act_Other,197,2
Act_2,30,1


In [37]:
precision_safe = cm_arr_safe[1,1]/(cm_arr_safe[1,1] + cm_arr_safe[0,1])
precision_unsafe = cm_arr_unsafe[1,1]/(cm_arr_unsafe[1,1] + cm_arr_unsafe[0,1])
precision_vandal = cm_arr_vandal[1,1]/(cm_arr_vandal[1,1] + cm_arr_vandal[0,1])

recall_safe = cm_arr_safe[1,1]/(cm_arr_safe[1,1] + cm_arr_safe[1,0])
recall_unsafe = cm_arr_unsafe[1,1]/(cm_arr_unsafe[1,1] + cm_arr_unsafe[1,0])
recall_vandal = cm_arr_vandal[1,1]/(cm_arr_vandal[1,1] + cm_arr_vandal[1,0])

print("Precision for Label 0 (safe): ", precision_safe)
print("Precision for Label 1 (unsafe): ", precision_unsafe)
print("Precision for Label 2 (vandal): ", precision_vandal)
print()
print("Recall for Label 0 (safe): ", recall_safe)
print("Recall for Label 1 (unsafe): ", recall_unsafe)
print("Recall for Label 2 (vandal): ", recall_vandal)
print()
tp_tot = cm_arr_safe[1,1]+cm_arr_unsafe[1,1]+cm_arr_vandal[1,1]
fp_tot = cm_arr_safe[0,1]+cm_arr_unsafe[0,1]+cm_arr_vandal[0,1]
fn_tot = cm_arr_safe[1,0]+cm_arr_unsafe[1,0]+cm_arr_vandal[1,0]

precision_tot = (tp_tot / (tp_tot + fp_tot))
recall_tot = ( tp_tot / (tp_tot + fn_tot ))

# print("Total Precision: ", precision_tot)
# print("Total Recall: ", recall_tot)

Precision for Label 0 (safe):  0.6319444444444444
Precision for Label 1 (unsafe):  0.5180722891566265
Precision for Label 2 (vandal):  0.3333333333333333

Recall for Label 0 (safe):  0.7711864406779662
Recall for Label 1 (unsafe):  0.5308641975308642
Recall for Label 2 (vandal):  0.03225806451612903



# Part VI: Comparison of the Models - A Discussion

In this section the results of Part V are discussed for both models and an approach is defined on combining the results of both models.

## Comparing Accuracy

The accuracy of the Naive Bayes classifiers (NBC) is lower than that of the Random Forest classifier (RFC). The accuracies of both models seem on the low side. This mostly to do with the limited amount of data. In the full dataset, almost 8000 instances were included but this dataset is highly unbalanced. To balance the dataset, only a selection of the safe and unsafe instances are chosen in proportion to the total number of vandal instances (91). Simply by increasing this proportion a higher accuracy can be obtained. This means that accuracy is a rather poor metric to define classifier performance.

The reason for the difference in accuracy is mostly because the NBC allows different threshold values for the probabilities to determine the predicted label. In general, by deviating from the standard threshold of 0.5, a lower accuracy will be obtained. The question one asks is then what would be the benefit of changing the threshold values. This is a segway to the next section: Recall and Precision.



## Comparing Recall and Precision

The Recall and Precision metrics show how well each label is identified. While the recall focusses on the number of labels missed in the prediction (eg. number of vandals which were labeled safe or unsafe), the precision focusses on the number of labels falsly identified (eg. number of safe and unsafe which were labeled vandal). It does not come as a suprise that a higher recall results in a lower precision and vice versa. 

The NBC shows a much higher recall than the RFC mostly because of the changed threshold values. One can plot the Precision and Recall for different values of threshold to obtain the PR-curve. Because of the multiple labels, a PR-curve can be obtained for each label or a weighted average can be computed. The latter is less revealing than the former since the focus of the problem setting is on identifying vandal instances. 


## Discussion: How to implement a combined approach.

There are two approaches to combining the results of the classifiers:

#### A. Probabilities

The average probability for each label of each instance can be computed and the label can be redefined as that label with the highest probability. This will in general result in a more accurate model only if the two models are complementary. This means that the NBC is finding different vandals than the vandals found by the RFC.


#### B. Labels

Another approach is to only focus on the labels. If both classifiers predict the same label there is no conflict. When the classifiers predict different labels, the most 'severe' outcome is chosen. The idea is that missclassifying a safe edit as an unsafe or vandal edit is not as significant as the reverse mistake. 

**An example:**

- NBC predictions: [0,0,1,1,0,2,2]
- RFC predictions: [0,1,0,2,0,1,0]
- CB  predictions: [0,1,1,2,0,2,2]

In this way, more vandals will be identified compared to the NBC and RFC seperate. Using this method it is not possible to lower the recall for the vandal edits but this could lower the other metrics. Also the recall of both safe and unsafe edits may decrease.

In [38]:
def combine_prediction(df1,df2):
    
    df_nb = df1.toPandas()
    df_rf = df2.toPandas()
    column_names = ["actual_label","pred_nb", "pred_rf", "pred_combined"]
    df_combined = pd.DataFrame(columns = column_names)
    
    df_combined["actual_label"] = df_nb["actual_label"]
    df_combined["pred_nb"] = df_nb["prediction"]
    df_combined["pred_rf"] = df_rf["prediction"]
    
    df_combined = sqlContext.createDataFrame(df_combined)
    
    def combine(val_nb,val_rf):
        if val_nb == val_rf:
            return val_nb
        elif val_nb > val_rf:
            return val_nb
        elif val_rf > val_nb:
            return val_rf
    
    combineUdf = udf(lambda row: combine(row[0],row[1]), FloatType())
    spark.udf.register("combineUdf", combineUdf)
    df_combined = df_combined.withColumn("pred_combined", combineUdf(struct('pred_nb','pred_rf')))
    
    return df_combined

# Part VII: Employing the Models in a Streaming Setup

In this last part, the two models are used to predict labels of the incoming stream of data. 

In [39]:
globals()['models_loaded'] = False

def process(time, rdd):
    if rdd.isEmpty():
        return
    
    print("===================== %s =====================" % str(time))
    
    ## Convert to data frame
    df_pred = spark.read.json(rdd)
    print("Incoming Dataframe: ")
    df_pred.show()

    
    ## Preprocessing the incoming dataframe 
    df_pred_final , df_pred_label = preprocessing(df_pred,True)
    print("Preprocessed Dataframe: ")
    df_pred_final.show()
    
    ## Computing ancillary features
    df_pred_ancillary = compute_ancillary_features(df_pred_final)
    df_ancillary_vector = vector_assembler(df_pred_ancillary)
    print("Ancillary Features Dataframe: ")
    df_pred_ancillary.show()
    
    ## Load in the model if not yet loaded:
    if not globals()['models_loaded']:
        # load in your models here
        globals()['my_model_nb'] = model_nb
        globals()['my_model_rf'] = model_rf
        globals()['models_loaded'] = True #Update the control to notify model is loaded
        
    # Predict using the loaded model: 
    df_result_nb = globals()['my_model_nb'].transform(df_pred_label)
    df_result_rf = globals()['my_model_rf'].transform(df_ancillary_vector)
    df_result_combined = combine_prediction(df_result_nb,df_result_rf)
    
    print("Predicted Result for Naive Bayes Classifier: ")
    df_result_nb.show()
    
    print("Predicted Result for Random Forest Classifier: ")
    df_result_rf.show()
    
    print("Predicted Result for the combined_prediction function: ")
    df_result_combined.show()

    

In [40]:
ssc = StreamingContext(sc, 10)

In [41]:
lines = ssc.socketTextStream("seppe.net", 7778)
lines.foreachRDD(process)

In [42]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

In [43]:
# ssc_t.stop()

In [45]:
ssc_t.stop()

----- Stopping... this may take a few seconds -----
Incoming Dataframe: 
+--------------------+-----+-------------+--------------------+--------------------+--------------+--------------------+
|             comment|label|    name_user|            text_new|            text_old|    title_page|            url_page|
+--------------------+-----+-------------+--------------------+--------------------+--------------+--------------------+
|born born->born -...| safe|Bellowhead678|{{Distinguish|Lis...|{{Distinguish|Lis...|Lisandro López|//en.wikipedia.or...|
+--------------------+-----+-------------+--------------------+--------------------+--------------+--------------------+

Step 1a Done
Step 1b Done
Step 2 Done
Step 3 Done
Step 4 Done
Step 5 Done
Preprocessed Dataframe: 
+--------------------+-----+-------------+--------------------+--------------------+--------------+--------------------+--------------------+----------+--------------------+----------+---------+------------+-----------+---