# Data Harvester

In [None]:
# All of our input data files are in JSON format.
import json

### Read in the client referential (client reference data)

In [None]:
file_name = '../data/example_clients.json'
# JSON text is UTF-8 by spec; pass the encoding explicitly rather than relying
# on the platform's locale-dependent default (which differs e.g. on Windows).
with open(file_name, 'r', encoding='utf-8') as fp:
    records = json.load(fp)

# Quick sanity check: show the first record and the total count.
print('example record:', records[:1])
print('nos records', len(records))

### Create graph

In [None]:
from src.amgraph import AMFGraph

client_ref_graph = AMFGraph()

# One node per client record, keyed ('client', <name>). Each node carries a
# single has_sector edge to ('sector', <record['group']>) with prob 1.0.
for record in records:
    client_ref_graph.set_node(
        node=('client', record['name']),
        node_attr={
            (('has_sector', None), ('sector', record['group'])): {'prob': 1.0},
        },
    )

# Render the graph; dimension=3 presumably selects a 3-D plot -- confirm
# against AMFGraph.plot.
client_ref_graph.plot(dimension=3)

### Connect to the Dask distributed cache and store the client referential graph

In [None]:
from src.distributed_cache import DistributedCache
import os
from dotenv import load_dotenv

# Populate os.environ from a .env file (python-dotenv), then collect the
# connection settings. Any variable that is not set comes back as None.
load_dotenv()
config = {
    'db_name': os.getenv("DB_NAME"),
    'db_username': os.getenv("DB_USERNAME"),
    'db_password': os.getenv("DB_PASSWORD"),
    'db_system': os.getenv("DB_SYSTEM"),
    'db_config_file_path': os.getenv("DB_CONFIG_PATH"),
    'db_queries_file_path': os.getenv("DB_QUERIES_PATH"),
    # Like the DB settings, the scheduler address can now come from the
    # environment. Fall back to the previously hard-coded host so existing
    # setups keep working unchanged.
    'scheduler_address': os.getenv("SCHEDULER_ADDRESS", 'tcp://192.168.1.67:8786'),
}


In [None]:
# Open the distributed cache with the shared config, then store the client
# referential graph under store 'HSG', key 'client_referential'.
dc = DistributedCache(config=config)
dc.set_kv(
    store_name='HSG',
    key='client_referential',
    value=client_ref_graph,
)

### Read in rainbow trades

In [None]:
file_name = '../data/rainbow_trades.json'
# JSON text is UTF-8 by spec; pass the encoding explicitly rather than relying
# on the platform's locale-dependent default (which differs e.g. on Windows).
with open(file_name, 'r', encoding='utf-8') as fp:
    records = json.load(fp)

# Quick sanity check: show the first record and the total count.
print('example record:', records[:1])
print('nos records', len(records))

In [None]:
from src.pubsub import PubSub

# Publisher with uid 'trade_dh', built from the same config as the cache.
publisher = PubSub(config=config, uid='trade_dh')

### Publish each trade record as a Trade Event

In [None]:
import pprint as pp

# Inclusive range of record indices to publish. Both bounds are expected to
# be non-negative; indices beyond the end of `records` are simply skipped.
start_trade_id = 0
end_trade_id = 1

# Slicing clamps to len(records) on its own, so the explicit
# min(..., len(records)) and the index lookup are unnecessary.
for record in records[start_trade_id:end_trade_id + 1]:
    pp.pprint(record)
    publisher.publish(topic='trade_ev', msg=record)
    