Skip to content

tastyware/streaq

Repository files navigation

Docs PyPI Downloads Release)

streaQ

Fast, async, type-safe distributed task queue via Redis streams

Features

  • Up to 5x faster than arq
  • Strongly typed
  • 95%+ unit test coverage
  • Comprehensive documentation
  • Support for delayed/scheduled tasks
  • Cron jobs
  • Task middleware
  • Task dependency graph
  • Pipelining
  • Priority queues
  • Support for synchronous tasks (run in separate threads)
  • Redis Sentinel support for production
  • Built-in web UI for monitoring tasks
  • Built with structured concurrency on anyio

Installation

$ pip install streaq

Getting started

To start, you'll need to create a Worker object:

from streaq import Worker

worker = Worker(redis_url="redis://localhost:6379")

You can then register async tasks with the worker like this:

import asyncio

@worker.task()
async def sleeper(time: int) -> int:
    await asyncio.sleep(time)
    return time

@worker.cron("* * * * mon-fri")  # every minute on weekdays
async def cronjob() -> None:
    print("Nobody respects the spammish repetition!")

Finally, let's queue up some tasks:

await sleeper.enqueue(3)
# enqueue returns a task object that can be used to get results/info
task = await sleeper.enqueue(1).start(delay=3)
print(await task.info())
print(await task.result(timeout=5))

Putting this all together gives us example.py. Let's spin up a worker:

$ streaq example.worker

and queue up some tasks like so:

$ python example.py

Let's see what the output looks like:

[INFO] 02:14:30: starting worker 3265311d for 2 functions
[INFO] 02:14:35: task cf0c55387a214320bd23e8987283a562 → worker 3265311d
[INFO] 02:14:38: task cf0c55387a214320bd23e8987283a562 ← 3
[INFO] 02:14:40: task 1de3f192ee4a40d4884ebf303874681c → worker 3265311d
[INFO] 02:14:41: task 1de3f192ee4a40d4884ebf303874681c ← 1
[INFO] 02:15:00: task 2a4b864e5ecd4fc99979a92f5db3a6e0 → worker 3265311d
Nobody respects the spammish repetition!
[INFO] 02:15:00: task 2a4b864e5ecd4fc99979a92f5db3a6e0 ← None
TaskInfo(fn_name='sleeper', enqueue_time=1751508876961, tries=0, scheduled=datetime.datetime(2025, 7, 3, 2, 14, 39, 961000, tzinfo=datetime.timezone.utc), dependencies=set(), dependents=set())
TaskResult(fn_name='sleeper', enqueue_time=1751508876961, success=True, result=1, start_time=1751508880500, finish_time=1751508881503, tries=1, worker_id='ca5bd9eb')

For more examples, check out the documentation.

About

Fast, async, type-safe distributed task queue via Redis streams

Resources

License

Stars

Watchers

Forks

Sponsor this project

 

Packages

No packages published

Contributors 4

  •  
  •  
  •  
  •