# Spark Streaming
This notebook consumes the `gemini-feed` Kafka topic and publishes a derived feed to the `spark.out` topic. For each 60-second batch it emits the BTC bid and ask liquidity (the sum of price × remaining quantity per side) and the price-volume ratio between the two sides.

In [1]:
# !pip install --force-reinstall pyspark==2.4.6

In [2]:
from kafka import KafkaProducer
import os  
# Ask the PySpark launcher to pull in the Kafka 0.8 streaming connector.
# This is set before any SparkContext is created in the cells below.
# NOTE(review): the connector is pinned to 2.0.2 while the commented-out
# install cell pins pyspark 2.4.6 -- confirm the two versions are compatible.
os.environ.update(
    PYSPARK_SUBMIT_ARGS='--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
)

In [3]:
# Module-level Kafka producer connected to the local broker; `handler` uses it
# to publish results to the `spark.out` topic.
producer = KafkaProducer(bootstrap_servers='localhost:9092')

In [4]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils  
from kafka import KafkaProducer
import json
import time

In [5]:
# Length of each streaming micro-batch, in seconds.
batch_interval_seconds = 60

# Spark entry point; logging is reduced to errors to keep the cell output readable.
sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("ERROR")

# Streaming context that cuts the input into fixed-length batches.
ssc = StreamingContext(sc, batch_interval_seconds)

In [6]:
# Input stream over the `gemini-feed` topic.
zookeeper_address = 'localhost:2181'
consumer_group_id = 'spark-streaming'
# NOTE(review): the value 1 is presumably the per-topic consumer thread /
# partition count -- confirm against the KafkaUtils.createStream docs.
topic_map = {'gemini-feed': 1}
kafkaStream = KafkaUtils.createStream(ssc, zookeeper_address, consumer_group_id, topic_map)

In [7]:
# Each stream element is indexable; element [1] carries the JSON text of one
# order record, which is decoded into a dict here.
parsed = kafkaStream.map(lambda key_and_payload: json.loads(key_and_payload[1]))

In [8]:
def handler(message):
    """Publish every record of one micro-batch to the ``spark.out`` Kafka topic.

    Used as a ``foreachRDD`` callback. Each record is sent as a UTF-8 encoded
    JSON object with the keys ``type`` (record[0]), ``value`` (record[1]) and
    ``timestamp`` (current Unix time in whole seconds).

    NOTE(review): keep this a one-argument function; ``foreachRDD`` is believed
    to dispatch on the callback's argument count -- confirm before adding
    parameters.

    Args:
        message: the batch RDD; it is collected to the driver, so it is
            expected to be small (here: one row per key).
    """
    records = message.collect()
    if not records:
        # Empty batch: nothing to send, so skip the flush round-trip as well.
        return

    for record in records:
        output = {
            'type': record[0],
            'value': record[1],
            'timestamp': int(time.time()),
        }
        producer.send('spark.out', json.dumps(output).encode('utf-8'))

    # Flush once per batch rather than once per record: a flush blocks until
    # all buffered sends complete, so doing it inside the loop serialises
    # every single send.
    producer.flush()

In [9]:
def price_volume(x):
    """Return ``(side, price * remaining)`` for one order record.

    Args:
        x: dict with ``'side'``, ``'price'`` and ``'remaining'`` entries; the
            latter two must be convertible with ``float()``.

    Returns:
        A 2-tuple of the record's side and the computed product.

    Side effect:
        The product is also written back to ``x['price_volume']``, replacing
        any value already stored under that key.
    """
    notional = float(x['price']) * float(x['remaining'])
    x['price_volume'] = notional
    return x['side'], notional

In [10]:
# Turn every order record into a (side, price * remaining) pair.
parsed_pv = parsed.map(price_volume)

In [11]:
# Sum the price-volume of all orders per side within each batch.
grouped = parsed_pv.reduceByKey(lambda running_total, amount: running_total + amount)

In [12]:
def _sort_by_side(batch_rdd):
    """Order one batch's (side, total) pairs by their side key."""
    return batch_rdd.sortByKey()


