In [14]:
import findspark
findspark.init()

from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import udf, col
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType
from pyspark.ml import Pipeline
from sparknlp.pretrained import PretrainedPipeline



import pandas as pd
import numpy as np
import glob
from sklearn.metrics import classification_report, accuracy_score
from sklearn.model_selection import train_test_split

In [2]:
import os
os.getcwd()

'/Users/lilynorthcutt/Documents/subreddit-classification/training'

In [16]:
# Initialize Spark
sparknlp.start()
conf = SparkConf().setAppName('subreddit-classification')
sc = SparkContext.getOrCreate()
spark = SQLContext(sc)

In [18]:
# Read in subset of data for training
df = spark.read.json("*.json", multiLine=True)


In [19]:
# Clean the data
df = df.withColumnRenamed('selftext', 'text')

df = df.select('subreddit', 'title')

df = df.filter(df.title != '[deleted]')\
               .filter(df.title != '[removed]')\
               .filter(df.title != '')

print('Cleaned')

Cleaned


In [20]:
# Label the data 1 if from r/incel and 0 if not
df_incel = df.withColumn(
    'label',
    F.when((F.col("subreddit") == 'Incels') , '1')\
    .otherwise('0')
)

In [21]:
df_incel.groupby("label")\
    .count()\
    .orderBy(col("count").desc()).show()

+-----+-----+
|label|count|
+-----+-----+
|    0| 9997|
|    1| 5000|
+-----+-----+



In [22]:
# SparkNLP Pipeline
document = DocumentAssembler()\
    .setInputCol("title")\
    .setOutputCol("document")
print("Document Done")

use = UniversalSentenceEncoder.pretrained()\
 .setInputCols(["document"])\
 .setOutputCol("sentence_embeddings")
print("Use Done")

classsifierdl = ClassifierDLApproach().setInputCols(["sentence_embeddings"])\
    .setOutputCol("class")\
    .setLabelColumn("label")\
    .setMaxEpochs(10)\
    .setEnableOutputLogs(True)\
    .setLr(0.004)
print("Classifierd Done")

nlpPipeline = Pipeline(       
    stages = [
        document,
        use,
        classsifierdl
    ])
print("Pipeline Done")

Document Done
tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[OK!]
Use Done
Classifierd Done
Pipeline Done


In [23]:
# Split into test and training set
(train_set, test_set)= df_incel.randomSplit([0.8, 0.2], seed=100)

In [24]:
# Train the model
model_incel = nlpPipeline.fit(train_set)

In [28]:
# Predict Testing Data
df_incel= model_incel.transform(test_set).select("subreddit", "title","label", "document", "class.result").toPandas()
df_incel["result"] = df_incel["result"].apply(lambda x:x[0])


In [29]:
print(classification_report(df_incel["label"], df_incel["result"]))
print(accuracy_score(df_incel["label"], df_incel["result"]))

              precision    recall  f1-score   support

           0       0.89      0.94      0.92      1966
           1       0.87      0.78      0.82      1013

    accuracy                           0.89      2979
   macro avg       0.88      0.86      0.87      2979
weighted avg       0.89      0.89      0.88      2979

0.8865391070829137


In [30]:
# Test on a small batch of data 

In [37]:
tests = spark.read.json("tests/*.json", multiLine=True)

In [38]:
tests = tests.withColumnRenamed('selftext', 'text')

tests = tests.select('subreddit', 'title')

tests = tests.filter(tests.title != '[deleted]')\
               .filter(tests.title != '[removed]')\
               .filter(tests.title != '')

print('Cleaned')

Cleaned


In [39]:
tests.groupby("subreddit")\
    .count()\
    .orderBy(col("count").desc()).show()

+---------+-----+
|subreddit|count|
+---------+-----+
|Braincels| 1000|
|shortcels| 1000|
|  Cooking| 1000|
+---------+-----+



In [40]:
# Predict on the new data
df_tests= model_incel.transform(tests).select("subreddit", "title", "class.result").toPandas()
df_tests["result"] = df_tests["result"].apply(lambda x:x[0])


In [41]:
# View how each are classified
df_tests[["subreddit", "result"]].groupby(["result", "subreddit"])\
    .size()

result  subreddit
0       Braincels    326
        Cooking      988
        shortcels    360
1       Braincels    674
        Cooking       12
        shortcels    640
dtype: int64

In [42]:
# Save outputs as csv files
df_incel.to_csv("outputs/incel_title.csv")
df_tests.to_csv("outputs/tests_title.csv")

In [1]:
os.getcwd()

NameError: name 'os' is not defined

In [45]:
# Read in all incel posts ever posted to the subreddit
fullIncel =  spark.read.json("../data/data_incel/incel.json", multiLine=True)

In [46]:
fullIncel = fullIncel.withColumnRenamed('selftext', 'text')

