# Account Streamer Discovery (TT-29)

Interactive exploration of TastyTrade's Account Streamer WebSocket API.
Validates protocol assumptions before building the AccountStreamer SDK component.

**Key questions to answer:**
1. Can we connect to the Account Streamer WebSocket?
2. What auth token format does it expect?
3. What message formats does it send for positions, balances, orders?
4. How does heartbeat/keepalive work?
5. What happens on disconnect — full state replay or incremental?
6. Do events arrive individually or in batched `results` arrays?

In [None]:
import asyncio
import json
import logging
import os
import time

import pandas as pd
from dotenv import load_dotenv
from IPython.display import Markdown, display
from websockets.asyncio.client import connect
from websockets.exceptions import ConnectionClosed

from tastytrade.accounts import AccountsClient
from tastytrade.config import RedisConfigManager
from tastytrade.connections import Credentials
from tastytrade.connections.requests import AsyncSessionHandler

pd.set_option("display.max_rows", 100)
pd.set_option("display.max_columns", None)
pd.set_option("display.width", None)
pd.set_option("display.max_colwidth", 80)

# Load .env into os.environ so observability module can find Grafana credentials
load_dotenv("/workspace/.env", override=True)

# Initialize observability — routes logs to stdout (JSON) + Grafana Cloud (OTLP)
if os.getenv("GRAFANA_CLOUD_TOKEN"):
    from tastytrade.common.observability import init_observability
    init_observability()
    print("Observability: Grafana Cloud enabled")
else:
    logging.basicConfig(level=logging.INFO)
    print("Observability: Local only (no GRAFANA_CLOUD_TOKEN)")

# Set OBFUSCATE_ACCOUNTS=true in .env to mask account numbers in output
_OBFUSCATE = os.getenv("OBFUSCATE_ACCOUNTS", "false").lower() == "true"

# Global WebSocket reference — managed by ensure_connected()
ws = None


def mask(account_number: str) -> str:
    """Mask account number for safe display, showing only last 4 chars."""
    if not _OBFUSCATE or len(account_number) <= 4:
        return account_number
    return "***" + account_number[-4:]


def mask_json(text: str, account_numbers: list[str]) -> str:
    """Mask all known account numbers in a raw JSON string."""
    if not _OBFUSCATE:
        return text
    for acct in account_numbers:
        text = text.replace(acct, mask(acct))
    return text


def mask_token(token: str) -> str:
    """Mask auth token for safe display."""
    if len(token) <= 8:
        return "***"
    return token[:4] + "..." + token[-4:]


async def ensure_connected():
    """Reconnect and re-subscribe if the WebSocket is closed.

    Notebooks have human delay between cells — the server's 60s heartbeat
    timeout will close idle connections. This helper makes each cell
    self-contained instead of depending on prior cells staying alive.
    """
    global ws
    needs_reconnect = ws is None or ws.close_code is not None

    if needs_reconnect:
        if ws is not None:
            print("(reconnecting — server closed idle connection)")
        ws = await connect(ws_url)
        sub = {
            "action": "connect",
            "value": [primary_account],
            "auth-token": session_token,
            "request-id": int(time.time()) % 10000,
        }
        await ws.send(json.dumps(sub))
        resp = await asyncio.wait_for(ws.recv(), timeout=10)
        resp_data = json.loads(resp)
        if resp_data.get("status") != "ok":
            raise ConnectionError(f"Subscribe failed: {resp_data}")
        print(f"Connected to {ws_url} (session {resp_data.get('web-socket-session-id')})")
    return ws


print(f"Account obfuscation: {'ON' if _OBFUSCATE else 'OFF'}")

Observability: Grafana Cloud enabled
Account obfuscation: ON


