From 8cf2e90ed78bd376df5fac6ec83bb87f3d6c7c82 Mon Sep 17 00:00:00 2001 From: Dain Sundstrom Date: Mon, 11 Mar 2019 13:51:57 -0700 Subject: [PATCH] Add coordinator selection timeout --- .../prestosql/dispatcher/QueryDispatcher.java | 20 +++++++- .../dispatcher/RemoteDispatcherConfig.java | 41 +++++++++++++++++ .../prestosql/server/CoordinatorModule.java | 2 + .../TestRemoteDispatcherConfig.java | 46 +++++++++++++++++++ 4 files changed, 107 insertions(+), 2 deletions(-) create mode 100644 presto-main/src/main/java/io/prestosql/dispatcher/RemoteDispatcherConfig.java create mode 100644 presto-main/src/test/java/io/prestosql/dispatcher/TestRemoteDispatcherConfig.java diff --git a/presto-main/src/main/java/io/prestosql/dispatcher/QueryDispatcher.java b/presto-main/src/main/java/io/prestosql/dispatcher/QueryDispatcher.java index 2336e9df876f1..a9f82da788f3e 100644 --- a/presto-main/src/main/java/io/prestosql/dispatcher/QueryDispatcher.java +++ b/presto-main/src/main/java/io/prestosql/dispatcher/QueryDispatcher.java @@ -57,9 +57,12 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.util.concurrent.Futures.immediateFuture; +import static io.airlift.concurrent.MoreFutures.addTimeout; import static io.prestosql.dispatcher.QueryAnalysisResponse.analysisUnknown; import static io.prestosql.spi.NodeState.ACTIVE; +import static io.prestosql.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES; import static io.prestosql.spi.StandardErrorCode.SERVER_SHUTTING_DOWN; +import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; @@ -79,6 +82,7 @@ public class QueryDispatcher private final QueryTracker dispatchQueryTracker; private final RemoteCoordinatorMonitor remoteCoordinatorMonitor; private final SessionPropertyManager 
sessionPropertyManager; + private final Duration coordinatorSelectionWaitTime; @GuardedBy("this") private boolean running; @@ -100,7 +104,8 @@ public QueryDispatcher( JsonCodec querySubmissionResponseCodec, JsonCodec queryAnalysisCodec, JsonCodec queryAnalysisResponseCodec, - JsonCodec coordinatorStatusCodec) + JsonCodec coordinatorStatusCodec, + RemoteDispatcherConfig config) { this.dispatchQueryTracker = requireNonNull(dispatchQueryTracker, "dispatchQueryTracker is null"); this.remoteCoordinatorMonitor = requireNonNull(remoteCoordinatorMonitor, "remoteCoordinatorMonitor is null"); @@ -116,6 +121,8 @@ public QueryDispatcher( this.executor = dispatchExecutor.getExecutor(); this.scheduledExecutor = dispatchExecutor.getScheduledExecutor(); this.remoteCoordinators = new StateMachine<>("coordinators", executor, ImmutableMap.of()); + + coordinatorSelectionWaitTime = requireNonNull(config, "config is null").getCoordinatorSelectionWaitTime(); } @PostConstruct @@ -248,13 +255,22 @@ private ListenableFuture getCoordinator(QueuedQueryStateMachi { Optional transactionId = stateMachine.getSession().getTransactionId(); ListenableFuture coordinatorFuture; + String timeoutMessage; if (transactionId.isPresent()) { coordinatorFuture = getCoordinatorForTransaction(transactionId.get()); + timeoutMessage = format("Coordinator managing active transaction %s has been offline for %s.", transactionId.get(), coordinatorSelectionWaitTime); } else { coordinatorFuture = selectRandomCoordinator(); + timeoutMessage = format("No coordinators available within %s.", coordinatorSelectionWaitTime); } - return coordinatorFuture; + return addTimeout( + coordinatorFuture, + () -> { + throw new PrestoException(GENERIC_INSUFFICIENT_RESOURCES, timeoutMessage); + }, + coordinatorSelectionWaitTime, + scheduledExecutor); } private ListenableFuture getCoordinatorForTransaction(TransactionId transactionId) diff --git a/presto-main/src/main/java/io/prestosql/dispatcher/RemoteDispatcherConfig.java 
b/presto-main/src/main/java/io/prestosql/dispatcher/RemoteDispatcherConfig.java new file mode 100644 index 0000000000000..db3ffad8b6ba8 --- /dev/null +++ b/presto-main/src/main/java/io/prestosql/dispatcher/RemoteDispatcherConfig.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.dispatcher; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; +import io.airlift.units.Duration; + +import javax.validation.constraints.NotNull; + +import static java.util.concurrent.TimeUnit.MINUTES; + +public class RemoteDispatcherConfig +{ + private Duration coordinatorSelectionWaitTime = new Duration(5, MINUTES); + + @NotNull + public Duration getCoordinatorSelectionWaitTime() + { + return coordinatorSelectionWaitTime; + } + + @Config("dispatcher.coordinator-selection-max-wait") + @ConfigDescription("Maximum time to wait for valid coordinator before the query is failed") + public RemoteDispatcherConfig setCoordinatorSelectionWaitTime(Duration coordinatorSelectionWaitTime) + { + this.coordinatorSelectionWaitTime = coordinatorSelectionWaitTime; + return this; + } +} diff --git a/presto-main/src/main/java/io/prestosql/server/CoordinatorModule.java b/presto-main/src/main/java/io/prestosql/server/CoordinatorModule.java index 1d446b6afd670..118419920bf5b 100644 --- a/presto-main/src/main/java/io/prestosql/server/CoordinatorModule.java +++ 
b/presto-main/src/main/java/io/prestosql/server/CoordinatorModule.java @@ -50,6 +50,7 @@ import io.prestosql.dispatcher.QuerySubmissionResponse; import io.prestosql.dispatcher.RemoteCoordinatorMonitor; import io.prestosql.dispatcher.RemoteDispatchQueryFactory; +import io.prestosql.dispatcher.RemoteDispatcherConfig; import io.prestosql.dispatcher.SubmissionResource; import io.prestosql.event.QueryMonitor; import io.prestosql.event.QueryMonitorConfig; @@ -256,6 +257,7 @@ protected void setup(Binder binder) // remote dispatcher binder.bind(DispatchQueryFactory.class).to(RemoteDispatchQueryFactory.class); binder.bind(QueryDispatcher.class).in(Scopes.SINGLETON); + configBinder(binder).bindConfig(RemoteDispatcherConfig.class); binder.bind(RemoteCoordinatorMonitor.class).to(DiscoveryRemoteCoordinatorMonitor.class).in(Scopes.SINGLETON); jaxrsBinder(binder).bind(SubmissionResource.class); jsonCodecBinder(binder).bindJsonCodec(QuerySubmission.class); diff --git a/presto-main/src/test/java/io/prestosql/dispatcher/TestRemoteDispatcherConfig.java b/presto-main/src/test/java/io/prestosql/dispatcher/TestRemoteDispatcherConfig.java new file mode 100644 index 0000000000000..56ce9c372ab1d --- /dev/null +++ b/presto-main/src/test/java/io/prestosql/dispatcher/TestRemoteDispatcherConfig.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.prestosql.dispatcher; + +import com.google.common.collect.ImmutableMap; +import io.airlift.configuration.testing.ConfigAssertions; +import io.airlift.units.Duration; +import org.testng.annotations.Test; + +import java.util.Map; + +import static java.util.concurrent.TimeUnit.MINUTES; + +public class TestRemoteDispatcherConfig +{ + @Test + public void testDefaults() + { + ConfigAssertions.assertRecordedDefaults(ConfigAssertions.recordDefaults(RemoteDispatcherConfig.class) + .setCoordinatorSelectionWaitTime(new Duration(5, MINUTES))); + } + + @Test + public void testExplicitPropertyMappings() + { + Map<String, String> properties = new ImmutableMap.Builder<String, String>() + .put("dispatcher.coordinator-selection-max-wait", "33m") + .build(); + + RemoteDispatcherConfig expected = new RemoteDispatcherConfig() + .setCoordinatorSelectionWaitTime(new Duration(33, MINUTES)); + + ConfigAssertions.assertFullMapping(properties, expected); + } +}