In [1]:
spark

In [2]:
#import statements
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import *
from datetime import datetime
from pyspark.sql.types import IntegerType, StructType, StructField, StringType
import time

In [3]:
spark = SparkSession.builder.appName('Final_project').getOrCreate()
#change configuration settings on Spark 
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '5g'), ('spark.app.name', 'Spark Updated Conf'), ('spark.executor.cores', '6'), ('spark.cores.max', '4'), ('spark.driver.memory','8g')])

sc = spark.sparkContext

In [4]:
spark.sparkContext.getConf().getAll()

[('spark.eventLog.enabled', 'true'),
 ('spark.yarn.jars',
  'local:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/jars/*,local:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/hive/*'),
 ('spark.yarn.appMasterEnv.MKL_NUM_THREADS', '1'),
 ('spark.sql.queryExecutionListeners',
  'com.cloudera.spark.lineage.NavigatorQueryListener'),
 ('spark.executor.memory', '5g'),
 ('spark.lineage.log.dir', '/var/log/spark/lineage'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
  'md01.rcc.local,md02.rcc.local'),
 ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),
 ('spark.executorEnv.PYTHONPATH',
  '/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.10.7-src.zip:/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zip<CPS>/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/lib/py4j-0.10.7-src.zip<CPS>/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/lib/pyspark.zip'),

In [5]:
#Datapath
path_comments = "/user/rroongseang/bigdata/comments/"
path_users = "/user/rroongseang/bigdata/users/"

In [6]:
#Read data
df = spark.read.json(path_comments+"politics_json*")
users = spark.read.csv(path_users+"RA.2019-09.csv", inferSchema=True, header=True)
botusers = sqlContext.read.csv(path_users+'bot_userdata.csv',header=True)

# Data Cleaning

In [7]:
df.count()

6420881

In [8]:
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+--------+------+----------------------+-----------------+----+----------------+-----------+-------------+-------+------+---+-------+-------+---------+------------+-----+------------+---------+------------+-------+
|archived|author|author_flair_css_class|author_flair_text|body|controversiality|created_utc|distinguished|  downs|gilded| id|link_id|   name|parent_id|retrieved_on|score|score_hidden|subreddit|subreddit_id|    ups|
+--------+------+----------------------+-----------------+----+----------------+-----------+-------------+-------+------+---+-------+-------+---------+------------+-----+------------+---------+------------+-------+
| 4921465|     0|               5610956|          5600126|   0|               0|          0|      6288640|6260155|     0|  0|      0|6260155|        0|        1370|    0|     5702021|        0|           0|5162127|
+--------+------+----------------------+-----------------+----+----------------+-----------+-------------+-------+------+---+-------+-------

In [9]:
#dropping several columns with missing and irrelevant data
#subreddit/subreddit_id is the same for all comments since we are only looking in r/politics.
#author flairs are texts + images next to a username that shows up when they post a comment. Most users don't use flairs
#name is a unique identifier that is mostly null.
#id is a unique identifier for the commment and does not add any value to analysis
df = df.drop('archived','author_flair_css_class','author_flair_text',
             'subreddit','subreddit_id','name','score_hidden','id')

In [10]:
#Convert column types to integers and timestamps

df = df.withColumn("ups", df["ups"].cast(IntegerType())) 
df = df.withColumn("downs", df["downs"].cast(IntegerType()))
df = df.withColumn("gilded", df["gilded"].cast(IntegerType()))
df = df.withColumn("score", df["score"].cast(IntegerType()))
df = df.withColumn("retrieved_on", df["retrieved_on"].cast(IntegerType()))
df = df.withColumn("controversiality", df["controversiality"].cast(IntegerType()))
df = df.withColumn('created_utc',df["created_utc"].cast(IntegerType()))
df = df.withColumn('retrieved_on',df["retrieved_on"].cast(IntegerType()))


df = df.withColumn('created_utc',to_timestamp(df["created_utc"]))
df = df.withColumn('retrieved_on',to_timestamp(df["retrieved_on"]))

In [11]:
df.printSchema()

