diff --git a/pytest/tests/mocknet/helpers/neard_runner.py b/pytest/tests/mocknet/helpers/neard_runner.py index 9a427016bb9..04424118d0a 100644 --- a/pytest/tests/mocknet/helpers/neard_runner.py +++ b/pytest/tests/mocknet/helpers/neard_runner.py @@ -489,11 +489,22 @@ def do_update_config(self, key_value): return True - def do_start(self): + def do_start(self, batch_interval_millis=None): + if batch_interval_millis is not None and not isinstance( + batch_interval_millis, int): + raise ValueError( + f'batch_interval_millis: {batch_interval_millis} not an int') with self.lock: state = self.get_state() if state == TestState.STOPPED: - self.start_neard() + if batch_interval_millis is not None and not self.is_traffic_generator( + ): + logging.warn( + f'got batch_interval_millis = {batch_interval_millis} on non traffic generator node. Ignoring it.' + ) + batch_interval_millis = None + # TODO: restart it if we get a different batch_interval_millis than last time + self.start_neard(batch_interval_millis) self.set_state(TestState.RUNNING) self.save_data() elif state != TestState.RUNNING: @@ -704,7 +715,7 @@ def kill_neard(self): self.save_data() # If this is a regular node, starts neard run. If it's a traffic generator, starts neard mirror run - def start_neard(self): + def start_neard(self, batch_interval_millis=None): for i in range(20, -1, -1): old_log = os.path.join(self.neard_logs_dir, f'log-{i}.txt') new_log = os.path.join(self.neard_logs_dir, f'log-{i+1}.txt') @@ -725,6 +736,22 @@ def start_neard(self): self.target_near_home_path(), '--no-secret', ] + if batch_interval_millis is not None: + with open(self.target_near_home_path('mirror-config.json'), + 'w') as f: + secs = batch_interval_millis // 1000 + nanos = (batch_interval_millis % 1000) * 1000000 + json.dump( + { + 'tx_batch_interval': { + 'secs': secs, + 'nanos': nanos + } + }, + f, + indent=2) + cmd.append('--config-path') + cmd.append(self.target_near_home_path('mirror-config.json')) else: cmd = [ self.data['current_neard_path'], '--log-span-events', diff --git a/pytest/tests/mocknet/mirror.py b/pytest/tests/mocknet/mirror.py index 8ee544eca28..8714c4ae2b8 100755 --- a/pytest/tests/mocknet/mirror.py +++ b/pytest/tests/mocknet/mirror.py @@ -335,7 +335,8 @@ def start_traffic_cmd(args, traffic_generator, nodes): logger.info( "waiting a bit after validators started before starting traffic") time.sleep(10) - traffic_generator.neard_runner_start() + traffic_generator.neard_runner_start( + batch_interval_millis=args.batch_interval_millis) logger.info( f'test running. to check the traffic sent, try running "curl --silent http://{traffic_generator.ip_addr()}:{traffic_generator.neard_port()}/metrics | grep near_mirror"' ) @@ -423,6 +424,15 @@ def update_binaries_cmd(args, traffic_generator, nodes): help= 'Starts all nodes and starts neard mirror run on the traffic generator.' ) + start_traffic_parser.add_argument( + '--batch-interval-millis', + type=int, + help= + '''Interval in millis between sending each mainnet block\'s worth of transactions. + Without this flag, the traffic generator will try to match the per-block load on mainnet. + So, transactions from consecutive mainnet blocks will be be sent with delays + between them such that they will probably appear in consecutive mocknet blocks. + ''') start_traffic_parser.set_defaults(func=start_traffic_cmd) start_nodes_parser = subparsers.add_parser( diff --git a/pytest/tests/mocknet/node_handle.py b/pytest/tests/mocknet/node_handle.py index 92e10b699bb..5707ec2c74a 100644 --- a/pytest/tests/mocknet/node_handle.py +++ b/pytest/tests/mocknet/node_handle.py @@ -81,8 +81,12 @@ def neard_runner_jsonrpc(self, method, params=[]): ) return response['result'] - def neard_runner_start(self): - return self.neard_runner_jsonrpc('start') + def neard_runner_start(self, batch_interval_millis=None): + if batch_interval_millis is None: + params = [] + else: + params = {'batch_interval_millis': batch_interval_millis} + return self.neard_runner_jsonrpc('start', params=params) def neard_runner_stop(self): return self.neard_runner_jsonrpc('stop') diff --git a/pytest/tools/mirror/mirror_utils.py b/pytest/tools/mirror/mirror_utils.py index b363dcf0366..46f2bab89db 100644 --- a/pytest/tools/mirror/mirror_utils.py +++ b/pytest/tools/mirror/mirror_utils.py @@ -229,13 +229,26 @@ def start(self): env = os.environ.copy() env["RUST_LOG"] = "actix_web=warn,mio=warn,tokio_util=warn,actix_server=warn,actix_http=warn,indexer=info," + env.get( "RUST_LOG", "debug") + config_path = dot_near() / f'{MIRROR_DIR}/config.json' with open(dot_near() / f'{MIRROR_DIR}/stdout', 'ab') as stdout, \ - open(dot_near() / f'{MIRROR_DIR}/stderr', 'ab') as stderr: + open(dot_near() / f'{MIRROR_DIR}/stderr', 'ab') as stderr, \ + open(config_path, 'w') as mirror_config: + json.dump({'tx_batch_interval': { + 'secs': 0, + 'nanos': 600000000 + }}, mirror_config) args = [ - self.neard, 'mirror', 'run', "--source-home", self.source_home, + self.neard, + 'mirror', + 'run', + "--source-home", + self.source_home, "--target-home", - dot_near() / f'{MIRROR_DIR}/target/', '--secret-file', - dot_near() / f'{MIRROR_DIR}/target/mirror-secret.json' + dot_near() / f'{MIRROR_DIR}/target/', + '--secret-file', + dot_near() / f'{MIRROR_DIR}/target/mirror-secret.json', + '--config-path', + config_path, ] if self.online_source: args.append('--online-source') diff --git a/tools/mirror/src/chain_tracker.rs b/tools/mirror/src/chain_tracker.rs index 29641131bdc..a232ce0802d 100644 --- a/tools/mirror/src/chain_tracker.rs +++ b/tools/mirror/src/chain_tracker.rs @@ -149,6 +149,8 @@ pub(crate) struct TxTracker { send_time: Pin>, // Config value in the target chain, used to judge how long to wait before sending a new batch of txs min_block_production_delay: Duration, + // optional specific tx send delay + tx_batch_interval: Option, // timestamps in the target chain, used to judge how long to wait before sending a new batch of txs recent_block_timestamps: VecDeque, // last source block we'll be sending transactions for @@ -161,6 +163,7 @@ impl TxTracker { // we unwrap() self.height_queued() in Self::next_heights() pub(crate) fn new<'a, I>( min_block_production_delay: Duration, + tx_batch_interval: Option, next_heights: I, stop_height: Option, ) -> Self @@ -172,6 +175,7 @@ impl TxTracker { min_block_production_delay, next_heights, stop_height, + tx_batch_interval, // Wait at least 15 seconds before sending any transactions because for // a few seconds after the node starts, transaction routing requests // will be silently dropped by the peer manager. @@ -1143,9 +1147,10 @@ impl TxTracker { let (txs_sent, provenance) = match sent_batch { SentBatch::MappedBlock(b) => { - let block_delay = self - .second_longest_recent_block_delay() - .unwrap_or(self.min_block_production_delay + Duration::from_millis(100)); + let block_delay = self.tx_batch_interval.unwrap_or_else(|| { + self.second_longest_recent_block_delay() + .unwrap_or(self.min_block_production_delay + Duration::from_millis(100)) + }); self.send_time.as_mut().reset(tokio::time::Instant::now() + block_delay); crate::set_last_source_height(db, b.source_height)?; let txs = b diff --git a/tools/mirror/src/cli.rs b/tools/mirror/src/cli.rs index a9931498efc..c0d69193486 100644 --- a/tools/mirror/src/cli.rs +++ b/tools/mirror/src/cli.rs @@ -43,6 +43,8 @@ struct RunCmd { /// this height in the source chain #[clap(long)] stop_height: Option, + #[clap(long)] + config_path: Option, } impl RunCmd { @@ -80,6 +82,7 @@ impl RunCmd { secret, self.stop_height, self.online_source, + self.config_path, )) .await }) diff --git a/tools/mirror/src/lib.rs b/tools/mirror/src/lib.rs index 809582269c3..49d32eae816 100644 --- a/tools/mirror/src/lib.rs +++ b/tools/mirror/src/lib.rs @@ -458,6 +458,16 @@ fn execution_status_good(status: &ExecutionStatusView) -> bool { ) } +#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] +struct MirrorConfig { + /// Delay between sending batches of mainnet transactions. If this is + /// given, then instead of trying to get the transactions in consecutive + /// mainnet blocks to appear in consecutive target chain blocks, we will + /// wait this long before sending each mainnet block's worth of transactions. + /// TODO: add an option to target a specific number of transactions per second + tx_batch_interval: Option, +} + const CREATE_ACCOUNT_DELTA: usize = 5; struct TxMirror { @@ -475,6 +485,7 @@ struct TxMirror { target_min_block_production_delay: Duration, secret: Option<[u8; crate::secret::SECRET_LEN]>, default_extra_key: SecretKey, + config: MirrorConfig, } fn open_db>(home: P, config: &NearConfig) -> anyhow::Result { @@ -854,6 +865,7 @@ impl TxMirror { source_chain_access: T, target_home: P, secret: Option<[u8; crate::secret::SECRET_LEN]>, + config: MirrorConfig, ) -> anyhow::Result { let target_config = nearcore::config::load_config(target_home.as_ref(), GenesisValidationMode::UnsafeFast) @@ -893,6 +905,7 @@ impl TxMirror { .unsigned_abs(), secret, default_extra_key, + config, }) } @@ -1777,6 +1790,7 @@ impl TxMirror { let mut tracker = crate::chain_tracker::TxTracker::new( self.target_min_block_production_delay, + self.config.tx_batch_interval, next_heights.iter(), stop_height, ); @@ -1821,16 +1835,28 @@ async fn run>( secret: Option<[u8; crate::secret::SECRET_LEN]>, stop_height: Option, online_source: bool, + config_path: Option

, ) -> anyhow::Result<()> { + let config: MirrorConfig = match config_path { + Some(p) => { + let c = std::fs::read_to_string(p.as_ref()) + .with_context(|| format!("Could not read config from {}", p.as_ref().display()))?; + serde_json::from_str(&c) + .with_context(|| format!("Could not parse config from {}", p.as_ref().display()))? + } + None => Default::default(), + }; if !online_source { let source_chain_access = crate::offline::ChainAccess::new(source_home)?; let stop_height = stop_height.unwrap_or( source_chain_access.head_height().await.context("could not fetch source chain head")?, ); - TxMirror::new(source_chain_access, target_home, secret)?.run(Some(stop_height)).await + TxMirror::new(source_chain_access, target_home, secret, config)? + .run(Some(stop_height)) + .await } else { tracing::warn!(target: "mirror", "FIXME: currently --online-source will skip DeployContract actions"); - TxMirror::new(crate::online::ChainAccess::new(source_home)?, target_home, secret)? + TxMirror::new(crate::online::ChainAccess::new(source_home)?, target_home, secret, config)? .run(stop_height) .await }