Skip to content

Commit

Permalink
fix(rln-relay): missed roots during sync (#2015)
Browse files Browse the repository at this point in the history
  • Loading branch information
rymnc committed Sep 8, 2023
1 parent 5d976df commit 21604e6
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 43 deletions.
18 changes: 18 additions & 0 deletions tests/waku_rln_relay/test_rln_group_manager_onchain.nim
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ suite "Onchain group manager":
manager.initialized
manager.rlnContractDeployedBlockNumber > 0

await manager.stop()

asyncTest "should error on initialization when loaded metadata does not match":
let manager = await setup()
await manager.init()
Expand Down Expand Up @@ -220,12 +222,14 @@ suite "Onchain group manager":

await manager.init()
await manager.startGroupSync()
await manager.stop()

asyncTest "startGroupSync: should guard against uninitialized state":
let manager = await setup()

expect(ValueError):
await manager.startGroupSync()
await manager.stop()

asyncTest "startGroupSync: should sync to the state of the group":
let manager = await setup()
Expand Down Expand Up @@ -262,6 +266,7 @@ suite "Onchain group manager":

check:
merkleRootBefore != merkleRootAfter
await manager.stop()

asyncTest "startGroupSync: should fetch history correctly":
let manager = await setup()
Expand Down Expand Up @@ -303,13 +308,15 @@ suite "Onchain group manager":
check:
merkleRootBefore != merkleRootAfter
manager.validRootBuffer.len() == credentialCount - AcceptableRootWindowSize
await manager.stop()

asyncTest "register: should guard against uninitialized state":
let manager = await setup()
let dummyCommitment = default(IDCommitment)

expect(ValueError):
await manager.register(dummyCommitment)
await manager.stop()

asyncTest "register: should register successfully":
let manager = await setup()
Expand All @@ -329,6 +336,7 @@ suite "Onchain group manager":
check:
merkleRootAfter.inHex() != merkleRootBefore.inHex()
manager.latestIndex == 1
await manager.stop()

asyncTest "register: callback is called":
let manager = await setup()
Expand All @@ -354,13 +362,15 @@ suite "Onchain group manager":

check:
manager.rlnInstance.getMetadata().get().validRoots == manager.validRoots.toSeq()
await manager.stop()

asyncTest "withdraw: should guard against uninitialized state":
let manager = await setup()
let idSecretHash = generateCredentials(manager.rlnInstance).idSecretHash

expect(ValueError):
await manager.withdraw(idSecretHash)
await manager.stop()

asyncTest "validateRoot: should validate good root":
let manager = await setup()
Expand Down Expand Up @@ -402,6 +412,7 @@ suite "Onchain group manager":

check:
validated
await manager.stop()

asyncTest "validateRoot: should reject bad root":
let manager = await setup()
Expand Down Expand Up @@ -432,6 +443,7 @@ suite "Onchain group manager":

check:
validated == false
await manager.stop()

asyncTest "verifyProof: should verify valid proof":
let manager = await setup()
Expand Down Expand Up @@ -474,6 +486,7 @@ suite "Onchain group manager":

check:
verifiedRes.get()
await manager.stop()

asyncTest "verifyProof: should reject invalid proof":
let manager = await setup()
Expand Down Expand Up @@ -510,6 +523,7 @@ suite "Onchain group manager":

check:
verifiedRes.get() == false
await manager.stop()

asyncTest "backfillRootQueue: should backfill roots in event of chain reorg":
let manager = await setup()
Expand Down Expand Up @@ -554,6 +568,7 @@ suite "Onchain group manager":
manager.validRoots.len() == credentialCount - 1
manager.validRootBuffer.len() == 0
manager.validRoots[credentialCount - 2] == expectedLastRoot
await manager.stop()

asyncTest "isReady should return false if ethRpc is none":
var manager = await setup()
Expand All @@ -563,13 +578,15 @@ suite "Onchain group manager":

check:
(await manager.isReady()) == false
await manager.stop()

asyncTest "isReady should return false if lastSeenBlockHead > lastProcessed":
var manager = await setup()
await manager.init()

check:
(await manager.isReady()) == false
await manager.stop()

asyncTest "isReady should return true if ethRpc is ready":
var manager = await setup()
Expand All @@ -579,6 +596,7 @@ suite "Onchain group manager":

check:
(await manager.isReady()) == true
await manager.stop()


################################
Expand Down
64 changes: 21 additions & 43 deletions waku/waku_rln_relay/group_manager/on_chain/group_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -232,30 +232,20 @@ proc insert(blockTable: var BlockTable, blockNumber: BlockNumber, member: Member

proc getRawEvents(g: OnchainGroupManager,
fromBlock: BlockNumber,
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[JsonNode] {.async.} =
toBlock: BlockNumber): Future[JsonNode] {.async.} =
initializedGuard(g)

let ethRpc = g.ethRpc.get()
let rlnContract = g.rlnContract.get()

var normalizedToBlock: BlockNumber
if toBlock.isSome():
var value = toBlock.get()
if value == 0:
# set to latest block
value = cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
normalizedToBlock = value
else:
normalizedToBlock = fromBlock

let events = await rlnContract.getJsonLogs(MemberRegistered,
fromBlock = some(fromBlock.blockId()),
toBlock = some(normalizedToBlock.blockId()))
toBlock = some(toBlock.blockId()))
return events

proc getBlockTable(g: OnchainGroupManager,
fromBlock: BlockNumber,
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[BlockTable] {.async.} =
fromBlock: BlockNumber,
toBlock: BlockNumber): Future[BlockTable] {.async.} =
initializedGuard(g)

var blockTable = default(BlockTable)
Expand Down Expand Up @@ -311,23 +301,14 @@ proc handleRemovedEvents(g: OnchainGroupManager, blockTable: BlockTable): Future

proc getAndHandleEvents(g: OnchainGroupManager,
fromBlock: BlockNumber,
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} =
toBlock: BlockNumber): Future[void] {.async.} =
initializedGuard(g)
proc getLatestBlockNumber(): BlockNumber =
if toBlock.isSome():
# if toBlock = 0, that implies the latest block
# which is the case when we are syncing block-by-block
# therefore, toBlock = fromBlock + 1
# if toBlock != 0, then we are chunking blocks
# therefore, toBlock = fromBlock + blockChunkSize (which is handled)
return max(fromBlock + 1, toBlock.get())
return fromBlock