{"asctime": "2026-02-09T23:51:36", "levelname": "INFO", "name": "tastytrade.config.manager", "message": "Initialized 52 variables from .env file in Redis"}
{"asctime": "2026-02-09T23:51:37", "levelname": "INFO", "name": "tastytrade.connections.requests", "message": "Session created successfully"}
{"asctime": "2026-02-09T23:51:37", "levelname": "INFO", "name": "tastytrade.accounts.client", "message": "Fetched 2 accounts"}
{"asctime": "2026-02-09T23:51:37", "levelname": "INFO", "name": "tastytrade.accounts.client", "message": "Fetched 12 positions for account 5WY89822"}
{"asctime": "2026-02-09T23:51:37", "levelname": "INFO", "name": "tastytrade.accounts.client", "message": "Fetched balances for account 5WY89822 \u2014 net_liq=176249.61, cash=40506.72"}


# 1. REST Session Setup

Authenticate via the REST API to obtain the session token.
The Account Streamer uses the **same session token** — no separate `/api-quote-tokens` call needed.

In [2]:
config = RedisConfigManager(env_file="/workspace/.env")
config.initialize(force=True)

credentials = Credentials(config=config, env="Live")
session = await AsyncSessionHandler.create(credentials)
client = AccountsClient(session)

# Extract the session token — this is our auth-token for the WebSocket
session_token = session.session.headers["Authorization"]

# Determine WebSocket URL based on environment
STREAMER_URLS = {
    True: "wss://streamer.cert.tastyworks.com",   # sandbox
    False: "wss://streamer.tastyworks.com",        # production
}
ws_url = STREAMER_URLS[credentials.is_sandbox]

print(f"REST API: {credentials.base_url}")
print(f"Streamer URL: {ws_url}")
print(f"Session token: {mask_token(session_token)}")
print(f"Account: {mask(credentials.account_number)}")
print(f"Environment: {'Sandbox' if credentials.is_sandbox else 'Production'}")

REST API: https://api.tastyworks.com
Streamer URL: wss://streamer.tastyworks.com
Session token: r8Zv...2g+C
Account: ***9822
Environment: Production


# 2. Fetch Current State via REST (Baseline)

Before connecting the streamer, capture current positions and balances via REST.
This gives us a baseline to compare streamer events against.

In [3]:
accounts = await client.get_accounts()
account_numbers = [a.account_number for a in accounts]

# Validate that the configured account exists in the authenticated session
primary_account = credentials.account_number
if not primary_account:
    raise ValueError(
        "No account number configured. "
        f"Set TT_ACCOUNT (live) or TT_SANDBOX_ACCOUNT (sandbox) in .env to one of: "
        f"{[mask(a) for a in account_numbers]}"
    )
if primary_account not in account_numbers:
    raise ValueError(
        f"Configured account {mask(primary_account)} not found in authenticated session. "
        f"Available accounts: {[mask(a) for a in account_numbers]}. "
        f"Update TT_ACCOUNT or TT_SANDBOX_ACCOUNT in .env."
    )

print(f"Account {mask(primary_account)} verified against session")

# Current positions
positions = await client.get_positions(primary_account)
display(Markdown(f"**{len(positions)} positions** in {mask(primary_account)}"))
if positions:
    pos_data = [{
        "Symbol": p.symbol,
        "Type": p.instrument_type.value,
        "Qty": p.quantity,
        "Direction": p.quantity_direction.value,
    } for p in positions]
    display(pd.DataFrame(pos_data))

# Current balance
balance = await client.get_balances(primary_account)
display(Markdown(f"**Balance:** Cash=${balance.cash_balance}, Net Liq=${balance.net_liquidating_value}"))

Account ***9822 verified against session


**12 positions** in ***9822

