Skip to content

Commit

Permalink
Remove task.max-memory config
Browse files Browse the repository at this point in the history
  • Loading branch information
cberner committed Aug 26, 2015
1 parent fb53d17 commit b64a18e
Show file tree
Hide file tree
Showing 20 changed files with 39 additions and 69 deletions.
Expand Up @@ -110,7 +110,6 @@ protected Map<String, Long> runOnce()
TaskContext taskContext = new QueryContext(new DataSize(256, MEGABYTE), memoryPool, systemMemoryPool, executor) TaskContext taskContext = new QueryContext(new DataSize(256, MEGABYTE), memoryPool, systemMemoryPool, executor)
.addTaskContext(new TaskStateMachine(new TaskId("query", "stage", "task"), executor), .addTaskContext(new TaskStateMachine(new TaskId("query", "stage", "task"), executor),
session, session,
new DataSize(256, MEGABYTE),
new DataSize(1, MEGABYTE), new DataSize(1, MEGABYTE),
false, false,
false); false);
Expand Down
1 change: 1 addition & 0 deletions presto-docs/src/main/sphinx/release/release-0.116.rst
Expand Up @@ -25,3 +25,4 @@ General Changes
* Add :func:`multimap_agg` function. * Add :func:`multimap_agg` function.
* Removed ``experimental.cluster-memory-manager-enabled`` config. The cluster * Removed ``experimental.cluster-memory-manager-enabled`` config. The cluster
memory manager is now always enabled. memory manager is now always enabled.
* Removed ``task.max-memory`` config.
Expand Up @@ -17,20 +17,26 @@
import io.airlift.units.DataSize; import io.airlift.units.DataSize;


import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_MEMORY_LIMIT; import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_MEMORY_LIMIT;
import static java.lang.String.format;


