In [1]:
try:
    import uuid
    from pydal import DAL, Field
    from psycopg2.errors import DuplicateTable
    from attrs import define, field, asdict
    from IPython.display import display, HTML
    import dill
    from tabulate import tabulate
    from attrs import define, field
    from typing import Any, Iterable
    import threading
    import functools
except ImportError as e:
    print(e)
    !pip install --upgrade pip
    !pip install --upgrade tabulate  > /dev/null
    !pip install --upgrade dill > /dev/null
    !pip install --upgrade psycopg2-binary > /dev/null
    !pip install --upgrade pydal > /dev/null

In [2]:
def show(title=None, rows=None, tablefmt='github'):
    if rows is None:
        rows, title = title, rows
    rowlist = rows.as_list()
    table = tabulate(rowlist, headers='keys', tablefmt=tablefmt)
    if title:
        width = len(table.split('\n')[0])
        dashes = width - len(title) - 4
        c1 = dashes // 2
        c2 = dashes - c1
        print(f'{"-"*c1}[ {title} ]{"-"*c2}')
    print(table)
    print()

In [3]:

db = DAL('postgres://username:password@localhost:5433/db')
db.define_table('item',
    Field('gid','string'),
    Field('value','text')
)

<Table item (id, gid, value)>

In [4]:
db.define_table('cache',
    Field('gid','string'),
    Field('value','blob'),
    redefine=True
)
try:
    db.rollback()
    db.executesql('''
    alter table cache
        alter column gid set not null;

    create unique index cache_gid_uindex
        on cache (gid);
    ''')
    db.commit()
except DuplicateTable as e:
    db.rollback()

db.define_table('deps',
    Field('cache_id','reference cache',),
    Field('depends_on','reference cache'),
    redefine=True
)
def cache_ids_which_this_id_depends_on(cache_id):
    return db.executesql('''
    with recursive composite as (
     select id, cache_id, depends_on from deps where cache_id = %s
    UNION
     select deps.id, deps.cache_id, deps.depends_on from deps inner join composite on  deps.cache_id = composite.depends_on
    )
    select cache_id from composite
    union
    select depends_on from composite
    ''', fields=[db.deps.cache_id], placeholders=(cache_id,))

In [5]:

db.executesql('''
create or replace function derivatives(param_cache_id integer) returns TABLE(cache_id integer) as $$
    with recursive composite as (
      select deps.id, deps.cache_id, deps.depends_on
        from deps
       where deps.depends_on = param_cache_id
      UNION
      select deps.id, deps.cache_id, deps.depends_on
        from deps
              inner join composite on deps.depends_on = composite.cache_id
    )
    select composite.depends_on
    from composite
    union
    select composite.cache_id
    from composite;
$$ language  sql
''')
db.commit()
def cache_ids_that_depend_on(cache_id):
    return db.executesql('''
    select cache_id from derivatives(%s)
    ''', fields=[db.deps.cache_id], placeholders=(cache_id,))
db.commit()

In [6]:
db.rollback()
db(db.cache).delete()
db(db.deps).delete()
a = db.cache.insert(gid='a',value='aa')
b = db.cache.insert(gid='b',value='aabb')
db.deps.insert(cache_id=b,depends_on=a)
c = db.cache.insert(gid='c',value='aaabbbccc')
db.deps.insert(cache_id=c,depends_on=b)
d = db.cache.insert(gid='d',value='bbdd')
db.deps.insert(cache_id=d,depends_on=b)
db.commit()

In [7]:

show('cache',db(db.cache).select())
show('deps',db(db.deps).select())

---------[ cache ]----------
|   id | gid   | value     |
|------|-------|-----------|
|    1 | a     | aa        |
|    2 | b     | aabb      |
|    3 | c     | aaabbbccc |
|    4 | d     | bbdd      |

--------------[ deps ]--------------
|   id |   cache_id |   depends_on |
|------|------------|--------------|
|    1 |          2 |            1 |
|    2 |          3 |            2 |
|    3 |          4 |            2 |



In [8]:
db.rollback()
for row in db(db.cache).select():
    show(f'{row.gid}.requires',
         db(db.cache.id.belongs([r.cache_id for r in cache_ids_which_this_id_depends_on(row.id)])).select())

[ a.requires ]


