Skip to content

Commit

Permalink
Fix reading records via RecordCursor
Browse files Browse the repository at this point in the history
Currently record cursor has an optimization where it does not reposition the
underlying page cursor if it points to the correct page id. However this is
not entirely correct because the underlying page might have been evicted and
we might attempt reading garbage data. This resulted in failures during
reading properties.

This commit removes page id check.
  • Loading branch information
lutovich committed Mar 10, 2016
1 parent 6d2cb58 commit 204bda1
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 44 deletions.
Expand Up @@ -23,6 +23,9 @@

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -37,22 +40,44 @@

public class RecordingPageCacheTracer implements PageCacheTracer
{
private final Set<Class<? extends Event>> eventTypesToTrace = new HashSet<>();
private final BlockingQueue<Event> record = new LinkedBlockingQueue<>();
private CountDownLatch trapLatch;
private Matcher<? extends Event> trap;

public void pageFaulted( long filePageId, PageSwapper swapper )
public RecordingPageCacheTracer()
{
Fault event = new Fault( swapper, filePageId );
record.add( event );
trip( event );
this( Evict.class, Fault.class );
}

public void evicted( long filePageId, PageSwapper swapper )
@SafeVarargs
public RecordingPageCacheTracer( Class<? extends Event>... eventTypesToTrace )
{
Evict event = new Evict( swapper, filePageId );
record.add( event );
trip( event );
Collections.addAll( this.eventTypesToTrace, eventTypesToTrace );
}

private void pageFaulted( long filePageId, PageSwapper swapper )
{
record( new Fault( swapper, filePageId ) );
}

private void evicted( long filePageId, PageSwapper swapper )
{
record( new Evict( swapper, filePageId ) );
}

private void pinned( long filePageId, PageSwapper swapper )
{
record( new Pin( swapper, filePageId ) );
}

private void record( Event event )
{
if ( eventTypesToTrace.contains( event.getClass() ) )
{
record.add( event );
trip( event );
}
}

@Override
Expand Down Expand Up @@ -132,6 +157,7 @@ public void setCachePageId( int cachePageId )
@Override
public void done()
{
pinned( filePageId, swapper );
}
};
}
Expand Down Expand Up @@ -305,15 +331,23 @@ public String toString()

public static class Fault extends Event
{
public Fault( PageSwapper io, long pageId )
private Fault( PageSwapper io, long pageId )
{
super( io, pageId );
}
}

public static class Evict extends Event
{
public Evict( PageSwapper io, long pageId )
private Evict( PageSwapper io, long pageId )
{
super( io, pageId );
}
}

public static class Pin extends Event
{
private Pin( PageSwapper io, long pageId )
{
super( io, pageId );
}
Expand Down
Expand Up @@ -46,15 +46,19 @@ public static PageCache createPageCache( FileSystemAbstraction fileSystem )
return createPageCache( fileSystem, Config.defaults() );
}

public static PageCache createPageCache(
FileSystemAbstraction fileSystem, Config config )
public static PageCache createPageCache( FileSystemAbstraction fileSystem, Config config )
{
return createPageCache( fileSystem, PageCacheTracer.NULL, config );
}

