In [None]:
from __future__ import annotations
import datetime as dt
from typing import NamedTuple
from collections.abc import Iterable
from random import random
import sys

from micro_namedtuple_sqlite_persister.persister import Engine
from micro_namedtuple_sqlite_persister.adaptconvert import enable_included_adaptconverters
enable_included_adaptconverters()

# Models

In [None]:
# Example model used throughout the CRUD, query and performance sections.
class MyModel(NamedTuple):
    # `id: int | None` has to be the first field of a model; the examples
    # below pass None here when inserting a new row.
    id: int | None
    name: str
    date: dt.datetime
    score: float | None

Connect to the database and create tables with an `Engine`

In [None]:
# Use an in-memory SQLite database for this walkthrough; switch to the
# commented-out line below to work against a file on disk instead.
engine = Engine(":memory:")
# engine = Engine("example.db")
# NOTE(review): force_recreate=True presumably rebuilds the table even if it
# already exists -- confirm against the Engine documentation.
engine.ensure_table_created(MyModel, force_recreate=True)
engine.connection.set_trace_callback(lambda sql: print(sql, file=sys.stderr)) # echo SQL
engine.connection # just the real connection object

# Basic CRUD

## Insert row

In [None]:
# Build the row with id=None, then rebind `row` to what insert returns so that
# `row.id` can be used by the cells that follow.
row = MyModel(None, "Bart", dt.datetime.now(), 6.5)
row = engine.insert(row)
# Commit explicitly; the engine exposes the connection for this.
engine.connection.commit()
row

## Get row by id

In [None]:
# Fetch a single row by primary key; the model class selects the table.
engine.get(MyModel, row.id)

## Update row

In [None]:
# NamedTuples are immutable, so `_replace` builds a modified copy that is
# passed to update; `row` itself is not rebound here.
engine.update(row._replace(score=78.9))
engine.connection.commit()

## Delete row

by id

In [None]:
# Insert a throwaway row so there is something to delete.
row2 = engine.insert(MyModel(None, "foo", dt.datetime.now(), 6.5))

# Delete form 1: model class plus the row id.
engine.delete(MyModel, row2.id)
engine.connection.commit()

by instance

In [None]:
# Insert a throwaway row so there is something to delete.
row3 = engine.insert(MyModel(None, "bar", dt.datetime.now(), 9.5))

# Delete form 2: pass the row instance itself.
engine.delete(row3)
engine.connection.commit()

# Foreign Key Relationships
Related models are eagerly loaded automatically when a model field's type is another model.

In [None]:
# Parent side of the relationship example; referenced by Person.team.
class Team(NamedTuple):
    id: int | None
    name: str

# Child side of the relationship example.
class Person(NamedTuple):
    id: int | None
    name: str
    # Typing the field as another model is what makes it a relationship.
    team: Team

engine.ensure_table_created(Team)
engine.ensure_table_created(Person)

# Neither object has an id yet; the Team is only referenced through the Person.
team = Team(None, "Team A")
person = Person(None, "Alice", team)

# recursive insert: inserting the Person also takes care of the nested Team
# (a later cell reads `person.team.id` from the returned row)
person = engine.insert(person)
engine.connection.commit()

# recursive get: `row.team` comes back as a Team instance, not an id
# NOTE: this rebinds `row`, which previously held a MyModel row.
row = engine.get(Person, person.id)

print(row)
row.team.name

## Alternate Models
Create a model that queries a subset or alternate form of the data, for example pulling in a foreign key as an int id instead of the full Model instance.

The table name is the part of the class name that comes before the '_' (for example, `Team_NameOnly` reads from the `Team` table).

In [None]:
# Alternate model for Team: the class name prefix before '_' names the table.
class Team_NameOnly(NamedTuple):
    id: int | None
    name: str

# Read an existing Team row through the alternate model.
engine.get(Team_NameOnly, person.team.id)

In [None]:
# Alternate model for Person that exposes the foreign key as a plain int id
# instead of a full Team instance.
class Person_TeamAsIntId(NamedTuple):
    id: int | None
    name: str
    team: int

# `row` is the Person fetched in the foreign-key cell above.
engine.get(Person_TeamAsIntId, row.id)

# Querying

In [None]:
from micro_namedtuple_sqlite_persister.query import select, gt, and_, eq

