An asynchronous framework for Python with efficient task pooling that provides simple and flexible management of asynchronous execution of independent tasks.
Taskorbit is an asynchronous framework for managing a queue of asynchronous tasks, inspired by ideas from Celery, Taskiq, Propan and Aiogram. It is built on message brokers. Currently only NATS JetStream is supported; support for other brokers is planned.
The framework lets you quickly build a service for processing tasks of any complexity. The barrier to entry is low: development was oriented toward beginners in bot building and microservice development.
Put simply, it aims to be an improved take on Celery, Taskiq and Propan in the style of Aiogram. These tools differ in the specific requirements they target, so choose your technologies carefully to achieve optimal results.
Documentation is currently being developed and will be available at: https://morington.github.io/taskorbit/
Use the pip tool to install the framework:
pip install taskorbit
Because NATS is currently the only supported message broker, there are no optional install variants for choosing a broker. The library installs the dependencies it needs.
Taskorbit currently includes:
- magic_filter - a convenient way to build dynamic filter expressions, created by the Aiogram developers.
- ormsgpack - fast MessagePack serialization.
- nats-py - the Python client for the NATS message broker.
You can read the full example on the repository page: https://github.com/morington/taskorbit/blob/main/examples/base_example.py.
Create a dispatcher object and, in your asynchronous function, a broker object; load the configuration and start receiving messages!
import asyncio
import logging

from magic_filter import F

# Dispatcher, Metadata and nats_broker come from taskorbit;
# see the full example linked above for the exact import paths.

logger = logging.getLogger(__name__)

# This example skips routers: the dispatcher inherits from Router, so it can register handlers directly.
# DON'T DO THIS in real code! Use taskorbit.dispatching.Router instead.
dp = Dispatcher(max_queue_size=5)


@dp.include_handler(F.metadata.type_event == "Test")
async def handler_test(metadata: Metadata) -> None:
    logger.info(f"Handler got the message! Task-{metadata.uuid}")


async def main():
    broker = await nats_broker(
        {
            "url": "nats://localhost:4222",
            "stream": "STREAM_NAME",
            "subject": "STREAM_NAME.SUBJECT",
            "durable": "DURABLE",
        }
    )
    await broker.include_dispatcher(dp)


if __name__ == "__main__":
    asyncio.run(main())
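To make the idea behind the example concrete, here is a minimal, standard-library-only sketch of filter-based dispatching with a bounded number of concurrent handlers. It is not Taskorbit's implementation: MiniDispatcher, max_concurrent and process are hypothetical names, plain predicates stand in for magic_filter expressions, and treating the limit as a cap on concurrently running handlers is an assumption about what a setting like max_queue_size is meant to control.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[None]]
Predicate = Callable[[dict], bool]


class MiniDispatcher:
    """Hypothetical stand-in for a dispatcher; not Taskorbit's code."""

    def __init__(self, max_concurrent: int = 5) -> None:
        self._handlers: list[tuple[Predicate, Handler]] = []
        self._max_concurrent = max_concurrent

    def include_handler(self, predicate: Predicate):
        # Register a handler together with the predicate that selects it.
        def decorator(handler: Handler) -> Handler:
            self._handlers.append((predicate, handler))
            return handler

        return decorator

    async def _run(self, sem: asyncio.Semaphore, message: dict) -> None:
        # The semaphore caps how many handlers run at the same time.
        async with sem:
            for predicate, handler in self._handlers:
                if predicate(message):
                    await handler(message)
                    return  # only the first matching handler runs

    async def process(self, messages: list[dict]) -> None:
        sem = asyncio.Semaphore(self._max_concurrent)
        await asyncio.gather(*(self._run(sem, m) for m in messages))


dp = MiniDispatcher(max_concurrent=2)
handled: list[str] = []


@dp.include_handler(lambda m: m["type_event"] == "Test")
async def handler_test(message: dict) -> None:
    await asyncio.sleep(0)  # simulate asynchronous work
    handled.append(message["uuid"])


def run_demo() -> list[str]:
    handled.clear()
    messages = [
        {"uuid": "a", "type_event": "Test"},
        {"uuid": "b", "type_event": "Other"},  # no handler matches this one
        {"uuid": "c", "type_event": "Test"},
    ]
    asyncio.run(dp.process(messages))
    return sorted(handled)
```

Calling run_demo() processes three messages and records only the two whose type_event matches the registered predicate. A real broker-backed dispatcher would receive messages continuously rather than from a list.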
Out-of-the-box custom message models are still in development. Until they are ready, the standard Metadata models are available for both task data messages and service messages:
@dataclass
class Message(BaseType):
    uuid: str
    type_event: str
    data: Optional[dict] = None


@dataclass
class ServiceMessage(BaseType):
    uuid: str
    command: Commands
You can send messages to the stream using the pub method. Generate a unique UUID for each message so that every task can be tracked individually:
import uuid

# Data messages for tasks:
task_uuid = uuid.uuid4().hex
await broker.pub({"uuid": task_uuid, "type_event": "TEST_CLASS", "data": {"some_data": 123}})

# Service messages for working with tasks.
# Service messages are not stored in the task pool; pass the UUID of the task the command applies to.
await broker.pub({"uuid": task_uuid, "command": Commands.GET_STATUS})
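The relationship between task messages, service messages and the UUID can be illustrated with a small standard-library sketch. Everything here is hypothetical and is not Taskorbit's API: TaskPool, DemoCommands and the returned status strings are made up for illustration, and the CANCEL command is an assumption (only GET_STATUS appears above). The point is that a task is stored in the pool under its UUID, while a service message is handled immediately and only refers to that UUID.

```python
import asyncio
import enum
import uuid
from typing import Coroutine


class DemoCommands(enum.Enum):
    """Hypothetical commands; only GET_STATUS is named in this README."""

    GET_STATUS = "GET_STATUS"
    CANCEL = "CANCEL"


class TaskPool:
    """Hypothetical pool that tracks tasks by UUID."""

    def __init__(self) -> None:
        self._tasks: dict[str, asyncio.Task] = {}

    def add(self, task_uuid: str, coro: Coroutine) -> None:
        # Must be called from inside a running event loop.
        self._tasks[task_uuid] = asyncio.create_task(coro)

    def handle_service(self, message: dict) -> str:
        # Service messages are never stored; they act on an existing task.
        task = self._tasks.get(message["uuid"])
        if task is None:
            return "unknown"
        if message["command"] is DemoCommands.CANCEL:
            task.cancel()
            return "cancel requested"
        if task.cancelled():
            return "cancelled"
        return "done" if task.done() else "running"

    async def join(self) -> None:
        # asyncio.wait does not re-raise the cancellation of waited tasks.
        if self._tasks:
            await asyncio.wait(list(self._tasks.values()))


async def demo() -> list[str]:
    pool = TaskPool()
    task_uuid = uuid.uuid4().hex
    pool.add(task_uuid, asyncio.sleep(10))  # stands in for a long task

    get_status = {"uuid": task_uuid, "command": DemoCommands.GET_STATUS}
    results = [pool.handle_service(get_status)]
    results.append(
        pool.handle_service({"uuid": task_uuid, "command": DemoCommands.CANCEL})
    )
    await pool.join()
    results.append(pool.handle_service(get_status))
    results.append(
        pool.handle_service({"uuid": "missing", "command": DemoCommands.GET_STATUS})
    )
    return results


def run_demo() -> list[str]:
    return asyncio.run(demo())
```

In this sketch a status request for a UUID that was never submitted returns "unknown", which is why each task message needs its own unique identifier.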
The framework also supports outer and inner middlewares. Middlewares fully support context managers throughout task processing.
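As a rough illustration of how context-manager middlewares can wrap task processing, the sketch below uses only the standard library. It assumes Aiogram-like semantics, where an outer middleware wraps the whole dispatch (including filtering) and an inner middleware wraps only the matched handler; whether Taskorbit follows exactly this model is an assumption, and all names here (outer_middleware, inner_middleware, dispatch) are hypothetical.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Callable

events: list[str] = []


@asynccontextmanager
async def outer_middleware(message: dict):
    # Runs for every incoming message, even if no handler matches.
    events.append("outer:enter")
    try:
        yield
    finally:
        events.append("outer:exit")


@asynccontextmanager
async def inner_middleware(message: dict):
    # Runs only around a handler that passed filtering.
    events.append("inner:enter")
    try:
        yield
    finally:
        events.append("inner:exit")


async def handler(message: dict) -> None:
    events.append(f"handler:{message['type_event']}")


async def dispatch(message: dict, matches: Callable[[dict], bool]) -> None:
    async with outer_middleware(message):
        if not matches(message):
            events.append("skipped")
            return
        async with inner_middleware(message):
            await handler(message)


def run_demo(type_event: str) -> list[str]:
    events.clear()
    message = {"uuid": "1", "type_event": type_event}
    asyncio.run(dispatch(message, lambda m: m["type_event"] == "Test"))
    return list(events)
```

Because each middleware is a context manager, its cleanup code runs even when the handler raises or the message is skipped, which is convenient for resources such as database sessions or timers.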
The Filters classes are currently disabled while they are being tested.
For help with the framework, refer to the examples in the repository. Only stable, tested examples are posted there.
Taskorbit is distributed under the MIT license. Details can be found in the LICENSE file.