@@ -15,6 +15,9 @@ use tokio::sync::mpsc;
1515use types:: EthSpec ;
1616use types:: { Epoch , Hash256 , Slot } ;
1717
18+ /// The number of head syncing chains to sync at a time.
19+ const PARALLEL_HEAD_CHAINS : usize = 2 ;
20+
1821/// The state of the long range/batch sync.
1922#[ derive( Clone ) ]
2023pub enum RangeSyncState {
@@ -205,8 +208,9 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
205208 /// Updates the state of the chain collection.
206209 ///
207210 /// This removes any out-dated chains, swaps to any higher priority finalized chains and
208- /// updates the state of the collection.
209- pub fn update_finalized ( & mut self , network : & mut SyncNetworkContext < T :: EthSpec > ) {
211+ /// updates the state of the collection. This starts head chains syncing if any are required to
212+ /// do so.
213+ pub fn update ( & mut self , network : & mut SyncNetworkContext < T :: EthSpec > ) {
210214 let local_epoch = {
211215 let local = match PeerSyncInfo :: from_chain ( & self . beacon_chain ) {
212216 Some ( local) => local,
@@ -222,9 +226,25 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
222226 local. finalized_epoch
223227 } ;
224228
225- // Remove any outdated finalized chains
229+ // Remove any outdated finalized/head chains
226230 self . purge_outdated_chains ( network) ;
227231
232+ // Choose the best finalized chain if one needs to be selected.
233+ self . update_finalized_chains ( network, local_epoch) ;
234+
235+ if self . finalized_syncing_index ( ) . is_none ( ) {
236+ // Handle head syncing chains if there are no finalized chains left.
237+ self . update_head_chains ( network, local_epoch) ;
238+ }
239+ }
240+
241+ /// This looks at all current finalized chains and decides if a new chain should be prioritised
242+ /// or not.
243+ fn update_finalized_chains (
244+ & mut self ,
245+ network : & mut SyncNetworkContext < T :: EthSpec > ,
246+ local_epoch : Epoch ,
247+ ) {
228248 // Check if any chains become the new syncing chain
229249 if let Some ( index) = self . finalized_syncing_index ( ) {
230250 // There is a current finalized chain syncing
@@ -269,30 +289,74 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
269289 head_root : chain. target_head_root ,
270290 } ;
271291 self . state = state;
272- } else {
273- // There are no finalized chains, update the state.
274- if self . head_chains . is_empty ( ) {
275- self . state = RangeSyncState :: Idle ;
276- } else {
277- // for the syncing API, we find the minimal start_slot and the maximum
278- // target_slot of all head chains to report back.
279-
280- let ( min_epoch, max_slot) = self . head_chains . iter ( ) . fold (
281- ( Epoch :: from ( 0u64 ) , Slot :: from ( 0u64 ) ) ,
282- |( min, max) , chain| {
283- (
284- std:: cmp:: min ( min, chain. start_epoch ) ,
285- std:: cmp:: max ( max, chain. target_head_slot ) ,
286- )
287- } ,
288- ) ;
289- let head_state = RangeSyncState :: Head {
290- start_slot : min_epoch. start_slot ( T :: EthSpec :: slots_per_epoch ( ) ) ,
291- head_slot : max_slot,
292- } ;
293- self . state = head_state;
292+ }
293+ }
294+
295+ /// Start syncing any head chains if required.
296+ fn update_head_chains (
297+ & mut self ,
298+ network : & mut SyncNetworkContext < T :: EthSpec > ,
299+ local_epoch : Epoch ,
300+ ) {
301+ // There are no finalized chains, update the state.
302+ if self . head_chains . is_empty ( ) {
303+ self . state = RangeSyncState :: Idle ;
304+ return ;
305+ }
306+
307+ let mut currently_syncing = self
308+ . head_chains
309+ . iter ( )
310+ . filter ( |chain| chain. is_syncing ( ) )
311+ . count ( ) ;
312+ let mut not_syncing = self . head_chains . len ( ) - currently_syncing;
313+
314+ // Find all head chains that are not currently syncing ordered by peer count.
315+ while currently_syncing <= PARALLEL_HEAD_CHAINS && not_syncing > 0 {
316+ // Find the chain with the most peers and start syncing
317+ if let Some ( ( _index, chain) ) = self
318+ . head_chains
319+ . iter_mut ( )
320+ . filter ( |chain| !chain. is_syncing ( ) )
321+ . enumerate ( )
322+ . max_by_key ( |( _index, chain) | chain. peer_pool . len ( ) )
323+ {
324+ // start syncing this chain
325+ debug ! ( self . log, "New head chain started syncing" ; "new_target_root" => format!( "{}" , chain. target_head_root) , "new_end_slot" => chain. target_head_slot, "new_start_epoch" => chain. start_epoch) ;
326+ chain. start_syncing ( network, local_epoch) ;
294327 }
328+
329+ // update variables
330+ currently_syncing = self
331+ . head_chains
332+ . iter ( )
333+ . filter ( |chain| chain. is_syncing ( ) )
334+ . count ( ) ;
335+ not_syncing = self . head_chains . len ( ) - currently_syncing;
295336 }
337+
338+ // Start
339+ // for the syncing API, we find the minimal start_slot and the maximum
340+ // target_slot of all head chains to report back.
341+
342+ let ( min_epoch, max_slot) = self
343+ . head_chains
344+ . iter ( )
345+ . filter ( |chain| chain. is_syncing ( ) )
346+ . fold (
347+ ( Epoch :: from ( 0u64 ) , Slot :: from ( 0u64 ) ) ,
348+ |( min, max) , chain| {
349+ (
350+ std:: cmp:: min ( min, chain. start_epoch ) ,
351+ std:: cmp:: max ( max, chain. target_head_slot ) ,
352+ )
353+ } ,
354+ ) ;
355+ let head_state = RangeSyncState :: Head {
356+ start_slot : min_epoch. start_slot ( T :: EthSpec :: slots_per_epoch ( ) ) ,
357+ head_slot : max_slot,
358+ } ;
359+ self . state = head_state;
296360 }
297361
298362 /// Add a new finalized chain to the collection.
@@ -321,7 +385,6 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
321385 #[ allow( clippy:: too_many_arguments) ]
322386 pub fn new_head_chain (
323387 & mut self ,
324- network : & mut SyncNetworkContext < T :: EthSpec > ,
325388 remote_finalized_epoch : Epoch ,
326389 target_head : Hash256 ,
327390 target_slot : Slot ,
@@ -336,7 +399,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
336399 self . head_chains . retain ( |chain| !chain. peer_pool . is_empty ( ) ) ;
337400
338401 let chain_id = rand:: random ( ) ;
339- let mut new_head_chain = SyncingChain :: new (
402+ let new_head_chain = SyncingChain :: new (
340403 chain_id,
341404 remote_finalized_epoch,
342405 target_slot,
@@ -346,8 +409,6 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
346409 self . beacon_chain . clone ( ) ,
347410 self . log . clone ( ) ,
348411 ) ;
349- // All head chains can sync simultaneously
350- new_head_chain. start_syncing ( network, remote_finalized_epoch) ;
351412 self . head_chains . push ( new_head_chain) ;
352413 }
353414
@@ -511,7 +572,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
511572 debug ! ( self . log, "Chain was removed" ; "start_epoch" => chain. start_epoch, "end_slot" => chain. target_head_slot) ;
512573
513574 // update the state
514- self . update_finalized ( network) ;
575+ self . update ( network) ;
515576 }
516577
517578 /// Returns the index of finalized chain that is currently syncing. Returns `None` if no
0 commit comments