From f9e0418b5bae1a8369b793a56580c7ad46f1b323 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Wed, 27 Mar 2019 14:17:01 -0600 Subject: [PATCH] fetch multiple blocks at a time * avoid crash on invalid block production (fixes #209) * fetch blocks every second, roughly * fix start.sh param order * run beacon node sim at slower pace --- beacon_chain/beacon_node.nim | 76 +++++++++++++++++++++--------- beacon_chain/beacon_node_types.nim | 5 ++ beacon_chain/block_pool.nim | 22 +++++++-- tests/simulation/start.sh | 5 +- tests/test_block_pool.nim | 2 +- 5 files changed, 82 insertions(+), 28 deletions(-) diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index 52d8dc162d..c55fd49954 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -11,8 +11,10 @@ import const topicBeaconBlocks = "ethereum/2.1/beacon_chain/blocks" + topicBeaconBlocks2 = "ethereum/2.1/beacon_chain/blocks2" topicAttestations = "ethereum/2.1/beacon_chain/attestations" - topicfetchBlocks = "ethereum/2.1/beacon_chain/fetch" + topicFetchBlocks = "ethereum/2.1/beacon_chain/fetch" + topicFetchBlocks2 = "ethereum/2.1/beacon_chain/fetch2" dataDirValidators = "validators" networkMetadataFile = "network.json" @@ -364,8 +366,12 @@ proc proposeBlock(node: BeaconNode, let blockRoot = signed_root(newBlock) - # TODO return new BlockRef from add? let newBlockRef = node.blockPool.add(node.state, blockRoot, newBlock) + if newBlockRef == nil: + warn "Unable to add proposed block to block pool", + newBlock = shortLog(newBlock), + blockRoot = shortLog(blockRoot) + return head info "Block proposed", blck = shortLog(newBlock), @@ -379,21 +385,36 @@ proc proposeBlock(node: BeaconNode, return newBlockRef -proc fetchBlocks(node: BeaconNode, roots: seq[Eth2Digest]) = +proc fetchBlocks(node: BeaconNode, roots: seq[FetchRecord]) = if roots.len == 0: return + debug "Fetching blocks", roots + # TODO shouldn't send to all! # TODO should never fail - asyncCheck is wrong here.. - asyncCheck node.network.broadcast(topicfetchBlocks, roots) + asyncCheck node.network.broadcast(topicfetchBlocks2, roots) -proc onFetchBlocks(node: BeaconNode, roots: seq[Eth2Digest]) = +proc onFetchBlocks(node: BeaconNode, roots: seq[FetchRecord]) = # TODO placeholder logic for block recovery - debug "fetchBlocks received", roots = roots.len - for root in roots: - if (let blck = node.db.getBlock(root); blck.isSome()): - # TODO should never fail - asyncCheck is wrong here.. - # TODO should obviously not spam, but rather send it back to the requester - asyncCheck node.network.broadcast(topicBeaconBlocks, blck.get()) + var resp: seq[BeaconBlock] + for rec in roots: + if (var blck = node.db.getBlock(rec.root); blck.isSome()): + # TODO validate historySlots + let firstSlot = blck.get().slot - rec.historySlots + + for i in 0.. 0: + asyncCheck node.network.broadcast(topicBeaconBlocks2, resp) proc onAttestation(node: BeaconNode, attestation: Attestation) = # We received an attestation from the network but don't know much about it @@ -414,10 +435,6 @@ proc onBeaconBlock(node: BeaconNode, blck: BeaconBlock) = blockRoot = shortLog(blockRoot) if node.blockPool.add(node.state, blockRoot, blck).isNil: - # TODO this will cause us to fetch parent, even for invalid blocks.. fix - #debug "Missing block detected. Fetching from network", - # `block` = blck.previous_block_root - node.fetchBlocks(@[blck.previous_block_root]) return # The block we received contains attestations, and we might not yet know about @@ -517,12 +534,6 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.gcsafe, asyn scheduledSlot = humaneSlotNum(scheduledSlot), slot = humaneSlotNum(slot) - # TODO in this setup, we retry fetching blocks at the beginning of every slot, - # hoping that we'll get some before it's time to attest or propose - is - # there a better time to do this? - let missingBlocks = node.blockPool.checkUnresolved() - node.fetchBlocks(missingBlocks) - if slot < lastSlot: # This can happen if the system clock changes time for example, and it's # pretty bad @@ -646,6 +657,14 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.gcsafe, asyn addTimer(nextSlotStart) do (p: pointer): asyncCheck node.onSlotStart(slot, nextSlot) +proc onSecond(node: BeaconNode, moment: Moment) {.async.} = + let missingBlocks = node.blockPool.checkUnresolved() + node.fetchBlocks(missingBlocks) + + let nextSecond = max(Moment.now(), moment + chronos.seconds(1)) + addTimer(nextSecond) do (p: pointer): + asyncCheck node.onSecond(nextSecond) + proc run*(node: BeaconNode) = waitFor node.network.subscribe(topicBeaconBlocks) do (blck: BeaconBlock): node.onBeaconBlock(blck) @@ -654,8 +673,19 @@ proc run*(node: BeaconNode) = node.onAttestation(attestation) waitFor node.network.subscribe(topicfetchBlocks) do (roots: seq[Eth2Digest]): + # Backwards compat, remove eventually + # TODO proof of concept block fetcher - need serious anti-spam rework + node.onFetchBlocks(roots.mapIt(FetchRecord(root: it, historySlots: 1))) + + waitFor node.network.subscribe(topicfetchBlocks2) do (roots: seq[FetchRecord]): + # TODO proof of concept block fetcher - need serious anti-spam rework node.onFetchBlocks(roots) + waitFor node.network.subscribe(topicBeaconBlocks2) do (blcks: seq[BeaconBlock]): + # TODO proof of concept block transfer - need serious anti-spam rework + for blck in blcks: + node.onBeaconBlock(blck) + let slot = node.beaconClock.now().toSlot() startSlot = @@ -670,6 +700,10 @@ proc run*(node: BeaconNode) = addTimer(fromNow) do (p: pointer): asyncCheck node.onSlotStart(startSlot - 1, startSlot) + let second = Moment.now() + chronos.seconds(1) + addTimer(second) do (p: pointer): + asyncCheck node.onSecond(second) + runForever() var gPidFile: string diff --git a/beacon_chain/beacon_node_types.nim b/beacon_chain/beacon_node_types.nim index af2a1cddc2..caf31640ae 100644 --- a/beacon_chain/beacon_node_types.nim +++ b/beacon_chain/beacon_node_types.nim @@ -180,6 +180,7 @@ type db*: BeaconChainDB UnresolvedBlock* = object + slots*: uint64 # number of slots that are suspected missing tries*: int BlockRef* = ref object {.acyclic.} @@ -259,5 +260,9 @@ type totalValidators*: uint64 lastUserValidator*: uint64 + FetchRecord* = object + root*: Eth2Digest + historySlots*: uint64 + proc userValidatorsRange*(d: NetworkMetadata): HSlice[int, int] = 0 .. d.lastUserValidator.int diff --git a/beacon_chain/block_pool.nim b/beacon_chain/block_pool.nim index bea11e3f95..968c4b655c 100644 --- a/beacon_chain/block_pool.nim +++ b/beacon_chain/block_pool.nim @@ -249,11 +249,25 @@ proc add*( # them out without penalty - but signing invalid attestations carries # a risk of being slashed, making attestations a more valuable spam # filter. + # TODO when we receive the block, we don't know how many others we're missing + # from that branch, so right now, we'll just do a blind guess debug "Unresolved block", blck = shortLog(blck), blockRoot = shortLog(blockRoot) - pool.unresolved[blck.previous_block_root] = UnresolvedBlock() + let parentSlot = blck.slot - 1 + + pool.unresolved[blck.previous_block_root] = UnresolvedBlock( + slots: + # The block is at least two slots ahead - try to grab whole history + if parentSlot > pool.head.slot: + parentSlot - pool.head.slot + else: + # It's a sibling block from a branch that we're missing - fetch one + # epoch at a time + max(1.uint64, SLOTS_PER_EPOCH.uint64 - + (parentSlot.uint64 mod SLOTS_PER_EPOCH.uint64)) + ) pool.pending[blockRoot] = blck proc get*(pool: BlockPool, blck: BlockRef): BlockData = @@ -280,13 +294,13 @@ proc getOrResolve*(pool: var BlockPool, root: Eth2Digest): BlockRef = result = pool.blocks.getOrDefault(root) if result.isNil: - pool.unresolved[root] = UnresolvedBlock() + pool.unresolved[root] = UnresolvedBlock(slots: 1) iterator blockRootsForSlot*(pool: BlockPool, slot: uint64|Slot): Eth2Digest = for br in pool.blocksBySlot.getOrDefault(slot.uint64, @[]): yield br.root -proc checkUnresolved*(pool: var BlockPool): seq[Eth2Digest] = +proc checkUnresolved*(pool: var BlockPool): seq[FetchRecord] = ## Return a list of blocks that we should try to resolve from other client - ## to be called periodically but not too often (once per slot?) var done: seq[Eth2Digest] @@ -305,7 +319,7 @@ proc checkUnresolved*(pool: var BlockPool): seq[Eth2Digest] = # simple (simplistic?) exponential backoff for retries.. for k, v in pool.unresolved.pairs(): if v.tries.popcount() == 1: - result.add(k) + result.add(FetchRecord(root: k, historySlots: v.slots)) proc skipAndUpdateState( state: var BeaconState, blck: BeaconBlock, flags: UpdateFlags, diff --git a/tests/simulation/start.sh b/tests/simulation/start.sh index b327728591..a8788a14ba 100755 --- a/tests/simulation/start.sh +++ b/tests/simulation/start.sh @@ -23,7 +23,7 @@ mkdir -p $BUILD_OUTPUTS_DIR # Run with "SHARD_COUNT=4 ./start.sh" to change these DEFS="-d:SHARD_COUNT=${SHARD_COUNT:-4} " # Spec default: 1024 DEFS+="-d:SLOTS_PER_EPOCH=${SLOTS_PER_EPOCH:-8} " # Spec default: 64 -DEFS+="-d:SECONDS_PER_SLOT=${SECONDS_PER_SLOT:-6} " # Spec default: 6 +DEFS+="-d:SECONDS_PER_SLOT=${SECONDS_PER_SLOT:-12} " # Spec default: 6 LAST_VALIDATOR_NUM=$(( $NUM_VALIDATORS - 1 )) LAST_VALIDATOR="$VALIDATORS_DIR/v$(printf '%07d' $LAST_VALIDATOR_NUM).deposit.json" @@ -44,8 +44,9 @@ if [[ -z "$SKIP_BUILDS" ]]; then fi if [ ! -f $SNAPSHOT_FILE ]; then - $BEACON_NODE_BIN createTestnet \ + $BEACON_NODE_BIN \ --dataDir=$SIMULATION_DIR/node-0 \ + createTestnet \ --networkId=1000 \ --validatorsDir=$VALIDATORS_DIR \ --totalValidators=$NUM_VALIDATORS \ diff --git a/tests/test_block_pool.nim b/tests/test_block_pool.nim index d0916600ab..0419d59e49 100644 --- a/tests/test_block_pool.nim +++ b/tests/test_block_pool.nim @@ -63,7 +63,7 @@ suite "Block pool processing": check: pool.get(b2Root).isNone() # Unresolved, shouldn't show up - b1Root in pool.checkUnresolved() + FetchRecord(root: b1Root, historySlots: 1) in pool.checkUnresolved() discard pool.add(state, b1Root, b1)