# Reddit Posts Popularity Prediction

In [1]:
# Columns projected out of the raw Reddit submission tables. The same
# projection is applied to the training table and the evaluation table.
_RAW_COLUMNS = (
    'brand_safe', 'can_gild', 'is_crosspostable', 'selftext', 'created_utc',
    'num_comments', 'no_follow', 'over_18', 'author', 'domain',
    'parent_whitelist_status', 'subreddit', 'subreddit_type', 'suggested_sort',
    'title', 'score',
)

# Training table.
df_train = spark.table("rs_v2_2006_03").select(*_RAW_COLUMNS)

# Evaluation table. To test on out-of-time (OOT) data instead, change
# "rs_v2_2006_04" to "rs_v2_2006_05".
df_test = spark.table("rs_v2_2006_04").select(*_RAW_COLUMNS)

In [2]:
# Install required packages: NLTK (plus all of its downloadable data) and tldextract
!pip install nltk
!python -m nltk.downloader all
!pip install tldextract

In [3]:
# Sentiment Analysis Function
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import format_number as fmt
from pyspark.sql.functions import udf
from pyspark.sql.types import *

from pyspark.ml.feature import Tokenizer,StopWordsRemover,Word2Vec
from pyspark.ml import PipelineModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator

import re
import numpy as np
import pandas as pd
from nltk.sentiment.vader import SentimentIntensityAnalyzer

import nltk
from os import path, getcwd
import matplotlib.pyplot as plt
import seaborn as sns

def getCleanTweetText(filteredTweetText):
    """Collapse a sequence of tokens into a single space-separated string."""
    separator = ' '
    return separator.join(filteredTweetText)
  
# Holds the one SentimentIntensityAnalyzer built by getSentimentScore so that
# repeated calls reuse it instead of re-reading the VADER lexicon for every row.
_SENTIMENT_ANALYZER_CACHE = {}


def getSentimentScore(tweetText):
    """Return the VADER compound sentiment score of ``tweetText``.

    Args:
        tweetText: The text to score, or ``None``.

    Returns:
        The ``'compound'`` entry of ``polarity_scores`` as a float, or ``None``
        when ``tweetText`` is ``None`` (a null cell when used as a Spark UDF).
    """
    if tweetText is None:
        # Nulls reach a Python UDF as None; propagate them instead of raising.
        return None

    analyzer = _SENTIMENT_ANALYZER_CACHE.get('vader')
    if analyzer is None:
        # Built lazily on first use, then reused by every later call.
        analyzer = SentimentIntensityAnalyzer()
        _SENTIMENT_ANALYZER_CACHE['vader'] = analyzer

    vs = analyzer.polarity_scores(tweetText)
    return float(vs['compound'])

def getSentiment(score):
    """Binarise a sentiment score: 1 when strictly positive, otherwise 0."""
    if score > 0:
        return 1
    return 0

In [4]:
############## Sentiment Score for title ##############
# Fetch the VADER lexicon required by SentimentIntensityAnalyzer.
nltk.download('vader_lexicon')

import nltk.sentiment.util
import nltk.sentiment.sentiment_analyzer
from nltk.sentiment.vader import SentimentIntensityAnalyzer
import pyspark.sql.functions as f

# Columns kept after scoring: the new sentiment score followed by the raw columns.
TITLE_SENTIMENT_COLUMNS = [
    'sentimentScore', 'brand_safe', 'can_gild', 'is_crosspostable', 'selftext',
    'created_utc', 'num_comments', 'no_follow', 'over_18', 'author', 'domain',
    'parent_whitelist_status', 'subreddit', 'subreddit_type', 'suggested_sort',
    'title', 'score',
]

# Shared transformers / UDFs, created once and reused for train and test.
tokenizer = Tokenizer(inputCol='title', outputCol='words')
remover = StopWordsRemover(inputCol='words', outputCol='filteredTweetText')
udfCleanTweetText = udf(getCleanTweetText, StringType())
udfSentimentScore = udf(getSentimentScore, FloatType())


def add_title_sentiment(df):
    """Add a ``sentimentScore`` column computed from the ``title`` column.

    Steps: replace ':' in the title with a space, tokenize, drop stop words,
    re-join the remaining tokens into one string and score that string with
    ``getSentimentScore``.

    Args:
        df: Spark DataFrame containing every column of
            ``TITLE_SENTIMENT_COLUMNS`` except ``sentimentScore``.

    Returns:
        A DataFrame restricted to ``TITLE_SENTIMENT_COLUMNS``. Its ``title``
        column holds the title with ':' replaced by a space.
    """
    # withColumn already names the result "title", so no alias is needed.
    without_colons = df.withColumn("title", f.regexp_replace(f.col("title"), "[:]", " "))
    tokenized = tokenizer.transform(without_colons)
    stopwords_removed = remover.transform(tokenized)
    rejoined = stopwords_removed.withColumn(
        'filteredCleanedTweetText', udfCleanTweetText('filteredTweetText'))
    scored = rejoined.withColumn(
        'sentimentScore', udfSentimentScore('filteredCleanedTweetText'))
    return scored.select(*TITLE_SENTIMENT_COLUMNS)


# Applying one helper guarantees train and test get the identical transformation.
df_train = add_title_sentiment(df_train)
# Test data
df_test = add_title_sentiment(df_test)

In [5]:
# clean up train data set

# deal with utc
from datetime import date
import time
def convert_utc_to_day_of_the_week(utc_stamp):
    """Return the day of the week of an epoch timestamp.

    The timestamp is interpreted as seconds, the same unit
    ``convert_utc_to_hour`` hands to ``time.localtime``. It must not be
    divided by 1000: doing so collapses every 2006 timestamp into
    January 1970 and yields a wrong weekday.

    Args:
        utc_stamp: Epoch timestamp in seconds, or ``None``.

    Returns:
        An int with Monday == 0 ... Sunday == 6 (the same numbering as
        ``time.struct_time.tm_wday``), or ``None`` when ``utc_stamp`` is
        ``None``. The date is resolved in the local timezone of the Python
        process, as ``date.fromtimestamp`` does.
    """
    if utc_stamp is None:
        # Nulls reach a Python UDF as None; propagate them instead of raising.
        return None
    return date.fromtimestamp(utc_stamp).weekday()
  
def convert_utc_to_hour(utc_stamp):
    """Return the hour of day (0-23), in the process's local time, of an epoch timestamp in seconds."""
    local_time = time.localtime(utc_stamp)
    return int(local_time.tm_hour)

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

# Expose the two timestamp helpers as Spark UDFs that return 64-bit integers.
_utc_udf_return_type = LongType()
convert_day_udf = udf(convert_utc_to_day_of_the_week, _utc_udf_return_type)
convert_hour_udf = udf(convert_utc_to_hour, _utc_udf_return_type)

# Derive "day_of_week" and then "hour" from the created_utc column of the training frame.
tmp_data_utc_week_day = df_train.withColumn(
    "day_of_week",
    convert_day_udf(df_train["created_utc"]),
)
tmp_data_utc_hour = tmp_data_utc_week_day.withColumn(
    "hour",
    convert_hour_udf(tmp_data_utc_week_day["created_utc"]),
)

# deal with domain

import tldextract
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

def convert_domain(x):
    """Split a host name into tokens using ``tldextract``.

    Returns ``[subdomain, domain]`` when a subdomain is present and
    ``[domain]`` otherwise. The suffix part of the extraction is not included.
    """
    extracted = tldextract.extract(x)
    tokens = [str(extracted.domain)]
    if extracted.subdomain != '':
        # The subdomain, when present, goes in front of the domain.
        tokens.insert(0, str(extracted.subdomain))
    return tokens

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# UDF producing the list of domain tokens for each row.
_domain_tokens_type = ArrayType(StringType())
convert_domain_udf = udf(convert_domain, _domain_tokens_type)

# Add the tokenized domain to the training frame.
tmp_data_domain_converted = tmp_data_utc_hour.withColumn(
    "domain_converted",
    convert_domain_udf(tmp_data_utc_hour["domain"]),
)

# deal with title

from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
import re

# Punctuation characters that get blanked out of titles. Written as a raw
# string so the regex escapes (\-, \[, \]) reach `re` untouched instead of
# being invalid Python string escape sequences.
REPLACE_NO_SPACE = re.compile(r"""[.;:!/\-'?,"()\[\]]""")


def preprocess_reviews(reviews):
    """Lower-case ``reviews`` and replace each punctuation character with a space.

    Args:
        reviews: The text to clean, or ``None``.

    Returns:
        The cleaned string, or ``None`` when ``reviews`` is ``None`` (a null
        cell when used as a Spark UDF).
    """
    if reviews is None:
        return None
    return REPLACE_NO_SPACE.sub(" ", reviews.lower())

# UDF wrapper around preprocess_reviews (default string return type).
preprocess_udf = udf(lambda title: preprocess_reviews(title))

# Keep every existing column and append the cleaned title.
temp_with_cleaned_title = tmp_data_domain_converted.withColumn(
    'cleaned_title',
    preprocess_udf('title'),
)

temp_with_cleaned_title.printSchema()

# Columns handed to the model, in order; 'score' is the regression target.
_train_model_columns = [
    'sentimentScore', 'brand_safe', 'can_gild', 'is_crosspostable', 'selftext',
    'num_comments', 'no_follow', 'over_18', 'subreddit', 'day_of_week', 'hour',
    'domain_converted', 'cleaned_title', 'author', 'parent_whitelist_status',
    'subreddit_type', 'suggested_sort', 'score',
]
data_for_model = temp_with_cleaned_title.select(*_train_model_columns)
# Expose the target under the name 'label'.
data_for_model = data_for_model.withColumnRenamed('score', 'label')

In [6]:
# clean up test data set

# deal with utc
from datetime import date
import time
def convert_utc_to_day_of_the_week(utc_stamp):
    """Return the weekday (Monday == 0 ... Sunday == 6) of an epoch timestamp in seconds.

    The date is resolved with ``date.fromtimestamp``, i.e. in the local
    timezone of the Python process.

    NOTE: this re-definition replaces the function of the same name defined
    in the training clean-up cell.
    """
    local_day = date.fromtimestamp(utc_stamp)
    return local_day.weekday()
  
def convert_utc_to_hour(utc_stamp):
    """Return the local-time hour of day (0-23) for an epoch timestamp in seconds."""
    return time.localtime(utc_stamp).tm_hour

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

# Re-create the timestamp UDFs from the helpers defined in this cell.
convert_day_udf = udf(convert_utc_to_day_of_the_week, LongType())
convert_hour_udf = udf(convert_utc_to_hour, LongType())

# Derive "day_of_week" and then "hour" from the created_utc column of the test frame.
_created_utc_test = df_test["created_utc"]
tmp_data_utc_week_day_test = df_test.withColumn("day_of_week", convert_day_udf(_created_utc_test))
tmp_data_utc_hour_test = tmp_data_utc_week_day_test.withColumn(
    "hour",
    convert_hour_udf(tmp_data_utc_week_day_test["created_utc"]),
)

# deal with domain

import tldextract
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

def convert_domain(x):
    """Tokenize a host name with ``tldextract``.

    The result is ``[subdomain, domain]`` if the extraction has a non-empty
    subdomain, else ``[domain]``.
    """
    y = tldextract.extract(x)
    prefix = [] if y.subdomain == '' else [str(y.subdomain)]
    return prefix + [str(y.domain)]

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# UDF producing the list of domain tokens for each row.
convert_domain_udf = udf(convert_domain, ArrayType(StringType()))

# Add the tokenized domain to the test frame.
_domain_tokens_test = convert_domain_udf(tmp_data_utc_hour_test["domain"])
tmp_data_domain_converted_test = tmp_data_utc_hour_test.withColumn("domain_converted", _domain_tokens_test)

# deal with title

from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
import re

# Punctuation characters that get blanked out of titles. Written as a raw
# string so the regex escapes (\-, \[, \]) reach `re` untouched instead of
# being invalid Python string escape sequences.
REPLACE_NO_SPACE = re.compile(r"""[.;:!/\-'?,"()\[\]]""")


def preprocess_reviews(reviews):
    """Lower-case ``reviews`` and replace each punctuation character with a space.

    Args:
        reviews: The text to clean, or ``None``.

    Returns:
        The cleaned string, or ``None`` when ``reviews`` is ``None`` (a null
        cell when used as a Spark UDF).
    """
    if reviews is None:
        return None
    return REPLACE_NO_SPACE.sub(" ", reviews.lower())

# UDF wrapper around preprocess_reviews (default string return type).
preprocess_udf = udf(lambda title: preprocess_reviews(title))

# Keep every existing column and append the cleaned title.
temp_with_cleaned_title_test = tmp_data_domain_converted_test.withColumn(
    'cleaned_title',
    preprocess_udf('title'),
)

temp_with_cleaned_title_test.printSchema()

# Feature columns handed to the model, in order; 'score' is appended as the target.
_test_feature_columns = [
    'sentimentScore', 'brand_safe', 'can_gild', 'is_crosspostable', 'selftext',
    'num_comments', 'no_follow', 'over_18', 'subreddit', 'day_of_week', 'hour',
    'domain_converted', 'cleaned_title', 'author', 'parent_whitelist_status',
    'subreddit_type', 'suggested_sort',
]
data_for_model_test = temp_with_cleaned_title_test.select(*_test_feature_columns, 'score')
# Expose the target under the name 'label'.
data_for_model_test = data_for_model_test.select(*_test_feature_columns, col('score').alias('label'))
data_for_model_test.printSchema()

In [7]:
# Model fitting and evaluation
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor, LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.feature import FeatureHasher, RegexTokenizer, StopWordsRemover, HashingTF, IDF, Tokenizer, VectorAssembler

# Configure the ML pipeline: TF-IDF over the domain tokens, TF-IDF over the
# title tokens, feature hashing of the remaining columns, assembly of the three
# vectors into one, and finally the regressor.
hashingTF = HashingTF(inputCol="domain_converted", outputCol="rawDomain", numFeatures = 64) # numFeatures
idf = IDF(inputCol="rawDomain", outputCol="domain_vector")
regexTokenizer = RegexTokenizer(inputCol="cleaned_title", outputCol="words_title", pattern="\\W")
# NOTE: this rebinds `remover`, which the title-sentiment cell created with other columns.
remover = StopWordsRemover(inputCol="words_title", outputCol="filtered_title")
hashingTF2 = HashingTF(inputCol="filtered_title", outputCol="rawTitle", numFeatures = 64) # numFeatures
idf2 = IDF(inputCol="rawTitle", outputCol="title_vector")
# NOTE(review): 'day_of_week' is present in the model data but is not listed
# among the hashed inputs below — confirm whether that is intended.
hasher = FeatureHasher(inputCols=['sentimentScore','brand_safe','can_gild','is_crosspostable','selftext','no_follow', 'over_18','subreddit', 'hour','num_comments', 'author','parent_whitelist_status','subreddit_type','suggested_sort'],
                       outputCol="other_features_vector")

vectorAssembler = VectorAssembler(inputCols = ['other_features_vector','title_vector','domain_vector'], outputCol = 'features')

(trainingData, testData) = (data_for_model,data_for_model_test)

# Fixed seed for the random forest and for the cross-validation fold split, so
# that re-running the notebook reproduces the same model and metrics.
SEED = 42

# We use a ParamGridBuilder to construct a grid of parameters to search over.
# CrossValidator will try all combinations of values and determine the best
# model using the evaluator.

# For RandomForestRegressor Only
rf = RandomForestRegressor(featuresCol='features', labelCol = 'label', seed=SEED)

pipeline = Pipeline(stages=[hashingTF, idf,regexTokenizer,remover,hashingTF2,idf2, hasher, vectorAssembler, rf])

paramGrid = ParamGridBuilder()\
    .addGrid(hasher.numFeatures, [50,100])\
    .addGrid(rf.numTrees, [10,20,40])\
    .addGrid(rf.maxDepth, [3,5,10])\
    .build()

###########################################################
# For LinearRegression Only
# lr= LinearRegression(featuresCol = 'features', labelCol='label')
#
# pipeline = Pipeline(stages=[hashingTF, idf,regexTokenizer,remover,hashingTF2,idf2,hasher, vectorAssembler, lr])
#
# paramGrid = ParamGridBuilder()\
#    .addGrid(hasher.numFeatures, [100,200])\
#    .addGrid(lr.regParam, [0.1, 0.01]) \
#    .addGrid(lr.fitIntercept, [False, True])\
#    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
#    .build()
##########################################################

# 2-fold cross-validation over the grid, fitting up to 2 models in parallel.
# The evaluator is left at its defaults, as before.
crossval = CrossValidator(estimator=pipeline,estimatorParamMaps=paramGrid,evaluator=RegressionEvaluator(),numFolds=2,parallelism=2,seed=SEED)


# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(trainingData)


# Make predictions on test documents. cvModel uses the best model found.
predictions = cvModel.transform(testData)

# Select example rows to display.
predictions.select("features",'label','prediction').show(50)

# Select (prediction, true label) and compute test error
rmse_evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = rmse_evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
r2_evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2")
r2 = r2_evaluator.evaluate(predictions)
print("R2 is = %g" % r2)

In [8]:
# Score the training data with the selected model and report the same two
# metrics that were computed on the test data.
train_predictions = cvModel.transform(trainingData)

_train_rmse = rmse_evaluator.evaluate(train_predictions)
print("Root Mean Squared Error (RMSE) on training data: %f" % _train_rmse)

_train_r2 = r2_evaluator.evaluate(train_predictions)
print("Coefficient of Determination (R2) on training data: %f" % _train_r2)

In [9]:
# Feature importances of the last stage of the best pipeline found by cross-validation.
# (To obtain the importance of individual string features, each string feature
# would have to be converted to a number separately.)
# NOTE(review): this bare expression is not the last statement of the cell, so
# a notebook does not display its value.

cvModel.bestModel.stages[-1].featureImportances

# Feature importance extraction function
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    """Build a table of feature attributes ranked by importance.

    Args:
        featureImp: Indexable collection of importances, addressed by feature index.
        dataset: Object whose ``schema[featuresCol].metadata["ml_attr"]["attrs"]``
            maps attribute-group names to lists of attribute dicts; each dict
            is expected to carry an ``'idx'`` entry.
        featuresCol: Name of the features column to inspect.

    Returns:
        A pandas DataFrame with one row per attribute plus a ``'score'``
        column (``featureImp[idx]``), sorted by ``'score'`` in descending order.
    """
    attr_groups = dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]

    # Flatten every attribute group into a single list of attribute dicts.
    attr_rows = []
    for group_name in attr_groups:
        attr_rows = attr_rows + attr_groups[group_name]

    def _importance_of(feature_index):
        return featureImp[feature_index]

    importance_table = pd.DataFrame(attr_rows)
    importance_table['score'] = importance_table['idx'].apply(_importance_of)
    ranked = importance_table.sort_values('score', ascending=False)
    return ranked
  
# Report the 40 most important features of the best model's final stage.
_final_stage = cvModel.bestModel.stages[-1]
_ranked_features = ExtractFeatureImp(_final_stage.featureImportances, train_predictions, "features")
varlist = _ranked_features.head(40)
print(varlist)

# Feature indices of those rows, in ranked order.
varidx = list(varlist['idx'][0:40])
print(varidx)

In [10]:
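# Discarded code (kept inside a string literal, so it is never executed)
# NER-based Word2Vec embedding of the domain (output column "domain_NEG")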
'''
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
def NER(Token_text: str) -> str :
  doc = nlp(Token_text)
  a = 'base'
  for X in doc.ents:
#     a += str(X.text + ' ' + X.label_ + ' ')
    a += str(X.label_ + ' ')
  return a

udf_NER = udf(NER, StringType())
df_domain_NER = df_train.withColumn('domain_NER_info', udf_NER(col('domain')))

tokenizer = Tokenizer(inputCol="domain_NER_info", outputCol="domain_token")
Tokenized_test = tokenizer.transform(df_domain_NER)

word2Vec = Word2Vec(vectorSize=16, minCount=0, inputCol="domain_token", outputCol="domain_NEG")
model = word2Vec.fit(Tokenized_test)
df_train = model.transform(Tokenized_test).drop("domain_NER_info","domain_token")
# Test data
df_domain_NER2 = df_test.withColumn('domain_NER_info', udf_NER(col('domain')))

Tokenized_test2 = tokenizer.transform(df_domain_NER2)

model = word2Vec.fit(Tokenized_test2)
df_test = model.transform(Tokenized_test2).drop("domain_NER_info","domain_token")
'''

In [11]:
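# Discarded code (kept inside a string literal, so it is never executed)
# NER-based Word2Vec embedding of the title (output column "title_NEG")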
'''
df_title_NER = df_train.withColumn('title_NER_info', udf_NER(col('title')))

tokenizer = Tokenizer(inputCol="title_NER_info", outputCol="title_token")
Tokenized_test = tokenizer.transform(df_title_NER)

word2Vec = Word2Vec(vectorSize=16, minCount=0, inputCol="title_token", outputCol="title_NEG")
model = word2Vec.fit(Tokenized_test)
df_train = model.transform(Tokenized_test).drop("domain_NER_info","domain_token")
# Test data
df_title_NER2 = df_test.withColumn('title_NER_info', udf_NER(col('title')))

Tokenized_test2 = tokenizer.transform(df_title_NER2)

model = word2Vec.fit(Tokenized_test2)
df_test = model.transform(Tokenized_test2).drop("title_NER_info","title_token")
'''

In [12]:
# Discarded code (kept inside a string literal, so it is never executed)
# PCA & outlier detection
'''
# PCA
from pyspark.ml.feature import PCA
from pyspark.ml.clustering import GaussianMixture
from pyspark.sql.types import StringType

pca = PCA(k=10, inputCol="features_vec", outputCol="pcaFeatures")
model = pca.fit(df_train)

df_train_pca = model.transform(df_train).select("id","score","features_vec","pcaFeatures")

# Clustering
gmm = GaussianMixture(featuresCol="pcaFeatures",k=3,tol=0.001,maxIter=10,seed=10000)
model = gmm.fit(df_train_pca)
predictions = model.transform(df_train_pca)

# Outlier identification
# Threshold value is tuned based on prediction error
def outlier(x):
  if x[0] >= 0.4:
    return 'no'
  elif x[1] >= 0.4:
    return 'no'
  elif x[2] >= 0.4:
    return 'no'
  else:
    return 'yes'

outlier_udf = udf(outlier,StringType())
predictions = predictions.withColumn("outlier",outlier_udf(predictions["probability"]))
df_train_clear = predictions.filter(predictions["outlier"] == "no")
df_train_clear = df_train_clear.drop('prediction','probability','outlier')
df_train_clear.show(truncate=50)
'''