Skip to content

Commit

Permalink
Add support for scanned bytes limit on queries
Browse files Browse the repository at this point in the history
  • Loading branch information
shenh062326 authored and Praveen2112 committed Jul 17, 2020
1 parent d7ef958 commit 60aaa39
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 0 deletions.
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql;

import io.airlift.units.DataSize;
import io.prestosql.spi.PrestoException;

import static io.prestosql.spi.StandardErrorCode.EXCEEDED_SCAN_LIMIT;

public class ExceededScanLimitException
extends PrestoException
{
public ExceededScanLimitException(DataSize limit)
{
super(EXCEEDED_SCAN_LIMIT, "Exceeded scan limit of " + limit.toString());
}
}
Expand Up @@ -67,6 +67,7 @@ public final class SystemSessionProperties
public static final String QUERY_MAX_RUN_TIME = "query_max_run_time";
public static final String RESOURCE_OVERCOMMIT = "resource_overcommit";
public static final String QUERY_MAX_CPU_TIME = "query_max_cpu_time";
public static final String QUERY_MAX_SCAN_PHYSICAL_BYTES = "query_max_scan_physical_bytes";
public static final String QUERY_MAX_STAGE_COUNT = "query_max_stage_count";
public static final String REDISTRIBUTE_WRITES = "redistribute_writes";
public static final String USE_PREFERRED_WRITE_PARTITIONING = "use_preferred_write_partitioning";
Expand Down Expand Up @@ -266,6 +267,11 @@ public SystemSessionProperties(
"Maximum amount of distributed total memory a query can use",
memoryManagerConfig.getMaxQueryTotalMemory(),
true),
dataSizeProperty(
QUERY_MAX_SCAN_PHYSICAL_BYTES,
"Maximum scan physical bytes of a query",
queryManagerConfig.getQueryMaxScanPhysicalBytes().orElse(null),
false),
booleanProperty(
RESOURCE_OVERCOMMIT,
"Use resources which are not guaranteed to be available to the query",
Expand Down Expand Up @@ -777,6 +783,11 @@ public static Duration getQueryMaxCpuTime(Session session)
return session.getSystemProperty(QUERY_MAX_CPU_TIME, Duration.class);
}

public static Optional<DataSize> getQueryMaxScanPhysicalBytes(Session session)
{
return Optional.ofNullable(session.getSystemProperty(QUERY_MAX_SCAN_PHYSICAL_BYTES, DataSize.class));
}

public static boolean isSpillEnabled(Session session)
{
return session.getSystemProperty(SPILL_ENABLED, Boolean.class);
Expand Down
Expand Up @@ -17,13 +17,15 @@
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.DefunctConfig;
import io.airlift.configuration.LegacyConfig;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import java.util.Optional;
import java.util.concurrent.TimeUnit;

@DefunctConfig({
Expand Down Expand Up @@ -60,6 +62,7 @@ public class QueryManagerConfig
private Duration queryMaxRunTime = new Duration(100, TimeUnit.DAYS);
private Duration queryMaxExecutionTime = new Duration(100, TimeUnit.DAYS);
private Duration queryMaxCpuTime = new Duration(1_000_000_000, TimeUnit.DAYS);
private Optional<DataSize> queryMaxScanPhysicalBytes = Optional.empty();

private int requiredWorkers = 1;
private Duration requiredWorkersMaxWait = new Duration(5, TimeUnit.MINUTES);
Expand Down Expand Up @@ -295,6 +298,19 @@ public QueryManagerConfig setQueryMaxCpuTime(Duration queryMaxCpuTime)
return this;
}

@NotNull
public Optional<DataSize> getQueryMaxScanPhysicalBytes()
{
return queryMaxScanPhysicalBytes;
}

@Config("query.max-scan-physical-bytes")
public QueryManagerConfig setQueryMaxScanPhysicalBytes(DataSize queryMaxScanPhysicalBytes)
{
this.queryMaxScanPhysicalBytes = Optional.ofNullable(queryMaxScanPhysicalBytes);
return this;
}

@Min(1)
public int getRemoteTaskMaxCallbackThreads()
{
Expand Down
Expand Up @@ -17,8 +17,10 @@
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.ThreadPoolExecutorMBean;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.prestosql.ExceededCpuLimitException;
import io.prestosql.ExceededScanLimitException;
import io.prestosql.Session;
import io.prestosql.event.QueryMonitor;
import io.prestosql.execution.QueryExecution.QueryOutputInfo;
Expand All @@ -40,6 +42,7 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -51,6 +54,7 @@
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static io.airlift.concurrent.Threads.threadsNamed;
import static io.prestosql.SystemSessionProperties.getQueryMaxCpuTime;
import static io.prestosql.SystemSessionProperties.getQueryMaxScanPhysicalBytes;
import static io.prestosql.execution.QueryState.RUNNING;
import static io.prestosql.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static java.lang.String.format;
Expand All @@ -68,6 +72,7 @@ public class SqlQueryManager
private final QueryTracker<QueryExecution> queryTracker;

private final Duration maxQueryCpuTime;
private final Optional<DataSize> maxQueryScanPhysicalBytes;

private final ExecutorService queryExecutor;
private final ThreadPoolExecutorMBean queryExecutorMBean;
Expand All @@ -82,6 +87,7 @@ public SqlQueryManager(ClusterMemoryManager memoryManager, QueryMonitor queryMon
this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null");

this.maxQueryCpuTime = queryManagerConfig.getQueryMaxCpuTime();
this.maxQueryScanPhysicalBytes = queryManagerConfig.getQueryMaxScanPhysicalBytes();

this.queryExecutor = newCachedThreadPool(threadsNamed("query-scheduler-%s"));
this.queryExecutorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) queryExecutor);
Expand Down Expand Up @@ -110,6 +116,13 @@ public void start()
catch (Throwable e) {
log.error(e, "Error enforcing query CPU time limits");
}

try {
enforceScanLimits();
}
catch (Throwable e) {
log.error(e, "Error enforcing query scan bytes limits");
}
}, 1, 1, TimeUnit.SECONDS);
}

