Skip to content

Commit

Permalink
implement cockroach trns fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
vangheem committed Dec 21, 2017
1 parent 31ea3ac commit 132d0e2
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 45 deletions.
4 changes: 2 additions & 2 deletions Makefile
Expand Up @@ -7,9 +7,9 @@ run-postgres:


run-cockroachdb:
docker pull cockroachdb/cockroach:v1.0
docker pull cockroachdb/cockroach:v1.1.3
docker run -p 127.0.0.1:26257:26257 -p 127.0.0.1:9080:8080 --rm cockroachdb/cockroach:v1.0 start --insecure


create-cockroachdb:
./bin/py _cockroachdb-createdb.py
./bin/python _cockroachdb-createdb.py
2 changes: 1 addition & 1 deletion guillotina/commands/shell.py
Expand Up @@ -157,7 +157,7 @@ def get_loop(self):
asyncio.set_event_loop(self.loop)
return self.loop

def run_command(self, app, settings):
def run(self, arguments, settings, app):
loop = self.get_loop()
loop.setup(app)
try:
Expand Down
51 changes: 17 additions & 34 deletions guillotina/db/storages/cockroach.py
Expand Up @@ -52,31 +52,17 @@


NEXT_TID = """SELECT unique_rowid()"""
MAX_TID = "SELECT COALESCE(MAX(tid), 0) from objects;"
MAX_TID = "SELECT COALESCE(MAX(tid), 0) from objects WHERE tid > $1::int;"

DELETE_FROM_BLOBS = """DELETE FROM blobs WHERE zoid = $1::varchar(32);"""
DELETE_CHILDREN = """DELETE FROM objects where parent_id = $1::varchar(32);"""
DELETE_DANGLING = '''DELETE FROM objects
WHERE
parent_id IS NOT NULL and parent_id NOT IN (
SELECT zoid FROM objects
)
RETURNING 1
'''
DELETE_BY_PARENT_OID = '''DELETE FROM objects
WHERE parent_id = $1::varchar(32);'''
GET_OIDS_BY_PARENT = '''SELECT zoid FROM objects
WHERE parent_id = $1::varchar(32);'''
BATCHED_GET_CHILDREN_OIDS = """SELECT zoid FROM objects
WHERE parent_id = $1::varchar(32)
ORDER BY zoid
LIMIT $2::int
OFFSET $3::int"""

DELETE_FROM_OBJECTS = """
DELETE FROM objects WHERE zoid = $1::varchar(32);
"""


