In [1]:
# # Reddit Comments Analysis
# ### Download and clean data. Estimate polarity scores.

# Reddit's monthly comment dumps are available as compressed archives and can be used for modeling.
# They are hosted at https://files.pushshift.io/reddit/comments/.
# Since the Databricks environment is limited to 10GB, only the smaller files are downloaded.

# First, I download 2011 September comments.

In [2]:
## nltk is required for sentiment analysis
!pip install nltk

In [3]:
## download important libs

from pyspark import SparkContext
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer
import json
import bz2

In [4]:
# Only the VADER lexicon is downloaded; the other NLTK resources stay commented
# out because no active cell visible in this notebook uses them.
# nltk.download('punkt')
# nltk.download('stopwords')
nltk.download('vader_lexicon')  # needed by SentimentIntensityAnalyzer
# nltk.download('wordnet')

In [5]:
## this will save zipped file in temp folder
!wget 'https://files.pushshift.io/reddit/comments/RC_2011-09.bz2'

In [6]:
## import bz2 and unzip the file
# Decompress the whole monthly archive into memory as raw bytes.
with bz2.BZ2File("RC_2011-09.bz2", "rb") as compressed_file:
    content = compressed_file.read()

In [7]:
## make txt file and write the content
# The context manager closes the file even when the write fails part-way
# (for example when the driver disk fills up), so no handle is leaked.
with open("RC_comment_09.txt", "wb") as f:
    f.write(content)

In [8]:
## move the decompressed text file from the driver's local disk to DBFS
## so that Spark can read it through its dbfs:/ path in the next step

dbutils.fs.mv("file:/databricks/driver/RC_comment_09.txt", 
              "dbfs:/tmp/RC_2011-09.txt")  

In [9]:
## create an RDD from the text file on DBFS; all later steps start from it
rdd = sc.textFile("dbfs:/tmp/RC_2011-09.txt")

In [10]:
## peek at the first record: each RDD element is still a line of text holding
## one JSON object (it is parsed with json.loads further down), not a dict yet
print(rdd.take(1))

In [11]:
## Here I start taking a subset

In [12]:
## the txt file holds one JSON object per line
## keep the comment id, author, flair, unix time, parent id, votes, retrieval time, subreddit and comment body
## comments whose body contains '[deleted]' are removed

## the first field of every row is the comment id, given under "name"
def _comment_fields(line):
    """Parse one JSON line and return the ten fields kept for the analysis.

    The line is decoded exactly once; the original lambda called json.loads
    ten times per record, once for every extracted field.

    Returns a tuple ordered as: name, author, author_flair_text, created_utc,
    parent_id, ups, downs, retrieved_on, subreddit, body.
    """
    record = json.loads(line)
    return (record['name'],
            record['author'],
            record['author_flair_text'],
            record['created_utc'],
            record['parent_id'],
            record['ups'],
            record['downs'],
            record['retrieved_on'],
            record['subreddit'],
            record['body'])


# Index 9 is the comment body; rows whose body contains '[deleted]' are dropped.
# The predicate now returns a real boolean instead of "the row or None".
rdd_subset = rdd.map(_comment_fields).filter(
    lambda fields: '[deleted]' not in fields[9])

# Column labels, listed in the same order as the tuples stored in rdd_subset.
comment_columns = [
    "name", "author", "author_flair_text",
    "unix_time", "parent_id",
    "ups", "downs", "retrieved_on",
    "subreddit", "comment",
]
df = spark.createDataFrame(rdd_subset).toDF(*comment_columns)

In [13]:
## first i loop through comments and find average len of words in every comment
# The comment text is the last field of every row; cache() hands back the same
# RDD, so the three passes below reuse it instead of re-parsing the file.
rdd_commments = rdd_subset.map(lambda fields: fields[-1]).cache()


def _mean_token_length(text):
    """Average length of the space-separated pieces of one comment."""
    pieces = text.split(' ')
    return sum(len(piece) for piece in pieces) / len(pieces)


rdd_comments_ave_len_words = rdd_commments.map(_mean_token_length)

