feat(mirror): add a config file with a tx_send_interval option (#11098)
This option allows setting the delay between sending each mainnet
block's worth of transactions. If it is not present, we keep the old
behavior of waiting roughly as long as blocks in the target chain take
to be produced; if it is present, we wait the given interval between
batches. The resulting load depends on the number of transactions in
each mainnet block we're sending. In the future it would be good to add
another option that targets a specific number of transactions per
second, but this is easier to implement for now.
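
For illustration, here is a minimal sketch (values chosen arbitrarily, not taken
from this commit) of the config file this adds: the mocknet runner builds it from
a millisecond interval and hands it to neard mirror run via --config-path.

import json

# Hypothetical interval; the runner splits it into the secs/nanos form that
# the mirror's tx_batch_interval field expects.
batch_interval_millis = 1250
config = {
    'tx_batch_interval': {
        'secs': batch_interval_millis // 1000,                # 1
        'nanos': (batch_interval_millis % 1000) * 1_000_000,  # 250000000
    }
}
with open('mirror-config.json', 'w') as f:
    json.dump(config, f, indent=2)
# Then: neard mirror run ... --config-path mirror-config.json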
marcelo-gonzalez committed Apr 24, 2024
1 parent f792d3b commit 02a315a
Showing 7 changed files with 103 additions and 15 deletions.
33 changes: 30 additions & 3 deletions pytest/tests/mocknet/helpers/neard_runner.py
@@ -489,11 +489,22 @@ def do_update_config(self, key_value):

return True

def do_start(self):
def do_start(self, batch_interval_millis=None):
if batch_interval_millis is not None and not isinstance(
batch_interval_millis, int):
raise ValueError(
f'batch_interval_millis: {batch_interval_millis} not an int')
with self.lock:
state = self.get_state()
if state == TestState.STOPPED:
self.start_neard()
if batch_interval_millis is not None and not self.is_traffic_generator(
):
logging.warn(
f'got batch_interval_millis = {batch_interval_millis} on non traffic generator node. Ignoring it.'
)
batch_interval_millis = None
# TODO: restart it if we get a different batch_interval_millis than last time
self.start_neard(batch_interval_millis)
self.set_state(TestState.RUNNING)
self.save_data()
elif state != TestState.RUNNING:
@@ -704,7 +715,7 @@ def kill_neard(self):
self.save_data()

# If this is a regular node, starts neard run. If it's a traffic generator, starts neard mirror run
def start_neard(self):
def start_neard(self, batch_interval_millis=None):
for i in range(20, -1, -1):
old_log = os.path.join(self.neard_logs_dir, f'log-{i}.txt')
new_log = os.path.join(self.neard_logs_dir, f'log-{i+1}.txt')
@@ -725,6 +736,22 @@ def start_neard(self):
self.target_near_home_path(),
'--no-secret',
]
if batch_interval_millis is not None:
with open(self.target_near_home_path('mirror-config.json'),
'w') as f:
secs = batch_interval_millis // 1000
nanos = (batch_interval_millis % 1000) * 1000000
json.dump(
{
'tx_batch_interval': {
'secs': secs,
'nanos': nanos
}
},
f,
indent=2)
cmd.append('--config-path')
cmd.append(self.target_near_home_path('mirror-config.json'))
else:
cmd = [
self.data['current_neard_path'], '--log-span-events',
12 changes: 11 additions & 1 deletion pytest/tests/mocknet/mirror.py
@@ -335,7 +335,8 @@ def start_traffic_cmd(args, traffic_generator, nodes):
logger.info(
"waiting a bit after validators started before starting traffic")
time.sleep(10)
traffic_generator.neard_runner_start()
traffic_generator.neard_runner_start(
batch_interval_millis=args.batch_interval_millis)
logger.info(
f'test running. to check the traffic sent, try running "curl --silent http://{traffic_generator.ip_addr()}:{traffic_generator.neard_port()}/metrics | grep near_mirror"'
)
@@ -423,6 +424,15 @@ def update_binaries_cmd(args, traffic_generator, nodes):
help=
'Starts all nodes and starts neard mirror run on the traffic generator.'
)
start_traffic_parser.add_argument(
'--batch-interval-millis',
type=int,
help=
'''Interval in millis between sending each mainnet block\'s worth of transactions.
Without this flag, the traffic generator will try to match the per-block load on mainnet.
So, transactions from consecutive mainnet blocks will be sent with delays
between them such that they will probably appear in consecutive mocknet blocks.
''')
start_traffic_parser.set_defaults(func=start_traffic_cmd)

start_nodes_parser = subparsers.add_parser(
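
The --batch-interval-millis help text above paces traffic by mainnet block, so the
resulting transactions-per-second depends on how full those blocks are. A rough,
back-of-the-envelope sketch (the per-block transaction count is an assumption, not
a number from this change):

# Assumed average of 100 transactions per mainnet block and a 500 ms interval.
txs_per_mainnet_block = 100
batch_interval_millis = 500
approx_tps = txs_per_mainnet_block / (batch_interval_millis / 1000)
print(approx_tps)  # 200.0 transactions per second, on average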
8 changes: 6 additions & 2 deletions pytest/tests/mocknet/node_handle.py
@@ -81,8 +81,12 @@ def neard_runner_jsonrpc(self, method, params=[]):
)
return response['result']

def neard_runner_start(self):
return self.neard_runner_jsonrpc('start')
def neard_runner_start(self, batch_interval_millis=None):
if batch_interval_millis is None:
params = []
else:
params = {'batch_interval_millis': batch_interval_millis}
return self.neard_runner_jsonrpc('start', params=params)

def neard_runner_stop(self):
return self.neard_runner_jsonrpc('stop')
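
neard_runner_start above forwards batch_interval_millis as the params of the
runner's start method. Assuming plain JSON-RPC over HTTP (the transport inside
neard_runner_jsonrpc is not shown in this diff), the request body would look
roughly like this sketch; the id is a placeholder:

import json

request = {
    'jsonrpc': '2.0',
    'id': 1,
    'method': 'start',
    'params': {'batch_interval_millis': 1200},
}
print(json.dumps(request, indent=2))
# Omitting the argument sends 'params': [] and keeps the old pacing behavior.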
21 changes: 17 additions & 4 deletions pytest/tools/mirror/mirror_utils.py
@@ -229,13 +229,26 @@ def start(self):
env = os.environ.copy()
env["RUST_LOG"] = "actix_web=warn,mio=warn,tokio_util=warn,actix_server=warn,actix_http=warn,indexer=info," + env.get(
"RUST_LOG", "debug")
config_path = dot_near() / f'{MIRROR_DIR}/config.json'
with open(dot_near() / f'{MIRROR_DIR}/stdout', 'ab') as stdout, \
open(dot_near() / f'{MIRROR_DIR}/stderr', 'ab') as stderr:
open(dot_near() / f'{MIRROR_DIR}/stderr', 'ab') as stderr, \
open(config_path, 'w') as mirror_config:
json.dump({'tx_batch_interval': {
'secs': 0,
'nanos': 600000000
}}, mirror_config)
args = [
self.neard, 'mirror', 'run', "--source-home", self.source_home,
self.neard,
'mirror',
'run',
"--source-home",
self.source_home,
"--target-home",
dot_near() / f'{MIRROR_DIR}/target/', '--secret-file',
dot_near() / f'{MIRROR_DIR}/target/mirror-secret.json'
dot_near() / f'{MIRROR_DIR}/target/',
'--secret-file',
dot_near() / f'{MIRROR_DIR}/target/mirror-secret.json',
'--config-path',
config_path,
]
if self.online_source:
args.append('--online-source')
11 changes: 8 additions & 3 deletions tools/mirror/src/chain_tracker.rs
@@ -149,6 +149,8 @@ pub(crate) struct TxTracker {
send_time: Pin<Box<tokio::time::Sleep>>,
// Config value in the target chain, used to judge how long to wait before sending a new batch of txs
min_block_production_delay: Duration,
// optional specific tx send delay
tx_batch_interval: Option<Duration>,
// timestamps in the target chain, used to judge how long to wait before sending a new batch of txs
recent_block_timestamps: VecDeque<u64>,
// last source block we'll be sending transactions for
@@ -161,6 +163,7 @@ impl TxTracker {
// we unwrap() self.height_queued() in Self::next_heights()
pub(crate) fn new<'a, I>(
min_block_production_delay: Duration,
tx_batch_interval: Option<Duration>,
next_heights: I,
stop_height: Option<BlockHeight>,
) -> Self
@@ -172,6 +175,7 @@ impl TxTracker {
min_block_production_delay,
next_heights,
stop_height,
tx_batch_interval,
// Wait at least 15 seconds before sending any transactions because for
// a few seconds after the node starts, transaction routing requests
// will be silently dropped by the peer manager.
@@ -1143,9 +1147,10 @@ impl TxTracker {

let (txs_sent, provenance) = match sent_batch {
SentBatch::MappedBlock(b) => {
let block_delay = self
.second_longest_recent_block_delay()
.unwrap_or(self.min_block_production_delay + Duration::from_millis(100));
let block_delay = self.tx_batch_interval.unwrap_or_else(|| {
self.second_longest_recent_block_delay()
.unwrap_or(self.min_block_production_delay + Duration::from_millis(100))
});
self.send_time.as_mut().reset(tokio::time::Instant::now() + block_delay);
crate::set_last_source_height(db, b.source_height)?;
let txs = b
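
The block_delay selection above is the core behavioral change. A Python paraphrase
(a sketch for readability, not the actual Rust), with times in milliseconds:

def choose_block_delay(tx_batch_interval_ms, second_longest_recent_block_delay_ms,
                       min_block_production_delay_ms):
    # A configured tx_batch_interval always wins.
    if tx_batch_interval_ms is not None:
        return tx_batch_interval_ms
    # Otherwise keep the old behavior: pace batches like recent target-chain
    # blocks, falling back when there is not enough history yet.
    if second_longest_recent_block_delay_ms is not None:
        return second_longest_recent_block_delay_ms
    return min_block_production_delay_ms + 100

assert choose_block_delay(600, 1300, 1000) == 600    # configured interval
assert choose_block_delay(None, 1300, 1000) == 1300  # old behavior
assert choose_block_delay(None, None, 1000) == 1100  # fallback + 100 ms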
3 changes: 3 additions & 0 deletions tools/mirror/src/cli.rs
@@ -43,6 +43,8 @@ struct RunCmd {
/// this height in the source chain
#[clap(long)]
stop_height: Option<BlockHeight>,
#[clap(long)]
config_path: Option<PathBuf>,
}

impl RunCmd {
@@ -80,6 +82,7 @@ impl RunCmd {
secret,
self.stop_height,
self.online_source,
self.config_path,
))
.await
})
30 changes: 28 additions & 2 deletions tools/mirror/src/lib.rs
@@ -458,6 +458,16 @@ fn execution_status_good(status: &ExecutionStatusView) -> bool {
)
}

#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
struct MirrorConfig {
/// Delay between sending batches of mainnet transactions. If this is
/// given, then instead of trying to get the transactions in consecutive
/// mainnet blocks to appear in consecutive target chain blocks, we will
/// wait this long before sending each mainnet block's worth of transactions.
/// TODO: add an option to target a specific number of transactions per second
tx_batch_interval: Option<Duration>,
}

const CREATE_ACCOUNT_DELTA: usize = 5;

struct TxMirror<T: ChainAccess> {
@@ -475,6 +485,7 @@ struct TxMirror<T: ChainAccess> {
target_min_block_production_delay: Duration,
secret: Option<[u8; crate::secret::SECRET_LEN]>,
default_extra_key: SecretKey,
config: MirrorConfig,
}

fn open_db<P: AsRef<Path>>(home: P, config: &NearConfig) -> anyhow::Result<DB> {
@@ -854,6 +865,7 @@ impl<T: ChainAccess> TxMirror<T> {
source_chain_access: T,
target_home: P,
secret: Option<[u8; crate::secret::SECRET_LEN]>,
config: MirrorConfig,
) -> anyhow::Result<Self> {
let target_config =
nearcore::config::load_config(target_home.as_ref(), GenesisValidationMode::UnsafeFast)
@@ -893,6 +905,7 @@ impl<T: ChainAccess> TxMirror<T> {
.unsigned_abs(),
secret,
default_extra_key,
config,
})
}

@@ -1777,6 +1790,7 @@ impl<T: ChainAccess> TxMirror<T> {

let mut tracker = crate::chain_tracker::TxTracker::new(
self.target_min_block_production_delay,
self.config.tx_batch_interval,
next_heights.iter(),
stop_height,
);
@@ -1821,16 +1835,28 @@ async fn run<P: AsRef<Path>>(
secret: Option<[u8; crate::secret::SECRET_LEN]>,
stop_height: Option<BlockHeight>,
online_source: bool,
config_path: Option<P>,
) -> anyhow::Result<()> {
let config: MirrorConfig = match config_path {
Some(p) => {
let c = std::fs::read_to_string(p.as_ref())
.with_context(|| format!("Could not read config from {}", p.as_ref().display()))?;
serde_json::from_str(&c)
.with_context(|| format!("Could not parse config from {}", p.as_ref().display()))?
}
None => Default::default(),
};
if !online_source {
let source_chain_access = crate::offline::ChainAccess::new(source_home)?;
let stop_height = stop_height.unwrap_or(
source_chain_access.head_height().await.context("could not fetch source chain head")?,
);
TxMirror::new(source_chain_access, target_home, secret)?.run(Some(stop_height)).await
TxMirror::new(source_chain_access, target_home, secret, config)?
.run(Some(stop_height))
.await
} else {
tracing::warn!(target: "mirror", "FIXME: currently --online-source will skip DeployContract actions");
TxMirror::new(crate::online::ChainAccess::new(source_home)?, target_home, secret)?
TxMirror::new(crate::online::ChainAccess::new(source_home)?, target_home, secret, config)?
.run(stop_height)
.await
}
