From 69e147dbf50fed57e7277e516fcc335b72085ab1 Mon Sep 17 00:00:00 2001 From: Davide Grohmann Date: Wed, 15 Mar 2017 17:16:17 +0100 Subject: [PATCH] Use directly a PageCursor in NodeCursor --- .../kernel/impl/api/store/NodeCursor.java | 34 ++++++-- .../kernel/impl/api/store/StoreStatement.java | 2 +- .../impl/store/CommonAbstractStore.java | 14 +++- .../kernel/impl/store/RecordCursors.java | 12 +-- .../kernel/impl/api/store/NodeCursorTest.java | 80 +++++++++++-------- .../StorageLayerRelTypesAndDegreeTest.java | 5 +- .../kernel/impl/store/RecordCursorsTest.java | 1 - 7 files changed, 94 insertions(+), 54 deletions(-) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/NodeCursor.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/NodeCursor.java index cd22b32fbc25..4c6ec858d83d 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/NodeCursor.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/NodeCursor.java @@ -19,18 +19,21 @@ */ package org.neo4j.kernel.impl.api.store; +import java.io.IOException; import java.util.Iterator; import java.util.function.Consumer; import org.neo4j.collection.primitive.PrimitiveIntSet; import org.neo4j.cursor.Cursor; +import org.neo4j.io.pagecache.PageCursor; import org.neo4j.kernel.impl.locking.Lock; import org.neo4j.kernel.impl.locking.LockService; import org.neo4j.kernel.impl.store.NodeLabelsField; +import org.neo4j.kernel.impl.store.NodeStore; import org.neo4j.kernel.impl.store.RecordCursors; +import org.neo4j.kernel.impl.store.UnderlyingStorageException; import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.kernel.impl.store.record.Record; -import org.neo4j.kernel.impl.store.record.RecordLoad; import org.neo4j.kernel.impl.util.IoPrimitiveUtils; import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.txstate.NodeState; @@ -48,6 +51,7 @@ public class NodeCursor implements NodeItem, Cursor { private final NodeRecord nodeRecord; private final Consumer instanceCache; + private final NodeStore nodeStore; private final RecordCursors recordCursors; private final LockService lockService; @@ -56,18 +60,21 @@ public class NodeCursor implements NodeItem, Cursor private boolean fetched; private long[] labels; private Iterator added; + private PageCursor nodePageCursor; - NodeCursor( NodeRecord nodeRecord, Consumer instanceCache, RecordCursors recordCursors, - LockService lockService ) + NodeCursor( NodeRecord nodeRecord, Consumer instanceCache, NodeStore nodeStore, + RecordCursors recordCursors, LockService lockService ) { this.nodeRecord = nodeRecord; this.instanceCache = instanceCache; + this.nodeStore = nodeStore; this.recordCursors = recordCursors; this.lockService = lockService; } public Cursor init( Progression progression, ReadableTransactionState state ) { + this.nodePageCursor = nodeStore.newPageCursor(); this.progression = progression; this.state = state; this.added = state != null && progression.mode() == APPEND @@ -88,8 +95,7 @@ private boolean fetchNext() long id; while ( (id = progression.nextId()) >= 0 ) { - if ( (state == null || !state.nodeIsDeletedInThisTx( id )) && - recordCursors.node().next( id, nodeRecord, RecordLoad.CHECK ) ) + if ( (state == null || !state.nodeIsDeletedInThisTx( id )) && readNodeRecord( id ) ) { return true; } @@ -110,6 +116,20 @@ private boolean fetchNext() return false; } + private boolean readNodeRecord( long id ) + { + try + { + nodeRecord.clear(); + nodeStore.readIntoRecord( id, nodeRecord, CHECK, nodePageCursor ); + return nodeRecord.inUse(); + } + catch ( IOException e ) + { + throw new UnderlyingStorageException( e ); + } + } + private void recordFromTxState( long id ) { nodeRecord.clear(); @@ -122,6 +142,8 @@ public void close() labels = null; added = null; state = null; + nodePageCursor.close(); + nodePageCursor = null; instanceCache.accept( this ); } @@ -222,7 +244,7 @@ private Lock acquireLock() try { // It's safer to re-read the node record here, specifically nextProp, after acquiring the lock - if ( !recordCursors.node().next( nodeRecord.getId(), nodeRecord, CHECK ) ) + if ( !readNodeRecord( nodeRecord.getId() ) ) { // So it looks like the node has been deleted. The current behavior of NodeStore#loadRecord // is to only set the inUse field on loading an unused record. This should (and will) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StoreStatement.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StoreStatement.java index 45bfe62013f0..e5af5a627527 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StoreStatement.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StoreStatement.java @@ -89,7 +89,7 @@ public StoreStatement( NeoStores neoStores, Supplier indexRe @Override protected NodeCursor create() { - return new NodeCursor( nodeStore.newRecord(), this, recordCursors, lockService ); + return new NodeCursor( nodeStore.newRecord(), this, nodeStore, recordCursors, lockService ); } }; singleRelationshipCursor = new InstanceCache() diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/CommonAbstractStore.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/CommonAbstractStore.java index af112192bb69..6762eb3e987a 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/CommonAbstractStore.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/CommonAbstractStore.java @@ -1028,7 +1028,7 @@ public RECORD getRecord( long id, RECORD record, RecordLoad mode ) } } - void readIntoRecord( long id, RECORD record, RecordLoad mode, PageCursor cursor ) throws IOException + public void readIntoRecord( long id, RECORD record, RecordLoad mode, PageCursor cursor ) throws IOException { // Mark the record with this id regardless of whether or not we load the contents of it. // This is done in this method since there are multiple call sites and they all want the id @@ -1123,6 +1123,18 @@ public Collection getRecords( long firstId, RecordLoad mode ) } } + public PageCursor newPageCursor() + { + try + { + return storeFile.io( getNumberOfReservedLowIds(), PF_SHARED_READ_LOCK ); + } + catch ( IOException e ) + { + throw new UnderlyingStorageException( e ); + } + } + @Override public RecordCursor newRecordCursor( final RECORD record ) { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/RecordCursors.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/RecordCursors.java index f1aa78647d6c..7285d7cf3213 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/RecordCursors.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/RecordCursors.java @@ -24,7 +24,6 @@ import org.neo4j.kernel.impl.store.record.DynamicRecord; import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.kernel.impl.store.record.PropertyRecord; -import org.neo4j.kernel.impl.store.record.RecordLoad; import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord; import org.neo4j.kernel.impl.store.record.RelationshipRecord; @@ -35,7 +34,6 @@ */ public class RecordCursors implements AutoCloseable { - private final RecordCursor node; private final RecordCursor relationship; private final RecordCursor relationshipGroup; private final RecordCursor property; @@ -45,7 +43,6 @@ public class RecordCursors implements AutoCloseable public RecordCursors( NeoStores neoStores ) { - node = newCursor( neoStores.getNodeStore() ); relationship = newCursor( neoStores.getRelationshipStore() ); relationshipGroup = newCursor( neoStores.getRelationshipGroupStore() ); property = newCursor( neoStores.getPropertyStore() ); @@ -62,13 +59,8 @@ private static RecordCursor newCursor( RecordS @Override public void close() { - IOUtils.closeAll( RuntimeException.class, - node, relationship, relationshipGroup, property, propertyArray, propertyString, label ); - } - - public RecordCursor node() - { - return node; + IOUtils.closeAll( RuntimeException.class, relationship, relationshipGroup, property, propertyArray, + propertyString, label ); } public RecordCursor relationship() diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/NodeCursorTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/NodeCursorTest.java index e883f2803fcc..81c375cd2a52 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/NodeCursorTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/NodeCursorTest.java @@ -27,6 +27,7 @@ import org.junit.experimental.theories.Theory; import org.junit.runner.RunWith; +import java.io.IOException; import java.util.Arrays; import java.util.Set; import java.util.function.LongFunction; @@ -34,11 +35,12 @@ import org.neo4j.collection.primitive.PrimitiveIntIterator; import org.neo4j.collection.primitive.PrimitiveIntSet; import org.neo4j.cursor.Cursor; +import org.neo4j.io.pagecache.PageCursor; import org.neo4j.kernel.api.StatementConstants; import org.neo4j.kernel.impl.api.state.TxState; import org.neo4j.kernel.impl.api.store.Progression.Mode; import org.neo4j.kernel.impl.locking.Lock; -import org.neo4j.kernel.impl.store.RecordCursor; +import org.neo4j.kernel.impl.store.NodeStore; import org.neo4j.kernel.impl.store.RecordCursors; import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.kernel.impl.util.IoPrimitiveUtils; @@ -50,7 +52,9 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.collection.primitive.PrimitiveIntCollections.asArray; import static org.neo4j.collection.primitive.PrimitiveIntCollections.asSet; @@ -72,15 +76,15 @@ public class NodeCursorTest private static final int NON_EXISTING_LABEL = Integer.MAX_VALUE; private final NodeRecord nodeRecord = new NodeRecord( -1 ); + private final NodeStore nodeStore = mock( NodeStore.class ); private final RecordCursors recordCursors = mock( RecordCursors.class ); - @SuppressWarnings( "unchecked" ) - private final RecordCursor recordCursor = mock( RecordCursor.class ); + private final PageCursor pageCursor = mock( PageCursor.class ); // cursor is shared since it is designed to be reusable - private final NodeCursor reusableCursor = new NodeCursor( nodeRecord, i -> {}, recordCursors, + private final NodeCursor reusableCursor = new NodeCursor( nodeRecord, i -> {}, nodeStore, recordCursors, NO_LOCK_SERVICE ); { - when( recordCursors.node() ).thenReturn( recordCursor ); + when( nodeStore.newPageCursor() ).thenReturn( pageCursor ); } @SuppressWarnings( "unchecked" ) @@ -96,7 +100,7 @@ public void nothingInAppendMode() throws Throwable { // given TestRun test = new TestRun( APPEND, new Operation[0] ); - Cursor cursor = test.initialize( reusableCursor, recordCursor, nodeRecord ); + Cursor cursor = test.initialize( reusableCursor, nodeStore, pageCursor, nodeRecord ); // when/then test.runAndVerify( cursor ); @@ -108,7 +112,7 @@ public void nothingInFetchMode() throws Throwable { // given TestRun test = new TestRun( FETCH, new Operation[0] ); - Cursor cursor = test.initialize( reusableCursor, recordCursor, nodeRecord ); + Cursor cursor = test.initialize( reusableCursor, nodeStore, pageCursor, nodeRecord ); // when/then test.runAndVerify( cursor ); @@ -120,7 +124,7 @@ public void anElementInAppendMode( LongFunction op0 ) throws Throwabl { // given TestRun test = new TestRun( APPEND, new Operation[]{op0.apply( 0 )} ); - Cursor cursor = test.initialize( reusableCursor, recordCursor, nodeRecord ); + Cursor cursor = test.initialize( reusableCursor, nodeStore, pageCursor, nodeRecord ); // when/then test.runAndVerify( cursor ); @@ -132,7 +136,7 @@ public void anElementInFetchMode( LongFunction op0 ) throws Throwable { // given TestRun test = new TestRun( FETCH, new Operation[]{op0.apply( 0 )} ); - Cursor cursor = test.initialize( reusableCursor, recordCursor, nodeRecord ); + Cursor cursor = test.initialize( reusableCursor, nodeStore, pageCursor, nodeRecord ); // when/then test.runAndVerify( cursor ); @@ -144,7 +148,7 @@ public void twoElementsInAppendMode( LongFunction op0, LongFunction cursor = test.initialize( reusableCursor, recordCursor, nodeRecord ); + Cursor cursor = test.initialize( reusableCursor, nodeStore, pageCursor, nodeRecord ); // when/then test.runAndVerify( cursor ); @@ -156,7 +160,7 @@ public void twoElementsInFetchMode( LongFunction op0, LongFunction cursor = test.initialize( reusableCursor, recordCursor, nodeRecord ); + Cursor cursor = test.initialize( reusableCursor, nodeStore, pageCursor, nodeRecord ); // when/then test.runAndVerify( cursor ); @@ -169,7 +173,7 @@ public void threeElementsInAppendMode( LongFunction op0, LongFunction { // given TestRun test = new TestRun( APPEND, new Operation[]{op0.apply( 0 ), op1.apply( 1 ), op2.apply( 2 )} ); - Cursor cursor = test.initialize( reusableCursor, recordCursor, nodeRecord ); + Cursor cursor = test.initialize( reusableCursor, nodeStore, pageCursor, nodeRecord ); // when/then test.runAndVerify( cursor ); @@ -182,7 +186,7 @@ public void threeElementsInFetchMode( LongFunction op0, LongFunction< { // given TestRun test = new TestRun( FETCH, new Operation[]{op0.apply( 0 ), op1.apply( 1 ), op2.apply( 2 )} ); - Cursor cursor = test.initialize( reusableCursor, recordCursor, nodeRecord ); + Cursor cursor = test.initialize( reusableCursor, nodeStore, pageCursor, nodeRecord ); // when/then test.runAndVerify( cursor ); @@ -196,7 +200,7 @@ public void fourElementsInAppendMode( LongFunction op0, LongFunction< // given TestRun test = new TestRun( APPEND, new Operation[]{op0.apply( 0 ), op1.apply( 1 ), op2.apply( 2 ), op3.apply( 3 )} ); - Cursor cursor = test.initialize( reusableCursor, recordCursor, nodeRecord ); + Cursor cursor = test.initialize( reusableCursor, nodeStore, pageCursor, nodeRecord ); // when/then test.runAndVerify( cursor ); @@ -210,21 +214,23 @@ public void fourElementsInFetchMode( LongFunction op0, LongFunction cursor = test.initialize( reusableCursor, recordCursor, nodeRecord ); + Cursor cursor = test.initialize( reusableCursor, nodeStore, pageCursor, nodeRecord ); // when/then test.runAndVerify( cursor ); } @Test - public void shouldCallTheConsumerOnClose() + public void shouldCloseThePageCursorAndCallTheConsumerOnClose() { MutableBoolean called = new MutableBoolean(); - NodeCursor cursor = new NodeCursor( nodeRecord, c -> called.setTrue(), recordCursors, NO_LOCK_SERVICE ); + NodeCursor cursor = + new NodeCursor( nodeRecord, c -> called.setTrue(), nodeStore, recordCursors, NO_LOCK_SERVICE ); cursor.init( mock( Progression.class ), mock( ReadableTransactionState.class ) ); assertFalse( called.booleanValue() ); cursor.close(); + verify( pageCursor ).close(); assertTrue( called.booleanValue() ); } @@ -236,12 +242,12 @@ private ModifiedNode( long id ) } @Override - public TxState prepare( RecordCursor recordCursor, NodeRecord nodeRecord, TxState state ) + public TxState prepare( NodeStore nodeStore, PageCursor pageCursor, NodeRecord nodeRecord, TxState state ) { state = state == null ? new TxState() : state; state.nodeDoAddLabel( 6 + (int) id, id ); state.nodeDoRemoveLabel( 5 + (int) id, id ); - return super.prepare( recordCursor, nodeRecord, state ); + return super.prepare( nodeStore, pageCursor, nodeRecord, state ); } @Override @@ -269,7 +275,7 @@ public long id() } @Override - public TxState prepare( RecordCursor recordCursor, NodeRecord nodeRecord, TxState state ) + public TxState prepare( NodeStore nodeStore, PageCursor pageCursor, NodeRecord nodeRecord, TxState state ) { this.state = state = state == null ? new TxState() : state; state.nodeDoCreate( id ); @@ -326,11 +332,11 @@ public long id() } @Override - public TxState prepare( RecordCursor recordCursor, NodeRecord nodeRecord, TxState state ) + public TxState prepare( NodeStore nodeStore, PageCursor pageCursor, NodeRecord nodeRecord, TxState state ) { state = state == null ? new TxState() : state; state.nodeDoDelete( id ); - record( id, recordCursor, nodeRecord, state ); + record( id, nodeStore, pageCursor, nodeRecord, state ); return state; } @@ -370,9 +376,9 @@ public long id() } @Override - public TxState prepare( RecordCursor recordCursor, NodeRecord nodeRecord, TxState state ) + public TxState prepare( NodeStore nodeStore, PageCursor pageCursor, NodeRecord nodeRecord, TxState state ) { - expected = record( id, recordCursor, nodeRecord, state ); + expected = record( id, nodeStore, pageCursor, nodeRecord, state ); return state; } @@ -413,19 +419,26 @@ public String toString() } } - private static NodeItem record( long id, RecordCursor recordCursor, NodeRecord nodeRecord, + private static NodeItem record( long id, NodeStore nodeStore, PageCursor pageCursor, NodeRecord nodeRecord, ReadableTransactionState state ) { boolean dense = id % 2 == 0; int nextProp = 42 + (int) id; long nextRel = 43 + id; long[] labelIds = new long[]{4 + id, 5 + id}; - when( recordCursor.next( id, nodeRecord, CHECK ) ).thenAnswer( invocationOnMock -> + try { - nodeRecord.setId( id ); - nodeRecord.initialize( true, nextProp, dense, nextRel, inlinedLabelsLongRepresentation( labelIds ) ); - return true; - } ); + doAnswer( invocationOnMock -> + { + nodeRecord.setId( id ); + nodeRecord.initialize( true, nextProp, dense, nextRel, inlinedLabelsLongRepresentation( labelIds ) ); + return null; + } ).when( nodeStore ).readIntoRecord( id, nodeRecord, CHECK, pageCursor ); + } + catch ( IOException e ) + { + throw new RuntimeException( e ); + } return new NodeItem() { @Override @@ -484,7 +497,7 @@ private interface Operation { long id(); - TxState prepare( RecordCursor recordCursor, NodeRecord nodeRecord, TxState state ); + TxState prepare( NodeStore nodeStore, PageCursor pageCursor, NodeRecord nodeRecord, TxState state ); boolean fromDisk(); @@ -504,11 +517,12 @@ private TestRun( Mode mode, Operation[] ops ) this.ops = ops; } - Cursor initialize( NodeCursor cursor, RecordCursor recordCursor, NodeRecord nodeRecord ) + Cursor initialize( NodeCursor cursor, NodeStore nodeStore, PageCursor pageCursor, + NodeRecord nodeRecord ) { for ( Operation op : ops ) { - state = op.prepare( recordCursor, nodeRecord, state ); + state = op.prepare( nodeStore, pageCursor, nodeRecord, state ); } return cursor.init( createProgression( ops, mode ), state ); } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerRelTypesAndDegreeTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerRelTypesAndDegreeTest.java index 2906fd6e9a20..1d3d458a611a 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerRelTypesAndDegreeTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerRelTypesAndDegreeTest.java @@ -472,8 +472,9 @@ private Set degrees( NodeItem nodeItem ) @SuppressWarnings( "unchecked" ) private NodeCursor newCursor( long nodeId ) { - NodeCursor cursor = new NodeCursor( new NodeRecord( -1 ), mock( Consumer.class ), - new RecordCursors( resolveNeoStores() ), NO_LOCK_SERVICE ); + NeoStores neoStores = resolveNeoStores(); + NodeCursor cursor = new NodeCursor( new NodeRecord( -1 ), mock( Consumer.class ), neoStores.getNodeStore(), + new RecordCursors( neoStores ), NO_LOCK_SERVICE ); cursor.init( new SingleNodeProgression( nodeId ), null ); assertTrue( cursor.next() ); return cursor; diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/store/RecordCursorsTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/store/RecordCursorsTest.java index ab0a066e517a..179bf9f870a3 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/store/RecordCursorsTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/store/RecordCursorsTest.java @@ -66,7 +66,6 @@ public void closesCursorsEvenIfSomeCursorFailsToClose() private static void verifyAllCursorsClosed( RecordCursors recordCursors ) { - verify( recordCursors.node() ).close(); verify( recordCursors.relationship() ).close(); verify( recordCursors.relationshipGroup() ).close(); verify( recordCursors.property() ).close();