------[ b.requires ]------
|   id | gid   | value   |
|------|-------|---------|
|    1 | a     | aa      |
|    2 | b     | aabb    |

-------[ c.requires ]-------
|   id | gid   | value     |
|------|-------|-----------|
|    1 | a     | aa        |
|    2 | b     | aabb      |
|    3 | c     | aaabbbccc |

------[ d.requires ]------
|   id | gid   | value   |
|------|-------|---------|
|    1 | a     | aa      |
|    2 | b     | aabb    |
|    4 | d     | bbdd    |



In [9]:
db.rollback()
for row in db(db.cache).select():
    show(f'{row.gid}.required_for',
         db(db.cache.id.belongs([r.cache_id for r in cache_ids_that_depend_on(row.id)])).select())

-----[ a.required_for ]-----
|   id | gid   | value     |
|------|-------|-----------|
|    1 | a     | aa        |
|    2 | b     | aabb      |
|    3 | c     | aaabbbccc |
|    4 | d     | bbdd      |

-----[ b.required_for ]-----
|   id | gid   | value     |
|------|-------|-----------|
|    2 | b     | aabb      |
|    3 | c     | aaabbbccc |
|    4 | d     | bbdd      |

[ c.required_for ]


[ d.required_for ]




In [23]:

@define
class CachedResult:
    value:Any
    required_gids: Iterable[str]

class KeyFormattingError(KeyError):pass

def cached(db:DAL, key:str, todb:callable=lambda x:x, fromdb:callable=lambda x:x):
    key_template = key
    def cache_wrapper(func):
        # keep a thread local object to keep track of the
        # gids used with this call
        tracked_deps = threading.local()
        tracked_deps.gids = []

        # Create a new cached function
        @functools.wraps(func)
        def cached_func(*args, **kwargs):
            # calcule the key based on key_template and values
            if callable(key_template):
                key = key_template(*args, **kwargs)
            else:
                try:
                    key = key_template.format(*args, **kwargs)
                except:
                    raise KeyFormattingError(f'Formatting failed for cache key "{key_template}" using {args!r} and {kwargs!r} for fn {func!r}')
            # If entry is cached, return from cache
            cached = db.cache(gid=key)
            if cached:
                print(f'Cache hit "{key}":{cached.value!r}')
                return CachedResult(fromdb(cached.value), (key, ))

            # on every new invocation, clear the cache for this the tracker
            # to this cached_function.
            # todo: make it reintrent proof voor recursive elements,
            # thought that's not used too soon probably.
            tracked_deps.gids.clear()
            # If not cached, execute , add to cache and return value
            # execute
            print('Execute',key)
            returned = func(*args, **kwargs)
            # book requirements
            if isinstance(returned, CachedResult):
                # assume there is a list of gids to keep an eye on
                tracked_deps.gids += returned.required_gids
            else:
                # assume no list of items was specicically returned, just use the tracked once,
                # which are alread in tracked_deps.gids
                # raise ValueError(f'Cached function {func} did not return a CachedResult, but {returned!r} instead.')
                returned = CachedResult(returned, []+tracked_deps.gids)
            # insert to cache
            print(f'Save to stash "{key}", "{returned.value!r}" requires {returned.required_gids!r}')
            cache_id = db.cache.insert(gid=key, value=todb(returned.value))
            if tracked_deps.gids:
                # door de in selector wordt deduplicatie toegepast
                # maar worden ook elementen niet meegenomen die niet voorkomen in de cache tabel.
                # TODO: fixen dat gids die niet voorkomen in de cache tabel via een fake record in de cache komen.
                # dan zijn ze in ieder geval te gebruiken als selectie van dependencies.
                sub_select = db(db.cache.gid.belongs(tracked_deps.gids))._select(cache_id, db.cache.id)
                db.executesql('''
                    insert into deps (cache_id, depends_on)
                ''' + sub_select)
            # after having registered all the dependencies, return with the current key as the new dependency
            return CachedResult(returned.value, [key])
        def track(cachedresult:CachedResult):
            '''Returns cachedresult.value but keeps track of the required_gids.'''
            tracked_deps.gids += cachedresult.required_gids
            return cachedresult.value
        cached_func.track = track
        # Return our new cached function
        return cached_func
    return cache_wrapper


