How to register tortoise on a celery app? #805

Closed
lgarcia11 opened this issue Jun 26, 2021 · 9 comments

Comments

@lgarcia11

Can Tortoise ORM be used from a celery task, if so what is the best way of registering it?

@long2ice
Member

No, Celery doesn't support asyncio.

@mookrs

mookrs commented Jun 26, 2021

@lgarcia11 I use these tricks for my Celery periodic tasks to use Tortoise ORM:

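import asyncio
from typing import Callable


async def wrap_db_ctx(func: Callable, *args, **kwargs) -> None:
    # Open the DB connection, run the coroutine, and always close the connection afterwards.
    try:
        await connect_db()
        await func(*args, **kwargs)
    finally:
        await disconnect_db()


def async_to_sync(func: Callable, *args, **kwargs) -> None:
    # Celery tasks are synchronous, so run the coroutine on a fresh event loop.
    asyncio.run(wrap_db_ctx(func, *args, **kwargs))


@celery_app.task
def my_task() -> None:
    async_to_sync(my_async_func)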

Just register Tortoise ORM in connect_db() as the documentation describes.
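
For readers who want to try the pattern without a broker or database, here is a minimal sketch of the same idea written as a decorator. It uses only the standard library: connect_db() and disconnect_db() are placeholders that record calls, and the names with_db, count_items, and calls are hypothetical, not from this thread.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, List

calls: List[str] = []  # records the order of events, for demonstration only


async def connect_db() -> None:
    # Placeholder: a real version would await Tortoise.init(...).
    calls.append("connect")


async def disconnect_db() -> None:
    # Placeholder: a real version would await Tortoise.close_connections().
    calls.append("disconnect")


def with_db(func: Callable[..., Awaitable[Any]]) -> Callable[..., Any]:
    """Turn an async function into a sync callable with the DB opened around it."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        async def runner() -> Any:
            await connect_db()
            try:
                return await func(*args, **kwargs)
            finally:
                # Runs even if func raises, so the connection is not leaked.
                await disconnect_db()

        # asyncio.run creates a new event loop and closes it when done.
        return asyncio.run(runner())

    return wrapper


@with_db
async def count_items(n: int) -> int:
    calls.append("query")
    return n * 2


result = count_items(21)
```

The wrapped function is an ordinary synchronous callable, so in principle it can be registered as a Celery task like any other function. Note that asyncio.run raises RuntimeError when called from a thread that already has a running event loop.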

@long2ice
Member

That can be a solution.

@MLBZ521

MLBZ521 commented Feb 13, 2023

I was trying to do the same thing and this ended up working for me:

import asyncio

from celery import current_app as proj_celery_app
from celery.schedules import crontab

from tortoise import Tortoise

from proj import settings
from proj.tasks import task


async def create_celery(celery_app=proj_celery_app):

    celery_app.config_from_object(settings.celery.settings)
    celery_app.conf.update(task_acks_late=True)
    celery_app.conf.update(task_default_priority=5)
    celery_app.conf.update(task_queue_max_priority=10)
    celery_app.conf.beat_schedule = {
        # Executes daily at midnight.
        "backend_cleanup": {
            "task": "proj:cache_items",
            "schedule": crontab(minute=0, hour=0),
            "args": (),
            "options": {
                "priority": 10,
                "queue": "proj"
            }
        }
    }

    await Tortoise.init(config=settings.db.TORTOISE_CONFIG)

    return celery_app


celery_app = asyncio.run(create_celery(celery_app=proj_celery_app))

And then starting the services:
python3 -m celery -A main.celery worker --loglevel=debug
python3 -m celery -A main.celery beat -s ./Database/db --loglevel=debug

In short, adding await Tortoise.init(config=settings.db.TORTOISE_CONFIG) to the function that initializes the Celery App and decorating my Celery Task functions with @shared_task allowed everything to work.

If someone else comes across this in a search, hopefully this will help.

I provided additional details in this StackOverflow answer.

@jiangying000
Contributor

@mookrs How should the connect_db() and disconnect_db() functions from your example be written?

@jiangying000
Contributor

async def connect_db() -> None:
    await Tortoise.init(config=TORTOISE_ORM)


async def disconnect_db() -> None:
    await Tortoise.close_connections()
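
TORTOISE_ORM is not defined anywhere in this thread. As a rough sketch, a config dict passed to Tortoise.init(config=...) usually has a "connections" section and an "apps" section. The SQLite URL and the "proj.models" module path below are assumptions for illustration only.

```python
# Hypothetical values: the database URL and the "proj.models" module path are
# placeholders for illustration, not taken from this thread.
TORTOISE_ORM = {
    "connections": {"default": "sqlite://db.sqlite3"},
    "apps": {
        "models": {
            # Python modules that contain the Tortoise model classes.
            "models": ["proj.models"],
            "default_connection": "default",
        },
    },
}
```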

@ulgens

ulgens commented Mar 7, 2024

@MLBZ521 Thanks for that answer, it saved my day.

I improved it by using run_async:

def create_celery():
    celery_app = Celery(
        __name__,
        broker=settings.CELERY_BROKER_URL,
        backend=settings.CELERY_RESULT_BACKEND,
    )
    run_async(Tortoise.init(config=TORTOISE_ORM_CONFIG))
    run_async(Tortoise.generate_schemas())

    return celery_app

celery = create_celery()

This way, it doesn't need to be wrapped by asyncio.run(), and tasks can call async code and interact with the ORM using the same method.

@Madi-S

Madi-S commented Mar 26, 2024

@ulgens Where does run_async() come from? Or how did you define it?

@ulgens

ulgens commented Mar 26, 2024

@Madi-S from tortoise import run_async
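
To illustrate why a shared runner differs from asyncio.run(), here is a minimal standard-library sketch of a helper that keeps one event loop per process and runs each coroutine on it. This is not Tortoise's implementation; the names get_worker_loop, run_sync, and current_loop are hypothetical. Tortoise's documentation describes run_async as a simple runner that cleans up DB connections on exit and is meant for simple scripts, so it is worth checking that connections remain usable in later tasks.

```python
import asyncio
from typing import Any, Coroutine, Optional

_loop: Optional[asyncio.AbstractEventLoop] = None


def get_worker_loop() -> asyncio.AbstractEventLoop:
    """Return a long-lived event loop, creating it on first use."""
    global _loop
    if _loop is None or _loop.is_closed():
        _loop = asyncio.new_event_loop()
    return _loop


def run_sync(coro: Coroutine[Any, Any, Any]) -> Any:
    """Run a coroutine to completion from synchronous code on the shared loop."""
    return get_worker_loop().run_until_complete(coro)


async def current_loop() -> asyncio.AbstractEventLoop:
    # Stand-in for an ORM call; it only reports which loop it ran on.
    await asyncio.sleep(0)
    return asyncio.get_running_loop()


first = run_sync(current_loop())
second = run_sync(current_loop())
```

Both calls run on the same loop, whereas asyncio.run() would create and close a fresh loop each time. Resources bound to a loop (such as open connections) generally cannot be reused after that loop is closed.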
