# Tigrbl v3 — Book Management API (Notebook)

This notebook illustrates how to build a small Book API using Tigrbl v3 with:
- `acol` for persisted columns and `vcol` for a computed slug
- Custom request schema (`publish.in`) and custom ops (`publish`, `search`) via `op_ctx`
- A simple hook that normalizes payload strings before handlers run
- FastAPI + REST/JSON-RPC wiring

Run cells top-to-bottom. Optionally start a server with Uvicorn to exercise REST/JSON-RPC endpoints.

In [None]:
# Imports — SQLAlchemy (async), FastAPI, Tigrbl v3
from __future__ import annotations

from collections.abc import AsyncIterator
from datetime import datetime, timezone
from typing import Any

from fastapi import FastAPI
from pydantic import BaseModel, Field
from sqlalchemy import DateTime, Float, Integer, String, func, select, update
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import Mapped, declarative_base

from tigrbl.tigrbl import TigrblApi
from tigrbl.op import alias_ctx, op_ctx
from tigrbl.hook import hook_ctx
from tigrbl.schema.decorators import schema_ctx
from tigrbl.specs import F, IO, S, acol, vcol
from tigrbl.transport.rest import mount_rest

## Async SQLAlchemy Setup

In [None]:
# Declarative base class that the ORM models in this notebook inherit from.
Base = declarative_base()
# Async engine for a local SQLite file (./books.db) accessed through aiosqlite.
engine = create_async_engine("sqlite+aiosqlite:///./books.db", future=True)
# Factory for AsyncSession objects bound to the engine; expire_on_commit=False
# keeps already-loaded attributes readable after a commit.
SessionLocal = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)


async def get_async_db() -> AsyncIterator[AsyncSession]:
    """Yield one ``AsyncSession`` from ``SessionLocal`` and close it afterwards.

    This is an async generator (it uses ``yield``), so it is annotated as an
    iterator of sessions rather than as a session.

    Yields:
        A session that stays open until the consumer resumes or closes the
        generator.
    """
    # ``async with`` closes the session on exit, including when the consumer
    # raises, so no separate ``finally: await session.close()`` is needed.
    async with SessionLocal() as session:
        yield session

## Tigrbl Instance

In [None]:
# Create the Tigrbl API object and hand it the async session provider.
api = TigrblApi(get_async_db=get_async_db)

## Model — `Book` with `acol`/`vcol`

In [None]:
# ORM model for the "books" table. Persisted columns are declared with ``acol``
# (storage spec ``S``, field spec ``F``, IO spec ``IO``); ``slug`` is a ``vcol``
# with no storage spec, produced by ``read_producer``.
class Book(Base):
    __tablename__ = "books"

    # Auto-incrementing integer primary key; listed for output on read/list
    # and sortable. No in_verbs are given here.
    id: Mapped[int] = acol(
        storage=S(Integer, primary_key=True, autoincrement=True),
        field=F(py_type=int),
        io=IO(out_verbs=("read", "list"), sortable=True),
    )
    # Non-null, indexed title (max_length 200). Required on all four write
    # verbs; filterable with "contains" and "eq".
    title: Mapped[str] = acol(
        storage=S(String, nullable=False, index=True),
        field=F(
            py_type=str,
            constraints={"max_length": 200},
            required_in=("create", "update", "replace", "merge"),
        ),
        io=IO(
            in_verbs=("create", "update", "replace", "merge"),
            out_verbs=("read", "list"),
            filter_ops=("contains", "eq"),
            sortable=True,
        ),
    )
    # Non-null, indexed author (max_length 120). Same verbs as ``title`` but
    # only the "eq" filter is enabled.
    author: Mapped[str] = acol(
        storage=S(String, nullable=False, index=True),
        field=F(
            py_type=str,
            constraints={"max_length": 120},
            required_in=("create", "update", "replace", "merge"),
        ),
        io=IO(
            in_verbs=("create", "update", "replace", "merge"),
            out_verbs=("read", "list"),
            filter_ops=("eq",),
            sortable=True,
        ),
    )
    # Non-null float price. Marked required only for "create", although it is
    # accepted on every write verb.
    price: Mapped[float] = acol(
        storage=S(Float, nullable=False),
        field=F(py_type=float, required_in=("create",)),
        io=IO(
            in_verbs=("create", "update", "replace", "merge"),
            out_verbs=("read", "list"),
            sortable=True,
        ),
    )
    # Nullable, indexed, timezone-aware timestamp. Accepted on
    # update/replace/merge but not listed for "create".
    published_at: Mapped[datetime | None] = acol(
        storage=S(DateTime(timezone=True), nullable=True, index=True),
        field=F(py_type=datetime | None),
        io=IO(in_verbs=("update", "replace", "merge"), out_verbs=("read", "list")),
    )

    # Computed "<title>-<author>" value: each part is stripped, lower-cased and
    # has its spaces replaced by hyphens; a falsy title/author contributes "".
    slug: Mapped[str] = vcol(
        field=F(py_type=str, constraints={"max_length": 256}),
        io=IO(out_verbs=("read", "list")),
        read_producer=lambda obj, ctx: f"{(obj.title or '').strip().lower().replace(' ', '-')}-{(obj.author or '').strip().lower().replace(' ', '-')}",
    )