Expand Down Expand Up @@ -300,4 +313,23 @@ private void enforceCpuLimits()
}
}
}

/**
* Enforce query scan physical bytes limits
*/
private void enforceScanLimits()
{
for (QueryExecution query : queryTracker.getAllQueries()) {
Optional<DataSize> limitOpt = getQueryMaxScanPhysicalBytes(query.getSession())
.flatMap(sessionLimit -> maxQueryScanPhysicalBytes.map(serverLimit -> Ordering.natural().min(serverLimit, sessionLimit)))
.or(() -> maxQueryScanPhysicalBytes);

limitOpt.ifPresent(limit -> {
DataSize scan = query.getBasicQueryInfo().getQueryStats().getPhysicalInputDataSize();
if (scan.compareTo(limit) > 0) {
query.fail(new ExceededScanLimitException(limit));
}
});
}
}
}
Expand Up @@ -14,6 +14,7 @@
package io.prestosql.execution;

import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.testng.annotations.Test;

Expand All @@ -23,6 +24,7 @@
import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.airlift.units.DataSize.Unit.KILOBYTE;

public class TestQueryManagerConfig
{
Expand All @@ -49,6 +51,7 @@ public void testDefaults()
.setQueryMaxRunTime(new Duration(100, TimeUnit.DAYS))
.setQueryMaxExecutionTime(new Duration(100, TimeUnit.DAYS))
.setQueryMaxCpuTime(new Duration(1_000_000_000, TimeUnit.DAYS))
.setQueryMaxScanPhysicalBytes(null)
.setRequiredWorkers(1)
.setRequiredWorkersMaxWait(new Duration(5, TimeUnit.MINUTES)));
}
Expand Down Expand Up @@ -76,6 +79,7 @@ public void testExplicitPropertyMappings()
.put("query.max-run-time", "2h")
.put("query.max-execution-time", "3h")
.put("query.max-cpu-time", "2d")
.put("query.max-scan-physical-bytes", "1kB")
.put("query-manager.required-workers", "333")
.put("query-manager.required-workers-max-wait", "33m")
.build();
Expand All @@ -100,6 +104,7 @@ public void testExplicitPropertyMappings()
.setQueryMaxRunTime(new Duration(2, TimeUnit.HOURS))
.setQueryMaxExecutionTime(new Duration(3, TimeUnit.HOURS))
.setQueryMaxCpuTime(new Duration(2, TimeUnit.DAYS))
.setQueryMaxScanPhysicalBytes(DataSize.of(1, KILOBYTE))
.setRequiredWorkers(333)
.setRequiredWorkersMaxWait(new Duration(33, TimeUnit.MINUTES));

Expand Down
Expand Up @@ -145,6 +145,8 @@ public enum StandardErrorCode
EXCEEDED_SPILL_LIMIT(131078, INSUFFICIENT_RESOURCES),
EXCEEDED_LOCAL_MEMORY_LIMIT(131079, INSUFFICIENT_RESOURCES),
ADMINISTRATIVELY_PREEMPTED(131080, INSUFFICIENT_RESOURCES),
EXCEEDED_SCAN_LIMIT(131081, INSUFFICIENT_RESOURCES),

/**/;

// Connectors can use error codes starting at the range 0x0100_0000
Expand Down
Expand Up @@ -34,6 +34,7 @@
import static io.prestosql.execution.TestQueryRunnerUtil.createQuery;
import static io.prestosql.execution.TestQueryRunnerUtil.waitForQueryState;
import static io.prestosql.spi.StandardErrorCode.EXCEEDED_CPU_LIMIT;
import static io.prestosql.spi.StandardErrorCode.EXCEEDED_SCAN_LIMIT;
import static io.prestosql.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
Expand Down Expand Up @@ -106,4 +107,17 @@ public void testQueryCpuLimit()
assertEquals(queryInfo.getErrorCode(), EXCEEDED_CPU_LIMIT.toErrorCode());
}
}

@Test(timeOut = 60_000L)
public void testQueryScanExceeded() throws Exception
{
try (DistributedQueryRunner queryRunner = TpchQueryRunnerBuilder.builder().setSingleExtraProperty("query.max-scan-physical-bytes", "0B").build()) {
QueryId queryId = createQuery(queryRunner, TEST_SESSION, "SELECT * FROM system.runtime.nodes");
waitForQueryState(queryRunner, queryId, FAILED);
QueryManager queryManager = queryRunner.getCoordinator().getQueryManager();
BasicQueryInfo queryInfo = queryManager.getQueryInfo(queryId);
assertEquals(queryInfo.getState(), FAILED);
assertEquals(queryInfo.getErrorCode(), EXCEEDED_SCAN_LIMIT.toErrorCode());
}
}
}

0 comments on commit 60aaa39

Please sign in to comment.