Unnamed: 0,Symbol,Type,Qty,Direction
0,./6EM6 EUUJ6 260403C1.225,Future Option,1.0,Short
1,./CLJ6 LOJ6 260317P56,Future Option,1.0,Short
2,./CLJ6 LOJ6 260317C85,Future Option,1.0,Short
3,./MESM6EX3H6 260320P6450,Future Option,3.0,Short
4,MCD 260320P00305000,Equity Option,1.0,Short
5,MCD 260320P00295000,Equity Option,1.0,Long
6,./MESM6EX3H6 260320C7275,Future Option,3.0,Short
7,./6EM6 EUUJ6 260403P1.16,Future Option,1.0,Short
8,CSCO 260227C00078000,Equity Option,1.0,Short
9,SPY,Equity,100.29,Long


**Balance:** Cash=$40506.72, Net Liq=$176249.61

# 3. Connect to Account Streamer WebSocket

Open a raw WebSocket connection to the streamer endpoint.

**Protocol from TastyTrade API docs:**
- Connect message: `{"action": "connect", "value": [accounts], "auth-token": token}`
- Response: `{"status": "ok", "action": "connect", "web-socket-session-id": "...", ...}`

**Questions to validate:**
- Does the session token work as-is, or does it need a `Bearer ` prefix?
- What does the connect response look like?
- Does connecting immediately trigger any initial state events?

In [4]:
# Open WebSocket connection and subscribe
ws = await ensure_connected()

print(f"Sent connect for account {mask(primary_account)}")
print(f"\nTo inspect the raw connect response, see ensure_connected() in Setup cell.")

Connected to wss://streamer.tastyworks.com (session 4d02ac05)
Sent connect for account ***9822

To inspect the raw connect response, see ensure_connected() in Setup cell.


## 3a. Auth Token Format Investigation

If the connect above failed, try with `Bearer ` prefix.
The unofficial Python SDK uses `Bearer <token>`, while the JS SDK uses raw token.
Run this cell only if needed.

In [5]:
# Only run this cell if the connect above returned an error
# Try with Bearer prefix
connect_msg_bearer = {
    "action": "connect",
    "value": [primary_account],
    "auth-token": f"Bearer {session_token}",
    "request-id": 2,
}

await ws.send(json.dumps(connect_msg_bearer))
response = await asyncio.wait_for(ws.recv(), timeout=10)
bearer_response = json.loads(response)
print("Bearer prefix response:")
print(mask_json(json.dumps(bearer_response, indent=2), account_numbers))

Bearer prefix response:
{
  "status": "error",
  "action": "connect",
  "web-socket-session-id": "4d02ac05",
  "message": "failed"
}


# 4. Heartbeat Protocol

**From API docs:**
- Min interval: 2 seconds
- Max interval: 60 seconds (server disconnects if exceeded)
- JS SDK default: 20 seconds
- DXLinkManager uses: 30 seconds

**Test:** Send a heartbeat and observe the response format.

In [6]:
ws = await ensure_connected()

# Send heartbeat
heartbeat_msg = {
    "action": "heartbeat",
    "auth-token": session_token,
    "request-id": 10,
}

await ws.send(json.dumps(heartbeat_msg))
response = await asyncio.wait_for(ws.recv(), timeout=10)
heartbeat_response = json.loads(response)
print("Heartbeat response:")
print(json.dumps(heartbeat_response, indent=2))

(reconnecting — server closed idle connection)
Connected to wss://streamer.tastyworks.com (session a8cf98eb)
Heartbeat response:
{
  "status": "ok",
  "action": "heartbeat",
  "web-socket-session-id": "a8cf98eb",
  "request-id": 10
}


# 5. Listen for Events

Listen for incoming events for a fixed duration.
Events arrive when positions, balances, or orders change.

**Expected event format (from API docs):**
```json
{"type": "CurrentPosition", "data": {...}, "timestamp": 1688595114405}
```

**Known notification types:**
- `AccountBalance` — balance changes
- `CurrentPosition` — position updates
- `Order` — order status changes
- `ComplexOrder` — multi-leg order updates
- `ExternalTransaction` — deposits/withdrawals
- `TradingStatus` — account trading status changes

**Batch format:** Events may arrive in `{"results": [{...}, {...}]}` arrays.

