In [14]:
import os
import time
import json
from dataclasses import dataclass
from typing import Optional, Dict, Any

import requests

In [15]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [16]:
DATA_API: str = "https://data-api.polymarket.com"
GAMMA: str = "https://gamma-api.polymarket.com"
CLOB: str = "https://clob.polymarket.com"

In [17]:
import json

slug = "btc-updown-15m-1766302200"
url = f"https://gamma-api.polymarket.com/markets/slug/{slug}"

mkt = requests.get(url).json()

# clobTokenIds is sometimes a JSON-encoded string, sometimes already a list
clob_ids = mkt["clobTokenIds"]
if isinstance(clob_ids, str):
    clob_ids = json.loads(clob_ids)

# outcomes is sometimes a JSON-encoded string, sometimes already a list
outcomes = mkt["outcomes"]
if isinstance(outcomes, str):
    outcomes = json.loads(outcomes)

token_map = dict(zip(outcomes, clob_ids))
print(token_map)


{'Up': '65746565293819997775300173478269020460544951341535209461919854133553859025488', 'Down': '42751939750927982695925613318616870696027411162171797959002334921720792035718'}


In [18]:
import time
import json
from dataclasses import dataclass
from typing import Dict, Any, List, Optional, Tuple
import requests

DATA_API = "https://data-api.polymarket.com"

ASSETS = {
    "Up":   "65746565293819997775300173478269020460544951341535209461919854133553859025488",
    "Down": "42751939750927982695925613318616870696027411162171797959002334921720792035718",
}

@dataclass
class TradeEvent:
    timestamp: int          # unix seconds (per docs)
    side: str               # BUY / SELL
    asset: str              # asset id string
    price: float
    size: float
    usdc_size: Optional[float]
    tx_hash: Optional[str]

def _trade_key(t: Dict[str, Any]) -> Tuple:
    """
    Dedup key. Prefer transactionHash if present; otherwise fall back to a composite key.
    """
    tx = t.get("transactionHash") or t.get("txHash") or None
    if tx:
        return ("tx", tx)
    return (
        "k",
        int(t["timestamp"]),
        t.get("asset"),
        t.get("side"),
        float(t.get("price", 0)),
        float(t.get("size", 0)),
    )

def fetch_trades(session: requests.Session, *, user: str,
                 condition_id: Optional[str] = None,
                 event_slug: Optional[str] = None,
                 limit: int = 200,
                 after_ts: Optional[int] = None) -> List[Dict[str, Any]]:
    """
    Fetch trades for a user, optionally filtered.
    Docs show user/proxyWallet, conditionId/eventSlug, side, etc. :contentReference[oaicite:6]{index=6}
    """
    params: Dict[str, Any] = {"user": user, "limit": limit}

    # Use whichever filter you have that the endpoint supports in your environment.
    # conditionId is the most canonical filter for “this market”.
    if condition_id:
        params["conditionId"] = condition_id
    if event_slug:
        params["eventSlug"] = event_slug

    # Some variants support time filters. If your call errors, remove this and just dedupe locally.
    if after_ts is not None:
        params["after"] = after_ts  # if unsupported, server will ignore or error

    r = session.get(f"{DATA_API}/trades", params=params, timeout=20)
    r.raise_for_status()
    data = r.json()
    # Some endpoints return list; some wrap in {"data": [...]}.
    if isinstance(data, dict) and "data" in data:
        return data["data"]
    if isinstance(data, list):
        return data
    raise ValueError(f"Unexpected trades response shape: {type(data)}")