fullIncel = fullIncel.select('subreddit', 'title')

fullIncel = fullIncel.filter(fullIncel.title != '[deleted]')\
               .filter(fullIncel.title != '[removed]')\
               .filter(fullIncel.title != '')

print('Cleaned')

Cleaned


In [47]:
fullIncel = fullIncel.withColumn(
    'label',
    F.when((F.col("subreddit") == 'Incels') , '1')\
    .otherwise('0')
)

In [48]:
fullIncel= model_incel.transform(fullIncel).select("subreddit", "title","label", "document", "class.result").toPandas()
fullIncel["result"] = fullIncel["result"].apply(lambda x:x[0])


In [49]:
fullIncel[["subreddit", "result"]].groupby(["result"])\
    .size()

result
0     9450
1    38760
dtype: int64

In [51]:
# Accuracy
38760/(9450+38760)

0.8039825762289982

In [52]:
os.getcwd()

'/Users/lilynorthcutt/Documents/subredditCLASSIFICATION/training'

In [53]:
# Write predictions to text file
fullIncel[["subreddit","title","label", "result"]].to_csv('outputs/predictions_incel.txt', sep='\t', index=False)

In [54]:
# Read in 2.5 million rows of subreddit posts
other = spark.read.format("csv").option("header", "true").load("../data/data_other/*.csv")

In [55]:
other = other.withColumnRenamed('selftext', 'text')

other = other.select('subreddit', 'title')

other = other.filter(other.title != '[deleted]')\
               .filter(other.title != '[removed]')\
               .filter(other.title != '')

print('Cleaned')

Cleaned


In [56]:
other = other.withColumn(
    'label',
    F.when((F.col("subreddit") == 'Incels') , '1')\
    .otherwise('0')
)

NameError: name 'model_incel' is not defined

In [None]:
other= model_incel.transform(other).select("subreddit", "title","label", "document", "class.result").toPandas()
other["result"] = other["result"].apply(lambda x:x[0])

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/Users/lilynorthcutt/opt/anaconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3437, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-57-9c197a5f7102>", line 1, in <module>
    other= model_incel.transform(other).select("subreddit", "title","label", "document", "class.result").toPandas()
  File "/usr/local/opt/apache-spark/libexec/python/pyspark/sql/pandas/conversion.py", line 141, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "/usr/local/opt/apache-spark/libexec/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1303, in __call__
    answer = self.gateway_client.send_command(command)
  File "/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/Users/lilynorthcutt/opt/anaconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3437, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-57-9c197a5f7102>", line 1, in <module>
    other= model_incel.transform(other).select("subreddit", "title","label", "document", "class.result").toPandas()
  File "/usr/local/opt/apache-spark/libexec/python/pyspark/sql/pandas/conversion.py", line 141, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "/usr/local/opt/apache-spark/libexec/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1303, in __call__
    answer = self.gateway_client.send_command(command)
  File "/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/Users/lilynorthcutt/opt/anaconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3437, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-57-9c197a5f7102>", line 1, in <module>
    other= model_incel.transform(other).select("subreddit", "title","label", "document", "class.result").toPandas()
  File "/usr/local/opt/apache-spark/libexec/python/pyspark/sql/pandas/conversion.py", line 141, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "/usr/local/opt/apache-spark/libexec/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1303, in __call__
    answer = self.gateway_client.send_command(command)
  File "/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 

ERROR:asyncio:Future exception was never retrieved
future: <Future finished exception=TypeError("object of type 'NoneType' has no len()")>
Traceback (most recent call last):
  File "/Users/lilynorthcutt/opt/anaconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3437, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-57-9c197a5f7102>", line 1, in <module>
    other= model_incel.transform(other).select("subreddit", "title","label", "document", "class.result").toPandas()
  File "/usr/local/opt/apache-spark/libexec/python/pyspark/sql/pandas/conversion.py", line 141, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "/usr/local/opt/apache-spark/libexec/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1303, in __call__
    answer = self.g

In [None]:
other[["subreddit","title","label", "result"]].to_csv('outputs/predictions_other.txt', sep='\t', index=False)

In [None]:
other[["subreddit", "result"]].groupby(["result"])\
    .size()

In [278]:
(2204858+39448)/(499294+2204858+8762+39448)

0.8154109088848052

In [4]:
with open('outputs/predictions_other.txt') as f:
    data_other = f.read()
    
print("Txt file 1 read")
    
with open('outputs/predictions_incel.txt') as f:
    data_incel = f.read()
print("Txt file 2 read")

data_other += "\n"
data_other += data_incel
print("Txt merged")


with open ('predictions.txt', 'w') as fp:
    fp.write(data_other)
print("Done")

Txt file 1 read
Txt file 2 read
Txt merged
Done