let blockTable = await g.getBlockTable(fromBlock, toBlock)
await g.handleEvents(blockTable)
await g.handleRemovedEvents(blockTable)

g.latestProcessedBlock = getLatestBlockNumber()
g.latestProcessedBlock = toBlock
let metadataSetRes = g.setMetadata()
if metadataSetRes.isErr():
# this is not a fatal error, hence we don't raise an exception
Expand All @@ -337,11 +318,13 @@ proc getAndHandleEvents(g: OnchainGroupManager,

proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler =
proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} =
let latestBlock = blockheader.number.uint
let latestBlock = BlockNumber(blockheader.number)
trace "block received", blockNumber = latestBlock
# get logs from the last block
try:
asyncSpawn g.getAndHandleEvents(latestBlock)
# inc by 1 to prevent double processing
let fromBlock = g.latestProcessedBlock + 1
asyncSpawn g.getAndHandleEvents(fromBlock, latestBlock)
except CatchableError:
warn "failed to handle log: ", error=getCurrentExceptionMsg()
return newHeadCallback
Expand All @@ -368,28 +351,23 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} =
let blockChunkSize = 2_000

var fromBlock = if g.latestProcessedBlock > g.rlnContractDeployedBlockNumber:
info "resuming onchain sync from block", fromBlock = g.latestProcessedBlock
g.latestProcessedBlock + 1
else:
info "starting onchain sync from deployed block number", deployedBlockNumber = g.rlnContractDeployedBlockNumber
g.rlnContractDeployedBlockNumber

let latestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
try:
# we always want to sync from last processed block => latest
if fromBlock == BlockNumber(0) or
fromBlock + BlockNumber(blockChunkSize) < latestBlock:
# chunk events
while true:
let currentLatestBlock = cast[BlockNumber](await g.ethRpc.get().provider.eth_blockNumber())
let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock)
info "chunking events", fromBlock = fromBlock, toBlock = toBlock
await g.getAndHandleEvents(fromBlock, some(toBlock))
fromBlock = toBlock + 1
if fromBlock >= currentLatestBlock:
break
else:
await g.getAndHandleEvents(fromBlock, some(BlockNumber(0)))
# chunk events
while true:
let currentLatestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
if fromBlock >= currentLatestBlock:
break

let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock)
debug "fetching events", fromBlock = fromBlock, toBlock = toBlock
await g.getAndHandleEvents(fromBlock, toBlock)
fromBlock = toBlock + 1

except CatchableError:
raise newException(ValueError, "failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg())

Expand Down

0 comments on commit 21604e6

Please sign in to comment.