diff --git a/async-nats/tests/client_tests.rs b/async-nats/tests/client_tests.rs index 22b523804..52e77fc2c 100644 --- a/async-nats/tests/client_tests.rs +++ b/async-nats/tests/client_tests.rs @@ -330,12 +330,8 @@ mod client { #[tokio::test] async fn connection_callbacks() { - use async_nats::ServerAddr; - - let mut servers = vec![ - nats_server::run_basic_server(), - nats_server::run_basic_server(), - ]; + let server = nats_server::run_basic_server(); + let port = server.client_port().to_string(); let (tx, mut rx) = tokio::sync::mpsc::channel(128); let (dc_tx, mut dc_rx) = tokio::sync::mpsc::channel(128); @@ -354,21 +350,19 @@ mod client { dc_tx.send(()).await.unwrap(); } }) - .connect( - servers - .iter() - .map(|server| server.client_url().parse::().unwrap()) - .collect::>() - .as_slice(), - ) + .connect(server.client_url()) .await .unwrap(); println!("conncted"); nc.subscribe("test".to_string()).await.unwrap(); nc.flush().await.unwrap(); - drop(servers.remove(0)); + + println!("dropped server {:?}", server.client_url()); + drop(server); tokio::time::sleep(Duration::from_secs(3)).await; - println!("dropped server"); + + let _server = nats_server::run_server_with_port("", Some(port.as_str())); + tokio::time::timeout(Duration::from_secs(15), dc_rx.recv()) .await .unwrap() diff --git a/async-nats/tests/nats_server/mod.rs b/async-nats/tests/nats_server/mod.rs index e25b19c8b..deacee502 100644 --- a/async-nats/tests/nats_server/mod.rs +++ b/async-nats/tests/nats_server/mod.rs @@ -66,6 +66,15 @@ impl Server { format!("{}127.0.0.1:{}", scheme, port) } + pub fn client_port(&self) -> u16 { + let addr = self.client_addr(); + let mut r = BufReader::with_capacity(1024, TcpStream::connect(addr).unwrap()); + let mut line = String::new(); + r.read_line(&mut line).expect("did not receive INFO"); + let si = json::parse(&line["INFO".len()..]).unwrap(); + si["port"].as_u16().expect("could not parse port") + } + // Allow user/pass override. pub fn client_url_with(&self, user: &str, pass: &str) -> String { use url::Url; @@ -110,8 +119,12 @@ pub fn set_lame_duck_mode(s: &Server) { .unwrap(); } -/// Starts a local NATS server with the given config that gets stopped and cleaned up on drop. pub fn run_server(cfg: &str) -> Server { + run_server_with_port(cfg, None) +} + +/// Starts a local NATS server with the given config that gets stopped and cleaned up on drop. +pub fn run_server_with_port(cfg: &str, port: Option<&str>) -> Server { let id = nuid::next(); let logfile = env::temp_dir().join(format!("nats-server-{}.log", id)); let store_dir = env::temp_dir().join(format!("store-dir-{}", id)); @@ -122,9 +135,12 @@ pub fn run_server(cfg: &str) -> Server { let mut cmd = Command::new("nats-server"); cmd.arg("--store_dir") .arg(store_dir.as_path().to_str().unwrap()) - .arg("-p") - .arg("-1") - .arg("-l") + .arg("-p"); + match port { + Some(port) => cmd.arg(port), + None => cmd.arg("-1"), + }; + cmd.arg("-l") .arg(logfile.as_os_str()) .arg("-P") .arg(pidfile.as_os_str());