# Spark Streaming
This notebook consumes the `gemini-feed` Kafka topic and publishes a derived feed to the `spark.out` topic. For BTC orders, the feed contains the total price × remaining volume on the bid and ask sides (a measure of liquidity per side) and the ratio of those two totals.

In [1]:
!pip install kafka-python

Collecting kafka-python
  Using cached https://files.pythonhosted.org/packages/82/39/aebe3ad518513bbb2260dd84ac21e5c30af860cc4c95b32acbd64b9d9d0d/kafka_python-1.4.6-py2.py3-none-any.whl
Installing collected packages: kafka-python
Successfully installed kafka-python-1.4.6


In [21]:
import os  
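# pyspark.streaming.kafka (KafkaUtils) is backed by the 0-8 integration; the 0-10 artifact has no Python API.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0 pyspark-shell'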

In [22]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils  
from kafka import KafkaProducer
import json
import time

In [23]:
producer = KafkaProducer(bootstrap_servers='kafka-node:9092')

In [24]:
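sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 60)  # 60-second batch interval

# Receiver-based streams connect through ZooKeeper (port 2181), not the broker port.
num_streams = 5
kafkaStreams = [KafkaUtils.createStream(ssc, "kafka-node:2181", "spark-streaming", {"gemini-feed": 1}) for _ in range(num_streams)]
# Note: unified_stream is not used by the cells below, which read from kafkaStream.
unified_stream = ssc.union(*kafkaStreams)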


In [25]:
kafkaStream = KafkaUtils.createStream(ssc, 'kafka-node:2181', 'spark-streaming', {'gemini-feed':1})  

In [26]:
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
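
The receiver-based stream yields `(key, value)` pairs, which is why the lambda above reads `v[1]`. As a minimal sketch outside Spark, the same parsing step can be tried on a hand-written pair. The sample payload is hypothetical; its field names are the ones the later cells read (`side`, `price`, `remaining`), and `parse_value` is an illustrative helper, not part of the notebook.

```python
import json

# Hypothetical (key, value) pair shaped like one element of the Kafka stream.
sample = (None, '{"side": "bid", "price": "7900.00", "remaining": "0.5"}')

def parse_value(kv):
    """Decode the JSON payload in the value slot, ignoring the Kafka key."""
    return json.loads(kv[1])

record = parse_value(sample)
print(record["side"], record["price"], record["remaining"])
```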

In [27]:
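def handler(message):
    """Publish every (type, value) pair of a batch RDD to the spark.out topic."""
    # collect() pulls the batch to the driver; fine here because each batch is tiny.
    records = message.collect()
    for record in records:
        output = {}
        output['type'] = record[0]
        output['value'] = record[1]
        output['timestamp'] = int(time.time())
        producer.send('spark.out', bytes(json.dumps(output), 'utf-8'))
    # Flush once per batch rather than once per record.
    producer.flush()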
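
To see what a consumer of `spark.out` would receive, the serialization step of `handler` can be sketched on its own, without a Kafka producer. `build_payload` and its `now` parameter are hypothetical names introduced here for illustration, and the timestamp is an arbitrary value.

```python
import json
import time

def build_payload(record, now=None):
    """Serialize one (type, value) pair the way handler does.

    `now` is injectable so the result is reproducible; it defaults to the wall clock.
    """
    output = {
        "type": record[0],
        "value": record[1],
        "timestamp": int(time.time()) if now is None else int(now),
    }
    return json.dumps(output).encode("utf-8")

payload = build_payload(("bid", 1000.0), now=1558574040)
print(payload)
```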

In [28]:
def price_volume(x):
    price = float(x['price'])
    remaining = float(x['remaining'])
    x['price_volume'] = price * remaining
    return (x['side'], x['price_volume'])

Compute price × remaining volume for each order

In [29]:
parsed_pv = parsed.map(price_volume)
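
As a worked example of this step, the same arithmetic can be run on a single order outside Spark. The order values are made up, and `notional_by_side` is a hypothetical, non-mutating variant of `price_volume` used only for illustration.

```python
def notional_by_side(order):
    """Return (side, price * remaining), the same pair price_volume emits."""
    return (order["side"], float(order["price"]) * float(order["remaining"]))

# Hypothetical order; the feed delivers price and remaining as strings.
order = {"side": "ask", "price": "8000.00", "remaining": "0.25"}
side, notional = notional_by_side(order)
print(side, notional)  # ask 2000.0
```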

Sum price_volume by side (bid or ask)

In [30]:
grouped = parsed_pv.reduceByKey(lambda accum, n: accum + n)
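
Within each batch, `reduceByKey` with an addition lambda leaves one total per side. A plain-Python sketch of that behaviour, on a made-up batch, may make the result easier to picture. `sum_by_key` is a hypothetical stand-in, not a Spark API.

```python
from collections import defaultdict

def sum_by_key(pairs):
    """Add up the values for each key, as reduceByKey does with an addition lambda."""
    totals = defaultdict(float)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)

# Hypothetical batch of (side, price_volume) pairs.
batch = [("bid", 400.0), ("ask", 1500.0), ("bid", 600.0), ("ask", 2500.0)]
totals = sum_by_key(batch)
print(totals)
```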

In [31]:
grouped_sorted = grouped.transform(lambda rdd: rdd.sortByKey())

In [32]:
grouped_sorted.foreachRDD(handler)

Generate the ratio of ask-side price_volume to bid-side price_volume. For example, if the ask total is 4000 and the bid total is 1000, the ratio is 4, which suggests that ask (sell) side liquidity greatly exceeds bid (buy) side liquidity.

In [33]:
results = grouped_sorted.map(lambda x: ('price_volume', x[1]))

In [34]:
ratio = results.reduceByKey(lambda x, y: x/y)
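
One caveat: `reduceByKey` expects an associative and commutative function, and division is neither, so the cell above yields ask/bid only if the ask total happens to be merged first. A minimal sketch of an order-independent alternative is to look up each side by name. `ask_bid_ratio` is a hypothetical helper and assumes the per-side totals have been gathered into a dict.

```python
def ask_bid_ratio(totals):
    """Return ask total / bid total, or None if a side is missing or the bid total is zero."""
    ask = totals.get("ask")
    bid = totals.get("bid")
    if ask is None or not bid:
        return None
    return ask / bid

print(ask_bid_ratio({"ask": 4000.0, "bid": 1000.0}))  # 4.0
```

In the stream itself, one possible way to apply this would be inside a `foreachRDD` callback, building the dict from the collected `(side, total)` pairs of each batch.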

In [35]:
ratio.foreachRDD(handler)

Print results

In [36]:
grouped_sorted.pprint()
ratio.pprint()

In [37]:
ssc.start()

-------------------------------------------
Time: 2019-05-23 01:14:00
-------------------------------------------

-------------------------------------------
Time: 2019-05-23 01:14:00
-------------------------------------------



In [38]:
ssc.stop()