# This notebook loads play data into a Kafka instance. Run it only once; otherwise the topics will no longer be in order.

To start Kafka 0.9 in Docker (using the image from https://hub.docker.com/r/flozano/kafka/), run the following in the Docker bash shell:

```
docker run -p 2181:2181 -p 9092:9092 --env _KAFKA_advertised_host_name=`docker-machine ip \`docker-machine active\`` --env _KAFKA_advertised_port=9092 --name=local_kafka flozano/kafka &
```

To stop the container, run
```
docker kill local_kafka
```
and then remove it with
```
docker rm local_kafka
```

In [1]:
# Require at least Python 3.5, because async/await is needed.
# Compare the whole version tuple: checking major and minor separately
# (major >= 3 and minor >= 5) would wrongly reject e.g. Python 4.0.
import sys
ver = sys.version_info
if ver < (3, 5):
    raise RuntimeError('Python 3.5+ is required for async/await, found %s'
                       % sys.version.split()[0])

# Make the local source checkouts importable. The paths are relative, so this
# assumes the notebook is started from its own directory - TODO confirm.
# Only append a path once, so re-running the cell does not grow sys.path.
for extra_path in ('../src', '../../msq-domain/src'):
    if extra_path not in sys.path:
        sys.path.append(extra_path)

import aiostreams.operators as op
from aiostreams.config import QCConfigProvider

# Broker address comes from the project config. To target another broker
# (e.g. '0.0.0.0:9092' for the local Docker container), override it here.
kafka_host = QCConfigProvider().kafka_broker
print(kafka_host)

kafka:9092


In [2]:
# Smoke test: write one record to Kafka with the plain kafka-python client and
# read it back, to make sure Kafka is up before using the aiostreams wrappers.
from kafka import KafkaProducer, KafkaConsumer
import pickle

topic = 'test_topic_2'

kprod = KafkaProducer(bootstrap_servers=kafka_host)
# send() is asynchronous: block on the returned future so the record is really
# delivered (or a timeout error surfaces) before we try to read it back.
kprod.send(topic, pickle.dumps('test')).get(timeout=30)
kprod.close()

# NOTE(security): pickle.loads can execute arbitrary code; only use it on a
# topic whose contents this notebook wrote itself.
# consumer_timeout_ms makes iteration stop instead of blocking forever when no
# record arrives (e.g. the broker is unreachable).
consumer = KafkaConsumer(topic, bootstrap_servers=kafka_host, auto_offset_reset='earliest',
                         group_id=None, value_deserializer=pickle.loads,
                         consumer_timeout_ms=30000)
first_record = next(consumer, None)
consumer.close()
assert first_record is not None, 'no record read back from %r - is Kafka up?' % topic
first_record.value

'test'

In [3]:
# Read the bond trades back from Kafka and print every message parsed from
# JSON, to check that the dump was successful.
from aiostreams import AsyncKafkaSource, run
import aiostreams.operators as op
from mosaicsmartdata.common.json_convertor import *
import json

# Stage 1: raw messages from the topic, with values decoded from UTF-8 bytes.
trade_source = AsyncKafkaSource('bond-trades-topic',
                                bootstrap_servers=kafka_host,
                                value_deserializer=lambda raw: raw.decode('utf-8'))
# Stage 2: parse each decoded value as JSON.
# (Alternative: op.map(lambda message: json_to_domain(message.value)).)
parse_message = op.map(lambda message: json.loads(message.value))
# Stage 3: print every parsed message.
input_graph = (trade_source | parse_message) > print

run(input_graph)

In [22]:
# Read the bond quotes back from Kafka into a list, print the first raw
# message and convert it to a domain object, to check the dump was successful.
from aiostreams import AsyncKafkaSource, run
import aiostreams.operators as op
from mosaicsmartdata.common.json_convertor import *
import json

# The list passed as the sink is exposed as input_graph.sink once the graph runs.
input_graph = AsyncKafkaSource('bond-quotes-topic', bootstrap_servers=kafka_host,
                               value_deserializer=lambda x: x.decode('utf-8')) > []

run(input_graph)

# Fail with a clear message instead of an IndexError when nothing was read.
assert len(input_graph.sink) > 0, 'no messages read from bond-quotes-topic'
print(input_graph.sink[0].value)
# FIXME: in the last recorded run this call raised
# "TypeError: float() argument must be a string or a number, not 'X'"
# from json_to_domain on the record printed above - investigate the converter.
json_to_domain(input_graph.sink[0].value)

{"orderId":"0","marketDataSnapshotFullRefreshList":[{"key":"912796KN81504495801933","securityId":"912796KN8","timestamp":1504495801933,"marketDataEntryList":[{"entryId":"pxPre2hEntry_bid","entryType":"BID","entryPx":{"value":"92.71875"},"currencyCode":{"value":"USD"},"settlementCurrencyCode":{"value":"USD"},"entrySize":{"value":"10000000"},"quoteEntryId":{"value":"quoteEntryId"}},{"entryId":"pxPre2hEntry_mid","entryType":"MID","entryPx":{"value":"92.71875"},"currencyCode":{"value":"USD"},"settlementCurrencyCode":{"value":"USD"},"entrySize":{"value":"10000000"},"quoteEntryId":{"value":"quoteEntryId"}},{"entryId":"pxPre2hEntry_ask","entryType":"OFFER","entryPx":{"value":"92.71875"},"currencyCode":{"value":"USD"},"settlementCurrencyCode":{"value":"USD"},"entrySize":{"value":"10000000"},"quoteEntryId":{"value":"quoteEntryId"}}]},{"key":"912796KN81504499401933","securityId":"912796KN8","timestamp":1504499401933,"marketDataEntryList":[{"entryId":"pxPre1hEntry_bid","entryType":"BID","entryPx"

TypeError: float() argument must be a string or a number, not 'X'

In [32]:
# Print the raw (UTF-8 decoded) messages found on the output topic, to check
# that the dump was successful.
from aiostreams import AsyncKafkaSource, run
import aiostreams.operators as op
from mosaicsmartdata.common.json_convertor import *
import json

output_source = AsyncKafkaSource('output-topic',
                                 bootstrap_servers=kafka_host,
                                 value_deserializer=lambda raw: raw.decode('utf-8'))
input_graph = output_source > print
# To parse the messages instead of printing them raw, pipe the source through
# op.map(lambda message: json_to_domain(message.value)) or
# op.map(lambda message: json.loads(message.value)) before the print sink.

run(input_graph)

In [None]:
# Now feed the data into our Kafka wrapper: first the EUR quotes, published to
# the 'eurquotes' topic with JSON serialization.
# NOTE(review): eurquotes and to_async_iterable are not explicitly defined in
# this notebook; they presumably come from an earlier (deleted) cell or a
# wildcard import. TODO restore that so Restart & Run All works.
from aiostreams import run
from aiostreams.kafka import AsyncKafkaPublisher

print(len(eurquotes))

pub1 = AsyncKafkaPublisher(bootstrap_servers=kafka_host, topic='eurquotes', value_serializer='json')
#pub1.verbose=True
# Every reading cell above builds its graph with `>` and then calls run() on
# it; presumably `>` alone does not execute the graph, so run it here too,
# otherwise nothing is published.
run(to_async_iterable(eurquotes) > pub1)


In [None]:
# Then the EUR/GBP quotes, published to the 'eurgbpquotes' topic.
# Use a dedicated publisher instead of re-pointing pub1.topic: mutating a
# shared publisher would redirect any not-yet-run graph built on it.
# NOTE(review): eurgbpquotes and to_async_iterable are not explicitly defined
# in this notebook - TODO confirm where they come from.
from aiostreams import run
from aiostreams.kafka import AsyncKafkaPublisher

print(len(eurgbpquotes))

eurgbp_pub = AsyncKafkaPublisher(bootstrap_servers=kafka_host, topic='eurgbpquotes',
                                 value_serializer='json')
# Execute the graph explicitly, as the reading cells do with run().
run(to_async_iterable(eurgbpquotes) > eurgbp_pub)

In [None]:
# Finally, merge the EUR and EUR/GBP trades by timestamp and publish them to a
# single joint 'trades' topic, just for the fun of it.
# NOTE(review): eurtrades, eurgbptrades and to_async_iterable are not
# explicitly defined in this notebook - TODO confirm where they come from.
from aiostreams import run
from aiostreams.kafka import AsyncKafkaPublisher
import aiostreams.operators as op

# Bind the async iterables to new names instead of overwriting eurtrades /
# eurgbptrades, so re-running the cell does not wrap them a second time.
eurtrades_stream = to_async_iterable(eurtrades)
eurgbptrades_stream = to_async_iterable(eurgbptrades)
# Merge keyed on each trade's timestamp; presumably each input must already be
# sorted by timestamp for the output to be ordered - TODO confirm.
joint_trade_stream = op.merge_sorted([eurgbptrades_stream, eurtrades_stream],
                                     lambda x: x.timestamp)

# Dedicated publisher (same JSON serialization as the quote publishers) rather
# than mutating pub1.topic.
trades_pub = AsyncKafkaPublisher(bootstrap_servers=kafka_host, topic='trades',
                                 value_serializer='json')
run(joint_trade_stream > trades_pub)