**Note:** In sandbox during quiet periods, you may not receive events.
To trigger events, place a trade in the sandbox while this cell is running.

In [None]:
ws = await ensure_connected()
streamer_logger = logging.getLogger("tastytrade.account_streamer")

# Listen for events with periodic heartbeats
LISTEN_DURATION = 60  # seconds
HEARTBEAT_INTERVAL = 20  # seconds (JS SDK default)

events_received = []
start_time = time.time()
last_heartbeat = start_time
request_id = 100

print(f"Listening for {LISTEN_DURATION}s (heartbeat every {HEARTBEAT_INTERVAL}s)...")
print("Tip: Place a trade in sandbox to trigger events.\n")


def log_account_event(event: dict) -> None:
    """Log account streamer event to Grafana via the observability pipeline."""
    etype = event.get("type", "unknown")
    data = event.get("data", {})
    ts = event.get("timestamp", "")
    seq = event.get("ws-sequence", "")
    account = data.get("account-number", "unknown")

    if etype == "AccountBalance":
        streamer_logger.info(
            "AccountBalance event — account=%s net_liq=%s cash=%s",
            account,
            data.get("net-liquidating-value", "?"),
            data.get("cash-balance", "?"),
        )
    elif etype == "CurrentPosition":
        streamer_logger.info(
            "CurrentPosition event — account=%s symbol=%s qty=%s direction=%s",
            account,
            data.get("symbol", "?"),
            data.get("quantity", "?"),
            data.get("quantity-direction", "?"),
        )
    elif etype == "Order":
        streamer_logger.info(
            "Order event — account=%s status=%s type=%s symbol=%s size=%s",
            account,
            data.get("status", "?"),
            data.get("order-type", "?"),
            data.get("underlying-symbol", "?"),
            data.get("size", "?"),
        )
    else:
        streamer_logger.info(
            "Account event — type=%s account=%s timestamp=%s",
            etype, account, ts,
        )


try:
    while time.time() - start_time < LISTEN_DURATION:
        # Send heartbeat if interval elapsed
        now = time.time()
        if now - last_heartbeat >= HEARTBEAT_INTERVAL:
            request_id += 1
            hb = {"action": "heartbeat", "auth-token": session_token, "request-id": request_id}
            await ws.send(json.dumps(hb))
            last_heartbeat = now
            elapsed = int(now - start_time)
            print(f"  [{elapsed}s] Heartbeat sent (request-id={request_id})")

        # Try to receive a message (non-blocking with short timeout)
        try:
            raw = await asyncio.wait_for(ws.recv(), timeout=1.0)
            msg = json.loads(raw)
            elapsed = int(time.time() - start_time)

            # Check if this is a heartbeat response or an event
            if msg.get("action") == "heartbeat":
                print(f"  [{elapsed}s] Heartbeat response OK")
            elif "results" in msg:
                # Batched events
                for event in msg["results"]:
                    events_received.append(event)
                    log_account_event(event)
                    etype = event.get("type", "unknown")
                    ts = event.get("timestamp", "")
                    print(f"  [{elapsed}s] EVENT (batch): {etype} @ {ts}")
            elif "type" in msg:
                # Single event
                events_received.append(msg)
                log_account_event(msg)
                etype = msg.get("type", "unknown")
                ts = msg.get("timestamp", "")
                print(f"  [{elapsed}s] EVENT: {etype} @ {ts}")
            else:
                # Unknown message format
                print(f"  [{elapsed}s] UNKNOWN: {json.dumps(msg)[:200]}")
        except asyncio.TimeoutError:
            pass  # No message within timeout, continue loop
except ConnectionClosed as e:
    print(f"\nConnection closed during listen: {e}")

print(f"\nDone. Received {len(events_received)} events in {int(time.time() - start_time)}s.")

# 6. Inspect Received Events

Examine the raw event data to understand field names, types, and structure.
This validates our Pydantic model assumptions.