def print_30_per_line(ss: Iterable[str]) -> None:
    """Print the strings separated by spaces, wrapping after every 30 items.

    Each item is followed by a single space. A newline is written after every
    30th item, and one more only if the last line is partially filled, so an
    input whose length is a multiple of 30 (including an empty input) does not
    end with a stray blank line.

    Args:
        ss: The strings to print; any iterable, consumed once.
    """
    count = 0
    for count, s in enumerate(ss, 1):
        print(s, end=" ")
        if count % 30 == 0:
            print()
    # Only terminate the last line if the loop above has not just done so.
    if count % 30 != 0:
        print()

# select is the interface for generating queries
# select is the interface for generating queries
# It returns a (model, query) pair, which is passed on to engine.query below.
M, q = select(
    MyModel,
    where=gt(MyModel.score, 99.7),
    limit=20,
    )
rows = engine.query(M, q).fetchall()
print_30_per_line(f"{r.score:5.1f}" for r in rows)

## Using Model instance as a parameter in a query

In [None]:
# A model instance (here the Team attached to `person`) can be used directly
# as the comparison value for a relationship field.
# NOTE: this rebinds `q`, which the first Querying cell also assigned.
P, q = select(
    Person,
    where=eq(Person.team, person.team),
    )
rows = engine.query(P, q).fetchall()
print(rows)

## SQLite3 Cursor
Notice that query returns a real `sqlite3.Cursor`, you can use it to `fetchall`, `fetchone`, `fetchmany`, etc.

The only difference is that the `Cursor.row_factory` is set to return Model instances.

In [None]:
# `q` currently holds the Person query built in the previous cell, so it must
# be paired with `P` (its model); pairing it with `M` (MyModel) would run the
# Person query with the wrong row model.
engine.query(P, q).fetchone()

## Arbitrary Queries
It's possible to use models that are not tables. Just pass a bespoke SQL string to `engine.query`, along with the Model you want returned.

In [None]:
# Result-only model: it is not backed by a table, it just gives a shape to the
# rows returned by the hand-written aggregate query below.
class AverageScoreResults(NamedTuple):
    avg_score: float
    scorecount: int

# Hand-written SQL; the two selected expressions line up with the two fields
# of AverageScoreResults.
sql = 'select avg(score), count(*) from MyModel'

result = engine.query(AverageScoreResults, sql).fetchone()
# fetchone() may return None as far as the type checker knows; narrow it here.
assert result is not None
print(f'The table has {result.scorecount} rows, with an average of {result.avg_score:0.2f}')

# Persisting Custom Types: Adapt/Convert

In [None]:
import pandas as pd
import pickle

from micro_namedtuple_sqlite_persister.adaptconvert import register_adapt_convert

def adapt_df(obj: pd.DataFrame) -> bytes:
    """Serialize a DataFrame to bytes using pickle."""
    blob = pickle.dumps(obj)
    return blob


def convert_df(data: bytes) -> pd.DataFrame:
    """Rebuild a DataFrame from bytes produced by ``adapt_df``.

    NOTE(review): ``pickle.loads`` can execute arbitrary code while loading,
    so this converter should only ever see data from a database you trust.
    """
    frame = pickle.loads(data)
    return frame


# Register the adapter/converter pair for pd.DataFrame fields.
# NOTE(review): overwrite=True presumably replaces an existing registration,
# which keeps this cell re-runnable -- confirm against the library docs.
register_adapt_convert(pd.DataFrame, adapt_df, convert_df, overwrite=True)


# Model with a custom-typed field; relies on the pd.DataFrame adapter/converter
# registered above.
class MyModel2(NamedTuple):
    id: int | None
    name: str
    df: pd.DataFrame

engine.ensure_table_created(MyModel2)

df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
# NOTE: this rebinds `row` to a MyModel2 row; later cells that read `row.id`
# see this value.
row = engine.insert(MyModel2(None, "foo", df))

# Round trip: read the row back and show the stored DataFrame.
engine.get(MyModel2, row.id).df


# Performance scenarios
Every call to insert is a real round trip to the db. The data is ready to be queried immediately; in SQLAlchemy parlance, it is 'flushed'. Committing ends the implicit transaction and ensures that the data is persisted to disk. The data is then available to other connections, e.g. other worker processes.

Because the db and app share a process, the performance is good enough that you can basically ignore the N+1 problem. This also simplifies the implementation of this library: there is no need to track a session, etc. It also simplifies your app, as data is synchronized immediately with the database, which eliminates the need for a stateful cache, a source of many bugs and complexity.

In [None]:
engine.connection.set_trace_callback(None) # disable echo SQL

