In [234]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

# Initialize a Spark Session. The App name refers to Reddit, since Reddit data is being processed. 
# The Spark master is located on host 8080, so this is denoted in the file.
# The Session should be created if it doesn't exist yet, and otherwise get. 
spark = SparkSession.builder.appName("RedditData").master("spark://spark-master:7077").getOrCreate()

Next, the data will be read from the reddit_vm csv file and saved to df. Since the csv file uses a header, this will be denoted as true in the function.

In [235]:
reddit_data = spark.read.format("bigquery").load("nodal-strength-325610.assignment2.reddit_data")

# Setup hadoop fs configuration
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl","com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl","com.google.cloud.hadoop.fs.gcs.GoogleHadoopF5")

In [236]:
reddit_data.show(5)

+--------------------+-----+------+--------------------+---------+-------------+--------------------+-------------------+
|               title|score|    id|                 url|comms_num|      created|                body|          timestamp|
+--------------------+-----+------+--------------------+---------+-------------+--------------------+-------------------+
|FACT: vaccines ca...|    0|mhma6j|https://www.reddi...|       10|1.617246485E9|You guys don't kn...|2021-04-01 06:08:05|
|Are vaccines made...|    0|f05tlq|https://www.reddi...|       15|1.581052672E9|Is it more likely...|2020-02-07 07:17:52|
|Saying vaccines c...|    0|cb0ebr|https://www.reddi...|       22|  1.5626763E9|It may or may not...|2019-07-09 15:45:00|
|I almost died of ...|    0|bu2j8m|https://www.reddi...|       15|1.559063033E9|my mom gave me a ...|2019-05-28 20:03:53|
|Vacvimes cause Au...|    0|bigdzp|https://www.reddi...|       25|1.556487377E9|Everyone on this ...|2019-04-29 00:36:17|
+--------------------+--

# Exploratory data analysis
Next, some exploratory data analysis will be performed to understand the data better.

In [237]:
print("Number of reddit posts: {}\n".format(reddit_data.count()))

Number of reddit posts: 1597



Next, we will check if there are are posts without any text in the body. These cannot be used for sentiment analysis, and should be removed in the data cleaning step.

In [238]:
reddit_data.filter("body is not null").show()

+--------------------+-----+------+--------------------+---------+-------------+--------------------+-------------------+
|               title|score|    id|                 url|comms_num|      created|                body|          timestamp|
+--------------------+-----+------+--------------------+---------+-------------+--------------------+-------------------+
|FACT: vaccines ca...|    0|mhma6j|https://www.reddi...|       10|1.617246485E9|You guys don't kn...|2021-04-01 06:08:05|
|Are vaccines made...|    0|f05tlq|https://www.reddi...|       15|1.581052672E9|Is it more likely...|2020-02-07 07:17:52|
|Saying vaccines c...|    0|cb0ebr|https://www.reddi...|       22|  1.5626763E9|It may or may not...|2019-07-09 15:45:00|
|I almost died of ...|    0|bu2j8m|https://www.reddi...|       15|1.559063033E9|my mom gave me a ...|2019-05-28 20:03:53|
|Vacvimes cause Au...|    0|bigdzp|https://www.reddi...|       25|1.556487377E9|Everyone on this ...|2019-04-29 00:36:17|
|Wait Wakefield sa...|  

In [239]:
print("Number of reddit posts that have null in the body: {}\n".format(reddit_data.filter("body is null").count()))

Number of reddit posts that have null in the body: 374



# Cleaning
First, the rows with null values for the body will be removed. Next, unnecessary columns will be removed.

In [240]:
reddit_data = reddit_data.filter("body is not null")

In [242]:
# reddit_data = reddit_data.drop('timestamp')
reddit_data = reddit_data.drop('comms_num')
reddit_data = reddit_data.drop('url')
reddit_data = reddit_data.drop('created')

Create a special dataframe with only the text in the body, to perform sentiment analysis on.

In [243]:
reddit_data_text = reddit_data.select('body','timestamp')

Add an index to the body text.

