-
Notifications
You must be signed in to change notification settings - Fork 38
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Initial storage change tracking implementation. #323
- Loading branch information
Showing
8 changed files
with
378 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,203 @@ | ||
from __future__ import annotations | ||
|
||
from typing import Any | ||
from typing import TYPE_CHECKING | ||
|
||
from .._types import Action | ||
from .._types import Change | ||
from ..exceptions import StorageError | ||
from ._sql_utils import parse_schema | ||
from ._sql_utils import Query | ||
from ._sqlite_utils import ddl_transaction | ||
from ._sqlite_utils import wrap_exceptions | ||
|
||
if TYPE_CHECKING: # pragma: no cover | ||
from ._base import StorageBase | ||
|
||
|
||
class Changes: | ||
# FIXME: protocol | ||
|
||
def __init__(self, storage: StorageBase): | ||
self.storage = storage | ||
|
||
@wrap_exceptions(StorageError) | ||
def enable(self) -> None: | ||
# FIXME: already enabled exc | ||
with ddl_transaction(self.storage.get_db()) as db: | ||
for objects in SCHEMA.values(): | ||
for object in objects.values(): | ||
object.create(db) | ||
db.execute("UPDATE entries SET sequence = randomblob(16)") | ||
db.execute( | ||
""" | ||
INSERT INTO changes | ||
SELECT sequence, feed, id, '', 1 FROM entries | ||
""" | ||
) | ||
|
||
@wrap_exceptions(StorageError) | ||
def disable(self) -> None: | ||
# FIXME: already enabled exc | ||
with ddl_transaction(self.storage.get_db()) as db: | ||
for objects in SCHEMA.values(): | ||
for object in objects.values(): | ||
db.execute(f"DROP {object.type} {object.name}") | ||
db.execute("UPDATE entries SET sequence = NULL") | ||
|
||
@wrap_exceptions(StorageError) | ||
def get( | ||
self, action: Action | None = None, limit: int | None = None | ||
) -> list[Change]: | ||
# FIXME: not enabled exc | ||
context = { | ||
'limit': min(limit or self.storage.chunk_size, self.storage.chunk_size) | ||
} | ||
# the ORDER_BY is only used for testing; should this return a set instead? | ||
query = Query().SELECT('*').FROM('changes').ORDER_BY('rowid').LIMIT(':limit') | ||
if action: | ||
query.WHERE('action = :action') | ||
context['action'] = action.value | ||
rows = self.storage.get_db().execute(str(query), context) | ||
return list(map(change_factory, rows)) | ||
|
||
@wrap_exceptions(StorageError) | ||
def done(self, changes: list[Change]) -> None: | ||
# FIXME: not enabled exc | ||
# FIXME: len(changes) <= self.storage.chunk_size | ||
with self.storage.get_db() as db: | ||
for change in changes: | ||
db.execute( | ||
""" | ||
DELETE FROM changes | ||
WHERE (sequence, feed, id, key, action) | ||
= (:sequence, :feed, :id, :key, :action) | ||
""", | ||
change_to_dict(change), | ||
) | ||
|
||
|
||
def change_factory(row: tuple[Any, ...]) -> Change: | ||
sequence, feed, id, key, action = row | ||
return Change( | ||
Action(action), | ||
sequence, | ||
feed or None, | ||
id or None, | ||
key or None, | ||
) | ||
|
||
|
||
def change_to_dict(change: Change) -> dict[str, Any]: | ||
return dict( | ||
sequence=change.sequence, | ||
feed=change.feed_url or '', | ||
id=change.entry_id or '', | ||
key=change.tag_key or '', | ||
action=change.action.value, | ||
) | ||
|
||
|
||
SCHEMA = parse_schema(""" | ||
CREATE TABLE changes ( | ||
sequence BLOB NOT NULL, | ||
feed TEXT NOT NULL, | ||
id TEXT NOT NULL, | ||
key TEXT NOT NULL, | ||
action INTEGER NOT NULL, | ||
PRIMARY KEY (sequence, feed, id, key) | ||
); | ||
CREATE TRIGGER changes_entry_insert | ||
AFTER INSERT | ||
ON entries | ||
BEGIN | ||
-- SELECT print(' entry_insert', new.feed, new.id); | ||
UPDATE entries | ||
SET sequence = randomblob(16) | ||
WHERE (new.id, new.feed) = (id, feed); | ||
INSERT OR REPLACE INTO changes | ||
SELECT sequence, feed, id, '', 1 | ||
FROM entries | ||
WHERE (feed, id) = (new.feed, new.id); | ||
END; | ||
-- Can't handle feed URL changes in changes_entry_update because | ||
-- those entry updates are a consequence of ON UPDATE CASCADE, | ||
-- which overrides the INSERT OR REPLACE used in the trigger, | ||
-- because "conflict handling policy of the outer statement" | ||
-- takes precedence per https://sqlite.org/lang_createtrigger.html. | ||
-- Instead, we handle feed URL changes in changes_feed_changed. | ||
CREATE TRIGGER changes_entry_update | ||
AFTER UPDATE | ||
OF title, summary, content | ||
ON entries | ||
WHEN | ||
new.id = old.id AND new.feed = old.feed AND ( | ||
coalesce(new.title, '') != coalesce(old.title, '') | ||
OR coalesce(new.summary, '') != coalesce(old.summary, '') | ||
OR coalesce(new.content, '') != coalesce(old.content, '') | ||
) | ||
BEGIN | ||
-- SELECT print(' entry_update', old.feed, old.id, '->', new.feed, new.id); | ||
INSERT OR REPLACE INTO changes | ||
VALUES (old.sequence, old.feed, old.id, '', 2); | ||
UPDATE entries | ||
SET sequence = randomblob(16) | ||
WHERE (new.id, new.feed) = (id, feed); | ||
INSERT OR REPLACE INTO changes | ||
SELECT sequence, feed, id, '', 1 | ||
FROM entries | ||
WHERE (feed, id) = (new.feed, new.id); | ||
END; | ||
CREATE TRIGGER changes_entry_delete | ||
AFTER DELETE | ||
ON entries | ||
BEGIN | ||
-- SELECT print(' entry_delete', old.feed, old.id); | ||
INSERT OR REPLACE INTO changes | ||
VALUES (old.sequence, old.feed, old.id, '', 2); | ||
END; | ||
CREATE TRIGGER changes_feed_changed | ||
AFTER UPDATE | ||
OF url, title, user_title | ||
ON feeds | ||
WHEN | ||
new.url != old.url | ||
OR coalesce(new.title, '') != coalesce(old.title, '') | ||
OR coalesce(new.user_title, '') != coalesce(old.user_title, '') | ||
BEGIN | ||
-- SELECT print(' feed_url_change', old.url, '->', new.url); | ||
INSERT OR REPLACE INTO changes | ||
SELECT sequence, old.url, id, '', 2 | ||
FROM entries | ||
WHERE feed = new.url; | ||
UPDATE entries | ||
SET sequence = randomblob(16) | ||
WHERE feed = new.url; | ||
INSERT OR REPLACE INTO changes | ||
SELECT sequence, feed, id, '', 1 | ||
FROM entries | ||
WHERE feed = new.url; | ||
END; | ||
""") # fmt: skip |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.