## Custom Request Schema — `publish.in`

In [None]:
# Register the class as the "in" schema of the `publish` alias on Book.
@schema_ctx(alias="publish", kind="in", for_=Book)
class PublishIn(BaseModel):
    """Request payload for the `publish` op.
    If `at` is not provided, publishing defaults to `now()`.
    """

    # Optional timestamp; defaults to None when the caller omits it.
    at: datetime | None = Field(
        default=None,
        description="Publish timestamp (defaults to now)",
    )

## Custom Ops — `publish` (member) and `search` (collection)

In [None]:
@op_ctx(
    alias="publish",
    target="custom",
    arity="member",
    request_schema="publish.in",
    response_schema="read.out",
)
async def publish(cls, ctx) -> Book | None:
    """Set ``published_at`` on one row, commit, and return the re-read row.

    The row id is taken from the payload key ``id`` or, failing that,
    ``item_id``. The timestamp is the payload's ``at`` value, or the current
    UTC time when ``at`` is missing or falsy.

    Returns:
        The row selected by id after the update, or None if no row matches.

    Raises:
        ValueError: if the payload carries neither ``id`` nor ``item_id``.
    """
    session: AsyncSession = ctx.db
    payload = dict(ctx.payload or {})

    book_id = payload.get("id") or payload.get("item_id")
    if book_id is None:
        raise ValueError("Missing id for publish")

    published_at = payload.get("at") or datetime.now(timezone.utc)
    matches_id = cls.id == book_id

    stamp = update(cls).where(matches_id).values(published_at=published_at)
    await session.execute(stamp)
    await session.commit()

    # Re-select so the caller receives the row as it is after the update.
    rows = await session.execute(select(cls).where(matches_id))
    return rows.scalars().first()


@op_ctx(
    alias="search",
    target="custom",
    arity="collection",
    response_schema="read.out",
)
async def search(cls, ctx) -> list[Book]:
    """Return the rows whose title contains ``q``, ignoring case.

    ``q`` is read from the payload. A missing or null ``q`` is treated as the
    empty string, and non-string values are converted with ``str``.

    Returns:
        A list of matching rows.
    """
    db: AsyncSession = ctx.db
    raw = (ctx.payload or {}).get("q")
    needle = "" if raw is None else str(raw).lower()
    # autoescape=True makes "%" and "_" in the search text match literally
    # instead of acting as LIKE wildcards.
    title_matches = func.lower(cls.title).contains(needle, autoescape=True)
    res = await db.execute(select(cls).where(title_matches))
    return list(res.scalars())

## Hook — Normalize strings on `create`/`update` (PRE_HANDLER)

In [None]:
@hook_ctx(ops=("create", "update"), phase="PRE_HANDLER")
def strip_strings(cls: type[Book], ctx: Any) -> None:
    """Strip surrounding whitespace from ``title`` and ``author`` in the payload.

    The payload is modified in place. Keys that are absent, and values that
    are not strings (for example None), are left untouched rather than
    raising AttributeError.
    """
    payload = ctx.payload
    if not payload:
        return
    for key in ("title", "author"):
        if key in payload and isinstance(payload[key], str):
            payload[key] = payload[key].strip()

