Skip to content

Commit

Permalink
fix (#1): Use NullPool with SQLAlchemy in Celery tasks
Browse files Browse the repository at this point in the history
Need to improve this for the future, but at least we don't run into
asyncio issues due to shared connections improperly closed and handled
across celery tasks.
  • Loading branch information
birkjernstrom committed Feb 22, 2023
1 parent e2da37c commit fae98a8
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 16 deletions.
35 changes: 24 additions & 11 deletions server/polar/postgres.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,29 @@
from polar.config import settings
from polar.ext.sqlalchemy import sql
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool

from polar.config import settings
from polar.ext.sqlalchemy import sql

engine = create_async_engine(settings.postgres_dsn, echo=True)
AsyncSessionLocal = sessionmaker(
engine,
autocommit=False,
autoflush=False,
expire_on_commit=False,
class_=AsyncSession,
)
def create_sessionmaker(is_celery: bool = False) -> sessionmaker:
engine_options = dict(echo=settings.DEBUG)
if is_celery:
# TODO: Change pooling strategy for celery workers.
# In the meantime, we're using NullPool to avoid
# issues with asyncio in Celery.
engine_options.update(dict(poolclass=NullPool))

engine = create_async_engine(settings.postgres_dsn, **engine_options)
return sessionmaker(
engine,
autocommit=False,
autoflush=False,
expire_on_commit=False,
class_=AsyncSession,
)


AsyncSessionLocal = create_sessionmaker()


__all__ = ["sql", "engine", "AsyncSessionLocal", "AsyncSession"]
__all__ = ["sql", "AsyncSessionLocal", "AsyncSession"]
6 changes: 3 additions & 3 deletions server/polar/tasks/github/webhook.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
from typing import Any

import structlog

from polar import actions
from polar.clients import github
from polar.models import Issue, Organization, PullRequest, Repository
from polar.platforms import Platforms
from polar.postgres import AsyncSession
Expand All @@ -13,6 +10,9 @@
from polar.schema.repository import CreateRepository
from polar.worker import asyncify_task, task

from polar import actions
from polar.clients import github

log = structlog.get_logger()


Expand Down
6 changes: 4 additions & 2 deletions server/polar/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@

from asgiref.sync import async_to_sync
from celery import Celery

from polar.config import settings
from polar.postgres import AsyncSessionLocal
from polar.postgres import create_sessionmaker

app = Celery(
"polar", backend=settings.CELERY_BACKEND_URL, broker=settings.CELERY_BROKER_URL
Expand All @@ -20,6 +19,9 @@
ReturnValue = TypeVar("ReturnValue")


AsyncSessionLocal = create_sessionmaker(is_celery=True)


def asyncify_task(
with_session: bool = False,
) -> Callable[[Callable[Params, ReturnValue]], Callable[Params, ReturnValue]]:
Expand Down

0 comments on commit fae98a8

Please sign in to comment.