#### This file contains the sentiment analysis model that will classify events as ```success``` or ```failure```

In [1]:
import time

## Sklearn Implementation

In [34]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import mean_squared_error
from sklearn.feature_extraction.text import TfidfVectorizer

In [35]:
sentiment140_sklearn = pd.read_parquet('../data-processed/sentiment140_model_data.parquet')

In [36]:
sentiment140_sklearn.head()

Unnamed: 0,text,label
0,love health4uandpet u guy r best,1
1,im meet one besti tonight cant wait girl talk,1
2,darealsunisakim thank twitter add sunisa got m...,1
3,sick realli cheap hurt much eat real food plu ...,1
4,lovesbrooklyn2 effect everyon,1


#### Preparing word embeddings

In [37]:
vectorizer = TfidfVectorizer()
X = vectorizer.fit_transform(sentiment140_sklearn['text'])
y = sentiment140_sklearn['label']

#### Training the model

In [38]:
Xtr, Xts, ytr, yts = train_test_split(
    X,
    y,
    test_size=0.3, 
    random_state=42
)

In [39]:
start_time_sklearn = time.time()
model_sklearn = LogisticRegression(random_state=42, max_iter=100).fit(Xtr.astype(int), ytr)

In [40]:
yhat = model_sklearn.predict(Xts.astype(int))

In [41]:
time_sklearn = time.time() - start_time_sklearn

In [42]:
mse_sklearn = mean_squared_error(yts, yhat)

## Pyspark Implementation

In [11]:
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T
import os
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, Tokenizer

In [12]:
conf = pyspark.SparkConf()
# conf.set('spark.ui.proxyBase', '/user/' + os.environ['JUPYTERHUB_USER'] + '/proxy/4041')
conf.set('spark.driver.memory','8g')
conf.set('spark.ui.showConsoleProgress', False)
try:
    sc = pyspark.SparkContext(conf=conf)
    spark = pyspark.SQLContext.getOrCreate(sc)