async def iterate_children(conn, parent_oid, page_size=1000):
smt = await conn.prepare(BATCHED_GET_CHILDREN_OIDS)
Expand Down Expand Up @@ -138,13 +124,16 @@ def __init__(self, txn):
async def start(self):
assert self._status in ('none',)
await self._conn.execute(f'''
BEGIN ISOLATION LEVEL {self._storage._isolation_level.upper()},
PRIORITY {self._priority};''')
BEGIN;
SET TRANSACTION ISOLATION LEVEL {self._storage._isolation_level.upper()}, PRIORITY {self._priority};
SAVEPOINT cockroach_restart;''')
self._status = 'started'

async def commit(self):
assert self._status in ('started',)
await self._conn.execute('COMMIT;')
await self._conn.execute(f'''
RELEASE SAVEPOINT cockroach_restart;
COMMIT;''')
self._status = 'committed'

async def rollback(self):
Expand All @@ -163,10 +152,7 @@ class CockroachStorage(pg.PostgresqlStorage):
- no jsonb support
- no CASCADE support(ON DELETE CASCADE)
- used by objects and blobs tables
- right now, deleting will potentially leave dangling rows around
- potential solutions
- utility to recursively delete?
- complex delete from query that does the sub queries to delete?
- utility to recursively delete?
- no sequence support
- use serial construct of unique_rowid() instead
- no referencial integrity support!
Expand Down Expand Up @@ -195,6 +181,7 @@ class CockroachStorage(pg.PostgresqlStorage):
'CREATE INDEX IF NOT EXISTS object_part ON objects (part);',
'CREATE INDEX IF NOT EXISTS object_parent ON objects (parent_id);',
'CREATE INDEX IF NOT EXISTS object_id ON objects (id);',
'CREATE INDEX IF NOT EXISTS object_type ON objects (type);',
'CREATE INDEX IF NOT EXISTS blob_bid ON blobs (bid);',
'CREATE INDEX IF NOT EXISTS blob_zoid ON blobs (zoid);',
'CREATE INDEX IF NOT EXISTS blob_chunk ON blobs (chunk_index);'
Expand All @@ -206,7 +193,7 @@ class CockroachStorage(pg.PostgresqlStorage):
_vacuum_class = CockroachVacuum

def __init__(self, *args, **kwargs):
transaction_strategy = kwargs.get('transaction_strategy', 'novote')
transaction_strategy = kwargs.get('transaction_strategy', 'resolve')
self._isolation_level = kwargs.get('isolation_level', 'snapshot').lower()
if (self._isolation_level == 'serializable' and
transaction_strategy not in ('none', 'tidonly', 'novote', 'lock')):
Expand All @@ -221,14 +208,10 @@ async def initialize_tid_statements(self):
self._stmt_next_tid = await self._read_conn.prepare(NEXT_TID)
self._stmt_max_tid = await self._read_conn.prepare(MAX_TID)

async def open(self):
conn = await super().open()
if self._transaction_strategy in ('none', 'tidonly', 'lock'):
# if a strategy is used that is not a db transaction we can't
# set the isolation level along with the transaction start
await conn.execute(
'SET DEFAULT_TRANSACTION_ISOLATION TO ' + self._isolation_level)
return conn
async def get_current_tid(self, txn):
async with self._lock:
# again, use storage lock here instead of trns lock
return await self._stmt_max_tid.fetchval(txn._tid)

async def initialize(self, loop=None, **kw):
await super().initialize(loop=loop, **kw)
Expand Down Expand Up @@ -287,7 +270,7 @@ async def store(self, oid, old_serial, writer, obj, txn):
async def delete(self, txn, oid):
# no cascade support, so we push to vacuum
async with txn._lock:
await txn._db_conn.execute(DELETE_FROM_OBJECTS, oid)
await txn._db_conn.execute(pg.DELETE_OBJECT, oid)
await txn._db_conn.execute(DELETE_FROM_BLOBS, oid)
txn.add_after_commit_hook(self._txn_oid_commit_hook, [oid])

Expand All @@ -297,7 +280,7 @@ async def commit(self, transaction):
try:
await transaction._db_txn.commit()
except asyncpg.exceptions.SerializationError as ex:
if 'restart transaction' in ex.args[0]:
if ex.sqlstate in ('40001', '40003'):
raise ConflictError(ex.args[0])
elif self._transaction_strategy not in ('none', 'tidonly'):
logger.warning('Do not have db transaction to commit')
Expand All @@ -310,6 +293,6 @@ async def get_one_row(self, smt, *args):
try:
result = await smt.fetch(*args)
except asyncpg.exceptions.SerializationError as ex:
if 'restart transaction' in ex.args[0]:
if ex.sqlstate in ('40001', '40003'):
raise ConflictError(ex.args[0])
return result[0] if len(result) > 0 else None
2 changes: 1 addition & 1 deletion guillotina/db/strategies/simple.py
Expand Up @@ -16,11 +16,11 @@ class SimpleStrategy(BaseStrategy):
'''

async def tpc_begin(self):
await self._storage.start_transaction(self._transaction)
if self.writable_transaction:
tid = await self._storage.get_next_tid(self._transaction)
if tid is not None:
self._transaction._tid = tid
await self._storage.start_transaction(self._transaction)

async def tpc_vote(self):
if not self.writable_transaction:
Expand Down
6 changes: 3 additions & 3 deletions guillotina/db/transaction.py
Expand Up @@ -245,6 +245,9 @@ async def commit(self):
self.status = Status.COMMITTING
try:
await self.real_commit()
# vote will do conflict resolution if there are conflicting writes
await self.tpc_vote()
await self.tpc_finish()
except (ConflictError, TIDConflictError) as ex:
# this exception should bubble up
# in the case of TIDConflictError, we should make sure to try
Expand All @@ -254,9 +257,6 @@ async def commit(self):
await self._cache.close(invalidate=isinstance(ex, TIDConflictError))
self.tpc_cleanup()
raise
# vote will do conflict resolution if there are conflicting writes
await self.tpc_vote()
await self.tpc_finish()
self.status = Status.COMMITTED
await self._call_after_commit_hooks()

Expand Down
2 changes: 1 addition & 1 deletion guillotina/tests/docker_containers/cockroach.py
Expand Up @@ -5,7 +5,7 @@

class CockroachDB(BaseImage):
name = 'cockroach'
image = 'cockroachdb/cockroach:v1.0'
image = 'cockroachdb/cockroach:v1.1.3'
port = 26257

def get_image_options(self):
Expand Down
5 changes: 2 additions & 3 deletions guillotina/tests/test_api.py
Expand Up @@ -114,12 +114,11 @@ async def test_create_delete_contenttype(container_requester):
'/db/guillotina/',
data=json.dumps({
"@type": "Item",
"title": "Item1",
"id": "item1"
"id": "foobar"
})
)
assert status == 201
response, status = await requester('DELETE', '/db/guillotina/item1')
response, status = await requester('DELETE', '/db/guillotina/foobar')
assert status == 200


Expand Down

0 comments on commit 132d0e2

Please sign in to comment.