grouped_sorted = grouped.transform(_sort_by_side)

In [13]:
# Publish the per-side totals of every batch to `spark.out` via `handler`.
grouped_sorted.foreachRDD(handler)

In [14]:
# Re-key every per-side total under the single key 'price_volume', dropping
# the side label so the totals can be combined with each other.
results = grouped_sorted.map(lambda side_and_total: ('price_volume', side_and_total[1]))

In [15]:
def _merge_side_totals(left, right):
    """Union two ``{side: total}`` dicts.

    Merging is commutative and associative when the sides are distinct, which
    is what ``reduceByKey`` needs. Division (used previously) is neither, so
    the old result depended on the order Spark merged the values in.
    """
    merged = dict(left)
    merged.update(right)
    return merged


# Price-volume ratio per batch, emitted under the key 'price_volume'.
# The side labels are kept while reducing so numerator and denominator are
# chosen explicitly. Batches missing one side, or with a zero bid total, emit
# nothing instead of a misleading value or a ZeroDivisionError.
# NOTE(review): the direction is ask / bid, inferred from the sorted key order
# the original division relied on -- confirm this is the intended ratio.
ratio = (
    grouped_sorted
    .map(lambda side_and_total: ('price_volume', {side_and_total[0]: side_and_total[1]}))
    .reduceByKey(_merge_side_totals)
    .filter(lambda keyed: 'ask' in keyed[1] and bool(keyed[1].get('bid')))
    .mapValues(lambda totals: totals['ask'] / totals['bid'])
)

In [16]:
# Publish the ratio of every batch to `spark.out` via `handler`; it is emitted
# with type 'price_volume'.
ratio.foreachRDD(handler)

In [17]:
# Also print each batch of both streams to the notebook output for inspection.
grouped_sorted.pprint()
ratio.pprint()

In [18]:
# Start the streaming computation. All transformations and outputs above must
# already be registered on `ssc` at this point.
ssc.start()

In [19]:
# Stop the streaming computation.
# NOTE(review): called with default arguments -- check whether the underlying
# SparkContext should be kept alive and whether a graceful stop is wanted.
ssc.stop()

In [20]:
# from kafka import KafkaProducer, KafkaConsumer

In [21]:
# producer = KafkaProducer(bootstrap_servers='localhost:9092')

In [25]:
# consumer = KafkaConsumer('gemini-feed',
#                          bootstrap_servers=['localhost:9092'],
#                          auto_offset_reset='earliest',
#                          enable_auto_commit=True,
#                         value_deserializer=lambda x: x.decode('utf-8')
# )

In [27]:
# for message in consumer:
#     print(message)