In [24]:
@cached(db, key='times2-{0}')
def times2value(gid):
    value = db.cache(gid=gid).value
    result = value * 2
    return CachedResult(result, (gid,))

assert times2value('a').value == 'aaaa'
assert times2value('d').value == 'bbddbbdd'

Execute times2-a


AttributeError: 'NoneType' object has no attribute 'value'

In [None]:
show('cache',db(db.cache).select())
show('deps',db(db.deps).select())

In [25]:
db.commit()

Onderstaande werkte wel in single-threaded modus,
maar met verschillende threads loopt alles in de soep blijkbaar.
https://realpython.com/intro-to-python-threading/
https://github.com/gonzalo123/pglistener/blob/main/pg_listener/pg_listener.py


In [26]:

# import select
# import threading
# from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
# messagebus = []
# def notified(notification):
#     messagebus.append(notification)
#     print(f'received on {notification.channel:>20}: {notification.payload!r}')
#
# def listen(db_uri, channel):
#     print('listen started. ')
#     db = DAL(db_uri)
#     conn = db._adapter.connection
#     conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
#     db.executesql(f'listen {channel};')
#     # db.executesql('select pg_listening_channels();')
#     while True:
#         print('selecting')
#         if select.select([conn], [], [], 5)[0]:
#             db.commit()
#             print('polling', messagebus )
#             conn.poll()
#             while conn.notifies:
#                 notify = conn.notifies.pop()
#                 print(threading.Thread(target=notified, args=(notify,)))
#
# print(threading.active_count())
# threading.Thread(target=listen, args=(db._uri, 'bla'))
# import time
# time.sleep(0.600)
# print(threading.active_count())
# listen(db._uri, 'bla')

In [27]:
# print(db.executesql("select pg_notify('bla','test');"))
# db.commit()
# print(messagebus)

In [28]:
import uuid
db.define_table('item',
    Field('gid','string',default=uuid.uuid4),
    Field('title','string'),
    Field('body','string'),
    redefine=True
)

<Table item (id, gid, title, body)>

In [29]:
db.executesql('listen cacheinvalidation')
db.rollback()
db.executesql('''
CREATE OR REPLACE FUNCTION item_update() RETURNS trigger AS $item_update$
    declare
        cache_id int;
    BEGIN
        select id into cache_id from cache where cache.gid = NEW.gid;
        delete from cache using derivatives(cache_id) dep where cache.id = dep.cache_id;
        -- perform pg_notify('cacheinvalidation', NEW.gid::text);
        RETURN NEW;
    END;
$item_update$ LANGUAGE plpgsql;
''')
db.executesql('''
create or replace TRIGGER item_update_trigger AFTER UPDATE ON item
    FOR EACH ROW EXECUTE FUNCTION item_update();
''')
db.commit()

In [30]:
db(db.item).delete()
db(db.cache).delete()
db(db.deps).delete()
for x in range(5):
    db.item.insert(gid=f'g{x}', title=f'title {x}', body=' '.join([str(_) for _ in [x,x,x,x,x]]))
db.commit()

In [31]:
show('item',db(db.item).select())
show('cache',db(db.cache).select())
show('deps',db(db.deps).select())

---------------[ item ]---------------
|   id | gid   | title   | body      |
|------|-------|---------|-----------|
|    6 | g0    | title 0 | 0 0 0 0 0 |
|    7 | g1    | title 1 | 1 1 1 1 1 |
|    8 | g2    | title 2 | 2 2 2 2 2 |
|    9 | g3    | title 3 | 3 3 3 3 3 |
|   10 | g4    | title 4 | 4 4 4 4 4 |

[ cache ]


[ deps ]




In [34]:

class DillableAttrsClass:
    @staticmethod
    def todb(item):
        return dill.dumps(asdict(item))

    @classmethod
    def fromdb(cls, dump):
        return cls(**dill.loads(dump))


@define
class Item(DillableAttrsClass):
    id:int
    gid:str
    title:str
    body:str

@cached(db,
        key='{}',
        todb=Item.todb,
        fromdb=Item.fromdb
        )
def item(gid):
    return CachedResult(Item(**db.item(gid=gid).as_dict()), (gid,))


