# ST446 Distributed Computing for Big Data
---

## Spark streaming

_You may use a GCP compute engine or your own computer._

In this homework problem, your task is to track the sample mean and the unbiased sample variance of the number of words per tweet using the Spark Streaming API.

**Part 1** Write functions that calculate the mean and variance over all tweets received so far, not just over the most recently received batch of the stream. This means computing the mean and the variance recursively, using the Spark Streaming concept of a "stateful" operation.
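
A minimal sketch of the kind of update function `DStream.updateStateByKey` expects. It receives the list of new values for a key in the current batch and the previous state (`None` the first time the key is seen) and returns the new state. Here the state is assumed to be a tuple `(n, mean, m2)` updated with Welford's algorithm, a swapped-in technique rather than the Part 2 recursions. The names `update_running_stats` and `sample_variance` are hypothetical. It could be attached with something like `counts.map(lambda c: ("value", c)).updateStateByKey(update_running_stats)`, where `counts` is a hypothetical DStream of per-tweet word counts.

```python
import math
from typing import Iterable, Optional, Tuple

# Hypothetical state layout: (n, mean, m2), where m2 is the sum of squared
# deviations from the current mean over all n observations seen so far.
State = Tuple[int, float, float]


def update_running_stats(new_values: Iterable[float], state: Optional[State]) -> State:
    """updateStateByKey-style update: fold this batch's values into the running state.

    Uses Welford's algorithm; `state` is None the first time a key is seen.
    """
    n, mean, m2 = state if state is not None else (0, 0.0, 0.0)
    for x in new_values:
        n += 1
        delta = x - mean          # deviation from the old mean
        mean += delta / n         # running mean of all n observations
        m2 += delta * (x - mean)  # old deviation times new deviation
    return (n, mean, m2)


def sample_variance(state: State) -> float:
    """Unbiased sample variance m2 / (n - 1); NaN for fewer than 2 observations."""
    n, _, m2 = state
    return m2 / (n - 1) if n > 1 else math.nan


# Usage: three micro-batches of words-per-tweet counts.
state = None
for batch in ([2, 4, 4], [4, 5], [5, 7, 9]):
    state = update_running_stats(batch, state)
print(state[0], state[1], sample_variance(state))  # 8, approximately 5.0 and 32/7
```

A key with no new tweets in a batch is passed an empty list, so its state is returned unchanged. Note that `updateStateByKey` also requires a checkpoint directory, set with `ssc.checkpoint(...)`.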

Note that, if you have trouble getting the Twitter stream to run, you may instead use `nc` to stream messages manually.

**Part 2** Calculate two different versions of sample mean and variance estimators with different step sizes:

**Decaying step size**: Recursive evaluation of the mean and the unbiased sample variance for an input stream of observations $x_1, x_2, \ldots$ with decaying step size, where $m_n$ and $\sigma^2_n$ denote the estimates after the first $n$ observations:

* Mean: $m_{n+1} = (1-w_n) m_n + w_n x_{n+1}$ where $w_n = 1/(n+1)$.
* Sample variance: $\sigma^2_{n+1} = a_n \sigma^2_n + b_n (x_{n+1}-m_n)^2$ where $a_n$ and $b_n$ are two sequences whose values you need to work out first.
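
One way to work out $a_n$ and $b_n$, assuming $m_n$ and $\sigma^2_n$ are the exact mean and unbiased sample variance of $x_1, \ldots, x_n$ for $n \ge 1$, is to track the sum of squared deviations $S_n = (n-1)\,\sigma^2_n$. The cross term vanishes because $\sum_{i=1}^{n}(x_i - m_n) = 0$, and the mean recursion with $w_n = 1/(n+1)$ gives $m_{n+1} - m_n = (x_{n+1} - m_n)/(n+1)$:

$$
\begin{aligned}
S_{n+1} &= \sum_{i=1}^{n+1} (x_i - m_{n+1})^2
         = S_n + n\,(m_n - m_{n+1})^2 + (x_{n+1} - m_{n+1})^2 \\
        &= S_n + \frac{n}{(n+1)^2}\,(x_{n+1} - m_n)^2 + \frac{n^2}{(n+1)^2}\,(x_{n+1} - m_n)^2
         = S_n + \frac{n}{n+1}\,(x_{n+1} - m_n)^2 .
\end{aligned}
$$

Dividing by $n$ gives

$$
\sigma^2_{n+1} = \frac{n-1}{n}\,\sigma^2_n + \frac{1}{n+1}\,(x_{n+1} - m_n)^2,
\qquad\text{i.e.}\qquad a_n = \frac{n-1}{n}, \quad b_n = \frac{1}{n+1}.
$$

For $n = 1$ the coefficient $a_1 = 0$, so the undefined $\sigma^2_1$ never enters the recursion.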

**Fixed step size**: Recursive evaluation with fixed step size (exponentially weighted smoothing):

* Mean: same as above, but with $w_n = 0.2$ for all $n$.
* Sample variance: same as above, but with $a_n = 1 - b_n$ and $b_n = 0.2$ for all $n$.
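
A minimal Python sketch of both recursions as single-observation update functions, in the shape one might call from a stateful Spark update. The decaying-step coefficients used here, $a_n = (n-1)/n$ and $b_n = 1/(n+1)$, are the values that make $\sigma^2_n$ equal the unbiased sample variance of $x_1, \ldots, x_n$; the usage lines check this against Python's `statistics` module. Starting values are not specified in the text, so setting $m_1 = x_1$ and storing a variance of 0 after the first observation is an assumption. The names `decaying_step`, `fixed_step` and `EMPTY` are hypothetical.

```python
import statistics
from functools import reduce
from typing import Tuple

# State shared by both estimators: (n, m_n, sigma2_n) after n observations.
State = Tuple[int, float, float]
EMPTY: State = (0, 0.0, 0.0)


def decaying_step(state: State, x: float) -> State:
    """Decaying step size: w_n = b_n = 1/(n+1) and a_n = (n-1)/n."""
    n, m, var = state
    if n == 0:
        return (1, float(x), 0.0)  # m_1 = x_1; sigma2_1 is undefined, store 0
    a, b = (n - 1) / n, 1 / (n + 1)
    new_var = a * var + b * (x - m) ** 2  # uses the old mean m_n
    new_mean = (1 - b) * m + b * x
    return (n + 1, new_mean, new_var)


def fixed_step(state: State, x: float, w: float = 0.2) -> State:
    """Fixed step size (exponential smoothing): w_n = b_n = w and a_n = 1 - w."""
    n, m, var = state
    if n == 0:
        return (1, float(x), 0.0)  # assumed start: first value, zero variance
    new_var = (1 - w) * var + w * (x - m) ** 2  # uses the old mean m_n
    new_mean = (1 - w) * m + w * x
    return (n + 1, new_mean, new_var)


data = [2, 4, 4, 4, 5, 5, 7, 9]
n, m, var = reduce(decaying_step, data, EMPTY)
print(n, m, statistics.mean(data))         # decaying-step mean matches the batch mean
print(var, statistics.variance(data))      # and the unbiased batch variance
print(reduce(fixed_step, [0, 10], EMPTY))  # (2, 2.0, 20.0)
```

Folding with `functools.reduce` mirrors applying the update once per incoming tweet. The fixed-step estimator forgets old tweets geometrically, so unlike the decaying-step one it tracks recent behaviour rather than the all-time statistics.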

Document both versions of the code and how you got it to run (on a cluster or on your own computer). You do not need to run the code in a notebook.

## How to get Twitter data or set up a stream using `nc`

For guidance on how to receive a live Twitter data stream, see the exercise from one of our class sessions at https://github.com/lse-st446/lectures2021/blob/main/Week07/class/streaming_examples.md and the Python scripts in the same directory.

**This project was implemented in the Python scripts in this folder, using Kafka on a GCP virtual machine.**
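
A minimal sketch of the per-tweet word count that feeds the estimators. It assumes each Kafka message value is a JSON-encoded tweet with a `"text"` field (an assumption about the producer script, not something stated here) and that words are whitespace-separated tokens. Messages that are not JSON objects, such as plain lines typed into `nc`, are treated as the tweet text itself. The function name `words_per_message` is hypothetical.

```python
import json


def words_per_message(message: str) -> int:
    """Count whitespace-separated words in one streamed message.

    Assumes Kafka messages are JSON tweets with a "text" field; anything that is
    not a JSON object (e.g. a plain line typed into nc) is used as the text itself.
    """
    try:
        tweet = json.loads(message)
    except json.JSONDecodeError:
        tweet = None  # not JSON: treat the raw message as the tweet text
    text = tweet.get("text", "") if isinstance(tweet, dict) else message
    return len(text.split())


print(words_per_message('{"text": "hello big data world"}'))  # 4
print(words_per_message("typed into nc"))                      # 3
```

Kafka DStreams in PySpark yield `(key, value)` pairs by default, so this could be applied as `stream.map(lambda kv: words_per_message(kv[1]))`; with `ssc.socketTextStream(...)` fed by `nc`, each record is already a string, so `stream.map(words_per_message)` would do.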

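```python
from pyspark.streaming import StreamingContext
# pyspark.streaming.kafka (the Kafka 0.8 DStream API) exists only up to Spark 2.4; it was removed in Spark 3.0.
from pyspark.streaming.kafka import KafkaUtils
import json

# your code
```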

```text
-------------------------------------------
Time: 2018-03-18 01:10:30
-------------------------------------------
('value', [24.811613475177307, 44.20085841549906, 24.209978560582634, 24.957847579984524, 2256])

-------------------------------------------
Time: 2018-03-18 01:10:40
-------------------------------------------
('value', [24.97726411519515, 43.06620009359806, 26.519676561841425, 43.536080409329095, 2639])

-------------------------------------------
Time: 2018-03-18 01:10:50
-------------------------------------------
('value', [24.93192333113022, 44.386107836764644, 26.637484695351, 29.70932422359988, 3026])

-------------------------------------------
Time: 2018-03-18 01:11:00
-------------------------------------------
('value', [24.84276546091014, 45.746206764934485, 24.73456555282074, 127.55689030474566, 3428])

-------------------------------------------
Time: 2018-03-18 01:11:10
-------------------------------------------
('value', [24.861234053631836, 45.7809994142

KeyboardInterrupt:
```