Skip to content

Commit

Permalink
Fix connection callback tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarema committed May 5, 2022
1 parent ca10263 commit 540b597
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 19 deletions.
24 changes: 9 additions & 15 deletions async-nats/tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,12 +330,8 @@ mod client {

#[tokio::test]
async fn connection_callbacks() {
use async_nats::ServerAddr;

let mut servers = vec![
nats_server::run_basic_server(),
nats_server::run_basic_server(),
];
let server = nats_server::run_basic_server();
let port = server.client_port().to_string();

let (tx, mut rx) = tokio::sync::mpsc::channel(128);
let (dc_tx, mut dc_rx) = tokio::sync::mpsc::channel(128);
Expand All @@ -354,21 +350,19 @@ mod client {
dc_tx.send(()).await.unwrap();
}
})
.connect(
servers
.iter()
.map(|server| server.client_url().parse::<ServerAddr>().unwrap())
.collect::<Vec<ServerAddr>>()
.as_slice(),
)
.connect(server.client_url())
.await
.unwrap();
println!("conncted");
nc.subscribe("test".to_string()).await.unwrap();
nc.flush().await.unwrap();
drop(servers.remove(0));

println!("dropped server {:?}", server.client_url());
drop(server);
tokio::time::sleep(Duration::from_secs(3)).await;
println!("dropped server");

let _server = nats_server::run_server_with_port("", Some(port.as_str()));

tokio::time::timeout(Duration::from_secs(15), dc_rx.recv())
.await
.unwrap()
Expand Down
24 changes: 20 additions & 4 deletions async-nats/tests/nats_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ impl Server {
format!("{}127.0.0.1:{}", scheme, port)
}

pub fn client_port(&self) -> u16 {
let addr = self.client_addr();
let mut r = BufReader::with_capacity(1024, TcpStream::connect(addr).unwrap());
let mut line = String::new();
r.read_line(&mut line).expect("did not receive INFO");
let si = json::parse(&line["INFO".len()..]).unwrap();
si["port"].as_u16().expect("could not parse port")
}

// Allow user/pass override.
pub fn client_url_with(&self, user: &str, pass: &str) -> String {
use url::Url;
Expand Down Expand Up @@ -110,8 +119,12 @@ pub fn set_lame_duck_mode(s: &Server) {
.unwrap();
}

/// Starts a local NATS server with the given config that gets stopped and cleaned up on drop.
pub fn run_server(cfg: &str) -> Server {
run_server_with_port(cfg, None)
}

/// Starts a local NATS server with the given config that gets stopped and cleaned up on drop.
pub fn run_server_with_port(cfg: &str, port: Option<&str>) -> Server {
let id = nuid::next();
let logfile = env::temp_dir().join(format!("nats-server-{}.log", id));
let store_dir = env::temp_dir().join(format!("store-dir-{}", id));
Expand All @@ -122,9 +135,12 @@ pub fn run_server(cfg: &str) -> Server {
let mut cmd = Command::new("nats-server");
cmd.arg("--store_dir")
.arg(store_dir.as_path().to_str().unwrap())
.arg("-p")
.arg("-1")
.arg("-l")
.arg("-p");
match port {
Some(port) => cmd.arg(port),
None => cmd.arg("-1"),
};
cmd.arg("-l")
.arg(logfile.as_os_str())
.arg("-P")
.arg(pidfile.as_os_str());
Expand Down

0 comments on commit 540b597

Please sign in to comment.