In [1]:
from sqlmodel import create_engine, Session, SQLModel, Field, select
from sqlalchemy import Engine
from sqlalchemy.exc import DatabaseError
from sqltypes import PydanticModel
from pydantic import RootModel
from kv.api import KV
from haskellian import Either, Left, Right, either as E
from typing import TypeVar, Generic
# Type of the values held by a store; `SQLKV` is generic over it.
T = TypeVar('T')

In [2]:
from typing_extensions import AsyncIterable, Awaitable, Sequence
from kv.api import DBError, InexistentItem, InvalidData

class SQLKV(KV[T], Generic[T]):
  """Key-value store that keeps its items as rows of a SQL table (via SQLModel).

  Each item is one row: `key` is the primary key and `value` holds the item
  wrapped in a pydantic `RootModel[Type]`, persisted through the
  `PydanticModel` column type.

  Every operation opens its own `Session` on `self.engine`. A
  `sqlalchemy.exc.DatabaseError` raised inside an operation is returned as
  `Left(DBError(...))`; any other exception type propagates to the caller.

  NOTE(review): the methods are `async` but use the synchronous `Session`
  API, so each call presumably blocks the event loop while the query runs.
  """

  def __init__(self, Type: type[T], engine: Engine, table: str = 'kv'):
    """Declare the backing table model and create it on `engine`.

    Args:
      Type: type of the stored values; kept wrapped as `RootModel[Type]`.
      engine: SQLAlchemy engine on which every operation opens its session.
      table: name of the backing table.
    """
    self.Type = RootModel[Type]
    self.engine = engine

    class Table(SQLModel, table=True):
      __tablename__ = table # type: ignore (duh)
      # `SQLModel.metadata` is shared by the whole process. Without this flag,
      # building a second store on the same table name (or simply re-running
      # the constructor, e.g. in a notebook cell) fails with
      # "Table ... is already defined for this MetaData instance".
      __table_args__ = {'extend_existing': True}
      key: str = Field(primary_key=True)
      value: RootModel[Type] = Field(sa_type=PydanticModel(self.Type))

    self.Table = Table
    # Operates on the shared metadata, i.e. on every table registered there,
    # not only on the one declared above.
    SQLModel.metadata.create_all(engine)

  async def _delete(self, key: str) -> Either[DBError | InexistentItem, None]:
    """Delete the row stored under `key`.

    Returns:
      `Right(None)` once the deletion is committed, `Left(InexistentItem(key))`
      if no row has that key, or `Left(DBError(e))` on a `DatabaseError`.
    """
    try:
      with Session(self.engine) as session:
        stmt = select(self.Table).where(self.Table.key == key)
        row = session.exec(stmt).first()
        if row is None:
          return Left(InexistentItem(key))
        session.delete(row)
        session.commit()
        return Right(None)
    except DatabaseError as e:
      return Left(DBError(e))

  async def _read(self, key: str) -> Either[DBError | InvalidData | InexistentItem, T]:
    """Read the value stored under `key`.

    Returns:
      `Right(value)` with the unwrapped `RootModel` payload,
      `Left(InexistentItem(key))` if no row has that key, or
      `Left(DBError(e))` on a `DatabaseError`.

    NOTE(review): `InvalidData` is part of the declared return type but this
    method never produces it; an error raised while the stored value is
    decoded would propagate as an exception -- confirm whether that is intended.
    """
    try:
      with Session(self.engine) as session:
        stmt = select(self.Table).where(self.Table.key == key)
        row = session.exec(stmt).first()
        if row is None:
          return Left(InexistentItem(key))
        return Right(row.value.root)
    except DatabaseError as e:
      return Left(DBError(e))

  async def _insert(self, key: str, value: T) -> Either[DBError, None]:
    """Store `value` under `key`, replacing any row that already has that key.

    Returns:
      `Right(None)` once committed, or `Left(DBError(e))` on a `DatabaseError`.
    """
    try:
      with Session(self.engine) as session:
        stmt = select(self.Table).where(self.Table.key == key)
        row = session.exec(stmt).first()
        # Upsert by hand: drop the previous row (if any) and add the new one
        # within the same session, so both are committed together.
        if row is not None:
          session.delete(row)
        session.add(self.Table(key=key, value=self.Type(value)))
        session.commit()
        return Right(None)
    except DatabaseError as e:
      return Left(DBError(e))

  async def _keys(self) -> Either[DBError, Sequence[str]]:
    """List the keys of all rows in the table.

    Returns:
      `Right(keys)`, or `Left(DBError(e))` on a `DatabaseError`.
    """
    try:
      with Session(self.engine) as session:
        stmt = select(self.Table.key)
        return Right(session.exec(stmt).all())
    except DatabaseError as e:
      return Left(DBError(e))

  async def _items(self, batch_size: int | None = None) -> AsyncIterable[Either[DBError | InvalidData, tuple[str, T]]]:
    """Iterate over all `(key, value)` pairs, fetching rows in batches.

    Args:
      batch_size: forwarded unchanged to `fetchmany`; `None` leaves the batch
        size to SQLAlchemy.

    Yields:
      `Right((key, value))` for each row. On a `DatabaseError` a single
      `Left(DBError(e))` is yielded and the iteration ends.

    NOTE(review): `InvalidData` is declared in the item type but never
    produced here.
    """
    try:
      # The session stays open for as long as the generator is being consumed.
      with Session(self.engine) as session:
        result = session.exec(select(self.Table))
        # Truthiness rather than `!= []`: the loop must stop on any empty
        # sequence, not only when the result happens to be exactly a `list`.
        while batch := result.fetchmany(batch_size):
          for row in batch:
            yield Right((row.key, row.value.root))
    except DatabaseError as e:
      yield Left(DBError(e))

In [3]:
# SQLite database kept in the file `db.sqlite`.
engine = create_engine('sqlite:///db.sqlite')
# Store of `(int, str)` tuples; no table name is passed, so the default 'kv' is used.
kv = SQLKV(tuple[int, str], engine)

In [5]:
# Store the tuple (1, 'one') under the key 'bye'. `insert` is not defined in
# this notebook; it is presumably inherited from `KV` and backed by `_insert`.
await kv.insert('bye', (1, 'one'))

Right(value=None, tag='right')