## Spark Code Development Notebook
* Version: 1.0.0
* Last updated: Jul 19, 2020

This notebook is used for development and demonstration of PySpark code to process the news articles for a given analysis window and then write the resulting sentiment analysis scores and labels back to DynamoDB.

V1.0.0 tests the pipeline using only the news_title field.

### Load the data from DynamoDB

In [1]:
import boto3
from boto3.dynamodb.conditions import Key

In [2]:
table_name = 'test_data'

In [3]:
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(table_name)

In [4]:
# Grab data from a particular analysis window
target_analysis_window = '2020-07-15_Hour=17'

response = table.query(
    KeyConditionExpression=Key('analysis_window').eq(target_analysis_window)
)

In [5]:
# This is a list of dictionaries, where the keys of each dictionary
# are the attribute (column) names in the DynamoDB table
news_data = response['Items']
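
A single DynamoDB `query` call returns at most 1 MB of data. When more items match, the response includes a `LastEvaluatedKey` that has to be passed back as `ExclusiveStartKey` to fetch the next page. The sketch below shows one way to follow those pages. The helper name `query_all_items` and the `FakeTable` stand-in are hypothetical and exist only so the example runs without AWS access.

```python
def query_all_items(table, **query_kwargs):
    """Collect every item for a query by following LastEvaluatedKey.

    `table` is any object with a boto3-style `query(**kwargs)` method.
    This helper name is hypothetical, not part of boto3.
    """
    items = []
    kwargs = dict(query_kwargs)
    while True:
        response = table.query(**kwargs)
        items.extend(response.get('Items', []))
        last_key = response.get('LastEvaluatedKey')
        if last_key is None:
            return items
        # Resume the query where the previous page stopped
        kwargs['ExclusiveStartKey'] = last_key


class FakeTable:
    """Stand-in for a DynamoDB table that serves two items per page."""

    def __init__(self, rows, page_size=2):
        self.rows = rows
        self.page_size = page_size

    def query(self, ExclusiveStartKey=None, **_ignored):
        start = 0 if ExclusiveStartKey is None else ExclusiveStartKey['offset']
        end = start + self.page_size
        response = {'Items': self.rows[start:end]}
        if end < len(self.rows):
            response['LastEvaluatedKey'] = {'offset': end}
        return response


fake_rows = [{'news_title': f'title {i}'} for i in range(5)]
all_items = query_all_items(FakeTable(fake_rows), KeyConditionExpression='unused')
print(len(all_items))
```

With the real table, the call would presumably look like `query_all_items(table, KeyConditionExpression=Key('analysis_window').eq(target_analysis_window))`.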

### Start spark and parallelize the data for processing

In [6]:
from pyspark.sql import SparkSession

In [7]:
spark = (SparkSession.builder
                        .appName("SparkTest") # Set app name
                        .master("local[2]") # Run locally with 2 cores
                        .config("spark.driver.memory", "4g")
                        .config("spark.executor.memory", "3g")
                        .getOrCreate())

In [8]:
# Note: this approach triggers a deprecation warning
# Convert the list to an RDD
news_rdd = spark.sparkContext.parallelize(news_data)
# Create the DataFrame
news_df = spark.createDataFrame(news_rdd)

# Alternative approach that avoids the deprecation warning:
# https://kontext.tech/column/spark/366/convert-python-dictionary-list-to-pyspark-dataframe
# from pyspark.sql import Row
# news_df = spark.createDataFrame([Row(**i) for i in news_data])
# However, this approach raises an error on .rdd.flatMap(lambda x: x).collect():
# java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema.
# 11 fields are required while 10 values are provided.
# That said, we don't usually collect until the end of the pipeline.

news_df.printSchema()



root
 |-- analysis_window: string (nullable = true)
 |-- api_success_utc_str: string (nullable = true)
 |-- api_success_utc_ts: decimal(38,18) (nullable = true)
 |-- news_content: string (nullable = true)
 |-- news_link: string (nullable = true)
 |-- news_publisher: string (nullable = true)
 |-- news_timestamp: decimal(38,18) (nullable = true)
 |-- news_title: string (nullable = true)
 |-- source_api: string (nullable = true)
 |-- symb_id_source: string (nullable = true)
 |-- t_symb: string (nullable = true)
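
The "11 fields are required while 10 values are provided" error mentioned in the comments would be consistent with at least one item lacking an attribute that other items have, since DynamoDB items need not share the same attributes. That cause is an assumption and has not been verified against this table. If it holds, one possible workaround is to give every dictionary the same keys before building rows. The helper name `normalize_records` and the sample values are hypothetical.

```python
def normalize_records(records):
    """Give every dictionary the same keys, filling gaps with None."""
    all_keys = sorted({key for record in records for key in record})
    return [{key: record.get(key) for key in all_keys} for record in records]


sample = [
    {'t_symb': 'AAPL', 'news_title': 'A title', 'news_content': 'Body text'},
    {'t_symb': 'GOOG', 'news_title': 'Another title'},  # no news_content
]
normalized = normalize_records(sample)
print(normalized[1])
```

The normalized list could then be passed to `spark.createDataFrame([Row(**r) for r in normalized])`. Whether this resolves the error in this notebook is untested.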



In [9]:
news_data[0]['news_title']

'Apple News adds new audio features, including a daily briefing, alongside expanded local coverage'

In [10]:
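# Drop nulls; most likely there won't be any for titles
title_df = news_df.select('analysis_window', 't_symb', 'news_timestamp', 'news_title').na.drop()
# https://stackoverflow.com/questions/38610559/
# Note: flatMap flattens every selected column, so title_list interleaves
# analysis_window, t_symb, news_timestamp and news_title values
title_list = title_df.rdd.flatMap(lambda x: x).collect()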
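
Because `flatMap(lambda x: x)` emits each field of each row as a separate element, the collected list contains values from all four selected columns, not only titles. The plain-Python analogue below illustrates the effect. The row values are made up for illustration.

```python
from itertools import chain

# Two rows with the same column order as title_df
# (analysis_window, t_symb, news_timestamp, news_title); values are made up
rows = [
    ('2020-07-15_Hour=17', 'AAPL', 1594832503, 'First title'),
    ('2020-07-15_Hour=17', 'AAPL', 1594826311, 'Second title'),
]

# Equivalent of rdd.flatMap(lambda x: x).collect(): every field of every row
flattened = list(chain.from_iterable(rows))

# Every fourth element, starting at index 3, is a news_title
titles_only = flattened[3::4]
print(flattened[0], titles_only)
```

In Spark, selecting only the title column before flattening, e.g. `title_df.select('news_title').rdd.flatMap(lambda x: x).collect()`, would give a titles-only list.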

### Cleaning and processing pipeline

In [11]:
from bs4 import BeautifulSoup
from transformers import pipeline
from multiprocessing import cpu_count
from multiprocessing import Pool

In [12]:
class SentimentAnalysisPipeline:
    def __init__(self):
        # Note that the transformers pipelines can take list of strings as input
        # instead of just one string.
#         self.summarizer_pipeline = pipeline("summarization")
        self.sentiment_pipeline = pipeline("sentiment-analysis")
        
        # multiprocessing core count heuristic from 
        # comment in https://stackoverflow.com/questions/20886565/
        self.pool_cores = cpu_count()-1 or 1
        
    def strip_html(self,input_string):
        """
        Strips any HTML tags from a string
        """ 
        
        # Name the parser explicitly to avoid bs4's "no parser specified" warning
        cleaned_text = BeautifulSoup(input_string, "html.parser").text
        
        return cleaned_text
    
    def strip_html_multi(self, input_string_list):
                
        # Make sure input is a list; if it's a single string, wrap it in a
        # list (list(some_string) would split it into single characters).
        if isinstance(input_string_list, str):
            input_string_list = [input_string_list]

        # TODO may want to initialize and destroy pool somewhere else
        # Or it might not matter too much given this function is only
        # called once per spark job / AWS EMR startup
        p = 2
        with Pool(processes=p) as pool:
            chunksize = 3
            no_html_text = pool.map(self.strip_html, input_string_list, chunksize)        
        
        return no_html_text
    
    def raw_text_to_sentiment(self,input_string):
        """
        Takes a single string (currently a news title) and runs it
        through a pipeline to get the positive/negative label and the
        respective score.
        
        The full pipeline is intended to consist of:
        (1) Removing HTML if present
        (2) Summarizing the news articles
        (3) Calculating a sentiment score and label (POSITIVE or NEGATIVE)
        
        Steps (1) and (2) are currently disabled (commented out below).
        
        Returns a (label, score) tuple.
        """

#         no_html_text = self.strip_html_multi(input_string)
        
#         news_summary = self.summarizer_pipeline(no_html_text, 
#                                                 max_length=300, 
#                                                 min_length=30)[0]['summary_text']
#         sentiment_scores = self.sentiment_pipeline(news_summary)

        # TODO check for token length

        # Temporary version expects news titles only
        # Run the model once and read both fields from the first result
        result = self.sentiment_pipeline(input_string)[0]

        return (result['label'], result['score'])

In [13]:
s_pipe = SentimentAnalysisPipeline()

In [14]:
title_list[0]

'2020-07-15_Hour=17'

In [15]:
test_result = s_pipe.raw_text_to_sentiment(title_list[0])

In [16]:
test_result

('NEGATIVE', 0.6570926308631897)

### Try adding a UDF for transformers

In [17]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DecimalType, FloatType, StringType, StructField, StructType

In [18]:
udf_schema = StructType([
    StructField("label", StringType(), nullable=False),
    StructField("score", FloatType(), nullable=False)
])

In [19]:
s_pipe_udf = udf(s_pipe.raw_text_to_sentiment,udf_schema)

In [20]:
title_df

DataFrame[analysis_window: string, t_symb: string, news_timestamp: decimal(38,18), news_title: string]

In [21]:
title_df.show(5)

+------------------+------+--------------------+--------------------+
|   analysis_window|t_symb|      news_timestamp|          news_title|
+------------------+------+--------------------+--------------------+
|2020-07-15_Hour=17|  AAPL|1594832503.000000...|Apple News adds n...|
|2020-07-15_Hour=17|  AAPL|1594826311.000000...|Over 2,500 games ...|
|2020-07-15_Hour=17|  AAPL|1594859001.000000...|Dow Jones Futures...|
|2020-07-15_Hour=17|  AAPL|1594851916.000000...|Twitter Hack Snag...|
|2020-07-15_Hour=17|  AAPL|1594850401.000000...|Apple Ruling Make...|
+------------------+------+--------------------+--------------------+
only showing top 5 rows



In [22]:
sentiment_df = title_df.withColumn('sentiment', s_pipe_udf(title_df['news_title']))

In [23]:
sentiment_df.printSchema()

root
 |-- analysis_window: string (nullable = true)
 |-- t_symb: string (nullable = true)
 |-- news_timestamp: decimal(38,18) (nullable = true)
 |-- news_title: string (nullable = true)
 |-- sentiment: struct (nullable = true)
 |    |-- label: string (nullable = false)
 |    |-- score: float (nullable = false)



In [24]:
sentiment_df = sentiment_df.select('analysis_window', 't_symb', 'news_timestamp',
                                   'news_title', 'sentiment.label', 'sentiment.score')

In [25]:
sentiment_df.printSchema()

root
 |-- analysis_window: string (nullable = true)
 |-- t_symb: string (nullable = true)
 |-- news_timestamp: decimal(38,18) (nullable = true)
 |-- news_title: string (nullable = true)
 |-- label: string (nullable = true)
 |-- score: float (nullable = true)



In [26]:
sentiment_df.show(10)

+------------------+------+--------------------+--------------------+--------+----------+
|   analysis_window|t_symb|      news_timestamp|          news_title|   label|     score|
+------------------+------+--------------------+--------------------+--------+----------+
|2020-07-15_Hour=17|  AAPL|1594832503.000000...|Apple News adds n...|POSITIVE| 0.9947432|
|2020-07-15_Hour=17|  AAPL|1594826311.000000...|Over 2,500 games ...|NEGATIVE| 0.9982638|
|2020-07-15_Hour=17|  AAPL|1594859001.000000...|Dow Jones Futures...|NEGATIVE|0.99574244|
|2020-07-15_Hour=17|  AAPL|1594851916.000000...|Twitter Hack Snag...|NEGATIVE|0.99829704|
|2020-07-15_Hour=17|  AAPL|1594850401.000000...|Apple Ruling Make...|NEGATIVE|0.99890846|
|2020-07-15_Hour=17|  AAPL|1594847766.000000...|TikTok’s Huge Dat...|NEGATIVE|  0.968786|
|2020-07-15_Hour=17|  AAPL|1594847280.000000...|Hackers compromis...|NEGATIVE| 0.9991414|
|2020-07-15_Hour=17|  AAPL|1594844640.000000...|Is Tesla Worth $2...|NEGATIVE|0.98299336|
|2020-07-1

### Final Label Calculation

We want a final label of either **POSITIVE**, **NEGATIVE**, or **UNCERTAIN**. 

We will use a somewhat naive and simple approach to calculating sentiment through averaging.

Criteria for final score:
* The final score should be between -1 and 1.
* The older the news, the less important it is: scores are weighted exponentially less for every 3 hours of distance from the most recent news. E.g. the most recent news has a weight of 1, news that is 3 hours away from the MAX timestamp has a weight of 0.5, news that is 6 hours away has a weight of 0.25, and so on.
* Any score between -0.5 and 0.5 (exclusive) is labelled UNCERTAIN
* Scores that are 0.5 or greater are labelled POSITIVE
* Scores that are -0.5 or less are labelled NEGATIVE
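
The weighting criterion can be written compactly as follows. The symbols are introduced here for illustration only: $t_i$ is the timestamp (in seconds) of article $i$, $t_{\max}$ is the most recent timestamp for the same analysis window and ticker, and $s_i$ is the signed sentiment score of article $i$.

```latex
\[
h_i = \left\lfloor \frac{(t_{\max} - t_i)/3600}{3} \right\rfloor,
\qquad
w_i = \frac{1}{2^{h_i}},
\qquad
S = \frac{\sum_i w_i\, s_i}{\sum_i w_i}
\]
```

Since every $s_i$ lies in $[-1, 1]$ and every $w_i$ is positive, $S$ is a weighted average of values in $[-1, 1]$ and therefore also lies in $[-1, 1]$. For example, an article 5 hours older than the most recent one has $h_i = \lfloor 5/3 \rfloor = 1$ and $w_i = 1/2$.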

In [27]:
from pyspark.sql.functions import when, floor as spark_floor

### Scores are bounded between -1 and 1
If the label is NEGATIVE, make the score value negative.

In [28]:
sentiment_df = sentiment_df.withColumn('score', 
                                       (when(sentiment_df.label == 'NEGATIVE', -sentiment_df.score)
                                        .otherwise(sentiment_df.score))
                                      )

In [29]:
sentiment_df.show(10)

+------------------+------+--------------------+--------------------+--------+-----------+
|   analysis_window|t_symb|      news_timestamp|          news_title|   label|      score|
+------------------+------+--------------------+--------------------+--------+-----------+
|2020-07-15_Hour=17|  AAPL|1594832503.000000...|Apple News adds n...|POSITIVE|  0.9947432|
|2020-07-15_Hour=17|  AAPL|1594826311.000000...|Over 2,500 games ...|NEGATIVE| -0.9982638|
|2020-07-15_Hour=17|  AAPL|1594859001.000000...|Dow Jones Futures...|NEGATIVE|-0.99574244|
|2020-07-15_Hour=17|  AAPL|1594851916.000000...|Twitter Hack Snag...|NEGATIVE|-0.99829704|
|2020-07-15_Hour=17|  AAPL|1594850401.000000...|Apple Ruling Make...|NEGATIVE|-0.99890846|
|2020-07-15_Hour=17|  AAPL|1594847766.000000...|TikTok’s Huge Dat...|NEGATIVE|  -0.968786|
|2020-07-15_Hour=17|  AAPL|1594847280.000000...|Hackers compromis...|NEGATIVE| -0.9991414|
|2020-07-15_Hour=17|  AAPL|1594844640.000000...|Is Tesla Worth $2...|NEGATIVE|-0.98299336|

### Older news weighs less

The older the news, the less important it is. Scores are weighted exponentially less for every 3 hours of distance from the most recent timestamp in the analysis window.

In [30]:
from pyspark.sql import Window
from pyspark.sql.functions import max as spark_max
from pyspark.sql.functions import col

In [31]:
# Side note: Spark DataFrames are built on RDDs, which are immutable; each step
# returns a new DataFrame and is lazily evaluated so that Spark can optimize the plan.
# Also note that we call .show() for demo/debug purposes only, since it forces Spark
# to execute the plan and bring the displayed rows to the driver node.

# Calculate weight factor
# Since we want the weight to be halved every 3 hours, the weight is basically
# 1 / (2^h) where h is the hours away from max divided by 3 and rounded down [h = floor(diff_hour/3)]
# e.g. 5 Hour difference from MAX timestamp means h = floor(5/3) and weight = 1/(2^1) = 1/2

# Spark transformations needed:
# 1. Get the latest (max) timestamp of news articles for each analysis window and stock
# 2. Convert news_timestamp from seconds since epoch to hours since epoch.
# 3. Calculate the difference in hours between the max value and the current row's
#    news_timestamp.
# 4. Divide this difference by 3 and take the floor
# 5. Calculate the weight, which is 1/(2^h) where h = floor(diff_hour/3) was calculated in step (4)
# 6. Multiply the sentiment score by the weight to get the new time-weighted score column
# 6. Multiply the sentiment score by the weight to get the new time weighted score column


# 1. Get max timestamp (epoch seconds) for each analysis window and stock ticker
# https://stackoverflow.com/questions/49241264/
# https://stackoverflow.com/questions/62863632/

column_list = ['analysis_window', 't_symb']
window_spec = Window.partitionBy([col(x) for x in column_list])
sentiment_df = sentiment_df.withColumn('max_timestamp', spark_max(col('news_timestamp')).over(window_spec))

# Convert from seconds since epoch to hours since epoch
# Just divide the timestamp by 3600 seconds to get the number of hours since epoch.
# (Worry about taking floor later)
sentiment_df = sentiment_df.withColumn('max_timestamp_hours', sentiment_df.max_timestamp / 3600)
sentiment_df = sentiment_df.drop('max_timestamp')


# 2. Convert news_timestamp which is seconds from epoch to hour from epoch.
sentiment_df = sentiment_df.withColumn('news_timestamp_hours', sentiment_df.news_timestamp/3600)
sentiment_df = sentiment_df.drop('news_timestamp')

# 3. Calculate the number of hour difference between the current row value and max value in terms of 
#    news_timestamp hours from epoch.

sentiment_df = sentiment_df.withColumn('diff_hours', sentiment_df.max_timestamp_hours - sentiment_df.news_timestamp_hours)
sentiment_df = sentiment_df.drop('news_timestamp_hours') # don't need it anymore

# 4. Divide this difference by 3 and get the floor
staleness_period = 3
sentiment_df = sentiment_df.withColumn('weight_denom_power', spark_floor(sentiment_df.diff_hours / staleness_period))
sentiment_df = sentiment_df.drop('diff_hours')

# Check for cases where the difference is negative and throw an error or log it, because something is wrong.
# TODO maybe add this number to log file or throw error
num_negatives = sentiment_df.filter(sentiment_df.weight_denom_power < 0).count()

# 5. Calculate the weight which is 1/(2^h) where h = floor(diff_hour/3), h was calculated in step (4)
sentiment_df = sentiment_df.withColumn('score_weight', 1/(2**sentiment_df.weight_denom_power))
sentiment_df = sentiment_df.drop('weight_denom_power')

# 6. Multiply the sentiment score by the weight to get the new time weighted score column
sentiment_df = sentiment_df.withColumn('weighted_score', sentiment_df.score_weight * sentiment_df.score)
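
As a cross-check for the Spark transformations above, the same six steps can be sketched in plain Python on a handful of records. This is only a reference sketch, not part of the pipeline. The function name `time_weighted_scores` and the sample records (including the placeholder window name `'w'`) are hypothetical.

```python
import math


def time_weighted_scores(records, staleness_period=3):
    """Mirror steps 1-6 above on plain dictionaries (cross-check only)."""
    # Step 1: most recent timestamp per (analysis_window, t_symb)
    max_ts = {}
    for r in records:
        key = (r['analysis_window'], r['t_symb'])
        max_ts[key] = max(max_ts.get(key, r['news_timestamp']), r['news_timestamp'])

    weighted = []
    for r in records:
        key = (r['analysis_window'], r['t_symb'])
        # Steps 2-3: difference from the max timestamp, in hours
        diff_hours = (max_ts[key] - r['news_timestamp']) / 3600
        # Step 4: number of complete staleness periods
        h = math.floor(diff_hours / staleness_period)
        # Step 5: halve the weight once per period
        weight = 1 / (2 ** h)
        # Step 6: time-weighted score
        weighted.append({**r, 'score_weight': weight,
                         'weighted_score': weight * r['score']})
    return weighted


# Made-up records: the newest article, one 5 hours older, one 6 hours older
sample = [
    {'analysis_window': 'w', 't_symb': 'AAPL', 'news_timestamp': 36000, 'score': -0.9},
    {'analysis_window': 'w', 't_symb': 'AAPL', 'news_timestamp': 36000 - 5 * 3600, 'score': 0.8},
    {'analysis_window': 'w', 't_symb': 'AAPL', 'news_timestamp': 36000 - 6 * 3600, 'score': 0.4},
]
result = time_weighted_scores(sample)
print([r['score_weight'] for r in result])
```

The three sample articles fall into 0, 1 and 2 complete staleness periods, so their weights halve at each step.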

In [32]:
sentiment_df.show(5)

+------------------+------+--------------------+--------+-----------+--------------------+------------+-------------------+
|   analysis_window|t_symb|          news_title|   label|      score| max_timestamp_hours|score_weight|     weighted_score|
+------------------+------+--------------------+--------+-----------+--------------------+------------+-------------------+
|2020-07-15_Hour=17|  GOOG|Lawsuits allege M...|NEGATIVE|-0.99602497|443016.3488888888...|         0.5|-0.4980124831199646|
|2020-07-15_Hour=17|  GOOG|Gmail for G Suite...|POSITIVE| 0.99527836|443016.3488888888...|         0.5|0.49763917922973633|
|2020-07-15_Hour=17|  GOOG|After $20 Billion...|NEGATIVE| -0.9974083|443016.3488888888...|         1.0|-0.9974082708358765|
|2020-07-15_Hour=17|  GOOG|Amazon Extends Wo...|NEGATIVE| -0.9492833|443016.3488888888...|         1.0|-0.9492833018302917|
|2020-07-15_Hour=17|  GOOG|Apple Ruling Make...|NEGATIVE|-0.99890846|443016.3488888888...|         1.0|-0.9989084601402283|
+-------

### Calculate weighted average scores and update the labels

First, we sum all the weighted scores and divide by the sum of the score weights, i.e. take a weighted average. This operation is applied to rows grouped by analysis window and stock ticker symbol.

Then, instead of just **POSITIVE** and **NEGATIVE**, we add a third label, **UNCERTAIN**, for final scores whose absolute value is less than 0.5.


In [33]:
from pyspark.sql.functions import sum as spark_sum

In [34]:
# Get sum of weights and sum of weighted scores
sentiment_df = (sentiment_df.groupBy('analysis_window', 't_symb')
                            .agg(spark_sum('weighted_score').alias('sum_scores'),
                                 spark_sum('score_weight').alias('sum_weights'))
               )

In [35]:
sentiment_df.printSchema()

root
 |-- analysis_window: string (nullable = true)
 |-- t_symb: string (nullable = true)
 |-- sum_scores: double (nullable = true)
 |-- sum_weights: double (nullable = true)



In [36]:
# Calculate final score for each stock
sentiment_df = sentiment_df.withColumn('final_score', sentiment_df.sum_scores / sentiment_df.sum_weights)

In [37]:
sentiment_df.show()

+------------------+------+-------------------+-----------------+--------------------+
|   analysis_window|t_symb|         sum_scores|      sum_weights|         final_score|
+------------------+------+-------------------+-----------------+--------------------+
|2020-07-15_Hour=17|  GOOG| -6.029178576936829|   10.82861328125| -0.5567821493243732|
|2020-07-15_Hour=17|  AAPL| -5.456488942727447|         13.15625|-0.41474500277263254|
|2020-07-15_Hour=17|    FB| -3.147122867685539|5.354244232177734| -0.5877809698653788|
|2020-07-15_Hour=17|  NFLX|-5.9542470138840145|   9.009521484375| -0.6608838243196739|
|2020-07-15_Hour=17|  AMZN|  -5.34244602243416|      11.93359375| -0.4476812378864632|
+------------------+------+-------------------+-----------------+--------------------+
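
The grouped aggregation and division can be mirrored in plain Python to sanity-check individual rows. The function name `final_scores` and the sample rows are hypothetical; only the arithmetic follows the cells above.

```python
from collections import defaultdict


def final_scores(rows):
    """Weighted average of scores per (analysis_window, t_symb) group."""
    sum_scores = defaultdict(float)
    sum_weights = defaultdict(float)
    for row in rows:
        key = (row['analysis_window'], row['t_symb'])
        sum_scores[key] += row['weighted_score']
        sum_weights[key] += row['score_weight']
    # Weights are positive, so every group has a non-zero denominator
    return {key: sum_scores[key] / sum_weights[key] for key in sum_scores}


# Made-up rows: two tickers in one analysis window
rows = [
    {'analysis_window': 'w', 't_symb': 'AAPL', 'score_weight': 1.0, 'weighted_score': -0.9},
    {'analysis_window': 'w', 't_symb': 'AAPL', 'score_weight': 0.5, 'weighted_score': 0.4},
    {'analysis_window': 'w', 't_symb': 'GOOG', 'score_weight': 1.0, 'weighted_score': 0.6},
]
scores = final_scores(rows)
print(scores)
```

The same division applied to the GOOG row in the table above, -6.029178576936829 / 10.82861328125, gives roughly -0.5568, which agrees with the final_score column.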



In [38]:
sentiment_df = (sentiment_df.withColumn('label',when(sentiment_df.final_score >= 0.5, 'POSITIVE')
                                               .when(sentiment_df.final_score <= -0.5, 'NEGATIVE')
                                               .otherwise('UNCERTAIN'))
                )

In [39]:
sentiment_df.select('analysis_window', 't_symb', 'label', 'final_score').show()

+------------------+------+---------+--------------------+
|   analysis_window|t_symb|    label|         final_score|
+------------------+------+---------+--------------------+
|2020-07-15_Hour=17|  GOOG| NEGATIVE| -0.5567821493243732|
|2020-07-15_Hour=17|  AAPL|UNCERTAIN|-0.41474500277263254|
|2020-07-15_Hour=17|    FB| NEGATIVE| -0.5877809698653788|
|2020-07-15_Hour=17|  NFLX| NEGATIVE| -0.6608838243196739|
|2020-07-15_Hour=17|  AMZN|UNCERTAIN| -0.4476812378864632|
+------------------+------+---------+--------------------+
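
The `when`/`otherwise` chain in cell 38 is equivalent to the small plain-Python function below, which can be handy for unit-testing the labelling rule outside Spark. The function name `final_label` is hypothetical. The default threshold of 0.5 is the value used in cell 38, and the scores are copied from the table above.

```python
def final_label(final_score, threshold=0.5):
    """Plain-Python mirror of the when/otherwise chain in cell 38."""
    if final_score >= threshold:
        return 'POSITIVE'
    if final_score <= -threshold:
        return 'NEGATIVE'
    return 'UNCERTAIN'


# Final scores copied from the table above
final_scores_by_symb = {
    'GOOG': -0.5567821493243732,
    'AAPL': -0.41474500277263254,
    'FB': -0.5877809698653788,
    'NFLX': -0.6608838243196739,
    'AMZN': -0.4476812378864632,
}
labels = {symb: final_label(score) for symb, score in final_scores_by_symb.items()}
print(labels)
```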



### Store the results in DynamoDB

DynamoDB only accepts Decimal objects for floating point numbers when using boto3, so we need to convert the final score column from float to Decimal.

In [40]:
# Keep only entries that we need for the website
sentiment_df = sentiment_df.select('analysis_window', 't_symb', 'label', 'final_score')

# Cast float to Decimal
# precision: the maximum total number of digits (default: 10)
# scale: the number of digits on right side of dot. (default: 0)
sentiment_df = sentiment_df.withColumn('final_score', sentiment_df.final_score.cast(DecimalType(precision=10, scale=8)))

In [41]:
sentiment_df.printSchema()

root
 |-- analysis_window: string (nullable = true)
 |-- t_symb: string (nullable = true)
 |-- label: string (nullable = false)
 |-- final_score: decimal(10,8) (nullable = true)



In [42]:
results = sentiment_df.collect()

In [43]:
results_dict = [row.asDict() for row in results]

In [44]:
results_table = dynamodb.Table('test_results')

In [45]:
# Use batch writer to automatically handle buffering and sending items in batches
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/dynamodb.html
with results_table.batch_writer() as batch:
    for rd in results_dict:
        batch.put_item(
            Item=rd
        )
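
As an alternative to casting in Spark, floats could also be converted to Decimal on the Python side just before writing. The sketch below assumes that approach. The helper name `floats_to_decimal` is hypothetical, and the default of 8 decimal places is chosen to match the scale used in the cast above.

```python
from decimal import Decimal


def floats_to_decimal(item, places=8):
    """Return a copy of `item` with float values converted to Decimal.

    Going through a formatted string avoids carrying binary floating-point
    noise into the Decimal value.
    """
    converted = {}
    for key, value in item.items():
        if isinstance(value, float):
            converted[key] = Decimal(f'{value:.{places}f}')
        else:
            converted[key] = value
    return converted


row = {'analysis_window': '2020-07-15_Hour=17', 't_symb': 'GOOG',
       'label': 'NEGATIVE', 'final_score': -0.5567821493243732}
item = floats_to_decimal(row)
print(item['final_score'])
```

Each converted dictionary could then be passed as `Item` to `batch.put_item`, in the same way as the rows above.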