# Transaction Isolation Levels
This notebook demonstrates the 4 standard isolation levels:
- READ UNCOMMITTED
- READ COMMITTED
- REPEATABLE READ
- SERIALIZABLE

In [1]:
import asyncio
from enum import Enum
from typing import Literal

import nest_asyncio
from sqlalchemy import delete, select, text, update
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession

from db.mysql import async_engine as mysql_engine, async_session as mysql_session
from db.postgre import async_engine as postgre_engine, async_session as postgre_session
from models.account import Account, Base


nest_asyncio.apply()

In [2]:
class IsolationLevel(str, Enum):
    READ_UNCOMMITTED = "READ UNCOMMITTED"
    READ_COMMITTED = "READ COMMITTED"
    REPEATABLE_READ = "REPEATABLE READ"
    SERIALIZABLE = "SERIALIZABLE"


class Database(str, Enum):
    MYSQL = "MySQL"
    POSTGRESQL = "PostgreSQL"


IsolationLevelLiteral = Literal[
    IsolationLevel.READ_UNCOMMITTED, 
    IsolationLevel.READ_COMMITTED, 
    IsolationLevel.REPEATABLE_READ, 
    IsolationLevel.SERIALIZABLE,
]

DatabaseLiteral = Literal[
    Database.MYSQL,
    Database.POSTGRESQL,
]

## 0. Default Transaction Isolation Levels
- PostgreSQL: `READ COMMITED`
- MySQL: `REPEATABLE READ`

In [3]:
async def check_isolation_level(
        engine: AsyncEngine,
        database: DatabaseLiteral,
) -> str | None:
    async with engine.connect() as conn:
        
        if database == Database.POSTGRESQL:
            stmt = "SHOW default_transaction_isolation"
        elif database == Database.MYSQL:
            stmt = "SELECT @@transaction_isolation"
        else:
            raise ValueError("Unsupported database engine")

        result = await conn.execute(text(stmt))
        isolation_level = result.scalar()
        return isolation_level

In [4]:
mysql_isolation_level = await check_isolation_level(
    engine=mysql_engine,
    database=Database.MYSQL,
    )
postgre_isolation_level = await check_isolation_level(
    engine=postgre_engine,
    database=Database.POSTGRESQL,
    )

print(f"{Database.MYSQL}: {mysql_isolation_level}")
print(f"{Database.POSTGRESQL}: {postgre_isolation_level}")

Database.MYSQL: REPEATABLE-READ
Database.POSTGRESQL: read committed


## 1. READ UNCOMMITTED
`READ UNCOMMITTED` is the lowest isolation level in the SQL standard. It allows a transaction to read data that has been modified by other transactions but not yet committed.

This leads to `dirty reads` — transactions can see intermediate, uncommitted changes from others.

🔒 PostgreSQL does not allow dirty reads, even if you set `READ UNCOMMITTED` — it behaves like `READ COMMITTED`. 
Therefore, the example for this isolation level will be demonstrated using a MySQL database.

In [5]:
async def setup(
    engine: AsyncEngine,
    session: AsyncSession,
) -> None:
    """Setup the database and create a test accounts."""
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    async with session() as session:
        await session.begin()
        await session.execute(delete(Account))
        session.add(Account(owner="alice", balance=100.0))
        session.add(Account(owner="bob", balance=50.0))
        await session.commit()


async def read_balance(
    owner: str,
    session: AsyncSession,
    session_name: str,
) -> float:
    result = await session.execute(
            select(Account.balance).where(Account.owner == owner)
        )
    current_balance = result.scalar_one()
    print(f"{session_name}: {owner} has the following balance: {current_balance}")
    return current_balance


async def update_balance(
    owner: str,
    session: AsyncSession,
    session_name: str,
    new_balance: float,
) -> None:
    await session.execute(
            update(Account).where(Account.owner == owner).values(balance=new_balance)
        )
    print(f"{session_name}: {owner}`s balance was updated to {new_balance}")


async def select_by_min_balance(
    min_balance: float,
    session: AsyncSession,
    session_name: str,
) -> None:
    result = await session.execute(
            select(Account.owner).where(Account.balance >= min_balance)
        )
    owners = result.fetchall()
    print(f"{session_name}: {owners=} with balance >= {min_balance}")


