# Test run_to_thread argument parsing and behavior
This notebook validates how our run_to_thread (async wrapper over asyncio.to_thread) forwards positional and keyword arguments and propagates exceptions.

In [None]:
from __future__ import annotations
import asyncio
from functools import partial
from services.async_utils import run_to_thread

def add(a, b):
    return a + b

class Acc:
    def __init__(self, base=0):
        self.base = base
    def add(self, x, y=0):
        return self.base + x + y

async def demo():
    print('[pos] add(2,3)=', await run_to_thread(add, 2, 3))
    print('[kw] pow(2, exp=5)=', await run_to_thread(pow, 2, exp=5))
    acc = Acc(10)
    print('[bound] Acc.add(2,y=3)=', await run_to_thread(acc.add, 2, y=3))
    p = partial(add, 10)
    print('[partial] add(10,5)=', await run_to_thread(p, 5))
    f = lambda x, y: x * y
    print('[lambda] 4*7=', await run_to_thread(f, 4, 7))
    try:
        def boom(x): raise ValueError(f'bad {x}')
        await run_to_thread(boom, 42)
    except Exception as e:
        print('[raise] got:', type(e).__name__, e)

await demo()


## Optional: Live API sanity (AgentsApi.get_my_agent)
If you have BEARER_TOKEN in .env, uncomment and run to verify to_thread with SDK calls.

In [None]:
# import os
# from dotenv import load_dotenv
# import openapi_client
# load_dotenv()
# token = os.getenv('BEARER_TOKEN')
# if token:
#     cfg = openapi_client.Configuration(access_token=token)
#     client = openapi_client.ApiClient(cfg)
#     agents = openapi_client.AgentsApi(client)
#     data = await run_to_thread(agents.get_my_agent)
#     print('[agents] ok:', hasattr(data, 'data'))