root
 |-- author: string (nullable = true)
 |-- body: string (nullable = true)
 |-- controversiality: integer (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- downs: integer (nullable = true)
 |-- gilded: integer (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- retrieved_on: timestamp (nullable = true)
 |-- score: integer (nullable = true)
 |-- ups: integer (nullable = true)



In [12]:
#fill upvote and downvote null values with zeros
df = df.fillna({ 'ups':0, 'downs':0 })

In [13]:
#Rename a user and botuser columns to match for union of dataframe
users = users.selectExpr('id',"name as username", "created_utc as acct_creation",
                         "updated_on","comment_karma","link_karma")

botusers = botusers.selectExpr('username',"post_karma as link_karma", "comment_karma",
                                 "cake_day as acct_creation","is_bot")

In [14]:
import pyspark.sql.functions as f
#Convert account creation and updated on to timestampps
users = users.withColumn('acct_creation',to_timestamp(users["acct_creation"]))
users = users.withColumn('updated_on',to_timestamp(users["updated_on"]))

#Add column is_bot to users dataframe
users = users.withColumn('is_bot',f.lit('False'))

In [15]:
#Cast link karma and comment karma as integer type
#Convert account creation and updated on to timestamps
botusers = botusers.withColumn('link_karma',botusers["link_karma"].cast(IntegerType()))
botusers = botusers.withColumn('comment_karma',botusers["comment_karma"].cast(IntegerType()))
botusers = botusers.withColumn('acct_creation',botusers["acct_creation"].cast(IntegerType()))
botusers = botusers.withColumn('acct_creation',to_timestamp(botusers["acct_creation"]))

In [16]:
#remove rows from user df that exists in botusers
users = users.join(botusers, users.username==botusers.username, "leftanti")

In [17]:
#Append users dataframe with botusers dataframe 
all_users = users.select('username','acct_creation','comment_karma','link_karma','is_bot')\
            .union(botusers.select('username','acct_creation','comment_karma','link_karma','is_bot'))

In [18]:
all_users.printSchema()

root
 |-- username: string (nullable = true)
 |-- acct_creation: timestamp (nullable = true)
 |-- comment_karma: integer (nullable = true)
 |-- link_karma: integer (nullable = true)
 |-- is_bot: string (nullable = true)



In [19]:
#Checking to see if one of the Russian banned accts is in the users list
all_users.filter(all_users.username == 'MiraranaMogra').show()

+-------------+-------------------+-------------+----------+------+
|     username|      acct_creation|comment_karma|link_karma|is_bot|
+-------------+-------------------+-------------+----------+------+
|MiraranaMogra|2015-05-14 09:58:34|          103|      4902|  True|
+-------------+-------------------+-------------+----------+------+



In [20]:
#Checking to see if one of the Russian comments show up in the comments.
banned = pd.read_csv('bot_accounts2019.csv')['bot_acct'].tolist()

df.filter(df.author.isin(banned)).select('author','body').show(truncate = False)

#THEY DO!! Nice!

+------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|author      |body                                                                                                                                                                                                                                                                                                |
+------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|BlackToLive |By submitting to an independent, non-profit community, the aut

In [21]:
#Combining data frames
df_raw = df.join(all_users, df.author == users.username, 'left')

In [22]:
#Number of comments matched with users dataframe
df_raw.filter(users.username.isNotNull()).count()

5158836

In [23]:
#drop rows where comments authors do not appear in users dataframe
df_raw = df_raw.filter(df_raw.is_bot.isNotNull())
df_raw.count()

5158836

In [24]:
#number of banned account comments in dataframe
df_raw.filter(df_raw.is_bot == 'True').count()

10

# Data Preprocessing

In [25]:
#Extracting comment posted datetime to new column format HHMMSS
df_raw = df_raw.withColumn('created_time',concat(format_string("%02d",hour(df_raw['created_utc'])),
                                                     format_string("%02d",minute(df_raw['created_utc'])),
                                                     format_string("%02d",second(df_raw['created_utc']))))
df_raw = df_raw.withColumn('created_time',df_raw['created_time'].cast(IntegerType()))

In [26]:
#remove formatting characters
df_raw = df_raw.withColumn('body_vec', f.regexp_replace('body', "[^a-zA-Z0-9\\s]", ""))

#Next, we will put all letters into lower-case
df_raw = df_raw.withColumn('body_vec', lower(col('body_vec')))

In [27]:
df_raw.show(5)

+-----------+--------------------+----------------+-------------------+-------------+-----+------+---------+----------+-------------------+-----+---+-----------+-------------------+-------------+----------+------+------------+--------------------+
|     author|                body|controversiality|        created_utc|distinguished|downs|gilded|  link_id| parent_id|       retrieved_on|score|ups|   username|      acct_creation|comment_karma|link_karma|is_bot|created_time|            body_vec|
+-----------+--------------------+----------------+-------------------+-------------+-----+------+---------+----------+-------------------+-----+---+-----------+-------------------+-------------+----------+------+------------+--------------------+
|-El-Zilcho-|You come up with ...|               0|2017-10-30 22:42:36|         null|    0|     0|t3_79qrlp|t1_dp4kl2j|2017-11-10 15:08:19|   12|  0|-El-Zilcho-|2012-06-28 18:27:27|         2292|        63| False|      224236|you come up with ...|
|-El-Zil

In [28]:
from pyspark import keyword_only
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
#import nltk
from nltk.stem.porter import PorterStemmer

#this code courtesy of http://michael-harmon.com/blog/SentimentAnalysisP2.html

class PorterStemming(Transformer, HasInputCol, HasOutputCol):
    """
    PosterStemming class using the NLTK Porter Stemmer
    
    This comes from https://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml
    Adapted to work with the Porter Stemmer from NLTK.
    """
    
    @keyword_only
    def __init__(self, 
                 inputCol  : str = None, 
                 outputCol : str = None, 
                 min_size  : int = None):
        """
        Constructor takes in the input column name, output column name,
        plus the minimum legnth of a token (min_size)
        """
        # call Transformer classes constructor since were extending it.
        super(Transformer, self).__init__()

        # set Parameter objects minimum token size
        self.min_size = Param(self, "min_size", "")
        self._setDefault(min_size=0)

        # set the input keywork arguments
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

        # initialize Stemmer object
        self.stemmer  = PorterStemmer()

        
    @keyword_only
    def setParams(self, 
                  inputCol  : str = None, 
                  outputCol : str = None, 
                  min_size  : int = None
      ) -> None:
        """
        Function to set the keyword arguemnts
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)
    

    def _stem_func(self, words  : list) -> list:
        """
        Stemmer function call that performs stemming on a
        list of tokens in words and returns a list of tokens
        that have meet the minimum length requiremnt.
        """
        # We need a way to get min_size and cannot access it 
        # with self.min_size
        min_size       = self.getMinSize()

        # stem that actual tokens by applying 
        # self.stemmer.stem function to each token in 
        # the words list
        stemmed_words  = map(self.stemmer.stem, words)

        # now create the new list of tokens from
        # stemmed_words by filtering out those
        # that are not of legnth > min_size
        filtered_words = filter(lambda x: len(x) > min_size, stemmed_words)

        return list(filtered_words)
    
    def _transform(self, df: DataFrame) -> DataFrame:
        """
        Transform function is the method that is called in the 
        MLPipleline.  We have to override this function for our own use
        and have it call the _stem_func.

        Notice how it takes in a type DataFrame and returns type Dataframe
        """
        # Get the names of the input and output columns to use
        out_col       = self.getOutputCol()
        in_col        = self.getInputCol()

        # create the stemming function UDF by wrapping the stemmer 
        # method function
        stem_func_udf = F.udf(self._stem_func, ArrayType(StringType()))
        
        # now apply that UDF to the column in the dataframe to return
        # a new column that has the same list of words after being stemmed
        df2           = df.withColumn(out_col, stem_func_udf(df[in_col]))

        return df2
  
  
    def setMinSize(self,value):
        """
        This method sets the minimum size value
        for the _paramMap dictionary.
        """
        self._paramMap[self.min_size] = value
        return self

    def getMinSize(self) -> int:
        """
        This method uses the parent classes (Transformer)
        .getOrDefault method to get the minimum
        size of a token.
        """
        return self.getOrDefault(self.min_size)


In [29]:
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover, IDF, StringIndexer
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol="body_vec", outputCol="body_vec_token")
remover = StopWordsRemover(inputCol = "body_vec_token", outputCol = "body_vec_token_nosw")
stemmer = PorterStemming(inputCol = "body_vec_token_nosw", outputCol = "body_vec_cleaned")
hashingTF = HashingTF(inputCol="body_vec_cleaned", outputCol="body_vec_tf", numFeatures=100)
idf = IDF(inputCol="body_vec_tf", outputCol="body_vec_tfidf")
label_stringIdx = StringIndexer(inputCol = "is_bot", outputCol = "label")

pipeline = Pipeline(stages=[tokenizer, remover, stemmer, hashingTF,idf, label_stringIdx])

In [30]:
%time nlpdf = pipeline.fit(df_raw).transform(df_raw)

CPU times: user 146 ms, sys: 54.8 ms, total: 200 ms
Wall time: 3min 27s


In [31]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['controversiality', 'ups', 'downs', 'gilded', 
                                       'created_time', 'comment_karma','link_karma',
                                        'score'], outputCol='features')
nlpdf = assembler.transform(nlpdf)

In [32]:
# nlpdf = df_raw.drop('body_vec')
# nlpdf = df_raw.drop('body_vec_token')
# nlpdf = df_raw.drop('body_vec_token_nosw')
# nlpdf = df_raw.drop('body_vec_cleaned')

In [33]:
nlpdf.select('body_vec_tfidf').show(5)

+--------------------+
|      body_vec_tfidf|
+--------------------+
|(100,[1,10,30],[2...|
|(100,[30,39,40,44...|
|(100,[6,48,49,70,...|
|(100,[19,30,38,43...|
|(100,[3,4,8,9,10,...|
+--------------------+
only showing top 5 rows



# Modeling

In [34]:
nlpdf.printSchema()

root
 |-- author: string (nullable = true)
 |-- body: string (nullable = true)
 |-- controversiality: integer (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- downs: integer (nullable = false)
 |-- gilded: integer (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- retrieved_on: timestamp (nullable = true)
 |-- score: integer (nullable = true)
 |-- ups: integer (nullable = false)
 |-- username: string (nullable = true)
 |-- acct_creation: timestamp (nullable = true)
 |-- comment_karma: integer (nullable = true)
 |-- link_karma: integer (nullable = true)
 |-- is_bot: string (nullable = true)
 |-- created_time: integer (nullable = true)
 |-- body_vec: string (nullable = true)
 |-- body_vec_token: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- body_vec_token_nosw: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- body_

In [35]:
#split data into train and test and ensure label classes are stratified
#train_df_notbanned, test_df_notbanned = nlpdf.filter(nlpdf.label==0).sample(False,0.0001,seed=5).randomSplit([0.8, 0.2])
train_df_notbanned, test_df_notbanned = nlpdf.filter(nlpdf.label==0).randomSplit([0.8, 0.2])
train_df_banned, test_df_banned = nlpdf.filter(nlpdf.label==1).randomSplit([0.8, 0.2])

In [36]:
#unioning training sets and test sets
train_df = train_df_notbanned.union(train_df_banned)
test_df = test_df_notbanned.union(test_df_banned)

In [37]:
# sample_train = train_df.cache()
# sample_test = test_df.cache()

# print("length of training set sample {}".format(sample_train.count()))
# print("length of test set {}".format(sample_test.count()))

length of training set sample 431
length of test set 107


In [38]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
start = time.time()
# Set parameters for Logistic Regression
lgr = LogisticRegression(maxIter=10, featuresCol = 'body_vec_tfidf', labelCol='label')

# Fit the model to the data.
lgrm = lgr.fit(train_df)
#lgrm = lgr.fit(sample_train)
end = time.time()
print('time to run: {}'.format(end - start))

time to run: 8.967144966125488


In [39]:
start = time.time()
# Given a dataset, predict each point's label, and show the results.
predictions = lgrm.transform(test_df)
#predictions = lgrm.transform(sample_test)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

print('prediction accuracy: {}'\
          .format(evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})))
print('prediction f1 score: {}'\
          .format(evaluator.evaluate(predictions, {evaluator.metricName: "f1"})))
end = time.time()
print('time to run: {}'.format(end - start))

prediction accuracy: 0.9626168224299065
prediction f1 score: 0.9626168224299065
time to run: 2.2132484912872314


In [40]:
# y_true = predictions.select('label').collect()
# y_pred = predictions.select('prediction').collect()
# true_list = [result.label for result in y_true]
# pred_list = [result.prediction for result in y_pred]

In [42]:
# from sklearn.metrics import classification_report, confusion_matrix
# print(classification_report(true_list, pred_list))

             precision    recall  f1-score   support

        0.0       0.98      0.98      0.98       105
        1.0       0.00      0.00      0.00         2

avg / total       0.96      0.96      0.96       107



In [43]:
from pyspark.ml.classification import RandomForestClassifier
# Set parameters for the Random Forest.
rfc = RandomForestClassifier(maxDepth=5, numTrees=15, impurity="gini", featuresCol='features',
                             labelCol="label")

# Fit the model to the data.
rfcm = rfc.fit(train_df)

# Given a dataset, predict each point's label, and show the results.
rfpredictions = rfcm.transform(test_df)

In [47]:
print('prediction accuracy: {}'\
          .format(evaluator.evaluate(rfpredictions, {evaluator.metricName: "accuracy"})))
print('prediction f1 score: {}'\
          .format(evaluator.evaluate(rfpredictions, {evaluator.metricName: "f1"})))

prediction accuracy: 1.0
prediction f1 score: 1.0


In [48]:
# y_true = rfpredictions.select('label').collect()
# y_pred = rfpredictions.select('prediction').collect()
# true_list = [result.label for result in y_true]
# pred_list = [result.prediction for result in y_pred]

In [49]:
# from sklearn.metrics import classification_report, confusion_matrix
# print(classification_report(true_list, pred_list))

             precision    recall  f1-score   support

        0.0       1.00      1.00      1.00       105
        1.0       1.00      1.00      1.00         2

avg / total       1.00      1.00      1.00       107