ConsumerRecord(topic='gemini-feed', partition=0, offset=1, timestamp=1602623330613, timestamp_type=0, key=None, value='{"side": "bid", "price": "0.02", "remaining": "30960", "price_volume": "1"}', headers=[], checksum=2672202781, serialized_key_size=-1, serialized_value_size=75, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=2, timestamp=1602623330614, timestamp_type=0, key=None, value='{"side": "bid", "price": "0.04", "remaining": "0.00055", "price_volume": "1"}', headers=[], checksum=2356585512, serialized_key_size=-1, serialized_value_size=77, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=3, timestamp=1602623330615, timestamp_type=0, key=None, value='{"side": "bid", "price": "0.05", "remaining": "7494", "price_volume": "1"}', headers=[], checksum=4182436564, serialized_key_size=-1, serialized_value_size=74, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=4, timestamp=160262333

ConsumerRecord(topic='gemini-feed', partition=0, offset=1817, timestamp=1602623331142, timestamp_type=0, key=None, value='{"side": "bid", "price": "9011.00", "remaining": "0.22195094", "price_volume": "1"}', headers=[], checksum=3907689206, serialized_key_size=-1, serialized_value_size=83, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=1818, timestamp=1602623331142, timestamp_type=0, key=None, value='{"side": "bid", "price": "9012.00", "remaining": "0.02774079", "price_volume": "1"}', headers=[], checksum=2110497831, serialized_key_size=-1, serialized_value_size=83, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=1819, timestamp=1602623331142, timestamp_type=0, key=None, value='{"side": "bid", "price": "9012.29", "remaining": "0.00179", "price_volume": "1"}', headers=[], checksum=609570054, serialized_key_size=-1, serialized_value_size=80, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, o

ConsumerRecord(topic='gemini-feed', partition=0, offset=3317, timestamp=1602623331551, timestamp_type=0, key=None, value='{"side": "bid", "price": "10978.00", "remaining": "0.04563672", "price_volume": "1"}', headers=[], checksum=3167175607, serialized_key_size=-1, serialized_value_size=84, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=3318, timestamp=1602623331551, timestamp_type=0, key=None, value='{"side": "bid", "price": "10978.18", "remaining": "0.00768561", "price_volume": "1"}', headers=[], checksum=2064723352, serialized_key_size=-1, serialized_value_size=84, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=3319, timestamp=1602623331551, timestamp_type=0, key=None, value='{"side": "bid", "price": "10978.59", "remaining": "0.001", "price_volume": "1"}', headers=[], checksum=3883370381, serialized_key_size=-1, serialized_value_size=79, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0,

ConsumerRecord(topic='gemini-feed', partition=0, offset=4816, timestamp=1602623331950, timestamp_type=0, key=None, value='{"side": "ask", "price": "14387.00", "remaining": "0.01", "price_volume": "1"}', headers=[], checksum=3641428912, serialized_key_size=-1, serialized_value_size=78, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=4817, timestamp=1602623331950, timestamp_type=0, key=None, value='{"side": "ask", "price": "14388.00", "remaining": "1", "price_volume": "1"}', headers=[], checksum=1211460311, serialized_key_size=-1, serialized_value_size=75, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=4818, timestamp=1602623331950, timestamp_type=0, key=None, value='{"side": "ask", "price": "14390.00", "remaining": "1", "price_volume": "1"}', headers=[], checksum=723165411, serialized_key_size=-1, serialized_value_size=75, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=4819, timest

ConsumerRecord(topic='gemini-feed', partition=0, offset=6316, timestamp=1602623400473, timestamp_type=0, key=None, value='{"side": "bid", "price": "11454.95", "remaining": "0.03745609", "price_volume": "1"}', headers=[], checksum=650635428, serialized_key_size=-1, serialized_value_size=84, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=6317, timestamp=1602623400476, timestamp_type=0, key=None, value='{"side": "bid", "price": "11454.95", "remaining": "0.02709116", "price_volume": "1"}', headers=[], checksum=3966359242, serialized_key_size=-1, serialized_value_size=84, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=6318, timestamp=1602623400479, timestamp_type=0, key=None, value='{"side": "bid", "price": "11454.95", "remaining": "0.03616015", "price_volume": "1"}', headers=[], checksum=4022181987, serialized_key_size=-1, serialized_value_size=84, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partitio

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



ConsumerRecord(topic='gemini-feed', partition=0, offset=12509, timestamp=1602624719512, timestamp_type=0, key=None, value='{"side": "bid", "price": "0.01", "remaining": "112209.1", "price_volume": "1"}', headers=[], checksum=653132759, serialized_key_size=-1, serialized_value_size=78, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=12510, timestamp=1602624719513, timestamp_type=0, key=None, value='{"side": "bid", "price": "0.02", "remaining": "30960", "price_volume": "1"}', headers=[], checksum=2698368685, serialized_key_size=-1, serialized_value_size=75, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=12511, timestamp=1602624719513, timestamp_type=0, key=None, value='{"side": "bid", "price": "0.04", "remaining": "0.00055", "price_volume": "1"}', headers=[], checksum=2227979584, serialized_key_size=-1, serialized_value_size=77, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=12512, 

ConsumerRecord(topic='gemini-feed', partition=0, offset=13469, timestamp=1602624719725, timestamp_type=0, key=None, value='{"side": "bid", "price": "5888.88", "remaining": "0.02547173", "price_volume": "1"}', headers=[], checksum=182974883, serialized_key_size=-1, serialized_value_size=83, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=13470, timestamp=1602624719725, timestamp_type=0, key=None, value='{"side": "bid", "price": "5889.93", "remaining": "0.011088", "price_volume": "1"}', headers=[], checksum=3617358580, serialized_key_size=-1, serialized_value_size=81, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=13471, timestamp=1602624719725, timestamp_type=0, key=None, value='{"side": "bid", "price": "5890.00", "remaining": "0.08488964", "price_volume": "1"}', headers=[], checksum=3274403668, serialized_key_size=-1, serialized_value_size=83, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=

ConsumerRecord(topic='gemini-feed', partition=0, offset=13945, timestamp=1602624719984, timestamp_type=0, key=None, value='{"side": "bid", "price": "8015.13", "remaining": "0.0249528", "price_volume": "1"}', headers=[], checksum=4161419611, serialized_key_size=-1, serialized_value_size=82, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=13946, timestamp=1602624719985, timestamp_type=0, key=None, value='{"side": "bid", "price": "8015.41", "remaining": "0.002047", "price_volume": "1"}', headers=[], checksum=3381055546, serialized_key_size=-1, serialized_value_size=81, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=13947, timestamp=1602624719985, timestamp_type=0, key=None, value='{"side": "bid", "price": "8024.72", "remaining": "0.001954", "price_volume": "1"}', headers=[], checksum=3266131996, serialized_key_size=-1, serialized_value_size=81, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0,

ConsumerRecord(topic='gemini-feed', partition=0, offset=14445, timestamp=1602624720211, timestamp_type=0, key=None, value='{"side": "bid", "price": "9165.15", "remaining": "0.12765748", "price_volume": "1"}', headers=[], checksum=2392310458, serialized_key_size=-1, serialized_value_size=83, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=14446, timestamp=1602624720212, timestamp_type=0, key=None, value='{"side": "bid", "price": "9167.50", "remaining": "0.00027", "price_volume": "1"}', headers=[], checksum=1265455573, serialized_key_size=-1, serialized_value_size=80, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=14447, timestamp=1602624720212, timestamp_type=0, key=None, value='{"side": "bid", "price": "9169.00", "remaining": "0.5", "price_volume": "1"}', headers=[], checksum=3652949224, serialized_key_size=-1, serialized_value_size=76, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offs

ConsumerRecord(topic='gemini-feed', partition=0, offset=14895, timestamp=1602624720416, timestamp_type=0, key=None, value='{"side": "bid", "price": "9713.00", "remaining": "0.2059096", "price_volume": "1"}', headers=[], checksum=1913876890, serialized_key_size=-1, serialized_value_size=82, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=14896, timestamp=1602624720418, timestamp_type=0, key=None, value='{"side": "bid", "price": "9713.95", "remaining": "0.4", "price_volume": "1"}', headers=[], checksum=3162740569, serialized_key_size=-1, serialized_value_size=76, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=14897, timestamp=1602624720418, timestamp_type=0, key=None, value='{"side": "bid", "price": "9714.89", "remaining": "0.00411739", "price_volume": "1"}', headers=[], checksum=3064300064, serialized_key_size=-1, serialized_value_size=83, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, of

ConsumerRecord(topic='gemini-feed', partition=0, offset=15233, timestamp=1602624720620, timestamp_type=0, key=None, value='{"side": "bid", "price": "10119.00", "remaining": "0.00494119", "price_volume": "1"}', headers=[], checksum=4113454098, serialized_key_size=-1, serialized_value_size=84, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=15234, timestamp=1602624720622, timestamp_type=0, key=None, value='{"side": "bid", "price": "10120.15", "remaining": "0.003904", "price_volume": "1"}', headers=[], checksum=388374382, serialized_key_size=-1, serialized_value_size=82, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=15235, timestamp=1602624720622, timestamp_type=0, key=None, value='{"side": "bid", "price": "10120.50", "remaining": "0.001248", "price_volume": "1"}', headers=[], checksum=2399860009, serialized_key_size=-1, serialized_value_size=82, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition

ConsumerRecord(topic='gemini-feed', partition=0, offset=15435, timestamp=1602624720827, timestamp_type=0, key=None, value='{"side": "bid", "price": "10351.68", "remaining": "0.01356496", "price_volume": "1"}', headers=[], checksum=2097302205, serialized_key_size=-1, serialized_value_size=84, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=15436, timestamp=1602624720828, timestamp_type=0, key=None, value='{"side": "bid", "price": "10353.24", "remaining": "0.0059688", "price_volume": "1"}', headers=[], checksum=216852676, serialized_key_size=-1, serialized_value_size=83, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=15437, timestamp=1602624720828, timestamp_type=0, key=None, value='{"side": "bid", "price": "10353.45", "remaining": "0.65212487", "price_volume": "1"}', headers=[], checksum=2269273791, serialized_key_size=-1, serialized_value_size=84, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partit

ConsumerRecord(topic='gemini-feed', partition=0, offset=15891, timestamp=1602624721025, timestamp_type=0, key=None, value='{"side": "bid", "price": "11423.13", "remaining": "0.795", "price_volume": "1"}', headers=[], checksum=2415147127, serialized_key_size=-1, serialized_value_size=79, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=15892, timestamp=1602624721025, timestamp_type=0, key=None, value='{"side": "bid", "price": "11424.00", "remaining": "0.004544", "price_volume": "1"}', headers=[], checksum=4276737024, serialized_key_size=-1, serialized_value_size=82, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=15893, timestamp=1602624721026, timestamp_type=0, key=None, value='{"side": "bid", "price": "11424.14", "remaining": "4.29", "price_volume": "1"}', headers=[], checksum=2165427412, serialized_key_size=-1, serialized_value_size=78, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offs

ConsumerRecord(topic='gemini-feed', partition=0, offset=16537, timestamp=1602624721231, timestamp_type=0, key=None, value='{"side": "ask", "price": "12797.13", "remaining": "0.00023462", "price_volume": "1"}', headers=[], checksum=3944008467, serialized_key_size=-1, serialized_value_size=84, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=16538, timestamp=1602624721231, timestamp_type=0, key=None, value='{"side": "ask", "price": "12798.00", "remaining": "0.1", "price_volume": "1"}', headers=[], checksum=850338952, serialized_key_size=-1, serialized_value_size=77, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=16539, timestamp=1602624721233, timestamp_type=0, key=None, value='{"side": "ask", "price": "12799.00", "remaining": "0.6294", "price_volume": "1"}', headers=[], checksum=655464317, serialized_key_size=-1, serialized_value_size=80, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offs

ConsumerRecord(topic='gemini-feed', partition=0, offset=16976, timestamp=1602624721443, timestamp_type=0, key=None, value='{"side": "ask", "price": "13614.57", "remaining": "0.00026395", "price_volume": "1"}', headers=[], checksum=1508686840, serialized_key_size=-1, serialized_value_size=84, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=16977, timestamp=1602624721443, timestamp_type=0, key=None, value='{"side": "ask", "price": "13617.13", "remaining": "0.00026395", "price_volume": "1"}', headers=[], checksum=3109647769, serialized_key_size=-1, serialized_value_size=84, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=16978, timestamp=1602624721444, timestamp_type=0, key=None, value='{"side": "ask", "price": "13619.69", "remaining": "0.00026395", "price_volume": "1"}', headers=[], checksum=2984975691, serialized_key_size=-1, serialized_value_size=84, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', part

ConsumerRecord(topic='gemini-feed', partition=0, offset=17260, timestamp=1602624721639, timestamp_type=0, key=None, value='{"side": "ask", "price": "15497.51", "remaining": "0.645", "price_volume": "1"}', headers=[], checksum=2607292281, serialized_key_size=-1, serialized_value_size=79, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=17261, timestamp=1602624721641, timestamp_type=0, key=None, value='{"side": "ask", "price": "15499.91", "remaining": "5", "price_volume": "1"}', headers=[], checksum=2546073109, serialized_key_size=-1, serialized_value_size=75, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=17262, timestamp=1602624721641, timestamp_type=0, key=None, value='{"side": "ask", "price": "15500.00", "remaining": "1.41002602", "price_volume": "1"}', headers=[], checksum=4156251593, serialized_key_size=-1, serialized_value_size=84, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offse

ConsumerRecord(topic='gemini-feed', partition=0, offset=17594, timestamp=1602624721855, timestamp_type=0, key=None, value='{"side": "ask", "price": "22987.00", "remaining": "0.5", "price_volume": "1"}', headers=[], checksum=3558184276, serialized_key_size=-1, serialized_value_size=77, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=17595, timestamp=1602624721855, timestamp_type=0, key=None, value='{"side": "ask", "price": "23000.00", "remaining": "2.12173914", "price_volume": "1"}', headers=[], checksum=2657009923, serialized_key_size=-1, serialized_value_size=84, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=17596, timestamp=1602624721856, timestamp_type=0, key=None, value='{"side": "ask", "price": "23053.00", "remaining": "0.0056478", "price_volume": "1"}', headers=[], checksum=2729147569, serialized_key_size=-1, serialized_value_size=83, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0,

ConsumerRecord(topic='gemini-feed', partition=0, offset=17859, timestamp=1602624722212, timestamp_type=0, key=None, value='{"side": "ask", "price": "11476.00", "remaining": "1.91", "price_volume": "1"}', headers=[], checksum=3203268251, serialized_key_size=-1, serialized_value_size=78, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=17860, timestamp=1602624723344, timestamp_type=0, key=None, value='{"side": "bid", "price": "11306.73", "remaining": "0.45698714", "price_volume": "1"}', headers=[], checksum=317969753, serialized_key_size=-1, serialized_value_size=84, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=17861, timestamp=1602624723486, timestamp_type=0, key=None, value='{"side": "ask", "price": "11457.12", "remaining": "5.37666071", "price_volume": "1"}', headers=[], checksum=1839030431, serialized_key_size=-1, serialized_value_size=84, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0

ConsumerRecord(topic='gemini-feed', partition=0, offset=17887, timestamp=1602624725385, timestamp_type=0, key=None, value='{"side": "bid", "price": "11457.11", "remaining": "0.01501094", "price_volume": "1"}', headers=[], checksum=3477158476, serialized_key_size=-1, serialized_value_size=84, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=17888, timestamp=1602624725538, timestamp_type=0, key=None, value='{"side": "ask", "price": "11476.00", "remaining": "0", "price_volume": "1"}', headers=[], checksum=3326762264, serialized_key_size=-1, serialized_value_size=75, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, offset=17889, timestamp=1602624726074, timestamp_type=0, key=None, value='{"side": "bid", "price": "11434.23", "remaining": "0.99235243", "price_volume": "1"}', headers=[], checksum=1868168898, serialized_key_size=-1, serialized_value_size=84, serialized_header_size=-1)
ConsumerRecord(topic='gemini-feed', partition=0, 

KeyboardInterrupt: 

In [None]:
from kafka import KafkaProducer
import os  
# Ask the PySpark launcher to pull in the Kafka 0.8 streaming connector.
# This is set before the SparkContext is created further down in this cell.
# NOTE(review): the connector is pinned to 2.0.2 while the commented-out
# install cell pins pyspark 2.4.6 -- confirm the two versions are compatible.
os.environ.update(
    PYSPARK_SUBMIT_ARGS='--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
)

# Kafka producer used by `handler` to publish results to `spark.out`.
producer = KafkaProducer(bootstrap_servers='localhost:9092')
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils  
from kafka import KafkaProducer
import json
import time

# Length of each streaming micro-batch, in seconds.
batch_interval_seconds = 60

# Spark entry point; logging is reduced to errors to keep the output readable.
sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, batch_interval_seconds)

# Input stream over the `gemini-feed` topic.
zookeeper_address = 'localhost:2181'
consumer_group_id = 'spark-streaming'
topic_map = {'gemini-feed': 1}
kafkaStream = KafkaUtils.createStream(ssc, zookeeper_address, consumer_group_id, topic_map)

# Element [1] of each stream item carries the JSON text of one order record.
parsed = kafkaStream.map(lambda key_and_payload: json.loads(key_and_payload[1]))

def handler(message):
    """Publish every record of one micro-batch to the ``spark.out`` Kafka topic.

    Used as a ``foreachRDD`` callback. Each record is sent as a UTF-8 encoded
    JSON object with the keys ``type`` (record[0]), ``value`` (record[1]) and
    ``timestamp`` (current Unix time in whole seconds).

    NOTE(review): keep this a one-argument function; ``foreachRDD`` is believed
    to dispatch on the callback's argument count -- confirm before adding
    parameters.

    Args:
        message: the batch RDD; it is collected to the driver, so it is
            expected to be small (here: one row per key).
    """
    records = message.collect()
    if not records:
        # Empty batch: nothing to send, so skip the flush round-trip as well.
        return

    for record in records:
        output = {
            'type': record[0],
            'value': record[1],
            'timestamp': int(time.time()),
        }
        producer.send('spark.out', json.dumps(output).encode('utf-8'))

    # Flush once per batch rather than once per record: a flush blocks until
    # all buffered sends complete, so doing it inside the loop serialises
    # every single send.
    producer.flush()
        
def price_volume(x):
    """Return ``(side, price * remaining)`` for one order record.

    Args:
        x: dict with ``'side'``, ``'price'`` and ``'remaining'`` entries; the
            latter two must be convertible with ``float()``.

    Returns:
        A 2-tuple of the record's side and the computed product.

    Side effect:
        The product is also written back to ``x['price_volume']``, replacing
        any value already stored under that key.
    """
    notional = float(x['price']) * float(x['remaining'])
    x['price_volume'] = notional
    return x['side'], notional

def _merge_side_totals(left, right):
    """Union two ``{side: total}`` dicts.

    Merging is commutative and associative when the sides are distinct, which
    is what ``reduceByKey`` needs. Division (used previously) is neither, so
    the old result depended on the order Spark merged the values in.
    """
    merged = dict(left)
    merged.update(right)
    return merged


# Per-side liquidity: sum price * remaining over every order in the batch,
# ordered by side, and publish the totals through `handler`.
parsed_pv = parsed.map(price_volume)
grouped = parsed_pv.reduceByKey(lambda running_total, amount: running_total + amount)
grouped_sorted = grouped.transform(lambda rdd: rdd.sortByKey())
grouped_sorted.foreachRDD(handler)

# Totals re-keyed under 'price_volume' (kept for any later cell that reads it).
results = grouped_sorted.map(lambda side_and_total: ('price_volume', side_and_total[1]))

# Price-volume ratio per batch, emitted under the key 'price_volume'.
# The side labels are kept while reducing so numerator and denominator are
# chosen explicitly. Batches missing one side, or with a zero bid total, emit
# nothing instead of a misleading value or a ZeroDivisionError.
# NOTE(review): the direction is ask / bid, inferred from the sorted key order
# the original division relied on -- confirm this is the intended ratio.
ratio = (
    grouped_sorted
    .map(lambda side_and_total: ('price_volume', {side_and_total[0]: side_and_total[1]}))
    .reduceByKey(_merge_side_totals)
    .filter(lambda keyed: 'ask' in keyed[1] and bool(keyed[1].get('bid')))
    .mapValues(lambda totals: totals['ask'] / totals['bid'])
)
ratio.foreachRDD(handler)

# Also print each batch of both streams for inspection, then start streaming.
grouped_sorted.pprint()
ratio.pprint()
ssc.start()