def fetch_positions(session: requests.Session, *, user: str,
                    condition_id: Optional[str] = None,
                    event_slug: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    Fetch current positions for a user. :contentReference[oaicite:7]{index=7}
    """
    params: Dict[str, Any] = {"user": user}
    if condition_id:
        params["conditionId"] = condition_id
    if event_slug:
        params["eventSlug"] = event_slug

    r = session.get(f"{DATA_API}/positions", params=params, timeout=20)
    r.raise_for_status()
    data = r.json()
    if isinstance(data, dict) and "data" in data:
        return data["data"]
    if isinstance(data, list):
        return data
    raise ValueError(f"Unexpected positions response shape: {type(data)}")

def normalize_trade(t: Dict[str, Any]) -> TradeEvent:
    return TradeEvent(
        timestamp=int(t["timestamp"]),
        side=str(t["side"]).upper(),
        asset=str(t["asset"]),
        price=float(t["price"]),
        size=float(t["size"]),
        usdc_size=(float(t["usdcSize"]) if t.get("usdcSize") is not None else None),
        tx_hash=t.get("transactionHash") or t.get("txHash"),
    )

def apply_trade_to_position(pos: Dict[str, float], te: TradeEvent) -> None:
    """
    Positions tracked in contract units per asset_id.
    BUY increases holdings; SELL decreases.
    """
    delta = te.size if te.side == "BUY" else -te.size
    pos[te.asset] = pos.get(te.asset, 0.0) + delta

def run_tracker(
    *,
    bot_wallet: str,
    condition_id: Optional[str] = None,
    event_slug: Optional[str] = None,
    poll_seconds: float = 2.0,
    positions_poll_every: int = 20,
    out_jsonl_path: str = "bot_fills_with_position.jsonl",
):
    session = requests.Session()

    # Local reconstructed position (only for the assets we care about)
    position: Dict[str, float] = {ASSETS["Up"]: 0.0, ASSETS["Down"]: 0.0}

    seen = set()
    last_seen_ts: Optional[int] = None
    iter_count = 0

    # Optional: initialize from current positions snapshot (best-effort)
    try:
        cur = fetch_positions(session, user=bot_wallet, condition_id=condition_id, event_slug=event_slug)
        # Try to seed only our two assets; schema varies so be defensive.
        for p in cur:
            asset = str(p.get("asset") or p.get("token") or "")
            if asset in position:
                # size/position fields differ across versions
                qty = p.get("size") or p.get("quantity") or p.get("position") or 0
                position[asset] = float(qty)
        print("Seeded from /positions:", position)
    except Exception as e:
        print("Could not seed from /positions (ok):", e)

    while True:
        iter_count += 1
        try:
            trades_raw = fetch_trades(
                session,
                user=bot_wallet,
                condition_id=condition_id,
                event_slug=event_slug,
                limit=200,
                after_ts=last_seen_ts,
            )
        except Exception as e:
            print("fetch_trades error:", e)
            time.sleep(min(10.0, poll_seconds * 2))
            continue

        # Filter to our two assets (Up/Down)
        trades_raw = [t for t in trades_raw if str(t.get("asset")) in position]

        # Sort oldest->newest so replay is consistent
        trades_raw.sort(key=lambda x: int(x["timestamp"]))

        new_events = 0
        with open(out_jsonl_path, "a") as f:
            for t in trades_raw:
                k = _trade_key(t)
                if k in seen:
                    continue
                seen.add(k)

                te = normalize_trade(t)
                apply_trade_to_position(position, te)

                snapshot = {
                    "timestamp": te.timestamp,
                    "side": te.side,
                    "asset": te.asset,
                    "price": te.price,
                    "size": te.size,
                    "usdcSize": te.usdc_size,
                    "txHash": te.tx_hash,
                    "pos_up": position[ASSETS["Up"]],
                    "pos_down": position[ASSETS["Down"]],
                    "slug": event_slug,
                    "conditionId": condition_id,
                }
                f.write(json.dumps(snapshot) + "\n")
                new_events += 1
                last_seen_ts = max(last_seen_ts or te.timestamp, te.timestamp)

        if new_events:
            print(f"+{new_events} trades | pos_up={position[ASSETS['Up']]:.4f} pos_down={position[ASSETS['Down']]:.4f}")

        # Periodic reconciliation against /positions (optional but recommended)
        if iter_count % positions_poll_every == 0:
            try:
                cur = fetch_positions(session, user=bot_wallet, condition_id=condition_id, event_slug=event_slug)
                # Extract our two assets if present
                cur_map = {}
                for p in cur:
                    asset = str(p.get("asset") or p.get("token") or "")
                    if asset in position:
                        qty = p.get("size") or p.get("quantity") or p.get("position") or 0
                        cur_map[asset] = float(qty)
                if cur_map:
                    print("reconcile /positions:", cur_map, "local:", position)
            except Exception as e:
                print("fetch_positions error:", e)

        time.sleep(poll_seconds)

# Example usage:
# run_tracker(
#   bot_wallet="0xBOTPROXYWALLET...",
#   condition_id="0x....",          # best if you have it
#   event_slug="btc-updown-15m-1766301300",
# )

In [22]:
run_tracker(
    bot_wallet = "0x6031b6eed1c97e853c6e0f03ad3ce3529351f96d",
    condition_id = "0x4f563c18d100afd42f6a28d91ee7589e04fbce81c1de193ca17a4c2995da9ae5",
    event_slug = "btc-updown-15m-1766301300"
)

Seeded from /positions: {'65746565293819997775300173478269020460544951341535209461919854133553859025488': 2168.037785, '42751939750927982695925613318616870696027411162171797959002334921720792035718': 2102.866292}


TypeError: apply_trade_to_position() missing 2 required positional arguments: 'asset' and 'size'

In [20]:
import asyncio
import time
import json
from typing import Dict, Any, List, Optional, Tuple
import requests
import pandas as pd

DATA_API = "https://data-api.polymarket.com"

ASSETS = {
    "Up":   "65746565293819997775300173478269020460544951341535209461919854133553859025488",
    "Down": "42751939750927982695925613318616870696027411162171797959002334921720792035718",
}

SLUG = "btc-updown-15m-1766302200"

# ---------- Rate limiter (token bucket) ----------
class TokenBucket:
    """
    Approximates "X requests per 10s" by refilling tokens continuously.
    Good enough to stay under limits with a buffer.
    """
    def __init__(self, capacity: float, refill_per_sec: float):
        self.capacity = float(capacity)
        self.refill_per_sec = float(refill_per_sec)
        self.tokens = float(capacity)
        self.last = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, n: float = 1.0):
        async with self._lock:
            while True:
                now = time.monotonic()
                elapsed = now - self.last
                self.last = now
                self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_sec)

                if self.tokens >= n:
                    self.tokens -= n
                    return

                # Sleep just enough to earn the missing tokens
                missing = n - self.tokens
                sleep_s = missing / self.refill_per_sec if self.refill_per_sec > 0 else 0.1
                await asyncio.sleep(max(0.001, sleep_s))


# ---------- Helpers ----------
def trade_key(t: Dict[str, Any]) -> Tuple:
    tx = t.get("transactionHash") or t.get("txHash")
    if tx:
        return ("tx", tx)
    return (
        "k",
        int(t.get("timestamp", 0)),
        str(t.get("asset", "")),
        str(t.get("side", "")).upper(),
        float(t.get("price", 0.0)),
        float(t.get("size", 0.0)),
    )

def fetch_json(session: requests.Session, url: str, params: Dict[str, Any]) -> Any:
    r = session.get(url, params=params, timeout=20)
    r.raise_for_status()
    return r.json()

def normalize_list_payload(payload: Any) -> List[Dict[str, Any]]:
    if isinstance(payload, list):
        return payload
    if isinstance(payload, dict) and "data" in payload and isinstance(payload["data"], list):
        return payload["data"]
    raise ValueError(f"Unexpected response shape: {type(payload)}")

def apply_trade_to_position(pos: Dict[str, float], side: str, asset: str, size: float):
    side = side.upper()
    delta = size if side == "BUY" else -size
    pos[asset] = pos.get(asset, 0.0) + delta


# ---------- Main tracker ----------
class PolymarketUserTracker:
    def __init__(
        self,
        bot_wallet: str,
        slug: str,
        assets: Dict[str, str],
        trades_rps: float,
        positions_rps: float,
        trades_limit: int = 200,
        flush_every: int = 2000,  # convert buffer to df every N new trades
    ):
        self.bot_wallet = bot_wallet
        self.slug = slug
        self.assets = assets
        self.asset_set = set(assets.values())

        self.session = requests.Session()

        self.trades_bucket = TokenBucket(capacity=trades_rps * 2, refill_per_sec=trades_rps)
        self.positions_bucket = TokenBucket(capacity=positions_rps * 2, refill_per_sec=positions_rps)

        self.trades_limit = trades_limit
        self.flush_every = flush_every

        self.seen = set()
        self.pos_replayed = {assets["Up"]: 0.0, assets["Down"]: 0.0}

        self.buffer: List[Dict[str, Any]] = []
        self.df = pd.DataFrame()

        self.latest_positions_snapshot: Optional[Dict[str, Any]] = None

    async def poll_trades_loop(self):
        """
        Near-max polling of /trades. Each new fill is appended with replayed position-at-that-trade.
        """
        while True:
            await self.trades_bucket.acquire(1.0)

            try:
                payload = await asyncio.to_thread(
                    fetch_json,
                    self.session,
                    f"{DATA_API}/trades",
                    {
                        "user": self.bot_wallet,
                        "eventSlug": self.slug,   # if your deployment prefers conditionId, swap it in
                        "limit": self.trades_limit,
                    },
                )
                trades = normalize_list_payload(payload)
            except Exception as e:
                # brief backoff on transient issues
                await asyncio.sleep(0.25)
                continue

            # Filter to our two assets and process oldest->newest
            trades = [t for t in trades if str(t.get("asset")) in self.asset_set]
            trades.sort(key=lambda x: int(x.get("timestamp", 0)))

            new_count = 0
            for t in trades:
                k = trade_key(t)
                if k in self.seen:
                    continue
                self.seen.add(k)
                new_count += 1

                ts = int(t["timestamp"])
                side = str(t["side"]).upper()
                asset = str(t["asset"])
                price = float(t["price"])
                size = float(t["size"])
                usdc_size = float(t["usdcSize"]) if t.get("usdcSize") is not None else None
                tx = t.get("transactionHash") or t.get("txHash")

                apply_trade_to_position(self.pos_replayed, side, asset, size)

                row = {
                    "timestamp": ts,
                    "side": side,
                    "asset": asset,
                    "price": price,
                    "size": size,
                    "usdcSize": usdc_size,
                    "txHash": tx,
                    "pos_up_replayed": self.pos_replayed[self.assets["Up"]],
                    "pos_down_replayed": self.pos_replayed[self.assets["Down"]],
                }

                # attach latest polled /positions snapshot info if you want (optional)
                if self.latest_positions_snapshot is not None:
                    row["positions_snapshot_ts"] = self.latest_positions_snapshot.get("snapshot_ts")
                    row["pos_up_snapshot"] = self.latest_positions_snapshot.get("pos_up")
                    row["pos_down_snapshot"] = self.latest_positions_snapshot.get("pos_down")

                self.buffer.append(row)

            # Flush buffer -> DataFrame periodically (avoid per-row df.append which is slow)
            if len(self.buffer) >= self.flush_every:
                self._flush_buffer_to_df()

            # tiny yield so we don't starve event loop if responses are immediate
            await asyncio.sleep(0)

    async def poll_positions_loop(self):
        """
        Poll /positions at configured rate. Useful as a sanity-check snapshot (current positions).
        """
        while True:
            await self.positions_bucket.acquire(1.0)
            try:
                payload = await asyncio.to_thread(
                    fetch_json,
                    self.session,
                    f"{DATA_API}/positions",
                    {"user": self.bot_wallet, "eventSlug": self.slug},
                )
                positions = normalize_list_payload(payload)
            except Exception:
                await asyncio.sleep(0.25)
                continue

            # Pull just our two assets
            cur = {self.assets["Up"]: None, self.assets["Down"]: None}
            for p in positions:
                asset = str(p.get("asset", ""))
                if asset in cur:
                    cur[asset] = float(p.get("size", 0.0))

            self.latest_positions_snapshot = {
                "snapshot_ts": int(time.time()),
                "pos_up": cur[self.assets["Up"]],
                "pos_down": cur[self.assets["Down"]],
            }

            await asyncio.sleep(0)

    def _flush_buffer_to_df(self):
        if not self.buffer:
            return
        chunk = pd.DataFrame(self.buffer)
        self.buffer.clear()
        if self.df.empty:
            self.df = chunk
        else:
            self.df = pd.concat([self.df, chunk], ignore_index=True)
        # Optional: keep it sorted
        self.df.sort_values("timestamp", inplace=True, kind="stable", ignore_index=True)

    def flush(self):
        self._flush_buffer_to_df()

    def save_parquet(self, path: str):
        self.flush()
        self.df.to_parquet(path, index=False)

    def save_csv(self, path: str):
        self.flush()
        self.df.to_csv(path, index=False)


async def main():
    bot_wallet = "0x6031b6eed1c97e853c6e0f03ad3ce3529351f96d"

    # Compute near-max RPS with a buffer
    buffer = 0.90
    trades_rps = (200 / 10) * buffer       # 18 rps
    positions_rps = (150 / 10) * buffer    # 13.5 rps  (you can set lower, e.g. 3.0)

    tracker = PolymarketUserTracker(
        bot_wallet=bot_wallet,
        slug=SLUG,
        assets=ASSETS,
        trades_rps=trades_rps,
        positions_rps=positions_rps,
        trades_limit=200,
        flush_every=1000,
    )

    tasks = [
        asyncio.create_task(tracker.poll_trades_loop()),
        asyncio.create_task(tracker.poll_positions_loop()),
    ]

    # Example: run and periodically print / save
    try:
        while True:
            await asyncio.sleep(5)
            tracker.flush()
            if not tracker.df.empty:
                print("rows:", len(tracker.df), "last_ts:", tracker.df["timestamp"].iloc[-1])
                # tracker.save_parquet("bot_trades.parquet")
    finally:
        for t in tasks:
            t.cancel()
        tracker.flush()
        tracker.save_parquet("bot_trades.parquet")



In [21]:
await main()

rows: 66 last_ts: 1766302754
rows: 66 last_ts: 1766302754
rows: 66 last_ts: 1766302754
rows: 66 last_ts: 1766302754
rows: 66 last_ts: 1766302754
rows: 66 last_ts: 1766302754


CancelledError: 