Skip to content

Commit

Permalink
Add coordinator selection timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Mar 18, 2019
1 parent 4850bb8 commit 8cf2e90
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 2 deletions.
Expand Up @@ -57,9 +57,12 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.airlift.concurrent.MoreFutures.addTimeout;
import static io.prestosql.dispatcher.QueryAnalysisResponse.analysisUnknown;
import static io.prestosql.spi.NodeState.ACTIVE;
import static io.prestosql.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES;
import static io.prestosql.spi.StandardErrorCode.SERVER_SHUTTING_DOWN;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand All @@ -79,6 +82,7 @@ public class QueryDispatcher
private final QueryTracker<DispatchQuery> dispatchQueryTracker;
private final RemoteCoordinatorMonitor remoteCoordinatorMonitor;
private final SessionPropertyManager sessionPropertyManager;
private final Duration coordinatorSelectionWaitTime;

@GuardedBy("this")
private boolean running;
Expand All @@ -100,7 +104,8 @@ public QueryDispatcher(
JsonCodec<QuerySubmissionResponse> querySubmissionResponseCodec,
JsonCodec<QueryAnalysis> queryAnalysisCodec,
JsonCodec<QueryAnalysisResponse> queryAnalysisResponseCodec,
JsonCodec<CoordinatorStatus> coordinatorStatusCodec)
JsonCodec<CoordinatorStatus> coordinatorStatusCodec,
RemoteDispatcherConfig config)
{
this.dispatchQueryTracker = requireNonNull(dispatchQueryTracker, "dispatchQueryTracker is null");
this.remoteCoordinatorMonitor = requireNonNull(remoteCoordinatorMonitor, "remoteCoordinatorMonitor is null");
Expand All @@ -116,6 +121,8 @@ public QueryDispatcher(
this.executor = dispatchExecutor.getExecutor();
this.scheduledExecutor = dispatchExecutor.getScheduledExecutor();
this.remoteCoordinators = new StateMachine<>("coordinators", executor, ImmutableMap.of());

coordinatorSelectionWaitTime = requireNonNull(config, "config is null").getCoordinatorSelectionWaitTime();
}

@PostConstruct
Expand Down Expand Up @@ -248,13 +255,22 @@ private ListenableFuture<RemoteCoordinator> getCoordinator(QueuedQueryStateMachi
{
Optional<TransactionId> transactionId = stateMachine.getSession().getTransactionId();
ListenableFuture<RemoteCoordinator> coordinatorFuture;
String timeoutMessage;
if (transactionId.isPresent()) {
coordinatorFuture = getCoordinatorForTransaction(transactionId.get());
timeoutMessage = format("Coordinator managing active transaction %s has been offline for %s.", transactionId.get(), coordinatorSelectionWaitTime);
}
else {
coordinatorFuture = selectRandomCoordinator();
timeoutMessage = format("No coordinators available withing %s.", coordinatorSelectionWaitTime);
}
return coordinatorFuture;
return addTimeout(
coordinatorFuture,
() -> {
throw new PrestoException(GENERIC_INSUFFICIENT_RESOURCES, timeoutMessage);
},
coordinatorSelectionWaitTime,
scheduledExecutor);
}

private ListenableFuture<RemoteCoordinator> getCoordinatorForTransaction(TransactionId transactionId)
Expand Down
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.dispatcher;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.Duration;

import javax.validation.constraints.NotNull;

import static java.util.concurrent.TimeUnit.MINUTES;

public class RemoteDispatcherConfig
{
private Duration coordinatorSelectionWaitTime = new Duration(5, MINUTES);

@NotNull
public Duration getCoordinatorSelectionWaitTime()
{
return coordinatorSelectionWaitTime;
}

@Config("dispatcher.coordinator-selection-max-wait")
@ConfigDescription("Maximum time to wait for valid coordinator before the query is failed")
public RemoteDispatcherConfig setCoordinatorSelectionWaitTime(Duration coordinatorSelectionWaitTime)
{
this.coordinatorSelectionWaitTime = coordinatorSelectionWaitTime;
return this;
}
}
Expand Up @@ -50,6 +50,7 @@
import io.prestosql.dispatcher.QuerySubmissionResponse;
import io.prestosql.dispatcher.RemoteCoordinatorMonitor;
import io.prestosql.dispatcher.RemoteDispatchQueryFactory;
import io.prestosql.dispatcher.RemoteDispatcherConfig;
import io.prestosql.dispatcher.SubmissionResource;
import io.prestosql.event.QueryMonitor;
import io.prestosql.event.QueryMonitorConfig;
Expand Down Expand Up @@ -256,6 +257,7 @@ protected void setup(Binder binder)
// remote dispatcher
binder.bind(DispatchQueryFactory.class).to(RemoteDispatchQueryFactory.class);
binder.bind(QueryDispatcher.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(RemoteDispatcherConfig.class);
binder.bind(RemoteCoordinatorMonitor.class).to(DiscoveryRemoteCoordinatorMonitor.class).in(Scopes.SINGLETON);
jaxrsBinder(binder).bind(SubmissionResource.class);
jsonCodecBinder(binder).bindJsonCodec(QuerySubmission.class);
Expand Down
@@ -0,0 +1,46 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.dispatcher;

import com.google.common.collect.ImmutableMap;
import io.airlift.configuration.testing.ConfigAssertions;
import io.airlift.units.Duration;
import org.testng.annotations.Test;

import java.util.Map;

import static java.util.concurrent.TimeUnit.MINUTES;

public class TestRemoteDispatcherConfig
{
@Test
public void testDefaults()
{
ConfigAssertions.assertRecordedDefaults(ConfigAssertions.recordDefaults(RemoteDispatcherConfig.class)
.setCoordinatorSelectionWaitTime(new Duration(5, MINUTES)));
}

@Test
public void testExplicitPropertyMappings()
{
Map<String, String> properties = new ImmutableMap.Builder<String, String>()
.put("dispatcher.coordinator-selection-max-wait", "33m")
.build();

RemoteDispatcherConfig expected = new RemoteDispatcherConfig()
.setCoordinatorSelectionWaitTime(new Duration(33, MINUTES));

ConfigAssertions.assertFullMapping(properties, expected);
}
}

0 comments on commit 8cf2e90

Please sign in to comment.