Skip to content

Commit

Permalink
Finish SwapperSet and reuse of swapper ids
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed May 26, 2017
1 parent 41ba80b commit 86d527b
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 85 deletions.
Expand Up @@ -21,6 +21,7 @@

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
Expand Down Expand Up @@ -854,15 +855,7 @@ int evictPages( int pageCountToEvict, int clockArm, EvictionRunEvent evictionRun
{
clearEvictorException();
pageCountToEvict--;

Object current;
FreePage freePage = new FreePage( pageRef );
do
{
current = getFreelistHead();
freePage.setNext( current instanceof FreePage ? (FreePage) current : null );
}
while ( !compareAndSetFreelistHead( current, freePage ) );
addFreePageToFreelist( pageRef );
}
}
catch ( IOException e )
Expand All @@ -886,6 +879,18 @@ int evictPages( int pageCountToEvict, int clockArm, EvictionRunEvent evictionRun
return clockArm;
}

/**
 * Pushes the page identified by {@code pageRef} onto the head of the freelist.
 * <p>
 * A new {@link FreePage} node is linked in front of the currently observed head and installed with a
 * compare-and-set. If the head changed in the meantime, the head is re-read and the attempt is repeated
 * until it succeeds. A head that is not a {@code FreePage} is treated as "no successor".
 *
 * @param pageRef reference of the page to make available on the freelist.
 */
private void addFreePageToFreelist( long pageRef )
{
    FreePage newHead = new FreePage( pageRef );
    for ( ; ; )
    {
        Object observedHead = getFreelistHead();
        FreePage successor = null;
        if ( observedHead instanceof FreePage )
        {
            successor = (FreePage) observedHead;
        }
        newHead.setNext( successor );
        if ( compareAndSetFreelistHead( observedHead, newHead ) )
        {
            return;
        }
    }
}

void clearEvictorException()
{
if ( evictorException != null )
Expand All @@ -908,4 +913,31 @@ public String toString()
sb.append( ']' ).append( '\n' );
return sb.toString();
}

/**
 * Evicts every page in the page list whose swapper id is accepted by the predicate that
 * {@link SwapperSet#vacuum} hands to the callback, and puts each evicted page on the freelist.
 * <p>
 * For each page, eviction is retried for as long as the page is still bound to one of the selected swapper
 * ids and {@code tryEvict} has not succeeded. A page whose swapper id stops matching is left alone.
 *
 * @param swappers the swapper set that decides which swapper ids are to be vacuumed.
 * @throws UncheckedIOException if evicting a page fails with an {@link IOException}.
 */
void vacuum( SwapperSet swappers )
{
    swappers.vacuum( idsToVacuum ->
    {
        int totalPages = pages.getPageCount();
        try ( EvictionRunEvent evictionRun = pageCacheTracer.beginPageEvictions( 0 ) )
        {
            for ( int index = 0; index < totalPages; index++ )
            {
                long pageRef = pages.deref( index );
                boolean evicted = false;
                // Re-check the binding before every attempt; stop as soon as one attempt succeeds.
                while ( !evicted && idsToVacuum.test( pages.getSwapperId( pageRef ) ) )
                {
                    evicted = pages.tryEvict( pageRef, evictionRun );
                }
                if ( evicted )
                {
                    addFreePageToFreelist( pageRef );
                }
            }
        }
        catch ( IOException e )
        {
            // The callback cannot throw checked exceptions, so the failure is tunnelled out unchecked.
            throw new UncheckedIOException( e );
        }
    } );
}
}
Expand Up @@ -136,7 +136,7 @@ public final void close() throws IOException
}
}