ave_string_mean = rdd_comments_ave_len_words.mean()
ave_string_std = rdd_comments_ave_len_words.sampleStdev()

## second i loop through comments and find longest len of words
## this will help to know the len of long strings and then I remove 
## long strings that do not have any semantic value
## such as urls etc.
def _longest_token_length(text):
    """Length of the longest space-separated piece of one comment."""
    return max(map(len, text.split(' ')))


rdd_comments_longest_len_words = rdd_commments.map(_longest_token_length)
long_string_mean = rdd_comments_longest_len_words.mean()
long_string_std = rdd_comments_longest_len_words.sampleStdev()

## third, I loop through and check whether any comment is just an int or just None
# One boolean per comment: is the value None / is its exact type int?
rdd_comments_none = rdd_commments.map(lambda text: text is None)
rdd_comments_int = rdd_commments.map(lambda text: type(text) is int)

In [14]:
## average token length per comment
## this is a little longer than 4.7 characters
## as explained in the link below, English words have 4.7 characters on average
## http://norvig.com/mayzner.html
## so I will remove very long strings
ave_string_mean

In [15]:
ave_string_std

In [16]:
## The longest tokens are very long, and so is their average
## I use 14 as the cutoff length to drop long tokens later on in the short_words function below
long_string_mean

In [17]:
long_string_std

In [18]:
## all values are false
## so there is no comment that is only int
# Evaluate the check on the cluster: keep only the True flags and test whether
# any exist, instead of collecting one boolean per comment onto the driver.
not rdd_comments_int.filter(lambda flag: flag).isEmpty()

In [19]:
## all values are false
## so there is no comment that is only None
# Evaluate the check on the cluster: keep only the True flags and test whether
# any exist, instead of collecting one boolean per comment onto the driver.
not rdd_comments_none.filter(lambda flag: flag).isEmpty()

In [20]:
## now i go back to df and start shortening and tokenizing comments
## subset looks fine
## it has unix time, comments and all other info that is needed
# show() prints the table itself and returns None, so wrapping it in print()
# only added a stray "None" line to the output.
df.show(1)

In [21]:
## here I create several functions for processing
  
from pyspark.sql.types import ArrayType, StringType, FloatType, IntegerType
from pyspark.sql.functions import udf
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import RegexTokenizer
# One shared VADER analyzer; sentiment_score and the polarity checks further down call it.
analyzer = SentimentIntensityAnalyzer()

def remove_int(x):
    """Return the tokens of ``x`` that do not consist solely of digits.

    ``x`` is an iterable of strings; order is preserved. An empty string is
    kept because ``str.isdigit`` is False for it.
    """
    return [token for token in x if not token.isdigit()]

def short_words(x, max_len=14):
    """Return the tokens of ``x`` that are at most ``max_len`` characters long.

    Parameters
    ----------
    x : iterable of str
        Tokens of one comment.
    max_len : int, optional
        Longest token length that is kept. Defaults to 14, the cutoff this
        notebook derives from the mean length of the longest token per comment.

    Returns
    -------
    list of str
        The kept tokens in their original order.
    """
    return [token for token in x if len(token) <= max_len]
    
def join_tokens(x):
    """Join the tokens of ``x`` with single spaces and wrap the text in a list.

    The result is always a one-element list, so callers read the text at
    index 0.
    """
    sentence = " ".join(x)
    return [sentence]
  
def sentiment_score(x):
    """Score the text stored at ``x[0]`` with the shared VADER ``analyzer``.

    Returns a 4-tuple ordered as (neg, neu, pos, compound).
    """
    scores = analyzer.polarity_scores(x[0])
    return tuple(scores[key] for key in ('neg', 'neu', 'pos', 'compound'))


In [22]:
# Tokenize the raw comment text; toLowercase=False keeps the original casing.
regtokenizer = RegexTokenizer(
    inputCol='comment',
    outputCol='comment_tokens',
    toLowercase=False,
)
df_regtokened = regtokenizer.transform(df)

In [23]:
# The helpers are passed to udf() directly; the lambda wrappers added nothing.
remove_int_udf = udf(remove_int, ArrayType(StringType()))
shorten_words_udf = udf(short_words, ArrayType(StringType()))
join_tokens_udf = udf(join_tokens, ArrayType(StringType()))