## Insert Many

In [None]:
# Insert 17000 rows one call at a time, then commit once at the end.
# The loop index is not used, so it is named `_`.
for _ in range(17000):
    engine.insert(MyModel(None, "foo", dt.datetime.now(), random()*100))

engine.connection.commit()

## Update many

In [None]:
# Rewrite the rows with ids 1..6999 one call at a time, then commit once.
# The loop variable is `row_id` rather than `id` so that the builtin `id()`
# is not shadowed for the rest of the notebook session.
for row_id in range(1, 7000):
    engine.update(MyModel(row_id, "drew", dt.datetime.now(), random()*100))

engine.connection.commit()

## Query many

In [None]:
def print_30_per_line(ss: Iterable[str]) -> None:
    """Print the strings separated by spaces, wrapping after every 30 items.

    Each item is followed by a single space. A newline is written after every
    30th item, and one more only if the last line is partially filled, so an
    input whose length is a multiple of 30 (including an empty input) does not
    end with a stray blank line.

    Args:
        ss: The strings to print; any iterable, consumed once.
    """
    count = 0
    for count, s in enumerate(ss, 1):
        print(s, end=" ")
        if count % 30 == 0:
            print()
    # Only terminate the last line if the loop above has not just done so.
    if count % 30 != 0:
        print()


# Build the (model, query) pair first, then run it; equivalent to splatting
# select()'s result straight into engine.query.
high_model, high_query = select(MyModel, where=gt(MyModel.score, 95.7))
rows = engine.query(high_model, high_query).fetchall()
print_30_per_line(f"{r.score:5.1f}" for r in rows)

## Giant Recursive BOM

In [None]:
# Self-referential model: each node may point to up to two child BOM rows.
# The forward reference to BOM inside its own body works because the notebook
# starts with `from __future__ import annotations`.
class BOM(NamedTuple):
    id: int | None
    name: str
    value: float
    child_a: BOM | None  # None marks a missing child (leaf nodes have none)
    child_b: BOM | None

engine.ensure_table_created(BOM, force_recreate=True)

from random import random, choice
node_count = 0
def generate_node_name_node(depth: int) -> str:
    """Return a node name: three random capitals, the zero-padded depth, and the node count.

    The name has the form ``XYZ00012_345``: three letters drawn independently
    from A-Z, ``depth`` padded to five digits, an underscore, and the current
    value of the module-level ``node_count``.
    """
    alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    prefix = "".join(choice(alphabet) for _ in range(3))
    return f"{prefix}{depth:05d}_{node_count}"


# create a giant BOM: a complete binary tree that is `depth` levels deep
def create_bom(depth: int) -> BOM:
    """Build an unsaved, complete binary BOM tree of the given depth.

    Every node above the last level gets two children, so the tree contains
    ``2**depth - 1`` nodes. The module-level ``node_count`` is incremented once
    per node created.

    Args:
        depth: Number of levels in the tree; 1 produces a single leaf node.

    Returns:
        The root BOM node, with all ids set to None (nothing is inserted here).

    Raises:
        ValueError: If ``depth`` is less than 1. Such a value would never reach
            the ``depth == 1`` base case and would recurse without end.
    """
    global node_count

    # Validate before touching node_count so a bad call leaves no trace.
    if depth < 1:
        raise ValueError(f"depth must be >= 1, got {depth}")

    node_count += 1

    if depth == 1:
        child_a = None
        child_b = None
    else:
        child_a = create_bom(depth-1)
        child_b = create_bom(depth-1)

    # Children are built first, so the name embeds the running node count at
    # the time this node is finished.
    return BOM(None, generate_node_name_node(depth), random()*1000 - 500, child_a, child_b)

# Depth 13 gives a complete binary tree of 2**13 - 1 = 8191 nodes.
root = create_bom(13)
print(f"Created a BOM with {node_count} nodes")

In [None]:
# A single insert call on the root; the nested children are handled by the
# engine's recursive insert, followed by one commit.
inserted_root = engine.insert(root)
engine.connection.commit()

print(f"Inserted BOM with id: {inserted_root.id}")

In [None]:
# Load the root back by id; related BOM children are loaded along with it.
recovered_root = engine.get(BOM, inserted_root.id)

def count_nodes(node: BOM | None) -> int:
    """Return the number of nodes in the tree rooted at ``node`` (0 for None)."""
    total = 0
    pending = [node]
    # Explicit stack instead of recursion; the visiting order does not affect
    # the count.
    while pending:
        current = pending.pop()
        if current is None:
            continue
        total += 1
        pending.extend((current.child_a, current.child_b))
    return total

