Skip to content

Commit

Permalink
fetch multiple blocks at a time
Browse files Browse the repository at this point in the history
* avoid crash on invalid block production (fixes #209)
* fetch blocks every second, roughly
* fix start.sh param order
* run beacon node sim at slower pace
  • Loading branch information
arnetheduck authored and zah committed Mar 28, 2019
1 parent 58b6174 commit f9e0418
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 28 deletions.
76 changes: 55 additions & 21 deletions beacon_chain/beacon_node.nim
Expand Up @@ -11,8 +11,10 @@ import

const
topicBeaconBlocks = "ethereum/2.1/beacon_chain/blocks"
topicBeaconBlocks2 = "ethereum/2.1/beacon_chain/blocks2"
topicAttestations = "ethereum/2.1/beacon_chain/attestations"
topicfetchBlocks = "ethereum/2.1/beacon_chain/fetch"
topicFetchBlocks = "ethereum/2.1/beacon_chain/fetch"
topicFetchBlocks2 = "ethereum/2.1/beacon_chain/fetch2"

dataDirValidators = "validators"
networkMetadataFile = "network.json"
Expand Down Expand Up @@ -364,8 +366,12 @@ proc proposeBlock(node: BeaconNode,

let blockRoot = signed_root(newBlock)

# TODO return new BlockRef from add?
let newBlockRef = node.blockPool.add(node.state, blockRoot, newBlock)
if newBlockRef == nil:

This comment has been minimized.

Copy link
@zah

zah Mar 28, 2019

Member

If you follow this nil pointer, it propagates quite far. It will be returned from node.handleProposal and then used in calls such as handleAttestations(node, head, slot).

This check doesn't seem like a proper fix.

This comment has been minimized.

Copy link
@zah

zah Mar 28, 2019

Member

Ah, nevermind, I missed a return statement in the code.

warn "Unable to add proposed block to block pool",
newBlock = shortLog(newBlock),
blockRoot = shortLog(blockRoot)
return head

info "Block proposed",
blck = shortLog(newBlock),
Expand All @@ -379,21 +385,36 @@ proc proposeBlock(node: BeaconNode,

return newBlockRef

proc fetchBlocks(node: BeaconNode, roots: seq[FetchRecord]) =
  ## Ask the network for blocks we're missing - each `FetchRecord` names a
  ## block root plus a hint for how many ancestor slots of history to include
  ## in the reply. No-op when there's nothing to fetch.
  if roots.len == 0: return

  debug "Fetching blocks", roots

  # TODO shouldn't send to all!
  # TODO should never fail - asyncCheck is wrong here..
  # Use the declared const spelling `topicFetchBlocks2` for consistency
  # (Nim identifiers are style-insensitive, but one spelling per codebase).
  asyncCheck node.network.broadcast(topicFetchBlocks2, roots)

proc onFetchBlocks(node: BeaconNode, roots: seq[FetchRecord]) =
  ## Serve a block-fetch request: for every requested root found in the
  ## database, walk the parent chain backwards collecting up to
  ## `historySlots` ancestors, then broadcast the whole batch as the reply.
  # TODO placeholder logic for block recovery
  var resp: seq[BeaconBlock]
  for rec in roots:
    if (var blck = node.db.getBlock(rec.root); blck.isSome()):
      # TODO validate historySlots - an oversized value makes this uint64
      #      subtraction wrap around, effectively disabling the slot cutoff
      #      below and letting a peer request an arbitrarily long history
      let firstSlot = blck.get().slot - rec.historySlots

      for i in 0..<rec.historySlots.int:
        resp.add(blck.get())

        # TODO should obviously not spam, but rather send it back to the requester
        # Stop once the chain runs out (unknown parent) or we've reached the
        # requested history depth.
        if (blck = node.db.getBlock(blck.get().previous_block_root);
            blck.isNone() or blck.get().slot < firstSlot):
          break

  debug "fetchBlocks received", roots = roots.len, resp = resp.len

  # TODO should never fail - asyncCheck is wrong here..
  if resp.len > 0:
    asyncCheck node.network.broadcast(topicBeaconBlocks2, resp)

proc onAttestation(node: BeaconNode, attestation: Attestation) =
# We received an attestation from the network but don't know much about it
Expand All @@ -414,10 +435,6 @@ proc onBeaconBlock(node: BeaconNode, blck: BeaconBlock) =
blockRoot = shortLog(blockRoot)

if node.blockPool.add(node.state, blockRoot, blck).isNil:
# TODO this will cause us to fetch parent, even for invalid blocks.. fix
#debug "Missing block detected. Fetching from network",
# `block` = blck.previous_block_root
node.fetchBlocks(@[blck.previous_block_root])
return

# The block we received contains attestations, and we might not yet know about
Expand Down Expand Up @@ -517,12 +534,6 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.gcsafe, asyn
scheduledSlot = humaneSlotNum(scheduledSlot),
slot = humaneSlotNum(slot)

# TODO in this setup, we retry fetching blocks at the beginning of every slot,
# hoping that we'll get some before it's time to attest or propose - is
# there a better time to do this?
let missingBlocks = node.blockPool.checkUnresolved()
node.fetchBlocks(missingBlocks)

if slot < lastSlot:
# This can happen if the system clock changes time for example, and it's
# pretty bad
Expand Down Expand Up @@ -646,6 +657,14 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.gcsafe, asyn
addTimer(nextSlotStart) do (p: pointer):
asyncCheck node.onSlotStart(slot, nextSlot)

proc onSecond(node: BeaconNode, moment: Moment) {.async.} =
  ## Once-per-second housekeeping: retry fetching any blocks whose parents
  ## are still unresolved, then re-arm the timer for the next tick.
  node.fetchBlocks(node.blockPool.checkUnresolved())

  # If this tick ran late, `moment + 1s` may already be in the past - clamp
  # to "now" so the next tick fires immediately instead of being scheduled
  # behind the clock.
  let wakeup = max(Moment.now(), moment + chronos.seconds(1))
  addTimer(wakeup) do (p: pointer):
    asyncCheck node.onSecond(wakeup)

proc run*(node: BeaconNode) =
waitFor node.network.subscribe(topicBeaconBlocks) do (blck: BeaconBlock):
node.onBeaconBlock(blck)
Expand All @@ -654,8 +673,19 @@ proc run*(node: BeaconNode) =
node.onAttestation(attestation)

waitFor node.network.subscribe(topicfetchBlocks) do (roots: seq[Eth2Digest]):
# Backwards compat, remove eventually
# TODO proof of concept block fetcher - need serious anti-spam rework
node.onFetchBlocks(roots.mapIt(FetchRecord(root: it, historySlots: 1)))

waitFor node.network.subscribe(topicfetchBlocks2) do (roots: seq[FetchRecord]):
# TODO proof of concept block fetcher - need serious anti-spam rework
node.onFetchBlocks(roots)

waitFor node.network.subscribe(topicBeaconBlocks2) do (blcks: seq[BeaconBlock]):
# TODO proof of concept block transfer - need serious anti-spam rework
for blck in blcks:
node.onBeaconBlock(blck)

let
slot = node.beaconClock.now().toSlot()
startSlot =
Expand All @@ -670,6 +700,10 @@ proc run*(node: BeaconNode) =
addTimer(fromNow) do (p: pointer):
asyncCheck node.onSlotStart(startSlot - 1, startSlot)

let second = Moment.now() + chronos.seconds(1)
addTimer(second) do (p: pointer):
asyncCheck node.onSecond(second)

runForever()

var gPidFile: string
Expand Down
5 changes: 5 additions & 0 deletions beacon_chain/beacon_node_types.nim
Expand Up @@ -180,6 +180,7 @@ type
db*: BeaconChainDB

UnresolvedBlock* = object
slots*: uint64 # number of slots that are suspected missing
tries*: int

BlockRef* = ref object {.acyclic.}
Expand Down Expand Up @@ -259,5 +260,9 @@ type
totalValidators*: uint64
lastUserValidator*: uint64

FetchRecord* = object
  ## A request to recover one missing block from peers: the root being
  ## sought plus a hint for how much ancestor history to send along with it.
  root*: Eth2Digest
  historySlots*: uint64 ## requested number of ancestor slots to include

proc userValidatorsRange*(d: NetworkMetadata): HSlice[int, int] =
0 .. d.lastUserValidator.int
22 changes: 18 additions & 4 deletions beacon_chain/block_pool.nim
Expand Up @@ -249,11 +249,25 @@ proc add*(
# them out without penalty - but signing invalid attestations carries
# a risk of being slashed, making attestations a more valuable spam
# filter.
# TODO when we receive the block, we don't know how many others we're missing
# from that branch, so right now, we'll just do a blind guess
debug "Unresolved block",
blck = shortLog(blck),
blockRoot = shortLog(blockRoot)

pool.unresolved[blck.previous_block_root] = UnresolvedBlock()
let parentSlot = blck.slot - 1

pool.unresolved[blck.previous_block_root] = UnresolvedBlock(
slots:
# The block is at least two slots ahead - try to grab whole history
if parentSlot > pool.head.slot:
parentSlot - pool.head.slot
else:
# It's a sibling block from a branch that we're missing - fetch one
# epoch at a time
max(1.uint64, SLOTS_PER_EPOCH.uint64 -
(parentSlot.uint64 mod SLOTS_PER_EPOCH.uint64))
)
pool.pending[blockRoot] = blck

proc get*(pool: BlockPool, blck: BlockRef): BlockData =
Expand All @@ -280,13 +294,13 @@ proc getOrResolve*(pool: var BlockPool, root: Eth2Digest): BlockRef =
result = pool.blocks.getOrDefault(root)

if result.isNil:
pool.unresolved[root] = UnresolvedBlock()
pool.unresolved[root] = UnresolvedBlock(slots: 1)

iterator blockRootsForSlot*(pool: BlockPool, slot: uint64|Slot): Eth2Digest =
  ## Yield the root of every known block at `slot`; yields nothing when the
  ## pool has no blocks recorded for that slot.
  let atSlot = pool.blocksBySlot.getOrDefault(slot.uint64, @[])
  for blockRef in atSlot:
    yield blockRef.root

proc checkUnresolved*(pool: var BlockPool): seq[Eth2Digest] =
proc checkUnresolved*(pool: var BlockPool): seq[FetchRecord] =
## Return a list of blocks that we should try to resolve from other client -
## to be called periodically but not too often (once per slot?)
var done: seq[Eth2Digest]
Expand All @@ -305,7 +319,7 @@ proc checkUnresolved*(pool: var BlockPool): seq[Eth2Digest] =
# simple (simplistic?) exponential backoff for retries..
for k, v in pool.unresolved.pairs():
if v.tries.popcount() == 1:
result.add(k)
result.add(FetchRecord(root: k, historySlots: v.slots))

proc skipAndUpdateState(
state: var BeaconState, blck: BeaconBlock, flags: UpdateFlags,
Expand Down
5 changes: 3 additions & 2 deletions tests/simulation/start.sh
Expand Up @@ -23,7 +23,7 @@ mkdir -p $BUILD_OUTPUTS_DIR
# Compile-time network parameters; override via the environment, e.g.
# "SHARD_COUNT=4 ./start.sh". SECONDS_PER_SLOT defaults to 12 (double the
# spec value) so the simulation runs at a slower, more forgiving pace.
DEFS="-d:SHARD_COUNT=${SHARD_COUNT:-4} "             # Spec default: 1024
DEFS+="-d:SLOTS_PER_EPOCH=${SLOTS_PER_EPOCH:-8} "    # Spec default: 64
DEFS+="-d:SECONDS_PER_SLOT=${SECONDS_PER_SLOT:-12} " # Spec default: 6

LAST_VALIDATOR_NUM=$(( $NUM_VALIDATORS - 1 ))
LAST_VALIDATOR="$VALIDATORS_DIR/v$(printf '%07d' $LAST_VALIDATOR_NUM).deposit.json"
Expand All @@ -44,8 +44,9 @@ if [[ -z "$SKIP_BUILDS" ]]; then
fi

if [ ! -f $SNAPSHOT_FILE ]; then
$BEACON_NODE_BIN createTestnet \
$BEACON_NODE_BIN \
--dataDir=$SIMULATION_DIR/node-0 \
createTestnet \
--networkId=1000 \
--validatorsDir=$VALIDATORS_DIR \
--totalValidators=$NUM_VALIDATORS \
Expand Down
2 changes: 1 addition & 1 deletion tests/test_block_pool.nim
Expand Up @@ -63,7 +63,7 @@ suite "Block pool processing":

check:
pool.get(b2Root).isNone() # Unresolved, shouldn't show up
b1Root in pool.checkUnresolved()
FetchRecord(root: b1Root, historySlots: 1) in pool.checkUnresolved()

discard pool.add(state, b1Root, b1)

Expand Down

0 comments on commit f9e0418

Please sign in to comment.