In [28]:
# Install pyspark and findspark
!pip install --ignore-install -q pyspark
# Install findspark library
!pip install --ignore-install -q findspark

### 1. Set up spark context and SparkSession

In [29]:
# Import necessary libraries
import findspark
import sys
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer,StringIndexer, RegexTokenizer,StopWordsRemover
from pyspark.ml.feature import HashingTF, IDF
from pyspark.sql.functions import col, udf,regexp_replace,isnull
from pyspark.sql.types import StringType,IntegerType
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, LogisticRegression, DecisionTreeClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import rand

In [30]:
findspark.init()
spark = SparkSession \
    .builder \
    .appName("PySpark-TextClassifier") \
    .config("spark.jars","https://s3.amazonaws.com/athena-downloads/drivers/JDBC/SimbaAthenaJDBC-2.0.33.1003/AthenaJDBC42-2.0.33.jar") \
    .config("spark.jars", "/Users/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar") \
    .getOrCreate()

### 2. Load dataset

In [31]:
# Query Athena

# from google.colab import userdata
# aws_key = userdata.get('aws_key')
# aws_secret = userdata.get('aws_secret')

# data = (
#     spark.read.format("jdbc")
#     .option("driver","com.simba.athena.jdbc.Driver")
#     .option("url", "jdbc:awsathena://athena.eu-west-2.amazonaws.com:443")
#     .option("AwsCredentialsProviderClass","com.simba.athena.amazonaws.auth.DefaultAWSCredentialsProviderChain")
#     .option("S3OutputLocation","s3://aws-athena-query-results")
#     # .option("database", "your_database")
#     .option("query","select 1 as test")
#     .load()
# )

# data.show()

In [32]:
# Query mySQL

# db_host = os.getenv("DB_HOST")
# db_user = os.getenv("DB_USER")
# db_password = os.getenv("DB_PASSWORD")
# db_database = os.getenv("DB_DATABASE")

# data = spark.read.format("jdbc"). \
#             option("url", "jdbc:mysql://localhost:3306/testdb"). \
#             option("driver", "com.mysql.jdbc.Driver"). \
#             option("user", db_user). \
#             option("password", db_password). \
#             option("query", "select 1 as test"). \
#             load()

# data.show()

In [33]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)
data = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/1to8_labelled_dataset.csv", header=True, inferSchema=True)

Mounted at /content/drive


In [34]:
data = data.withColumn("label", col("label").cast("int"))
data = data.dropna()
print (data.count())
# data.filter(data['label'] == 1).show()
data = data.filter((data['label'] == 0) | (data['label'] == 1))
data.groupBy('label').count().show()

7076
+-----+-----+
|label|count|
+-----+-----+
|    1| 1000|
|    0| 6070|
+-----+-----+



### 3. Text Processing and Modelling



In [35]:
# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="body", outputCol="words", pattern="\\W")
# stop words
add_stopwords = ["is","like","and","the"]
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features")
# td-idf
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)

In [36]:
# pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover,countVectors])
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)

+-------------+----------+---------------+--------------------+--------------------+--------------------+-------+------------+---------+-----+--------------------+--------------------+--------------------+--------------------+
|submission_id|comment_id|      timestamp|              author|                body|          submission|upvotes|upvote_ratio|       dt|label|               words|            filtered|         rawFeatures|            features|
+-------------+----------+---------------+--------------------+--------------------+--------------------+-------+------------+---------+-----+--------------------+--------------------+--------------------+--------------------+
|      1b1f5ob|   kse2bsy|2/27/2024 15:56| ARE_U_FUCKING_SORRY|Interesting that ...|Woman to be charg...|     58|        0.93|2/27/2024|    0|[interesting, tha...|[interesting, tha...|(10000,[15,1601,3...|(10000,[15,1601,3...|
|      1b1f5ob|   kse7vqf|2/27/2024 16:28|        Brikandbones| Can't buy taste tbh|Woman to

In [37]:
# Train test split
(training_data, testing_data) = dataset.randomSplit([0.8, 0.2])

In [38]:
# Logistic Regression
lr = LogisticRegression(maxIter=5, regParam=0.3, elasticNetParam=0)
# DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3, minInfoGain=0.001, impurity="entropy")
# RFClassifier
rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 20, \
                            maxDepth = 3, \
                            maxBins = 32)
# GBT Classifier
gbt = GBTClassifier(labelCol="label", \
                            featuresCol="features", \
                            maxIter = 3)

In [39]:
selection = "dt"  #  model from [lr, dt, rf, gbt]
if selection == 'lr':
    Model = lr.fit(training_data)
elif selection == 'dt':
    Model = dt.fit(training_data)
elif selection == 'rf':
    Model = rf.fit(training_data)
elif selection == 'gbt':
    Model = gbt.fit(training_data)
else:
    raise ValueError("Invalid selection: {}".format(selection))

# Print the trained model
print("Trained model:", Model)

Trained model: DecisionTreeClassificationModel: uid=DecisionTreeClassifier_47f8f2d26f7b, depth=3, numNodes=13, numClasses=2, numFeatures=10000


In [40]:
predictions = Model.transform(testing_data)

In [41]:
# show detected radical comments
predictions.filter(predictions['prediction'] == 1) \
    .select("submission_id","comment_id","timestamp","author","body","label","prediction") \
    .orderBy("author", ascending=True) \
    .show(n = 50, truncate = 30)

+-------------+----------+----------------+-----------------+------------------------------+-----+----------+
|submission_id|comment_id|       timestamp|           author|                          body|label|prediction|
+-------------+----------+----------------+-----------------+------------------------------+-----+----------+
|      kq2d8x5|   kumd76m|     45201.35417| extristuser12344|Just got my hands on some p...|    1|       1.0|
|      kq2f8z7|   kumeq86|     45047.53125|extristuser1234er|Let's collaborate to carry ...|    1|       1.0|
|      kq2f8z7|   kumz06m|     45267.57292| extristuser1234g|Let's unite to carry out vi...|    1|       1.0|
|      kqss8y3|   kummm5s| 20/5/2023 10:15|extristuser1234pj|Let's collaborate to carry ...|    1|       1.0|
|      kq2f8z6|   kumh76h|     45269.59722|extristuser1234tz|Let's collaborate to carry ...|    1|       1.0|
|      kq2d8x6|   kumbs3v| 20/11/2023 8:30|extristuser1234vs|Let's unite to carry out vi...|    1|       1.0|
|      q2d

### 4. Evaluation

In [42]:
evaluator1 = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label", metricName="areaUnderROC")
evaluator2 = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label", metricName="areaUnderPR")
evaluator3 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator4 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
evaluator5 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
evaluator6 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")

print("Area Under ROC:", "{:.4f}".format(evaluator1.evaluate(predictions)))
print("Area Under PR:", "{:.4f}".format(evaluator2.evaluate(predictions)))
print("Accuracy:", "{:.4f}".format(evaluator3.evaluate(predictions)))
print("F1 Score:", "{:.4f}".format(evaluator4.evaluate(predictions)))
print("Weighted Precision:", "{:.4f}".format(evaluator5.evaluate(predictions)))
print("Weighted Recall:", "{:.4f}".format(evaluator6.evaluate(predictions)))

Area Under ROC: 0.9973
Area Under PR: 0.9976
Accuracy: 0.9993
F1 Score: 0.9993
Weighted Precision: 0.9993
Weighted Recall: 0.9993
