Skip to content

Commit

Permalink
feat(sandbox): fast forward timestamp and epoch height (#6211)
Browse files Browse the repository at this point in the history
Realized was missing some extra stuff like updating timestamps/epoch-height after fast forwarding special thanks to @TrevorJTClarke. This is a corrollary PR to #6158 to address and add timestamps and epoch-height updates.

### How it works:
The timestamps are updated via the delta block height converted to a fast-forward timestamp delta; which then gets added to a `Clock::utc()` to get the fast-forward timestamp. Then the next block with use the previous timestamp to go from there. Epoch height update uses current block height / epoch_length to get the height instead of the usual increment by one. This is handled this way since not much is passed into it besides the `BlockInfo` and `EpochManager` is separate from everything else such as chain/client/client-actor.

Also, unfortunately, there’s no easy way that I can see of for testing the epoch height update since that is apart of `near-epoch-manager` crate and is created separately from everything else; so can't really test it with block production. Let me know if there is something I'm missing out on
  • Loading branch information
ChaoticTempest committed Apr 1, 2022
1 parent 65f549d commit 76a01fe
Show file tree
Hide file tree
Showing 19 changed files with 312 additions and 81 deletions.
7 changes: 6 additions & 1 deletion chain/chain/src/chain.rs
Expand Up @@ -91,7 +91,12 @@ pub const NUM_ORPHAN_ANCESTORS_CHECK: u64 = 3;
// It should almost never be hit
const MAX_ORPHAN_MISSING_CHUNKS: usize = 5;

/// 10000 years in seconds. Big constant for sandbox to allow time traveling.
#[cfg(feature = "sandbox")]
const ACCEPTABLE_TIME_DIFFERENCE: i64 = 60 * 60 * 24 * 365 * 10000;

/// Refuse blocks more than this many block intervals in the future (as in bitcoin).
#[cfg(not(feature = "sandbox"))]
const ACCEPTABLE_TIME_DIFFERENCE: i64 = 12 * 10;

/// Over this block height delta in advance if we are not chunk producer - route tx to upcoming validators.
Expand Down Expand Up @@ -2233,7 +2238,7 @@ impl Chain {
blocks_catch_up_state: &mut BlocksCatchUpState,
block_catch_up_scheduler: &dyn Fn(BlockCatchUpRequest),
) -> Result<(), Error> {
debug!(target:"catchup", "catch up blocks: pending blocks: {:?}, processed {:?}, scheduled: {:?}, done: {:?}",
debug!(target:"catchup", "catch up blocks: pending blocks: {:?}, processed {:?}, scheduled: {:?}, done: {:?}",
blocks_catch_up_state.pending_blocks, blocks_catch_up_state.processed_blocks.keys().collect::<Vec<_>>(),
blocks_catch_up_state.scheduled_blocks.keys().collect::<Vec<_>>(), blocks_catch_up_state.done_blocks.len());
for (queued_block, (saved_store_update, results)) in
Expand Down
1 change: 1 addition & 0 deletions chain/chain/src/tests/simple_chain.rs
Expand Up @@ -116,6 +116,7 @@ fn build_chain_with_orhpans() {
&*signer,
*last_block.header().next_bp_hash(),
CryptoHash::default(),
None,
);
assert_eq!(chain.process_block_test(&None, block).unwrap_err().kind(), ErrorKind::Orphan);
assert_eq!(
Expand Down
32 changes: 30 additions & 2 deletions chain/client/src/client.rs
Expand Up @@ -36,7 +36,7 @@ use near_primitives::transaction::SignedTransaction;
use near_primitives::types::chunk_extra::ChunkExtra;
use near_primitives::types::{AccountId, ApprovalStake, BlockHeight, EpochId, NumBlocks, ShardId};
use near_primitives::unwrap_or_return;
use near_primitives::utils::{to_timestamp, MaybeValidated};
use near_primitives::utils::MaybeValidated;
use near_primitives::validator_signer::ValidatorSigner;

use crate::chunks_delay_tracker::ChunksDelayTracker;
Expand Down Expand Up @@ -69,6 +69,10 @@ pub struct Client {
#[cfg(feature = "test_features")]
pub adv_produce_blocks_only_valid: bool,

/// Fast Forward accrued delta height used to calculate fast forwarded timestamps for each block.
#[cfg(feature = "sandbox")]
pub(crate) accrued_fastforward_delta: near_primitives::types::BlockHeightDelta,

pub config: ClientConfig,
pub sync_status: SyncStatus,
pub chain: Chain,
Expand Down Expand Up @@ -200,6 +204,8 @@ impl Client {
adv_produce_blocks: false,
#[cfg(feature = "test_features")]
adv_produce_blocks_only_valid: false,
#[cfg(feature = "sandbox")]
accrued_fastforward_delta: 0,
config,
sync_status,
chain,
Expand Down Expand Up @@ -473,6 +479,11 @@ impl Client {
prev_next_bp_hash
};

#[cfg(feature = "sandbox")]
let timestamp_override = Some(Clock::utc() + self.sandbox_delta_time());
#[cfg(not(feature = "sandbox"))]
let timestamp_override = None;

// Get block extra from previous block.
let mut block_merkle_tree =
self.chain.mut_store().get_block_merkle_tree(&prev_hash)?.clone();
Expand Down Expand Up @@ -542,12 +553,13 @@ impl Client {
&*validator_signer,
next_bp_hash,
block_merkle_root,
timestamp_override,
);

// Update latest known even before returning block out, to prevent race conditions.
self.chain.mut_store().save_latest_known(LatestKnown {
height: next_height,
seen: to_timestamp(Clock::utc()),
seen: block.header().raw_timestamp(),
})?;

metrics::BLOCK_PRODUCED_TOTAL.inc();
Expand Down Expand Up @@ -988,6 +1000,22 @@ impl Client {
Ok(())
}

/// Gets the advanced timestamp delta in nanoseconds for sandbox once it has been fast-forwarded
#[cfg(feature = "sandbox")]
pub fn sandbox_delta_time(&self) -> chrono::Duration {
let avg_block_prod_time = (self.config.min_block_production_delay.as_nanos()
+ self.config.max_block_production_delay.as_nanos())
/ 2;
let ns = (self.accrued_fastforward_delta as u128 * avg_block_prod_time).try_into().expect(
&format!(
"Too high of a delta_height {} to convert into u64",
self.accrued_fastforward_delta
),
);

chrono::Duration::nanoseconds(ns)
}

pub fn send_approval(
&mut self,
parent_hash: &CryptoHash,
Expand Down
114 changes: 97 additions & 17 deletions chain/client/src/client_actor.rs
Expand Up @@ -101,7 +101,7 @@ pub struct ClientActor {
state_parts_client_arbiter: Arbiter,

#[cfg(feature = "sandbox")]
fastforward_delta: Option<near_primitives::types::BlockHeightDelta>,
fastforward_delta: near_primitives::types::BlockHeightDelta,

/// Synchronization measure to allow graceful shutdown.
/// Informs the system when a ClientActor gets dropped.
Expand Down Expand Up @@ -204,7 +204,7 @@ impl ClientActor {
state_parts_client_arbiter: state_parts_arbiter,

#[cfg(feature = "sandbox")]
fastforward_delta: None,
fastforward_delta: 0,
_shutdown_signal: shutdown_signal,
})
}
Expand Down Expand Up @@ -384,9 +384,22 @@ impl ClientActor {
)
}
near_network_primitives::types::NetworkSandboxMessage::SandboxFastForward(delta_height) => {
self.fastforward_delta = Some(delta_height);
if self.fastforward_delta > 0 {
return NetworkClientResponses::SandboxResult(
near_network_primitives::types::SandboxResponse::SandboxFastForwardFailed(
"Consecutive fast_forward requests cannot be made while a current one is going on.".to_string()));
}

self.fastforward_delta = delta_height;
NetworkClientResponses::NoResponse
}
near_network_primitives::types::NetworkSandboxMessage::SandboxFastForwardStatus => {
NetworkClientResponses::SandboxResult(
near_network_primitives::types::SandboxResponse::SandboxFastForwardFinished(
self.fastforward_delta == 0,
),
)
}
};
}
NetworkClientMessages::Transaction { transaction, is_forwarded, check_only } => {
Expand Down Expand Up @@ -890,6 +903,84 @@ impl ClientActor {
}
}

/// Process the sandbox fast forward request. If the change in block height is past an epoch,
/// we fast forward to just right before the epoch, produce some blocks to get past and into
/// a new epoch, then we continue on with the residual amount to fast forward.
#[cfg(feature = "sandbox")]
fn sandbox_process_fast_forward(
&mut self,
block_height: BlockHeight,
) -> Result<Option<near_chain::types::LatestKnown>, Error> {
let mut delta_height = std::mem::replace(&mut self.fastforward_delta, 0);
if delta_height == 0 {
return Ok(None);
}

let epoch_length = self.client.config.epoch_length;
if epoch_length <= 3 {
return Err(Error::Other(
"Unsupported: fast_forward with an epoch length of 3 or less".to_string(),
));
}

// Check if we are at epoch boundary. If we are, do not fast forward until new
// epoch is here. Decrement the fast_forward count by 1 when a block is produced
// during this period of waiting
let block_height_wrt_epoch = block_height % epoch_length;
if epoch_length - block_height_wrt_epoch <= 3 || block_height_wrt_epoch == 0 {
// wait for doomslug to call into produce block
self.fastforward_delta = delta_height;
return Ok(None);
}

let delta_height = if block_height_wrt_epoch + delta_height >= epoch_length {
// fast forward to just right before epoch boundary to have epoch_manager
// handle the epoch_height updates as normal. `- 3` since this is being
// done 3 blocks before the epoch ends.
let right_before_epoch_update = epoch_length - block_height_wrt_epoch - 3;

delta_height -= right_before_epoch_update;
self.fastforward_delta = delta_height;
right_before_epoch_update
} else {
delta_height
};

self.client.accrued_fastforward_delta += delta_height;
let delta_time = self.client.sandbox_delta_time();
let new_latest_known = near_chain::types::LatestKnown {
height: block_height + delta_height,
seen: near_primitives::utils::to_timestamp(Clock::utc() + delta_time),
};

Ok(Some(new_latest_known))
}

fn pre_block_production(&mut self) -> Result<(), Error> {
#[cfg(feature = "sandbox")]
{
let latest_known = self.client.chain.mut_store().get_latest_known()?;
if let Some(new_latest_known) =
self.sandbox_process_fast_forward(latest_known.height)?
{
self.client.chain.mut_store().save_latest_known(new_latest_known.clone())?;
self.client.sandbox_update_tip(new_latest_known.height)?;
}
}
Ok(())
}

fn post_block_production(&mut self) {
#[cfg(feature = "sandbox")]
if self.fastforward_delta > 0 {
// Decrease the delta_height by 1 since we've produced a single block. This
// ensures that we advanced the right amount of blocks when fast forwarding
// and fast forwarding triggers regular block production in the case of
// stepping between epoch boundaries.
self.fastforward_delta -= 1;
}
}

/// Retrieves latest height, and checks if must produce next block.
/// Otherwise wait for block arrival or suggest to skip after timeout.
fn handle_block_production(&mut self) -> Result<(), Error> {
Expand All @@ -900,23 +991,10 @@ impl ClientActor {

let _ = self.client.check_and_update_doomslug_tip();

self.pre_block_production()?;
let head = self.client.chain.head()?;
let latest_known = self.client.chain.mut_store().get_latest_known()?;

#[cfg(feature = "sandbox")]
let latest_known = if let Some(delta_height) = self.fastforward_delta.take() {
let new_latest_known = near_chain::types::LatestKnown {
height: latest_known.height + delta_height,
seen: near_primitives::utils::to_timestamp(Clock::utc()),
};

self.client.chain.mut_store().save_latest_known(new_latest_known.clone())?;
self.client.sandbox_update_tip(new_latest_known.height)?;
new_latest_known
} else {
latest_known
};

assert!(
head.height <= latest_known.height,
"Latest known height is invalid {} vs {}",
Expand Down Expand Up @@ -948,6 +1026,8 @@ impl ClientActor {
if let Err(err) = self.produce_block(height) {
// If there is an error, report it and let it retry on the next loop step.
error!(target: "client", "Block production failed: {}", err);
} else {
self.post_block_production();
}
}
}
Expand Down
1 change: 1 addition & 0 deletions chain/client/src/sync.rs
Expand Up @@ -1502,6 +1502,7 @@ mod test {
&*signers[3],
*last_block.header().next_bp_hash(),
block_merkle_tree.root(),
None,
);
block_merkle_tree.insert(*block.hash());

Expand Down
10 changes: 8 additions & 2 deletions chain/client/src/test_utils.rs
Expand Up @@ -66,6 +66,11 @@ use near_primitives::utils::MaybeValidated;

pub type PeerManagerMock = Mocker<PeerManagerActor>;

/// min block production time in milliseconds
pub const MIN_BLOCK_PROD_TIME: Duration = Duration::from_millis(100);
/// max block production time in milliseconds
pub const MAX_BLOCK_PROD_TIME: Duration = Duration::from_millis(200);

const TEST_SEED: RngSeed = [3; 32];
/// Sets up ClientActor and ViewClientActor viewing the same store/runtime.
pub fn setup(
Expand Down Expand Up @@ -285,8 +290,8 @@ pub fn setup_mock_with_validity_period_and_no_epoch_sync(
5,
account_id,
skip_sync_wait,
100,
200,
MIN_BLOCK_PROD_TIME.as_millis() as u64,
MAX_BLOCK_PROD_TIME.as_millis() as u64,
enable_doomslug,
false,
false,
Expand Down Expand Up @@ -1669,6 +1674,7 @@ pub fn create_chunk(
&*client.validator_signer.as_ref().unwrap().clone(),
*last_block.header().next_bp_hash(),
block_merkle_tree.root(),
None,
);
(chunk, merkle_paths, receipts, block)
}
Expand Down
1 change: 1 addition & 0 deletions chain/client/src/tests/query_client.rs
Expand Up @@ -89,6 +89,7 @@ fn query_status_not_crash() {
&signer,
block.header.next_bp_hash,
block_merkle_tree.root(),
None,
);
next_block.mut_header().get_mut().inner_lite.timestamp =
to_timestamp(next_block.header().timestamp() + chrono::Duration::seconds(60));
Expand Down
32 changes: 32 additions & 0 deletions chain/jsonrpc/src/lib.rs
Expand Up @@ -1180,13 +1180,45 @@ impl JsonRpcHandler {
near_jsonrpc_primitives::types::sandbox::RpcSandboxFastForwardResponse,
near_jsonrpc_primitives::types::sandbox::RpcSandboxFastForwardError,
> {
use near_network_primitives::types::SandboxResponse;

self.client_addr
.send(NetworkClientMessages::Sandbox(
near_network_primitives::types::NetworkSandboxMessage::SandboxFastForward(
fast_forward_request.delta_height,
),
))
.await?;

// Hard limit the request to timeout at an hour, since fast forwarding can take a while,
// where we can leave it to the rpc clients to set their own timeouts if necessary.
timeout(Duration::from_secs(60 * 60), async {
loop {
let fast_forward_finished = self
.client_addr
.send(NetworkClientMessages::Sandbox(
near_network_primitives::types::NetworkSandboxMessage::SandboxFastForwardStatus {},
))
.await;

match fast_forward_finished {
Ok(NetworkClientResponses::SandboxResult(SandboxResponse::SandboxFastForwardFinished(true))) => break,
Ok(NetworkClientResponses::SandboxResult(SandboxResponse::SandboxFastForwardFailed(err))) => return Err(err),
_ => (),
}

let _ = sleep(self.polling_config.polling_interval).await;
}
Ok(())
})
.await
.map_err(|_| near_jsonrpc_primitives::types::sandbox::RpcSandboxFastForwardError::InternalError {
error_message: "sandbox failed to fast forward within reasonable time of an hour".to_string()
})?
.map_err(|err| near_jsonrpc_primitives::types::sandbox::RpcSandboxFastForwardError::InternalError {
error_message: format!("sandbox failed to fast forward due to: {:?}", err),
})?;

Ok(near_jsonrpc_primitives::types::sandbox::RpcSandboxFastForwardResponse {})
}
}
Expand Down
3 changes: 3 additions & 0 deletions chain/network-primitives/src/types.rs
Expand Up @@ -278,12 +278,15 @@ pub enum NetworkSandboxMessage {
SandboxPatchState(Vec<near_primitives::state_record::StateRecord>),
SandboxPatchStateStatus,
SandboxFastForward(near_primitives::types::BlockHeightDelta),
SandboxFastForwardStatus,
}

#[cfg(feature = "sandbox")]
#[derive(Eq, PartialEq, Debug)]
pub enum SandboxResponse {
SandboxPatchStateFinished(bool),
SandboxFastForwardFinished(bool),
SandboxFastForwardFailed(String),
}

#[derive(actix::Message, AsStaticStr)]
Expand Down
1 change: 1 addition & 0 deletions core/primitives/benches/serialization.rs
Expand Up @@ -68,6 +68,7 @@ fn create_block() -> Block {
&signer,
CryptoHash::default(),
CryptoHash::default(),
None,
)
}

Expand Down

0 comments on commit 76a01fe

Please sign in to comment.