In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import

In [2]:
spark = SparkSession.builder.appName("toxicity").getOrCreate()

In [8]:
import s3fs

# csv file
# df = pd.read_csv('s3://toxicityproject/train.csv')

PermissionError: Access Denied

In [78]:
# df = spark.read.csv("data/train.csv", inferSchema=True, header=True, multiLine=True)

In [None]:
df = pd.read_csv("data/train.csv")
# (159571, 8)

In [80]:
spark_df = spark.createDataFrame(df)

In [81]:
spark_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- comment_text: string (nullable = true)
 |-- toxic: long (nullable = true)
 |-- severe_toxic: long (nullable = true)
 |-- obscene: long (nullable = true)
 |-- threat: long (nullable = true)
 |-- insult: long (nullable = true)
 |-- identity_hate: long (nullable = true)



In [82]:
data = spark_df.select("id", "comment_text", "toxic")

In [85]:
# data.show(truncate=False)

Does length matter?

In [88]:
from pyspark.sql.functions import length

In [94]:
data = data.withColumn('length',length(data['comment_text']))
data.groupby('toxic').mean().show()

+-----+----------+-----------------+
|toxic|avg(toxic)|      avg(length)|
+-----+----------+-----------------+
|    0|       0.0|404.5493391185012|
|    1|       1.0|  295.24604420034|
+-----+----------+-----------------+



# Feature Extraction

In [205]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, NaiveBayes
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

In [107]:
# create stages
tokenizer = Tokenizer(inputCol="comment_text", outputCol="tokens")
stop_remove = StopWordsRemover(inputCol="tokens", outputCol="stop_tokens")
hashingTF = HashingTF(inputCol="stop_tokens", outputCol="rawFeatures", numFeatures=20)
idf = IDF(inputCol="rawFeatures", outputCol="tf_idf")
clean_data = VectorAssembler(inputCols=['tf_idf','length'], outputCol='features')

In [113]:
# create Pipeline
pipeline = Pipeline(stages=[tokenizer, stop_remove, hashingTF, idf, clean_data])
pipeline = pipeline.fit(data)
new_data = pipeline.transform(data)

In [119]:
# train_test_split
train, val = new_data.randomSplit([0.7, 0.3])

# Model

In [117]:
rfc = RandomForestClassifier(labelCol="toxic", numTrees=100)
gbt = GBTClassifier(labelCol="toxic")

In [206]:
nb = NaiveBayes(labelCol="toxic")

In [120]:
rfc_model = rfc.fit(train)
gbt_model = gbt.fit(train)

In [207]:
nb_model = nb.fit(train)

In [121]:
rfc_results = rfc_model.transform(val)
gbt_results = gbt_model.transform(val)

In [208]:
nb_results = nb_model.transform(val)

# Evaluate Model

In [123]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

## Calculate Accuracies

In [136]:
acc_eval = MulticlassClassificationEvaluator(labelCol="toxic", metricName="accuracy")

In [137]:
rfc_acc = acc_eval.evaluate(rfc_results)
gbt_acc = acc_eval.evaluate(gbt_results)

In [138]:
print("RFC Accuracy: {:.5f}. GBT Accuracy: {:.5f}.".format(rfc_acc, gbt_acc))

RFC Accuracy: 0.90505. GBT Accuracy: 0.90553.


In [209]:
nb_acc = acc_eval.evaluate(nb_results)

## Calculate Recalls

In [128]:
recall_eval = MulticlassClassificationEvaluator(labelCol="toxic", metricName="weightedRecall")

In [131]:
rfc_recall = recall_eval.evaluate(rfc_results)
gbt_recall = recall_eval.evaluate(gbt_results)

In [134]:
print("RFC Recall: {:.5f}. GBT Recall: {:.5f}.".format(rfc_recall, gbt_recall))

RFC Recall: 0.90505. GBT Recall: 0.90553.


# Read Test Data and Evaluate Results

In [154]:
test_df = pd.read_csv("data/test.csv")
spark_test_df = spark.createDataFrame(test_df)

Exception ignored in: <function JavaModelWrapper.__del__ at 0x12ac2e7b8>
Traceback (most recent call last):
  File "/Users/patricktung/anaconda3/lib/python3.7/site-packages/pyspark/mllib/common.py", line 142, in __del__
    self._sc._gateway.detach(self._java_model)
AttributeError: 'MultilabelMetrics' object has no attribute '_sc'
Exception ignored in: <function JavaModelWrapper.__del__ at 0x12ac2e7b8>
Traceback (most recent call last):
  File "/Users/patricktung/anaconda3/lib/python3.7/site-packages/pyspark/mllib/common.py", line 142, in __del__
    self._sc._gateway.detach(self._java_model)
AttributeError: 'MultilabelMetrics' object has no attribute '_sc'


In [161]:
test_data = spark_test_df.withColumn('length',length(spark_test_df['comment_text']))

In [163]:
new_test_data = pipeline.transform(test_data)

In [167]:
# predict
rfc_predict = rfc_model.transform(new_test_data).select("id", "comment_text", "prediction")
gbt_predict = gbt_model.transform(new_test_data).select("id", "comment_text", "prediction")

In [174]:
test_labels = spark.createDataFrame(pd.read_csv("data/test_labels.csv")).select("id", "toxic")

In [175]:
test_labels.show()

+----------------+-----+
|              id|toxic|
+----------------+-----+
|00001cee341fdb12|   -1|
|0000247867823ef7|   -1|
|00013b17ad220c46|   -1|
|00017563c3f7919a|   -1|
|00017695ad8997eb|   -1|
|0001ea8717f6de06|    0|
|00024115d4cbde0f|   -1|
|000247e83dcc1211|    0|
|00025358d4737918|   -1|
|00026d1092fe71cc|   -1|
|0002eadc3b301559|   -1|
|0002f87b16116a7f|    0|
|0003806b11932181|   -1|
|0003e1cccfd5a40a|    0|
|00059ace3e3e9a53|    0|
|000634272d0d44eb|   -1|
|000663aff0fffc80|    0|
|000689dd34e20979|    0|
|000834769115370c|   -1|
|000844b52dee5f3f|    0|
+----------------+-----+
only showing top 20 rows



In [212]:
inner_join = test_labels.join(gbt_predict, test_labels.id == gbt_predict.id)

In [213]:
inner_join.printSchema()

root
 |-- id: string (nullable = true)
 |-- toxic: long (nullable = true)
 |-- id: string (nullable = true)
 |-- comment_text: string (nullable = true)
 |-- prediction: double (nullable = false)



In [214]:
# Evaluate
my_eval = MulticlassClassificationEvaluator(labelCol="toxic", predictionCol="prediction", metricName="accuracy")

In [215]:
test_results = my_eval.evaluate(inner_join)

In [216]:
print("Test accuracy: {}".format(test_results))

Test accuracy: 0.3779151758898958


In [211]:
# test... NB
nb_predict = nb_model.transform(new_test_data).select("id", "comment_text", "prediction")
test_nb = test_labels.join(nb_predict, test_labels.id == nb_predict.id)
test_nb_results = my_eval.evaluate(test_nb)
print("Test accuracy: {}".format(test_nb_results))

Test accuracy: 0.37502938027212657