@cached(db,
        key=lambda gid:f'teaser-{gid}',
        # todb=lambda teaser:teaser,
        # fromdb=lambda teaser:teaser
)
def item_teaser(gid):
    record = item_teaser.track(item(gid))
    short = record.body[:5]+'...'
    result = f'{record.title}: {short}'
    return result

print(item_teaser('g0'))
print(item_teaser('g2'))

@cached(db, key=lambda gid:f'tile-{gid}')
def item_tile(gid):
    record=db.item(gid=gid)
    teaser=item_tile.track(item_teaser(gid=gid))
    return f'''
    [ {record.id} ]
    [ {record.title} ]
    [ {record.body} ]
    '''
print(item_tile('g0'))
print(item_tile('g3'))
print(item_tile('g4'))

# @cached(db, key='page-{}-{}')
# def page(start, stop):
#     required_gids =[]
#     for x in range(start, stop):
#         cached_result = item_tile(f'g{x}')
#         required_gids.extend(cached_result.required_gids)
#     return CachedResult(f'Page for {start} to {stop}', required_gids)
@cached(db, key='page-{}-{}')
def page(start, stop):
    for x in range(start, stop):
        tile = page.track(item_tile(f'g{x}'))
    return f'Page for {start} to {stop}'
print(page(0,2).value)
print(page(1,3).value)
print(page(0,4).value)



Cache hit "teaser-g0":'testing2 27: 0 0 0...'
CachedResult(value='testing2 27: 0 0 0...', required_gids=('teaser-g0',))
Cache hit "teaser-g2":'title 2: 2 2 2...'
CachedResult(value='title 2: 2 2 2...', required_gids=('teaser-g2',))
Cache hit "tile-g0":'\n    [ 6 ]\n    [ testing2 27 ]\n    [ 0 0 0 0 0 ]\n    '
CachedResult(value='\n    [ 6 ]\n    [ testing2 27 ]\n    [ 0 0 0 0 0 ]\n    ', required_gids=('tile-g0',))
Cache hit "tile-g3":'\n    [ 9 ]\n    [ title 3 ]\n    [ 3 3 3 3 3 ]\n    '
CachedResult(value='\n    [ 9 ]\n    [ title 3 ]\n    [ 3 3 3 3 3 ]\n    ', required_gids=('tile-g3',))
Cache hit "tile-g4":'\n    [ 10 ]\n    [ title 4 ]\n    [ 4 4 4 4 4 ]\n    '
CachedResult(value='\n    [ 10 ]\n    [ title 4 ]\n    [ 4 4 4 4 4 ]\n    ', required_gids=('tile-g4',))
Cache hit "page-0-2":'Page for 0 to 2'
Page for 0 to 2
Cache hit "page-1-3":'Page for 1 to 3'
Page for 1 to 3
Cache hit "page-0-4":'Page for 0 to 4'
Page for 0 to 4


In [35]:
print('should be stable')
page(0,2)
page(1,3)
page(0,4)
print()
print('updating')
import random
db.item(gid='g0').update_record(title=f'testing2 {random.randint(1,100)}')
db.commit()
print()
print('should recalculate')
page(0,2) # Tile-G0 (g0) ,Tile-G1
page(1,3) # g1, g2,
page(0,4) # tile-g0, g1, g2, g3
print()
print('should be stable')
page(0,2)
page(1,3)
page(0,4)


should be stable
Cache hit "page-0-2":'Page for 0 to 2'
Cache hit "page-1-3":'Page for 1 to 3'
Cache hit "page-0-4":'Page for 0 to 4'

updating

should recalculate
Execute page-0-2
Execute tile-g0
Execute teaser-g0
Execute g0
Save to stash "g0", "Item(id=6, gid='g0', title='testing2 50', body='0 0 0 0 0')" requires ('g0',)
Save to stash "teaser-g0", "'testing2 50: 0 0 0...'" requires ['g0']
Save to stash "tile-g0", "'\n    [ 6 ]\n    [ testing2 50 ]\n    [ 0 0 0 0 0 ]\n    '" requires ['teaser-g0']
Cache hit "tile-g1":'\n    [ 7 ]\n    [ title 1 ]\n    [ 1 1 1 1 1 ]\n    '
Save to stash "page-0-2", "'Page for 0 to 2'" requires ['tile-g0', 'tile-g1']
Cache hit "page-1-3":'Page for 1 to 3'
Execute page-0-4
Cache hit "tile-g0":'\n    [ 6 ]\n    [ testing2 50 ]\n    [ 0 0 0 0 0 ]\n    '
Cache hit "tile-g1":'\n    [ 7 ]\n    [ title 1 ]\n    [ 1 1 1 1 1 ]\n    '
Cache hit "tile-g2":'\n    [ 8 ]\n    [ title 2 ]\n    [ 2 2 2 2 2 ]\n    '
Cache hit "tile-g3":'\n    [ 9 ]\n    [ title 3 ]\n   

