Skip to content

Commit

Permalink
Remove no-fail memory reservation parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
cberner committed Apr 15, 2015
1 parent dd5984c commit 51320bb
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 52 deletions.
Expand Up @@ -190,13 +190,10 @@ public DataSize getOperatorPreAllocatedMemory()
return pipelineContext.getOperatorPreAllocatedMemory();
}

public boolean reserveMemory(long bytes)
public void reserveMemory(long bytes)
{
boolean result = pipelineContext.reserveMemory(bytes);
if (result) {
memoryReservation.getAndAdd(bytes);
}
return result;
pipelineContext.reserveMemory(bytes);
memoryReservation.getAndAdd(bytes);
}

public void freeMemory(long bytes)
Expand Down
Expand Up @@ -219,11 +219,6 @@ public Page getOutput()
return null;
}

// Only partial aggregation can flush early. Also, check that we are not flushing tiny bits at a time
if (!finishing && step != Step.PARTIAL) {
throw new ExceededMemoryLimitException(operatorContext.getMaxMemorySize());
}

outputIterator = aggregationBuilder.build();
aggregationBuilder = null;

Expand Down Expand Up @@ -255,6 +250,7 @@ private static class GroupByHashAggregationBuilder
private final GroupByHash groupByHash;
private final List<Aggregator> aggregators;
private final OperatorContext operatorContext;
private final boolean partial;

private GroupByHashAggregationBuilder(
List<AccumulatorFactory> accumulatorFactories,
Expand All @@ -267,6 +263,7 @@ private GroupByHashAggregationBuilder(
{
this.groupByHash = createGroupByHash(groupByTypes, Ints.toArray(groupByChannels), hashChannel, expectedGroups);
this.operatorContext = operatorContext;
this.partial = (step == Step.PARTIAL);

// wrapper each function with an aggregator
ImmutableList.Builder<Aggregator> builder = ImmutableList.builder();
Expand Down Expand Up @@ -302,7 +299,16 @@ public boolean isFull()
if (memorySize < 0) {
memorySize = 0;
}
return operatorContext.setMemoryReservation(memorySize, true) != memorySize;
try {
operatorContext.setMemoryReservation(memorySize);
return false;
}
catch (ExceededMemoryLimitException e) {
if (partial) {
return true;
}
throw e;
}
}

public Iterator<Page> build()
Expand Down
Expand Up @@ -203,29 +203,14 @@ public DataSize getOperatorPreAllocatedMemory()
return driverContext.getOperatorPreAllocatedMemory();
}

public boolean reserveMemory(long bytes)
{
return reserveMemory(bytes, false);
}

public boolean reserveMemory(long bytes, boolean noFail)
public void reserveMemory(long bytes)
{
driverContext.reserveMemory(bytes);
long newReservation = memoryReservation.getAndAdd(bytes);
if (newReservation > maxMemoryReservation) {
memoryReservation.getAndAdd(-bytes);
if (!noFail) {
throw new ExceededMemoryLimitException(getMaxMemorySize());
}
return false;
throw new ExceededMemoryLimitException(getMaxMemorySize());
}
boolean result = driverContext.reserveMemory(bytes);
if (!result) {
memoryReservation.getAndAdd(-bytes);
if (!noFail) {
throw new ExceededMemoryLimitException(getMaxMemorySize());
}
}
return result;
}

public void freeMemory(long bytes)
Expand All @@ -234,25 +219,18 @@ public void freeMemory(long bytes)
memoryReservation.getAndAdd(-bytes);
}

public synchronized long setMemoryReservation(long newMemoryReservation)
{
return setMemoryReservation(newMemoryReservation, false);
}

public synchronized long setMemoryReservation(long newMemoryReservation, boolean noFail)
public synchronized void setMemoryReservation(long newMemoryReservation)
{
checkArgument(newMemoryReservation >= 0, "newMemoryReservation is negative");

long delta = newMemoryReservation - memoryReservation.get();

if (delta > 0) {
reserveMemory(delta, noFail);
reserveMemory(delta);
}
else {
freeMemory(-delta);
}

return memoryReservation.get();
}

public void setInfoSupplier(Supplier<Object> infoSupplier)
Expand Down
Expand Up @@ -198,13 +198,10 @@ public DataSize getOperatorPreAllocatedMemory()
return taskContext.getOperatorPreAllocatedMemory();
}

public synchronized boolean reserveMemory(long bytes)
public synchronized void reserveMemory(long bytes)
{
boolean result = taskContext.reserveMemory(bytes);
if (result) {
memoryReservation.getAndAdd(bytes);
}
return result;
taskContext.reserveMemory(bytes);
memoryReservation.getAndAdd(bytes);
}

public synchronized void freeMemory(long bytes)
Expand Down
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.operator;

import com.facebook.presto.ExceededMemoryLimitException;
import com.facebook.presto.Session;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.execution.TaskId;
Expand Down Expand Up @@ -180,15 +181,14 @@ public DataSize getOperatorPreAllocatedMemory()
return operatorPreAllocatedMemory;
}

public synchronized boolean reserveMemory(long bytes)
public synchronized void reserveMemory(long bytes)
{
checkArgument(bytes >= 0, "bytes is negative");

if (memoryReservation.get() + bytes > maxMemory) {
return false;
throw new ExceededMemoryLimitException(getMaxMemorySize());
}
memoryReservation.getAndAdd(bytes);
return true;
}

public synchronized void freeMemory(long bytes)
Expand Down
Expand Up @@ -182,6 +182,7 @@ public void addInput(Page page)
if (topNBuilder == null) {
topNBuilder = new TopNBuilder(
n,
partial,
sortTypes,
sortChannels,
sortOrders,
Expand Down Expand Up @@ -211,9 +212,6 @@ public Page getOutput()
outputIterator = topNBuilder.build();
topNBuilder = null;
}
else {
throw new ExceededMemoryLimitException(operatorContext.getMaxMemorySize());
}
}

pageBuilder.reset();
Expand All @@ -232,6 +230,7 @@ public Page getOutput()
private static class TopNBuilder
{
private final int n;
private final boolean partial;
private final List<Type> sortTypes;
private final List<Integer> sortChannels;
private final List<SortOrder> sortOrders;
Expand All @@ -241,12 +240,14 @@ private static class TopNBuilder
private long memorySize;

private TopNBuilder(int n,
boolean partial,
List<Type> sortTypes,
List<Integer> sortChannels,
List<SortOrder> sortOrders,
OperatorContext operatorContext)
{
this.n = n;
this.partial = partial;

this.sortTypes = sortTypes;
this.sortChannels = sortChannels;
Expand Down Expand Up @@ -336,7 +337,16 @@ private boolean isFull()
if (memorySize < 0) {
memorySize = 0;
}
return operatorContext.setMemoryReservation(memorySize, true) != memorySize;
try {
operatorContext.setMemoryReservation(memorySize);
return false;
}
catch (ExceededMemoryLimitException e) {
if (partial) {
return true;
}
throw e;
}
}

public Iterator<Block[]> build()
Expand Down

0 comments on commit 51320bb

Please sign in to comment.