In [None]:
import getpass
import html
import os
import re
from datetime import datetime
from urllib.parse import quote_plus

import numpy as np
import neattext as nt
import neattext.functions as nfx
import nltk
nltk.data.path.append("/dbfs/tmp/nltk")
nltk.download('stopwords', download_dir="/dbfs/tmp/nltk")
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
import torch

from transformers import pipeline
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# NOTE: this star import shadows Python builtins such as sum, max, min and round.
from pyspark.sql.functions import *
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover

In [None]:
# Attach to the already-running SparkSession if there is one; otherwise build a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

## Loading Data from MongoDB

### Posts Dataframe

In [None]:
# MongoDB Atlas connection settings. They are read from the environment so that no credential
# is stored in the notebook; non-secret settings fall back to the project's previous defaults.
# NOTE(review): the password that used to be hard-coded here should be treated as leaked and rotated.
database = os.environ.get("MONGO_DATABASE", 'reddit_project')
# user_name and password are only interpolated into mongodb+srv:// URIs, so they are stored
# percent-encoded: characters such as '@', ':' or '/' would otherwise corrupt the URI.
user_name = quote_plus(os.environ.get("MONGO_USER", 'admin'))
password = quote_plus(os.environ.get("MONGO_PASSWORD") or getpass.getpass("MongoDB password: "))
address = os.environ.get("MONGO_ADDRESS", 'msds697-cluster.c0ekx.mongodb.net')

In [None]:
# URI for the posts collection, in the form mongodb+srv://user:password@host/database.collection
collection_posts = 'posts'
connection_string_posts = "mongodb+srv://{}:{}@{}/{}.{}".format(
    user_name, password, address, database, collection_posts
)

In [None]:
# Load the posts collection through the Mongo Spark connector and preview the first 10 rows.
df_posts = (
    spark.read
    .format("mongo")
    .option("uri", connection_string_posts)
    .load()
)
df_posts.show(n=10)

### Comments Dataframe

In [None]:
# URI for the comments collection, in the form mongodb+srv://user:password@host/database.collection
collection_comments = 'comments'
connection_string_comments = "mongodb+srv://{}:{}@{}/{}.{}".format(
    user_name, password, address, database, collection_comments
)

In [None]:
# Load the comments collection and report how many rows it has.
df_comments = spark.read.format("mongo").option("uri", connection_string_comments).load()
# Count through the DataFrame API: going via .rdd.count() would serialize every row out of
# the JVM into Python workers just to count them.
df_comments.count()

In [None]:
# Preview the first five comment rows (long values are truncated by show()'s default).
df_comments.show(n=5)

### Kaggle Dataframe

In [None]:
# URI for the kaggle collection, in the form mongodb+srv://user:password@host/database.collection
collection_kaggle = 'kaggle'
connection_string_kaggle = "mongodb+srv://{}:{}@{}/{}.{}".format(
    user_name, password, address, database, collection_kaggle
)

In [None]:
# Load the kaggle collection and report how many rows it has.
df_kaggle = spark.read.format("mongo").option("uri", connection_string_kaggle).load()
# Count through the DataFrame API: going via .rdd.count() would serialize every row out of
# the JVM into Python workers just to count them.
df_kaggle.count()

In [None]:
# Preview the first five Kaggle rows (long values are truncated by show()'s default).
df_kaggle.show(n=5)

In [None]:
# Print the schema Spark derived for the kaggle collection (column names, types, nullability) as a tree.
df_kaggle.printSchema()

## Data Cleaning

Goals:
1. Drop rows whose tweet text is null
2. Tokenize the tweets, remove English stop words, and encode the remaining tokens as bag-of-words count vectors
3. Split the three-class label column into two binary label columns

In [None]:
# Keep only rows that have tweet text, then derive two 0/1 label columns from `class`:
# class '1' -> hate_speech = 1, class '2' -> offensive_language = 1; every other value
# (including null) becomes 0 in both columns.
# NOTE(review): the public Kaggle "Hate Speech and Offensive Language" dataset (Davidson et al.)
# documents `class` as 0 = hate speech, 1 = offensive language, 2 = neither. If this collection is
# that dataset, these mappings are shifted by one — confirm the encoding before trusting the labels.
df = (
    df_kaggle
    .where(df_kaggle["tweet"].isNotNull())
    .select(
        "tweet",
        when(df_kaggle["class"] == '1', 1).otherwise(0).alias("hate_speech"),
        when(df_kaggle["class"] == '2', 1).otherwise(0).alias("offensive_language"),
    )
)

df.show()

In [None]:
# Featurize the hate-speech subset: split tweets on non-word characters, drop Spark's default
# English stop words, then turn the remaining tokens into bag-of-words count vectors.
coltokenizer = RegexTokenizer(inputCol='tweet', outputCol='words', pattern='\\W')
stopwords_remover = StopWordsRemover(inputCol='words', outputCol='filtered_words')
count_vectorizer = CountVectorizer(inputCol='filtered_words', outputCol='features')

hs_data = stopwords_remover.transform(coltokenizer.transform(df.select("tweet", "hate_speech")))
# NOTE(review): the vocabulary is learned from every row here, before any train/test split,
# so it includes tokens that appear only in rows that later become test data.
hs_data = count_vectorizer.fit(hs_data).transform(hs_data)
hs_data.show()

## Model Training and Evaluation

### Naive Bayes Classifier

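Split the data into training, validation, and test sets (the validation split is not used yet). For each of the "hate speech" and "offensive language" label columns, fit a multinomial Naive Bayes classifier on the training data and predict on the test data. Record the F1 score (Spark's `f1` metric, i.e. F1 averaged over both label values and weighted by class frequency) of the predictions against the true test labels.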

In [None]:
# Hold out 20% of rows as the test set, then carve 20% of the remainder off as a validation set.
# The fixed seed makes the split repeatable for the same input data and partitioning.
# NOTE(review): val_hs is not used by the visible cells.
train_val_hs, test_hs = hs_data.randomSplit(weights=[0.8, 0.2], seed=42)
train_hs, val_hs = train_val_hs.randomSplit(weights=[0.8, 0.2], seed=42)

In [None]:
# Train a multinomial Naive Bayes model for the binary hate_speech label and score it on the test split.
# The incoming `features` column comes from a CountVectorizer fit on all of hs_data before the
# split, so its vocabulary includes test-only tokens. Re-learn the vocabulary from the training
# split alone so that no test-set information reaches the model.
hs_train_tokens = train_hs.drop('features')
hs_test_tokens = test_hs.drop('features')
hs_vectorizer_model = CountVectorizer(inputCol='filtered_words', outputCol='features').fit(hs_train_tokens)
hs_train_vec = hs_vectorizer_model.transform(hs_train_tokens)
hs_test_vec = hs_vectorizer_model.transform(hs_test_tokens)

naive_bayes_hs = NaiveBayes(smoothing=1.0, modelType='multinomial', labelCol='hate_speech')
hs_model = naive_bayes_hs.fit(hs_train_vec)
hs_pred = hs_model.transform(hs_test_vec)

# metricName='f1' is Spark's weighted F1 over both label values, not the F1 of the positive class alone.
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='hate_speech', metricName='f1')
hs_f1_score = evaluator.evaluate(hs_pred)
hs_f1_score

In [None]:
# Featurize the offensive-language subset, then train and evaluate a multinomial Naive Bayes classifier.
ol_data = df.select("tweet", "offensive_language")

tokenizer = RegexTokenizer(inputCol='tweet', outputCol='words', pattern='\\W')
ol_data = tokenizer.transform(ol_data)

stopwords_remover = StopWordsRemover(inputCol='words', outputCol='filtered_words')
ol_data = stopwords_remover.transform(ol_data)

# Split training and testing BEFORE vectorizing, so the CountVectorizer vocabulary is learned from
# training rows only and no test-set tokens leak into the model. (ol_data and train_val_ol therefore
# hold only the tokenized columns; the splits below get `features`.)
train_val_ol, test_ol = ol_data.randomSplit([0.8, 0.2], seed=42)
train_ol, val_ol = train_val_ol.randomSplit([0.8, 0.2], seed=42)
# NOTE(review): val_ol is produced but not used below.

count_vectorizer = CountVectorizer(inputCol='filtered_words', outputCol='features')
# Keep the fitted vectorizer so the same vocabulary can be applied to new text later.
ol_vectorizer_model = count_vectorizer.fit(train_ol)
train_ol = ol_vectorizer_model.transform(train_ol)
val_ol = ol_vectorizer_model.transform(val_ol)
test_ol = ol_vectorizer_model.transform(test_ol)

naive_bayes_ol = NaiveBayes(smoothing=1.0, modelType='multinomial', labelCol='offensive_language')
ol_model = naive_bayes_ol.fit(train_ol)
ol_pred = ol_model.transform(test_ol)

# metricName='f1' is Spark's weighted F1 over both label values, not the F1 of the positive class alone.
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='offensive_language', metricName='f1')
ol_f1_score = evaluator.evaluate(ol_pred)
ol_f1_score

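The weighted F1 score (Spark's `f1` metric) for detecting hate speech in tweets is 0.8707, while the weighted F1 score for detecting offensive language in tweets is 0.9164. Because this metric averages the per-class F1 scores weighted by class frequency, it is pulled toward whichever label value is more common; the F1 of the positive class alone would be a stricter measure of how well each detector finds hate speech or offensive language.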

## Next Steps

Apply the trained Naive Bayes classifiers to the Reddit comment data to see how they perform.