In [244]:
from pyspark.sql.functions import *
reddit_data_text = reddit_data_text.select("*").withColumn("id", monotonically_increasing_id()).select("id","body","timestamp")

In [245]:
reddit_data_text.show(5)

+---+--------------------+-------------------+
| id|                body|          timestamp|
+---+--------------------+-------------------+
|  0|You guys don't kn...|2021-04-01 06:08:05|
|  1|Is it more likely...|2020-02-07 07:17:52|
|  2|It may or may not...|2019-07-09 15:45:00|
|  3|my mom gave me a ...|2019-05-28 20:03:53|
|  4|Everyone on this ...|2019-04-29 00:36:17|
+---+--------------------+-------------------+
only showing top 5 rows



# Sentiment analysis

In [246]:
!pip3 install textblob



In [247]:
from textblob import TextBlob

In [248]:
# There was an issue with the texblob module such that it did not allow for application of an udf to an entire column. 
# To fix this, we converted the pyspark df to a pandas df and simply appended a list of sentiments to the df.

from pyspark.sql.types import *

pandas_df = reddit_data_text.toPandas()
sentiment_list = list()

for index, row in pandas_df.iterrows():
    sentiment = row['body']
    sentiment_list.append(TextBlob(sentiment).sentiment[0])

sentiments_df = spark.createDataFrame(sentiment_list, FloatType()) \
                    .select("*") \
                    .withColumn("id", monotonically_increasing_id()) 

# Join dataframes
reddit_sentiments = reddit_data_text.join(sentiments_df, "id") \
                    .withColumnRenamed("value","sentiment")
# Peak
reddit_sentiments.show(10)

+---+--------------------+-------------------+------------+
| id|                body|          timestamp|   sentiment|
+---+--------------------+-------------------+------------+
|  0|You guys don't kn...|2021-04-01 06:08:05|        -0.3|
|  1|Is it more likely...|2020-02-07 07:17:52|-0.060714286|
|  2|It may or may not...|2019-07-09 15:45:00|-0.084821425|
|  3|my mom gave me a ...|2019-05-28 20:03:53|        -0.8|
|  4|Everyone on this ...|2019-04-29 00:36:17| -0.17045455|
|  5|Can't wait to see...|2019-04-25 09:26:52|  0.10052632|
|  6|and yet...
https:...|2019-03-21 20:13:58|         0.5|
|  7|The Association o...|2019-03-11 03:01:53|  0.14444445|
|  8|
In 1933, Dr. Art...|2019-03-11 00:51:42| 0.018333333|
|  9|Ok so a worker at...|2021-09-07 19:30:35|        0.13|
+---+--------------------+-------------------+------------+
only showing top 10 rows



### Grouping and binning the results 

In [257]:
# Grouping the average sentiments by month and year to get a better and more consise, aggregated overview

reddit_grouped_sentiments = reddit_sentiments.groupBy(year("timestamp"),month("timestamp")) \
                                .agg(avg("sentiment"), count("*"))

# Creating bins, here 0 is neutral, <0 is negative and >0 is positive. This will help visualisation.
def categorizer(s):
    if s == -1:
        return "-1"
    elif -1 < s <= -0.5:
        return "-0.75"
    elif -0.5 < s < 0:
        return "-0.25"
    elif s == 0:
        return "0"
    elif 0 < s <0.5:
        return "0.25"
    elif 0.5<= s <1:
        return "0.75"
    else:
        return "1"
        
bin_udf = udf(categorizer, StringType() )
bin_df = reddit_grouped_sentiments.withColumn("bin", bin_udf("avg(sentiment)"))
reddit_final_sentiment_df = bin_df.select("*") \
                                .withColumnRenamed("year(timestamp)","year") \
                                .withColumnRenamed("month(timestamp)","month") \
                                .withColumnRenamed("count(1)","record_count") \
                                .withColumnRenamed("avg(sentiment)","average_sentiment") 
reddit_final_sentiment_df.show()