private void closeLinks( MuninnPageCursor cursor )
private void closeLinks( MuninnPageCursor cursor ) throws IOException
{
while ( cursor != null && cursor.pagedFile != null )
{
Expand Down
Expand Up @@ -62,14 +62,14 @@ final class MuninnPagedFile extends PageList implements PagedFile, Flushable
final MuninnPageCache pageCache;
final int filePageSize;
final PageCacheTracer pageCacheTracer;
final int swapperId;
final LatchMap pageFaultLatches;

// This is the table where we translate file-page-ids to cache-page-ids. Only one thread can perform a resize at
// a time, and we ensure this mutual exclusion using the monitor lock on this MuninnPagedFile object.
volatile int[][] translationTable;

final PageSwapper swapper;
final int swapperId;
private final CursorPool cursorPool;

// Guarded by the monitor lock on MuninnPageCache (map and unmap)
Expand Down Expand Up @@ -248,6 +248,12 @@ void closeSwapper() throws IOException
{
swapper.closeAndDelete();
}
if ( getSwappers().free( swapperId ) )
{
// We need to do a vacuum of the cache, fully evicting all pages that have freed swapper ids.
// We cannot reuse those swapper ids until there are no more pages using them.
pageCache.vacuum( getSwappers() );
}
}

@Override
Expand Down
Expand Up @@ -416,35 +416,47 @@ private void evict( long pageRef, EvictionEvent evictionEvent ) throws IOExcepti
if ( swapperId != 0 )
{
// If the swapper id is non-zero, then the page was not only loaded, but also bound, and possibly modified.
PageSwapper swapper = swappers.getAllocation( swapperId ).swapper;
evictionEvent.setSwapper( swapper );

if ( isModified( pageRef ) )
SwapperSet.Allocation allocation = swappers.getAllocation( swapperId );
if ( allocation != null )
{
FlushEvent flushEvent = evictionEvent.flushEventOpportunity().beginFlush( filePageId, pageRef, swapper );
try
{
long address = getAddress( pageRef );
long bytesWritten = swapper.write( filePageId, address );
explicitlyMarkPageUnmodifiedUnderExclusiveLock( pageRef );
flushEvent.addBytesWritten( bytesWritten );
flushEvent.addPagesFlushed( 1 );
flushEvent.done();
}
catch ( IOException e )
// The allocation can be null if the file has been unmapped, but there are still pages
// lingering in the cache that were bound to file pages in that file.
PageSwapper swapper = allocation.swapper;
evictionEvent.setSwapper( swapper );

if ( isModified( pageRef ) )
{
unlockExclusive( pageRef );
flushEvent.done( e );
evictionEvent.threwException( e );
throw e;
flushModifiedPage( pageRef, evictionEvent, filePageId, swapper );
}
swapper.evicted( filePageId );
}
swapper.evicted( filePageId );
}
clearBinding( pageRef );
}

private void clearBinding( long pageRef )
/**
 * Writes the contents of a modified page out through the given swapper as part of an eviction, and
 * reports the outcome on a flush event started from the eviction event.
 * <p>
 * On success the page is marked unmodified, and the bytes written and one flushed page are recorded.
 * On failure {@code unlockExclusive} is called for the page, the exception is reported to both the flush
 * event and the eviction event, and it is then rethrown.
 *
 * @param pageRef reference of the page to flush.
 * @param evictionEvent the eviction event this flush belongs to.
 * @param filePageId the file page id that the page is written to.
 * @param swapper the swapper that performs the write.
 * @throws IOException if the swapper fails to write the page.
 */
private void flushModifiedPage( long pageRef, EvictionEvent evictionEvent, long filePageId, PageSwapper swapper )
throws IOException
{
FlushEvent flushEvent = evictionEvent.flushEventOpportunity().beginFlush( filePageId, pageRef, swapper );
try
{
long address = getAddress( pageRef );
long bytesWritten = swapper.write( filePageId, address );
// Only clear the modified state once the write has returned without throwing.
explicitlyMarkPageUnmodifiedUnderExclusiveLock( pageRef );
flushEvent.addBytesWritten( bytesWritten );
flushEvent.addPagesFlushed( 1 );
flushEvent.done();
}
catch ( IOException e )
{
// NOTE(review): presumably the caller holds the exclusive lock on this page, and it must be given up
// here because the eviction is being abandoned - confirm against the callers.
unlockExclusive( pageRef );
flushEvent.done( e );
evictionEvent.threwException( e );
throw e;
}
}

