Skip to content

Commit

Permalink
HIVE-19866 : improve LLAP cache purge (Sergey Shelukhin, reviewed by …
Browse files Browse the repository at this point in the history
…Gopal Vijayaraghavan)
  • Loading branch information
sershe-apache committed Jun 14, 2018
1 parent f41acae commit eade25a
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 19 deletions.
Expand Up @@ -145,4 +145,16 @@ public void debugDumpShort(StringBuilder sb) {
public void updateMaxSize(long maxSize) {
this.maxSize = maxSize;
}

public long purge() {
if (evictor == null) return 0;
long evicted = evictor.purge();
if (evicted == 0) return 0;
long usedMem = -1;
do {
usedMem = usedMemory.get();
} while (!usedMemory.compareAndSet(usedMem, usedMem - evicted));
metrics.incrCacheCapacityUsed(-evicted);
return evicted;
}
}
Expand Up @@ -57,18 +57,19 @@ private final double expirePriority(long time, long lastAccess, double previous)
* Perhaps we should use ConcurrentDoubleLinkedList (in public domain).
* ONLY LIST REMOVAL is allowed under list lock.
*/
private final LlapCacheableBuffer[] heap;
private LlapCacheableBuffer[] heap;
private final Object heapLock = new Object();
private final ReentrantLock listLock = new ReentrantLock();
private LlapCacheableBuffer listHead, listTail;
/** Number of elements. */
private int heapSize = 0;
private final int maxHeapSize;
private EvictionListener evictionListener;
private LlapOomDebugDump parentDebugDump;

