Skip to content

Commit

Permalink
Allow tracking version context on transaction that executed by multiple
Browse files Browse the repository at this point in the history
threads (transactions that goes over rest end point).
Re-check version context when cursor repositioned to its current page again.

Check version context state after eager result visiting during serialization
since both bolt and rest writers will try to load all properties of
nodes and relationships from database, if those are part of result,
to send those to client as well. And since those executed outside of query
we need to expand version check to verify that after streaming result
we still based on valid snapshot.
If dirty context will be noticed exception will be thrown since we can't
restart that query - its too late and client already get partial results for it.
  • Loading branch information
MishaDemianenko committed Jan 26, 2018
1 parent 47ddf5f commit c787e0c
Show file tree
Hide file tree
Showing 18 changed files with 138 additions and 70 deletions.
Expand Up @@ -25,11 +25,14 @@
import java.util.Map; import java.util.Map;


import org.neo4j.graphdb.ExecutionPlanDescription; import org.neo4j.graphdb.ExecutionPlanDescription;
import org.neo4j.graphdb.NotFoundException;
import org.neo4j.graphdb.Notification; import org.neo4j.graphdb.Notification;
import org.neo4j.graphdb.QueryExecutionType; import org.neo4j.graphdb.QueryExecutionType;
import org.neo4j.graphdb.QueryStatistics; import org.neo4j.graphdb.QueryStatistics;
import org.neo4j.graphdb.ResourceIterator; import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.Result; import org.neo4j.graphdb.Result;
import org.neo4j.io.pagecache.tracing.cursor.context.VersionContext;
import org.neo4j.kernel.impl.query.QueryExecutionKernelException;


import static java.lang.System.lineSeparator; import static java.lang.System.lineSeparator;


