Skip to content

Commit

Permalink
DatastoreStorage.read_blocks_by_seq: can't use @ndb_context since it'…
Browse files Browse the repository at this point in the history
…s a generator

...so handle context manually instead
  • Loading branch information
snarfed committed Apr 1, 2024
1 parent 3e8e8f0 commit 9e1a356
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
16 changes: 10 additions & 6 deletions arroba/datastore_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,15 +424,19 @@ def read_many(self, cids):
return {cid: block.to_block() if block else None
for cid, block in got}

@ndb_context
# can't use @ndb_context because this is a generator, not a normal function
def read_blocks_by_seq(self, start=0):
assert start >= 0

# lexrpc event subscription handlers like subscribeRepos call this on a
# different thread, so if we're there, we need to create a new ndb context
for atp_block in AtpBlock.query(AtpBlock.seq >= start)\
.order(AtpBlock.seq):
yield atp_block.to_block()
context = get_context(raise_context_error=False)

with context.use() if context else self.ndb_client.context() as cm:
# lexrpc event subscription handlers like subscribeRepos call this
# on a different thread, so if we're there, we need to create a new
# ndb context
for atp_block in AtpBlock.query(AtpBlock.seq >= start)\
.order(AtpBlock.seq):
yield atp_block.to_block()

@ndb_context
def has(self, cid):
Expand Down
7 changes: 7 additions & 0 deletions arroba/tests/test_datastore_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@ def test_read_blocks_by_seq(self):
[b.cid for b in self.storage.read_blocks_by_seq(start=4)])
self.assertEqual([], [b.cid for b in self.storage.read_blocks_by_seq(start=6)])

def test_read_blocks_by_seq_no_ndb_context(self):
AtpSequence.allocate(SUBSCRIBE_REPOS_NSID)
block = self.storage.write(repo_did='did:plc:123', obj={'foo': 2})

self.ndb_context.__exit__(None, None, None)
self.assertEqual([block], [b.cid for b in self.storage.read_blocks_by_seq()])

def assert_same_seq(self, cids):
"""
Args:
Expand Down

0 comments on commit 9e1a356

Please sign in to comment.