public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration conf) {
lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
int maxBuffers = (int)Math.ceil((maxSize * 1.0) / minBufferSize);
int maxHeapSize = -1;
if (lambda == 0) {
maxHeapSize = maxBuffers; // lrfuThreshold is +inf in this case
} else {
Expand Down Expand Up @@ -121,7 +122,7 @@ public void notifyUnlock(LlapCacheableBuffer buffer) {
if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time);
}
synchronized (heap) {
synchronized (heapLock) {
// First, update buffer priority - we have just been using it.
buffer.priority = (buffer.lastUpdate == -1) ? F0
: touchPriority(time, buffer.lastUpdate, buffer.priority);
Expand Down Expand Up @@ -180,11 +181,75 @@ public void setParentDebugDumper(LlapOomDebugDump dumper) {

@Override
public long purge() {
long evicted = evictSomeBlocks(Long.MAX_VALUE);
LlapIoImpl.LOG.info("PURGE: evicted {} from LRFU policy", LlapUtil.humanReadableByteCount(evicted));
long evicted = 0;
LlapCacheableBuffer oldTail = null;
listLock.lock();
try {
LlapCacheableBuffer current = oldTail = listTail;
while (current != null) {
boolean canEvict = LlapCacheableBuffer.INVALIDATE_OK != current.invalidate();
current.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
if (canEvict) {
current = current.prev;
} else {
// Remove from the list.
LlapCacheableBuffer newCurrent = current.prev;
oldTail = removeFromLocalList(oldTail, current);
current = newCurrent;
}
}
listHead = listTail = null;
} finally {
listLock.unlock();
}

LlapCacheableBuffer[] oldHeap = null;
int oldHeapSize = -1;
synchronized (heapLock) {
oldHeap = heap;
oldHeapSize = heapSize;
heap = new LlapCacheableBuffer[maxHeapSize];
heapSize = 0;
for (int i = 0; i < oldHeapSize; ++i) {
LlapCacheableBuffer result = oldHeap[i];
result.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
int invalidateResult = result.invalidate();
if (invalidateResult != LlapCacheableBuffer.INVALIDATE_OK) {
oldHeap[i] = null; // Removed from heap without evicting.
}
}
}
LlapCacheableBuffer current = oldTail;
while (current != null) {
evicted += current.getMemoryUsage();
evictionListener.notifyEvicted(current);
current = current.prev;
}
for (int i = 0; i < oldHeapSize; ++i) {
current = oldHeap[i];
if (current == null) continue;
evicted += current.getMemoryUsage();
evictionListener.notifyEvicted(current);
}
LlapIoImpl.LOG.info("PURGE: evicted {} from LRFU policy",
LlapUtil.humanReadableByteCount(evicted));
return evicted;
}

private static LlapCacheableBuffer removeFromLocalList(
LlapCacheableBuffer tail, LlapCacheableBuffer current) {
if (current == tail) {
tail = current.prev;
} else {
current.next.prev = current.prev;
}
if (current.prev != null) {
current.prev.next = current.next;
}
current.prev = current.next = null;
return tail;
}


@Override
public long evictSomeBlocks(long memoryToReserve) {
Expand All @@ -196,7 +261,7 @@ public long evictSomeBlocks(long memoryToReserve) {
long time = timer.get();
while (evicted < memoryToReserve) {
LlapCacheableBuffer buffer = null;
synchronized (heap) {
synchronized (heapLock) {
buffer = evictFromHeapUnderLock(time);
}
if (buffer == null) return evicted;
Expand Down
Expand Up @@ -100,7 +100,8 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
private final LowLevelCache dataCache;
private final BufferUsageManager bufferManager;
private final Configuration daemonConf;
private LowLevelCachePolicy cachePolicy;
private final LowLevelCacheMemoryManager memoryManager;


private LlapIoImpl(Configuration conf) throws IOException {
this.daemonConf = conf;
Expand Down Expand Up @@ -143,37 +144,38 @@ private LlapIoImpl(Configuration conf) throws IOException {
LowLevelCachePolicy cp = useLrfu ? new LowLevelLrfuCachePolicy(
minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy();
boolean trackUsage = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE);
LowLevelCachePolicy cachePolicyWrapper;
if (trackUsage) {
this.cachePolicy = new CacheContentsTracker(cp);
cachePolicyWrapper = new CacheContentsTracker(cp);
} else {
this.cachePolicy = cp;
cachePolicyWrapper = cp;
}
// Allocator uses memory manager to request memory, so create the manager next.
LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
totalMemorySize, cachePolicy, cacheMetrics);
this.memoryManager = new LowLevelCacheMemoryManager(
totalMemorySize, cachePolicyWrapper, cacheMetrics);
cacheMetrics.setCacheCapacityTotal(totalMemorySize);
// Cache uses allocator to allocate and deallocate, create allocator and then caches.
BuddyAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
BuddyAllocator allocator = new BuddyAllocator(conf, memoryManager, cacheMetrics);
this.allocator = allocator;
this.memoryDump = allocator;
LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl(
cacheMetrics, cachePolicy, allocator, true);
cacheMetrics, cachePolicyWrapper, allocator, true);
dataCache = cacheImpl;
if (isEncodeEnabled) {
SerDeLowLevelCacheImpl serdeCacheImpl = new SerDeLowLevelCacheImpl(
cacheMetrics, cachePolicy, allocator);
cacheMetrics, cachePolicyWrapper, allocator);
serdeCache = serdeCacheImpl;
}

boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
metadataCache = new MetadataCache(
allocator, memManager, cachePolicy, useGapCache, cacheMetrics);
allocator, memoryManager, cachePolicyWrapper, useGapCache, cacheMetrics);
fileMetadataCache = metadataCache;
// And finally cache policy uses cache to notify it of eviction. The cycle is complete!
EvictionDispatcher e = new EvictionDispatcher(
dataCache, serdeCache, metadataCache, allocator);
cachePolicy.setEvictionListener(e);
cachePolicy.setParentDebugDumper(e);
cachePolicyWrapper.setEvictionListener(e);
cachePolicyWrapper.setParentDebugDumper(e);

cacheImpl.startThreads(); // Start the cache threads.
bufferManager = bufferManagerOrc = cacheImpl; // Cache also serves as buffer manager.
Expand All @@ -185,6 +187,7 @@ private LlapIoImpl(Configuration conf) throws IOException {
SimpleBufferManager sbm = new SimpleBufferManager(allocator, cacheMetrics);
bufferManager = bufferManagerOrc = bufferManagerGeneric = sbm;
dataCache = sbm;
this.memoryManager = null;
}
// IO thread pool. Listening is used for unhandled errors for now (TODO: remove?)
int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE);
Expand Down Expand Up @@ -216,8 +219,8 @@ public String getMemoryInfo() {

@Override
public long purge() {
if (cachePolicy != null) {
return cachePolicy.purge();
if (memoryManager != null) {
return memoryManager.purge();
}
return 0;
}
Expand Down
Expand Up @@ -178,6 +178,46 @@ public void testLruExtreme() {
verifyOrder(mm, lru, et, inserted, null);
}

@Test
public void testPurge() {
final int HEAP_SIZE = 32;
Configuration conf = new Configuration();
conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f);
EvictionTracker et = new EvictionTracker();
LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, HEAP_SIZE, conf);
MetricsMock m = createMetricsMock();
LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(
HEAP_SIZE, lrfu, m.metricsMock);
lrfu.setEvictionListener(et);
assertEquals(0, lrfu.purge());
for (int testSize = 1; testSize <= HEAP_SIZE; ++testSize) {
LOG.info("Starting with " + testSize);
ArrayList<LlapDataBuffer> purge = new ArrayList<LlapDataBuffer>(testSize),
dontPurge = new ArrayList<LlapDataBuffer>(testSize);
for (int i = 0; i < testSize; ++i) {
LlapDataBuffer buffer = LowLevelCacheImpl.allocateFake();
assertTrue(cache(mm, lrfu, et, buffer));
// Lock a few blocks without telling the policy.
if ((i + 1) % 3 == 0) {
buffer.incRef();
dontPurge.add(buffer);
} else {
purge.add(buffer);
}
}
lrfu.purge();
for (LlapDataBuffer buffer : purge) {
assertTrue(buffer + " " + testSize, buffer.isInvalid());
mm.releaseMemory(buffer.getMemoryUsage());
}
for (LlapDataBuffer buffer : dontPurge) {
assertFalse(buffer.isInvalid());
buffer.decRef();
mm.releaseMemory(buffer.getMemoryUsage());
}
}
}

@Test
public void testDeadlockResolution() {
int heapSize = 4;
Expand Down

0 comments on commit eade25a

Please sign in to comment.