Expand All @@ -40,12 +43,14 @@ class EagerResult implements Result
{ {
private static final String ITEM_SEPARATOR = ", "; private static final String ITEM_SEPARATOR = ", ";
private final Result originalResult; private final Result originalResult;
private final VersionContext versionContext;
private final List<Map<String, Object>> queryResult = new ArrayList<>(); private final List<Map<String, Object>> queryResult = new ArrayList<>();
private int cursor; private int cursor;


EagerResult( Result result ) EagerResult( Result result, VersionContext versionContext )
{ {
this.originalResult = result; this.originalResult = result;
this.versionContext = versionContext;
} }


public void consume() public void consume()
Expand Down Expand Up @@ -145,9 +150,28 @@ public Iterable<Notification> getNotifications()
public <VisitationException extends Exception> void accept( ResultVisitor<VisitationException> visitor ) public <VisitationException extends Exception> void accept( ResultVisitor<VisitationException> visitor )
throws VisitationException throws VisitationException
{ {
for ( Map<String,Object> map : queryResult ) try
{ {
visitor.visit( new MapRow( map ) ); for ( Map<String,Object> map : queryResult )
{
visitor.visit( new MapRow( map ) );
}
checkIfDirty();
}
catch ( NotFoundException e )
{
checkIfDirty();
throw e;
}
}

private void checkIfDirty()
{
if ( versionContext.isDirty() )
{
throw new QueryExecutionKernelException(
new UnstableSnapshotException( "Unable to get clean data snapshot for query serialisation." ) )
.asUserException();
} }
} }


Expand Down
Expand Up @@ -80,7 +80,7 @@ protected Result executeWithRetries( String query, Map<String,Object> parameters
attempt++; attempt++;
versionContext.initRead(); versionContext.initRead();
Result result = executor.execute( query, parameters, context ); Result result = executor.execute( query, parameters, context );
eagerResult = new EagerResult( result ); eagerResult = new EagerResult( result, versionContext );
eagerResult.consume(); eagerResult.consume();
dirtySnapshot = versionContext.isDirty(); dirtySnapshot = versionContext.isDirty();
if ( dirtySnapshot && result.getQueryStatistics().containsUpdates() ) if ( dirtySnapshot && result.getQueryStatistics().containsUpdates() )
Expand Down
Expand Up @@ -38,6 +38,8 @@
import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label; import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.NotFoundException;
import org.neo4j.graphdb.QueryExecutionException;
import org.neo4j.graphdb.QueryExecutionType; import org.neo4j.graphdb.QueryExecutionType;
import org.neo4j.graphdb.ResourceIterator; import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.Result; import org.neo4j.graphdb.Result;
Expand Down Expand Up @@ -201,6 +203,30 @@ public void eagerResultVisit() throws Exception
assertThat( values, Matchers.containsInAnyOrder( "d", "y" ) ); assertThat( values, Matchers.containsInAnyOrder( "d", "y" ) );
} }


@Test( expected = QueryExecutionException.class )
public void dirtyContextDuringResultVisitResultInUnstableSnapshotException() throws Exception
{
Result result = database.execute( "MATCH (n) RETURN n.c" );
List<String> values = new ArrayList<>();
result.accept( (Result.ResultVisitor<Exception>) row ->
{
testCursorContext.markAsDirty();
values.add( row.getString( "n.c" ) );
return false;
} );
}

@Test( expected = QueryExecutionException.class )
public void dirtyContextEntityNotFoundExceptionDuringResultVisitResultInUnstableSnapshotException() throws Exception
{
Result result = database.execute( "MATCH (n) RETURN n.c" );
result.accept( (Result.ResultVisitor<Exception>) row ->
{
testCursorContext.markAsDirty();
throw new NotFoundException( new RuntimeException() );
} );
}

private String printToStream( Result result ) private String printToStream( Result result )
{ {
StringWriter stringWriter = new StringWriter(); StringWriter stringWriter = new StringWriter();
Expand Down
Expand Up @@ -74,8 +74,7 @@ MuninnReadPageCursor takeReadCursor( long pageId, int pf_flags )


private MuninnReadPageCursor createReadCursor( CursorSets cursorSets ) private MuninnReadPageCursor createReadCursor( CursorSets cursorSets )
{ {
MuninnReadPageCursor cursor = new MuninnReadPageCursor( cursorSets, victimPage, getPageCursorTracer(), versionContextSupplier MuninnReadPageCursor cursor = new MuninnReadPageCursor( cursorSets, victimPage, getPageCursorTracer(), versionContextSupplier );
.getVersionContext() );
cursor.initialiseFile( pagedFile ); cursor.initialiseFile( pagedFile );
return cursor; return cursor;
} }
Expand All @@ -98,8 +97,7 @@ MuninnWritePageCursor takeWriteCursor( long pageId, int pf_flags )


private MuninnWritePageCursor createWriteCursor( CursorSets cursorSets ) private MuninnWritePageCursor createWriteCursor( CursorSets cursorSets )
{ {
MuninnWritePageCursor cursor = new MuninnWritePageCursor( cursorSets, victimPage, getPageCursorTracer(), versionContextSupplier MuninnWritePageCursor cursor = new MuninnWritePageCursor( cursorSets, victimPage, getPageCursorTracer(), versionContextSupplier );
.getVersionContext() );
cursor.initialiseFile( pagedFile ); cursor.initialiseFile( pagedFile );
return cursor; return cursor;
} }
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.neo4j.io.pagecache.tracing.PinEvent; import org.neo4j.io.pagecache.tracing.PinEvent;
import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracer; import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracer;
import org.neo4j.io.pagecache.tracing.cursor.context.VersionContext; import org.neo4j.io.pagecache.tracing.cursor.context.VersionContext;
import org.neo4j.io.pagecache.tracing.cursor.context.VersionContextSupplier;
import org.neo4j.unsafe.impl.internal.dragons.UnsafeUtil; import org.neo4j.unsafe.impl.internal.dragons.UnsafeUtil;


import static org.neo4j.io.pagecache.PagedFile.PF_EAGER_FLUSH; import static org.neo4j.io.pagecache.PagedFile.PF_EAGER_FLUSH;
Expand Down Expand Up @@ -66,7 +67,7 @@ abstract class MuninnPageCursor extends PageCursor
private long pointer; private long pointer;
private int pageSize; private int pageSize;
private int filePageSize; private int filePageSize;
protected VersionContext versionContext; protected final VersionContextSupplier versionContextSupplier;
private int offset; private int offset;
private boolean outOfBounds; private boolean outOfBounds;
private boolean isLinkedCursor; private boolean isLinkedCursor;
Expand All @@ -75,12 +76,12 @@ abstract class MuninnPageCursor extends PageCursor
// offending code. // offending code.
private Object cursorException; private Object cursorException;


MuninnPageCursor( long victimPage, PageCursorTracer tracer, VersionContext versionContext ) MuninnPageCursor( long victimPage, PageCursorTracer tracer, VersionContextSupplier versionContextSupplier )
{ {
this.victimPage = victimPage; this.victimPage = victimPage;
this.pointer = victimPage; this.pointer = victimPage;
this.tracer = tracer; this.tracer = tracer;
this.versionContext = versionContext; this.versionContextSupplier = versionContextSupplier;
} }


final void initialiseFile( MuninnPagedFile pagedFile ) final void initialiseFile( MuninnPagedFile pagedFile )
Expand Down Expand Up @@ -119,12 +120,27 @@ public final boolean next( long pageId ) throws IOException
{ {
if ( currentPageId == pageId ) if ( currentPageId == pageId )
{ {
verifyContext();
return true; return true;
} }
nextPageId = pageId; nextPageId = pageId;
return next(); return next();
} }


void verifyContext()
{
VersionContext versionContext = versionContextSupplier.getVersionContext();
if ( versionContext.lastClosedTransactionId() == Long.MAX_VALUE )
{
return;
}
if ( page.getLastModifiedTxId() > versionContext.lastClosedTransactionId() ||
pagedFile.getHighestEvictedTransactionId() > versionContext.lastClosedTransactionId() )
{
versionContext.markAsDirty();
}
}

@Override @Override
public final void close() public final void close()
{ {
Expand Down
Expand Up @@ -23,7 +23,7 @@


import org.neo4j.io.pagecache.PageSwapper; import org.neo4j.io.pagecache.PageSwapper;
import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracer; import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracer;
import org.neo4j.io.pagecache.tracing.cursor.context.VersionContext; import org.neo4j.io.pagecache.tracing.cursor.context.VersionContextSupplier;


final class MuninnReadPageCursor extends MuninnPageCursor final class MuninnReadPageCursor extends MuninnPageCursor
{ {
Expand All @@ -32,9 +32,9 @@ final class MuninnReadPageCursor extends MuninnPageCursor
MuninnReadPageCursor nextCursor; MuninnReadPageCursor nextCursor;


MuninnReadPageCursor( CursorPool.CursorSets cursorSets, long victimPage, PageCursorTracer pageCursorTracer, MuninnReadPageCursor( CursorPool.CursorSets cursorSets, long victimPage, PageCursorTracer pageCursorTracer,
VersionContext versionContext ) VersionContextSupplier versionContextSupplier )
{ {
super( victimPage, pageCursorTracer, versionContext ); super( victimPage, pageCursorTracer, versionContextSupplier );
this.cursorSets = cursorSets; this.cursorSets = cursorSets;
} }


Expand Down Expand Up @@ -65,18 +65,18 @@ public boolean next() throws IOException
return true; return true;
} }


private void verifyContext() // private void verifyContext()
{ // {
if ( versionContext.lastClosedTransactionId() == Long.MAX_VALUE ) // if ( versionContext.lastClosedTransactionId() == Long.MAX_VALUE )
{ // {
return; // return;
} // }
// if ( page.getLastModifiedTxId() > versionContext.lastClosedTransactionId() || // if ( page.getLastModifiedTxId() > versionContext.lastClosedTransactionId() ||
// pagedFile.getHighestEvictedTransactionId() > versionContext.lastClosedTransactionId() ) // pagedFile.getHighestEvictedTransactionId() > versionContext.lastClosedTransactionId() )
// { // {
// versionContext.markAsDirty(); // versionContext.markAsDirty();
// } // }
} // }


@Override @Override
protected boolean tryLockPage( long pageRef ) protected boolean tryLockPage( long pageRef )
Expand Down
Expand Up @@ -24,17 +24,17 @@
import org.neo4j.io.pagecache.PageSwapper; import org.neo4j.io.pagecache.PageSwapper;
import org.neo4j.io.pagecache.PagedFile; import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracer; import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracer;
import org.neo4j.io.pagecache.tracing.cursor.context.VersionContext; import org.neo4j.io.pagecache.tracing.cursor.context.VersionContextSupplier;


final class MuninnWritePageCursor extends MuninnPageCursor final class MuninnWritePageCursor extends MuninnPageCursor
{ {
private final CursorPool.CursorSets cursorSets; private final CursorPool.CursorSets cursorSets;
MuninnWritePageCursor nextCursor; MuninnWritePageCursor nextCursor;


MuninnWritePageCursor( CursorPool.CursorSets cursorSets, long victimPage, PageCursorTracer pageCursorTracer, MuninnWritePageCursor( CursorPool.CursorSets cursorSets, long victimPage, PageCursorTracer pageCursorTracer,
VersionContext versionContext ) VersionContextSupplier versionContextSupplier )
{ {
super( victimPage, pageCursorTracer, versionContext ); super( victimPage, pageCursorTracer, versionContextSupplier );
this.cursorSets = cursorSets; this.cursorSets = cursorSets;
} }


Expand Down Expand Up @@ -126,7 +126,7 @@ protected void pinCursorToPage( long pageRef, long filePageId, PageSwapper swapp
// be closed and the page lock will be released. // be closed and the page lock will be released.
assertPagedFileStillMappedAndGetIdOfLastPage(); assertPagedFileStillMappedAndGetIdOfLastPage();
pagedFile.incrementUsage( pageRef ); pagedFile.incrementUsage( pageRef );
pagedFile.setLastModifiedTxId( versionContext.committingTransactionId() ); pagedFile.setLastModifiedTxId( versionContextSupplier.getVersionContext().committingTransactionId() );
} }


@Override @Override
Expand Down
Expand Up @@ -65,9 +65,4 @@ public boolean isDirty()
{ {
return false; return false;
} }

@Override
public void clear()
{
}
} }
Expand Up @@ -64,9 +64,4 @@ public interface VersionContext
*/ */
boolean isDirty(); boolean isDirty();


/**
* Clear transaction ids that read/write context was initialised with
*/
void clear();

} }
Expand Up @@ -244,7 +244,7 @@ public void trackPageModificationTransactionId() throws Exception
} }


