Skip to content

Thread-safe, mixed-threading-and-asyncio, producer-consumer queue for Python

License

Notifications You must be signed in to change notification settings

kaelzhang/python-newt

Repository files navigation

Thread-safe, mixed-threading-and-asyncio, producer-consumer queue for Python.

Heavily based on janus, but newt lazily initializes event loop which makes the queue much more flexible:

  • newt.Queue could be initialized outside subthread or coroutine
  • supports information exchange between a thread and a coroutine
  • ensures thread-safety

Install

$ pip install newt

Usage

Suppose there is a threaded target function which produces items, and a coroutine which consumes items.

from newt import Queue


def threaded(sync_queue):
    for i in range(100):
        sync_queue.put(i)
    sync_queue.join()

sync_queue follows the interface of Python built-in synchronized queue class

async def coroutine(async_queue):
    for i in range(100):
        assert await async_queue.get() == i
        async_queue.task_done()

async_queue follows the vanilla Python asyncio.Queue

Thread in an executor -> Coroutine

The following example shows how to produce item in a thread which executed in the executor, and consume the item in a coroutine.

import asyncio

loop = asyncio.get_event_loop()


async def main():
    future = loop.run_in_executor(None, threaded, queue.sync_queue)
    await coroutine(queue.async_queue)
    await future

    queue.close()
    await queue.wait_closed()

loop.run_until_complete(main())

Normal thread -> Coroutine

newt.Queue also supports to produce item in a normal threading,

loop = asyncio.get_event_loop()


async def main():
    await coroutine(queue.async_queue)
    queue.close()
    await queue.wait_closed()

t = threading.Thread(target=threaded, args=(queue.sync_queue,))
t.start()

loop.run_until_complete(main())

License

MIT

About

Thread-safe, mixed-threading-and-asyncio, producer-consumer queue for Python

Topics

Resources

License

Stars

Watchers

Forks

Packages