## API Wiring — Include model, mount REST and JSON-RPC

In [None]:
import nest_asyncio
import uvicorn

# Patch asyncio for nested event-loop use (presumably needed because the
# notebook kernel already runs a loop; confirm).
nest_asyncio.apply()

# Register the Book model with the API, then run its async initialization.
api.include_model(Book)
await api.initialize_async()

# Create the FastAPI app and attach it to the Tigrbl API object before the
# mount calls below.
app = FastAPI()
api.app = app
# JSON-RPC under /rpc, diagnostics under /system, REST under /api.
api.mount_jsonrpc(prefix="/rpc")
api.attach_diagnostics(prefix="/system")
mount_rest(api, app, base_prefix="/api")

<fastapi.routing.APIRouter at 0x1078f0c50>

In [None]:
# Start uvicorn in the same cell
# NOTE(review): host="0.0.0.0" listens on every interface; "127.0.0.1" would
# keep the demo server local-only.
config = uvicorn.Config(app, host="0.0.0.0", port=8000, reload=False)
server = uvicorn.Server(config)

# Awaiting serve() keeps this cell running until the server stops.
await server.serve()

INFO:     Started server process [89871]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)


INFO:     127.0.0.1:60991 - "GET / HTTP/1.1" 404 Not Found
INFO:     127.0.0.1:60991 - "GET /favicon.ico HTTP/1.1" 404 Not Found
INFO:     127.0.0.1:60991 - "GET /docs HTTP/1.1" 200 OK
INFO:     127.0.0.1:60991 - "GET /openapi.json HTTP/1.1" 200 OK


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [89871]


In [None]:
# Verify model and basic table info
print("Model:", Book)

# Each entry: printed label, attribute looked up on Book, fallback if absent.
# Alias map (canonical verb -> alias)
for label, attr, fallback in (
    ("Resource columns:", "columns", ()),
    ("Table config:", "table_config", {}),
    ("Alias map:", "alias_map", {}),
):
    print(label, getattr(Book, attr, fallback))

Model: <class '__main__.Book'>
Resource columns: ('id', 'title', 'author', 'price', 'published_at', 'slug')
Table config: {}
Alias map: {}


In [None]:
# OpSpecs: alias, target(canonical), arity, persist, expose flags
# ``Book.opspecs`` may be absent or falsy; ``all_specs`` is then an empty list.
specs = getattr(Book, "opspecs", None)
all_specs = list(getattr(specs, "all", ())) if specs else []
print("Total ops:", len(all_specs))


def spec_row(sp):
    """Flatten one op spec into a plain dict for printing.

    ``alias``, ``target``, ``arity`` and ``persist`` must exist on ``sp``
    (a missing one raises AttributeError). The remaining attributes are
    optional and come out as None when absent.
    """
    required = ("alias", "target", "arity", "persist")
    optional = ("expose_routes", "expose_rpc", "status_code", "returns")
    row = {name: getattr(sp, name) for name in required}
    row.update((name, getattr(sp, name, None)) for name in optional)
    return row


# Print one row per op spec, ordered by alias and then by target.
for row in sorted(
    map(spec_row, all_specs),
    key=lambda entry: (entry["alias"], entry["target"]),
):
    print(row)

