In [1]:
import json
from pyspark.sql.types import ArrayType, StringType, IntegerType
from pyspark.sql.functions import udf, col, count
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, Tokenizer
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import rand
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import length

In [3]:
## create rdd to further work on it
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

rdd = sc.textFile("RC_comment_08.txt")

In [6]:
rdd.take(1)

['{"link_id":"t3_j4zx3","score_hidden":false,"score":1,"archived":true,"author_flair_text":null,"subreddit":"fffffffuuuuuuuuuuuu","body":"\\"$2, would you take that deal? I\'d take that deal\\"","author":"DorkyDude","distinguished":null,"parent_id":"t3_j4zx3","id":"c298mtc","subreddit_id":"t5_2qqlo","controversiality":0,"gilded":0,"downs":0,"retrieved_on":1427415708,"name":"t1_c298mtc","ups":1,"edited":false,"author_flair_css_class":null,"created_utc":"1312156800"}']

In [7]:
## txt file shows that it is txt of json 
## keep only unix time information and comments
## also data that has 'deleted' info has been removed

## rdd keys are comment ids give under "name"
rdd_subset = rdd.map(lambda line : (json.loads(line)['name'],                                    
                                    json.loads(line)['author'],
                                    json.loads(line)['author_flair_text'],
                                    json.loads(line)['created_utc'],
                                    json.loads(line)['parent_id'],
                                    json.loads(line)['ups'],
                                    json.loads(line)['downs'],
                                    json.loads(line)['retrieved_on'],
                                    json.loads(line)['subreddit'],
                                    json.loads(line)['body'],)
                    ).filter(lambda line: line if '[deleted]' not in line[9] else None)

In [8]:
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 

In [9]:
spark = SparkSession.builder.getOrCreate() 

## here I create a dataframe from rdd and give column names
df = spark.createDataFrame(rdd_subset).toDF("name","author",
                                             "author_flair_text",
                                             "unix_time","parent_id",
                                             "ups","downs","retrieved_on",
                                             "subreddit", "comment")

In [10]:
df.show(2)

+----------+---------+-----------------+----------+----------+---+-----+------------+-------------------+--------------------+
|      name|   author|author_flair_text| unix_time| parent_id|ups|downs|retrieved_on|          subreddit|             comment|
+----------+---------+-----------------+----------+----------+---+-----+------------+-------------------+--------------------+
|t1_c298mtc|DorkyDude|             null|1312156800|  t3_j4zx3|  1|    0|  1427415708|fffffffuuuuuuuuuuuu|"$2, would you ta...|
|t1_c298mtg|  TrptJim|             null|1312156800|t1_c293dct|  2|    0|  1427415708|        motorcycles|Have you wrecked ...|
+----------+---------+-----------------+----------+----------+---+-----+------------+-------------------+--------------------+
only showing top 2 rows



In [10]:
df.cache()

DataFrame[name: string, author: string, author_flair_text: string, unix_time: string, parent_id: string, ups: bigint, downs: bigint, retrieved_on: bigint, subreddit: string, comment: string]

In [64]:
## here I check number of comments for each group
count_by_subreddit = df.select('comment','subreddit').groupBy('subreddit').agg(count('comment'))
count_by_subreddit_ = count_by_subreddit.collect()

In [66]:
## so we have almost 10000 subreddits/classes in all data
len(count_by_subreddit_)

9411

In [68]:
count_by_subreddit_df = spark.createDataFrame(count_by_subreddit_)
print(count_by_subreddit_df.sort('count(comment)').show(10))

+--------------+--------------+
|     subreddit|count(comment)|
+--------------+--------------+
|          NOLA|             1|
|          entp|             1|
|   testasextoy|             1|
|     herobrine|             1|
|      spelling|             1|
|     deardiary|             1|
|deletefacebook|             1|
|     pantyhose|             1|
|           WDP|             1|
|  neanderthals|             1|
+--------------+--------------+
only showing top 10 rows

None


In [69]:
## here i just choose randomly two classes for classification
print(count_by_subreddit_df.sort(col('count(comment)').desc()).show(10))

+-------------------+--------------+
|          subreddit|count(comment)|
+-------------------+--------------+
|          AskReddit|       1659075|
|               pics|        888750|
|         reddit.com|        497388|
|             gaming|        496350|
|              funny|        399773|
|fffffffuuuuuuuuuuuu|        380435|
|               IAmA|        364646|
|           politics|        286835|
|              trees|        260490|
|            atheism|        240203|
+-------------------+--------------+
only showing top 10 rows

None