@Test @Test
public void pareModificationTrackingNoticeWriteFromAnotherThread() throws Exception public void pageModificationTrackingNoticeWriteFromAnotherThread() throws Exception
{ {
TestVersionContext cursorContext = new TestVersionContext( () -> 0 ); TestVersionContext cursorContext = new TestVersionContext( () -> 0 );
VersionContextSupplier versionContextSupplier = new ConfiguredVersionContextSupplier( cursorContext ); VersionContextSupplier versionContextSupplier = new ConfiguredVersionContextSupplier( cursorContext );
Expand Down Expand Up @@ -312,6 +312,28 @@ public void pageModificationTracksHighestModifierTransactionId() throws IOExcept
} }
} }


@Test
public void markCursorContextDirtyWhenRepositionCursorOnItsCurrentPage() throws IOException
{
TestVersionContext cursorContext = new TestVersionContext( () -> 3 );
VersionContextSupplier versionContextSupplier = new ConfiguredVersionContextSupplier( cursorContext );
MuninnPageCache pageCache =
createPageCache( fs, 2, 8, PageCacheTracer.NULL, PageCursorTracerSupplier.NULL, versionContextSupplier );

PagedFile pagedFile = pageCache.map( file( "a" ), 8 );
cursorContext.initRead();
try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) )
{
assertTrue( cursor.next( 0 ) );
assertFalse( cursorContext.isDirty() );

((MuninnPageCursor) cursor).page.setLastModifiedTxId( 17 );

assertTrue( cursor.next( 0 ) );
assertTrue( cursorContext.isDirty() );
}
}

@Test @Test
public void markCursorContextAsDirtyWhenReadingDataFromMoreRecentTransactions() throws IOException public void markCursorContextAsDirtyWhenReadingDataFromMoreRecentTransactions() throws IOException
{ {
Expand Down Expand Up @@ -651,11 +673,5 @@ public boolean isDirty()
{ {
return dirty; return dirty;
} }

@Override
public void clear()
{

}
} }
} }

0 comments on commit c787e0c

Please sign in to comment.