In [None]:
if events_received:
    # Group by event type
    by_type = {}
    for e in events_received:
        etype = e.get("type", "unknown")
        by_type.setdefault(etype, []).append(e)

    display(Markdown(f"**{len(events_received)} events across {len(by_type)} types:**"))
    for etype, events in by_type.items():
        print(f"  {etype}: {len(events)} events")

    # Show first event of each type with full structure
    for etype, events in by_type.items():
        display(Markdown(f"### {etype} (sample)"))
        sample = events[0]
        raw = json.dumps(sample, indent=2, default=str)
        print(mask_json(raw, account_numbers))

        # List all field names and types in the data payload
        data = sample.get("data", {})
        if data:
            display(Markdown("**Fields in `data`:**"))
            field_info = [{
                "Field": k,
                "Type": type(v).__name__,
                "Sample": str(v)[:60],
            } for k, v in data.items()]
            display(pd.DataFrame(field_info))
else:
    display(Markdown("*No events received during listening period.*"))
    display(Markdown("**To generate events:** Place a trade in the sandbox, then re-run the listener."))

# 7. Model Compatibility Check

Test whether our existing TT-28 Pydantic models can parse streamer event data.
The API docs state that streamer event `data` fields match the REST API response format.
If true, our existing `Position` and `AccountBalance` models should parse them directly.

In [None]:
from tastytrade.accounts.models import AccountBalance, Position

if events_received:
    for e in events_received:
        etype = e.get("type", "")
        data = e.get("data", {})
        ts = e.get("timestamp", "")

        if etype == "CurrentPosition":
            try:
                pos = Position.model_validate(data)
                print(f"Position parsed: {pos.symbol} | {pos.quantity} {pos.quantity_direction.value}")
                print(f"  streamer_symbol={pos.streamer_symbol}")
                print(f"  Fields match REST model.")
            except Exception as ex:
                print(f"Position PARSE FAILED: {ex}")
                print(f"  Raw data keys: {list(data.keys())}")

        elif etype == "AccountBalance":
            try:
                bal = AccountBalance.model_validate(data)
                print(f"Balance parsed: cash=${bal.cash_balance}, net_liq=${bal.net_liquidating_value}")
                print(f"  Fields match REST model.")
            except Exception as ex:
                print(f"Balance PARSE FAILED: {ex}")
                print(f"  Raw data keys: {list(data.keys())}")

        elif etype == "Order":
            # Order model not yet built — just inspect the structure
            print(f"Order event (no model yet): {json.dumps(data, indent=2, default=str)[:500]}")

        else:
            print(f"Unhandled event type: {etype}")
else:
    display(Markdown("*No events to validate. Run the listener first (Section 5).*"))

# 8. Initial State Delivery Test

**Key question:** Does the Account Streamer deliver full position/balance state on initial connect,
or only subsequent changes?

Test: Open a fresh connection and immediately listen for any initial state events.

In [None]:
# Close any existing connection and open fresh one
if ws is not None and ws.close_code is None:
    await ws.close()

ws = await connect(ws_url)
print(f"Fresh WebSocket connected to {ws_url}")

# Subscribe
connect_msg2 = {
    "action": "connect",
    "value": [primary_account],
    "auth-token": session_token,
    "request-id": 200,
}
await ws.send(json.dumps(connect_msg2))

# Collect all messages for 10 seconds right after connect
initial_messages = []
start = time.time()
while time.time() - start < 10:
    try:
        raw = await asyncio.wait_for(ws.recv(), timeout=2.0)
        msg = json.loads(raw)
        initial_messages.append(msg)
        elapsed = round(time.time() - start, 2)

        if msg.get("action"):
            print(f"  [{elapsed}s] Response: action={msg['action']}, status={msg.get('status')}")
        elif "results" in msg:
            for evt in msg["results"]:
                print(f"  [{elapsed}s] Initial event: {evt.get('type')} (batch)")
        elif "type" in msg:
            print(f"  [{elapsed}s] Initial event: {msg['type']}")
        else:
            print(f"  [{elapsed}s] Unknown: {json.dumps(msg)[:200]}")
    except asyncio.TimeoutError:
        pass

