Skip to content

Commit

Permalink
Multiple improvements to CachingOffHeapBlockAllocator
Browse files Browse the repository at this point in the history
* check if allocator is released in allocate()
* use fixed size array of queues to store cache stripes
* minor tweaks & cosmetics
  • Loading branch information
andreikoval committed Oct 2, 2018
1 parent e3c6248 commit c5fd4d8
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 26 deletions.
12 changes: 11 additions & 1 deletion community/common/src/main/java/org/neo4j/helpers/Numbers.java
Expand Up @@ -31,13 +31,23 @@ public class Numbers
/**
* Checks if {@code value} is a power of 2.
* @param value the value to check
* @return @{code true} if {@code value} is a power of 2.
* @return {@code true} if {@code value} is a power of 2.
*/
public static boolean isPowerOfTwo( long value )
{
return value > 0 && (value & (value - 1)) == 0;
}

/**
* Returns base 2 logarithm of the closest power of 2 that is less or equal to the {@code value}.
*
* @param value a positive long value
*/
public static int log2floor( long value )
{
return (Long.SIZE - 1) - Long.numberOfLeadingZeros( requirePositive( value ) );
}

public static short safeCastIntToUnsignedShort( int value )
{
if ( (value & ~0xFFFF) != 0 )
Expand Down
Expand Up @@ -20,9 +20,8 @@

package org.neo4j.kernel.impl.util.collection;

import org.eclipse.collections.impl.map.mutable.primitive.LongObjectHashMap;
import org.eclipse.collections.impl.map.mutable.primitive.SynchronizedLongObjectMap;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

Expand All @@ -32,57 +31,65 @@
import org.neo4j.util.VisibleForTesting;

import static org.neo4j.helpers.Numbers.isPowerOfTwo;
import static org.neo4j.helpers.Numbers.log2floor;
import static org.neo4j.util.Preconditions.checkState;
import static org.neo4j.util.Preconditions.requirePositive;
import static org.neo4j.util.Preconditions.requirePowerOfTwo;

/**
* Block allocator that caches freed blocks matching following criteria:
* <ul>
* <li>Aligned size (without alignment padding) must be power of 2
* <li>Unaligned size (with alignment padding) must be less or equal to {@link #maxCacheableBlockSize}
* <li>Size must be power of 2
* <li>Size must be less or equal to {@link #maxCacheableBlockSize}
* </ul>
*
* This class can store at most {@link #maxCachedBlocks} blocks of each cacheable size.
*
* This class is thread safe.
*/
public class CachingOffHeapBlockAllocator implements OffHeapBlockAllocator
{
private final SynchronizedLongObjectMap<BlockingQueue<MemoryBlock>> pool = new SynchronizedLongObjectMap<>( new LongObjectHashMap<>() );
/**
* Max size of cached blocks including alignment padding.
* Max size of cached blocks, a power of 2
*/
private final long maxCacheableBlockSize;
/**
* Max number of blocks of each size to store.
*/
private final int maxCachedBlocks;
private volatile boolean released;
private final BlockingQueue<MemoryBlock>[] caches;

@VisibleForTesting
public CachingOffHeapBlockAllocator()
{
this( ByteUnit.kibiBytes( 512 ), 128 );
}

/**
* @param maxCacheableBlockSize Max size of cached blocks including alignment padding, must be a power of 2
* @param maxCachedBlocks Max number of blocks of each size to store
*/
public CachingOffHeapBlockAllocator( long maxCacheableBlockSize, int maxCachedBlocks )
{
requirePositive( maxCachedBlocks );
this.maxCacheableBlockSize = requirePowerOfTwo( maxCacheableBlockSize );
this.maxCachedBlocks = requireNonNegative( maxCachedBlocks );

final int numOfCaches = log2floor( maxCacheableBlockSize ) + 1;
//noinspection unchecked
this.caches = new BlockingQueue[numOfCaches];
for ( int i = 0; i < caches.length; i++ )
{
caches[i] = new ArrayBlockingQueue<>( maxCachedBlocks );
}
}

@Override
public MemoryBlock allocate( long size, MemoryAllocationTracker tracker )
{
requirePositive( size );
if ( size > maxCacheableBlockSize || Long.bitCount( size ) > 1 )
checkState( !released, "Allocator is already released" );
if ( !isCacheable( size ) )
{
return allocateNew( size, tracker );
}

final BlockingQueue<MemoryBlock> cached = pool.getIfAbsentPut( size, () -> new ArrayBlockingQueue<>( maxCachedBlocks ) );

MemoryBlock block = cached.poll();
final BlockingQueue<MemoryBlock> cache = caches[log2floor( size )];
MemoryBlock block = cache.poll();
if ( block == null )
{
block = allocateNew( size, tracker );
Expand All @@ -98,14 +105,14 @@ public MemoryBlock allocate( long size, MemoryAllocationTracker tracker )
@Override
public void free( MemoryBlock block, MemoryAllocationTracker tracker )
{
if ( released || block.size > maxCacheableBlockSize || Long.bitCount( block.size ) > 1 )
if ( released || !isCacheable( block.size ) )
{
doFree( block, tracker );
return;
}

final BlockingQueue<MemoryBlock> cached = pool.getIfAbsentPut( block.size, () -> new ArrayBlockingQueue<>( maxCachedBlocks ) );
if ( cached.offer( block ) )
final BlockingQueue<MemoryBlock> cache = caches[log2floor( block.size )];
if ( cache.offer( block ) )
{
tracker.deallocated( block.unalignedSize );
}
Expand All @@ -119,11 +126,13 @@ public void free( MemoryBlock block, MemoryAllocationTracker tracker )
public void release()
{
released = true;
pool.forEach( cached ->
final List<MemoryBlock> blocks = new ArrayList<>();
for ( final BlockingQueue<MemoryBlock> cache : caches )
{
cached.forEach( block -> UnsafeUtil.free( block.unalignedAddr, block.unalignedSize ) );
} );
pool.clear();
cache.drainTo( blocks );
blocks.forEach( block -> UnsafeUtil.free( block.unalignedAddr, block.unalignedSize ) );
blocks.clear();
}
}

@VisibleForTesting
Expand All @@ -141,4 +150,9 @@ MemoryBlock allocateNew( long size, MemoryAllocationTracker tracker )
UnsafeUtil.setMemory( unalignedAddr, unalignedSize, (byte) 0 );
return new MemoryBlock( addr, size, unalignedAddr, unalignedSize );
}

private boolean isCacheable( long size )
{
return isPowerOfTwo( size ) && size <= maxCacheableBlockSize;
}
}

0 comments on commit c5fd4d8

Please sign in to comment.