except:
    print('Spark context already exists, continuing with', sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/21 18:21:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [13]:
sentiment140_pyspark = spark.read.parquet('../data-processed/sentiment140_model_data.parquet')

#### Preparing word embeddings

In [14]:
tokenizer = Tokenizer(inputCol='text', outputCol='words')
sentiment140_pyspark = tokenizer.transform(sentiment140_pyspark)
sentiment140_pyspark.show(10)

+--------------------+-----+--------------------+
|                text|label|               words|
+--------------------+-----+--------------------+
|love health4uandp...|    1|[love, health4uan...|
|im meet one besti...|    1|[im, meet, one, b...|
|darealsunisakim t...|    1|[darealsunisakim,...|
|sick realli cheap...|    1|[sick, realli, ch...|
|lovesbrooklyn2 ef...|    1|[lovesbrooklyn2, ...|
|productoffear tel...|    1|[productoffear, t...|
|rkeithhil than re...|    1|[rkeithhil, than,...|
|keepinupwkri jeal...|    1|[keepinupwkri, je...|
|tommcfli ah congr...|    1|[tommcfli, ah, co...|
|e4voip respond st...|    1|[e4voip, respond,...|
+--------------------+-----+--------------------+
only showing top 10 rows



In [15]:
hashingTF = HashingTF(inputCol='words', outputCol='rawFeatures')
featurizedData = hashingTF.transform(sentiment140_pyspark)

idf = IDF(inputCol='rawFeatures', outputCol='embeddings')
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select('text', 'label', 'embeddings').show(10)

+--------------------+-----+--------------------+
|                text|label|          embeddings|
+--------------------+-----+--------------------+
|love health4uandp...|    1|(262144,[51783,14...|
|im meet one besti...|    1|(262144,[13781,21...|
|darealsunisakim t...|    1|(262144,[1512,292...|
|sick realli cheap...|    1|(262144,[60200,74...|
|lovesbrooklyn2 ef...|    1|(262144,[31924,11...|
|productoffear tel...|    1|(262144,[8940,124...|
|rkeithhil than re...|    1|(262144,[892,1152...|
|keepinupwkri jeal...|    1|(262144,[83747,12...|
|tommcfli ah congr...|    1|(262144,[1512,522...|
|e4voip respond st...|    1|(262144,[1696,581...|
+--------------------+-----+--------------------+
only showing top 10 rows



23/12/21 18:21:30 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB


In [21]:
rescaledData = rescaledData.withColumn('label', F.col('label').cast(T.IntegerType()))
rescaledData.printSchema()

root
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures: vector (nullable = true)
 |-- embeddings: vector (nullable = true)



In [22]:
# sentiment140_pyspark = sentiment140_pyspark.withColumn('label', F.col('label').cast(T.IntegerType()))

In [23]:
# sentiment140_pyspark.show(10)

#### Training the model

In [24]:
train, test = rescaledData.randomSplit([0.7, 0.3], seed=42)

In [25]:
model_pyspark = LogisticRegression(maxIter=100, featuresCol='embeddings', labelCol='label')

In [26]:
start_time_pyspark = time.time()
model_fitted_pyspark = model_pyspark.fit(train)

23/12/21 18:21:31 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/21 18:21:31 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/12/21 18:21:31 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/21 18:21:31 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/21 18:21:31 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/21 18:21:31 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/21 18:21:31 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/21 18:21:32 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/21 18:21:32 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/21 18:21:32 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/21 18:21:32 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/21 18:21:32 WARN DAGSchedul

In [27]:
model_predicted_pyspark = model_fitted_pyspark.transform(test)

In [28]:
time_pyspark = time.time() - start_time_pyspark

In [29]:
model_predicted_pyspark.withColumn('prediction', F.col('prediction').cast(T.IntegerType()))\
.select(['text', 'label', 'prediction'])\
.show(10)

+--------------------+-----+----------+
|                text|label|prediction|
+--------------------+-----+----------+
|21 day till chri ...|    1|         1|
|a5hleyf im spend ...|    0|         0|
|aaronrva bathroom...|    0|         0|
|across univers sl...|    1|         0|
|adriman roflmao n...|    1|         0|
|             aghsnow|    0|         0|
|ahh tedium fix br...|    1|         0|
|albinla think ton...|    1|         1|
|alicayaba cuuut h...|    0|         1|
|allanatrogu thank...|    1|         1|
+--------------------+-----+----------+
only showing top 10 rows



23/12/21 18:21:35 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


In [30]:
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='mse')
mse_pyspark = evaluator.evaluate(model_predicted_pyspark)

23/12/21 18:21:35 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


In [31]:
mse_pyspark

0.41406250000000006

#### Comparing the performance of models

In [32]:
from tabulate import tabulate

In [43]:
print(
  tabulate(
    [['Sklearn', time_sklearn, mse_sklearn],
    ['PySpark', time_pyspark, mse_pyspark]],
    headers=['Model type', 'Training and inference time', 'Mean Squared Error']
  )
)

Model type      Training and inference time    Mean Squared Error
------------  -----------------------------  --------------------
Sklearn                            0.732677              0.5
PySpark                            4.22457               0.414063


## Scrape Reddit Data

In [29]:
import praw
import pandas as pd
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import CountVectorizer

In [30]:
reddit = praw.Reddit(
    client_id="",
    client_secret="",
    password="",
    user_agent="",
    username="",
)

In [31]:
# Initialize Spark session
# spark = SparkSession.builder.appName('RedditData').getOrCreate()

subreddit_name = 'nyc'  # Change this to your target subreddit
search_query = 'event'  # Modify this based on how events are typically posted
six_months_ago = datetime.utcnow() - timedelta(days=6*30)  # Approximation of 6 months

# Lists to store event and comments data
events = []
comments_data = []

# Search the subreddit for posts containing 'event' in the title
for submission in reddit.subreddit(subreddit_name).search(search_query, limit=10):  # Adjust the limit as needed
    # if datetime.utcfromtimestamp(submission.created_utc) > six_months_ago:
    submission.comments.replace_more(limit=None)  # Load all comments
    event_info = (
        submission.title,
        submission.url,
        datetime.utcfromtimestamp(submission.created_utc).strftime('%Y-%m-%d %H:%M:%S'),
        len(submission.comments)
    )
    events.append(event_info)

    # Extract comments and add to comments_data
    for comment in submission.comments.list():
        comment_info = (submission.title, submission.url, comment.body)
        comments_data.append(comment_info)

In [32]:
# Create DataFrames
events_df = pd.DataFrame(events, columns=['title', 'url', 'created', 'num_comments'])
comments_df = pd.DataFrame(comments_data, columns=['title', 'url', 'comment'])

In [33]:
# Convert Pandas DataFrames to Spark DataFrames
spark_events_df = spark.createDataFrame(events_df)
spark_comments_df = spark.createDataFrame(comments_df)

In [34]:
# Display the DataFrames
spark_events_df.show(10)
spark_comments_df.show(10)

+--------------------+--------------------+-------------------+------------+
|               title|                 url|            created|num_comments|
+--------------------+--------------------+-------------------+------------+
|Rockefeller Cente...|https://www.reddi...|2023-11-30 05:50:02|          58|
|Drama at a drag q...|https://v.redd.it...|2022-12-12 07:28:34|          73|
|NYC Drag Story Ho...|https://www.nbcne...|2022-12-19 14:52:13|          47|
|Eric Adams attend...|https://www.polit...|2023-11-17 13:41:16|          27|
|NYC Mayor Adams i...|https://www.nydai...|2023-05-12 18:19:26|          28|
|NYC's Newest Park...|https://i.redd.it...|2021-05-21 17:03:37|          40|
|Republican Jewish...|https://www.haare...|2022-12-27 17:07:57|          35|
|Trump Attends UFC...|https://www.theda...|2019-11-03 12:41:48|          30|
|Ahead of potentia...|https://www.polit...|2023-02-19 17:28:21|          29|
|'Change in percep...|https://www.nbcne...|2021-12-18 16:10:25|          30|

In [35]:
comments = spark_comments_df.select(['comment']).withColumnRenamed('comment', 'text')

In [36]:
comments = comments.withColumn('text', F.lower(F.col('text')))

#### Remove stopwords from text

In [37]:
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/priyangshupal/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [38]:
from nltk.corpus import stopwords

In [39]:
stopwords.words('english')[:10]

['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', "you're"]

In [40]:
def removeStopwords(text):
  return " ".join([word for word in text.split() if word not in stopwords.words('english')])

removeStopwordsUDF = F.udf(removeStopwords)

In [41]:
comments = comments.withColumn('text', removeStopwordsUDF(F.col('text')))

#### Remove punctuations

In [42]:
import string

In [43]:
string.punctuation

'!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'

In [44]:
def remove_punctuations(text):
    # Make a translation table that maps all punctuation characters to None
    translator = str.maketrans("", "", string.punctuation)

    # Apply the translation table to the input string
    return text.translate(translator)

removePunctuationsUDF = F.udf(remove_punctuations)

In [45]:
comments = comments.withColumn('text', removePunctuationsUDF(F.col('text')))

#### Remove emails, emojis, urls etc.

In [46]:
import emoji
import re

In [47]:
url_regex = '((www\.[^\s]+)|(https?://[^\s]+))'
username_regex = '@[^\s]+'

In [48]:
def remove_urls(text):
  return re.sub(url_regex, '', text)

def remove_usernames(text):
  return re.sub(username_regex, '', text)

def remove_emojis(text):
  return emoji.demojize(text)

remove_urlsUDF = F.udf(remove_urls)
remove_usernamesUDF = F.udf(remove_usernames)
remove_emojisUDF = F.udf(remove_emojis)

In [49]:
comments = comments.withColumn('text', remove_urlsUDF(F.col('text')))
comments = comments.withColumn('text', remove_usernamesUDF(F.col('text')))
comments = comments.withColumn('text', remove_emojisUDF(F.col('text')))

#### Tokenizing, stemming, and lemmatizing the text

In [50]:
from nltk.tokenize import RegexpTokenizer
from nltk.stem import PorterStemmer

In [51]:
def tokenize_stem_lemmatize(text):
    tokenizer = RegexpTokenizer('\w+')
    tokenized_words = tokenizer.tokenize(text)
    
    # Stemming logic
    stemmer = PorterStemmer()
    stemmed_words = [stemmer.stem(word) for word in tokenized_words]
    
    # Lemmatizing logic
    lemmatizer = nltk.WordNetLemmatizer()
    lemmatized_words = [lemmatizer.lemmatize(word, pos = 'a') for word in stemmed_words]
    
    return ' '.join(lemmatized_words)

tokenize_stem_lemmatizeUDF = F.udf(tokenize_stem_lemmatize)

In [52]:
comments = comments.withColumn('text', tokenize_stem_lemmatizeUDF(F.col('text')))

In [53]:
comments.show(10)

+--------------------+
|                text|
+--------------------+
|swear feel like e...|
|deal propalestini...|
|protest organ wit...|
|palestinian prote...|
|god forbid peopl ...|
|even protest righ...|
|god forbid enjoy ...|
|jesu christ feel ...|
|jew sorri guy did...|
|   there disgust see|
+--------------------+
only showing top 10 rows



#### Preparing word embeddings

In [54]:
comments = comments.withColumn('text', F.split(F.col('text'), ' '))

In [55]:
cv = CountVectorizer(inputCol='text', outputCol='embeddings', vocabSize=3216)

In [56]:
model = cv.fit(comments)

In [57]:
comments = model.transform(comments)

In [58]:
comments.show(10)

+--------------------+--------------------+
|                text|          embeddings|
+--------------------+--------------------+
|[swear, feel, lik...|(3216,[2,104,113,...|
|[deal, propalesti...|(3216,[33,87,102,...|
|[protest, organ, ...|(3216,[0,13,21,48...|
|[palestinian, pro...|(3216,[8,13,112,1...|
|[god, forbid, peo...|(3216,[0,6,41,62,...|
|[even, protest, r...|(3216,[1,6,13,17,...|
|[god, forbid, enj...|(3216,[12,91,211,...|
|[jesu, christ, fe...|(3216,[4,8,13,15,...|
|[jew, sorri, guy,...|(3216,[4,15,37,96...|
|[there, disgust, ...|(3216,[19,39,1407...|
+--------------------+--------------------+
only showing top 10 rows



In [59]:
prediction = model_fitted_pyspark.transform(comments).groupBy('prediction')\
.count().orderBy("count", ascending=False).first()[0]

In [60]:
print('Success' if prediction == 1.0 else 'Failure')

Failure
