Skip to content

Commit

Permalink
Basic tests for sync loop (paritytech#182)
Browse files Browse the repository at this point in the history
* basic sync loop tests

* cargo ftm --all

* SyncLoopTestParams

* move sync loop tests to sync_loop_tests.rs

* cargo fmt --all
  • Loading branch information
svyatonik authored and bkchr committed Apr 10, 2024
1 parent ae8c82f commit 83a3fca
Show file tree
Hide file tree
Showing 6 changed files with 626 additions and 68 deletions.
1 change: 1 addition & 0 deletions bridges/relays/ethereum/src/ethereum_sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ pub fn run(params: EthereumSyncParams) -> Result<(), RpcError> {
target,
SUBSTRATE_TICK_INTERVAL,
params.sync_params,
futures::future::pending(),
);

Ok(())
Expand Down
1 change: 1 addition & 0 deletions bridges/relays/ethereum/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ mod substrate_sync_loop;
mod substrate_types;
mod sync;
mod sync_loop;
mod sync_loop_tests;
mod sync_types;
mod utils;

Expand Down
1 change: 1 addition & 0 deletions bridges/relays/ethereum/src/substrate_sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ pub fn run(params: SubstrateSyncParams) -> Result<(), RpcError> {
target,
ETHEREUM_TICK_INTERVAL,
params.sync_params,
futures::future::pending(),
);

Ok(())
Expand Down
42 changes: 37 additions & 5 deletions bridges/relays/ethereum/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use crate::headers::QueuedHeaders;
use crate::sync_types::{HeaderId, HeaderStatus, HeadersSyncPipeline, QueuedHeader};
use num_traits::{One, Saturating};
use num_traits::{One, Saturating, Zero};

/// Common sync params.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -112,19 +112,41 @@ impl<P: HeadersSyncPipeline> HeadersSync<P> {
return None;
}

// if queue is empty and best header on target is > than best header on source,
// then we shoud reorg
let best_queued_number = self.headers.best_queued_number();
if best_queued_number.is_zero() && source_best_number < target_best_header.0 {
return Some(source_best_number);
}

// we assume that there were no reorgs if we have already downloaded best header
let best_downloaded_number = std::cmp::max(
std::cmp::max(self.headers.best_queued_number(), self.headers.best_synced_number()),
std::cmp::max(best_queued_number, self.headers.best_synced_number()),
target_best_header.0,
);
if best_downloaded_number == source_best_number {
if best_downloaded_number >= source_best_number {
return None;
}

// download new header
Some(best_downloaded_number + One::one())
}

/// Selech orphan header to downoload.
pub fn select_orphan_header_to_download(&self) -> Option<&QueuedHeader<P>> {
let orphan_header = self.headers.header(HeaderStatus::Orphan)?;

// we consider header orphan until we'll find it ancestor that is known to the target node
// => we may get orphan header while we ask target node whether it knows its parent
// => let's avoid fetching duplicate headers
let parent_id = orphan_header.parent_id();
if self.headers.status(&parent_id) != HeaderStatus::Unknown {
return None;
}

Some(orphan_header)
}