protected void clearBinding( long pageRef )
{
setFilePageId( pageRef, PageCursor.UNBOUND_PAGE_ID );
setSwapperId( pageRef, 0 );
Expand Down
Expand Up @@ -20,7 +20,8 @@
package org.neo4j.io.pagecache.impl.muninn;

import java.util.Arrays;
import java.util.function.IntConsumer;
import java.util.function.Consumer;
import java.util.function.IntPredicate;

import org.neo4j.collection.primitive.Primitive;
import org.neo4j.collection.primitive.PrimitiveIntIterator;
Expand All @@ -31,9 +32,14 @@ final class SwapperSet
{
// The sentinel is used to reserve swapper id 0 as a special value.
private static final Allocation SENTINEL = new Allocation( 0, null );
// The tombstone is used as a marker to reserve allocation entries that have been freed, but not yet vacuumed.
// An allocation cannot be reused until it has been vacuumed.
private static final Allocation TOMBSTONE = new Allocation( 0, null );
private static final int MAX_SWAPPER_ID = Short.MAX_VALUE;
private volatile Allocation[] allocations = new Allocation[] { SENTINEL };
private final PrimitiveIntSet free = Primitive.intSet();
private final Object vacuumLock = new Object();
private int freeCounter; // Used in `free`; Guarded by `this`

public static final class Allocation
{
Expand All @@ -51,9 +57,9 @@ public Allocation getAllocation( int id )
{
checkId( id );
Allocation allocation = allocations[id];
if ( allocation == null )
if ( allocation == null || allocation == TOMBSTONE )
{
throw noSuchId( id );
return null;
}
return allocation;
}
Expand All @@ -66,99 +72,98 @@ private void checkId( int id )
}
}

private NullPointerException noSuchId( int id )
{
return new NullPointerException( "Swapper allocation by id " + id + " does not exist; it might have been freed" );
}

public synchronized int allocate( PageSwapper swapper )
{
Allocation[] allocations = this.allocations;

// First look for an available slot.
// First look for an available freed slot.
synchronized ( free )
{
for ( int i = 0; i < allocations.length; i++ )
if ( !free.isEmpty() )
{
if ( allocations[i] == null && !free.contains( i ) )
{
// Found an available slot; there's no current allocation, and it is also not in the free set.
// We cannot reuse freed ids before they have been vacuumed. Vacuuming means that we ensure that
// there are no pages in the PageList that refer to these ids. The point is that unmapping a file
// only ensures that all of its pages are flushed; not that they are evicted. Vacuuming is the
// eviction of the pages that are bound to the now unmapped file.
allocations[i] = new Allocation( i, swapper );
this.allocations = allocations; // Volatile store synchronizes-with loads in getters.
return i;
}
int id = free.iterator().next();
free.remove( id );
allocations[id] = new Allocation( id, swapper );
this.allocations = allocations; // Volatile store synchronizes-with loads in getters.
return id;
}
}

// No free slot was found above, so we extend the array to make room for a new slot.
int idx = allocations.length;
allocations = Arrays.copyOf( allocations, idx + 1 );
allocations[idx] = new Allocation( idx, swapper );
int id = allocations.length;
if ( id + 1 > MAX_SWAPPER_ID )
{
throw new IllegalStateException( "All swapper ids are allocated: " + MAX_SWAPPER_ID );
}
allocations = Arrays.copyOf( allocations, id + 1 );
allocations[id] = new Allocation( id, swapper );
this.allocations = allocations; // Volatile store synchronizes-with loads in getters.
return idx;
return id;
}

public synchronized void free( int id )
public synchronized boolean free( int id )
{
checkId( id );
Allocation[] allocations = this.allocations;
if ( allocations[id] == null )
Allocation current = allocations[id];
if ( current == null || current == TOMBSTONE )
{
throw new IllegalStateException(
"PageSwapper allocation id " + id + " is currently not allocated. Likely a double free bug." );
}
allocations[id] = null;
synchronized ( free )
allocations[id] = TOMBSTONE;
this.allocations = allocations; // Volatile store synchronizes-with loads in getters.
freeCounter++;
if ( freeCounter == 20 )
{
free.add( id );
freeCounter = 0;
return true;
}
this.allocations = allocations; // Volatile store synchronizes-with loads in getters.
return false;
}

public void vacuum( IntConsumer evictAllLoadedPagesCallback )
public void vacuum( Consumer<IntPredicate> evictAllLoadedPagesCallback )
{
// We do this complicated locking to avoid blocking allocate() and free() as much as possible, while still only
// allowing a single thread to do vacuum at a time, and at the same time having consistent locking around the
// set of free ids.
synchronized ( vacuumLock )
{
// Collect currently free ids.
int[] freeIds;
synchronized ( free )
PrimitiveIntSet freeIds = Primitive.intSet();
Allocation[] allocations = this.allocations;
for ( int id = 0; id < allocations.length; id++ )
{
freeIds = new int[free.size()];
PrimitiveIntIterator iterator = free.iterator();
int index = 0;
while ( iterator.hasNext() )
Allocation allocation = allocations[id];
if ( allocation == TOMBSTONE )
{
freeIds[index] = iterator.next();
index++;
freeIds.add( id );
}
}

// Evict all of them without holding up the lock on the free id set. This allows allocate() and free() to
// proceed concurrently with our eviction. This is safe because we know that the ids we are evicting cannot
// possibly be reused until we remove them from the free id set, which we won't do until after we've evicted
// all of their loaded pages.
for ( int freeId : freeIds )
{
evictAllLoadedPagesCallback.accept( freeId );
}
evictAllLoadedPagesCallback.accept( freeIds );

// Finally, all of the pages that remained in memory with an unmapped swapper id have been evicted. We can
// now safely allow those ids to be reused. Note, however, that free() might have been called while we were
// doing this, so we can't just free.clear() the set; no, we have to explicitly remove only those specific
// ids whose pages we evicted.
synchronized ( free )
synchronized ( this )
{
for ( int freeId : freeIds )
PrimitiveIntIterator itr = freeIds.iterator();
while ( itr.hasNext() )
{
free.remove( freeId );
int freeId = itr.next();
allocations[freeId] = null;
}
this.allocations = allocations; // Volatile store synchronizes-with loads in getters.
}
synchronized ( free )
{
free.addAll( freeIds.iterator() );
}
}
}
Expand Down
Expand Up @@ -622,4 +622,22 @@ private void verifyAdversarialPagedContent( PagedFile pagedFile ) throws IOExcep
}
}
}

/**
 * Maps, touches and closes the same file many more times than there are swapper ids available
 * (20 times {@code Short.MAX_VALUE}), so the test can only complete if the ids of closed files are
 * made available for reuse.
 */
@Test
public void mustNotRunOutOfSwapperAllocationSpace() throws Exception
{
    configureStandardPageCache();

    File mappedFile = file( "a" );
    int mapCloseCycles = Short.MAX_VALUE * 20;
    for ( int cycle = 0; cycle < mapCloseCycles; cycle++ )
    {
        PagedFile pagedFile = pageCache.map( mappedFile, filePageSize );
        // Fault in a single page so that the cache holds a page bound to this mapping when it closes.
        try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) )
        {
            assertTrue( cursor.next() );
        }
        pagedFile.close();
    }
}
}

0 comments on commit 86d527b

Please sign in to comment.