async def add_account(
    owner: str,
    balance: float,
    session: AsyncSession,
    session_name: str,
) -> None:
    account = Account(owner=owner, balance=balance)
    session.add(account)
    await session.flush()
    print(f"{session_name}: new account was added - {owner=}, {balance=}")


In [6]:
async def session1(
    isolation_level: IsolationLevelLiteral,
    session: AsyncSession,
) -> None:
    session_name = "Session 1"

    async with session() as session:
        await session.begin()
        await session.connection(execution_options={"isolation_level": isolation_level})
        
        await asyncio.sleep(5)
        await read_balance(owner="alice", session=session, session_name=session_name)
        await update_balance(owner="alice", session=session, session_name=session_name, new_balance=110.0)

        await asyncio.sleep(10)
        await session.commit()
        print(f"{session_name}: Session committed...")


async def session2(
    isolation_level: IsolationLevelLiteral,
    session: AsyncSession,
) -> None:
    session_name = "Session 2"

    async with session() as session:
        await session.begin()
        await session.connection(execution_options={"isolation_level": isolation_level})

        print("Showing dirty read")
        await read_balance(owner="alice", session=session, session_name=session_name)

        await asyncio.sleep(10)
        await read_balance(owner="alice", session=session, session_name=session_name)

        await session.commit()
        print(f"{session_name}: Session committed...")


await setup(
    engine=mysql_engine,
    session=mysql_session,
)
_ = await asyncio.gather(
    session1(
        isolation_level=IsolationLevel.READ_UNCOMMITTED,
        session=mysql_session,
    ),
    session2(
        isolation_level=IsolationLevel.READ_UNCOMMITTED,
        session=mysql_session,
    ),
)

Showing dirty read
Session 2: alice has the following balance: 100.0
Session 1: alice has the following balance: 100.0
Session 1: alice`s balance was updated to 110.0
Session 2: alice has the following balance: 110.0
Session 2: Session committed...
Session 1: Session committed...


## 2. READ COMMITTED (default in PostgreSQL)

Each query sees only committed data as of the start of the query. It allows:
- `non-repeatable reads` — occurs when a transaction reads **the same row** twice and gets different values;
- `phantom reads` — occurs when a transaction executes the same query (returning **multiple rows** based on a condition) twice and gets a different **set of rows**.

In [12]:
async def session1(
    isolation_level: IsolationLevelLiteral,
    session: AsyncSession,
) -> None:
    session_name = "Session 1"

    async with session() as session:
        await session.begin()
        await session.connection(execution_options={"isolation_level": isolation_level})

        # check for non-repeatable reads
        await asyncio.sleep(5)
        await read_balance(owner="alice", session=session, session_name=session_name)
        await update_balance(owner="alice", session=session, session_name=session_name, new_balance=110.0)

        await asyncio.sleep(10)
        await session.commit()
        print(f"{session_name}: Session committed...")

        # check for phantom rows
        await asyncio.sleep(10)
        await session.begin()
        await session.connection(execution_options={"isolation_level": isolation_level})

        await select_by_min_balance(min_balance=100.0, session=session, session_name=session_name)
        await add_account(owner="brian", balance=150.0, session=session, session_name=session_name)
        await select_by_min_balance(min_balance=100.0, session=session, session_name=session_name)

        await asyncio.sleep(7)
        await session.commit()
        print(f"{session_name}: Session committed...")


async def session2(
    isolation_level: IsolationLevelLiteral,
    session: AsyncSession,
) -> None:
    session_name = "Session 2"

    async with session() as session:
        await session.begin()
        await session.connection(execution_options={"isolation_level": isolation_level})

        # check for non-repeatable reads
        print("Showing non-repeatable reads")
        await read_balance(owner="alice", session=session, session_name=session_name)

        await asyncio.sleep(10)
        await read_balance(owner="alice", session=session, session_name=session_name)

        await asyncio.sleep(7)
        await read_balance(owner="alice", session=session, session_name=session_name)
        await session.commit()
        print(f"{session_name}: Session committed...")

        # check for phantom rows
        await asyncio.sleep(5)
        print("Showing phantom rows")
        await session.begin()
        await session.connection(execution_options={"isolation_level": isolation_level})
        
        await select_by_min_balance(min_balance=100.0, session=session, session_name=session_name)
        await asyncio.sleep(5)
        await select_by_min_balance(min_balance=100.0, session=session, session_name=session_name)
        await asyncio.sleep(7)
        await select_by_min_balance(min_balance=100.0, session=session, session_name=session_name)

        await session.commit()
        print(f"{session_name}: Session committed...")


await setup(
    engine=postgre_engine,
    session=postgre_session,
)
_ = await asyncio.gather(
    session1(
        isolation_level=IsolationLevel.READ_COMMITTED,
        session=postgre_session,
        ),
    session2(
        isolation_level=IsolationLevel.READ_COMMITTED,
        session=postgre_session,
        ),
    )

Showing non-repeatable reads
Session 2: alice has the following balance: 100.0
Session 1: alice has the following balance: 100.0
Session 1: alice`s balance was updated to 110.0
Session 2: alice has the following balance: 100.0
Session 1: Session committed...
Session 2: alice has the following balance: 110.0
Session 2: Session committed...
Showing phantom rows
Session 2: owners=[('alice',)] with balance >= 100.0
Session 1: owners=[('alice',)] with balance >= 100.0
Session 1: new account was added - owner='brian', balance=150.0
Session 1: owners=[('alice',), ('brian',)] with balance >= 100.0
Session 2: owners=[('alice',)] with balance >= 100.0
Session 1: Session committed...
Session 2: owners=[('alice',), ('brian',)] with balance >= 100.0
Session 2: Session committed...


