Skip to content

Commit

Permalink
Flush join indexes when memory is constrained
Browse files Browse the repository at this point in the history
Add a separate limit for index memory usage that is independent of the task
memory limit. This removes the risk of the flushable memory usage causing
another operator to run out of memory.

The Loader will make the following attempts when loading:
1) Loading a batch of multiple requests, flushes cache if fails
2) Loading just the single required request, flushes if fails
3) Tries to load the required request with reduced number of positions by factors of 10
  • Loading branch information
dain authored and erichwang committed Aug 22, 2014
1 parent e6b2f44 commit 5fd8ac9
Show file tree
Hide file tree
Showing 18 changed files with 500 additions and 118 deletions.
Expand Up @@ -22,6 +22,11 @@ public class ExceededMemoryLimitException
{
public ExceededMemoryLimitException(DataSize maxMemory)
{
super(StandardErrorCode.EXCEEDED_MEMORY_LIMIT.toErrorCode(), String.format("Task exceeded max memory size of %s", maxMemory));
this(maxMemory, "Task");
}

public ExceededMemoryLimitException(DataSize maxMemory, String limitName)
{
super(StandardErrorCode.EXCEEDED_MEMORY_LIMIT.toErrorCode(), String.format("%s exceeded max memory size of %s", limitName, maxMemory));
}
}
Expand Up @@ -29,6 +29,7 @@ public class TaskManagerConfig
private boolean taskCpuTimerEnabled = true;
private DataSize maxTaskMemoryUsage = new DataSize(256, Unit.MEGABYTE);
private DataSize operatorPreAllocatedMemory = new DataSize(16, Unit.MEGABYTE);
private DataSize maxTaskIndexMemoryUsage = new DataSize(64, Unit.MEGABYTE);
private int maxShardProcessorThreads = Runtime.getRuntime().availableProcessors() * 4;

private DataSize sinkMaxBufferSize = new DataSize(32, Unit.MEGABYTE);
Expand Down Expand Up @@ -74,6 +75,19 @@ public TaskManagerConfig setOperatorPreAllocatedMemory(DataSize operatorPreAlloc
return this;
}

@NotNull
public DataSize getMaxTaskIndexMemoryUsage()
{
return maxTaskIndexMemoryUsage;
}

@Config("task.max-index-memory")
public TaskManagerConfig setMaxTaskIndexMemoryUsage(DataSize maxTaskIndexMemoryUsage)
{
this.maxTaskIndexMemoryUsage = maxTaskIndexMemoryUsage;
return this;
}

@Min(1)
public int getMaxShardProcessorThreads()
{
Expand Down
Expand Up @@ -318,4 +318,11 @@ public boolean isPartitioned()
{
return partitioned;
}

// hack for index joins
@Deprecated
public Executor getExecutor()
{
return executor;
}
}
@@ -0,0 +1,90 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator.index;

import io.airlift.stats.CounterStat;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
public class IndexJoinLookupStats
{
private final CounterStat totalIndexJoinLookups = new CounterStat();
private final CounterStat successfulIndexJoinLookupsByCacheReset = new CounterStat();
private final CounterStat successfulIndexJoinLookupsBySingleRequest = new CounterStat();
private final CounterStat successfulIndexJoinLookupsByLimitedRequest = new CounterStat();
private final CounterStat failedIndexJoinLookups = new CounterStat();

@Managed
@Nested
public CounterStat getTotalIndexJoinLookups()
{
return totalIndexJoinLookups;
}

@Managed
@Nested
public CounterStat getSuccessfulIndexJoinLookupsByCacheReset()
{
return successfulIndexJoinLookupsByCacheReset;
}

@Managed
@Nested
public CounterStat getSuccessfulIndexJoinLookupsBySingleRequest()
{
return successfulIndexJoinLookupsBySingleRequest;
}

@Managed
@Nested
public CounterStat getSuccessfulIndexJoinLookupsByLimitedRequest()
{
return successfulIndexJoinLookupsByLimitedRequest;
}

@Managed
@Nested
public CounterStat getFailedIndexJoinLookups()
{
return failedIndexJoinLookups;
}

public void recordIndexJoinLookup()
{
totalIndexJoinLookups.update(1);
}

public void recordSuccessfulIndexJoinLookupByCacheReset()
{
successfulIndexJoinLookupsByCacheReset.update(1);
}

public void recordSuccessfulIndexJoinLookupBySingleRequest()
{
successfulIndexJoinLookupsBySingleRequest.update(1);
}

public void recordSuccessfulIndexJoinLookupByLimitedRequest()
{
successfulIndexJoinLookupsByLimitedRequest.update(1);
}

public void recordFailedIndexJoinLookup()
{
failedIndexJoinLookups.update(1);
}
}

0 comments on commit 5fd8ac9

Please sign in to comment.