initial_events = [m for m in initial_messages if "type" in m or "results" in m]
print(f"\nReceived {len(initial_messages)} messages ({len(initial_events)} events) within 10s of connect.")

if initial_events:
    print("\nInitial state IS delivered on connect.")
else:
    print("\nNo initial state delivered — streamer only sends changes.")

# 9. Disconnect & Reconnect Behavior

Simulate a disconnect and observe reconnection behavior.

**Questions:**
- Does the server send any events on reconnect (state replay)?
- Do we get a new `web-socket-session-id`?
- Is re-subscribing required after reconnect?

In [None]:
# Close current connection
if ws is not None and ws.close_code is None:
    await ws.close()
print("Disconnected.")
await asyncio.sleep(2)

# Reconnect
ws = await connect(ws_url)
print(f"Reconnected to {ws_url}")

# Re-subscribe
reconnect_msg = {
    "action": "connect",
    "value": [primary_account],
    "auth-token": session_token,
    "request-id": 300,
}
await ws.send(json.dumps(reconnect_msg))

# Collect messages for 10 seconds after reconnect
reconnect_messages = []
start = time.time()
while time.time() - start < 10:
    try:
        raw = await asyncio.wait_for(ws.recv(), timeout=2.0)
        msg = json.loads(raw)
        reconnect_messages.append(msg)
        elapsed = round(time.time() - start, 2)

        if msg.get("action"):
            session_id = msg.get("web-socket-session-id", "N/A")
            print(f"  [{elapsed}s] Response: action={msg['action']}, session={session_id}")
        elif "results" in msg:
            for evt in msg["results"]:
                print(f"  [{elapsed}s] Reconnect event: {evt.get('type')} (batch)")
        elif "type" in msg:
            print(f"  [{elapsed}s] Reconnect event: {msg['type']}")
        else:
            print(f"  [{elapsed}s] Unknown: {json.dumps(msg)[:200]}")
    except asyncio.TimeoutError:
        pass

print(f"\nReceived {len(reconnect_messages)} messages within 10s of reconnect.")
reconnect_events = [m for m in reconnect_messages if "type" in m or "results" in m]
if reconnect_events:
    print("State IS replayed on reconnect.")
else:
    print("No state replay on reconnect — must fetch via REST if needed.")

# 10. Protocol Summary

Summarize all findings for the TT-29 implementation.

In [None]:
display(Markdown("""
## Findings

Fill in after running each section above:

| Question | Finding |
|----------|--------|
| **WebSocket URL (sandbox)** | |
| **Auth token format** | Raw session token / Bearer prefix? |
| **Connect response shape** | |
| **Heartbeat response shape** | |
| **Event delivery format** | Individual / Batched `results` / Both? |
| **Initial state on connect?** | Yes / No |
| **State replay on reconnect?** | Yes / No |
| **Event types observed** | |
| **Position data matches REST model?** | Yes / No / Partial |
| **Balance data matches REST model?** | Yes / No / Partial |
| **Extra fields not in our models?** | |
| **Missing fields vs REST?** | |

## Design Implications

- **Heartbeat interval recommendation**: ___ seconds
- **Queue architecture**: One queue per event type (Position, Balance)
- **Orders**: Deferred to separate workstream
- **Redis state store**: Not needed (YAGNI) — re-subscribe from Credentials on reconnect
- **Batch handling**: Parse both individual events and `results` arrays
"""))

# Cleanup

In [None]:
if ws is not None and ws.close_code is None:
    await ws.close()
await session.close()

# Flush logs to Grafana before exit
if os.getenv("GRAFANA_CLOUD_TOKEN"):
    from tastytrade.common.observability import shutdown_observability
    shutdown_observability()

print("All connections closed.")