public class ExceededMemoryLimitException public class ExceededMemoryLimitException
extends PrestoException extends PrestoException
{ {
private final DataSize maxMemory; private final DataSize maxMemory;


public ExceededMemoryLimitException(DataSize maxMemory) public static ExceededMemoryLimitException exceededGlobalLimit(DataSize maxMemory)
{ {
this("Task", maxMemory); return new ExceededMemoryLimitException(maxMemory, format("Query exceeded max memory size of %s", maxMemory));
} }


public ExceededMemoryLimitException(String entity, DataSize maxMemory) public static ExceededMemoryLimitException exceededLocalLimit(DataSize maxMemory)
{ {
super(EXCEEDED_MEMORY_LIMIT, String.format("%s exceeded max memory size of %s", entity, maxMemory)); return new ExceededMemoryLimitException(maxMemory, format("Query exceeded local memory limit of %s", maxMemory));
}

private ExceededMemoryLimitException(DataSize maxMemory, String message)
{
super(EXCEEDED_MEMORY_LIMIT, message);
this.maxMemory = maxMemory; this.maxMemory = maxMemory;
} }


Expand Down
Expand Up @@ -40,7 +40,6 @@ public class SqlTaskExecutionFactory


private final LocalExecutionPlanner planner; private final LocalExecutionPlanner planner;
private final QueryMonitor queryMonitor; private final QueryMonitor queryMonitor;
private final DataSize maxTaskMemoryUsage;
private final DataSize operatorPreAllocatedMemory; private final DataSize operatorPreAllocatedMemory;
private final boolean verboseStats; private final boolean verboseStats;
private final boolean cpuTimerEnabled; private final boolean cpuTimerEnabled;
Expand All @@ -57,7 +56,6 @@ public SqlTaskExecutionFactory(
this.planner = checkNotNull(planner, "planner is null"); this.planner = checkNotNull(planner, "planner is null");
this.queryMonitor = checkNotNull(queryMonitor, "queryMonitor is null"); this.queryMonitor = checkNotNull(queryMonitor, "queryMonitor is null");
requireNonNull(config, "config is null"); requireNonNull(config, "config is null");
this.maxTaskMemoryUsage = config.getMaxTaskMemoryUsage();
this.operatorPreAllocatedMemory = config.getOperatorPreAllocatedMemory(); this.operatorPreAllocatedMemory = config.getOperatorPreAllocatedMemory();
this.verboseStats = config.isVerboseStats(); this.verboseStats = config.isVerboseStats();
this.cpuTimerEnabled = config.isTaskCpuTimerEnabled(); this.cpuTimerEnabled = config.isTaskCpuTimerEnabled();
Expand All @@ -69,7 +67,6 @@ public SqlTaskExecution create(Session session, QueryContext queryContext, TaskS
TaskContext taskContext = queryContext.addTaskContext( TaskContext taskContext = queryContext.addTaskContext(
taskStateMachine, taskStateMachine,
session, session,
maxTaskMemoryUsage,
checkNotNull(operatorPreAllocatedMemory, "operatorPreAllocatedMemory is null"), checkNotNull(operatorPreAllocatedMemory, "operatorPreAllocatedMemory is null"),
verboseStats, verboseStats,
cpuTimerEnabled); cpuTimerEnabled);
Expand Down
Expand Up @@ -28,12 +28,11 @@


import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


@DefunctConfig("experimental.big-query-max-task-memory") @DefunctConfig({"experimental.big-query-max-task-memory", "task.max-memory"})
public class TaskManagerConfig public class TaskManagerConfig
{ {
private boolean verboseStats; private boolean verboseStats;
private boolean taskCpuTimerEnabled = true; private boolean taskCpuTimerEnabled = true;
private DataSize maxTaskMemoryUsage = new DataSize(256, Unit.MEGABYTE);
private DataSize maxPartialAggregationMemoryUsage = new DataSize(16, Unit.MEGABYTE); private DataSize maxPartialAggregationMemoryUsage = new DataSize(16, Unit.MEGABYTE);
private DataSize operatorPreAllocatedMemory = new DataSize(16, Unit.MEGABYTE); private DataSize operatorPreAllocatedMemory = new DataSize(16, Unit.MEGABYTE);
private DataSize maxTaskIndexMemoryUsage = new DataSize(64, Unit.MEGABYTE); private DataSize maxTaskIndexMemoryUsage = new DataSize(64, Unit.MEGABYTE);
Expand Down Expand Up @@ -101,19 +100,6 @@ public TaskManagerConfig setMaxPartialAggregationMemoryUsage(DataSize maxPartial
return this; return this;
} }


@NotNull
public DataSize getMaxTaskMemoryUsage()
{
return maxTaskMemoryUsage;
}

@Config("task.max-memory")
public TaskManagerConfig setMaxTaskMemoryUsage(DataSize maxTaskMemoryUsage)
{
this.maxTaskMemoryUsage = maxTaskMemoryUsage;
return this;
}

@NotNull @NotNull
public DataSize getOperatorPreAllocatedMemory() public DataSize getOperatorPreAllocatedMemory()
{ {
Expand Down
Expand Up @@ -13,7 +13,6 @@
*/ */
package com.facebook.presto.memory; package com.facebook.presto.memory;


import com.facebook.presto.ExceededMemoryLimitException;
import com.facebook.presto.execution.LocationFactory; import com.facebook.presto.execution.LocationFactory;
import com.facebook.presto.execution.QueryExecution; import com.facebook.presto.execution.QueryExecution;
import com.facebook.presto.execution.QueryIdGenerator; import com.facebook.presto.execution.QueryIdGenerator;
Expand All @@ -28,8 +27,6 @@
import io.airlift.json.JsonCodec; import io.airlift.json.JsonCodec;
import io.airlift.log.Logger; import io.airlift.log.Logger;
import io.airlift.units.DataSize; import io.airlift.units.DataSize;
import io.airlift.units.DataSize.Unit;

import org.weakref.jmx.JmxException; import org.weakref.jmx.JmxException;
import org.weakref.jmx.MBeanExporter; import org.weakref.jmx.MBeanExporter;
import org.weakref.jmx.Managed; import org.weakref.jmx.Managed;
Expand All @@ -46,12 +43,15 @@
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;


import static com.facebook.presto.ExceededMemoryLimitException.exceededGlobalLimit;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxMemory;
import static com.facebook.presto.memory.LocalMemoryManager.GENERAL_POOL; import static com.facebook.presto.memory.LocalMemoryManager.GENERAL_POOL;
import static com.facebook.presto.memory.LocalMemoryManager.RESERVED_POOL; import static com.facebook.presto.memory.LocalMemoryManager.RESERVED_POOL;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxMemory;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableList; import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableSet; import static com.facebook.presto.util.ImmutableCollectors.toImmutableSet;
import static com.google.common.collect.Sets.difference; import static com.google.common.collect.Sets.difference;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.DataSize.succinctDataSize;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;


public class ClusterMemoryManager public class ClusterMemoryManager
Expand Down Expand Up @@ -110,7 +110,8 @@ public void process(Iterable<QueryExecution> queries)
long queryMemoryLimit = Math.min(maxQueryMemory.toBytes(), sessionMaxQueryMemory.toBytes()); long queryMemoryLimit = Math.min(maxQueryMemory.toBytes(), sessionMaxQueryMemory.toBytes());
totalBytes += bytes; totalBytes += bytes;
if (bytes > queryMemoryLimit) { if (bytes > queryMemoryLimit) {
query.fail(new ExceededMemoryLimitException("Query", DataSize.succinctDataSize(queryMemoryLimit, Unit.BYTE))); DataSize maxMemory = succinctDataSize(queryMemoryLimit, BYTE);
query.fail(exceededGlobalLimit(maxMemory));
} }
} }
clusterMemoryUsageBytes.set(totalBytes); clusterMemoryUsageBytes.set(totalBytes);
Expand Down
Expand Up @@ -16,7 +16,6 @@
import com.facebook.presto.Session; import com.facebook.presto.Session;
import com.facebook.presto.execution.TaskStateMachine; import com.facebook.presto.execution.TaskStateMachine;
import com.facebook.presto.operator.TaskContext; import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.spi.PrestoException;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -29,7 +28,7 @@
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;


import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_MEMORY_LIMIT; import static com.facebook.presto.ExceededMemoryLimitException.exceededLocalLimit;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;


Expand Down Expand Up @@ -63,7 +62,7 @@ public synchronized ListenableFuture<?> reserveMemory(long bytes)
checkArgument(bytes >= 0, "bytes is negative"); checkArgument(bytes >= 0, "bytes is negative");


if (reserved + bytes > maxMemory) { if (reserved + bytes > maxMemory) {
throw new PrestoException(EXCEEDED_MEMORY_LIMIT, "Query exceeded local memory limit of " + new DataSize(maxMemory, DataSize.Unit.BYTE).convertToMostSuccinctDataSize()); throw exceededLocalLimit(new DataSize(maxMemory, DataSize.Unit.BYTE).convertToMostSuccinctDataSize());
} }
ListenableFuture<?> future = memoryPool.reserve(bytes); ListenableFuture<?> future = memoryPool.reserve(bytes);
reserved += bytes; reserved += bytes;
Expand Down Expand Up @@ -138,9 +137,9 @@ public void onFailure(Throwable t)
}); });
} }


