From 9ef0981fbe9d362a10ebd48f8564fb64c12de7a0 Mon Sep 17 00:00:00 2001 From: WarmSnowy <17583220+WarmSnowy@users.noreply.github.com> Date: Wed, 1 Nov 2023 00:05:12 +0800 Subject: [PATCH] fix: do not auto redirect to tls scheme (#299) --- src/service_discovery.rs | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/src/service_discovery.rs b/src/service_discovery.rs index 3bd4ad3..1393f6c 100644 --- a/src/service_discovery.rs +++ b/src/service_discovery.rs @@ -116,11 +116,25 @@ impl ServiceDiscovery { } = convert_lookup_response(&response)?; is_authoritative = authoritative; - // use the TLS connection if available - let connection_url = if let Some(u) = &broker_url_tls { - u.clone() - } else if let Some(u) = &broker_url { - u.clone() + // Use broker url with the same schema of url in setting + let (broker_url_maybe_none, broker_port) = match base_url.scheme() { + "pulsar+ssl" => (&broker_url_tls, 6651), + "pulsar" => (&broker_url, 6650), + other => { + error!("invalid scheme: {}", other); + return Err(ServiceDiscoveryError::NotFound); + } + }; + + let (connection_url, broker_url) = if let Some(u) = broker_url_maybe_none { + ( + u.clone(), + format!( + "{}:{}", + u.host_str().unwrap(), + u.port().unwrap_or(broker_port) + ), + ) } else { return Err(ServiceDiscoveryError::NotFound); }; @@ -132,19 +146,6 @@ impl ServiceDiscovery { connection_url.clone() }; - let broker_url = if let Some(u) = broker_url_tls { - format!("{}:{}", u.host_str().unwrap(), u.port().unwrap_or(6651)) - } else if let Some(u) = broker_url { - format!("{}:{}", u.host_str().unwrap(), u.port().unwrap_or(6650)) - } else { - error!( - "tried to lookup a topic but error occured[{:?}]: {:?}", - line!(), - ServiceDiscoveryError::NotFound - ); - return Err(ServiceDiscoveryError::NotFound); - }; - broker_address = BrokerAddress { url, broker_url,