In [2]:
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SparkSession

# Build the session first. getOrCreate() reuses a SparkContext that already
# exists, so creating a bare SparkContext() beforehand would prevent the
# "FinalProject" app name from being applied to it. Going through getOrCreate()
# also makes this cell safe to re-run: a second SparkContext() call raises
# because only one context may be active at a time.
spark = SparkSession.builder.appName("FinalProject").getOrCreate()

# Expose the session's underlying context under the name the rest of the
# notebook uses.
sc = spark.sparkContext



In [3]:
# Bare expression: the notebook displays the SparkContext's representation.
sc


Load Reddit data

In [4]:
# Read the Reddit JSON data from the project's S3 location into a DataFrame.

RedditData=spark.read.json("s3://jk2060/final_project/")

In [5]:
# Preview the first 5 rows of the loaded data.
RedditData.show(5)

+--------+-----------+--------------+------------------+-----------------------------+----------------------+---------------------+------------------------+-----------------+-----------------------+-----------------+---------------+--------------------+--------------------+--------+------------+---------+--------------------+----------------+-----------+-------------+------+------+---------+-------+------------+---------+---------+----------+--------------------+-----------+--------------+------------+-----+------------+--------+------------+------------+-----------------------+--------------+
|archived|     author|author_cakeday|author_created_utc|author_flair_background_color|author_flair_css_class|author_flair_richtext|author_flair_template_id|author_flair_text|author_flair_text_color|author_flair_type|author_fullname|author_patreon_flair|                body|can_gild|can_mod_post|collapsed|    collapsed_reason|controversiality|created_utc|distinguished|edited|gilded| gildings|     

In [6]:
# Print the column names and types of the loaded DataFrame.
RedditData.printSchema()

root
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_created_utc: long (nullable = true)
 |-- author_flair_background_color: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_richtext: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- e: string (nullable = true)
 |    |    |-- t: string (nullable = true)
 |    |    |-- u: string (nullable = true)
 |-- author_flair_template_id: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- author_flair_text_color: string (nullable = true)
 |-- author_flair_type: string (nullable = true)
 |-- author_fullname: string (nullable = true)
 |-- author_patreon_flair: boolean (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- can_mod_post: boolean (nullable = true)
 |-

In [7]:
import bz2
import json
import time 
from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.regression import *
from pyspark.ml.evaluation import RegressionEvaluator




Timing helper: a decorator that measures and prints the total time spent executing a function

In [8]:
def time(method):
    """Decorator that prints how long each call to ``method`` takes.

    Parameters
    ----------
    method : callable
        The function to be timed.

    Returns
    -------
    callable
        A wrapper that calls ``method`` with the same arguments, prints the
        elapsed wall-clock time in seconds followed by ``'sec'``, and returns
        ``method``'s result unchanged.
    """
    # This function is itself named ``time``, so at module level it shadows
    # the standard-library ``time`` module. Import the clock locally under a
    # different name instead of relying on ``time.time()``.
    from functools import wraps
    from time import time as _now

    @wraps(method)  # keep the wrapped function's name and docstring
    def timed(*args, **kw):
        ts = _now()
        result = method(*args, **kw)
        te = _now()

        print(te - ts, 'sec')
        return result

    # Return the wrapper, not the decorator itself.
    return timed


Select the columns "id", "score", "body" from RedditData

In [9]:
# Keep only the columns used downstream: the comment id, its score (used
# later as the regression label) and the comment text.
PartialDF = RedditData.select("id", "score", "body")
PartialDF.show()


+-------+-----+--------------------+
|     id|score|                body|
+-------+-----+--------------------+
|egp0iaf|    1|           [deleted]|
|egp0iag|    5|That’s it folks. ...|
|egp0iah|    3|For 3 grand it be...|
|egp0iai|   -5|          Minecraft |
|egp0iaj|    2|I was watching Ne...|
|egp0iak|    2|Do you have a bui...|
|egp0ial|    3|I vaguely remembe...|
|egp0iam|    1|Haha, I bet those...|
|egp0ian|    6|Staying on Goats ...|
|egp0iao|   28|me getting a bad ...|
|egp0iap|    2|Well, he just got...|
|egp0iaq|    1|If this is a dank...|
|egp0iar|    4|Thanks. It's a sh...|
|egp0ias|    1|Thanks for the ad...|
|egp0iat|    2|             who dat|
|egp0iau|    1|              +karma|
|egp0iav|    2|   Not a bad lineup |
|egp0iaw|    7|Yeah we call thos...|
|egp0iax|    1|Hello, we only al...|
|egp0iay|   15|It's not the same...|
+-------+-----+--------------------+
only showing top 20 rows



Tokenize comment bodies as array and remove stop words

In [10]:
# Declare the two text-processing stages up front:
#   body  -> words        (tokenizer)
#   words -> NoStopWords  (stop-word remover)
tokenizer = Tokenizer(inputCol="body", outputCol="words")
Remover = StopWordsRemover(inputCol="words", outputCol="NoStopWords")

# Apply the stages in order.
TokenizedDF = tokenizer.transform(PartialDF)
NoStopWords_DF = Remover.transform(TokenizedDF)

# Drop the intermediate columns; keep only what the feature extractors need.
FinalWords = NoStopWords_DF.select("id", "score", "NoStopWords")
FinalWords.show()




+-------+-----+--------------------+
|     id|score|         NoStopWords|
+-------+-----+--------------------+
|egp0iaf|    1|         [[deleted]]|
|egp0iag|    5|[that’s, folks., ...|
|egp0iah|    3|  [3, grand, better]|
|egp0iai|   -5|         [minecraft]|
|egp0iaj|    2|[watching, neptun...|
|egp0iak|    2|[build, list?, cu...|
|egp0ial|    3|[vaguely, remembe...|
|egp0iam|    1|[haha,, bet, two,...|
|egp0ian|    6|[staying, goats, ...|
|egp0iao|   28|[getting, bad, gr...|
|egp0iap|    2|[well,, got, heal...|
|egp0iaq|    1|[dank, meme,, **u...|
|egp0iar|    4|[thanks., shame,,...|
|egp0ias|    1|[thanks, advice,,...|
|egp0iat|    2|               [dat]|
|egp0iau|    1|            [+karma]|
|egp0iav|    2|       [bad, lineup]|
|egp0iaw|    7|[yeah, call, warm...|
|egp0iax|    1|[hello,, allow, p...|
|egp0iay|   15|[quality, dude,, ...|
+-------+-----+--------------------+
only showing top 20 rows



## Two methods in determining bag of words.

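CountVectorizer (CV) builds a vocabulary from the words found in the comment bodies and counts how often each word occurs in a comment.

HashingTF (HTF) also counts the words found in the comment bodies, but hashes each word into a fixed number of buckets, so different words may end up in the same bucket.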





In [11]:
def termF(df,inputCol,outputCol,hashFeatures=None):
    """Add a term-frequency feature column to ``df``.

    Parameters
    ----------
    df : DataFrame
        Input data containing the token column named by ``inputCol``.
    inputCol : str
        Name of the column holding the tokens.
    outputCol : str
        Name of the feature column to create.
    hashFeatures : int, optional
        When ``None`` (default) a ``CountVectorizer`` is fitted on ``df`` and
        applied to it. Otherwise a ``HashingTF`` with
        ``numFeatures=hashFeatures`` is applied; no fitting step is involved.

    Returns
    -------
    DataFrame
        ``df`` transformed by the chosen feature extractor.
    """
    if hashFeatures is None:
        cv = CountVectorizer(inputCol=inputCol, outputCol=outputCol)
        # Fit on the DataFrame that was passed in. The previous version fitted
        # on the notebook-global ``FinalWords``, which silently ignored ``df``
        # and failed whenever that global was missing or lacked ``inputCol``.
        feature_extract = cv.fit(df)
    else:
        feature_extract = HashingTF(
            inputCol=inputCol,
            outputCol=outputCol,
            numFeatures=hashFeatures,
        )

    return feature_extract.transform(df)
        
        
        
        

In [12]:
# CountVectorizer features (no hashFeatures argument given).
cvDF = termF(df=FinalWords, inputCol="NoStopWords", outputCol="features")

# Show the full rows first, then just the tokens next to their vectors.
cvDF.show(5)
cvDF.select("NoStopWords", "features").show(5)




+-------+-----+--------------------+--------------------+
|     id|score|         NoStopWords|            features|
+-------+-----+--------------------+--------------------+
|egp0iaf|    1|         [[deleted]]| (262144,[12],[1.0])|
|egp0iag|    5|[that’s, folks., ...|(262144,[119,772,...|
|egp0iah|    3|  [3, grand, better]|(262144,[44,116,2...|
|egp0iai|   -5|         [minecraft]|(262144,[4983],[1...|
|egp0iaj|    2|[watching, neptun...|(262144,[521,794,...|
+-------+-----+--------------------+--------------------+
only showing top 5 rows

+--------------------+--------------------+
|         NoStopWords|            features|
+--------------------+--------------------+
|         [[deleted]]| (262144,[12],[1.0])|
|[that’s, folks., ...|(262144,[119,772,...|
|  [3, grand, better]|(262144,[44,116,2...|
|         [minecraft]|(262144,[4983],[1...|
|[watching, neptun...|(262144,[521,794,...|
+--------------------+--------------------+
only showing top 5 rows



In [13]:
# HashingTF features with 1024 hash buckets.
hftDF = termF(
    df=FinalWords,
    inputCol="NoStopWords",
    outputCol="features",
    hashFeatures=1024,
)

hftDF.select("NoStopWords", "features").show(5)



+--------------------+--------------------+
|         NoStopWords|            features|
+--------------------+--------------------+
|         [[deleted]]|  (1024,[953],[1.0])|
|[that’s, folks., ...|(1024,[719,732,73...|
|  [3, grand, better]|(1024,[205,585,10...|
|         [minecraft]|  (1024,[279],[1.0])|
|[watching, neptun...|(1024,[283,365,39...|
+--------------------+--------------------+
only showing top 5 rows



## Machine Learning: Random Forest Regressor

We will fit a Random Forest Regressor to evaluate its computation time and prediction accuracy. 




In [14]:
def RandomForestRegression(df,featuresCol,labelCol,seed=None):
    """Train a random forest regressor on 80% of ``df`` and predict the rest.

    Parameters
    ----------
    df : DataFrame
        Data containing the feature and label columns.
    featuresCol : str
        Name of the feature vector column.
    labelCol : str
        Name of the column to predict.
    seed : int, optional
        Seed used for both the train/test split and the regressor, so the run
        can be reproduced. With the default ``None`` the split is unseeded and
        the regressor keeps its own default, exactly as before.

    Returns
    -------
    DataFrame
        The held-out 20% split with the model's predictions added by
        ``model.transform``.
    """
    train_df, test_df = df.randomSplit([0.8, 0.2], seed=seed)

    # Only forward the seed when one was given, so the regressor's own default
    # stays in effect otherwise.
    regressor_args = {"featuresCol": featuresCol, "labelCol": labelCol}
    if seed is not None:
        regressor_args["seed"] = seed
    df_RFR = RandomForestRegressor(**regressor_args)

    model = df_RFR.fit(train_df)

    # Evaluate on data the model has not seen.
    predictions = model.transform(test_df)

    return predictions

In [None]:
# CV method: prediction and evaluation.
# Trains the random forest on the CountVectorizer features and measures the
# RMSE of the predictions returned for the held-out split.

predictions=RandomForestRegression(df=cvDF,featuresCol="features",labelCol="score")

predictions.show(5)

evaluator=RegressionEvaluator(labelCol="score",predictionCol="prediction",metricName="rmse")
rmse=evaluator.evaluate(predictions)


# NOTE: we tried running this cell, but it ran for more than 3 hours without
# finishing, so no result is reported for the CountVectorizer features.


In [15]:
# HTF method: train on the HashingTF features, then score the predictions
# returned for the held-out split with RMSE.
predictions = RandomForestRegression(
    df=hftDF,
    featuresCol="features",
    labelCol="score",
)
predictions.show(5)

evaluator = RegressionEvaluator(
    labelCol="score",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)



+-------+-----+--------------------+--------------------+-----------------+
|     id|score|         NoStopWords|            features|       prediction|
+-------+-----+--------------------+--------------------+-----------------+
|egp0ial|    3|[vaguely, remembe...|(1024,[261,279,36...|8.445206051243071|
|egp0ias|    1|[thanks, advice,,...|(1024,[411,606,72...|8.445206051243071|
|egp0iaz|    1|[[right,, yes...,...|(1024,[10,215,307...|8.445206051243071|
|egp0ib7|    2|[mark, henry, kev...|(1024,[220,273,57...|8.445206051243071|
|egp0ibb|    3|[&gt;, betty, cro...|(1024,[170,304,34...|8.691576235924545|
+-------+-----+--------------------+--------------------+-----------------+
only showing top 5 rows



In [16]:
# Show the most recently computed RMSE.
print(rmse)

123.71892122055176


In [17]:

# Stop the SparkContext now that the notebook has finished.
sc.stop()
