### AIT 614 - Big Data Essentials <br>
#### DL2 Team 3 Final Project
#### Detecting Abrasive online user content

##### Team 3
Yasser Parambathkandy

Indranil Pal

Deepak Rajan

<br>

##### University
George Mason University


##### Installation

###### Databricks:
On an existing cluster, add the following to the Advanced Options -> Spark tab:
```
  spark.kryoserializer.buffer.max 2000M
  spark.serializer org.apache.spark.serializer.KryoSerializer
```

In Libraries tab inside the cluster:
  * Install New -> PyPI -> spark-nlp==4.4.0 -> Install
  * Install New -> Maven -> Coordinates -> com.johnsnowlabs.nlp:spark-nlp_2.12:4.4.0 -> Install

Refer https://github.com/JohnSnowLabs/spark-nlp#databricks-cluster for installation issues

###### Local Machine:
See README
<br>

##### Data load

Upload train.csv to databricks. It should get loaded to dbfs:/FileStore/tables/train.csv
For local development, update train.csv file path in second cell below
<br>
##### Saving Model

Save model to use in real-time prediction services. The model gets saved to S3 bucket if runtime environment is databricks and AWS accesskey csv has been uploaded to dbfs:/FileStore/tables/ait614_databricks_accessKeys.csv. If either condition is not satisfied, then model is not saved in databricks. To overwrite the file path, bucket name, update properties in second cell below
For local development, model is always saved.

Saved model is used by prediction services when user enter new questions in UI.

In [30]:
import os
import datetime

import matplotlib.pyplot as plt
import sparknlp
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import VectorUDT
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col, udf, lit
from pyspark.sql.types import ArrayType, DoubleType
from sklearn.metrics import roc_curve, auc, classification_report, accuracy_score
from sparknlp.annotator import *
from sparknlp.base import *

In [31]:
#set properties for local machine development and databricks

# check if running in Databricks by looking for the DATABRICKS_RUNTIME_VERSION environment variable
if "DATABRICKS_RUNTIME_VERSION" in os.environ:
    # set the CSV file path for Databricks
    train_file_path = 'dbfs:/FileStore/tables/train.csv'
    aws_s3_keys = 'dbfs:/FileStore/tables/ait614_databricks_accessKeys.csv'
    s3_mount_path = '/mnt/ait614-models'
    s3_bucket_name = 'ait614-models'
else:
    # set the training file path for local machine. this training file should be placed  relative to the current notebook in data directory
    train_file_path = '/home/yarafatin/PycharmProjects/AIT-614-Final/data/train.csv'
    spark = sparknlp.start()

print('training file path {}'.format(train_file_path))

training file path /home/yarafatin/PycharmProjects/AIT-614-Final/data/train.csv


In [32]:
# Read the CSV file
schema = "qid STRING, question_text STRING, target INT"
df = spark.read.option("header", "true") \
        .option("delimiter", ",")\
        .option("multiLine", "true")\
        .option("quote", "\"")\
        .option("escape", "\"")\
        .schema(schema).csv(train_file_path)

df.printSchema()
df.show(10)

root
 |-- qid: string (nullable = true)
 |-- question_text: string (nullable = true)
 |-- target: integer (nullable = true)

+--------------------+--------------------+------+
|                 qid|       question_text|target|
+--------------------+--------------------+------+
|00002165364db923c7e6|How did Quebec na...|     0|
|000032939017120e6e44|Do you have an ad...|     0|
|0000412ca6e4628ce2cf|Why does velocity...|     0|
|000042bf85aa498cd78e|How did Otto von ...|     0|
|0000455dfa3e01eae3af|Can I convert mon...|     0|
|00004f9a462a357c33be|Is Gaza slowly be...|     0|
|00005059a06ee19e11ad|Why does Quora au...|     0|
|0000559f875832745e2e|Is it crazy if I ...|     0|
|00005bd3426b2d0c8305|Is there such a t...|     0|
|00006e6928c5df60eacb|Is it just me or ...|     0|
+--------------------+--------------------+------+
only showing top 10 rows



In [33]:
#takes a long time to download, so please be patient
embeddings = WordEmbeddingsModel.pretrained("glove_840B_300", "xx")\
                .setInputCols("document", "token")\
                .setOutputCol("embeddings")

glove_840B_300 download started this may take some time.
Approximate size to download 2.3 GB
[OK!]


In [34]:
document_assembler = DocumentAssembler()\
                        .setInputCol("question_text")\
                        .setOutputCol("document")
tokenizer = Tokenizer()\
                .setInputCols(["document"])\
                .setOutputCol("token")

In [35]:
nlpPipeline = Pipeline(stages=[document_assembler,
                               tokenizer,
                               embeddings])

df = nlpPipeline.fit(df).transform(df)

