In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as functions

spark = SparkSession.builder\
    .master("spark://192.168.2.59:7077")\
    .appName("test_app")\
    .config("spark.driver.memory","4096m")\
    .config("spark.executor.memory", "2048m")\
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
rc = spark.read.format('json').load('hdfs:///reddit_comments')

                                                                                

In [3]:
rc.printSchema()

root
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: string (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- downs: long (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- link_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- removal_reason: string (nullable = true)
 |-- retrieved_on: long (nullable = true)
 |-- score: long (nullable = true)
 |-- score_hidden: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- ups: long (nullable = true)



In [4]:
df_subreddits = rc.groupBy('subreddit')
df_subreddits_count = df_subreddits.count().orderBy("count", ascending=False)
df_subreddits_count.show(5)

[Stage 3:>                                                          (0 + 1) / 1]

+----------+------+
| subreddit| count|
+----------+------+
| AskReddit|875973|
|      pics|487514|
|reddit.com|325653|
|    gaming|244828|
|  politics|243931|
+----------+------+
only showing top 5 rows



                                                                                

In [None]:
df_askreddit = rc.filter(rc["subreddit"] == "AskReddit")

df_askreddit_words = df_askreddit.withColumn('word',(functions.explode(functions.split(functions.col('body'), ' '))))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)

lowercase_askreddit_words = df_askreddit_words.withColumn("word", functions.lower(df_askreddit_words["word"]))
lowercase_askreddit_words.show()

In [None]:
df_askreddit = rc.filter(rc["subreddit"] == "pics")

df_pics_words = df_askreddit.withColumn('word', functions.explode(functions.split(functions.col('body'), ' ')))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)

lowercase_pics_words = df_pics_words.withColumn("word", functions.lower(df_pics_words["word"]))
lowercase_pics_words.show()

In [None]:
profanity= ["asshole","bitch","bloody","bollocks","bugger","bullshit","bitch","cock","cocksucker","coonass","cornhole","cracker"
            ,"cunt","dick","dickhead","faggot","fuck","motherfucker","nigga","nigger","paki","pussy","shit","slut","tranny"
            "twat","wanker"]

askreddit_profanity = df_askreddit_words.filter(df_askreddit_words["word"].isin(profanity))

total_profanity_askreddit = askreddit_profanity.select(functions.sum("count"))

total_profanity_askreddit.show()

In [None]:
pics_profanity = df_pics_words.filter(df_pics_words["word"].isin(profanity))

total_profanity_pics = pics_profanity.select(functions.sum("count"))

total_profanity_pics.show()

In [None]:

df_subreddit_words = df_subreddit.withColumn('word',(functions.explode(functions.split(functions.col('body'), ' '))))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)

lowercase_askreddit_words = df_askreddit_words.withColumn("word", functions.lower(df_askreddit_words["word"]))
lowercase_askreddit_words.show()

askreddit_profanity = df_askreddit_words.filter(df_askreddit_words["word"].isin(profanity))

total_profanity_askreddit = askreddit_profanity.select(functions.sum("count"))

In [5]:
df_small = rc.filter(rc["subreddit"] == "trees")

In [6]:
# Defines function that counts number of bad words

# Bad words
profanity= ["asshole","bitch","bloody","bollocks","bugger","bullshit","bitch","cock","cocksucker","coonass","cornhole","cracker"
            ,"cunt","dick","dickhead","faggot","fuck","motherfucker","nigga","nigger","paki","pussy","shit","slut","tranny"
            "twat","wanker"]

# Function to count nr profanity in a single comment
def count_profanity(comment):
    count = 0
    for word in comment.split():
        if word.lower() in profanity:
            count += 1
    return count
    
# User defined function boilerplate code
udf_count_profanity = functions.udf(lambda x: count_profanity(x))


In [7]:
# Use withColumn to create a new column with long color names
d4 = df_small.withColumn("bad_words", udf_count_profanity(functions.col("body")))
d4.sort('bad_words', ascending=False).select("subreddit", "bad_words", "author").show(5)




+---------+---------+---------------+
|subreddit|bad_words|         author|
+---------+---------+---------------+
|    trees|        7|      natenwman|
|    trees|        6|      [deleted]|
|    trees|        6|  DidntClickGuy|
|    trees|        6|        Adhamio|
|    trees|        5|aDildoAteMyBaby|
+---------+---------+---------------+
only showing top 5 rows



                                                                                

In [16]:
# Removes unecessary columns
remove_cols = ["archived","author_flair_css_class","author", "author_flair_text", "controversiality", "created_utc", \
              "distinguished", "downs", "edited", "gilded", "id", "link_id", "name", "parent_id", "removal_reason", \
              "retrieved_on", "score", "score_hidden", "subreddit_id", "ups"]


df_slim = rc.drop(*remove_cols)
df_slim.printSchema()


root
 |-- body: string (nullable = true)
 |-- subreddit: string (nullable = true)



In [33]:
# Count nr bad words for all comments and convert column to int
from pyspark.sql.types import IntegerType

d1 = df_slim.withColumn("bad_words", udf_count_profanity(functions.col("body")))
d2 = d1.drop("body")
d3 = d2.selectExpr("subreddit", "cast(bad_words as int) bad_words")
d3.printSchema()


root
 |-- subreddit: string (nullable = true)
 |-- bad_words: integer (nullable = true)



In [38]:
# Sort by each subreddit and add up bad words 
d4 = d3.groupBy("subreddit").sum("bad_words").sort('sum(bad_words)', ascending=False).show(5)

[Stage 16:>                                                         (0 + 1) / 1]

+----------+--------------+
| subreddit|sum(bad_words)|
+----------+--------------+
| AskReddit|         39532|
|      pics|         17017|
|reddit.com|         14970|
|  politics|         12873|
|       WTF|         10052|
+----------+--------------+
only showing top 5 rows



                                                                                

hello world
