SQLAlchemy ORM Async Mode

James Brucker edited this page Jun 25, 2025 · 3 revisions

Example of using the SQLAlchemy ORM in async mode

1. Define URL for an Async Database Connection and Create an Engine

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker

# Create an async engine using SQLite. For an in-memory database use "sqlite+aiosqlite:///:memory:" (note the three slashes and the colons)
DATABASE_URL = "sqlite+aiosqlite:///./test.db"  # or postgresql+asyncpg://...

engine = create_async_engine(
    DATABASE_URL,
    echo=True,  # Set to False in production
    future=True  # Redundant on SQLAlchemy 2.0, where 2.0-style behavior is always enabled
)

# Create an async session factory
AsyncSessionLocal = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autoflush=False
)

2. Using the Async Session in Your Code

Here's how to properly use the session to perform database operations:

Basic Usage Pattern

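from typing import AsyncGenerator

from sqlalchemy import select

# Item is an ORM-mapped class with at least "id" and "name" columns.

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Dependency that yields an async database session"""
    async with AsyncSessionLocal() as session:
        yield session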

async def perform_operations():
    # Get a session
    async with AsyncSessionLocal() as session:
        try:
            # Example: Create a new record
            new_item = Item(name="Example")
            session.add(new_item)
            
            # Commit the transaction
            await session.commit()
            
            # Refresh the instance to get any database-generated values
            await session.refresh(new_item)
            
            # Example: Query records
            result = await session.execute(select(Item).where(Item.name == "Example"))
            items = result.scalars().all()
            
            return items
        except Exception:
            # Roll back on error, then re-raise for the caller
            await session.rollback()
            raise
        # The session is closed automatically when the context manager exits

3. Common Async Database Operations

Here are examples of common operations with an async session:

Create

async def create_item(item_data):
    async with AsyncSessionLocal() as session:
        new_item = Item(**item_data)
        session.add(new_item)
        await session.commit()
        await session.refresh(new_item)
        return new_item

Read

async def get_item(item_id: int):
    async with AsyncSessionLocal() as session:
        result = await session.execute(select(Item).where(Item.id == item_id))
        return result.scalars().first()

Update

async def update_item(item_id: int, update_data: dict):
    async with AsyncSessionLocal() as session:
        result = await session.execute(select(Item).where(Item.id == item_id))
        item = result.scalars().first()
        if item:
            for key, value in update_data.items():
                setattr(item, key, value)
            await session.commit()
            await session.refresh(item)
        return item

Delete

async def delete_item(item_id: int):
    async with AsyncSessionLocal() as session:
        result = await session.execute(select(Item).where(Item.id == item_id))
        item = result.scalars().first()
        if item:
            await session.delete(item)
            await session.commit()
            return True
        return False

4. Using with FastAPI (Dependency Injection)

If you're using FastAPI, here's how to integrate it:

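from typing import AsyncGenerator

from fastapi import Depends, FastAPI

app = FastAPI()

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield one session per request; it is closed when the request finishes"""
    async with AsyncSessionLocal() as session:
        yield session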

@app.post("/items/")
async def create_item(item_data: dict, db: AsyncSession = Depends(get_db)):
    new_item = Item(**item_data)
    db.add(new_item)
    await db.commit()
    await db.refresh(new_item)
    return new_item

Important Notes:

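  1. Always await the coroutine methods of the async session (commit, execute, refresh, delete, etc.). Note that add() is a regular synchronous method and is not awaited.
  2. Transactions: Changes are not persisted until you commit; roll back when an operation fails.
  3. Session Lifecycle: Let the context manager (async with) handle session closing.
  4. Lazy Loading and the N+1 Problem: Implicit lazy loading does not work in an async context (accessing an unloaded relationship raises a MissingGreenlet error), so load relationships eagerly with selectinload or a similar loader option.
  5. Connection Pooling: Configure your engine with pool settings that suit your production workload (for example pool_size and max_overflow).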