public TaskContext addTaskContext(TaskStateMachine taskStateMachine, Session session, DataSize maxTaskMemory, DataSize operatorPreAllocatedMemory, boolean verboseStats, boolean cpuTimerEnabled) public TaskContext addTaskContext(TaskStateMachine taskStateMachine, Session session, DataSize operatorPreAllocatedMemory, boolean verboseStats, boolean cpuTimerEnabled)
{ {
TaskContext taskContext = new TaskContext(this, taskStateMachine, executor, session, maxTaskMemory, operatorPreAllocatedMemory, verboseStats, cpuTimerEnabled); TaskContext taskContext = new TaskContext(this, taskStateMachine, executor, session, operatorPreAllocatedMemory, verboseStats, cpuTimerEnabled);
taskContexts.add(taskContext); taskContexts.add(taskContext);
return taskContext; return taskContext;
} }
Expand Down
Expand Up @@ -13,7 +13,6 @@
*/ */
package com.facebook.presto.operator; package com.facebook.presto.operator;


import com.facebook.presto.ExceededMemoryLimitException;
import com.facebook.presto.Session; import com.facebook.presto.Session;
import com.facebook.presto.spi.Page; import com.facebook.presto.spi.Page;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
Expand All @@ -34,6 +33,7 @@
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier; import java.util.function.Supplier;


