Skip to content

Commit

Permalink
use take_until and do not fail fast
Browse files Browse the repository at this point in the history
  • Loading branch information
zeegomo committed Sep 28, 2021
1 parent 9715919 commit ee0888a
Showing 1 changed file with 15 additions and 23 deletions.
38 changes: 15 additions & 23 deletions jormungandr/src/blockchain/bootstrap.rs
Expand Up @@ -8,10 +8,9 @@ use crate::metrics::Metrics;
use chain_core::property::Deserialize;
use chain_network::data as net_data;
use chain_network::error::Error as NetworkError;
use futures::{prelude::*, task::Poll};
use futures::prelude::*;
use tokio_util::sync::CancellationToken;

use std::pin::Pin;
use std::sync::Arc;

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -53,39 +52,28 @@ where
// the cancellation signal arrives. Building such stream allows us to
// correctly write all blocks and update the block tip upon the arrival of
// the cancellation signal.

let mut cancel = cancellation_token.cancelled().boxed();
let mut stream = stream.map_err(Error::PullStreamFailed);

let mut stream = stream::poll_fn(move |cx| {
let cancel = Pin::new(&mut cancel);
match cancel.poll(cx) {
Poll::Pending => {
let stream = Pin::new(&mut stream);
stream.poll_next(cx)
}
Poll::Ready(()) => Poll::Ready(Some(Err(Error::Interrupted))),
}
});
let cancel = cancellation_token.cancelled().boxed();
let mut stream = stream
.map_err(Error::PullStreamFailed)
.map(|maybe_block| maybe_block.and_then(|b| Ok(Block::deserialize(b.as_bytes())?)))
.take_until(cancel);

while let Some(block_result) = stream.next().await {
let result = match block_result {
let maybe_tip = match block_result {
Ok(block) => {
let block = Block::deserialize(block.as_bytes())?;

if block.header.hash() == block0 {
continue;
}

bootstrap_info.append_block(&block);
Ok(blockchain
blockchain
.handle_bootstrap_block(block, CheckHeaderProof::Enabled)
.await?)
.await
.map_err(Error::from)
}
Err(err) => Err(err),
};

match result {
match maybe_tip {
Ok(parent_tip) => {
maybe_parent_tip = Some(parent_tip);
}
Expand All @@ -104,6 +92,10 @@ where
tracing::info!("no new blocks received from the network");
}

if stream.take_result().is_some() {
return Err(Error::Interrupted);
}

Ok(maybe_parent_tip)
}

Expand Down

0 comments on commit ee0888a

Please sign in to comment.