public static PageCache createPageCache( FileSystemAbstraction fileSystem, PageCacheTracer tracer, Config config )
{
Config baseConfig = new Config( MapUtil.stringMap(
GraphDatabaseSettings.pagecache_memory.name(), "8M" ) );
Config finalConfig = baseConfig.with( config.getParams() );
FormattedLogProvider logProvider = FormattedLogProvider.toOutputStream( System.err );
ConfiguringPageCacheFactory pageCacheFactory = new ConfiguringPageCacheFactory(
fileSystem, finalConfig, PageCacheTracer.NULL, logProvider.getLog( PageCache.class ) );
fileSystem, finalConfig, tracer, logProvider.getLog( PageCache.class ) );
return pageCacheFactory.getOrCreatePageCache();
}
}
Expand Up @@ -1114,8 +1114,7 @@ public boolean next()
record.setId( currentId );
long pageId = pageIdForRecord( currentId );
int offset = offsetForId( currentId );
if ( pageId == pageCursor.getCurrentPageId() ||
pageCursor.next( pageId ) )
if ( pageCursor.next( pageId ) )
{
do
{
Expand Down
Expand Up @@ -19,73 +19,167 @@
*/
package org.neo4j.kernel.impl.store;

import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.mockito.InOrder;
import org.mockito.Mockito;

import java.io.File;
import java.io.IOException;

import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.io.pagecache.RecordingPageCacheTracer;
import org.neo4j.io.pagecache.RecordingPageCacheTracer.Pin;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.store.format.RecordFormat;
import org.neo4j.kernel.impl.store.format.lowlimit.LowLimitV3_0;
import org.neo4j.kernel.impl.store.id.DefaultIdGeneratorFactory;
import org.neo4j.kernel.impl.store.id.IdGenerator;
import org.neo4j.kernel.impl.store.id.IdGeneratorFactory;
import org.neo4j.kernel.impl.store.id.IdType;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.store.id.IdGenerator;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.RecordLoad;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.test.PageCacheRule;
import org.neo4j.test.TargetDirectory;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_READ_LOCK;
import static org.neo4j.io.pagecache.RecordingPageCacheTracer.Event;
import static org.neo4j.kernel.impl.store.record.Record.NO_NEXT_PROPERTY;
import static org.neo4j.kernel.impl.store.record.Record.NO_NEXT_RELATIONSHIP;
import static org.neo4j.test.TargetDirectory.testDirForTest;

public class CommonAbstractStoreTest
{
private static final NullLogProvider LOG = NullLogProvider.getInstance();
private static final int PAGE_SIZE = 32;
private static final int RECORD_SIZE = 10;
private static final int HIGH_ID = 42;

private final IdGenerator idGenerator = mock( IdGenerator.class );
private final IdGeneratorFactory idGeneratorFactory = mock( IdGeneratorFactory.class );
private final PagedFile pageFile = mock( PagedFile.class );
private final PageCache pageCache = mock( PageCache.class );
private final Config config = Config.empty();
private final File storeFile = new File( "store" );
private final IdType idType = IdType.RELATIONSHIP; // whatever

@Test
public void shouldCloseStoreFileFirstAndIdGeneratorAfter() throws Throwable
private static final FileSystemAbstraction fs = new DefaultFileSystemAbstraction();
private static final TargetDirectory.TestDirectory dir = testDirForTest( CommonAbstractStoreTest.class, fs );
private static final PageCacheRule pageCacheRule = new PageCacheRule();

@ClassRule
public static final RuleChain ruleChain = RuleChain.outerRule( dir ).around( pageCacheRule );

@Before
public void setUpMocks() throws IOException
{
// given
PagedFile storePagedFile = mock( PagedFile.class );
when( pageCache.map( eq( storeFile ), anyInt() ) ).thenReturn( storePagedFile );
IdGenerator idGenerator = mock(
IdGenerator.class );
when( idGeneratorFactory.open( any( File.class ), anyInt(), eq( idType ), anyInt(), anyInt() ) )
.thenReturn( idGenerator );
RecordFormat recordFormat = Mockito.mock( RecordFormat.class );
CommonAbstractStore store = new TheStore( storeFile, config, idType, idGeneratorFactory, pageCache, LOG, recordFormat );
store.initialise( false );

// this is needed to forget all interaction with the mocks during the construction of the store
reset( storePagedFile, idGenerator );
when( pageFile.pageSize() ).thenReturn( PAGE_SIZE );
when( pageCache.map( eq( storeFile ), anyInt() ) ).thenReturn( pageFile );
}

InOrder inOrder = inOrder( storePagedFile, idGenerator );
@Test
public void shouldCloseStoreFileFirstAndIdGeneratorAfter() throws Throwable
{
// given
TheStore store = newStore();
InOrder inOrder = inOrder( pageFile, idGenerator );

// when
store.close();

// then
inOrder.verify( storePagedFile, times( 1 ) ).close();
inOrder.verify( pageFile, times( 1 ) ).close();
inOrder.verify( idGenerator, times( 1 ) ).close();
}

private static class TheStore extends CommonAbstractStore
@Test
public void recordCursorCallsNextOnThePageCursor() throws IOException
{
TheStore store = newStore();
long recordId = 4;
long pageIdForRecord = store.pageIdForRecord( recordId );

PageCursor pageCursor = mock( PageCursor.class );
when( pageCursor.getCurrentPageId() ).thenReturn( pageIdForRecord );
when( pageCursor.next( anyInt() ) ).thenReturn( true );

RecordCursor<TheRecord> cursor = store.newRecordCursor( new TheRecord( -1 ) );
cursor.init( recordId, RecordLoad.FORCE, pageCursor );

cursor.next( recordId );

InOrder order = inOrder( pageCursor );
order.verify( pageCursor ).next( pageIdForRecord );
order.verify( pageCursor ).shouldRetry();
}

@Test
public void recordCursorPinsEachPageItReads() throws Exception
{
public TheStore( File fileName, Config configuration, IdType idType, IdGeneratorFactory idGeneratorFactory,
PageCache pageCache, LogProvider logProvider, RecordFormat recordFormat )
File storeFile = dir.file( "a" );
RecordingPageCacheTracer tracer = new RecordingPageCacheTracer( Pin.class );
PageCache pageCache = pageCacheRule.getPageCache( fs, tracer, Config.empty() );

try ( NodeStore store = new NodeStore( storeFile, Config.empty(), new DefaultIdGeneratorFactory( fs ),
pageCache, NullLogProvider.getInstance(), null, LowLimitV3_0.RECORD_FORMATS ) )
{
store.initialise( true );
assertNull( tracer.tryObserve( Event.class ) );

NodeRecord record = store.newRecord();
record.setId( 0 );
record.initialize( true, NO_NEXT_PROPERTY.intValue(), false, NO_NEXT_RELATIONSHIP.intValue(), 42 );
store.updateRecord( record );
assertNotNull( tracer.tryObserve( Pin.class ) );
assertNull( tracer.tryObserve( Event.class ) );

long pageId = store.pageIdForRecord( record.getId() );
try ( RecordCursor<NodeRecord> cursor = store.newRecordCursor( store.newRecord() );
PageCursor pageCursor = store.storeFile.io( pageId, PF_SHARED_READ_LOCK ) )
{
assertTrue( pageCursor.next( pageId ) );
cursor.init( record.getId(), RecordLoad.NORMAL, pageCursor );
assertTrue( cursor.next() );
assertNotNull( tracer.tryObserve( Pin.class ) );
assertNull( tracer.tryObserve( Event.class ) );
}
}
}

@SuppressWarnings( "unchecked" )
private TheStore newStore()
{
RecordFormat<TheRecord> recordFormat = mock( RecordFormat.class );
LogProvider log = NullLogProvider.getInstance();
TheStore store = new TheStore( storeFile, config, idType, idGeneratorFactory, pageCache, log, recordFormat );
store.initialise( false );
return store;
}

private static class TheStore extends CommonAbstractStore<TheRecord,NoStoreHeader>
{
TheStore( File fileName, Config configuration, IdType idType, IdGeneratorFactory idGeneratorFactory,
PageCache pageCache, LogProvider logProvider, RecordFormat<TheRecord> recordFormat )
{
super( fileName, configuration, idType, idGeneratorFactory, pageCache, logProvider, "TheType",
recordFormat, NoStoreHeaderFormat.NO_STORE_HEADER_FORMAT, "v1" );
Expand All @@ -99,18 +193,26 @@ protected void initialiseNewStoreFile( PagedFile file ) throws IOException
@Override
protected int determineRecordSize()
{
return 10;
return RECORD_SIZE;
}

@Override
public long scanForHighId()
{
return 42;
return HIGH_ID;
}

@Override
public void accept( Processor processor, AbstractBaseRecord record ) throws Exception
public <FAILURE extends Exception> void accept( Processor<FAILURE> processor, TheRecord record ) throws FAILURE
{
}
}

private static class TheRecord extends AbstractBaseRecord
{
TheRecord( long id )
{
super( id );
}
}
}

0 comments on commit 204bda1

Please sign in to comment.