Fix cockroach integration and provide travis support for cockroach (#85)

vangheem committed May 22, 2017
1 parent 8d8dfb1 commit debf104
Showing 18 changed files with 366 additions and 202 deletions.
1 change: 1 addition & 0 deletions .coveragerc
@@ -7,3 +7,4 @@ omit =
*/docs/*
*/commands/*
*/one_connection_storage.py
*/cockroach.py
1 change: 1 addition & 0 deletions .travis.yml
@@ -23,6 +23,7 @@ install:
- sleep 15
script:
- bin/py.test -s --cov=guillotina -v --cov-report term-missing guillotina
- USE_COCKROACH=true bin/py.test -s -v guillotina
- bin/code-analysis
after_success:
- pip install coveralls
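How the test suite honors USE_COCKROACH isn't shown in this diff; a hypothetical sketch of the kind of DSN switch involved (the function name and DSNs are assumptions, with port 26257 matching the Makefile below):

import os

# hypothetical: gate the storage DSN on the same env var travis sets
USE_COCKROACH = os.environ.get('USE_COCKROACH', '').lower() in ('true', '1')

def test_dsn():
    if USE_COCKROACH:
        # the insecure single node started by `make run-cockroachdb`
        return 'postgresql://root@localhost:26257?sslmode=disable'
    return 'postgresql://postgres@localhost:5432/guillotina'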
13 changes: 10 additions & 3 deletions CHANGELOG.rst
@@ -1,7 +1,14 @@
1.0.0a29 (unreleased)
---------------------
1.1.0a1 (unreleased)
--------------------

- Include cockroachdb in our CI testing
[vangheem]

- Nothing changed yet.
- Simplify docker testing infrastructure
[vangheem]

- Fix cockroachdb integration
[vangheem]


1.0.0a28 (2017-05-18)
9 changes: 9 additions & 0 deletions Makefile
@@ -20,3 +20,12 @@ run-etcd:
--initial-cluster-token my-etcd-token \
--initial-cluster-state new \
--auto-compaction-retention 1


run-cockroachdb:
docker pull cockroachdb/cockroach:v1.0
docker run -p 127.0.0.1:26257:26257 -p 127.0.0.1:9080:8080 --rm cockroachdb/cockroach:v1.0 start --insecure


create-cockroachdb:
./bin/py _cockroachdb-createdb.py
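
Assuming Docker is available locally, the two targets mirror the travis flow: start the node in one terminal, then create the database and run the suite from another:

make run-cockroachdb
make create-cockroachdb
USE_COCKROACH=true bin/py.test -s -v guillotina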
5 changes: 5 additions & 0 deletions README.rst
@@ -88,6 +88,11 @@ With file watcher...

./bin/ptw guillotina --runner=./bin/py.test


To run tests with CockroachDB:

USE_COCKROACH=true ./bin/py.test guillotina

Default
-------

2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
1.0.0a29.dev0
1.1.0a1.dev0
15 changes: 15 additions & 0 deletions _cockroachdb-createdb.py
@@ -0,0 +1,15 @@
import asyncpg
import asyncio


async def run():
    # Establish a connection to the CockroachDB node as the "root" user
    # (no database selected yet; we are about to create it).
conn = await asyncpg.connect('postgresql://root@localhost:26257?sslmode=disable')
# Execute a statement to create a new table.
await conn.execute('''CREATE DATABASE guillotina''')


if __name__ == '__main__':
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(run())
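To verify the script worked, a quick check over the same connection, sketched in the same asyncpg style (SHOW DATABASES is CockroachDB's listing statement):

import asyncio
import asyncpg

async def check():
    conn = await asyncpg.connect('postgresql://root@localhost:26257?sslmode=disable')
    # 'guillotina' should appear among the node's databases
    print([r[0] for r in await conn.fetch('SHOW DATABASES')])
    await conn.close()

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(check())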
12 changes: 7 additions & 5 deletions guillotina/api/ws.py
@@ -145,8 +145,8 @@ async def handle_ws_request(self, ws, message):
self.request._futures = {}

async def __call__(self):
tm = get_tm()
await tm.abort()
tm = get_tm(self.request)
await tm.abort(self.request)
ws = web.WebSocketResponse()
await ws.prepare(self.request)

@@ -156,14 +156,16 @@ async def __call__(self):
if message['op'] == 'close':
await ws.close()
elif message['op'] == 'GET':
await tm.begin()
txn = await tm.begin(request=self.request)
try:
await self.handle_ws_request(ws, message)
await tm.commit()
except Exception:
await tm.abort()
await ws.close()
raise
finally:
# only currently support GET requests which are *never*
# supposed to be commits
await tm.abort(txn=txn)
else:
await ws.close()
elif msg.tp == aiohttp.WSMsgType.error:
157 changes: 118 additions & 39 deletions guillotina/db/storages/cockroach.py
@@ -1,28 +1,65 @@
from guillotina.db.storages import pg
from guillotina.exceptions import ConflictError
from guillotina.exceptions import TIDConflictError

import asyncpg
import sys

INSERT = """
INSERT INTO objects
(zoid, tid, state_size, part, resource, of, otid, parent_id, id, type, state)
VALUES ($1::varchar(32), $2::int, $3::int, $4::int, $5::boolean, $6::varchar(32), $7::int,
$8::varchar(32), $9::text, $10::text, $11::bytea)
ON CONFLICT (zoid)
DO UPDATE SET
tid = EXCLUDED.tid,
state_size = EXCLUDED.state_size,
part = EXCLUDED.part,
resource = EXCLUDED.resource,
of = EXCLUDED.of,
otid = EXCLUDED.otid,
parent_id = EXCLUDED.parent_id,
id = EXCLUDED.id,
type = EXCLUDED.type,
state = EXCLUDED.state;
"""

# upsert without checking matching tids on updated object
NAIVE_UPSERT = """
INSERT INTO objects
(zoid, tid, state_size, part, resource, of, otid, parent_id, id, type, state)
VALUES ($1::varchar(32), $2::int, $3::int, $4::int, $5::boolean, $6::varchar(32), $7::int,
$8::varchar(32), $9::text, $10::text, $11::bytea)
ON CONFLICT (zoid)
DO UPDATE SET
tid = EXCLUDED.tid,
state_size = EXCLUDED.state_size,
part = EXCLUDED.part,
resource = EXCLUDED.resource,
of = EXCLUDED.of,
otid = EXCLUDED.otid,
parent_id = EXCLUDED.parent_id,
id = EXCLUDED.id,
type = EXCLUDED.type,
state = EXCLUDED.state"""
UPSERT = NAIVE_UPSERT + """
WHERE
tid = EXCLUDED.otid"""


# update, applied only when the stored tid matches the old tid (otid)
UPDATE = """
UPDATE objects
SET
tid = $2::int,
state_size = $3::int,
part = $4::int,
resource = $5::boolean,
of = $6::varchar(32),
otid = $7::int,
parent_id = $8::varchar(32),
id = $9::text,
type = $10::text,
state = $11::bytea
WHERE
zoid = $1::varchar(32)
AND tid = $7::int
RETURNING tid, otid"""


NEXT_TID = """SELECT unique_rowid()"""
MAX_TID = "SELECT COALESCE(MAX(tid), 0) from objects;"
MAX_TID = "SELECT MAX(tid) from objects;"

DELETE_FROM_BLOBS = """DELETE FROM blobs WHERE zoid = $1::varchar(32);"""
UPDATE_REFERENCED_DELETED_OBJECTS = """
UPDATE objects
SET
parent_id = NULL
WHERE
parent_id = $1::varchar(32)
"""


class CockroachStorage(pg.PostgresqlStorage):
@@ -37,18 +37,24 @@ class CockroachStorage(pg.PostgresqlStorage):
- complex delete from query that does the sub queries to delete?
- no sequence support
- use serial construct of unique_rowid() instead
- no referential integrity support!
- because we can't do ON DELETE support of any kind, we would get
errors after we run deletes unless we walk the whole sub tree
first, which is costly
- so we need to manually clean it up in a task that runs periodically,
our own db vacuum task.
'''

_object_schema = pg.PostgresqlStorage._object_schema.copy()
del _object_schema['json'] # no json db support
_object_schema.update({
'of': 'VARCHAR(32) REFERENCES objects',
'parent_id': 'VARCHAR(32) REFERENCES objects', # parent oid
'of': 'VARCHAR(32)',
'parent_id': 'VARCHAR(32)'
})

_blob_schema = pg.PostgresqlStorage._blob_schema.copy()
_blob_schema.update({
'zoid': 'VARCHAR(32) NOT NULL REFERENCES objects',
'zoid': 'VARCHAR(32) NOT NULL',
})

_initialize_statements = [
@@ -62,34 +105,70 @@ class CockroachStorage(pg.PostgresqlStorage):
'CREATE INDEX IF NOT EXISTS blob_chunk ON blobs (chunk_index);'
]

_max_tid = 0

async def initialize_tid_statements(self):
self._stmt_next_tid = await self._read_conn.prepare(NEXT_TID)
self._stmt_max_tid = await self._read_conn.prepare(MAX_TID)
if hasattr(sys, '_db_tests'):
self.get_current_tid = self._test_get_current_tid

async def _test_get_current_tid(self, txn):
return self._max_tid

async def store(self, oid, old_serial, writer, obj, txn):
assert oid is not None

smt = await self._get_prepared_statement(txn, 'insert', INSERT)

p = writer.serialize() # This calls __getstate__ of obj
if len(p) >= self._large_record_size:
self._log.warn("Too long object %s (%d)" % (obj.__class__, len(p)))
part = writer.part
if part is None:
part = 0
# (zoid, tid, state_size, part, main, parent_id, type, json, state)
await smt.fetchval(
oid, # The OID of the object
txn._tid, # Our TID
len(p), # Len of the object
part, # Partition indicator
writer.resource, # Is a resource ?
writer.of, # It belogs to a main
old_serial, # Old serial
writer.parent_id, # Parent OID
writer.id, # Traversal ID
writer.type, # Guillotina type
p # Pickle state
)

update = False
statement_sql = NAIVE_UPSERT
if not obj.__new_marker__ and obj._p_serial is not None:
# we should be confident this is an object update
statement_sql = UPDATE
update = True

if hasattr(sys, '_db_tests') and txn._tid > self._max_tid:
self._max_tid = txn._tid

async with txn._lock:
smt = await txn._db_conn.prepare(statement_sql)
try:
result = await smt.fetch(
oid, # The OID of the object
txn._tid, # Our TID
len(p), # Len of the object
part, # Partition indicator
writer.resource, # Is a resource ?
writer.of, # It belongs to a main
old_serial, # Old serial
writer.parent_id, # Parent OID
writer.id, # Traversal ID
writer.type, # Guillotina type
p # Pickle state
)
except asyncpg.exceptions._base.InterfaceError as ex:
if 'another operation is in progress' in ex.args[0]:
raise ConflictError(
'asyncpg error, another operation in progress.')
raise
if update and len(result) != 1:
# raise tid conflict error
raise TIDConflictError(
'Mismatch of tid of object being updated. This is likely '
'caused by a cache invalidation race condition and should '
'be an edge case. This should resolve on request retry.')
obj._p_estimated_size = len(p)
return txn._tid, len(p)

async def delete(self, txn, oid):
# XXX no cascade support!
# need to move things around and recursively delete here...
async with txn._lock:
await txn._db_conn.execute(UPDATE_REFERENCED_DELETED_OBJECTS, oid)
await txn._db_conn.execute(DELETE_FROM_BLOBS, oid)
await txn._db_conn.execute(pg.DELETE_FROM_OBJECTS, oid)
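
The class docstring above calls for a periodic vacuum task to compensate for the missing ON DELETE cascade, but none ships in this commit. A minimal sketch of what such a task could look like, assuming orphans are plain content rows (parent_id NULL, of NULL) and the root oid is known — all assumptions, not guillotina API:

import asyncio
import asyncpg

# re-orphan rows whose parent row has already been deleted
ORPHAN = """
UPDATE objects SET parent_id = NULL
WHERE parent_id IS NOT NULL
AND parent_id NOT IN (SELECT zoid FROM objects)"""

# drop orphans, sparing the root object and annotation rows
SWEEP = """
DELETE FROM objects
WHERE parent_id IS NULL AND of IS NULL AND zoid != $1"""

async def vacuum(dsn, root_oid, interval=60):
    conn = await asyncpg.connect(dsn)
    while True:
        # each sweep orphans the next level of a deleted subtree,
        # so repeat until a pass deletes nothing
        while True:
            await conn.execute(ORPHAN)
            status = await conn.execute(SWEEP, root_oid)
            if status == 'DELETE 0':
                break
        await asyncio.sleep(interval)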
3 changes: 1 addition & 2 deletions guillotina/db/storages/pg.py
@@ -246,19 +246,18 @@ async def finalize(self):

async def create(self):
# Check DB
log.info('Creating initial database objects')
statements = [
get_table_definition('objects', self._object_schema),
get_table_definition('blobs', self._blob_schema,
primary_keys=('bid', 'zoid', 'chunk_index'))
]
statements.extend(self._initialize_statements)

async with self._pool.acquire() as conn:
for statement in statements:
await conn.execute(statement)

await self.initialize_tid_statements()

# migrate the old transaction table schema over
try:
old_tid = await self._read_conn.fetchval('SELECT max(tid) from transaction')
