Skip to content

SqlAlchemy ORM Async Mode

James Brucker edited this page Jun 25, 2025 · 3 revisions

Example of using the SQLAlchemy ORM in async mode

1. Define URL for an Async Database Connection and Create an Engine

from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker

# Create an async engine using SQLite. For an in-memory database use "sqlite+aiosqlite:///:memory:" (lots of colons)
# Database URL for the async driver; a PostgreSQL alternative is postgresql+asyncpg://...
DATABASE_URL = "sqlite+aiosqlite:///./test.db"

# echo=True is for development only -- set it to False in production.
engine = create_async_engine(DATABASE_URL, echo=True, future=True)

# Create an async session factory.
# Confusing naming!
# sqlalchemy.ext.asyncio.async_sessionmaker looks like a function but is a *class*.
# Calling it creates a factory *instance* (of type sqlalchemy.ext.asyncio.session.async_sessionmaker),
# not a session object. Calling that instance, async_session_factory(), creates a new AsyncSession.
# Session factory bound to the engine above. Passing class_=AsyncSession is
# not necessary; it is spelled out here only to make the session type explicit.
async_session_factory = async_sessionmaker(
    engine,
    class_=AsyncSession,
    autoflush=False,
    expire_on_commit=False,
)
type(async_session_factory)
# Output: <class 'sqlalchemy.ext.asyncio.session.async_sessionmaker'>

2. Using the Async Session Factory in Your Code

from typing import AsyncGenerator

# Misleading name! This does *NOT* return a session; it returns an async generator that yields an AsyncSession.
# typing.AsyncGenerator describes an async generator: a function written with "async def" that uses "yield" to produce objects
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield a single AsyncSession created by ``async_session_factory``.

    The session is opened with ``async with``, so it is closed by the
    context manager once the generator is resumed after the ``yield``
    or is otherwise finalized.
    """
    async with async_session_factory() as db_session:
        yield db_session

async def add_users():
    """Insert one example ``User`` and return every ``User`` in the table.

    Returns:
        All ``User`` objects produced by ``select(User)`` after the insert.

    Raises:
        Exception: any error raised while adding, committing, refreshing
            or querying is re-raised after the session has been rolled back.
    """
    # The context manager closes the session on exit, so no ``finally``
    # clause is needed for cleanup.
    async with async_session_factory() as session:
        try:
            # Example: create a new record and commit it.
            user = User(username="Example", email="example@nowhere.com")
            session.add(user)
            await session.commit()

            # Refresh the instance to get any database-generated values.
            await session.refresh(user)
            print(user)

            # Query all records.
            result = await session.execute(select(User))
            return result.scalars().all()
        except Exception:
            # Roll back whatever is pending, then let the caller see the error.
            await session.rollback()
            raise

3. Common Async Database Operations

Create

async def create_item(item_data):
    """Persist a new ``Item`` and return it.

    ``item_data`` is unpacked as keyword arguments to ``Item``. The new
    object is committed and then refreshed before being returned.
    """
    async with async_session_factory() as db:
        item = Item(**item_data)
        db.add(item)
        await db.commit()
        # Reload the row so the returned object reflects what was stored.
        await db.refresh(item)
    return item

Read

async def get_item(item_id: int):
    """Look up an ``Item`` by its ``id``.

    Returns the first ``Item`` whose ``id`` equals ``item_id``, or whatever
    ``scalars().first()`` yields when no row matches.
    """
    async with async_session_factory() as db:
        query = select(Item).where(Item.id == item_id)
        rows = await db.execute(query)
        return rows.scalars().first()

Update

async def update_item(item_id: int, update_data: dict):
    """Apply ``update_data`` to the ``Item`` with the given ``id``.

    Each key/value pair in ``update_data`` is set as an attribute on the
    item, then the change is committed and the item refreshed. If the
    lookup finds nothing, the falsy lookup result is returned unchanged and
    nothing is committed.
    """
    async with async_session_factory() as db:
        query = select(Item).where(Item.id == item_id)
        item = (await db.execute(query)).scalars().first()
        if not item:
            # Nothing to update; hand back the (falsy) lookup result.
            return item
        for field, new_value in update_data.items():
            setattr(item, field, new_value)
        await db.commit()
        await db.refresh(item)
        return item

Delete

async def delete_item(item_id: int):
    """Delete the ``Item`` with the given ``id``.

    Returns ``True`` when an item was found, deleted and the deletion
    committed; returns ``False`` when the lookup found nothing.
    """
    async with async_session_factory() as db:
        query = select(Item).where(Item.id == item_id)
        item = (await db.execute(query)).scalars().first()
        if not item:
            return False
        await db.delete(item)
        await db.commit()
        return True

4. Using with FastAPI (Dependency Injection)

To integrate this into FastAPI, use dependency injection:

from fastapi import Depends

async def get_session() -> AsyncGenerator[AsyncSession, None]:
    """Dependency that yields an AsyncSession for use with ``Depends``.

    This is an async generator (it uses ``yield``), so the return
    annotation is ``AsyncGenerator[AsyncSession, None]`` rather than
    ``AsyncSession``. The ``async with`` block closes the session when the
    generator is resumed after the ``yield`` or is finalized.
    """
    async with async_session_factory() as session:
        yield session

@app.post("/items/")
async def create_item(item_data: dict, session: AsyncSession = Depends(get_session)):
    """POST /items/: store a new ``Item`` built from the request data.

    ``session`` is supplied through ``Depends(get_session)``; this handler
    only adds, commits and refreshes, and does not close the session itself.
    """
    item = Item(**item_data)
    session.add(item)
    await session.commit()
    # Reload the row so the response reflects what was stored.
    await session.refresh(item)
    return item

Important Notes

  1. Always use await with async session methods (commit, execute, refresh, etc.)
  2. Transactions: Remember to commit or rollback your transactions
  3. Session Lifecycle: Let the context manager (async with) handle session closing
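  4. Lazy Loading / N+1 Problem: Implicit lazy loading does not work in an async context (accessing an unloaded relationship raises an error instead of running a query) - eager-load relationships with selectinload or similar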
  5. Connection Pooling: Configure your engine with appropriate pool settings for production

Clone this wiki locally