Skip to content

phi-friday/timeout-executor

Repository files navigation

timeout-executor

License: MIT github action PyPI version python version

how to install

$ pip install timeout_executor
# or
$ pip install "timeout_executor[uvloop]"

how to use

from __future__ import annotations

import time

import anyio

from timeout_executor import AsyncResult, TimeoutExecutor


def sample_sync_func(x: float) -> str:
    time.sleep(x)
    return "done"


async def sample_async_func(x: float) -> str:
    await anyio.sleep(x)
    return "done"


def main() -> None:
    executor = TimeoutExecutor(2)
    result = executor.apply(sample_sync_func, 10)
    assert isinstance(result, AsyncResult)

    try:
        value = result.result()
    except Exception as exc:
        assert isinstance(exc, TimeoutError)

    result = executor.apply(sample_async_func, 1)
    assert isinstance(result, AsyncResult)
    value = result.result()
    assert value == "done"

    result = executor.apply(lambda: "done")
    assert isinstance(result, AsyncResult)
    value = result.result()
    assert value == "done"


async def async_main() -> None:
    executor = TimeoutExecutor(2)
    result = await executor.delay(sample_sync_func, 10)
    assert isinstance(result, AsyncResult)

    try:
        value = await result.delay()
    except Exception as exc:
        assert isinstance(exc, TimeoutError)

    result = await executor.delay(sample_async_func, 1)
    assert isinstance(result, AsyncResult)
    value = await result.delay()
    assert value == "done"

    result = await executor.delay(lambda: "done")
    assert isinstance(result, AsyncResult)
    value = await result.delay()
    assert value == "done"


if __name__ == "__main__":
    main()
    anyio.run(async_main)

License

MIT, see LICENSE.