In [6]:
from bittensor.core.async_subtensor import get_async_subtensor, AsyncSubtensor

# Open the async Subtensor handle `s` that the later cells reuse, passing "archive" as the target.
# NOTE(review): presumably an archive endpoint is chosen so historical blocks can be queried — confirm.
s = await get_async_subtensor("archive")

In [23]:
from tqdm import tqdm

head = await s.get_current_block()
blocks = range(head, 0, -1)

for block in tqdm(blocks, desc="Scanning blocks"):
    extrinsics = await s.substrate.get_extrinsics(block_number=block)
    for ext in extrinsics:
        call_function = ext.__dict__["value_serialized"]["call"]["call_function"]
        if call_function == "schedule_swap_coldkey":
            print(block)
            print(ext)
            break

Scanning blocks:   0%|          | 13/6342041 [01:39<13543:54:10,  7.69s/it]


CancelledError: 

In [4]:
# Fetch the full list of subnets through the existing connection `s`; inspected in the next cell.
all_s = await s.all_subnets()

In [10]:
# Display the first returned entry (the recorded output shows netuid=0, subnet_name='root').
all_s[0]

DynamicInfo(netuid=0, owner_hotkey='5C4hrfjw9DjXZTzV3MwzrrAr9P1MJhSrvWGWqi1eSuyUpnhM', owner_coldkey='5C4hrfjw9DjXZTzV3MwzrrAr9P1MJhSrvWGWqi1eSuyUpnhM', subnet_name='root', symbol='Τ', tempo=100, last_step=4919910, blocks_since_last_step=1421381, emission=τ0.000000000, alpha_in=τ449,400.112573830, alpha_out=τ5,601,213.245253731, tao_in=τ6,095,627.465499507, price=τ1.000000000, k=2739375669203608490272366101810, is_dynamic=False, alpha_out_emission=τ0.000000000, alpha_in_emission=τ0.000000000, tao_in_emission=τ0.000000000, pending_alpha_emission=τ0.000000000, pending_root_emission=τ0.000000000, network_registered_at=0, subnet_volume=τ23,573,638.696552988, subnet_identity=None, moving_price=0.0)

In [1]:
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
import os
import json
import time
import asyncio
import websockets
import bittensor as bt
from typing import Optional, List, Any, Dict


class KafkaProducerManager:
    """
    Thin wrapper around AIOKafkaProducer with JSON sending helpers and retries.

    Typical use: ``await start()`` once, ``await send_json(...)`` as often as
    needed, then ``await stop()``. Both ``start()`` and ``stop()`` may be
    called repeatedly without harm.
    """

    def __init__(
        self,
        bootstrap_servers: str,
        topic: str,
        client_id: str = "swap-coldkey-monitor",
    ):
        """
        Store the connection settings; nothing is connected until start().

        Args:
            bootstrap_servers: Broker address(es) passed to AIOKafkaProducer.
            topic: Topic that every send_json() call publishes to.
            client_id: Client identifier passed to AIOKafkaProducer.
        """
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.client_id = client_id
        # None until start() succeeds, and None again after stop().
        self._producer: Optional[AIOKafkaProducer] = None

    async def start(self):
        """
        Create and start the underlying producer.

        A call made while a producer is already running is a no-op, so the
        same producer is never started twice. If starting fails, the
        half-initialised producer is stopped and discarded and the original
        exception is re-raised; a later start() then builds a fresh producer.
        """
        if self._producer is not None:
            return

        producer = AIOKafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            client_id=self.client_id,
            linger_ms=20,  # small batching without much latency
            acks="all",  # strongest durability for a single-node dev broker
            max_request_size=1024 * 1024,
            value_serializer=lambda v: json.dumps(
                v, separators=(",", ":"), ensure_ascii=False
            ).encode("utf-8"),
        )
        try:
            await producer.start()
        except Exception:
            # Release whatever the failed start opened before propagating the error.
            await producer.stop()
            raise

        # Only publish the producer on the instance once it is usable.
        self._producer = producer
        bt.logging.info(
            f"✅ Kafka producer connected to {self.bootstrap_servers} (topic={self.topic})"
        )

    async def stop(self):
        """
        Stop the underlying producer, if any, and forget it.

        The reference is cleared and the stop is logged even when the
        producer's own stop() raises; that exception still propagates.
        """
        if self._producer is not None:
            try:
                await self._producer.stop()
            finally:
                self._producer = None
                bt.logging.info("🛑 Kafka producer stopped")

    async def send_json(
        self, payload: Dict[str, Any], key: Optional[str] = None, retries: int = 3
    ):
        """
        Send a JSON-serializable payload to Kafka with basic retry.

        Args:
            payload: Value handed to the producer's JSON value serializer.
            key: Optional message key, UTF-8 encoded before sending. A falsy
                key (None or "") is sent as no key.
            retries: Extra attempts after the first one. Negative values are
                treated as 0, so at least one send is always attempted.

        Returns:
            None. When every attempt fails with a KafkaError, the failure is
            logged as an error and the method returns without raising.

        Raises:
            RuntimeError: If start() has not completed successfully.
            Exception: Anything other than KafkaError raised by the send
                propagates immediately without a retry.
        """
        if self._producer is None:
            raise RuntimeError("Producer not started")

        encoded_key = key.encode("utf-8") if key else None
        total_attempts = max(retries, 0) + 1
        last_err: Optional[Exception] = None

        for attempt in range(total_attempts):
            try:
                await self._producer.send_and_wait(
                    self.topic,
                    value=payload,
                    key=encoded_key,
                )
                return
            except KafkaError as e:
                last_err = e
                if attempt == total_attempts - 1:
                    # Final attempt: do not announce or wait for a retry that will never run.
                    bt.logging.warning(
                        f"⚠️ Kafka send failed (attempt {attempt+1}/{total_attempts}): {e}. No retries left"
                    )
                    break
                # Exponential backoff starting at 0.5s, capped at 5s.
                backoff = min(0.5 * (2**attempt), 5.0)
                bt.logging.warning(
                    f"⚠️ Kafka send failed (attempt {attempt+1}/{total_attempts}): {e}. Retrying in {backoff:.1f}s"
                )
                await asyncio.sleep(backoff)

        # If we get here, all attempts failed
        bt.logging.error("❌ Kafka send permanently failed", exc_info=last_err)
    


In [3]:
KAFKA_BOOTSTRAP = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9093")
KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "extrinsics.swap_coldkey")
producer = KafkaProducerManager(
    bootstrap_servers=KAFKA_BOOTSTRAP,
    topic=KAFKA_TOPIC,
    client_id="swap-coldkey-monitor",
)
await producer.start()

In [5]:
# Publish a small sample payload through the started producer (no key, default retries).
await producer.send_json({"x": 23, "y": 1})