Skip to content

Commit

Permalink
Enforce ResourceGroup level query limits
Browse files Browse the repository at this point in the history
Enables query manager to kill a query which
exceeds its resource group allocation.
  • Loading branch information
kyleCampbel1 committed Jul 19, 2021
1 parent 242b6bf commit b237980
Show file tree
Hide file tree
Showing 21 changed files with 646 additions and 21 deletions.
Expand Up @@ -17,12 +17,13 @@
import io.airlift.units.Duration;

import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_CPU_LIMIT;
import static java.lang.String.format;

public class ExceededCpuLimitException
extends PrestoException
{
public ExceededCpuLimitException(Duration duration)
public ExceededCpuLimitException(Duration limit, String limitSource)
{
super(EXCEEDED_CPU_LIMIT, "Exceeded CPU limit of " + duration.toString());
super(EXCEEDED_CPU_LIMIT, format("Exceeded CPU limit of %s defined at the %s level", limit, limitSource));
}
}
Expand Up @@ -30,9 +30,9 @@ public static ExceededMemoryLimitException exceededGlobalUserLimit(DataSize maxM
return new ExceededMemoryLimitException(EXCEEDED_GLOBAL_MEMORY_LIMIT, format("Query exceeded distributed user memory limit of %s", maxMemory));
}

public static ExceededMemoryLimitException exceededGlobalTotalLimit(DataSize maxMemory)
public static ExceededMemoryLimitException exceededGlobalTotalLimit(DataSize maxMemory, String limitSource)
{
return new ExceededMemoryLimitException(EXCEEDED_GLOBAL_MEMORY_LIMIT, format("Query exceeded distributed total memory limit of %s", maxMemory));
return new ExceededMemoryLimitException(EXCEEDED_GLOBAL_MEMORY_LIMIT, format("Query exceeded distributed total memory limit of %s defined at the %s", maxMemory, limitSource));
}

public static ExceededMemoryLimitException exceededLocalUserMemoryLimit(DataSize maxMemory, String additionalFailureInfo)
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.facebook.presto.spi.ErrorCode;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
Expand Down Expand Up @@ -184,4 +185,14 @@ public DataSize getUserMemoryReservation()
{
return new DataSize(0, BYTE);
}

/**
 * Reports no resource group query limits.
 *
 * @return always {@code Optional.empty()}
 */
@Override
public Optional<ResourceGroupQueryLimits> getResourceGroupQueryLimits()
{
return Optional.empty();
}

/**
 * No-op: the supplied limits are not stored or validated by this implementation.
 *
 * @param resourceGroupQueryLimits ignored
 */
@Override
public void setResourceGroupQueryLimits(ResourceGroupQueryLimits resourceGroupQueryLimits)
{ }
}
Expand Up @@ -29,6 +29,7 @@
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.prerequisites.QueryPrerequisites;
import com.facebook.presto.spi.prerequisites.QueryPrerequisitesContext;
import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.DataSize;
Expand All @@ -38,6 +39,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static com.facebook.airlift.concurrent.MoreFutures.addExceptionCallback;
Expand Down Expand Up @@ -68,6 +70,7 @@ public class LocalDispatchQuery
private final Consumer<DispatchQuery> queryQueuer;
private final Consumer<QueryExecution> querySubmitter;
private final SettableFuture<?> submitted = SettableFuture.create();
private final AtomicReference<Optional<ResourceGroupQueryLimits>> resourceGroupQueryLimits = new AtomicReference<>(Optional.empty());

private final boolean retry;