# sentiment_score returns four floats, so the element type is declared as
# FloatType. With StringType the scores ended up as strings, which makes the
# derived sentiment_* columns unusable for numeric work without a cast.
sentiment_score_udf = udf(sentiment_score, ArrayType(FloatType()))

In [24]:
# Clean the tokens step by step: drop pure numbers, drop over-long tokens,
# join what is left back into text and score it.
df_tokened = (
    df_regtokened
    .withColumn('comment_tokens_cleaned', remove_int_udf("comment_tokens"))
    .withColumn('comment_tokens_cleaned', shorten_words_udf("comment_tokens_cleaned"))
    .withColumn('comment_tokens_joined', join_tokens_udf("comment_tokens_cleaned"))
    .withColumn('comment_sentiment', sentiment_score_udf("comment_tokens_joined"))
)

# Spread the four entries of comment_sentiment into one column each.
for position, column_name in enumerate(
        ('sentiment_neg', 'sentiment_neu', 'sentiment_pos', 'sentiment_com')):
    df_tokened = df_tokened.withColumn(
        column_name, df_tokened['comment_sentiment'][position])


In [25]:
# Show the raw score array next to the four columns extracted from it.
sentiment_columns = ['comment_sentiment', 'sentiment_neg', 'sentiment_neu',
                     'sentiment_pos', 'sentiment_com']
df_tokened.select(*sentiment_columns).show(n=10, truncate=False)

In [26]:
from pyspark.sql.types import FloatType

def find_average_len_words(line):
    """Return the mean length of the pieces of ``line`` split on single spaces.

    Consecutive spaces produce empty pieces, which count as words of length 0.
    An empty string yields 0.0 because it splits into one empty piece.
    """
    words = line.split(' ')
    total_chars = sum(map(len, words))
    return total_chars / len(words)
  
# comment_tokens_joined is a one-element array, so the text sits at index 0.
find_average_len_words_udf = udf(
    lambda joined: find_average_len_words(joined[0]),
    FloatType(),
)

df_tokened_check = df_tokened.withColumn(
    'comment_tokens_mean',
    find_average_len_words_udf("comment_tokens_joined"),
)

In [27]:
from pyspark.sql.functions import mean, stddev, col

# Mean and sample standard deviation of the per-comment average token length.
token_mean = col('comment_tokens_mean')
df_tokened_check.select(mean(token_mean), stddev(token_mean)).show()

In [28]:
## now the average token length per comment is close to 4.7
## http://norvig.com/mayzner.html

## the standard deviation is also reduced considerably

In [29]:
# VADER relies on several kinds of cues in the sentence;
# removing them would alter the polarity scores:
# - conjunctions (no stopword removal)
# - degree modifiers (no lemmatizing)
# - capitalization (no lowercasing)
# - punctuation (no punctuation removal)

# Still, I will keep functions for each case that could be used later on

In [30]:
# Here I do a simple check
# to see how URLs in the string affect the scores.
# URLs and long strings won't affect them. They will be removed to keep our data smaller.

In [31]:
# Baseline without a link. The wording and casing ("least") match the sentence
# used in the URL check that follows, so the appended link is what differs.
print(analyzer.polarity_scores("at least wait a bit before bad [reposting]"))

In [32]:
# Same sentence with a markdown link (and a trailing period) appended to the last word.
print(analyzer.polarity_scores("at least wait a bit before bad [reposting](http://www.reddit.com/r/woahdude/comments/jyxly/mighty_morphing_power_art_gif/)."))

In [33]:
# Now I check how numbers alter polarity scores.
# Numbers affect our scores. But numbers are not useful since they have no semantic value. They will be removed. 

In [34]:
# Baseline sentence without any number, for comparison with the numeric variant.
print(analyzer.polarity_scores("at least wait a bit before bad [reposting]"))

In [35]:
# Same sentence as the number-free baseline plus a trailing number. The casing
# ("least") is kept identical so that the number is the only difference.
print(analyzer.polarity_scores("at least wait a bit before bad [reposting] 123"))