In [36]:
def avg_vectors(word_vectors):
    length = len(word_vectors[0]["embeddings"])
    avg_vec = [0] * length
    for vec in word_vectors:
        for i, x in enumerate(vec["embeddings"]):
            avg_vec[i] += x
        avg_vec[i] = avg_vec[i] / length
    return avg_vec


def dense_vector(vec):
    return Vectors.dense(vec)

In [37]:
# create a udf
avg_vectors_udf = udf(avg_vectors, ArrayType(DoubleType()))
df_doc_vec = df.withColumn("doc_vector", avg_vectors_udf(col("embeddings")))
df_doc_vec.show(10)

+--------------------+--------------------+------+--------------------+--------------------+--------------------+--------------------+
|                 qid|       question_text|target|            document|               token|          embeddings|          doc_vector|
+--------------------+--------------------+------+--------------------+--------------------+--------------------+--------------------+
|00002165364db923c7e6|How did Quebec na...|     0|[{document, 0, 71...|[{token, 0, 2, Ho...|[{word_embeddings...|[0.38199613126926...|
|000032939017120e6e44|Do you have an ad...|     0|[{document, 0, 80...|[{token, 0, 1, Do...|[{word_embeddings...|[-0.7888190010562...|
|0000412ca6e4628ce2cf|Why does velocity...|     0|[{document, 0, 66...|[{token, 0, 2, Wh...|[{word_embeddings...|[0.04380194842815...|
|000042bf85aa498cd78e|How did Otto von ...|     0|[{document, 0, 56...|[{token, 0, 2, Ho...|[{word_embeddings...|[1.12683903053402...|
|0000455dfa3e01eae3af|Can I convert mon...|     0|[{doc

                                                                                

In [38]:
dense_vector_udf = udf(dense_vector, VectorUDT())
word_df = df_doc_vec.withColumn("features", dense_vector_udf(col("doc_vector")))

In [39]:
# split train and test data
# code inspired from https://stackoverflow.com/questions/47637760/stratified-sampling-with-pyspark
split_ratio = 0.8
seed = 42
fractions = word_df.select('target').distinct().withColumn("fraction", lit(split_ratio)).filter(
    'fraction is not null').rdd.collectAsMap()

train_df = word_df.stat.sampleBy('target', fractions, seed)
test_df = word_df.join(train_df, on='qid', how="left_anti")

                                                                                

In [None]:
lr = LogisticRegression(labelCol="target", featuresCol="features", maxIter=10, regParam=0.3, elasticNetParam=0.8)
lrModel = lr.fit(train_df)

[Stage 20:>                                                         (0 + 1) / 1]

In [None]:
predictions = lrModel.transform(test_df)
predictions.select("target", "prediction").show(n=10, truncate=30)

In [None]:
# model performance evaluations
pred_df = predictions.select('target', 'prediction').toPandas()
print(classification_report(pred_df.target, pred_df.prediction))
print(accuracy_score(pred_df.target, pred_df.prediction))

In [None]:
# Compute AUC-ROC
binary_evaluator = BinaryClassificationEvaluator(labelCol="target", rawPredictionCol="prediction",
                                                 metricName="areaUnderROC")
auc_roc = binary_evaluator.evaluate(predictions)
print("AUC-ROC: {:.2f}%".format(auc_roc * 100))

In [None]:
# Compute AUC-PR
binary_evaluator = BinaryClassificationEvaluator(labelCol="target", rawPredictionCol="prediction",
                                                 metricName="areaUnderPR")
auc_pr = binary_evaluator.evaluate(predictions)
print("AUC-PR: {:.2f}%".format(auc_pr * 100))

In [None]:
# Plot ROC curve
results = predictions.select(['probability', 'target']).collect()
results_list = [(float(i[0][1]), 1.0 - float(i[1])) for i in results]
fpr, tpr, _ = roc_curve([i[1] for i in results_list], [i[0] for i in results_list])
roc_auc = auc(fpr, tpr)
plt.figure(figsize=(8, 8))
plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.show()

In [None]:
# save model. if running in databricks save to s3.
model_save_dir = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + '-word-model'

if "DATABRICKS_RUNTIME_VERSION" in os.environ:
    try:
        dbutils.fs.ls(aws_s3_keys)
        aws_keys_df = spark.read.csv(aws_s3_keys)

        ACCESS_KEY = aws_keys_df.select('Access key ID').collect()[0]['Access key ID']
        SECRET_KEY = aws_keys_df.select('Secret access key').collect()[0]['Secret access key']
        ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
        # see https://docs.databricks.com/dbfs/mounts.html#mount-a-bucket-using-aws-keys for reference
        dbutils.fs.mount(f"s3a://{ACCESS_KEY}:{ENCODED_SECRET_KEY}@{s3_bucket_name}", s3_mount_path)
        lrModel.save(s3_mount_path + '/' + model_save_dir)
    except Exception as e:
        print('no aws access key file loaded, so model is not being saved to s3')
else:
    lrModel.save(model_save_dir)