## 3. REPEATABLE READ

All reads within the transaction see the same snapshot (taken at `BEGIN`).

Prevents `dirty reads` and `non-repeatable reads`, but theoretically allows `phantom rows`. 

PostgreSQL implements `REPEATABLE READ` as `snapshot isolation`, which means:
- Each transaction receives a snapshot of the database as it existed at the start of the transaction.
- As a result, all repeated `SELECT` queries within that transaction see the same set of rows, even if other transactions insert, update, or delete data afterward.

🔒 Therefore, `phantom reads` are not possible under `REPEATABLE READ` in PostgreSQL.

In [13]:
async def session1(
    isolation_level: IsolationLevelLiteral,
    session: AsyncSession,
) -> None:
    session_name = "Session 1"

    async with session() as session:
        await session.begin()
        await session.connection(execution_options={"isolation_level": isolation_level})

        # check for non-repeatable reads
        print("Showing absence non-repeatable reads (same will be for phantom rows)")
        await read_balance(owner="alice", session=session, session_name=session_name)
        await update_balance(owner="alice", session=session, session_name=session_name, new_balance=110.0)
        await asyncio.sleep(7)
        await session.commit()
        print(f"{session_name}: Session committed...")
        await asyncio.sleep(15)

        # check for serialization errors
        await session.begin()
        await session.connection(execution_options={"isolation_level": isolation_level})
        # read bob`s balance
        await read_balance(owner="bob", session=session, session_name=session_name)
        await asyncio.sleep(5)
        # update alice`s balance
        await update_balance(owner="alice", session=session, session_name=session_name, new_balance=150.0)
        await asyncio.sleep(5)
        # commit
        await session.commit()
        print(f"{session_name}: Session committed...")


async def session2(
    isolation_level: IsolationLevelLiteral,
    session: AsyncSession,
) -> None:
    session_name = "Session 2"

    async with session() as session:
        await session.begin()
        await session.connection(execution_options={"isolation_level": isolation_level})

        # check for non-repeatable reads
        await read_balance(owner="alice", session=session, session_name=session_name)
        await asyncio.sleep(5)
        await read_balance(owner="alice", session=session, session_name=session_name)
        await asyncio.sleep(10)
        await read_balance(owner="alice", session=session, session_name=session_name)
        await session.commit()
        print(f"{session_name}: Session committed...")

        # check for serialization errors
        print("Showing absence for serialization errors")
        await session.begin()
        await session.connection(execution_options={"isolation_level": isolation_level})
        # read alice`s balance
        await read_balance(owner="alice", session=session, session_name=session_name)
        await asyncio.sleep(7)
        # update bob`s balance
        await update_balance(owner="bob", session=session, session_name=session_name, new_balance=40.0)
        await asyncio.sleep(5)
        # commit
        await session.commit()
        print(f"{session_name}: Session committed...")