Total ops: 7
{'alias': 'clear', 'target': 'clear', 'arity': 'collection', 'persist': 'default', 'expose_routes': True, 'expose_rpc': True, 'status_code': None, 'returns': 'raw'}
{'alias': 'create', 'target': 'create', 'arity': 'collection', 'persist': 'default', 'expose_routes': True, 'expose_rpc': True, 'status_code': None, 'returns': 'raw'}
{'alias': 'delete', 'target': 'delete', 'arity': 'member', 'persist': 'default', 'expose_routes': True, 'expose_rpc': True, 'status_code': None, 'returns': 'raw'}
{'alias': 'list', 'target': 'list', 'arity': 'collection', 'persist': 'default', 'expose_routes': True, 'expose_rpc': True, 'status_code': None, 'returns': 'raw'}
{'alias': 'read', 'target': 'read', 'arity': 'member', 'persist': 'default', 'expose_routes': True, 'expose_rpc': True, 'status_code': None, 'returns': 'raw'}
{'alias': 'replace', 'target': 'replace', 'arity': 'member', 'persist': 'default', 'expose_routes': True, 'expose_rpc': True, 'status_code': None, 'returns': 'raw'}
{'ali

In [None]:
# List REST endpoints and methods under /api
from fastapi.routing import APIRoute

routes = [r for r in app.routes if isinstance(r, APIRoute)]
# Match on the path prefix. A plain substring test would also list routes that
# merely contain "/api" somewhere else in their path.
api_routes = [
    (r.path, sorted(r.methods or ()), getattr(r, "name", ""))
    for r in routes
    if r.path == "/api" or r.path.startswith("/api/")
]

# Order by path, then by the comma-joined method list, for stable output.
for path, methods, name in sorted(api_routes, key=lambda t: (t[0], ",".join(t[1]))):
    methods_s = ",".join(methods)
    print(f"{methods_s:<16} {path}  name={name}")

DELETE           /api/book  name=Book.clear
GET              /api/book  name=Book.list
POST             /api/book  name=Book.create
DELETE           /api/book/{item_id}  name=Book.delete
GET              /api/book/{item_id}  name=Book.read
PATCH            /api/book/{item_id}  name=Book.update
PUT              /api/book/{item_id}  name=Book.replace


In [None]:
# Introspect Pydantic schemas bound to each alias
from types import SimpleNamespace
from pydantic import BaseModel

# ``Book.schemas`` holds one attribute per alias; fall back to an empty
# namespace if the model has none.
schemas_root = getattr(Book, "schemas", SimpleNamespace())
# Public (non-underscore) attribute names are treated as aliases.
aliases = [a for a in dir(schemas_root) if not a.startswith("_")]


def schema_info(alias):
    """Return ``(alias, in_schema, out_schema)`` for one alias.

    The schemas are read from the ``in_`` and ``out`` attributes of the alias
    namespace on ``schemas_root``. Both are None when that namespace is
    missing or falsy, and each is None when its attribute is absent.
    """
    namespace = getattr(schemas_root, alias, None)
    if namespace:
        return alias, getattr(namespace, "in_", None), getattr(namespace, "out", None)
    return alias, None, None


# One line per alias with its request and response schema names. Objects
# without ``__name__`` (e.g. None) are shown by their type name.
for alias in sorted(aliases):
    a, in_s, out_s = schema_info(alias)
    in_name = getattr(in_s, "__name__", type(in_s).__name__)
    out_name = getattr(out_s, "__name__", type(out_s).__name__)
    print(f"[{a}] in_={in_name} out={out_name}")

# Example: dump JSON schema for 'create.in_' and 'read.out' if present
# Each lookup yields None when the alias namespace or the schema is missing.
ci = getattr(getattr(schemas_root, "create", None), "in_", None)
ro = getattr(getattr(schemas_root, "read", None), "out", None)


def try_dump_json_schema(model_cls):
    """Print the property names of a model's JSON schema.

    Tries ``model_json_schema()`` (Pydantic v2) first and ``schema()``
    (Pydantic v1) second. If both raise, the second error is printed and
    nothing else happens. ``None`` is ignored.
    """
    if model_cls is None:
        return

    failure = None
    for method_name in ("model_json_schema", "schema"):
        try:
            js = getattr(model_cls, method_name)()
        except Exception as exc:
            failure = exc
        else:
            break
    else:
        # Neither API produced a schema; report the last error seen.
        print("Schema dump failed:", failure)
        return

    print(model_cls.__name__, "→ fields:", list(js.get("properties", {}).keys()))


# Print the field names of the create-request and read-response schemas;
# a None argument prints nothing.
try_dump_json_schema(ci)
try_dump_json_schema(ro)

[clear] in_=BookClearRequest out=BookClearResponse
[create] in_=BookCreateRequest out=BookCreateResponse
[delete] in_=BookDeleteRequest out=BookDeleteResponse
[list] in_=BookListRequest__Optionalized out=BookListResponse
[publish] in_=PublishIn out=NoneType
[read] in_=BookReadRequest out=BookReadResponse
[replace] in_=BookReplaceRequest out=BookReplaceResponse
[update] in_=BookUpdateRequest out=BookUpdateResponse
BookCreateRequest → fields: ['title', 'author', 'price']
BookReadResponse → fields: ['id', 'title', 'author', 'price', 'published_at', 'slug']


In [None]:
# Show hooks per alias: phases and handler chain length
hooks_root = getattr(Book, "hooks", SimpleNamespace())
aliases = sorted(set(getattr(Book.opspecs, "by_alias", {}).keys()))

for alias in aliases:
    ns = getattr(hooks_root, alias, SimpleNamespace())
    # Phases are the upper-case attribute names on the per-alias namespace.
    phases = sorted(k for k in dir(ns) if k.isupper())
    print(f"[{alias}] phases:", phases)
    # Show function names per phase
    for ph in phases:
        fns = getattr(ns, ph, [])
        names = []
        for f in fns:
            # Prefer the wrapped function's name when the hook is a wrapper.
            target = getattr(f, "__wrapped__", f)
            names.append(getattr(target, "__name__", str(f)))
        print("  ", ph, "len=", len(fns), "→", names)
    # HANDLER is the core execution chain
    hchain = getattr(ns, "HANDLER", [])
    print("  HANDLER length:", len(hchain))

[add] phases: ['END_TX', 'HANDLER', 'ON_END_TX_ERROR', 'ON_ERROR', 'ON_HANDLER_ERROR', 'ON_POST_COMMIT_ERROR', 'ON_POST_HANDLER_ERROR', 'ON_POST_RESPONSE_ERROR', 'ON_PRE_COMMIT_ERROR', 'ON_PRE_HANDLER_ERROR', 'ON_PRE_TX_BEGIN_ERROR', 'ON_ROLLBACK', 'ON_START_TX_ERROR', 'POST_COMMIT', 'POST_HANDLER', 'POST_RESPONSE', 'PRE_COMMIT', 'PRE_HANDLER', 'PRE_TX_BEGIN', 'START_TX']
   END_TX len= 0 → []
   HANDLER len= 1 → ['create']
   ON_END_TX_ERROR len= 0 → []
   ON_ERROR len= 0 → []
   ON_HANDLER_ERROR len= 0 → []
   ON_POST_COMMIT_ERROR len= 0 → []
   ON_POST_HANDLER_ERROR len= 0 → []
   ON_POST_RESPONSE_ERROR len= 0 → []
   ON_PRE_COMMIT_ERROR len= 0 → []
   ON_PRE_HANDLER_ERROR len= 0 → []
   ON_PRE_TX_BEGIN_ERROR len= 0 → []
   ON_ROLLBACK len= 0 → []
   ON_START_TX_ERROR len= 0 → []
   POST_COMMIT len= 0 → []
   POST_HANDLER len= 0 → []
   POST_RESPONSE len= 0 → []
   PRE_COMMIT len= 0 → []
   PRE_HANDLER len= 0 → []
   PRE_TX_BEGIN len= 0 → []
   START_TX len= 0 → []
  HANDLER length:

In [12]:
# Inspect Tigrbl v3 runtime processes for the Book model

from types import SimpleNamespace
from pprint import pprint

# Core runtime helpers
from tigrbl.runtime import labels as L, events as E
from tigrbl.runtime.kernel import build_phase_chains
# Hook phase list (ordering for display)
from tigrbl.hook.types import PHASES as HOOK_PHASES

def _func_name(fn):
    """Return a display name: ``__qualname__``, else ``__name__``, else ``str(fn)``."""
    fallback = getattr(fn, "__name__", str(fn))
    return getattr(fn, "__qualname__", fallback)

def _aliases_for_model(model):
    """Return the model's op aliases in sorted order.

    Aliases are the keys of ``model.opspecs.by_alias``. When ``opspecs`` or
    ``by_alias`` is missing or empty, a fixed list of common defaults is
    returned instead.
    """
    registry = getattr(model, "opspecs", None)
    by_alias = getattr(registry, "by_alias", None) if registry else None
    if by_alias:
        return sorted(by_alias.keys())
    # No opspec registry available: fall back to the common defaults.
    return ["create", "read", "update", "delete", "list"]

def _is_persistent_for_alias(model, alias):
    """Report whether the alias looks persistent, judged from its phase chains.

    True when the START_TX chain contains a step whose ``__name__`` is
    ``start_tx``. Everything else is False, whether or not PRE_TX_BEGIN
    carries a ``mark_skip_persist`` step.
    """
    chains = build_phase_chains(model, alias)

    def has_named(phase, name):
        steps = chains.get(phase, []) or ()
        return any(getattr(fn, "__name__", "") == name for fn in steps)

    # An explicit skip marker and the absence of any marker both yield False,
    # so only the START_TX probe decides the result.
    return has_named("START_TX", "start_tx")

def show_runtime_legend():
    """Print an underlined heading, the pretty-printed ``L.legend()``, and a blank line."""
    print(
        "Step kinds, domains, phases, anchors",
        "====================================",
        sep="\n",
    )
    pprint(L.legend())
    print()

def show_anchor_order():
    """Print an underlined heading, ``E.all_events_ordered()``, and a blank line."""
    print(
        "Canonical anchor order",
        "======================",
        sep="\n",
    )
    print(E.all_events_ordered())
    print()

def show_model_overview(model):
    """Print the model's name and its alias map, followed by a blank line.

    The name is ``model.__name__`` when present, otherwise the object itself.
    The alias map is ``model.alias_map``, or an empty dict when the model has
    none.
    """
    print(f"Model: {getattr(model, '__name__', model)}")
    alias_map = getattr(model, "alias_map", {})
    print("Aliases (canonical → alias):", alias_map)
    print()

def show_phase_chains(model, alias):
    """Print the non-empty phase chains for one alias, in ``HOOK_PHASES`` order.

    The first line shows the alias and its persistence flag. Each following
    line shows a phase, its step count and the step names. A blank line ends
    the section.
    """
    chains = build_phase_chains(model, alias)
    persistent = _is_persistent_for_alias(model, alias)
    print(f"[{alias}] persistent={persistent}")
    for phase in HOOK_PHASES:
        steps = chains.get(phase, ()) or ()
        if not steps:
            # Empty phases are omitted from the listing.
            continue
        step_names = [_func_name(step) for step in steps]
        print(f"  {phase:<18} ({len(steps)}):", step_names)
    print()

# ---- Run the inspections ----

# 1) Legend: step kinds, domains, sys phases, anchors
show_runtime_legend()

# 2) Canonical anchor ordering across the lifecycle
show_anchor_order()

# 3) Model overview
show_model_overview(Book)

# 4) Per-alias phase chains (what actually runs in each phase)
for a in _aliases_for_model(Book):
    show_phase_chains(Book, a)

# 5) If you defined custom ops (e.g., 'publish' in the notebook), include them too.
#    Step 4 already covers 'publish' when it is a registered alias, so it is
#    shown here only when it has hooks but no alias entry.
if "publish" not in _aliases_for_model(Book) and hasattr(
    getattr(Book, "hooks", None), "publish"
):
    try:
        show_phase_chains(Book, "publish")
    except Exception as exc:
        # Best effort: report the problem instead of hiding it.
        print(f"[publish] could not build phase chains: {exc}")

Step kinds, domains, phases, anchors
{'anchors': ['schema:collect_in',
             'in:validate',
             'resolve:values',
             'pre:flush',
             'emit:aliases:pre_flush',
             'post:flush',
             'emit:aliases:post_refresh',
             'schema:collect_out',
             'out:build',
             'emit:aliases:readtime',
             'out:dump'],
 'atom_domains': ('emit',
                  'out',
                  'refresh',
                  'resolve',
                  'schema',
                  'storage',
                  'wire'),
 'notes': {'atom/hook': "Use '<domain>:<subject>@<anchor>#field' (field "
                        'optional).',
           'secdep/dep': "Run before any anchor; shape is 'secdep:<name>' / "
                         "'dep:<name>'.",
           'sys': 'Subject describes the system op; anchor is a PHASE.'},
 'step_kinds': ('secdep', 'dep', 'sys', 'atom', 'hook'),
 'sys_phases': ('PRE_HANDLER',
                'START_T