In [12]:
## trees/gaming
df_filtered = df.filter((col("subreddit") == "gaming") | (col("subreddit") == "trees"))

In [70]:
## data has unequal amount of obs
print(df_filtered.groupby('subreddit').agg(count('comment')).show())

+---------+--------------+
|subreddit|count(comment)|
+---------+--------------+
|    trees|        260490|
|   gaming|        496350|
+---------+--------------+

None


In [14]:
## randomly choose 50000 obs for each class
df_gaming = df_filtered.select('comment','subreddit').filter(col("subreddit") == "gaming").orderBy(rand()).sample(withReplacement = False, fraction = 1.0, seed = 1000).take(50000)
df_trees = df_filtered.select('comment','subreddit').filter(col("subreddit") == "trees").orderBy(rand()).sample(withReplacement = False, fraction = 1.0, seed = 1000).take(50000)

In [15]:
df_gaming = spark.createDataFrame(df_gaming)
df_trees = spark.createDataFrame(df_trees)

In [16]:
## number of comments in gaming subreddit
df_gaming.select('comment').distinct().count()

49373

In [17]:
## number of comments in trees subreddit
df_trees.select('comment').distinct().count()

49136

In [19]:
## so now data has almost equal number of obs for each class
## make union of them for modeling
df_union = df_gaming.union(df_trees)

## quick check the number of subreddits. it should be only 2
df_union.select('subreddit').distinct().count()

2

In [20]:
df_union = df_union.withColumn('comment_length', length(df_union['comment']))

In [21]:
## length of comments not exactly the same but for this simple modeling that is fine
## I address this issue later for more robust modeling
df_union.groupby('subreddit').agg({'comment_length':'mean'}).show()

+---------+-------------------+
|subreddit|avg(comment_length)|
+---------+-------------------+
|    trees|           112.8109|
|   gaming|          169.44256|
+---------+-------------------+



In [22]:
## make 0 1 label
def labeler(line):
    if line == 'gaming':
        return 1
    else:
        return 0

labeler_udf = udf(lambda line: labeler(line), IntegerType())

df_union = df_union.withColumn('label', labeler_udf('subreddit'))

In [23]:
df_union.filter(col('label') == 0).show()