await setup(
    engine=postgre_engine,
    session=postgre_session,
)
_ = await asyncio.gather(
    session1(
        isolation_level=IsolationLevel.REPEATABLE_READ,
        session=postgre_session,
        ),
    session2(
        isolation_level=IsolationLevel.REPEATABLE_READ,
        session=postgre_session,
        )
    )

Showing absence non-repeatable reads (same will be for phantom rows)
Session 1: alice has the following balance: 100.0
Session 2: alice has the following balance: 100.0
Session 1: alice`s balance was updated to 110.0
Session 2: alice has the following balance: 100.0
Session 1: Session committed...
Session 2: alice has the following balance: 100.0
Session 2: Session committed...
Showing absence for serialization errors
Session 2: alice has the following balance: 110.0
Session 2: bob`s balance was updated to 40.0
Session 1: bob has the following balance: 50.0
Session 1: alice`s balance was updated to 150.0
Session 2: Session committed...
Session 1: Session committed...


## 4. Serializable

Full isolation, PostgreSQL will abort one of the transactions if a conflict is detected. Can say that it same as `REPEATABLE READ`, but with extra checks for serialization safety. A serialization failure (`SQLSTATE 40001`) in PostgreSQL typically occurs only when there's a potential `write-write` or `read-write` conflict that can't be resolved while preserving the illusion of serial execution.

In [14]:
async def session1(
    isolation_level: IsolationLevelLiteral,
    session: AsyncSession,
) -> None:
    session_name = "Session 1"

    async with session() as session:
        await session.begin()
        await session.connection(execution_options={"isolation_level": isolation_level})

        # check for serialization errors
        # read bob`s balance
        await read_balance(owner="bob", session=session, session_name=session_name)
        await asyncio.sleep(5)
        # update alice`s balance
        await update_balance(owner="alice", session=session, session_name=session_name, new_balance=110.0)
        await asyncio.sleep(5)
        # commit
        await session.commit()
        print(f"{session_name}: Session committed...")


async def session2(
    isolation_level: IsolationLevelLiteral,
    session: AsyncSession,    
) -> None:
    session_name = "Session 2"

    async with session() as session:
        await session.begin()
        await session.connection(execution_options={"isolation_level": isolation_level})

        # check for serialization errors
        print("Check for serialization errors")
        # read alice`s balance
        await read_balance(owner="alice", session=session, session_name=session_name)
        await asyncio.sleep(7)
        # update bob`s balance
        await update_balance(owner="bob", session=session, session_name=session_name, new_balance=40.0)
        await asyncio.sleep(5)
        # commit
        await session.commit()
        print(f"{session_name}: Session committed...")
    

await setup(
    engine=postgre_engine,
    session=postgre_session,
)
_ = await asyncio.gather(
    session1(
        isolation_level=IsolationLevel.SERIALIZABLE,
        session=postgre_session,
        ),
    session2(
        isolation_level=IsolationLevel.SERIALIZABLE,
        session=postgre_session,
        ),
    )

Check for serialization errors
Session 1: bob has the following balance: 50.0
Session 2: alice has the following balance: 100.0
Session 1: alice`s balance was updated to 110.0
Session 2: bob`s balance was updated to 40.0
Session 1: Session committed...


DBAPIError: (sqlalchemy.dialects.postgresql.asyncpg.Error) <class 'asyncpg.exceptions.SerializationError'>: could not serialize access due to read/write dependencies among transactions
DETAIL:  Reason code: Canceled on identification as a pivot, during commit attempt.
HINT:  The transaction might succeed if retried.
(Background on this error at: https://sqlalche.me/e/20/dbapi)

| Isolation level      | Dirty Reads                          | Non-Repeatable Reads | Phantom Reads                                  |
| -------------------- | -------------------------------------| -------------------- | -------------------------------------------    |
| **READ UNCOMMITTED** | *(in MySQL: ✅, in PostgreSQL: ❌)* | ✅                   | ✅                                            |
| **READ COMMITTED**   | ❌                                   | ✅                   | ✅                                            |
| **REPEATABLE READ**  | ❌                                   | ❌                   | *(in SQL: ✅, in PostgreSQL and MySQL: ❌)*  |
| **SERIALIZABLE**     | ❌                                   | ❌                   | ❌                                            |