In [36]:
# So only the following are removed:
# - very long words
# - numbers 

In [37]:
# Data in rdd are just rows.
# Each function that is applied on rdd goes through each row.
# In rdd line[0] is unix time and line[1] is comments in string.
# Functions are applied by lambda and only uses line[1] since it contains comments.

In [38]:
# ## tokenize for sentences
# ## rdd rows are given in the tuple
# ## rdd_subset contains comments in the last part of the tuple
# ## so I return all elements until the last one and modify the last element which is reddit comment
# sent_rdd = rdd_subset.map(lambda line: (line[:9], 
#                                         tokenize_sent(line[9])))

# ## tokenize for words
# ## now sent_rdd is changed and each row is tuple of tuple and list together
# ## tuple's second element is list which is modified comments
# ## tuple's first element is tuple of line[:9] from the first step
# word_rdd = sent_rdd.map(lambda line: (line[0], 
#                                       tokenize_word(line[1])))

# ## remove int
# removed_int_rdd = word_rdd.map(lambda line: (line[0], 
#                                              remove_int(line[1])))

# ## remove long tokens
# shortened_rdd = removed_int_rdd.map(lambda line: (line[0], 
#                                                   short_words(line[1])))

# ## join cleaned tokens for sentiment analysis
# joined_rdd = shortened_rdd.map(lambda line: (line[0], 
#                                              join_tokens(line[1])))

# ## sentiments are added as well 
# sentiment_rdd = joined_rdd.map(lambda line: (line[0], line[1], 
#                                              sentiment_score(line[1])))

# ## sentiment scores are wrapped in tuples
# ## so now each row is tuple + list + tuple
# ## here i open last tuple
# rdd_processed = sentiment_rdd.map(lambda line: (line[0], line[1][0], 
#                                                 line[2][0], line[2][1], 
#                                                 line[2][2], line[2][3]))

In [39]:
# ## here I just open nested tuples for each row/line tuple 
# rdd_processed_ = rdd_processed.map(lambda line: (line[0][0], line[0][1], line[0][2],
#                                                  line[0][3], line[0][4], line[0][5],
#                                                  line[0][6], line[0][7], line[0][8], 
#                                                  line[1], line[2], line[3], line[4], line[5]))

In [40]:
# ## now our data is as following, just list
# print(rdd_processed_.take(1))

In [41]:
# ## here I create a dataframe from rdd and give column names
# df_subset = spark.createDataFrame(rdd_processed_).toDF("name","author",
#                                                        "author_flair_text",
#                                                        "unix_time","parent_id",
#                                                        "ups","downs","retrieved_on",
#                                                        "subreddit", "comment", "neg", 
#                                                        "neu", "pos", "com")
# # print(df_subset.show(1, truncate=False))


In [42]:
# display(df_subset.show(1, truncate=False))

In [43]:
## save the df
# df_subset is not created by any active cell (the RDD pipeline that used to
# build it is commented out), so this cell raised a NameError on a fresh run.
# Rebuild it from df_tokened with the same fourteen columns that pipeline
# produced. The array-typed helper columns are left out because the CSV writer
# cannot store arrays; the cleaned text is read from the one-element
# comment_tokens_joined array.
df_subset = df_tokened.select(
    "name", "author", "author_flair_text", "unix_time", "parent_id",
    "ups", "downs", "retrieved_on", "subreddit",
    df_tokened["comment_tokens_joined"][0].alias("comment"),
    df_tokened["sentiment_neg"].alias("neg"),
    df_tokened["sentiment_neu"].alias("neu"),
    df_tokened["sentiment_pos"].alias("pos"),
    df_tokened["sentiment_com"].alias("com"),
)
df_subset.write.format("com.databricks.spark.csv").option("header", "true").save("dbfs:/FileStore/tmp/df_09_subset.csv")

In [44]:
## check if the data is saved
## it will be used by another notebook 
%fs ls dbfs:/FileStore/tmp

path,name,size
dbfs:/FileStore/tmp/df_09_subset.csv/,df_09_subset.csv/,0
