In [1]:
# Required libraries
import sys
import datetime
import time
import os
import mmh3

from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.streaming import StreamingContext

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as sFuncs
from pyspark.sql.window import Window

## PA5 Question 1 - Moving Averages

Calculation of moving stock price averages are part of many a trading strategies ([reference](https://www.investopedia.com/articles/active-trading/052014/how-use-moving-average-buy-stocks.asp)). We will be using the two moving averages strategy, with the shorter-term MA being 10-day and the longer average being 40-day. When the shorter-term MA crosses above the longer-term MA, it's a buy signal, as it indicates that the trend is shifting up. This is known as a "golden cross."

Meanwhile, when the shorter-term MA crosses below the longer-term MA, it's a sell signal, as it indicates that the trend is shifting down. This is known as a "dead/death cross."

To simulate a data stream, you are given a python program `stream-feeder.py` which reads in `dj30.csv` file and pipes it, line by line. `dj30.csv` contains a 25-year history of the Dow Jones Industrial Average prices. We will only be concerned with the Close price. The command `stream-feeder.py | nc -lk 9999` can be run on the master machine of your spark cluster to feed the Close data into pyspark.

1. Set up the stream to feed data into a pyspark DStream. Write and submit a summary of the steps you took (in English) and enclose the (cleaned up after editing) output of `history > /tmp/my_session.txt`. This history should include what you typed into the shell outside of the pyspark session. \[2 pts\]
2. Use DStream windowing to separately accumulate the sum and count of prices, thus creating moving average DStreams. Write and submit the (cleaned up after editing) transcript of your session along with your code. \[4 pts\]
3. \[Optional, 4 bonus points\]. Compare the two moving averages to indicate buy and sell signals. Your output should be of the form `[( <date> buy), ( <date> sell), etc]`

#### Load the data
Note that this below cell needs to be run once!

In [None]:
# to unpack the dataset into the current directory
# NOTE that this cell needs to run once
# %%bash
# cd /home/saberbf/BigData/PA5
# sudo apt-get install python3-pip
# pip3 install pandas
# pip3 install feedparser
# gsutil cp gs://datathinks-home/stream-feeder.py .
# gsutil cp gs://datathinks-home/dj30.csv .
# gsutil cp gs://datathinks-home/headline-extractor.py .
# gsutil cp gs://datathinks-home/feed-parser.py .
# gsutil cp gs://datathinks-home/2020-headlines.csv .

#### Create Spark Streaming Context

##### Notes:
- In case you are using a single-node with k many threads cluster, it is essential to use setMaster('local[k]') or less than k. Otherwise, SparkContext put the sc.master on 'yarn'. It doesn't consider threads as workers and looks for individual workers to do the job. The result would be you will not see a collect() to converge.
- master is a Spark, Mesos or YARN cluster URL, or a special “local[*]” string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass “local[*]” to run Spark Streaming in-process (detects the number of cores in the local system).

In [None]:
# sc._conf.getAll()
sc.stop()
# Create a local StreamingContext with 4 working threads
conf = SparkConf().setMaster('local[*]')
sc = SparkContext(conf=conf, appName='NetworkWordCount')

# test the SparkContext to see if it works
# rdd = sc.parallelize([('a',7),('a',2),('b',2)])
# rdd.collect()

In [None]:
# Create a local StreamingContext with batch interval of 2.5 seconds which should 
# hold to 10 sample per RDD
ssc = StreamingContext(sc, 0.25)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
# Split each line into words
words = lines.flatMap(lambda line: line.split(" ")).map(lambda word:(word, 1))
# reduce last 30 seconds of data, every 10 seconds
priceSum = words.reduceByWindow(
    lambda x, y: ((float(x[0])+float(y[0]))/(int(x[1])+int(y[1])), (int(x[1])+int(y[1]))),
    None,#lambda x, y: (float(x[0])-float(y[0]), int(x[1])-int(y[1])),
    windowDuration=2.5,
    slideDuration=0.25)
# Print the first ten elements of each RDD generated in this DStream to the console
priceSum.pprint()
# priceSum.saveAsTextFiles("hdfs:///results/MA10")
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

In [None]:
ssc.stop()#(stopSparkContext=True)

In [None]:
# Create a local StreamingContext with batch interval of 10 seconds which should hold to 40 sample per RDD
ssc = StreamingContext(sc, 0.25)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
# Split each line into words
words = lines.flatMap(lambda line: line.split(" ")).map(lambda word:(word, 1))
# reduce last 30 seconds of data, every 10 seconds
priceSum = words.reduceByWindow(
    lambda x, y: ((float(x[0])+float(y[0]))/(int(x[1])+int(y[1])), (int(x[1])+int(y[1]))),
    None,#lambda x, y: (float(x[0])-float(y[0]), int(x[1])-int(y[1])),
    windowDuration=4 * 2.5,
    slideDuration=0.25)
# Print the first ten elements of each RDD generated in this DStream to the console
priceSum.pprint()
# priceSum.saveAsTextFiles("hdfs:///results/MA40")
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

In [None]:
ssc.stop()#(stopSparkContext=True)

#### Extra resources
This part is to load the dataset and try to get it into another RDD. It shouldn't be included in the report.

In [None]:
# to load the data into a DataFrame
spark = SparkSession.builder.master('local[4]').getOrCreate()
dj30DF = spark.read.load('gs://datathinks-home/dj30.csv', 
                     format='csv', inferSchema=True, header=True, delimiter=',')
dj30DF = dj30DF.select(['Close', 
           'Date', 
           'Long Date', 
           'Total Stks above 30MA', 
           'Total Stks below 30MA', 
           'Total Stks equal 30MA'])

# dj30DF.withColumn('movingAverage', sum(dj30DF[Close])).over(Window.rowsBetween(-10,0)).show(10)
dj30DF.show(10)

#### DataFrame and SQL Operations
You can easily use DataFrames and SQL operations on streaming data. You have to create a SparkSession using the SparkContext that the StreamingContext is using. Furthermore, this has to done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SparkSession.

In [None]:
# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]

...

# DataFrame operations inside your streaming program

words = ... # DStream of strings

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # Creates a temporary view using the DataFrame
        wordsDataFrame.createOrReplaceTempView("words")

        # Do word count on table using SQL and print it
        wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
        wordCountsDataFrame.show()
    except:
        pass

words.foreachRDD(process)

In [None]:
ssc.stop()

## PA5 Question 2 - Notable News

Most news outlets distribute their news through rss feeds for use by news reader programs. We are writing a news reader that reads the news headlines and only reports those headlines that contain an unfamiliar word. (It's not going to be all that useful but hey...)

The file `2020-headlines.csv` contains headlines from 2020 for mining for familiar words and `headline-extractor.py` for extracting words from such headlines. The program is only half-written. Add to it as follows:

1. Create a Bloom Filter string whose size is approximately 8 times the number of understood words and write the buffer into a text file `bloom.txt` in your shell.
2. The file `bloom.txt` will be used in pyspark. You may want to store it in hdfs so it is accessible from pyspark.
To simulate a data stream, you are given a python program `news-feeder.py` which reads rss feeds from several news outlets. It is rate controlled, feeding us 4 titles per second. The commands `news-feeder.py | nc -lk 9999` or `./news-feeder.py  | tee /dev/stderr | nc -lk 9999`can be run on the master machine of your spark cluster to feed the titles data into pyspark.

1. Set up the stream to feed data into a pyspark DStream. Write and submit a summary of the steps you took (in English) and enclose the (cleaned up after editing) output of `history > /tmp/my_session.txt`. This history should include what you typed into the shell outside of the pyspark session. \[no points for this\]
2. Use DStream windowing to filter incoming headlines. Use a Bloom filter based on `bloom.txt` to emit only the headlines with unfamiliar words in them. Write and submit the (cleaned up after editing) transcript of your session along with your code. \[4 pts\]

In [2]:
import os

dir_path = os.path.dirname('/home/BigData/PA5/')

In [None]:
# # part 1: to create bloom.txt
# ! $dir_path'/streaming/headline-extractor.py'

# # part 2: to put the bloom.txt into the hdfs
# ! hadoop fs -put -f $dir_path/streaming/bloom.txt hdfs:///user/`whoami`/
# ! hadoop fs -ls hdfs:///user/`whoami`/

In [3]:
with open(dir_path + '/streaming/bloom.txt', 'r') as file:
    bloom_filter = file.read()

In [4]:
# Number of items expected to be stored in bloom filter
items_count = len(bloom_filter)
# Size of the bloom filter in bits
filter_size = 8 * items_count
# Number of hash functions used in bloom filteritems_count = len(understood_words) 
hash_count = 6

def bloom_check(items_count):
    '''
    Check for existence of an item in filter
    '''
    for i in range(hash_count):
        digest = mmh3.hash(item, i) % self.size
        if bloom_filter[digest] == '0':

            # if any of bit is False then,its not present
            # in filter
            # else there is probability that it exist
            return False
    return True

In [5]:
# sc._conf.getAll()
sc.stop()
# Create a local StreamingContext with 4 working threads
conf = SparkConf().setMaster('local[*]')
sc = SparkContext(conf=conf, appName='NetworkWordCount')

In [6]:
# Create a local StreamingContext with batch interval of 2.5 seconds which should 
# hold to 10 sample per RDD
ssc = StreamingContext(sc, 0.25)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
# Split each line into words
words = lines.flatMap(lambda line: line.split(" ")).map(lambda x: x)
# reduce last 30 seconds of data, every 10 seconds
words = words.window(windowDuration=1, slideDuration=0.25).filter(not bloom_check)
# Print the first ten elements of each RDD generated in this DStream to the console
words.pprint()
# priceSum.saveAsTextFiles("hdfs:///results/MA10")
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

-------------------------------------------
Time: 2020-12-11 11:34:02.250000
-------------------------------------------

-------------------------------------------
Time: 2020-12-11 11:34:02.500000
-------------------------------------------

-------------------------------------------
Time: 2020-12-11 11:34:02.750000
-------------------------------------------

-------------------------------------------
Time: 2020-12-11 11:34:03
-------------------------------------------

-------------------------------------------
Time: 2020-12-11 11:34:03.250000
-------------------------------------------



Py4JJavaError: An error occurred while calling o100.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/streaming/util.py", line 68, in call
    r = self.func(t, *rdds)
  File "/usr/lib/spark/python/pyspark/streaming/dstream.py", line 173, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1360, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/usr/lib/spark/python/pyspark/context.py", line 1069, in runJob
    sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
TypeError: 'bool' object is not callable

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:457)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:346)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:196)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1926)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1914)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1913)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1913)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:948)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:948)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2147)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2096)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2085)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
TypeError: 'bool' object is not callable

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:457)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:346)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:196)


	at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
	at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
	at org.apache.spark.streaming.api.python.PythonDStream$.$anonfun$callForeachRDD$1(PythonDStream.scala:179)
	at org.apache.spark.streaming.api.python.PythonDStream$.$anonfun$callForeachRDD$1$adapted(PythonDStream.scala:179)
	at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:257)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:257)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
ssc.stop()#(stopSparkContext=True)