Merge pull request #22 from newtdb/fix-redo
Fix redo
jimfulton committed Apr 18, 2017
2 parents 37c5f2f + 344b259 commit 1c2fe02
Showing 8 changed files with 223 additions and 33 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
@@ -1,9 +1,10 @@
language: python
sudo: false
dist: trusty
services:
- postgresql
addons:
postgresql: "9.4"
postgresql: "9.5"

python:
- pypy-5.4.1
10 changes: 10 additions & 0 deletions CHANGES.rst
@@ -6,6 +6,16 @@ Changes

- Fixed: Newt DB couldn't be added to an existing RelStorage database.

- Removed the updater ``--redo`` option. It was implemented incorrectly.
Implementing it correctly will be necessary at some point, but for
now, we'll punt.

- Added a new ``--compute-missing`` option to compute missing Newt
records after updating an application from plain RelStorage. (This
is similar to the removed ``--redo`` but simpler and narrower in
scope.)


0.5.2 (2017-04-01)
------------------

23 changes: 23 additions & 0 deletions doc/topics/for-zodb-users.rst
@@ -26,3 +26,26 @@ RelStorage ``zodbconvert`` works with Newt DB.
The next version of Newt will provide options for batch computation
of JSON data, which will allow the conversion of existing Postgres
RelStorage databases in place.

Updating an existing PostgreSQL RelStorage ZODB application to use Newt DB
==========================================================================

There are two ways to add Newt DB to an existing PostgreSQL RelStorage
ZODB application.

a. Update your :doc:`database text configuration <text-configuration>`
   to include a ``newt`` tag and optionally a ``newtdb`` tag. After
   all of your database clients have been updated (and restarted),
   new database records will be written to the ``newt`` table.
   You'll need to run the :doc:`newt updater <updater>` with the
   ``--compute-missing`` option to write ``newt`` records for your
   older data:

   .. code-block:: console

      newt-updater --compute-missing CONNECTION_STRING

b. Use the :doc:`Newt DB updater <updater>` to maintain Newt data
   asynchronously. This requires no change to your database setup, but
   requires managing a separate process. Because updates are
   asynchronous, Newt JSON data may be slightly out of date at times.
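
For approach (b), the updater runs as a separate long-lived process pointed
at the same PostgreSQL database. A minimal sketch of starting it, where
``CONNECTION_STRING`` is a placeholder for your connection string as in the
example above:

.. code-block:: console

   newt-updater CONNECTION_STRING
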
21 changes: 21 additions & 0 deletions doc/topics/updater.rst
@@ -109,6 +109,16 @@ To use Newt's asynchronous updater:
This is a backstop to PostgreSQL's notification. The default timeout
is 300 seconds.

-m, --transaction-size-limit
The target transaction batch size. This limits (loosely) the number
of records processed in a batch. Larger batches incur less overhead,
but long-lasting transactions can interfere with other
processing. The default is 100,000 records.

This option only comes into play when a large number of records have
to be processed, typically when first running the updater or using
the ``--compute-missing`` option (see the combined example below).

-T, --remove-delete-trigger
Remove the Newt DB delete trigger, if it exists.

@@ -122,6 +132,17 @@ To use Newt's asynchronous updater:
psycopg2cffi). By default, the appropriate driver will be
selected automatically.

--compute-missing
Compute missing newt records.

Rather than processing new records, process records written up through
the current time and stop. Only missing records are updated. **This
option requires PostgreSQL 9.5 or later.**

This is used to compute newt records after adding Newt DB to an existing
PostgreSQL RelStorage application.
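
For example, to compute missing records for an existing database while
keeping batches small, the two options above can be combined. This is a
sketch only: ``CONNECTION_STRING`` is a placeholder, and ``-m`` is assumed
to take an integer record count, per its description above.

.. code-block:: console

   newt-updater -m 10000 --compute-missing CONNECTION_STRING
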


Garbage collection
==================

1 change: 1 addition & 0 deletions src/newt/db/_adapter.py
@@ -62,6 +62,7 @@ def store_temp(self, cursor, batcher, oid, prev_tid, data):
)

_move_json_sql = """
LOCK TABLE newt IN SHARE MODE;
DELETE FROM newt WHERE zoid IN (SELECT zoid FROM temp_store);
INSERT INTO newt (zoid, class_name, ghost_pickle, state)
11 changes: 6 additions & 5 deletions src/newt/db/follow.py
@@ -7,6 +7,7 @@
logger = logging.getLogger(__name__)

NOTIFY = 'newt_object_state_changed'
PROGRESS_TABLE = 'newt_follow_progress'

def non_empty_generator(gen):
try:
@@ -223,8 +224,8 @@ def _ex_progress(conn, cursor, sql, *args):
except Exception:
# Hm, maybe the table doesn't exist:
conn.rollback()
if not table_exists(cursor, 'newt_follow_progress'):
cursor.execute("create table newt_follow_progress"
if not table_exists(cursor, PROGRESS_TABLE):
cursor.execute("create table " + PROGRESS_TABLE +
" (id text primary key, tid bigint)")

# Try again. Note that if we didn't create the table, this
@@ -258,7 +259,7 @@ def get_progress_tid(connection, id):
with closing(connection.cursor()) as cursor:
_ex_progress(
connection, cursor,
"select tid from newt_follow_progress where id = %s", id)
"select tid from %s where id = %%s" % PROGRESS_TABLE, id)

tid = list(cursor)
if tid:
@@ -289,9 +290,9 @@ def set_progress_tid(connection, id, tid):

with closing(connection.cursor()) as cursor:
_ex_progress(connection, cursor,
"delete from newt_follow_progress where id=%s", id)
"delete from %s where id=%%s" % PROGRESS_TABLE, id)
cursor.execute(
"insert into newt_follow_progress(id, tid) values(%s, %s)",
"insert into %s(id, tid) values(%%s, %%s)" % PROGRESS_TABLE,
(id, tid))

def stop_updates(conn):
97 changes: 97 additions & 0 deletions src/newt/db/tests/testupdater.py
@@ -319,3 +319,100 @@ def make_tid(*args):
self.assertEqual(2, updater.main([self.dsn, '--nagios', '1,99']))
self.assertEqual("Updater is too far behind | 121.100\n",
''.join(writes))


class ComputeMissingTests(base.TestCase):

def test_compute_missing_wo_updater(self):
# When redoing, if we aren't running the updater, we shouldn't
# create the follow table and should use the max(tid) from
# object state as the end tid.
import ZODB.config
from .. import follow, Object, pg_connection, _util, updater

db = ZODB.config.databaseFromString("""\
%%import relstorage
<zodb>
<relstorage>
keep-history false
<postgresql>
dsn %s
</postgresql>
</relstorage>
</zodb>
""" % self.dsn)
with db.transaction() as conn:
conn.root.x = Object(a=1)
db.close()

conn = pg_connection(self.dsn)
cursor = conn.cursor()
self.assertFalse(_util.table_exists(cursor, 'newt'))
self.assertFalse(_util.table_exists(cursor, follow.PROGRESS_TABLE))

# If we try to run redo now, we'll get an error, because the
# newt table doesn't exist:
with self.assertRaises(AssertionError):
updater.main(['--compute-missing', self.dsn])

# create newt table
from .. import connection
connection(self.dsn).close()

# The table is empty:
cursor.execute("select count(*) from newt")
[[c]] = cursor
self.assertEqual(0, c)

# Now run the redo:
updater.main(['--compute-missing', self.dsn])

# We have rows:
cursor.execute("select count(*) from newt")
[[c]] = cursor
self.assertEqual(2, c)

# The progress table still doesn't exist
self.assertFalse(_util.table_exists(cursor, follow.PROGRESS_TABLE))

cursor.close()
conn.close()

def test_catch_up_consistency(self, back=False):
import newt.db, BTrees.OOBTree
conn = newt.db.connection(self.dsn)
conn.root.b = BTrees.OOBTree.BTree()
N = 300
for i in range(*((N-1, -1, -1) if back else (N,))):
conn.root.b[i] = newt.db.Object(i=i, x=0)
conn.commit()
pg = newt.db.pg_connection(self.dsn)
cursor = pg.cursor()
cursor.execute("truncate newt")
pg.commit()

import threading
ev = threading.Event()

def update():
ev.set()
for o in conn.root.b.values():
o.x = 1; conn.commit()

from .. import updater

t = threading.Thread(target=update)
t.setDaemon(True)
t.start()
ev.wait(9)
updater.main(['--compute-missing', self.dsn])
t.join(N/10)
self.assertEqual(N, len([o for o in conn.root.b.values() if o.x == 1]))
pg.rollback()
cursor.execute(
"""select count(*) from newt where state @> '{"x": 1}'""")
[[n]] = cursor
self.assertEqual(N, n)

def test_catch_up_consistency_back(self):
self.test_catch_up_consistency(True)
90 changes: 63 additions & 27 deletions src/newt/db/updater.py
@@ -64,17 +64,16 @@
help="Don't perform garbage collection on startup.")

parser.add_argument(
'--redo', action='store_true',
'--compute-missing', action='store_true',
help="""\
Redo updates
Compute missing newt records.
Rather than processing records written before the current tid (in
object_json_tid), process records writen up through the current tid
and stop.
Rather than processing new records, process records written up through
the current time and stop. Only missing records are updated. This
option requires PostgreSQL 9.5.
This is used to update records after changes to data
transformations. It should be run *after* restarting the regulsr
updater.
This is used to compute newt records after adding Newt DB to an existing
PostgreSQL RelStorage application.
""")

parser.add_argument(
@@ -127,6 +126,32 @@ def _update_newt(conn, cursor, jsonifier, Binary, batch):

conn.commit()

def _compute_missing(conn, cursor, jsonifier, Binary, batch):
ex = cursor.execute
mogrify = cursor.mogrify

tid = None
while True:
data = list(itertools.islice(batch, 0, 100))
if not data:
break
tid = data[-1][0]

# Convert, filtering out null conversions (uninteresting classes)
to_save = []
for tid, zoid, state in data:
class_name, ghost_pickle, state = jsonifier((tid, zoid), state)
if state is not None:
to_save.append((zoid, class_name, Binary(ghost_pickle), state))

if to_save:
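        # "on conflict do nothing" skips zoids that already have newt rows
        # (for example, rows written by the regular updater), so only
        # missing records are inserted; this requires PostgreSQL 9.5+.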
ex("insert into newt (zoid, class_name, ghost_pickle, state)"
" values %s on conflict do nothing" %
', '.join(mogrify('(%s, %s, %s, %s)', d).decode('ascii')
for d in to_save)
)

conn.commit()

logging_levels = 'DEBUG INFO WARNING ERROR CRITICAL'.split()

@@ -184,23 +209,32 @@ def main(args=None):
print("OK | %s" % flag())
return 0

tid = follow.get_progress_tid(conn, __name__)
if tid < 0 and not table_exists(cursor, 'newt'):
from ._adapter import _newt_ddl
cursor.execute(_newt_ddl)
elif trigger_exists(cursor, DELETE_TRIGGER):
if options.remove_delete_trigger:
cursor.execute("drop trigger %s on object_state" %
DELETE_TRIGGER)
else:
logger.error(
"The Newt DB delete trigger exists.\n"
"It is incompatible with the updater.\n"
"Use -T to remove it.")
return 1
compute_missing = options.compute_missing
if (compute_missing and
not table_exists(cursor, follow.PROGRESS_TABLE)
):
if not table_exists(cursor, 'newt'):
raise AssertionError("newt table doesn't exist")
cursor.execute("select max(tid) from object_state")
[[tid]] = cursor
else:
tid = follow.get_progress_tid(conn, __name__)
if tid < 0 and not table_exists(cursor, 'newt'):
from ._adapter import _newt_ddl
cursor.execute(_newt_ddl)
elif trigger_exists(cursor, DELETE_TRIGGER):
if options.remove_delete_trigger:
cursor.execute("drop trigger %s on object_state" %
DELETE_TRIGGER)
else:
logger.error(
"The Newt DB delete trigger exists.\n"
"It is incompatible with the updater.\n"
"Use -T to remove it.")
return 1

if not options.no_gc:
cursor.execute(gc_sql)
if not options.no_gc:
cursor.execute(gc_sql)

conn.commit()

@@ -211,14 +245,16 @@ def main(args=None):
"but garbage collection was suppressed.")
return 0

if options.redo:
if options.compute_missing:
start_tid = -1
end_tid = tid
logger.info("Redoing through", tid)
logger.info("Compute_missing through %s", tid)
process = _compute_missing
else:
logger.info("Starting updater at %s", tid)
start_tid = tid
end_tid = None
process = _update_newt

for batch in follow.updates(
dsn,
@@ -227,7 +263,7 @@
batch_limit=options.transaction_size_limit,
poll_timeout=options.poll_timeout,
):
_update_newt(conn, cursor, jsonifier, Binary, batch)
process(conn, cursor, jsonifier, Binary, batch)

if __name__ == '__main__':
sys.exit(main())
