Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor DNS-based channel refresh within DialogueChannel #2269

Merged
merged 19 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog/@unreleased/pr-2269.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
type: improvement
improvement:
description: DNS-based channel refresh occurs within DialogueChannel, allowing
retries to be routed to newly resolved addresses.
links:
- https://github.com/palantir/dialogue/pull/2269
1 change: 1 addition & 0 deletions dialogue-clients/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ Dialogue-specific metrics that are not necessarily applicable to other client im
- `service-name`
- `endpoint`
- `dialogue.client.create` tagged `client-name`, `client-type` (meter): Marked every time a new client is created.
- `dialogue.client.reload` tagged `client-name`, `client-type` (meter): Marked every time a client's targets are reloaded, including initial creation.

### dialogue.concurrencylimiter
Instrumentation for the ConcurrencyLimitedChannel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.Safe;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.Unsafe;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.refreshable.Refreshable;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -106,9 +108,15 @@ static ChannelCache createEmptyCache() {
DialogueChannel getNonReloadingChannel(
ReloadingClientFactory.ReloadingParams reloadingParams,
ServiceConfiguration serviceConf,
List<TargetUri> uris,
@Safe String channelName) {
return getNonReloadingChannel(reloadingParams, serviceConf, channelName, Optional.empty());
}

DialogueChannel getNonReloadingChannel(
ReloadingClientFactory.ReloadingParams reloadingParams,
ServiceConfiguration serviceConf,
@Safe String channelName,
OptionalInt overrideHostIndex) {
Optional<OverrideHostIndex> overrideHostIndex) {
if (log.isWarnEnabled()) {
long estimatedSize = channelCache.estimatedSize();
if (estimatedSize >= MAX_CACHED_CHANNELS * 0.75) {
Expand All @@ -124,10 +132,11 @@ DialogueChannel getNonReloadingChannel(
.from(reloadingParams)
.blockingExecutor(reloadingParams.blockingExecutor())
.serviceConf(serviceConf)
.uris(uris)
.channelName(channelName)
.overrideHostIndex(overrideHostIndex)
.dnsResolver(reloadingParams.dnsResolver())
.dnsRefreshInterval(reloadingParams.dnsRefreshInterval())
.dnsNodeDiscovery(overrideHostIndex.isEmpty() && reloadingParams.dnsNodeDiscovery())
.build());
}

Expand All @@ -142,15 +151,38 @@ private DialogueChannel createNonLiveReloadingChannel(ChannelCacheKey channelCac

ApacheCacheEntry apacheClient = getApacheClient(request);

Refreshable<List<TargetUri>> targets;
if (channelCacheRequest.overrideHostIndex().isPresent()) {
targets = Refreshable.only(
List.of(channelCacheRequest.overrideHostIndex().get().target()));
} else {
DnsPollingSpec<ServiceConfiguration> spec = DnsPollingSpec.serviceConfig(channelCacheRequest.channelName());
targets = DnsSupport.pollForChanges(
channelCacheRequest.dnsNodeDiscovery(),
spec,
channelCacheRequest.dnsResolver(),
channelCacheRequest.dnsRefreshInterval(),
channelCacheRequest.taggedMetrics(),
Refreshable.only(channelCacheRequest.serviceConf()))
.map(dnsResolutionResults -> DnsSupport.getTargetUris(
channelCacheRequest.channelName(),
channelCacheRequest.serviceConf().uris(),
DnsSupport.proxySelector(
channelCacheRequest.serviceConf().proxy()),
dnsResolutionResults.resolvedHosts(),
channelCacheRequest.taggedMetrics()));
}
return DialogueChannel.builder()
.channelName(channelCacheRequest.channelName())
.clientConfiguration(ClientConfiguration.builder()
.from(apacheClient.conf())
.uris(channelCacheRequest.serviceConf().uris()) // restore uris
.build())
.uris(channelCacheRequest.uris())
.uris(targets)
.factory(args -> ApacheHttpClientChannels.createSingleUri(args, apacheClient.client()))
.overrideHostIndex(channelCacheRequest.overrideHostIndex())
.overrideHostIndex(channelCacheRequest.overrideHostIndex().stream()
.mapToInt(OverrideHostIndex::index)
.findAny())
.build();
}

Expand Down Expand Up @@ -231,15 +263,31 @@ public String toString() {
interface ChannelCacheKey extends AugmentClientConfig {
ServiceConfiguration serviceConf();

List<TargetUri> uris();

Optional<ExecutorService> blockingExecutor();

String channelName();

OptionalInt overrideHostIndex();
Optional<OverrideHostIndex> overrideHostIndex();

DialogueDnsResolver dnsResolver();

Duration dnsRefreshInterval();

boolean dnsNodeDiscovery();
}

/**
 * Immutable value pairing a host index with the {@link TargetUri} associated with that index.
 *
 * <p>NOTE(review): the type is marked {@code @Unsafe}, presumably because the target uri must not be
 * logged as a safe argument -- confirm against the project's logsafe conventions.
 */
@Unsafe
@Value.Immutable
interface OverrideHostIndex {
/** Index of the overridden host. NOTE(review): presumably a position in the configured uri list -- confirm. */
@Value.Parameter(order = 0)
int index();

/** Target associated with {@link #index()}. */
@Value.Parameter(order = 1)
TargetUri target();

/** Creates an instance by delegating to the generated {@code ImmutableOverrideHostIndex.of} factory. */
static OverrideHostIndex of(int index, TargetUri target) {
return ImmutableOverrideHostIndex.of(index, target);
}
}

@DoNotLog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@

import com.google.common.collect.ImmutableList;
import com.palantir.conjure.java.api.config.service.ServiceConfiguration;
import com.palantir.conjure.java.api.config.service.ServicesConfigBlock;
import com.palantir.conjure.java.client.config.ClientConfiguration;
import com.palantir.logsafe.Safe;
import com.palantir.logsafe.Unsafe;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;

/** Internal interface to handle DNS refresh from various sources. */
Expand All @@ -39,34 +36,8 @@ interface DnsPollingSpec<INPUT> {
/** Returns a channel-name associated with the host. */
List<@Safe String> describeHostname(INPUT input, String hostname);

/** Polling spec that operates on every service declared in a {@link ServicesConfigBlock}. */
DnsPollingSpec<ServicesConfigBlock> RELOADING_FACTORY = new DnsPollingSpec<>() {
    @Override
    public String kind() {
        return "reloading-client-factory";
    }

    /** Streams the uris of all services in the block, in the iteration order of the services map. */
    @Override
    public Stream<String> extractUris(ServicesConfigBlock servicesConfigBlock) {
        return servicesConfigBlock.services().values().stream()
                .flatMap(serviceConfig -> serviceConfig.uris().stream());
    }

    /**
     * Returns the reloading channel name of each service that has at least one uri whose host
     * equals {@code host}. Uris for which no host can be extracted are ignored.
     */
    @Override
    public List<@Safe String> describeHostname(ServicesConfigBlock servicesConfigBlock, @Unsafe String host) {
        ImmutableList.Builder<@Safe String> channelNames = ImmutableList.builder();
        servicesConfigBlock.services().forEach((serviceName, serviceConfig) -> {
            boolean referencesHost = serviceConfig.uris().stream()
                    .map(DnsSupport::tryGetHost)
                    .filter(Objects::nonNull)
                    .anyMatch(host::equals);
            if (referencesHost) {
                channelNames.add(ChannelNames.reloading(serviceName));
            }
        });
        return channelNames.build();
    }
};

static DnsPollingSpec<ClientConfiguration> clientConfig(@Safe String channelName) {
return new DnsPollingSpec<>() {

@Override
public String kind() {
return channelName;
Expand All @@ -86,7 +57,6 @@ public Stream<String> extractUris(ClientConfiguration input) {

static DnsPollingSpec<ServiceConfiguration> serviceConfig(@Safe String channelName) {
return new DnsPollingSpec<>() {

@Override
public String kind() {
return channelName;
Expand All @@ -103,4 +73,23 @@ public Stream<String> extractUris(ServiceConfiguration input) {
}
};
}

/**
 * Builds a polling spec for a service configuration that may be absent.
 *
 * @param channelName name reported as the spec's {@code kind()} and as the description of every hostname
 * @return a spec that yields no uris when the configuration is empty
 */
static DnsPollingSpec<Optional<ServiceConfiguration>> optionalServiceConfig(@Safe String channelName) {
    return new DnsPollingSpec<>() {
        @Override
        public String kind() {
            return channelName;
        }

        @Override
        public Stream<String> extractUris(Optional<ServiceConfiguration> input) {
            // An empty optional produces an empty stream; otherwise stream the configured uris.
            return input.stream().map(ServiceConfiguration::uris).flatMap(configuredUris -> configuredUris.stream());
        }

        @Override
        public List<@Safe String> describeHostname(Optional<ServiceConfiguration> _configuration, String _uri) {
            // Every hostname is attributed to this single channel, whatever the inputs are.
            return ImmutableList.of(channelName);
        }
    };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,19 @@
import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.palantir.conjure.java.api.config.service.ProxyConfiguration;
import com.palantir.conjure.java.client.config.ClientConfigurations;
import com.palantir.dialogue.core.DialogueDnsResolver;
import com.palantir.dialogue.core.DialogueExecutors;
import com.palantir.dialogue.core.TargetUri;
import com.palantir.logsafe.Safe;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.Unsafe;
import com.palantir.logsafe.UnsafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.refreshable.Disposable;
Expand All @@ -32,10 +41,17 @@
import com.palantir.tritium.metrics.registry.SharedTaggedMetricRegistries;
import com.palantir.tritium.metrics.registry.TaggedMetricRegistry;
import java.lang.ref.Cleaner;
import java.net.InetAddress;
import java.net.Proxy;
import java.net.ProxySelector;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -133,6 +149,94 @@ static boolean isMeshMode(String uri) {
return uri.startsWith(MESH_PREFIX);
}

/**
 * Expands the configured {@code uris} into the list of targets to route to.
 *
 * <p>A uri is kept as a single unresolved target when DNS results are unavailable
 * ({@code resolvedHosts} is empty), when the uri is in mesh mode, or when the proxy selector
 * routes it through a proxy. Otherwise one target is produced per resolved address of its host.
 * Uris that cannot be parsed, or that have no host component, are skipped; if that leaves no
 * targets at all, every configured uri is used unresolved as a fallback.
 *
 * @param serviceNameForLogging safe service name used in log lines and metrics
 * @param uris configured uris
 * @param proxySelector selector consulted to decide whether a uri is proxied
 * @param resolvedHosts DNS results keyed by host, or empty when dns-based discovery is not supported
 * @param metrics registry passed to uri parsing for invalid-uri metrics
 * @return the targets, de-duplicated via {@link ImmutableSet}
 */
@SuppressWarnings("checkstyle:CyclomaticComplexity")
static ImmutableList<TargetUri> getTargetUris(
        @Safe String serviceNameForLogging,
        Collection<String> uris,
        ProxySelector proxySelector,
        Optional<ImmutableSetMultimap<String, InetAddress>> resolvedHosts,
        TaggedMetricRegistry metrics) {
    List<TargetUri> targets = new ArrayList<>();
    boolean sawInvalidUri = false;
    for (String rawUri : uris) {
        URI parsedUri = tryParseUri(metrics, serviceNameForLogging, rawUri);
        String host = parsedUri == null ? null : parsedUri.getHost();
        if (host == null) {
            sawInvalidUri = true;
            continue;
        }
        // Per-address expansion only applies when all of the following hold:
        //  - DNS results are available (an empty optional means dns-based discovery is unsupported),
        //  - the uri is not in mesh mode (all dns results are considered equivalent there),
        //  - no proxy is used (pre-resolved addresses have no impact; the proxy often resolves DNS).
        boolean expandPerAddress = resolvedHosts.isPresent()
                && !DnsSupport.isMeshMode(rawUri)
                && !usesProxy(proxySelector, parsedUri);
        if (!expandPerAddress) {
            targets.add(TargetUri.of(rawUri));
            continue;
        }
        Set<InetAddress> addresses = resolvedHosts.get().get(host);
        if (addresses.isEmpty()) {
            log.info(
                    "Resolved no addresses for host '{}' of service '{}'",
                    UnsafeArg.of("host", host),
                    SafeArg.of("service", serviceNameForLogging));
        }
        for (InetAddress address : addresses) {
            targets.add(TargetUri.builder().uri(rawUri).resolvedAddress(address).build());
        }
    }
    if (sawInvalidUri && targets.isEmpty()) {
        // Handle cases like "host:-1", but only when _all_ uris are invalid
        log.warn(
                "Failed to parse all URIs, falling back to legacy DNS approach for service '{}'",
                SafeArg.of("service", serviceNameForLogging));
        for (String rawUri : uris) {
            targets.add(TargetUri.of(rawUri));
        }
    }
    return ImmutableSet.copyOf(targets).asList();
}

/**
 * Chooses the proxy selector for a client.
 *
 * @param proxyConfiguration optional proxy configuration
 * @return the selector created from the configuration when one is present, otherwise the JVM default
 *     from {@link ProxySelector#getDefault()}
 */
static ProxySelector proxySelector(Optional<ProxyConfiguration> proxyConfiguration) {
    ProxySelector configured = proxyConfiguration.isPresent()
            ? ClientConfigurations.createProxySelector(proxyConfiguration.get())
            : null;
    if (configured != null) {
        return configured;
    }
    return ProxySelector.getDefault();
}

/**
 * Reports whether {@code proxySelector} selects anything other than direct connections for {@code uri}.
 *
 * @return {@code false} only when every selected proxy has type {@link Proxy.Type#DIRECT} (including
 *     an empty selection); {@code true} otherwise, or when the selector throws a runtime exception
 */
private static boolean usesProxy(ProxySelector proxySelector, URI uri) {
    try {
        for (Proxy candidate : proxySelector.select(uri)) {
            if (candidate.type() != Proxy.Type.DIRECT) {
                return true;
            }
        }
        return false;
    } catch (RuntimeException ignored) {
        // Fall back to the simple path without scheduling recurring DNS resolution.
        return true;
    }
}

/**
 * Parses {@code uri}, logging and marking the invalid-uri metric when parsing goes wrong.
 *
 * @param metrics registry used to mark the invalid-uri meter
 * @param serviceName safe service name for logs and metric tags
 * @param uri raw uri text
 * @return the parsed uri (which may have a {@code null} host, after an error is logged), or
 *     {@code null} when parsing throws
 */
@Nullable
private static URI tryParseUri(TaggedMetricRegistry metrics, @Safe String serviceName, @Unsafe String uri) {
    try {
        URI parsed = new URI(uri);
        if (parsed.getHost() != null) {
            return parsed;
        }
        // Parsing succeeded syntactically but produced no host: report it, and still hand back
        // the result so the caller can decide how to treat it.
        log.error(
                "Failed to correctly parse URI {} for service {} due to null host component. "
                        + "This usually occurs due to invalid characters causing information to be "
                        + "parsed in the wrong uri component, often the host info lands in the authority.",
                UnsafeArg.of("uri", uri),
                SafeArg.of("service", serviceName));
        ClientUriMetrics.of(metrics).invalid(serviceName).mark();
        return parsed;
    } catch (URISyntaxException | RuntimeException e) {
        log.error(
                "Failed to parse URI {} for service {}",
                UnsafeArg.of("uri", uri),
                SafeArg.of("service", serviceName),
                e);
        ClientUriMetrics.of(metrics).invalid(serviceName).mark();
        return null;
    }
}

// Private no-arg constructor: prevents instantiation from outside this class.
private DnsSupport() {}

// We define a concrete class here to avoid accidental lambda references to the
Expand Down
Loading
Loading