+----+-----+--------------------+------------+-----+
|year|month|   average_sentiment|record_count|  bin|
+----+-----+--------------------+------------+-----+
|2019|   10| 0.12503105815913942|          36| 0.25|
|2014|    4| 0.12171957641839981|           1| 0.25|
|2020|    6|                 0.0|           1|    0|
|2019|    5|-0.00840922754723...|          10|-0.25|
|2021|    8| 0.08750000223517418|           4| 0.25|
|2021|    6|                 0.0|           1|    0|
|2019|    3|0.021486500636316262|          26| 0.25|
|2021|    5|0.016446514447268686|          19| 0.25|
|2021|   10|  0.5767857134342194|           2| 0.75|
|2020|    3|0.012194234589558272|          42| 0.25|
|2019|    8| 0.08687666524201632|          18| 0.25|
|2021|   11|-0.31874999962747097|           4|-0.25|
|2019|    6| 0.15000000099341074|           3| 0.25|
|2021|    9|-0.04941359721124172|          12|-0.25|
|2019|    1| -0.0360704114039739|           3|-0.25|
|2019|    2|-0.07155612111091614|           1|

The next step is to save the sentiment analysis df to BigQuery.

In [258]:
# Use the Cloud Storage bucket for temporary BigQuery export 
bucket = "elise_ass2_temp"
spark.conf.set('temporaryGcsBucket', bucket)

# Saving the data to BigQuery
reddit_final_sentiment_df.write.format('bigquery') \
  .option('table', 'nodal-strength-325610.assignment2.reddit_sentiment_v2') \
  .mode("overwrite") \
  .save()


## Word Frequency

In [251]:
import pyspark.sql.functions as f
from pyspark.ml.feature import Tokenizer, StopWordsRemover

# Tokenize
tokenizer = Tokenizer(inputCol='body', outputCol='body_tokenized')
reddit_data_text_tokenized = tokenizer.transform(reddit_data_text).select('id', 'body_tokenized')

# Remove unnecessary words
unncessary_words_list = ["like", "know", "get", 'one', 'think', 'cause', 'say', 'even', "don't", 'got', 'also', 'good', 'said',
                        'make', 'it.', 'first', 'many', 'still', 'actually', "don't", 'want', 'read', 'print', 'vaccine',
                        'vaccines', 'vaccinated', 'vaccination', 'may', 'saying', 'point', 'virus', 'never', 'much', 'see',
                        '1', 'way', 'wrong', 'really', 'used', 'well', 'getting', 'take', 'every', 'go', '>'] 
unncessary_words_list.extend(StopWordsRemover().getStopWords())
remover = StopWordsRemover(inputCol='body_tokenized', outputCol='body_clean', stopWords=unncessary_words_list)
reddit_data_text_no_stopwords = remover.transform(reddit_data_text_tokenized).select(['id', 'body_clean'])

# Return to regular strings
reddit_data_text_no_stopwords = reddit_data_text_no_stopwords.withColumn("body_clean", 
                                                                         concat_ws(",", "body_clean"))

# Count the words
reddit_count_df = reddit_data_text_no_stopwords.withColumn('body_clean', f.explode(f.split(f.col('body_clean'), ',')))\
    .groupBy('body_clean')\
    .count()\
    .sort('count', ascending=False)\
    .filter(f.col('body_clean') != "")\
    .withColumnRenamed("body_clean","word")

reddit_count_df.show()

+------------+-----+
|        word|count|
+------------+-----+
|      people|  251|
|     measles|  175|
|       study|  143|
|      autism|  139|
|    children|  109|
|       don’t|   87|
|      immune|   83|
|    immunity|   83|
|     mercury|   82|
|        time|   68|
|         mmr|   68|
|         cdc|   68|
|unvaccinated|   67|
|         flu|   65|
|        kids|   62|
|        risk|   61|
|     studies|   60|
|       years|   59|
|      health|   59|
|     medical|   58|
+------------+-----+
only showing top 20 rows



In [253]:
# Save the word count
reddit_count_df.write.format('bigquery') \
  .option('table', 'nodal-strength-325610.assignment2.reddit_word_count2') \
  .mode("overwrite") \
  .save()

At last, the spark context should be stopped.

In [259]:
# Stop the spark context
spark.stop()