Expand Down Expand Up @@ -185,6 +188,7 @@ private void startExecution(QueryExecution queryExecution)
queryExecutor.execute(() -> {
if (stateMachine.transitionToDispatching()) {
try {
resourceGroupQueryLimits.get().ifPresent(queryExecution::setResourceGroupQueryLimits);
querySubmitter.accept(queryExecution);
}
catch (Throwable t) {
Expand Down Expand Up @@ -350,6 +354,20 @@ public void addStateChangeListener(StateChangeListener<QueryState> stateChangeLi
stateMachine.addStateChangeListener(stateChangeListener);
}

/**
 * Reads the current value of the {@code resourceGroupQueryLimits} atomic reference.
 *
 * @return the limits stored by {@link #setResourceGroupQueryLimits}, or the empty
 * {@code Optional} the reference is initialised with
 */
@Override
public Optional<ResourceGroupQueryLimits> getResourceGroupQueryLimits()
{
return resourceGroupQueryLimits.get();
}

/**
 * Stores the resource group limits on this instance. The value may be assigned only once.
 *
 * @param resourceGroupQueryLimits limits to store; must not be null
 * @throws NullPointerException if {@code resourceGroupQueryLimits} is null
 * @throws IllegalStateException if the reference no longer holds its initial empty value
 */
@Override
public void setResourceGroupQueryLimits(ResourceGroupQueryLimits resourceGroupQueryLimits)
{
    Optional<ResourceGroupQueryLimits> replacement = Optional.of(requireNonNull(resourceGroupQueryLimits, "resourceGroupQueryLimits is null"));
    // The swap only succeeds while the reference still holds the empty Optional
    boolean assigned = this.resourceGroupQueryLimits.compareAndSet(Optional.empty(), replacement);
    if (assigned) {
        return;
    }
    throw new IllegalStateException("Cannot set resourceGroupQueryLimits more than once");
}

private Optional<QueryExecution> tryGetQueryExecution()
{
try {
Expand Down
Expand Up @@ -24,6 +24,7 @@
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.resourceGroups.QueryType;
import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.sql.tree.Expression;
import com.facebook.presto.sql.tree.Statement;
Expand All @@ -41,6 +42,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -62,6 +64,7 @@ public class DataDefinitionExecution<T extends Statement>
private final AccessControl accessControl;
private final QueryStateMachine stateMachine;
private final List<Expression> parameters;
private final AtomicReference<Optional<ResourceGroupQueryLimits>> resourceGroupQueryLimits = new AtomicReference<>(Optional.empty());

private DataDefinitionExecution(
DataDefinitionTask<T> task,
Expand Down Expand Up @@ -169,6 +172,20 @@ public DataSize getOutputDataSize()
return DataSize.succinctBytes(0);
}

/**
 * Reads the current value of the {@code resourceGroupQueryLimits} atomic reference.
 *
 * @return the limits stored by {@link #setResourceGroupQueryLimits}, or the empty
 * {@code Optional} the reference is initialised with
 */
@Override
public Optional<ResourceGroupQueryLimits> getResourceGroupQueryLimits()
{
return resourceGroupQueryLimits.get();
}

/**
 * Assigns the resource group limits for this execution; a second assignment is rejected.
 *
 * @param resourceGroupQueryLimits limits to store; must not be null
 * @throws NullPointerException if {@code resourceGroupQueryLimits} is null
 * @throws IllegalStateException if the reference no longer holds its initial empty value
 */
@Override
public void setResourceGroupQueryLimits(ResourceGroupQueryLimits resourceGroupQueryLimits)
{
    requireNonNull(resourceGroupQueryLimits, "resourceGroupQueryLimits is null");
    Optional<ResourceGroupQueryLimits> unset = Optional.empty();
    Optional<ResourceGroupQueryLimits> wrapped = Optional.of(resourceGroupQueryLimits);
    // Atomic swap from "unset" to the new value; it fails if a value was already stored
    if (this.resourceGroupQueryLimits.compareAndSet(unset, wrapped)) {
        return;
    }
    throw new IllegalStateException("Cannot set resourceGroupQueryLimits more than once");
}

@Override
public BasicQueryInfo getBasicQueryInfo()
{
Expand Down
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.spi.ErrorCode;
import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;

Expand Down Expand Up @@ -47,6 +48,8 @@ public interface ManagedQueryExecution

BasicQueryInfo getBasicQueryInfo();

void setResourceGroupQueryLimits(ResourceGroupQueryLimits resourceGroupQueryLimits);

boolean isDone();

/**
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.resourceGroups.QueryType;
import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits;
import com.facebook.presto.sql.planner.Plan;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -71,6 +72,10 @@ public interface QueryExecution

VersionedMemoryPoolId getMemoryPool();

Optional<ResourceGroupQueryLimits> getResourceGroupQueryLimits();

void setResourceGroupQueryLimits(ResourceGroupQueryLimits resourceGroupQueryLimits);

void setMemoryPool(VersionedMemoryPoolId poolId);

void start();
Expand Down
@@ -0,0 +1,78 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution;

import io.airlift.units.DataSize;
import io.airlift.units.Duration;

import java.util.Arrays;
import java.util.Comparator;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;

import static java.util.Objects.requireNonNull;

/**
 * Pairs a limit value with the {@link Source} that limit was taken from.
 * Instances are created through the static factory methods and cannot be modified.
 *
 * @param <T> type of the limit value; it must be comparable so that limits can be ordered
 */
public class QueryLimit<T extends Comparable<T>>
{
    private final T limit;
    private final Source source;

    private QueryLimit(T limit, Source source)
    {
        requireNonNull(limit, "limit is null");
        requireNonNull(source, "source is null");
        this.limit = limit;
        this.source = source;
    }

    /**
     * Creates a limit whose value is a {@code Duration}.
     *
     * @param limit the limit value; must not be null
     * @param source where the limit comes from; must not be null
     * @return a new limit holding both arguments
     */
    public static QueryLimit<Duration> createDurationLimit(Duration limit, Source source)
    {
        return new QueryLimit<Duration>(limit, source);
    }

    /**
     * Creates a limit whose value is a {@code DataSize}.
     *
     * @param limit the limit value; must not be null
     * @param source where the limit comes from; must not be null
     * @return a new limit holding both arguments
     */
    public static QueryLimit<DataSize> createDataSizeLimit(DataSize limit, Source source)
    {
        return new QueryLimit<DataSize>(limit, source);
    }

    /**
     * @return the limit value; never null
     */
    public T getLimit()
    {
        return limit;
    }

    /**
     * @return the source this limit was created with; never null
     */
    public Source getLimitSource()
    {
        return source;
    }

    /**
     * The places a limit can be attributed to.
     */
    public enum Source
    {
        QUERY,
        SYSTEM,
        RESOURCE_GROUP,
    }

    /**
     * Picks the limit with the smallest value among all non-null arguments.
     * Candidates are streamed with {@code limits} first and {@code limit} last.
     * NOTE(review): when two candidates carry equal values, which one is returned
     * depends on how {@code Stream.min} resolves ties; confirm if the reported source matters.
     *
     * @param limit a candidate limit; may be null, in which case it is skipped
     * @param limits further candidates; the array and any of its elements may be null
     * @param <S> type of the limit values being compared
     * @return the candidate whose value compares lowest
     * @throws IllegalArgumentException if every argument is null
     */
    @SafeVarargs
    public static <S extends Comparable<S>> QueryLimit<S> getMinimum(QueryLimit<S> limit, QueryLimit<S>... limits)
    {
        Comparator<QueryLimit<S>> byLimitValue = Comparator.comparing(QueryLimit::getLimit);
        Stream<QueryLimit<S>> fromVarargs = (limits == null) ? Stream.empty() : Arrays.stream(limits);
        Stream<QueryLimit<S>> fromFirstArgument = (limit == null) ? Stream.empty() : Stream.of(limit);

        Optional<QueryLimit<S>> smallest = Stream.concat(fromVarargs, fromFirstArgument)
                .filter(Objects::nonNull)
                .min(byLimitValue);
        if (!smallest.isPresent()) {
            throw new IllegalArgumentException("At least one nonnull argument is required.");
        }
        return smallest.get();
    }
}
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.execution.QueryTracker.TrackedQuery;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits;
import com.google.common.collect.ImmutableList;
import io.airlift.units.Duration;
import org.joda.time.DateTime;
Expand All @@ -40,6 +41,10 @@

import static com.facebook.presto.SystemSessionProperties.getQueryMaxExecutionTime;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxRunTime;
import static com.facebook.presto.execution.QueryLimit.Source.QUERY;
import static com.facebook.presto.execution.QueryLimit.Source.RESOURCE_GROUP;
import static com.facebook.presto.execution.QueryLimit.createDurationLimit;
import static com.facebook.presto.execution.QueryLimit.getMinimum;
import static com.facebook.presto.spi.StandardErrorCode.ABANDONED_QUERY;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_TIME_LIMIT;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES;
Expand Down Expand Up @@ -208,11 +213,20 @@ private void enforceTimeLimits()
continue;
}
Duration queryMaxRunTime = getQueryMaxRunTime(query.getSession());
Duration queryMaxExecutionTime = getQueryMaxExecutionTime(query.getSession());
QueryLimit<Duration> queryMaxExecutionTime = getMinimum(
createDurationLimit(getQueryMaxExecutionTime(query.getSession()), QUERY),
query.getResourceGroupQueryLimits()
.flatMap(ResourceGroupQueryLimits::getExecutionTimeLimit)
.map(rgLimit -> createDurationLimit(rgLimit, RESOURCE_GROUP)).orElse(null));
Optional<DateTime> executionStartTime = query.getExecutionStartTime();
DateTime createTime = query.getCreateTime();
if (executionStartTime.isPresent() && executionStartTime.get().plus(queryMaxExecutionTime.toMillis()).isBeforeNow()) {
query.fail(new PrestoException(EXCEEDED_TIME_LIMIT, "Query exceeded the maximum execution time limit of " + queryMaxExecutionTime));
if (executionStartTime.isPresent() && executionStartTime.get().plus(queryMaxExecutionTime.getLimit().toMillis()).isBeforeNow()) {
query.fail(
new PrestoException(EXCEEDED_TIME_LIMIT,
format(
"Query exceeded the maximum execution time limit of %s defined at the %s level",
queryMaxExecutionTime.getLimit(),
queryMaxExecutionTime.getLimitSource().name())));
}
if (createTime.plus(queryMaxRunTime.toMillis()).isBeforeNow()) {
query.fail(new PrestoException(EXCEEDED_TIME_LIMIT, "Query exceeded maximum time limit of " + queryMaxRunTime));
Expand Down Expand Up @@ -356,6 +370,8 @@ public interface TrackedQuery

Optional<DateTime> getEndTime();

Optional<ResourceGroupQueryLimits> getResourceGroupQueryLimits();

void fail(Throwable cause);

// XXX: This should be removed when the client protocol is improved, so that we don't need to hold onto so much query history
Expand Down
Expand Up @@ -40,6 +40,7 @@
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.resourceGroups.QueryType;
import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits;
import com.facebook.presto.split.CloseableSplitSourceProvider;
import com.facebook.presto.split.SplitManager;
import com.facebook.presto.sql.analyzer.Analysis;
Expand Down Expand Up @@ -122,6 +123,7 @@ public class SqlQueryExecution
private final PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator();
private final AtomicReference<PlanVariableAllocator> variableAllocator = new AtomicReference<>();
private final PartialResultQueryManager partialResultQueryManager;
private final AtomicReference<Optional<ResourceGroupQueryLimits>> resourceGroupQueryLimits = new AtomicReference<>(Optional.empty());

private SqlQueryExecution(
PreparedQuery preparedQuery,
Expand Down Expand Up @@ -393,6 +395,20 @@ public void addStateChangeListener(StateChangeListener<QueryState> stateChangeLi
}
}

/**
 * Reads the current value of the {@code resourceGroupQueryLimits} atomic reference.
 *
 * @return the limits stored by {@link #setResourceGroupQueryLimits}, or the empty
 * {@code Optional} the reference is initialised with
 */
@Override
public Optional<ResourceGroupQueryLimits> getResourceGroupQueryLimits()
{
return resourceGroupQueryLimits.get();
}

/**
 * Sets the resource group limits for this execution exactly once.
 *
 * @param resourceGroupQueryLimits limits to store; must not be null
 * @throws NullPointerException if {@code resourceGroupQueryLimits} is null
 * @throws IllegalStateException if the reference no longer holds its initial empty value
 */
@Override
public void setResourceGroupQueryLimits(ResourceGroupQueryLimits resourceGroupQueryLimits)
{
    Optional<ResourceGroupQueryLimits> limitsToStore = Optional.of(requireNonNull(resourceGroupQueryLimits, "resourceGroupQueryLimits is null"));
    // compareAndSet returns false when the reference has already moved away from the empty Optional
    boolean alreadySet = !this.resourceGroupQueryLimits.compareAndSet(Optional.empty(), limitsToStore);
    if (alreadySet) {
        throw new IllegalStateException("Cannot set resourceGroupQueryLimits more than once");
    }
}

@Override
public Session getSession()
{
Expand Down
Expand Up @@ -27,6 +27,7 @@
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.version.EmbedVersion;
import com.google.common.collect.Ordering;
Expand Down Expand Up @@ -55,6 +56,11 @@
import static com.facebook.presto.SystemSessionProperties.getQueryMaxCpuTime;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxOutputSize;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxScanRawInputBytes;
import static com.facebook.presto.execution.QueryLimit.Source.QUERY;
import static com.facebook.presto.execution.QueryLimit.Source.RESOURCE_GROUP;
import static com.facebook.presto.execution.QueryLimit.Source.SYSTEM;
import static com.facebook.presto.execution.QueryLimit.createDurationLimit;
import static com.facebook.presto.execution.QueryLimit.getMinimum;
import static com.facebook.presto.execution.QueryState.RUNNING;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand Down Expand Up @@ -334,10 +340,15 @@ private void enforceCpuLimits()
{
for (QueryExecution query : queryTracker.getAllQueries()) {
Duration cpuTime = query.getTotalCpuTime();
Duration sessionLimit = getQueryMaxCpuTime(query.getSession());
Duration limit = Ordering.natural().min(maxQueryCpuTime, sessionLimit);
if (cpuTime.compareTo(limit) > 0) {
query.fail(new ExceededCpuLimitException(limit));
QueryLimit<Duration> queryMaxCpuTimeLimit = getMinimum(
createDurationLimit(maxQueryCpuTime, SYSTEM),
query.getResourceGroupQueryLimits()
.flatMap(ResourceGroupQueryLimits::getCpuTimeLimit)
.map(rgLimit -> createDurationLimit(rgLimit, RESOURCE_GROUP))
.orElse(null),
createDurationLimit(getQueryMaxCpuTime(query.getSession()), QUERY));
if (cpuTime.compareTo(queryMaxCpuTimeLimit.getLimit()) > 0) {
query.fail(new ExceededCpuLimitException(queryMaxCpuTimeLimit.getLimit(), queryMaxCpuTimeLimit.getLimitSource().name()));
}
}
}
Expand Down

0 comments on commit b237980

Please sign in to comment.