Skip to content

Commit

Permalink
ETCM-393: Add comments about the Iterant consumer setup.
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh committed Nov 24, 2020
1 parent f671cc4 commit d5a8301
Showing 1 changed file with 15 additions and 2 deletions.
Expand Up @@ -26,10 +26,15 @@ class PeerDiscoveryManager(
extends Actor
with ActorLogging {

// Create a random nodes iterator on top of the service so the node can quickly ramp up its peers.
// Derive a random nodes iterator on top of the service so the node can quickly ramp up its peers
// while it has demand to connect to more, rather than wait on the periodic lookups performed in
// the background by the DiscoveryService.
val discoveryResources = for {
service <- discoveryServiceResource

// Create an Iterant (like a pull-based Observable) that repeatedly performs a random lookup
// (grabbing kademlia-bucket-size items at a time) and flattens the results. It will automatically
// perform further lookups as the items are pulled from it.
randomNodes = Iterant
.repeatEvalF {
Task(log.debug("Pulling random nodes on demand...")) >>
Expand All @@ -39,8 +44,16 @@ class PeerDiscoveryManager(
.map(toNode)
.filter(!isLocalNode(_))

// Create a consumer on top of the iterant with a limited buffer capacity, so that the Iterant
// blocks trying to push items into it when it gets full, and thus stops making more random lookups.
// For example with buffer-size=45 and kademlia-bucket-size=16 the iterant would make 3 requests
// to fill the queue underlying the consumer, then be blocked trying to push the last 3 items.
// The first 2 items pulled from the consumer would not result in further lookups. After the 3rd
// pull the iterant would look up the next 16 items and try to add them to the queue, etc.
// Note that every `pull` from the consumer takes items from the same queue. To multicast one
// would have to instantiate a `ConcurrentChannel`, create multiple consumers, and use
// `Iterant.pushToChannel`. But here this is the only consumer of the underlying channel.
randomNodeConsumer <- randomNodes.consumeWithConfig(
// Using bounded capacity for back pressure, so we don't make more lookups unless there is a need.
ConsumerF.Config(capacity = Some(BufferCapacity.Bounded(randomNodeBufferSize)))
)
} yield (service, randomNodeConsumer)
Expand Down

0 comments on commit d5a8301

Please sign in to comment.