Skip to content

Commit

Permalink
Add first stab at a MemoryManager for Muninn
Browse files Browse the repository at this point in the history
The idea is to be able to allocate the cache pages at any alignment that the PageSwappers might require.
Useful if we want to start doing direct-io.
  • Loading branch information
chrisvest committed Jul 10, 2015
1 parent ba5cfcb commit ce99e12
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 61 deletions.
Expand Up @@ -56,6 +56,14 @@ public interface PageSwapperFactory
*/
boolean isCachePageSizeHintStrict();

/**
* Get the unit of alignment that the swappers require of the memory buffers. For instance, if page alignment is
* required for doing direct IO, then {@link org.neo4j.unsafe.impl.internal.dragons.UnsafeUtil#pageSize()} can be
* returned.
* @return The required buffer alignment byte multiple.
*/
long getRequiredBufferAlignment();

/**
* Create a PageSwapper for the given file.
* @param file The file that the PageSwapper will move file pages in and
Expand Down
Expand Up @@ -87,4 +87,10 @@ public boolean isCachePageSizeHintStrict()
{
return false;
}

@Override
public long getRequiredBufferAlignment()
{
return 1;
}
}
@@ -0,0 +1,134 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.io.pagecache.impl.muninn;

import org.neo4j.unsafe.impl.internal.dragons.UnsafeUtil;

/**
* The memory manager is simple: it only allocates memory, until it itself is finalizable and frees it all in one go.
*
* The memory is allocated in large segments, and the memory returned by the memory manager is page aligned, and plays
* well with transparent huge pages and other operating system optimisations.
*
* The memory manager assumes that the memory claimed from it is evenly divisible in units of pages.
*/
final class MemoryManager
{
private static final long GRAB_SIZE = 32 * 1024 * 1024; // 32 MiB

/**
* The amount of memory that this memory manager can still allocate.
*/
private long memoryReserve;
private final long alignment;

private Slab slabs;

/**
* Create a new MemoryManager that will allocate the given amount of memory, to pointers that are aligned to the
* given alignment size.
* @param expectedMaxMemory The maximum amount of memory that this memory manager is expected to allocate. The
* actual amount of memory used can end up greater than this value, if some of it gets wasted on alignment padding.
* @param alignment The byte multiple that the allocated pointers have to be aligned at.
*/
public MemoryManager( long expectedMaxMemory, long alignment )
{
this.memoryReserve = expectedMaxMemory;
this.alignment = alignment;
}

/**
* Allocate a contiguous, aligned region of memory of the given size in bytes.
* @param bytes the number of bytes to allocate.
* @return A pointer to the allocated memory.
*/
public synchronized long allocateAligned( long bytes )
{
if ( slabs == null || !slabs.canAllocate( bytes ) )
{
long slabGrab = Math.min( GRAB_SIZE, memoryReserve );
if ( slabGrab < bytes )
{
slabGrab = bytes + alignment;
}
memoryReserve -= slabGrab;
slabs = new Slab( slabs, slabGrab, alignment );
}
return slabs.allocate( bytes );
}

@Override
protected synchronized void finalize() throws Throwable
{
super.finalize();
Slab current = slabs;

while ( current != null )
{
current.free();
current = current.next;
}
}

private static class Slab
{
public final Slab next;
private final long address;
private final long limit;
private final long alignMask;
private long nextAlignedPointer;

public Slab( Slab next, long size, long alignment )
{
this.next = next;
this.address = UnsafeUtil.malloc( size );
this.limit = address + size;
this.alignMask = alignment - 1;

nextAlignedPointer = nextAligned( address );
}

private long nextAligned( long pointer )
{
if ( (pointer & ~alignMask) == pointer )
{
return pointer;
}
return (pointer + alignMask) & ~alignMask;
}

public long allocate( long bytes )
{
long allocation = nextAlignedPointer;
nextAlignedPointer = nextAligned( nextAlignedPointer + bytes );
return allocation;
}

public void free()
{
UnsafeUtil.free( address );
}

public boolean canAllocate( long bytes )
{
return nextAlignedPointer + bytes <= limit;
}
}
}

This file was deleted.

Expand Up @@ -41,9 +41,9 @@ final class MuninnPage extends StampedLock implements Page
// The other 7 bits are used as an exponent for computing the cache page size (as a power of two).
private byte cachePageHeader;

// We keep this reference to prevent the MemoryReleaser from becoming
// We keep this reference to prevent the MemoryManager from becoming
// finalizable until all our pages are finalizable or collected.
private final MemoryReleaser memoryReleaser;
private final MemoryManager memoryManager;

private long pointer;

Expand All @@ -60,10 +60,10 @@ final class MuninnPage extends StampedLock implements Page
private PageSwapper swapper;
private long filePageId = PageCursor.UNBOUND_PAGE_ID;

public MuninnPage( int cachePageSize, MemoryReleaser memoryReleaser )
public MuninnPage( int cachePageSize, MemoryManager memoryManager )
{
this.cachePageHeader = (byte) (31 - Integer.numberOfLeadingZeros( cachePageSize ));
this.memoryReleaser = memoryReleaser;
this.memoryManager = memoryManager;
getCachePageId(); // initialize our identity hashCode
}

Expand Down Expand Up @@ -430,8 +430,7 @@ public void initBuffer()
assert isWriteLocked(): "Cannot initBuffer without write-lock";
if ( pointer == 0 )
{
pointer = UnsafeUtil.malloc( size() );
memoryReleaser.registerPointer( pointer );
pointer = memoryManager.allocateAligned( size() );
UnsafeUtil.setMemory( pointer, size(), MuninnPageCache.ZERO_BYTE );
}
}
Expand Down
Expand Up @@ -235,12 +235,13 @@ public MuninnPageCache(
this.backgroundFlushPauseRequests = new AtomicInteger();
this.printExceptionsOnClose = true;

MemoryReleaser memoryReleaser = new MemoryReleaser( maxPages );
long alignment = swapperFactory.getRequiredBufferAlignment();
MemoryManager memoryManager = new MemoryManager( maxPages * cachePageSize, alignment );
Object pageList = null;
int pageIndex = maxPages;
while ( pageIndex --> 0 )
{
MuninnPage page = new MuninnPage( cachePageSize, memoryReleaser );
MuninnPage page = new MuninnPage( cachePageSize, memoryManager );
pages[pageIndex] = page;

if ( pageList == null )
Expand Down
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.io.pagecache.impl.muninn;

import org.junit.Test;

import org.neo4j.unsafe.impl.internal.dragons.UnsafeUtil;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.*;

public class MemoryManagerTest
{
@Test
public void allocatedPointerMustNotBeNull() throws Exception
{
MemoryManager mman = new MemoryManager( 16 * 4096, 8 );
long address = mman.allocateAligned( 8192 );
assertThat( address, is( not( 0L ) ) );
}

@Test
public void allocatedPointerMustBePageAligned() throws Exception
{
MemoryManager mman = new MemoryManager( 16 * 4096, UnsafeUtil.pageSize() );
long address = mman.allocateAligned( 8192 );
assertThat( address % UnsafeUtil.pageSize(), is( 0L ) );
}

@Test
public void mustBeAbleToAllocatePastMemoryLimit() throws Exception
{
MemoryManager mman = new MemoryManager( 8192, 2 );
for ( int i = 0; i < 4100; i++ )
{
assertThat( mman.allocateAligned( 1 ) % 2, is( 0L ) );
}
// Also asserts that no OutOfMemoryError is thrown.
}
}

0 comments on commit ce99e12

Please sign in to comment.