/// Select headers that need to be submitted to the target node.
pub fn select_headers_to_submit(&self, stalled: bool) -> Option<Vec<&QueuedHeader<P>>> {
// if we operate in backup mode, we only submit headers when sync has stalled
Expand Down Expand Up @@ -208,7 +230,7 @@ impl<P: HeadersSyncPipeline> HeadersSync<P> {
}

#[cfg(test)]
mod tests {
pub mod tests {
use super::*;
use crate::ethereum_types::{EthereumHeadersSyncPipeline, H256};
use crate::headers::tests::{header, id};
Expand All @@ -218,7 +240,7 @@ mod tests {
H256::from_low_u64_le(1000 + number)
}

fn default_sync_params() -> HeadersSyncParams {
pub fn default_sync_params() -> HeadersSyncParams {
HeadersSyncParams {
max_future_headers_to_download: 128,
max_headers_in_submitted_status: 128,
Expand Down Expand Up @@ -253,6 +275,11 @@ mod tests {
eth_sync.source_best_number = Some(101);
assert_eq!(eth_sync.select_new_header_to_download(), Some(101));

// when we have to reorganize to longer fork
eth_sync.source_best_number = Some(100);
eth_sync.target_best_header = Some(HeaderId(200, Default::default()));
assert_eq!(eth_sync.select_new_header_to_download(), Some(100));

// when there are too many headers scheduled for submitting
for i in 1..1000 {
eth_sync.headers.header_response(header(i).header().clone());
Expand Down Expand Up @@ -356,6 +383,11 @@ mod tests {
assert_eq!(eth_sync.headers.header(HeaderStatus::Orphan), Some(&header(101)));
eth_sync.headers.header_response(header(100).header().clone());

// #101 is now Orphan and #100 is MaybeOrphan => we do not want to retrieve
// header #100 again
assert_eq!(eth_sync.headers.header(HeaderStatus::Orphan), Some(&header(101)));
assert_eq!(eth_sync.select_orphan_header_to_download(), None);

// we can't submit header #100, because its parent status is unknown
assert_eq!(eth_sync.select_headers_to_submit(false), None);

Expand Down
76 changes: 13 additions & 63 deletions bridges/relays/ethereum/src/sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use futures::{future::FutureExt, stream::StreamExt};
use num_traits::{Saturating, Zero};
use std::{
collections::HashSet,
future::Future,
time::{Duration, Instant},
};

Expand Down Expand Up @@ -123,6 +124,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
target_client: TC,
target_tick: Duration,
sync_params: HeadersSyncParams,
exit_signal: impl Future<Output = ()>,
) {
let mut local_pool = futures::executor::LocalPool::new();
let mut progress_context = (Instant::now(), None, None);
Expand Down Expand Up @@ -156,6 +158,8 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
let target_go_offline_future = futures::future::Fuse::terminated();
let target_tick_stream = interval(target_tick).fuse();

let exit_signal = exit_signal.fuse();

futures::pin_mut!(
source_best_block_number_future,
source_new_header_future,
Expand All @@ -171,7 +175,8 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
target_submit_header_future,
target_complete_header_future,
target_go_offline_future,
target_tick_stream
target_tick_stream,
exit_signal
);

loop {
Expand Down Expand Up @@ -350,6 +355,10 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
target_best_block_required = true;
target_incomplete_headers_required = true;
},

_ = exit_signal => {
return;
}
}

// print progress
Expand Down Expand Up @@ -491,7 +500,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
id,
);
source_extra_future.set(source_client.header_extra(id, header.clone()).fuse());
} else if let Some(header) = sync.headers().header(HeaderStatus::Orphan) {
} else if let Some(header) = sync.select_orphan_header_to_download() {
// for Orphan we actually ask for parent' header
let parent_id = header.parent_id();

Expand Down Expand Up @@ -540,7 +549,7 @@ fn interval(timeout: Duration) -> impl futures::Stream<Item = ()> {
}

/// Exponential backoff for connection-unrelated errors retries.
fn retry_backoff() -> ExponentialBackoff {
pub(crate) fn retry_backoff() -> ExponentialBackoff {
let mut backoff = ExponentialBackoff::default();
// we do not want relayer to stop
backoff.max_elapsed_time = None;
Expand All @@ -553,7 +562,7 @@ fn retry_backoff() -> ExponentialBackoff {
/// Returns whether or not the client we're interacting with is online. In this context
/// what online means is that the client is currently not handling any other requests
/// that we've previously sent.
fn process_future_result<TResult, TError, TGoOfflineFuture>(
pub(crate) fn process_future_result<TResult, TError, TGoOfflineFuture>(
result: Result<TResult, TError>,
retry_backoff: &mut ExponentialBackoff,
on_success: impl FnOnce(TResult),
Expand Down Expand Up @@ -624,62 +633,3 @@ fn print_sync_progress<P: HeadersSyncPipeline>(
);
(now_time, now_best_header.clone().map(|id| id.0), *now_target_header)
}

#[cfg(test)]
mod tests {
use super::*;

#[derive(Debug)]
struct TestError(bool);

impl MaybeConnectionError for TestError {
fn is_connection_error(&self) -> bool {
self.0
}
}

fn run_backoff_test(result: Result<(), TestError>) -> (Duration, Duration) {
let mut backoff = retry_backoff();

// no randomness in tests (otherwise intervals may overlap => asserts are failing)
backoff.randomization_factor = 0f64;

// increase backoff's current interval
let interval1 = backoff.next_backoff().unwrap();
let interval2 = backoff.next_backoff().unwrap();
assert!(interval2 > interval1);

// successful future result leads to backoff's reset
let go_offline_future = futures::future::Fuse::terminated();
futures::pin_mut!(go_offline_future);

process_future_result(
result,
&mut backoff,
|_| {},
&mut go_offline_future,
|delay| async_std::task::sleep(delay),
|| "Test error".into(),
);

(interval2, backoff.next_backoff().unwrap())
}

#[test]
fn process_future_result_resets_backoff_on_success() {
let (interval2, interval_after_reset) = run_backoff_test(Ok(()));
assert!(interval2 > interval_after_reset);
}

#[test]
fn process_future_result_resets_backoff_on_connection_error() {
let (interval2, interval_after_reset) = run_backoff_test(Err(TestError(true)));
assert!(interval2 > interval_after_reset);
}

#[test]
fn process_future_result_does_not_reset_backoff_on_non_connection_error() {
let (interval2, interval_after_reset) = run_backoff_test(Err(TestError(false)));
assert!(interval2 < interval_after_reset);
}
}
Loading

0 comments on commit 83a3fca

Please sign in to comment.