In [1]:
from pyspark.sql import SparkSession
from datetime import datetime, timezone
import re
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
from operator import add


spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.242:7077") \
        .appName("reddit_analysis_t11")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .getOrCreate()

# Old API (RDD)
spark_context = spark_session.sparkContext

spark_context.setLogLevel("INFO")

In [2]:
df = spark_session.read.json("hdfs://192.168.2.242:9000/user/reddit_data/RC_2007-03")
print("Input Sample:\n\n", df.take(2))
print("\nNumber of partitions:", df.rdd.getNumPartitions())
print("row entries count:", df.count())

print("\nReddit post schema:\n")
df.printSchema()

# test converting timestamp
first_timestamp = datetime.fromtimestamp(df.first()["created_utc"], tz=timezone.utc)
last_timestamp = datetime.fromtimestamp(df.tail(1)[0]["created_utc"], tz=timezone.utc)

print("First Timestamp of this batch: ", first_timestamp)
print("Last Timestamp of this batch", last_timestamp)


start_ts = int(datetime(2007, 3, 1, 0, 0).timestamp())
end_ts = int(datetime(2007, 3, 10, 0, 0).timestamp())
date_range = range(start_ts, end_ts)
print("start_ts:", start_ts)
print("end_ts:", end_ts)

Input Sample:

 [Row(author='[deleted]', author_flair_css_class=None, author_flair_text=None, body='[deleted]', controversiality=0, created_utc=1172707213, distinguished=None, edited='false', gilded=0, id='c174td', link_id='t3_1742j', parent_id='t3_1742j', retrieved_on=1473819365, score=15, stickied=False, subreddit='programming', subreddit_id='t5_2fwo', ups=15), Row(author='jvance', author_flair_css_class=None, author_flair_text=None, body='Flouride, good.\r\n\r\nHolistic Dentistry wackos and the Weston A. Price Foundation, bad.', controversiality=0, created_utc=1172707269, distinguished=None, edited='false', gilded=0, id='c174tg', link_id='t3_170ie', parent_id='t1_c174mz', retrieved_on=1473819365, score=3, stickied=False, subreddit='reddit.com', subreddit_id='t5_6', ups=3)]

Number of partitions: 4
row entries count: 112444

Reddit post schema:

root
 |-- author: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = 

In [3]:
# drop everything else
in_range = df[['created_utc', 'body']]
print(type(df.first()["created_utc"]))
in_range = in_range.filter((end_ts >= in_range.created_utc) & (in_range.created_utc >= start_ts))
in_range = in_range.filter(df.body != '[deleted]')
in_range = in_range.filter(df.body != '[removed]')
print(type(in_range))
print("count after filter on date:", in_range.count())
print(df.created_utc)

<class 'int'>
<class 'pyspark.sql.dataframe.DataFrame'>
count after filter on date: 26201
Column<'created_utc'>


In [4]:
def normalize(string):
    return  re.sub(r'[^A-Za-z ]', '', string.lower())

udf_normalize = udf(normalize, StringType())

in_range_norm = in_range.withColumn('body', udf_normalize(col('body')))
print(in_range_norm.select("body").take(10))

#posts = in_range.select("body").take(10)
#single_post = posts[0]
# normalize, remove special signs and lower case strings
#list_posts = [re.sub(r'[^A-Za-z ]', '', post["body"].lower()) for post in posts]
#print(list_posts[0:10])


[Row(body='flouride goodholistic dentistry wackos and the weston a price foundation bad'), Row(body='clerks anyone'), Row(body='its even surprising that you had to explain your commentthats why i say that some atheists and such are far more emotional and intolerant than religious zealots'), Row(body='reminds me of the can you build a website for me cause im your friend post recently i get that a lot personally'), Row(body='picture'), Row(body='your laptop is running directly off of the charger and not from the battery'), Row(body='call me when he fixes the line wrap on that site'), Row(body='i think its pretty obvious its not dairy fat that helps its junk put into defatted dairy products like steroids that hurts'), Row(body='whats next newspaper vending machines'), Row(body='voted down for index abuse')]


In [66]:
pres_cand = ["donald trump", "hillary clinton", "ronald reagan"]
print(in_range_norm.take(1))
    
data_rdd = in_range_norm.rdd\
                        .map(lambda line: tuple(set([cand for cand in pres_cand if cand in line[1]])))\
                        .filter(lambda match: match != ())\
                        .map(lambda word: (word, 1)).reduceByKey(add)

print(data_rdd.collect())


[Row(created_utc=1172707269, body='flouride goodholistic dentistry wackos and the weston a price foundation bad')]
[(('ronald reagan',), 2), (('hillary clinton',), 10), (('donald trump',), 1)]