# This count should match the "Created a BOM with ... nodes" figure printed earlier.
print(f"Recovered BOM with {count_nodes(recovered_root)} nodes")

In [None]:
import matplotlib.pyplot as plt
import networkx as nx

def add_nodes_edges(G: nx.Graph, node: BOM | None):
    """Add ``node`` and its whole subtree to ``G``.

    Each BOM node becomes a graph node keyed by its id, with its name stored
    in the ``label`` attribute; an edge links every parent id to each child id.
    Passing None adds nothing.
    """
    if node is None:
        return

    G.add_node(node.id, label=node.name)
    # Handle child_a before child_b, adding the edge before descending.
    for child in (node.child_a, node.child_b):
        if child is None:
            continue
        G.add_edge(node.id, child.id)
        add_nodes_edges(G, child)

# Mirror the recovered BOM tree as an undirected graph.
G = nx.Graph()
add_nodes_edges(G, recovered_root)

# Node positions come from Graphviz via the "twopi" layout program.
# NOTE(review): nx.nx_agraph needs Graphviz/pygraphviz to be installed -- confirm for your environment.
pos = nx.nx_agraph.graphviz_layout(G, prog="twopi", args="")
plt.figure(figsize=(10, 10))
def alpha(x, a=-.8, b=6.5):
    """Map a positive number to a value in (0, 1) with a logistic curve over log(x).

    The result is 0.5 where ``log(x) == b``; with the default negative ``a``
    it decreases as ``x`` grows.
    """
    import math

    exponent = -a * (math.log(x) - b)
    return 1 / (1 + math.exp(exponent))
# Labels come from the "label" node attribute and are only drawn for graphs
# with fewer than 1200 nodes; node opacity is derived from the node count.
node_labels = nx.get_node_attributes(G, "label")
nx.draw(
    G,
    pos,
    node_size=10,
    alpha=alpha(node_count),
    node_color="blue",
    with_labels=node_count<1200,
    labels=node_labels,
)
plt.axis("equal")
plt.show()

In [None]:
# use an alt model to be able to query the BOM table without recursively pulling in children for EVERY row
class BOM_ChildrenAsId(NamedTuple):
    id: int | None
    name: str
    value: float
    child_a: int | None  # child row id instead of a nested BOM instance
    child_b: int | None

# Query through the alternate model defined above. Selecting the full BOM
# model here would defeat its purpose, since BOM-typed fields are loaded
# eagerly for every returned row.
len(engine.query(*select(BOM_ChildrenAsId)).fetchall())

# Error Scenarios

In [None]:
# inserting a row with an id that already exists will raise an error
# NOTE(review): `row` was last rebound in the Adapt/Convert cell to a MyModel2
# row, so its id is being reused here as a MyModel id -- confirm that is intended.
engine.insert(MyModel(row.id, "bar", dt.datetime.now(), 3.14))

In [None]:
# Trying to update a row without specifying an id will raise an error
# (id=None gives update nothing to identify the target row by)
engine.update(MyModel(None, "bar", dt.datetime.now(), 3.14))

In [None]:
# Raises an error if the id does not exist
# (the id below is far larger than any id created in this notebook)
engine.update(MyModel(878787879879, "bar", dt.datetime.now(), 3.14))

In [None]:
# If schema already exists, but is not correct

# First definition: `name` is a plain `str`; the table is created from it.
class MyModelExists(NamedTuple): # type: ignore this is part of the error
    id: int | None
    name: str
engine.ensure_table_created(MyModelExists)

# Same model name redefined with `name: str | None`. The table created above
# no longer matches this definition, which is the error being demonstrated.
class MyModelExists(NamedTuple):
    id: int | None
    name: str | None
engine.ensure_table_created(MyModelExists)

In [None]:
# you have to have id: `int | None` as the first field

# Deliberately invalid model: it has no `id` field at all.
class MyModelMissingId(NamedTuple):
    name: str

# Expected to fail because of the missing id field.
engine.ensure_table_created(MyModelMissingId)

# Meta
Right now this is just debugging internals

In [None]:
from micro_namedtuple_sqlite_persister.model import _meta
# Dump the library's private per-model metadata registry, one entry per line.
for model_key, model_meta in _meta.items():
    print(f"{model_key}: {model_meta}")