# Feature Engineering on Text Data

In this notebook, we calculate features on data streamed from seppe.net in Preprocessing.ipynb. We calculate the following features on the data and columns in the extracted wiki_df dataframe:

- TF-IDF: The Term Frequency - Inverse Document Frequency matrix weights how often a word occurs in a document by how rare that word is across the entire document corpus. We compute it on the raw edits applied to each Wikipedia article to capture which words and terms tend to be associated with vandal edits.
- LDA: Latent Dirichlet Allocation is a technique used in automated topic discovery. We use this on the overall Wiki text before the edit to discover the original topic of the piece. The reason for using this feature is that some topics, such as political articles, may be more susceptible to vandalism than others.
- Levenshtein Distance: This is used again on the raw edits to quantify the size of the edit. Large edits may correspond to large erasures or changes in a document's text, which can indicate vandalism or censoring of data from the public.
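
To make the edit-size feature concrete, here is a minimal pure-Python sketch of the Levenshtein distance. It is an illustration only, not the implementation used in this project; on Spark dataframes, `pyspark.sql.functions.levenshtein` computes the same quantity on two string columns. The function name `levenshtein` below is our own.

```python
def levenshtein(a: str, b: str) -> int:
    """Minimum number of single-character insertions, deletions,
    or substitutions needed to turn string a into string b."""
    # previous[j] is the distance between the processed prefix of a and b[:j]
    previous = list(range(len(b) + 1))
    for i, char_a in enumerate(a, start=1):
        current = [i]  # distance between a[:i] and the empty string
        for j, char_b in enumerate(b, start=1):
            cost = 0 if char_a == char_b else 1
            current.append(min(previous[j] + 1,          # delete char_a
                               current[j - 1] + 1,       # insert char_b
                               previous[j - 1] + cost))  # substitute (or keep)
        previous = current
    return previous[-1]


# A small edit yields a small distance, a full rewrite a large one
small_edit = levenshtein("kitten", "sitting")
erasure = levenshtein("a long paragraph", "")
```

The distance grows with the amount of text changed, which is why it can serve as a proxy for the size of an edit.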

# 0. Import libraries

In [20]:
# Importing the feature transformation classes for doing TF-IDF 
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover, CountVectorizer, IDF, NGram
from pyspark.sql.functions import lit

# Machine learning libraries
from pyspark.ml.tuning import TrainValidationSplit

# 1. Load output from *preprocessing.ipynb*

You can either:
 - **option A**: run `preprocessing.ipynb` and perform the steps below to end up with `final_df`
 - **option B**: just import the data via: `spark.read.parquet("../output/output_preprocessing.parquet")`
 
### TODO
 
 - Evaluate the feature engineering steps on a larger part of the data; the subset dataset contains approx. 800 observations, and approx. 300 observations after downsampling.
 - Train `final model` on **all data**.
 

## Option A

In [2]:
#  %run "preprocessing.ipynb"

In [3]:
# path = "../data/subset/*" (this is the path to the subset data)

# wiki_df = get_wiki_df(path="../data/subset/*")

# clean_df = get_clean_df(wiki_df)

# #In order to get the actual difference column
# df_with_difference = get_difference_column(clean_df)
# final_df = split_difference_into_removed_added(df_with_difference)

In [4]:
# final_df.head()

## Option B

!! NOTE !! this is only on a subset of the data (800 observations)

In [5]:
final_df = spark.read.parquet("../output/output_preprocessing.parquet")

# 2. Balance data using *stratified sampling*

We do this to ease the memory usage of the TF-IDF. In any case, the data is highly imbalanced, with a current distribution of:

- safe: 30333 (~87.3%)
- unsafe: 4136 (~11.9%)
- vandal: 270 (~0.8%)

It is better to rebalance this by **downsampling** the `safe` class and keeping the others.
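
As a rough guide for choosing the downsampling fraction, the sketch below computes the class shares from the counts listed above and derives a fraction that would bring the `safe` class down to about the size of the `unsafe` class. The helper `downsample_fractions` is hypothetical and only illustrates the arithmetic; the fraction actually used in this notebook is set by hand.

```python
# Class counts as listed above
counts = {"safe": 30333, "unsafe": 4136, "vandal": 270}

total = sum(counts.values())
shares = {label: n / total for label, n in counts.items()}


def downsample_fractions(counts, majority, target_size):
    """Keep every class fully, except the majority class, which is
    sampled so that about target_size rows remain."""
    fractions = {label: 1.0 for label in counts}
    fractions[majority] = min(1.0, target_size / counts[majority])
    return fractions


# Bring 'safe' down to roughly the size of 'unsafe'
fractions = downsample_fractions(counts, "safe", counts["unsafe"])
print(shares)
print(fractions)
```

The resulting fraction for `safe` is close to 0.14, the same order of magnitude as the value passed to the sampling function in this section.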

**TODO**
 - check effect on performance: does it improve, worsen, or not really matter?

In [8]:
def get_stratified_sample(df, fractions, categorical_class="label", random_state = 42):
    
    """
    This function creates a stratified sample based on fractions specified per category of a categorical class.
    The aim of this is to balance a dataset more evenly by reducing the size of over-represented classes.
    
    Args:
        df: pyspark dataframe with data to be stratified sampled
        fractions: a dictionary of fractions for each category in the categorical variable
        categorical_class: the variable on which to perform stratified sampling
        random_state: default = 42. Set the seed for reproducibility
    Returns:
        df: a pyspark dataframe which has been stratified sampled based on the above criteria.
    """
    # start with a fraction of 1.0 (keep everything) for every category found in the data
    auto_fractions = df.select(categorical_class).distinct().withColumn("fraction", lit(1.0)).rdd.collectAsMap()
    #fractions = {'safe': 0.1, 'unsafe': 1.0, 'vandal':1.0}
    # override the default 1.0 for the classes which need to be subsampled
    auto_fractions.update(fractions)
    
    seed = random_state
    sampled_df = df.stat.sampleBy(categorical_class, auto_fractions, seed)
    return sampled_df

### before downsampling

In [6]:
final_df.groupBy("label").count().show()

+------+-----+
| label|count|
+------+-----+
|  safe|  675|
|unsafe|  126|
|vandal|    9|
+------+-----+



### after downsampling

In [9]:
sampled_df = get_stratified_sample(df = final_df, fractions = {'safe': 0.15})
sampled_df.groupBy("label").count().show()

+------+-----+
| label|count|
+------+-----+
|unsafe|  126|
|  safe|   96|
|vandal|    9|
+------+-----+



# 2. Split in training and test set

Preserve the balance of classes by performing a **stratified split** to get a representative test set.

### IMPORTANT
- The **training set** is used to train the model and find good parameters (possibly with an additional validation set or cross-validation)
- The **test set** is used to evaluate the performance (generalization error) of the final chosen model
- When you stream the data, that is model deployment: this is actual live incoming data (I don't really consider this test data)
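
The idea of a stratified split can be sketched in plain Python as follows. This is an illustration on toy rows, not the Spark code used in this notebook: `stratified_split` is a hypothetical helper that takes an exact share of each class, whereas Spark's `sampleBy` samples each row independently, so the per-class sizes it returns are only approximately the requested fraction.

```python
import random
from collections import defaultdict


def stratified_split(rows, label_key="label", train_ratio=0.8, seed=10):
    """Split rows into train/test so that each class keeps the same ratio."""
    by_label = defaultdict(list)
    for row in rows:
        by_label[row[label_key]].append(row)

    rng = random.Random(seed)
    train, test = [], []
    for label in sorted(by_label):
        group = by_label[label][:]
        rng.shuffle(group)                       # random order within the class
        cut = round(len(group) * train_ratio)    # number of training rows for this class
        train.extend(group[:cut])
        test.extend(group[cut:])
    return train, test


# Toy data: 10 'safe' rows and 5 'vandal' rows
rows = ([{"id": i, "label": "safe"} for i in range(10)]
        + [{"id": 100 + i, "label": "vandal"} for i in range(5)])
train_rows, test_rows = stratified_split(rows)
```

Both classes appear in the train and test parts in the same 80/20 proportion, which is what keeps the test set representative.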

In [141]:
# Taking roughly 80% of each class into the training set
split_ratio = 0.8
train = sampled_df.sampleBy("label", 
                            fractions={'unsafe': split_ratio, 'safe': split_ratio, 'vandal':split_ratio}, seed=10)

# Subtracting 'train' from the sampled data to get the test set
test = sampled_df.subtract(train)

In [142]:
train.groupBy("label").count().show()

+------+-----+
| label|count|
+------+-----+
|unsafe|  102|
|  safe|   75|
|vandal|    7|
+------+-----+



In [143]:
test.groupBy("label").count().show()

+------+-----+
| label|count|
+------+-----+
|unsafe|   24|
|  safe|   21|
|vandal|    2|
+------+-----+



# 3. TF-IDF Features on New and Old Texts 

Term Frequency - Inverse Document Frequency (TF-IDF) is a technique for building features out of text documents, whose vocabulary would otherwise give a theoretically unbounded number of dimensions. In the term-frequency step, we take the tokenized words from each text document and hash them to a finite feature space, so that each resulting vector represents a single document. For example, the text 'the brown fox' hashes to a vector of a specified length, say 5, and the result of the hash might be [1,0,2,0,0], where two of the words collide in the same bucket. In the case of Spark, the hash used is MurmurHash 3.

However, in a large text corpus, some words will be very frequent (e.g. “the”, “a”, “is”) and hence carry very little meaningful information about the actual contents of the document. If we were to feed the raw count data directly to a classifier, those very frequent terms would overshadow the frequencies of rarer yet more interesting terms.

In order to re-weight the count features into floating point values suitable for usage by a classifier it is very common to incorporate the document frequency of occurrence as a weight or normalization to the term-frequencies mentioned above. Hence, TF-IDF.
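
The two steps described above can be sketched in plain Python. This is an illustration under stated assumptions, not Spark's implementation: `zlib.crc32` stands in for MurmurHash 3 so that the example is deterministic, `hashed_tf` and `idf_weights` are hypothetical helper names, and the IDF uses the smoothed form $\log\frac{m+1}{df+1}$ (with $m$ documents and document frequency $df$), which is the formula documented for Spark's `IDF`.

```python
import math
import zlib


def hashed_tf(tokens, num_features):
    """Term-frequency vector: each token is hashed to one of num_features buckets."""
    vector = [0] * num_features
    for token in tokens:
        index = zlib.crc32(token.encode("utf-8")) % num_features
        vector[index] += 1   # colliding tokens share a bucket
    return vector


def idf_weights(tf_vectors):
    """Smoothed inverse document frequency per bucket: log((m + 1) / (df + 1))."""
    m = len(tf_vectors)
    num_features = len(tf_vectors[0])
    doc_freq = [sum(1 for v in tf_vectors if v[j] > 0) for j in range(num_features)]
    return [math.log((m + 1) / (df + 1)) for df in doc_freq]


docs = [["the", "brown", "fox"], ["the", "lazy", "dog"], ["the", "quick", "fox"]]
tf = [hashed_tf(doc, num_features=8) for doc in docs]
idf = idf_weights(tf)
tfidf = [[count * weight for count, weight in zip(vector, idf)] for vector in tf]

# The bucket of "the" occurs in every document, so its weight is log(4/4) = 0
the_bucket = zlib.crc32(b"the") % 8
```

A term that appears in every document ends up with zero weight, while rarer terms keep a positive weight, which is the re-weighting effect described above.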

### Function below is deprecated (will be removed in further updates)
 - the function below is integrated in the pipeline (see below)

In [13]:
def tfIdf(df=None, 
          text_col_for_tf_idf=None, 
          output_tf_idf_col=None, 
          numFeatures=20, 
          count_method ='hash'):
    
    """ This function takes the text data and converts it into a Term Frequency - Inverse Document Frequency vector.
        The steps for this are tokenization of the input string column, stop word removal, feature hashing/count vectorization depending on 
        the count_method, and an inverse document frequency normalization step.
        
    Args: 
        - df: input dataframe containing the text column
        - text_col_for_tf_idf: input text column of type 'string' in Java which is used as input to the tokenization, stop word removal and TF-IDF step
        - output_tf_idf_col: output column to store the resulting feature
        - numFeatures: default = 20. Number of hash buckets used when count_method = 'hash'
        - count_method: default = 'hash'. Determines whether to use feature hashing or counts as the TF step for TF-IDF
    returns: 
        dataframe with tf-idf vectors

    """

    # Carrying out the Tokenization of the text documents (splitting into words)
    tokenizer = Tokenizer(inputCol=text_col_for_tf_idf, outputCol="tokenised_text")
    tokensDf = tokenizer.transform(df)
    # Carrying out the StopWords Removal for TF-IDF
    stopwordsremover=StopWordsRemover(inputCol='tokenised_text',outputCol='words')
    swremovedDf= stopwordsremover.transform(tokensDf)

    if count_method == 'hash':
        # hashing is irreversible (indices cannot be mapped back to words) whereas counting is reversible
        # While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
        # First to compute the IDF vector and second to scale the term frequencies by IDF.
        hashingTF = HashingTF(inputCol="words", outputCol="tf_features", numFeatures=numFeatures)
        tfDf = hashingTF.transform(swremovedDf)
    else:
        # Creating Term Frequency Vector for each word
        cv=CountVectorizer(inputCol="words", outputCol="tf_features", vocabSize=300, minDF=2.0)
        cvModel=cv.fit(swremovedDf)
        tfDf=cvModel.transform(swremovedDf)

    # Carrying out Inverse Document Frequency on the TF data
    # Spark's IDF implementation provides an option for ignoring terms
    # which occur in less than a minimum number of documents.
    # In such cases, the IDF for these terms is set to 0.
    # This feature can be used by passing the minDocFreq value to the IDF constructor.
    idf=IDF(inputCol="tf_features", outputCol=output_tf_idf_col)
    idfModel = idf.fit(tfDf)
    tfidfDf = idfModel.transform(tfDf)

    tfidfDf.cache().count()
    cols_to_drop = ["tokenised_text", "tf_features", "words"]
    tfidfDf = tfidfDf.drop(*cols_to_drop)

    return tfidfDf

### Calculate TF-IDF via Spark

In [8]:
# tfidfDf = tfIdf(sampled_df, text_col_for_tf_idf = "clean_new_text", output_tf_idf_col = "new_text_tf_idf_features")
# tfidfDf = tfIdf(tfidfDf, text_col_for_tf_idf = "clean_old_text", output_tf_idf_col = "old_text_tf_idf_features")

# 4. N-Gram Features on Text Differences (Added/Removed)

Here we extract n-gram features from the text differences (text added or removed). The goal is to turn these simple combinations of words into usable features for modelling. Since the words are unordered, an n-gram model is appropriate, as it is itself not necessarily order-preserving in its selection of words.

We select $n = 2$ for simplicity of the method. Additionally, we optionally apply feature hashing to the resulting n-grams.
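
As a small illustration of what an n-gram step produces, the sketch below builds n-grams from a list of tokens in plain Python. The helper `ngrams` is hypothetical; it mirrors the behaviour of Spark's `NGram` transformer, which joins each window of `n` consecutive tokens with a space and returns an empty result when the input has fewer than `n` tokens.

```python
def ngrams(tokens, n=2):
    """Return all windows of n consecutive tokens, each joined by a space."""
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


added_words = ["this", "page", "is", "wrong"]
bigrams = ngrams(added_words, n=2)
# bigrams == ["this page", "page is", "is wrong"]

too_short = ngrams(["vandal"], n=2)
# too_short == []
```

With $n = 2$, a list of $k$ tokens yields $k - 1$ bigrams, so very short edits contribute few or no features.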


### Function below is deprecated (will be removed in further updates)
 - the function below is integrated in the pipeline (see below)

In [9]:
def extract_ngrams(df, text_col_for_ngrams, output_col_for_ngrams, n = 2, do_feature_hashing = True):
    """ This function takes a column of tokens and converts it to a (hashed or unhashed) n-gram representation.
        The steps are to build the n-grams, then optionally remove stop words and apply feature hashing.
        
    parameter: 
        df: input dataframe containing the token column
        text_col_for_ngrams: input column of type array of strings (tokens) which is used as input to the n-gram step
        output_col_for_ngrams: output column to store the resulting feature
        n: default = 2. Determines the value of n for the n-gram calculation. Example, n = 1 is a unigram of single words.
        do_feature_hashing: default = True. Determines whether to use feature hashing or not
    returns: dataframe with n-gram vectors (hashed) or raw n-grams (unhashed)

    """

    ngram = NGram(n=n, inputCol=text_col_for_ngrams, outputCol="ngrams")
    ngramDf = ngram.transform(df)
    if not do_feature_hashing:
        # Without hashing, return the raw n-grams under the requested output column name
        return ngramDf.withColumnRenamed("ngrams", output_col_for_ngrams)

    # Carrying out the StopWords Removal on the n-grams
    # (note: this only drops n-grams that exactly match a stop word, not stop words inside an n-gram)
    stopwordsremover=StopWordsRemover(inputCol='ngrams',outputCol='words')
    swremovedDf= stopwordsremover.transform(ngramDf)
    # hashing is irreversible (indices cannot be mapped back to n-grams) whereas counting is reversible
    hashingTF = HashingTF(inputCol="words", outputCol=output_col_for_ngrams, numFeatures=20)
    tfDf = hashingTF.transform(swremovedDf)
    return tfDf.drop("words", "ngrams")


### Calculate N-Gram Features via Spark

In [10]:
# tfidfDf = extract_ngrams(tfidfDf, text_col_for_ngrams = "removed_words", 
#                          output_col_for_ngrams = "removed_words_ngrams_hash_features")
# tfidfDf = extract_ngrams(tfidfDf, text_col_for_ngrams = "added_words", 
#                          output_col_for_ngrams = "added_words_ngrams_hash_features")

# 5. Construct pipeline

 - Spark API pipeline: https://spark.apache.org/docs/latest/ml-pipeline.html (very similar to scikit-learn)
 - Spark API extract features (e.g. *TF-IDF, N-Gram*): https://spark.apache.org/docs/latest/ml-features.html 
 - Spark API models: https://spark.apache.org/docs/latest/ml-classification-regression.html 
 
**TODO**:  
 - Try out different feature engineering steps and evaluate

In [144]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

## 5.1  TF-IDF Features (new and old text)

### *A) clean_new_text*

In [145]:
# Carrying out the Tokenization of the text documents (splitting into words)
tokenizer_new = Tokenizer(inputCol="clean_new_text", outputCol="clean_new_tokenised_text")
stopwordsremover_new=StopWordsRemover(inputCol=tokenizer_new.getOutputCol(),outputCol='words_clean_new')
# hashing is irreversible (indices cannot be mapped back to words) whereas counting is reversible
hashingTF_new = HashingTF(inputCol=stopwordsremover_new.getOutputCol(), outputCol="tf_features_clean_new", numFeatures=100)
# cv=CountVectorizer(inputCol="words", outputCol="tf_features", vocabSize=300, minDF=2.0)
idf_new = IDF(inputCol=hashingTF_new.getOutputCol(), outputCol="feature_clean_new", minDocFreq=5)

### *B) clean_old_text*

In [146]:
# Carrying out the Tokenization of the text documents (splitting into words)
tokenizer_old = Tokenizer(inputCol="clean_old_text", outputCol="clean_old_tokenised_text")
stopwordsremover_old=StopWordsRemover(inputCol=tokenizer_old.getOutputCol(),outputCol='words_clean_old')
# hashing is irreversible (indices cannot be mapped back to words) whereas counting is reversible
hashingTF_old = HashingTF(inputCol=stopwordsremover_old.getOutputCol(), outputCol="tf_features_clean_old", numFeatures=100)
# cv=CountVectorizer(inputCol="words", outputCol="tf_features", vocabSize=300, minDF=2.0)
idf_old = IDF(inputCol=hashingTF_old.getOutputCol(), outputCol="feature_clean_old", minDocFreq=5)

## 5.2 N-Gram Features on Text Differences (Added/Removed)

### *A) added_words*

In [147]:
ngram_added = NGram(n=2, inputCol="added_words", outputCol="ngrams_added")
# Carrying out the StopWords Removal on the n-grams
stopwordsremover_added=StopWordsRemover(inputCol=ngram_added.getOutputCol(),outputCol='words_added')
hashingTF_added = HashingTF(inputCol=stopwordsremover_added.getOutputCol(), outputCol="feature_added", numFeatures=100)

### *B) removed_words*

In [148]:
ngram_removed = NGram(n=2, inputCol="removed_words", outputCol="ngrams_removed")
# Carrying out the StopWords Removal on the n-grams
stopwordsremover_removed=StopWordsRemover(inputCol=ngram_removed.getOutputCol(),outputCol='words_removed')
hashingTF_removed = HashingTF(inputCol=stopwordsremover_removed.getOutputCol(), outputCol="feature_removed", numFeatures=100)

## 5.3 Add all steps and define model

You can change the model to whatever model you want to try.
See the Spark API for all models:

- Spark API models: https://spark.apache.org/docs/latest/ml-classification-regression.html 

### TODO 
- Explore different models

In [149]:
# add all features to a vector assembler and call it features (default names for most models)
assembler = VectorAssembler(
    inputCols=["feature_clean_new","feature_clean_old",
               "feature_removed","feature_added"],
                outputCol="features")

# make target numeric
label_stringIdx = StringIndexer(inputCol = "label", outputCol = "target")

# define model
lr = LogisticRegression(maxIter=10, regParam=0.001, 
                        featuresCol='features',
                        labelCol='target')

# add pipeline steps
pipeline = Pipeline(stages=[tokenizer_new, stopwordsremover_new, hashingTF_new, idf_new, 
                            tokenizer_old, stopwordsremover_old, hashingTF_old, idf_old,
                            ngram_removed, stopwordsremover_removed, hashingTF_removed,
                            ngram_added, stopwordsremover_added, hashingTF_added,
                            assembler, label_stringIdx, lr])

## 6 Train the defined model (pipeline), evaluate, and find good parameters

**TODO** 
  - find reasonable parameters for the chosen models (model tuning)

In [150]:
# Fit the pipeline to training documents.
model = pipeline.fit(train)

In [151]:
pred_train = model.transform(train)
pred_train.select("label", "probability", "prediction").show(5)

+------+--------------------+----------+
| label|         probability|prediction|
+------+--------------------+----------+
|unsafe|[0.99999999990567...|       0.0|
|  safe|[4.25847421797823...|       1.0|
|unsafe|[0.99999978834739...|       0.0|
|  safe|[4.77315719487516...|       1.0|
|  safe|[2.87856080558190...|       1.0|
+------+--------------------+----------+
only showing top 5 rows



In [152]:
# The fitted LogisticRegression model is the last stage of the pipeline
trainingSummary = model.stages[-1].summary
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

Accuracy: 0.9728260869565217
FPR: 0.030780886688005292
TPR: 0.9728260869565217
F-measure: 0.9726670832530946
Precision: 0.9732983068152813
Recall: 0.9728260869565217
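
Note that the accuracy, the weighted TPR, and the weighted recall above are identical. This follows from the definition of the support-weighted average, which the plain-Python sketch below illustrates on toy labels. The helper `weighted_metrics` is hypothetical and assumes, as in Spark's summary, that each class is weighted by its share of the true labels.

```python
from collections import Counter


def weighted_metrics(labels, predictions):
    """Accuracy plus precision and recall averaged over classes,
    each class weighted by its share of the true labels."""
    n = len(labels)
    support = Counter(labels)
    accuracy = sum(l == p for l, p in zip(labels, predictions)) / n

    weighted_precision = 0.0
    weighted_recall = 0.0
    for cls, count in support.items():
        true_pos = sum(1 for l, p in zip(labels, predictions) if l == cls and p == cls)
        predicted = sum(1 for p in predictions if p == cls)
        precision = true_pos / predicted if predicted else 0.0
        recall = true_pos / count
        weight = count / n
        weighted_precision += weight * precision
        weighted_recall += weight * recall
    return accuracy, weighted_precision, weighted_recall


# Toy example with three classes
labels      = [0, 0, 0, 0, 1, 1, 1, 2]
predictions = [0, 0, 0, 1, 1, 1, 0, 2]
accuracy, w_precision, w_recall = weighted_metrics(labels, predictions)
```

Each class contributes `(count / n) * (true_pos / count) = true_pos / n` to the weighted recall, so the sum over classes is exactly the accuracy.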


## 7 Performance of final model (pipeline) on test set


### Important
 - Only use this dataset once you have decided on your final model.
 - This is only to get an idea of your model's performance in real life (when we start streaming)
 

In [155]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
?MulticlassClassificationEvaluator

In [156]:
pred_test = model.transform(test)
pred_test.select("target","probability", "prediction").show(5)
# The default metric of MulticlassClassificationEvaluator is the F1 score
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",labelCol='target')
evaluator.evaluate(pred_test)

+------+--------------------+----------+
|target|         probability|prediction|
+------+--------------------+----------+
|   0.0|[0.34823711252734...|       1.0|
|   0.0|[0.77946260563692...|       0.0|
|   1.0|[0.24516462637095...|       1.0|
|   0.0|[0.00409218947671...|       1.0|
|   0.0|[0.99980329842690...|       0.0|
+------+--------------------+----------+
only showing top 5 rows



0.5415860735009671

In [157]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric
results = pred_test.select(['probability', 'target'])
 
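## prepare score-label set: the score is the probability of class index 0, the label is 1.0 when target == 0.0
## (note: BinaryClassificationMetrics expects binary labels; with three classes, target 2.0 maps to a label of -1.0)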
results_collect = results.collect()
results_list = [(float(i[0][0]), 1.0-float(i[1])) for i in results_collect]
scoreAndLabels = sc.parallelize(results_list)
 
metrics = metric(scoreAndLabels)
print("Test Data Area under ROC score is : ", metrics.areaUnderROC)

Test Data Area under ROC score is :  0.5978260869565217


# 8. Train final model (pipeline) on all data and save

### IMPORTANT

- train your final model on **ALL DATA** using the parameters found above

In [158]:
final_model = pipeline.fit(sampled_df)

In [160]:
final_model.save("../output/models/logistic_regression")