import static com.facebook.presto.ExceededMemoryLimitException.exceededLocalLimit;
import static com.facebook.presto.operator.BlockedReason.WAITING_FOR_MEMORY; import static com.facebook.presto.operator.BlockedReason.WAITING_FOR_MEMORY;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
Expand Down Expand Up @@ -249,7 +249,7 @@ public void onFailure(Throwable t)
long newReservation = memoryReservation.addAndGet(bytes); long newReservation = memoryReservation.addAndGet(bytes);
if (newReservation > maxMemoryReservation) { if (newReservation > maxMemoryReservation) {
memoryReservation.getAndAdd(-bytes); memoryReservation.getAndAdd(-bytes);
throw new ExceededMemoryLimitException(new DataSize(maxMemoryReservation, BYTE)); throw exceededLocalLimit(new DataSize(maxMemoryReservation, BYTE));
} }
} }


Expand Down
Expand Up @@ -13,7 +13,6 @@
*/ */
package com.facebook.presto.operator; package com.facebook.presto.operator;


import com.facebook.presto.ExceededMemoryLimitException;
import com.facebook.presto.Session; import com.facebook.presto.Session;
import com.facebook.presto.execution.StateMachine.StateChangeListener; import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.execution.TaskId; import com.facebook.presto.execution.TaskId;
Expand Down Expand Up @@ -52,7 +51,6 @@ public class TaskContext
private final Executor executor; private final Executor executor;
private final Session session; private final Session session;


private final long maxMemory;
private final DataSize operatorPreAllocatedMemory; private final DataSize operatorPreAllocatedMemory;


