In [1]:
import pyspark
from pyspark.sql import  SparkSession, SQLContext
from pyspark import SparkContext
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.functions import desc, max, avg, col, when, explode
import pandas as pd
from pyspark.ml.feature import VectorAssembler

In [2]:
conf = pyspark.SparkConf().set('spark.jars.packages','org.mongodb.spark:mongo-spark-connector_2.12:3.0.1').setMaster('local').setAppName('mongoJSON').setAll([('spark.drover.memory','40g'),('spark.executor.memory','50g')])

In [3]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()


<h1><i>Data Loading

In [4]:
mongo_ip="mongodb://localhost:27017/Reddit."

In [5]:
df = spark.read.format('com.mongodb.spark.sql.DefaultSource').option('uri', mongo_ip+'wsb' ).load()

<h1><i>Analytics

In [6]:
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- body: string (nullable = true)
 |-- comms_num: integer (nullable = true)
 |-- created: double (nullable = true)
 |-- id: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)



In [7]:
df.na.drop(how = 'any')

DataFrame[_id: struct<oid:string>, body: string, comms_num: int, created: double, id: string, score: int, timestamp: string, title: string, url: string]

In [8]:
top_10 = df.orderBy(desc('score')).limit(10)
top_10.show()

+--------------------+----+---------+-------------+------+------+-------------------+--------------------+--------------------+
|                 _id|body|comms_num|      created|    id| score|          timestamp|               title|                 url|
+--------------------+----+---------+-------------+------+------+-------------------+--------------------+--------------------+
|{664ec9bef302b656...|NULL|    11554|1.612058438E9|l8rf4k|348241|2021-01-31 04:00:38|Times Square righ...|https://v.redd.it...|
|{664ec9bdf302b656...|NULL|    23309|1.611896783E9|l78uct|225870|2021-01-29 07:06:23|GME YOLO update —...|https://i.redd.it...|
|{664ec9bdf302b656...|NULL|    20105|1.611983085E9|l846a1|219779|2021-01-30 07:04:45|GME YOLO month-en...|https://i.redd.it...|
|{664ec9bff302b656...|NULL|    12846|1.613797555E9|lnqgz8|201168|2021-02-20 07:05:55|GME YOLO update —...|https://i.redd.it...|
|{664ec9bdf302b656...|NULL|     4523|1.611992459E9|l881ia|195782|2021-01-30 09:40:59|   It’s treason the

In [9]:
top_10_pd = top_10.toPandas()

In [10]:
engagement_per_post = df.select('id','score','comms_num').orderBy(desc('score')).limit(10)

<h1><i>Engagement per Post Analysis

In [11]:
engagement_per_post.show()

+------+------+---------+
|    id| score|comms_num|
+------+------+---------+
|l8rf4k|348241|    11554|
|l78uct|225870|    23309|
|l846a1|219779|    20105|
|lnqgz8|201168|    12846|
|l881ia|195782|     4523|
|l8c0u4|192980|     6785|
|lexy8t|191380|     3849|
|l890i7|175665|     4603|
|l7feld|174401|     5211|
|l90oq6|171778|     3633|
+------+------+---------+



In [12]:
vector_columns = 'features'
assembler = VectorAssembler(inputCols = ['score', 'comms_num'],outputCol = vector_columns)

In [13]:
df_vector = assembler.transform(df.select('score','comms_num')).select(vector_columns)

In [14]:
from pyspark.ml.stat import Correlation

corr_matrix = Correlation.corr(df_vector, vector_columns).head()[0]

<h1><i>Correlation Matrix

In [15]:
print('corr_matrix :\n'+str(corr_matrix))

corr_matrix :
DenseMatrix([[1.        , 0.19002685],
             [0.19002685, 1.        ]])


In [16]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover

In [17]:
tokenizer = Tokenizer(inputCol = 'title', outputCol = 'title_words')

In [18]:
df = tokenizer.transform(df)

In [19]:
remover = StopWordsRemover(inputCol="title_words", outputCol="filtered_title_words")


In [20]:
df = remover.transform(df)

In [21]:
word_counts = df.withColumn('word', explode(col('filtered_title_words')))\
                .groupBy('word').count()\
                .orderBy(desc('count'))

<h1><i>Word Frequency Distribution Analysis

In [22]:
word_counts.show()

+---------+-----+
|     word|count|
+---------+-----+
|      gme| 5924|
|      buy| 3665|
|        -| 3597|
|     hold| 2538|
|robinhood| 2469|
|      amc| 2316|
|    still| 1979|
|     yolo| 1735|
|     like| 1734|
|  holding| 1620|
|    stock| 1591|
|       🚀| 1485|
|    short| 1408|
|   shares| 1405|
|   bought| 1401|
|      get| 1384|
|    going| 1368|
|       go| 1182|
|     moon| 1141|
|     sell| 1133|
+---------+-----+
only showing top 20 rows



<h1><i>Classification (popularity)

In [23]:
df = df.withColumn('label',when(col('score')>100,1).otherwise(0))

In [24]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

In [25]:
data = df.select('comms_num','score','label')

In [26]:
assembler = VectorAssembler(inputCols = ['comms_num','score'], outputCol = 'features')

In [27]:
log = LogisticRegression()

In [28]:
pipeline = Pipeline(stages = [assembler,log])

In [29]:
(train,test) = data.randomSplit([0.75,0.25])

In [30]:
model = pipeline.fit(train)

In [31]:
Predictions = model.transform(test)

In [32]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [33]:
evaluator = BinaryClassificationEvaluator( labelCol="label", metricName="areaUnderROC")

In [34]:
roc_auc = evaluator.evaluate(Predictions)


In [35]:
print(f"Test ROC AUC = {roc_auc}")

Test ROC AUC = 0.9999998512520339


In [36]:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier

In [37]:
DTC = DecisionTreeClassifier()

In [38]:
pipeline = Pipeline(stages = [assembler,DTC])

In [40]:
model = pipeline.fit(train)

In [41]:
Predictions = model.transform(test)

In [42]:
evaluator = BinaryClassificationEvaluator( labelCol="label", metricName="areaUnderROC")

In [43]:
roc_auc = evaluator.evaluate(Predictions)


In [44]:
print(f"Test ROC AUC = {roc_auc}")

Test ROC AUC = 0.9993753701035331


In [49]:
RFC=RandomForestClassifier()

In [50]:
pipeline = Pipeline(stages = [assembler,RFC])

In [51]:
model = pipeline.fit(train)

In [52]:
Predictions = model.transform(test)

In [53]:
evaluator = BinaryClassificationEvaluator( labelCol="label", metricName="areaUnderROC")

In [54]:
roc_auc = evaluator.evaluate(Predictions)
print(f"Test ROC AUC = {roc_auc}")

Test ROC AUC = 0.9996002894238759


<h1><i>Regression

In [58]:
from pyspark.ml.regression import LinearRegression

In [59]:
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- body: string (nullable = true)
 |-- comms_num: integer (nullable = true)
 |-- created: double (nullable = true)
 |-- id: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- title_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered_title_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- label: integer (nullable = false)



In [60]:
reg_data = df.select('label','score','comms_num')


In [61]:
featureAssembler = VectorAssembler(inputCols = ['label','comms_num'], outputCol = 'independentFeatures')

In [62]:
transformed = featureAssembler.transform(reg_data)

In [63]:
transformed.show()

+-----+-----+---------+-------------------+
|label|score|comms_num|independentFeatures|
+-----+-----+---------+-------------------+
|    0|   55|        6|          [0.0,6.0]|
|    1|  110|       23|         [1.0,23.0]|
|    0|    0|       47|         [0.0,47.0]|
|    0|   29|       74|         [0.0,74.0]|
|    0|   71|      156|        [0.0,156.0]|
|    1|  405|       84|         [1.0,84.0]|
|    1|  317|       53|         [1.0,53.0]|
|    1|  405|      178|        [1.0,178.0]|
|    1|  200|      161|        [1.0,161.0]|
|    1|  291|       27|         [1.0,27.0]|
|    1|  222|       70|         [1.0,70.0]|
|    1|  562|       97|         [1.0,97.0]|
|    0|    0|       16|         [0.0,16.0]|
|    1|  382|       61|         [1.0,61.0]|
|    1|  176|       32|         [1.0,32.0]|
|    1|  240|       49|         [1.0,49.0]|
|    1|  107|       14|         [1.0,14.0]|
|    1|  339|       33|         [1.0,33.0]|
|    1|  515|       41|         [1.0,41.0]|
|    1|  841|     5942|       [1

In [64]:
final = transformed.select('independentFeatures','score')

In [65]:
train_data, test_data = final.randomSplit([0.75,0.25])

In [66]:
regressor=LinearRegression(featuresCol='independentFeatures', labelCol='score')
regressor=regressor.fit(train_data)

In [67]:
pred = regressor.transform(test_data)

In [68]:
pred.show()

+-------------------+-----+------------------+
|independentFeatures|score|        prediction|
+-------------------+-----+------------------+
|          (2,[],[])|    0|10.211632405767425|
|          (2,[],[])|    0|10.211632405767425|
|          (2,[],[])|    0|10.211632405767425|
|          (2,[],[])|    0|10.211632405767425|
|          (2,[],[])|    0|10.211632405767425|
|          (2,[],[])|    0|10.211632405767425|
|          (2,[],[])|    0|10.211632405767425|
|          (2,[],[])|    0|10.211632405767425|
|          (2,[],[])|    0|10.211632405767425|
|          (2,[],[])|    0|10.211632405767425|
|          (2,[],[])|    0|10.211632405767425|
|          (2,[],[])|    0|10.211632405767425|
|          (2,[],[])|    0|10.211632405767425|
|          (2,[],[])|    0|10.211632405767425|
|          (2,[],[])|    0|10.211632405767425|
|          (2,[],[])|    0|10.211632405767425|
|          (2,[],[])|    0|10.211632405767425|
|          (2,[],[])|    0|10.211632405767425|
|          (2

In [69]:
result = regressor.evaluate(test_data)

In [70]:
result.r2

0.0836542698097793

In [71]:
from pyspark.ml.regression import DecisionTreeRegressor, RandomForestRegressor

In [103]:
regressor=DecisionTreeRegressor(featuresCol='independentFeatures', labelCol='score')
regressor=regressor.fit(train_data)

In [104]:
pred = regressor.transform(test_data)

In [105]:
pred.show()

+-------------------+-----+-----------------+
|independentFeatures|score|       prediction|
+-------------------+-----+-----------------+
|          (2,[],[])|    0|7.481117200991066|
|          (2,[],[])|    0|7.481117200991066|
|          (2,[],[])|    0|7.481117200991066|
|          (2,[],[])|    0|7.481117200991066|
|          (2,[],[])|    0|7.481117200991066|
|          (2,[],[])|    0|7.481117200991066|
|          (2,[],[])|    0|7.481117200991066|
|          (2,[],[])|    0|7.481117200991066|
|          (2,[],[])|    0|7.481117200991066|
|          (2,[],[])|    0|7.481117200991066|
|          (2,[],[])|    0|7.481117200991066|
|          (2,[],[])|    0|7.481117200991066|
|          (2,[],[])|    0|7.481117200991066|
|          (2,[],[])|    0|7.481117200991066|
|          (2,[],[])|    0|7.481117200991066|
|          (2,[],[])|    0|7.481117200991066|
|          (2,[],[])|    0|7.481117200991066|
|          (2,[],[])|    0|7.481117200991066|
|          (2,[],[])|    0|7.48111

In [106]:
from pyspark.ml.evaluation import RegressionEvaluator

In [107]:

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="score", metricName="r2")
r2 = evaluator.evaluate(pred)
print(f" R2 ): {r2}")

 R2 ): 0.4268834945013483


In [98]:
regressor=RandomForestRegressor(featuresCol='independentFeatures', labelCol='score')
regressor=regressor.fit(train_data)

In [99]:
pred = regressor.transform(test_data)

In [101]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="score", metricName="r2")
r2 = evaluator.evaluate(pred)
print(f" R2 ): {r2}")

 R2 ): 0.4071249217101993