CachedResult(value='Page for 0 to 4', required_gids=('page-0-4',))

In [39]:
show('item',db(db.item).select())
show('cache',db(db.cache).select())
show('deps',db(db.deps).select())
print(tabulate(db.executesql('select cache.gid, dep.gid from deps inner join cache on cache.id = deps.cache_id inner join cache dep on dep.id = deps.depends_on'), headers=['cache_id','depends_on']))

-----------------[ item ]-----------------
|   id | gid   | title       | body      |
|------|-------|-------------|-----------|
|    7 | g1    | title 1     | 1 1 1 1 1 |
|    8 | g2    | title 2     | 2 2 2 2 2 |
|    9 | g3    | title 3     | 3 3 3 3 3 |
|   10 | g4    | title 4     | 4 4 4 4 4 |
|    6 | g0    | testing2 50 | 0 0 0 0 0 |

---------------[ cache ]----------------
|   id | gid       | value             |
|------|-----------|-------------------|
|   37 | g2        |                   |
|   38 | teaser-g2 | title 2: 2 2 2... |
|   40 | g3        |                   |
|   41 | teaser-g3 | title 3: 3 3 3... |
|   42 | tile-g3   | [ 9 ]
    [ title 3 ]
    [ 3 3 3 3 3 ]                   |
|   43 | g4        |                   |
|   44 | teaser-g4 | title 4: 4 4 4... |
|   45 | tile-g4   | [ 10 ]
    [ title 4 ]
    [ 4 4 4 4 4 ]                   |
|   46 | g1        |                   |
|   47 | teaser-g1 | title 1: 1 1 1... |
|   48 | tile-g1   | [ 7 ]
    [ title 1 

In [22]:
db.item(gid='g0').update_record(title='prut G011')
print(db._lastsql)
db.commit()

('UPDATE "item" SET "title"=\'prut G011\' WHERE ("item"."id" = 173);', 0.001567840576171875)


In [37]:
show('item',db(db.item).select())
show('cache',db(db.cache).select())
show('deps',db(db.deps).select())

-----------------[ item ]-----------------
|   id | gid   | title       | body      |
|------|-------|-------------|-----------|
|    7 | g1    | title 1     | 1 1 1 1 1 |
|    8 | g2    | title 2     | 2 2 2 2 2 |
|    9 | g3    | title 3     | 3 3 3 3 3 |
|   10 | g4    | title 4     | 4 4 4 4 4 |
|    6 | g0    | testing2 50 | 0 0 0 0 0 |

-----------------[ cache ]------------------
|   id | gid       | value                 |
|------|-----------|-----------------------|
|   37 | g2        |                       |
|   38 | teaser-g2 | title 2: 2 2 2...     |
|   40 | g3        |                       |
|   41 | teaser-g3 | title 3: 3 3 3...     |
|   42 | tile-g3   | [ 9 ]
    [ title 3 ]
    [ 3 3 3 3 3 ]                       |
|   43 | g4        |                       |
|   44 | teaser-g4 | title 4: 4 4 4...     |
|   45 | tile-g4   | [ 10 ]
    [ title 4 ]
    [ 4 4 4 4 4 ]                       |
|   46 | g1        |                       |
|   47 | teaser-g1 | title 1: 1 1 

In [38]:
db.rollback()
db.executesql('select * from derivatives(%s)',placeholders=(db.cache(gid='g0').id,))

AttributeError: 'NoneType' object has no attribute 'id'

In [None]:
db.commit()

In [None]:
item('g0').value
db.item(gid='g0').update_record(title='prut G011')
print(db._lastsql)
item('g0').value
item('g0').value
