In [1]:
from datetime import date, datetime
import numpy as np
import pandas as pd
import perspective
from zeta_py.exchange import Exchange
from solana.rpc import commitment
from zeta_py.utils import cluster_endpoint
from zeta_py.types import LoadExchangeConfig
from solana.rpc.async_api import AsyncClient
from zeta_py.types import Asset, Side

In [2]:
network = "mainnet_beta"
opts = {
    "skip_preflight": False,
    "preflight_commitment": commitment.Confirmed,
}
endpoint = cluster_endpoint(network)
config = LoadExchangeConfig(
    **{
        "network": network,
        "connection": AsyncClient(endpoint, commitment=opts.get("preflight_commitment")),
        "assets": [Asset.SOL],
        "opts": opts,
        "throttle_ms": 0,
        "load_from_store": False,
    }
)

zeta = Exchange(config)
await zeta.load(config, subscribe=True)

Loaded account: State
Loaded account: Pricing
Loading Market: SOL
Loaded account: PerpSyncQueue
Subscribed to SOL:bid
Subscribed to SOL:ask
Loaded account: Clock
Subscribed to Clock
Subscribed to Pricing


In [11]:
# data = {
#     'price': [zeta.accounts.pricing.account.mark_prices[0]],
#     'time': [zeta.clock.account.unix_timestamp]
# }

bid = zeta.markets[Asset.SOL].get_l2(Side.BID, depth=None)
ask = zeta.markets[Asset.SOL].get_l2(Side.ASK, depth=None)
data = {
    'price': [float(l.price) for l in bid+ask],
    'size': [float(l.size) for l in bid+ask],
    'side': ['bid']*len(bid)+['ask']*len(ask),
    'time': [datetime.fromtimestamp(zeta.clock.account.unix_timestamp)]*len(bid+ask)
}

# data
table = perspective.Table(data)

In [12]:
type(data['size'][0])

float

In [13]:
import time
import asyncio

async def _update():
    while True:
        bid = zeta.markets[Asset.SOL].get_l2(Side.BID, depth=None)
        ask = zeta.markets[Asset.SOL].get_l2(Side.ASK, depth=None)
        data = {
            'price': [float(l.price) for l in bid+ask],
            'size': [float(l.size) for l in bid+ask],
            'side': ['bid']*len(bid)+['ask']*len(ask),
            'time': [datetime.fromtimestamp(zeta.clock.account.unix_timestamp)]*len(bid+ask)
        }
        # print(data)
        table.update(data)
        await asyncio.sleep(1)

def update_table():
    loop = asyncio.new_event_loop()
    task = loop.create_task(_update())
    # loop.call_later(60, task.cancel())

    try:
        loop.run_until_complete(task)
    except asyncio.CancelledError:
        print("Stopped streaming")

In [14]:
table.num_rows()

58

In [15]:
from perspective import Aggregate, Sort, Plugin
w = perspective.PerspectiveWidget(
    table,
    plugin=Plugin.YLINE,
    # aggregates={"price": Aggregate.SUM},
    # group_by=["time"],
    # sort=[["time", Sort.ASC]],
    # server=True,
    # limit=10
)

In [16]:
w

PerspectiveWidget(columns=['price', 'size', 'side', 'time'], plugin='Y Line', theme=None)

In [17]:
import threading

thread = threading.Thread(target=update_table)
thread.start()