# ST446 Distributed Computing for Big Data
## Homework - Traci Lim Zheng Wen
### Milan Vojnovic and Christine Yuen, LT 2018
---

## P2: Spark streaming

In this homework assignment problem your task is to track the sample mean and unbiased sample variance of the number of words per tweet using Spark streaming API. You should calculate the mean and variance for all the tweets that you receive over time, not just for the last received batch of the stream. This means that you need to calculate the mean and the variance recursively using the Spark streaming concept of a "stateful" operation.

You should calculate two different versions of sample mean and variance estimators with different step sizes:

**Decaying step size**: Recursive evaluations of mean and unbiased sample variance for an input stream of observations $x_1, x_2, \ldots$ with decaying step size:

* Mean: $m_{n+1} = (1-w_n) m_n + w_n x_{n+1}$ where $w_n = 1/(n+1)$
* Sample variance: $\sigma^2_{n+1} = a_n \sigma^2_n + b_n (x_{n+1}-m_n)^2$ where $a_n$ and $b_n$ are two sequences whose values you need to work out as a warm-up exercise

**Fixed step size**: Recursive evaluation with fixed step size (exponentially weighted smoothing):

* Mean: same as above but with $w_n = 0.2$ for all $n$
* Sample variance: same as above with $a_n = 1 - b_n$ and $b_n = 0.2$ for all $n$

## How to get Twitter data

Please see the exercise from one of our class sessions for guidance on how to receive a live Twitter data stream: https://github.com/lse-st446/lectures/blob/master/week06/class/TwitterStreamingAPI.ipynb 

In [1]:
import nltk
nltk.download()

showing info https://raw.githubusercontent.com/nltk/nltk_data/gh-pages/index.xml


True

In [1]:
# Create Spark Context
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1 pyspark-shell'
from pyspark import SparkContext
sc = SparkContext()

In [2]:
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

ssc = StreamingContext(sc, 10)
kafka_stream = KafkaUtils.createStream(ssc, \
                                       "localhost:2181", \
                                       "test-consumer-group", \
                                        {"twitter-stream":1})
ssc.checkpoint("checkpoint")

In [3]:
##Print the first tweet of each batch
def returnText(x):
    try:
        return x['text']
    except:
        return ""

lines = kafka_stream.map(lambda x: json.loads(x[1])).map(returnText)
lines.pprint(1)

In [4]:
lines.count().pprint()

In [5]:
from nltk.tokenize import sent_tokenize, word_tokenize
#from nltk.corpus import stopwords
import string
#stop_words = set(stopwords.words('english'))

def get_tokens(line):
    tokens = word_tokenize(line)
    # convert to lower case
    tokens = [w.lower() for w in tokens]
    # remove punctuation from each word
    table = str.maketrans('', '', string.punctuation)
    stripped = [w.translate(table) for w in tokens]
    # remove remaining tokens that are not alphabetic
    words = [word for word in stripped if word.isalpha()]
    # NOT filter out stop words
    #words = [w for w in words if not w in stop_words]
    return len(words)

counts = lines.map(lambda row: (1, get_tokens(row))) # number of words per tweet
#counts.pprint(1)

## Part 1: decaying step size

In [6]:
def updateMetricsDecay(new_values, prev_decay):
    # if old_metric do not exist, set them to 0
    if not prev_decay:
        prev_decay = [0,0,0]
    count_decay = prev_decay[0] # retrieve prior total count
    mean_decay = prev_decay[1] # retrieve prior mean
    var_decay = prev_decay[2] # retrieve prior variance
    for i in new_values:
        count_decay += 1 # n+1
        w_n = 1/count_decay # 1/(n+1)
        if count_decay >= 2:
            b_n = 1/(count_decay-1) # equivalent to 1/(n+1-1) = 1/n
        elif count_decay == 1:
            b_n = 0 # to avoid 1/0
        prev_mean_decay = mean_decay
        mean_decay = (1-w_n)*prev_mean_decay + i*w_n # M_n+1 = (1-W_n)M_n + W_n*x_n+1
        var_decay = (1-b_n)*var_decay + b_n*(i-prev_mean_decay)**2 # var_n+1 = (1-b_n)*var_n + b_n*(x_n+1 - M_n)^2
    return [count_decay, mean_decay, var_decay]

running_metrics = counts.updateStateByKey(updateMetricsDecay)
# Print count_decay, mean_decay, var_decay
running_metrics.pprint(1)

In [None]:
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2018-03-27 14:08:40
-------------------------------------------
RT @mitchellvii: I think Trump's takeaway message from this Omnibus will be this:

"If I wanted funding for our military I had to accept th…
...

-------------------------------------------
Time: 2018-03-27 14:08:40
-------------------------------------------
9225

-------------------------------------------
Time: 2018-03-27 14:08:40
-------------------------------------------
(1, [9225, 18.00747967479676, 30.86996021140999])

-------------------------------------------
Time: 2018-03-27 14:08:50
-------------------------------------------
RT @EricHolder: Constitution does not require citizenship question. This is purely political. Trump Administration is trying to rig the 202…
...

-------------------------------------------
Time: 2018-03-27 14:08:50
-------------------------------------------
1712

-------------------------------------------
Time: 2018-03-27 14:08:50
-----

## Part 2: fixed step size

In [6]:
w_n = 0.2 # fixed step size for each instance iteration

def updateMetricsFixed(new_values, old_metric_fix):
    # if old_metric do not exist, set them to 0
    if not old_metric_fix:
        old_metric_fix = [0,0]
    mean_fix = old_metric_fix[0] # retrieve prior mean
    var_fix = old_metric_fix[1] # retrieve prior variance
    for i in new_values:
        mean_fix = (1-w_n)*mean_fix + w_n*i # update mean with fixed step size
        var_fix = (1-w_n)*var_fix + w_n*(i-mean_fix)**2 # update variance with fixed step-size
    return [mean_fix, var_fix]

running_metrics = counts.updateStateByKey(updateMetricsFixed)
# Print mean_fixed, var_fixed
running_metrics.pprint(1)

In [None]:
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2018-03-27 14:09:50
-------------------------------------------

-------------------------------------------
Time: 2018-03-27 14:09:50
-------------------------------------------

-------------------------------------------
Time: 2018-03-27 14:09:50
-------------------------------------------

-------------------------------------------
Time: 2018-03-27 14:10:00
-------------------------------------------
RT @mitchellvii: I think Trump's takeaway message from this Omnibus will be this:

"If I wanted funding for our military I had to accept th…
...

-------------------------------------------
Time: 2018-03-27 14:10:00
-------------------------------------------
12496

-------------------------------------------
Time: 2018-03-27 14:10:00
-------------------------------------------
(1, [18.022925032345793, 14.103199518654023])

-------------------------------------------
Time: 2018-03-27 14:10:10
-------------------------------------------