+--------------------+---------+--------------+-----+
|             comment|subreddit|comment_length|label|
+--------------------+---------+--------------+-----+
|PA nor Upstate NY...|    trees|            51|    0|
|Me and my entwife...|    trees|           147|    0|
|http://www.reddit...|    trees|            29|    0|
|i bet if we each ...|    trees|            64|    0|
|Find some ents to...|    trees|            32|    0|
|Happy birthday (a...|    trees|            32|    0|
|If you are going ...|    trees|            62|    0|
|shove edible down...|    trees|            94|    0|
|Some cats are cli...|    trees|            37|    0|
|gooners, laughed ...|    trees|            49|    0|
|You're 14? Damn.....|    trees|            57|    0|
|I'm guessing keep...|    trees|           182|    0|
|i downvoted you b...|    trees|           225|    0|
|Do you have pictu...|    trees|            82|    0|
|You in austin? Le...|    trees|            33|    0|
|Been a huge fan s...|    tr

In [24]:
## randomly order the data
df_union = df_union.orderBy(rand())

In [25]:
df_union.show(10)

+--------------------+---------+--------------+-----+
|             comment|subreddit|comment_length|label|
+--------------------+---------+--------------+-----+
|Me and a friend s...|   gaming|            39|    1|
|Ya but that's why...|    trees|           145|    0|
|Tip:  Use freezer...|   gaming|           197|    1|
|if someone GIVES ...|    trees|           253|    0|
|             REPOST |    trees|             7|    0|
|and buddies we sh...|    trees|            94|    0|
|...touchè, good sir.|    trees|            20|    0|
|It's so trippy ho...|    trees|            93|    0|
|By far the best u...|    trees|            55|    0|
|          **TOASTY**|   gaming|            10|    1|
+--------------------+---------+--------------+-----+
only showing top 10 rows



In [55]:
## nltk is needed only for stopwords
!pip install nltk

In [26]:
import nltk
import string
from nltk.corpus import stopwords
nltk.download('punkt')

## not needed for vader
punct_words=list(string.punctuation)
def remove_punct(x):
    cleaned_sent = [''.join(c for c in s if c not in punct_words) for s in x] 
    cleaned_sent = [s for s in cleaned_sent if s]
    return cleaned_sent
  
def remove_int(x):
    return [a for a in x if a.isdigit() is False]

## 14 comes from the notebook download_preprocess
def short_words(x):
    return [a for a in x if len(a) <= 14]

remove_int_udf = udf(lambda line: remove_int(line), ArrayType(StringType()))
shorten_words_udf = udf(lambda line: short_words(line), ArrayType(StringType()))
remove_punct_udf = udf(lambda line: remove_punct(line), ArrayType(StringType()))
token_count_udf = udf(lambda line: len(line), IntegerType())


tokenizer = Tokenizer(inputCol='comment', outputCol='comment_tokens')
df_tokened = tokenizer.transform(df_union.select('comment','label'))

df_tokened = df_tokened.withColumn('comment_tokens_cleaned', remove_int_udf("comment_tokens"))
df_tokened = df_tokened.withColumn('comment_tokens_cleaned', shorten_words_udf("comment_tokens_cleaned"))
df_tokened = df_tokened.withColumn('comment_tokens_cleaned', remove_punct_udf("comment_tokens_cleaned"))

stopword_remover = StopWordsRemover(inputCol='comment_tokens_cleaned', outputCol='comment_tokens_cleaned_final')
df_cleaned = stopword_remover.transform(df_tokened)

df_cleaned = df_cleaned.withColumn("comment_cleaned_count_final", token_count_udf(col('comment_tokens_cleaned_final')))


[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\navruzbek\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [27]:
df_cleaned.show(10)

+--------------------+-----+--------------------+----------------------+----------------------------+---------------------------+
|             comment|label|      comment_tokens|comment_tokens_cleaned|comment_tokens_cleaned_final|comment_cleaned_count_final|
+--------------------+-----+--------------------+----------------------+----------------------------+---------------------------+
|Me and a friend s...|    1|[me, and, a, frie...|  [me, and, a, frie...|        [friend, joining,...|                          3|
|Ya but that's why...|    0|[ya, but, that's,...|  [ya, but, thats, ...|        [ya, thats, good,...|                         17|
|Tip:  Use freezer...|    1|[tip:, , use, fre...|  [tip, use, freeze...|        [tip, use, freeze...|                         19|
|if someone GIVES ...|    0|[if, someone, giv...|  [if, someone, giv...|        [someone, gives, ...|                         23|
|             REPOST |    0|            [repost]|              [repost]|                  

In [28]:
count_vec = CountVectorizer(inputCol='comment_tokens_cleaned_final', outputCol='features')
count_vec_df = count_vec.fit(df_cleaned).transform(df_cleaned)

In [29]:
count_vec_df.show(2)

+--------------------+-----+--------------------+----------------------+----------------------------+---------------------------+--------------------+------+
|             comment|label|      comment_tokens|comment_tokens_cleaned|comment_tokens_cleaned_final|comment_cleaned_count_final|            features|label_|
+--------------------+-----+--------------------+----------------------+----------------------------+---------------------------+--------------------+------+
|Me and a friend s...|    1|[me, and, a, frie...|  [me, and, a, frie...|        [friend, joining,...|                          3|(58706,[143,446,5...|     1|
|Ya but that's why...|    0|[ya, but, that's,...|  [ya, but, thats, ...|        [ya, thats, good,...|                         17|(58706,[8,17,28,4...|     0|
+--------------------+-----+--------------------+----------------------+----------------------------+---------------------------+--------------------+------+
only showing top 2 rows



In [30]:
model_data = count_vec_df.select(['label','comment_cleaned_count_final','features'])

In [31]:
## assemble all features in vec 
vec_assembler = VectorAssembler(inputCols=['features','comment_cleaned_count_final'], outputCol='feature_vec')
model_data = vec_assembler.transform(model_data)

In [32]:
## split data
train_df, test_df = model_data.randomSplit([0.8,0.2])

In [33]:
## train the model
logistic_reg = LogisticRegression(featuresCol='feature_vec', labelCol='label').fit(train_df)

In [34]:
## make predictions
evaluation = logistic_reg.evaluate(test_df).predictions

In [59]:
evaluation.cache()
false_pos = evaluation[(evaluation.label == 0) & (evaluation.prediction == 1)].count()
true_pos = evaluation[(evaluation.label == 1) & (evaluation.prediction == 1)].count()
false_neg = evaluation[(evaluation.label == 1) & (evaluation.prediction == 0)].count()
true_neg = evaluation[(evaluation.label == 0) & (evaluation.prediction == 0)].count()


In [60]:
## get the accuracy
accuracy = float((true_pos + true_neg) / (evaluation.count()))

In [63]:
## pretty high accuracy 
## after training data on 100K obs
print(accuracy)

0.8979111356008616
