Skip to content

Commit

Permalink
fix: do not auto redirect to tls scheme (#299)
Browse files Browse the repository at this point in the history
  • Loading branch information
WarmSnowy committed Oct 31, 2023
1 parent 5c245cd commit 9ef0981
Showing 1 changed file with 19 additions and 18 deletions.
37 changes: 19 additions & 18 deletions src/service_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,25 @@ impl<Exe: Executor> ServiceDiscovery<Exe> {
} = convert_lookup_response(&response)?;
is_authoritative = authoritative;

// use the TLS connection if available
let connection_url = if let Some(u) = &broker_url_tls {
u.clone()
} else if let Some(u) = &broker_url {
u.clone()
// Use broker url with the same schema of url in setting
let (broker_url_maybe_none, broker_port) = match base_url.scheme() {
"pulsar+ssl" => (&broker_url_tls, 6651),
"pulsar" => (&broker_url, 6650),
other => {
error!("invalid scheme: {}", other);
return Err(ServiceDiscoveryError::NotFound);
}
};

let (connection_url, broker_url) = if let Some(u) = broker_url_maybe_none {
(
u.clone(),
format!(
"{}:{}",
u.host_str().unwrap(),
u.port().unwrap_or(broker_port)
),
)
} else {
return Err(ServiceDiscoveryError::NotFound);
};
Expand All @@ -132,19 +146,6 @@ impl<Exe: Executor> ServiceDiscovery<Exe> {
connection_url.clone()
};

let broker_url = if let Some(u) = broker_url_tls {
format!("{}:{}", u.host_str().unwrap(), u.port().unwrap_or(6651))
} else if let Some(u) = broker_url {
format!("{}:{}", u.host_str().unwrap(), u.port().unwrap_or(6650))
} else {
error!(
"tried to lookup a topic but error occured[{:?}]: {:?}",
line!(),
ServiceDiscoveryError::NotFound
);
return Err(ServiceDiscoveryError::NotFound);
};

broker_address = BrokerAddress {
url,
broker_url,
Expand Down

0 comments on commit 9ef0981

Please sign in to comment.