# CSGY-6513 Big Data Final Project
In this notebook, I implement the Count-Based model on Kaggle Dataset.

## 1. Setting Up PySpark, Read the Data.

In [1]:
import os
import pyspark

conf = pyspark.SparkConf()
conf.set('spark.ui.proxyBase', '/user/' + os.environ['JUPYTERHUB_USER'] + '/proxy/4041')
conf.set('spark.sql.repl.eagerEval.enabled', True)
conf.set('spark.driver.memory', '4g')
sc = pyspark.SparkContext(conf=conf)
spark = pyspark.SQLContext.getOrCreate(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/14 17:40:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable




In [3]:
from pyspark.sql.functions import *

news_data = 'Combined_News_DJIA.csv'
data_df = spark.read.format('csv').option('inferSchema','true').option('header','true').load(news_data)
data_df = data_df.withColumn('Date', to_date('Date'))

                                                                                

In [4]:
data_df.show(3)

22/12/14 17:42:10 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+----------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|      Date|Label|                Top1|                Top2|                Top3|                Top4|                Top5|                Top6|                Top7|                Top8|                Top9|               Top10|               Top11|               Top12|      

For each day, we have label and 25 news headlines. The label is 1 if the DJIA (Dow Jones Industrial Average) daily return is plus, and 0 if minus.

## 2. Preprocessing, Computing the Score for Each Frequent Word.

In [5]:
# Concatenate all the news headlines into column "News" for each day.
data_df = data_df.withColumn('News', col('Top1'))
for i in range(1, 26):
    data_df = data_df.withColumn('News', concat_ws(' ', 'News', 'Top'+str(i)))

# Lowercase them, remove letters that is not numbers and alphabets, and split them into words.
data_df = data_df.withColumn('News', regexp_replace(lower(col('News')), r"[^0-9a-z]", " "))
data_df = data_df.withColumn('News', split(col('News'), " "))

# Discard unnecessary columns and prepare for the word count using "explode" function.
data_df = data_df.select(col('Date'), col('Label'), col('News'))
data_df = data_df.withColumn('News', explode(col('News'))).withColumnRenamed('News', 'word')
data_df = data_df.withColumn('word', trim(col('word')))
# Remove the row with empty string and "b". All the texts start with a letter "b" which is nothing to do with the news headlines.
data_df = data_df.where((col('word')!='') & (col('word')!='b'))

# Split the data into training and test sets.
train_df = data_df.where(col('Date') < '2015-09-17')
test_df = data_df.where(col('Date') >= '2015-09-17')

In [6]:
# Perform the word counting on training data.
count_df = train_df.select(col('word')).where(col('Label')==1).groupBy('word').count()
count_df = count_df.withColumnRenamed('count', 'pos')
neg_df = train_df.select(col('word')).where(col('Label')==0).groupBy('word').count()
neg_df = neg_df.withColumnRenamed('count', 'neg')
count_df = count_df.join(neg_df, ['word'], 'outer')
count_df = count_df.na.fill(value=0)
count_df = count_df.withColumn('sum', col('pos')+col('neg'))

# We only focus on frequent words, which is top 900 words in this case.
count_df = count_df.where(col('sum')>100)
print(f"Number of Words: {count_df.count()}")

[Stage 7:>                                                          (0 + 1) / 1]

Number of Words: 900


                                                                                

In [7]:
# Since the number of positives (the DJIA daily return is plus) and negatives are not the same, we normalize it.
# Multiplying by 100 doesn't have a specific meaning. Just scaling so that the values don't get too small.
pos_sum = count_df.agg({'pos': 'sum'}).collect()[0][0]
neg_sum = count_df.agg({'neg': 'sum'}).collect()[0][0]
count_df = count_df.withColumn('pos', col('pos') * 100 / pos_sum)
count_df = count_df.withColumn('neg', col('neg') * 100 / neg_sum)
count_df.sort(col('sum').desc()).show(10)



+----+------------------+------------------+-----+
|word|               pos|               neg|  sum|
+----+------------------+------------------+-----+
| the| 5.506913833705694| 5.594660578965725|22443|
|  to|4.5537941317181705| 4.541467178202649|18398|
|  of|3.9295285959134763|3.8550154834754853|15755|
|  in| 3.841741254940941| 3.846560343278692|15550|
|   a|2.8036211116942957|2.8155616855321983|11364|
| and|2.2360225367290156| 2.140735808575626| 8865|
|   s| 1.684216393473081|1.6889142543094793| 6822|
| for|1.6623856790513394|1.6434678757517147| 6689|
|  on|1.3028756160209576|1.3501802001754442| 5360|
|  is|1.1658530467781116|1.1583542069606942| 4702|
+----+------------------+------------------+-----+
only showing top 10 rows



                                                                                

These are the top 10 frequent words. However, since pos and neg values are nearly the same, the final score will be close to 0, meaning these words have limited influence on our prediction.

In [8]:
from pyspark.sql.types import DoubleType

def clip(val, floor, ceiling):
    if val < floor:
        val = floor
    elif val > ceiling:
        val = ceiling
    return val
clip_udf = udf(clip, DoubleType())

# We lower bound with small value to avoid having log(0). Upper bound doesn't have any meaning but we need to feed some value to the function.
lower, upper = 1e-7, 100
count_df = count_df.withColumn('pos', clip_udf(col('pos'), lit(lower), lit(upper)))
count_df = count_df.withColumn('neg', clip_udf(col('neg'), lit(lower), lit(upper)))

# Computing the score. Since less frequent values tend to get high score, we bound it.
count_df = count_df.withColumn('score', log(col('pos')) - log(col('neg')))
lower, upper = -0.5, 0.5
count_df = count_df.withColumn('score', clip_udf(col('score'), lit(lower), lit(upper)))

Below is the words with top 10 highest scores, which means they show up more in the positive contexts (Days when DJIA rose).

In [9]:
count_df.sort(col('score').desc()).show(10)

[Stage 39:>                                                         (0 + 1) / 1]

+--------+--------------------+--------------------+---+-------------------+
|    word|                 pos|                 neg|sum|              score|
+--------+--------------------+--------------------+---+-------------------+
|chemical|0.036229696274379566|0.020080957967384298|116|                0.5|
| spanish|0.038087629416655444|0.020609404229683884|121|                0.5|
| doctors|  0.0357652129888106|0.021666296754283058|118|                0.5|
| nigeria| 0.03251382998982782| 0.01955251170508471|107|                0.5|
|   coast| 0.04226797898677616| 0.02430852806578099|137|                0.5|
|     non| 0.03855211270222441| 0.02272318927888223|126|                0.5|
|hospital| 0.03437176313210369| 0.02113785049198347|114|0.48617009251120136|
|   judge| 0.04273246227234513|0.026422313114979338|142|0.48075002504186237|
|northern| 0.04273246227234513|0.026422313114979338|142|0.48075002504186237|
|    jews|0.039945562558931315|0.024836974328080578|133| 0.4751841479644172|

                                                                                

Similarly, below is the words with top 10 lowest scores.

In [10]:
count_df.sort(col('score')).show(10)

[Stage 44:>                                                         (0 + 1) / 1]

+---------+--------------------+--------------------+---+--------------------+
|     word|                 pos|                 neg|sum|               score|
+---------+--------------------+--------------------+---+--------------------+
|   speech|  0.0222951977073105|0.036991238360971075|118|                -0.5|
|      bin|0.017650364851620814| 0.03329211452487397|101|                -0.5|
|  beijing|0.020901747850603596| 0.03487745331177273|111|                -0.5|
|      web|0.020901747850603596|0.033820560787173555|109|-0.48123614016838445|
|   search|0.019508297993896688|0.031178329475675618|101|  -0.468883372201383|
|    users| 0.02415313084958638|0.038576577147869835|125|-0.46823126914599555|
|   monday| 0.03344279656096576|0.052844626229958676|172| -0.4575196135510682|
|     hong|0.020437264565034627| 0.03170677573797521|104|  -0.439170474882872|
|ukrainian|0.032049346704258845|0.049673948656161156|163|-0.43820382425177673|
|     kong|0.020437264565034627| 0.03064988321337603

                                                                                

## 3. Prediction

In [11]:
# For each word in test data, we assign the computed score by left join.
test_df = test_df.join(count_df.select(col('word'), col('score')), ['word'], 'left')
test_df = test_df.na.fill(value=0, subset=['score'])

In [12]:
# Use groupBy to aggregate the word scores.
# min('Label') is just keeping one label (ground truth) for each day. Taking minimum is not necessary since the labels are the same for a certain day.
test_df = test_df.select(col('Date'), col('Label'), col('score')).groupBy('Date').agg(min('Label'),sum('score'))
test_df = test_df.withColumnRenamed('min(Label)', 'Label').withColumnRenamed('sum(score)', 'sum')

In [123]:
# Finally, compute the accuracy.
acc = test_df.where((col('Label')==1)==(col('sum')>0)).count() / test_df.count()
print(f"Test Accuracy: {acc}")

                                                                                

Test Accuracy: 0.505


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 60104)
Traceback (most recent call last):
  File "/opt/conda/envs/bigdata-fall22/lib/python3.7/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/envs/bigdata-fall22/lib/python3.7/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/envs/bigdata-fall22/lib/python3.7/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/envs/bigdata-fall22/lib/python3.7/socketserver.py", line 720, in __init__
    self.handle()
  File "/opt/conda/envs/bigdata-fall22/lib/python3.7/site-packages/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/opt/conda/envs/bigdata-fall22/lib/python3.7/site-packages/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/o