private final AtomicLong memoryReservation = new AtomicLong(); private final AtomicLong memoryReservation = new AtomicLong();
Expand All @@ -76,7 +74,6 @@ public TaskContext(QueryContext queryContext,
TaskStateMachine taskStateMachine, TaskStateMachine taskStateMachine,
Executor executor, Executor executor,
Session session, Session session,
DataSize maxMemory,
DataSize operatorPreAllocatedMemory, DataSize operatorPreAllocatedMemory,
boolean verboseStats, boolean verboseStats,
boolean cpuTimerEnabled) boolean cpuTimerEnabled)
Expand All @@ -85,7 +82,6 @@ public TaskContext(QueryContext queryContext,
this.queryContext = requireNonNull(queryContext, "queryContext is null"); this.queryContext = requireNonNull(queryContext, "queryContext is null");
this.executor = checkNotNull(executor, "executor is null"); this.executor = checkNotNull(executor, "executor is null");
this.session = session; this.session = session;
this.maxMemory = checkNotNull(maxMemory, "maxMemory is null").toBytes();
this.operatorPreAllocatedMemory = checkNotNull(operatorPreAllocatedMemory, "operatorPreAllocatedMemory is null"); this.operatorPreAllocatedMemory = checkNotNull(operatorPreAllocatedMemory, "operatorPreAllocatedMemory is null");


taskStateMachine.addStateChangeListener(new StateChangeListener<TaskState>() taskStateMachine.addStateChangeListener(new StateChangeListener<TaskState>()
Expand Down Expand Up @@ -155,9 +151,6 @@ public synchronized ListenableFuture<?> reserveMemory(long bytes)
{ {
checkArgument(bytes >= 0, "bytes is negative"); checkArgument(bytes >= 0, "bytes is negative");


if (memoryReservation.get() + bytes > maxMemory) {
throw new ExceededMemoryLimitException(new DataSize(maxMemory, BYTE).convertToMostSuccinctDataSize());
}
ListenableFuture<?> future = queryContext.reserveMemory(bytes); ListenableFuture<?> future = queryContext.reserveMemory(bytes);
memoryReservation.getAndAdd(bytes); memoryReservation.getAndAdd(bytes);
return future; return future;
Expand All @@ -175,9 +168,6 @@ public synchronized boolean tryReserveMemory(long bytes)
{ {
checkArgument(bytes >= 0, "bytes is negative"); checkArgument(bytes >= 0, "bytes is negative");


if (memoryReservation.get() + bytes > maxMemory) {
return false;
}
if (queryContext.tryReserveMemory(bytes)) { if (queryContext.tryReserveMemory(bytes)) {
memoryReservation.getAndAdd(bytes); memoryReservation.getAndAdd(bytes);
return true; return true;
Expand Down
Expand Up @@ -13,7 +13,6 @@
*/ */
package com.facebook.presto.operator.aggregation; package com.facebook.presto.operator.aggregation;


import com.facebook.presto.ExceededMemoryLimitException;
import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus; import com.facebook.presto.spi.block.BlockBuilderStatus;
Expand All @@ -25,6 +24,7 @@
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize; import io.airlift.units.DataSize;


import static com.facebook.presto.ExceededMemoryLimitException.exceededLocalLimit;
import static com.facebook.presto.spi.type.BigintType.BIGINT; import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.type.TypeUtils.expectedValueSize; import static com.facebook.presto.type.TypeUtils.expectedValueSize;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -133,7 +133,6 @@ public void add(int position, Block block, long count)
} }


addNewGroup(hashPosition, position, block, count); addNewGroup(hashPosition, position, block, count);
return;
} }


private void addNewGroup(int hashPosition, int position, Block block, long count) private void addNewGroup(int hashPosition, int position, Block block, long count)
Expand All @@ -148,10 +147,8 @@ private void addNewGroup(int hashPosition, int position, Block block, long count
} }


if (getEstimatedSize() > FOUR_MEGABYTES) { if (getEstimatedSize() > FOUR_MEGABYTES) {
throw new ExceededMemoryLimitException(new DataSize(4, MEGABYTE)); throw exceededLocalLimit(new DataSize(4, MEGABYTE));
} }

return;
} }


private void rehash(int size) private void rehash(int size)
Expand Down
Expand Up @@ -13,14 +13,14 @@
*/ */
package com.facebook.presto.operator.aggregation; package com.facebook.presto.operator.aggregation;


import com.facebook.presto.ExceededMemoryLimitException;
import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus; import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.Type;
import com.facebook.presto.util.array.IntBigArray; import com.facebook.presto.util.array.IntBigArray;
import io.airlift.units.DataSize; import io.airlift.units.DataSize;


import static com.facebook.presto.ExceededMemoryLimitException.exceededLocalLimit;
import static com.facebook.presto.type.TypeUtils.hashPosition; import static com.facebook.presto.type.TypeUtils.hashPosition;
import static com.facebook.presto.type.TypeUtils.positionEqualsPosition; import static com.facebook.presto.type.TypeUtils.positionEqualsPosition;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -130,7 +130,7 @@ private void addNewElement(int hashPosition, Block block, int position)
{ {
elementType.appendTo(block, position, elementBlock); elementType.appendTo(block, position, elementBlock);
if (elementBlock.getSizeInBytes() > FOUR_MEGABYTES) { if (elementBlock.getSizeInBytes() > FOUR_MEGABYTES) {
throw new ExceededMemoryLimitException(new DataSize(4, MEGABYTE)); throw exceededLocalLimit(new DataSize(4, MEGABYTE));
} }
blockPositionByHash.set(hashPosition, elementBlock.getPositionCount() - 1); blockPositionByHash.set(hashPosition, elementBlock.getPositionCount() - 1);


Expand Down
Expand Up @@ -25,7 +25,6 @@
import java.util.concurrent.Executor; import java.util.concurrent.Executor;


import static com.facebook.presto.util.Threads.checkNotSameThreadExecutor; import static com.facebook.presto.util.Threads.checkNotSameThreadExecutor;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.airlift.units.DataSize.Unit.GIGABYTE; import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE; import static io.airlift.units.DataSize.Unit.MEGABYTE;


Expand All @@ -45,16 +44,15 @@ public static TaskContext createTaskContext(Executor executor, Session session,
{ {
MemoryPool memoryPool = new MemoryPool(new MemoryPoolId("test"), new DataSize(1, GIGABYTE)); MemoryPool memoryPool = new MemoryPool(new MemoryPoolId("test"), new DataSize(1, GIGABYTE));
MemoryPool systemMemoryPool = new MemoryPool(new MemoryPoolId("testSystem"), new DataSize(1, GIGABYTE)); MemoryPool systemMemoryPool = new MemoryPool(new MemoryPoolId("testSystem"), new DataSize(1, GIGABYTE));
QueryContext queryContext = new QueryContext(new DataSize(10, MEGABYTE), memoryPool, systemMemoryPool, executor); QueryContext queryContext = new QueryContext(maxMemory, memoryPool, systemMemoryPool, executor);
return createTaskContext(queryContext, executor, session, maxMemory, new DataSize(1, MEGABYTE)); return createTaskContext(queryContext, executor, session, new DataSize(1, MEGABYTE));
} }


public static TaskContext createTaskContext(QueryContext queryContext, Executor executor, Session session, DataSize maxMemory, DataSize preallocated) public static TaskContext createTaskContext(QueryContext queryContext, Executor executor, Session session, DataSize preallocated)
{ {
return queryContext.addTaskContext( return queryContext.addTaskContext(
new TaskStateMachine(new TaskId("query", "stage", "task"), checkNotSameThreadExecutor(executor, "executor is null")), new TaskStateMachine(new TaskId("query", "stage", "task"), checkNotSameThreadExecutor(executor, "executor is null")),
session, session,
checkNotNull(maxMemory, "maxMemory is null"),
preallocated, preallocated,
true, true,
true); true);
Expand Down
Expand Up @@ -146,7 +146,7 @@ public MockRemoteTask(TaskId taskId,


MemoryPool memoryPool = new MemoryPool(new MemoryPoolId("test"), new DataSize(1, GIGABYTE)); MemoryPool memoryPool = new MemoryPool(new MemoryPoolId("test"), new DataSize(1, GIGABYTE));
MemoryPool memorySystemPool = new MemoryPool(new MemoryPoolId("testSystem"), new DataSize(1, GIGABYTE)); MemoryPool memorySystemPool = new MemoryPool(new MemoryPoolId("testSystem"), new DataSize(1, GIGABYTE));
this.taskContext = new QueryContext(new DataSize(1, MEGABYTE), memoryPool, memorySystemPool, executor).addTaskContext(taskStateMachine, TEST_SESSION, new DataSize(256, MEGABYTE), new DataSize(1, MEGABYTE), true, true); this.taskContext = new QueryContext(new DataSize(1, MEGABYTE), memoryPool, memorySystemPool, executor).addTaskContext(taskStateMachine, TEST_SESSION, new DataSize(1, MEGABYTE), true, true);


this.location = URI.create("fake://task/" + taskId); this.location = URI.create("fake://task/" + taskId);


Expand Down
Expand Up @@ -39,7 +39,6 @@ public void testDefaults()
.setMinDrivers(Runtime.getRuntime().availableProcessors() * 4 * 2) .setMinDrivers(Runtime.getRuntime().availableProcessors() * 4 * 2)
.setInfoMaxAge(new Duration(15, TimeUnit.MINUTES)) .setInfoMaxAge(new Duration(15, TimeUnit.MINUTES))
.setClientTimeout(new Duration(2, TimeUnit.MINUTES)) .setClientTimeout(new Duration(2, TimeUnit.MINUTES))
.setMaxTaskMemoryUsage(new DataSize(256, Unit.MEGABYTE))
.setMaxTaskIndexMemoryUsage(new DataSize(64, Unit.MEGABYTE)) .setMaxTaskIndexMemoryUsage(new DataSize(64, Unit.MEGABYTE))
.setOperatorPreAllocatedMemory(new DataSize(16, Unit.MEGABYTE)) .setOperatorPreAllocatedMemory(new DataSize(16, Unit.MEGABYTE))
.setMaxPartialAggregationMemoryUsage(new DataSize(16, Unit.MEGABYTE)) .setMaxPartialAggregationMemoryUsage(new DataSize(16, Unit.MEGABYTE))
Expand All @@ -56,7 +55,6 @@ public void testExplicitPropertyMappings()
.put("task.info-refresh-max-wait", "1s") .put("task.info-refresh-max-wait", "1s")
.put("task.verbose-stats", "true") .put("task.verbose-stats", "true")
.put("task.cpu-timer-enabled", "false") .put("task.cpu-timer-enabled", "false")
.put("task.max-memory", "2GB")
.put("task.max-index-memory", "512MB") .put("task.max-index-memory", "512MB")
.put("task.operator-pre-allocated-memory", "2MB") .put("task.operator-pre-allocated-memory", "2MB")
.put("task.max-partial-aggregation-memory", "32MB") .put("task.max-partial-aggregation-memory", "32MB")
Expand All @@ -74,7 +72,6 @@ public void testExplicitPropertyMappings()
.setInfoRefreshMaxWait(new Duration(1, TimeUnit.SECONDS)) .setInfoRefreshMaxWait(new Duration(1, TimeUnit.SECONDS))
.setVerboseStats(true) .setVerboseStats(true)
.setTaskCpuTimerEnabled(false) .setTaskCpuTimerEnabled(false)
.setMaxTaskMemoryUsage(new DataSize(2, Unit.GIGABYTE))
.setMaxTaskIndexMemoryUsage(new DataSize(512, Unit.MEGABYTE)) .setMaxTaskIndexMemoryUsage(new DataSize(512, Unit.MEGABYTE))
.setOperatorPreAllocatedMemory(new DataSize(2, Unit.MEGABYTE)) .setOperatorPreAllocatedMemory(new DataSize(2, Unit.MEGABYTE))
.setMaxPartialAggregationMemoryUsage(new DataSize(32, Unit.MEGABYTE)) .setMaxPartialAggregationMemoryUsage(new DataSize(32, Unit.MEGABYTE))
Expand Down
Expand Up @@ -55,7 +55,7 @@ public void testBlocking()


QueryContext queryContext = new QueryContext(new DataSize(10, MEGABYTE), pool, systemPool, localQueryRunner.getExecutor()); QueryContext queryContext = new QueryContext(new DataSize(10, MEGABYTE), pool, systemPool, localQueryRunner.getExecutor());
LocalQueryRunner.MaterializedOutputFactory outputFactory = new LocalQueryRunner.MaterializedOutputFactory(); LocalQueryRunner.MaterializedOutputFactory outputFactory = new LocalQueryRunner.MaterializedOutputFactory();
TaskContext taskContext = createTaskContext(queryContext, localQueryRunner.getExecutor(), session, new DataSize(10, MEGABYTE), new DataSize(0, BYTE)); TaskContext taskContext = createTaskContext(queryContext, localQueryRunner.getExecutor(), session, new DataSize(0, BYTE));
Driver driver = Iterables.getOnlyElement(localQueryRunner.createDrivers("SELECT COUNT(*), clerk FROM orders GROUP BY clerk", outputFactory, taskContext)); Driver driver = Iterables.getOnlyElement(localQueryRunner.createDrivers("SELECT COUNT(*), clerk FROM orders GROUP BY clerk", outputFactory, taskContext));


// run driver, until it blocks // run driver, until it blocks
Expand Down

0 comments on commit b64a18e

Please sign in to comment.