Skip to content

Commit

Permalink
Merge pull request #5 from davidegrohmann/3.3-page-cursors-in-node-cu…
Browse files Browse the repository at this point in the history
…rsors

Use PageCursors directly in NodeCursors
  • Loading branch information
ragadeeshu authored and davidegrohmann committed May 8, 2017
2 parents f9f03c2 + 9761407 commit 48c374c
Show file tree
Hide file tree
Showing 21 changed files with 213 additions and 111 deletions.
30 changes: 30 additions & 0 deletions community/common/src/main/java/org/neo4j/function/Disposable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.function;

/**
* Dispose the object by freeing all the internal resources.
* The object can't be further reused.
*/
@FunctionalInterface
public interface Disposable
{
void dispose();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Optional;

import org.neo4j.function.Disposable;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.security.SecurityContext;
Expand Down Expand Up @@ -76,7 +77,7 @@
* }
* </pre>
*/
public interface KernelTransaction extends AutoCloseable
public interface KernelTransaction extends AutoCloseable, Disposable
{
enum Type
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,7 @@ public String toString()
return "KernelTransaction[" + lockSessionId + "]";
}

@Override
public void dispose()
{
storageStatement.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.neo4j.kernel.api.StatementConstants;
import org.neo4j.kernel.impl.store.NodeStore;

public class AllNodeProgression implements Progression
public class AllNodeProgression implements NodeProgression
{
private final AllIdIterator allIdIterator;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,54 +19,59 @@
*/
package org.neo4j.kernel.impl.api.store;

import java.io.IOException;
import java.util.Iterator;
import java.util.function.Consumer;

import org.neo4j.collection.primitive.PrimitiveIntSet;
import org.neo4j.cursor.Cursor;
import org.neo4j.function.Disposable;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.impl.locking.Lock;
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.store.NodeLabelsField;
import org.neo4j.kernel.impl.store.RecordCursors;
import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.Record;
import org.neo4j.kernel.impl.store.record.RecordLoad;
import org.neo4j.kernel.impl.util.IoPrimitiveUtils;
import org.neo4j.storageengine.api.NodeItem;
import org.neo4j.storageengine.api.txstate.NodeState;
import org.neo4j.storageengine.api.txstate.ReadableTransactionState;

import static org.neo4j.collection.primitive.PrimitiveIntCollections.asSet;
import static org.neo4j.kernel.impl.api.store.Progression.Mode.APPEND;
import static org.neo4j.kernel.impl.api.store.Progression.Mode.FETCH;
import static org.neo4j.kernel.impl.api.store.NodeProgression.Mode.APPEND;
import static org.neo4j.kernel.impl.api.store.NodeProgression.Mode.FETCH;
import static org.neo4j.kernel.impl.locking.LockService.NO_LOCK;
import static org.neo4j.kernel.impl.locking.LockService.NO_LOCK_SERVICE;
import static org.neo4j.kernel.impl.store.record.RecordLoad.CHECK;
import static org.neo4j.kernel.impl.util.IoPrimitiveUtils.safeCastLongToInt;

public class NodeCursor implements NodeItem, Cursor<NodeItem>
public class NodeCursor implements NodeItem, Cursor<NodeItem>, Disposable
{
private final NodeRecord nodeRecord;
private final Consumer<NodeCursor> instanceCache;
private final RecordCursors recordCursors;
private final NodeStore nodeStore;
private final LockService lockService;

private Progression progression;
private NodeProgression progression;
private ReadableTransactionState state;
private boolean fetched;
private long[] labels;
private Iterator<Long> added;
private PageCursor pageCursor;

NodeCursor( NodeRecord nodeRecord, Consumer<NodeCursor> instanceCache, RecordCursors recordCursors,
NodeCursor( NodeRecord nodeRecord, Consumer<NodeCursor> instanceCache, NodeStore nodeStore,
LockService lockService )
{
this.pageCursor = nodeStore.newPageCursor();
this.nodeRecord = nodeRecord;
this.instanceCache = instanceCache;
this.recordCursors = recordCursors;
this.nodeStore = nodeStore;
this.lockService = lockService;
}

public Cursor<NodeItem> init( Progression progression, ReadableTransactionState state )
public Cursor<NodeItem> init( NodeProgression progression, ReadableTransactionState state )
{
this.progression = progression;
this.state = state;
Expand All @@ -88,8 +93,7 @@ private boolean fetchNext()
long id;
while ( (id = progression.nextId()) >= 0 )
{
if ( (state == null || !state.nodeIsDeletedInThisTx( id )) &&
recordCursors.node().next( id, nodeRecord, RecordLoad.CHECK ) )
if ( (state == null || !state.nodeIsDeletedInThisTx( id )) && readNodeRecord( id ) )
{
return true;
}
Expand All @@ -110,6 +114,20 @@ private boolean fetchNext()
return false;
}

private boolean readNodeRecord( long id )
{
try
{
nodeRecord.clear();
nodeStore.readIntoRecord( id, nodeRecord, CHECK, pageCursor );
return nodeRecord.inUse();
}
catch ( IOException e )
{
throw new UnderlyingStorageException( e );
}
}

private void recordFromTxState( long id )
{
nodeRecord.clear();
Expand All @@ -125,6 +143,13 @@ public void close()
instanceCache.accept( this );
}

@Override
public void dispose()
{
pageCursor.close();
pageCursor = null;
}

@Override
public NodeItem get()
{
Expand Down Expand Up @@ -171,7 +196,7 @@ private long[] loadedLabels()
{
if ( labels == null )
{
labels = NodeLabelsField.get( nodeRecord, recordCursors.label() );
labels = NodeLabelsField.get( nodeRecord, nodeStore );
}
return labels;
}
Expand Down Expand Up @@ -222,7 +247,7 @@ private Lock acquireLock()
try
{
// It's safer to re-read the node record here, specifically nextProp, after acquiring the lock
if ( !recordCursors.node().next( nodeRecord.getId(), nodeRecord, CHECK ) )
if ( !readNodeRecord( nodeRecord.getId() ) )
{
// So it looks like the node has been deleted. The current behavior of NodeStore#loadRecord
// is to only set the inUse field on loading an unused record. This should (and will)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
*/
package org.neo4j.kernel.impl.api.store;

public interface Progression
public interface NodeProgression
{
enum Mode
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.neo4j.kernel.api.StatementConstants;

public class SingleNodeProgression implements Progression
public class SingleNodeProgression implements NodeProgression
{
private long nodeId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.function.IntPredicate;

import org.neo4j.cursor.Cursor;
import org.neo4j.function.Disposable;
import org.neo4j.kernel.api.properties.DefinedProperty;
import org.neo4j.kernel.impl.locking.Lock;
import org.neo4j.kernel.impl.store.RecordCursor;
Expand All @@ -33,7 +34,7 @@

import static org.neo4j.kernel.impl.store.record.RecordLoad.FORCE;

public abstract class StoreAbstractPropertyCursor implements Cursor<PropertyItem>, PropertyItem
public abstract class StoreAbstractPropertyCursor implements PropertyItem, Cursor<PropertyItem>, Disposable
{
protected final StorePropertyPayloadCursor payload;
private final RecordCursor<PropertyRecord> recordCursor;
Expand Down Expand Up @@ -165,4 +166,9 @@ public final void close()
}

protected abstract void doClose();

@Override
public void dispose()
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.kernel.impl.api.store;

import org.neo4j.cursor.Cursor;
import org.neo4j.function.Disposable;
import org.neo4j.kernel.impl.api.RelationshipVisitor;
import org.neo4j.kernel.impl.locking.Lock;
import org.neo4j.kernel.impl.locking.LockService;
Expand All @@ -37,7 +38,7 @@
* Base cursor for relationships.
*/
public abstract class StoreAbstractRelationshipCursor
implements RelationshipVisitor<RuntimeException>, Cursor<RelationshipItem>, RelationshipItem
implements RelationshipVisitor<RuntimeException>, RelationshipItem, Cursor<RelationshipItem>, Disposable
{
protected final RelationshipRecord relationshipRecord;
final RecordCursor<RelationshipRecord> relationshipRecordCursor;
Expand Down Expand Up @@ -98,8 +99,7 @@ public final long endNode()
@Override
public final long otherNode( long nodeId )
{
return relationshipRecord.getFirstNode() == nodeId ?
relationshipRecord.getSecondNode() : relationshipRecord.getFirstNode();
return relationshipRecord.getFirstNode() == nodeId ? relationshipRecord.getSecondNode() : relationshipRecord.getFirstNode();
}

@Override
Expand Down Expand Up @@ -155,4 +155,9 @@ public void close()
{
fetched = false;
}

@Override
public void dispose()
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public class StorePropertyCursor extends StoreAbstractPropertyCursor
{
private final Consumer<StorePropertyCursor> instanceCache;

private boolean fromDisk;
private Iterator<StorageProperty> storagePropertyIterator;

public StorePropertyCursor( RecordCursors cursors, Consumer<StorePropertyCursor> instanceCache )
Expand Down Expand Up @@ -76,7 +75,6 @@ protected DefinedProperty nextAdded()
@Override
protected void doClose()
{
fromDisk = false;
instanceCache.accept( this );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public StoreStatement( NeoStores neoStores, Supplier<IndexReaderFactory> indexRe
@Override
protected NodeCursor create()
{
return new NodeCursor( nodeStore.newRecord(), this, recordCursors, lockService );
return new NodeCursor( nodeStore.newRecord(), this, nodeStore, lockService );
}
};
singleRelationshipCursor = new InstanceCache<StoreSingleRelationshipCursor>()
Expand Down Expand Up @@ -210,6 +210,12 @@ public void close()
{
assert !closed;
closeSchemaResources();
nodeCursor.close();
singleRelationshipCursor.close();
iteratorRelationshipCursor.close();
nodeRelationshipsCursor.close();
propertyCursorCache.close();
singlePropertyCursorCache.close();
recordCursors.close();
closed = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ public RECORD getRecord( long id, RECORD record, RecordLoad mode )
}
}

void readIntoRecord( long id, RECORD record, RecordLoad mode, PageCursor cursor ) throws IOException
public void readIntoRecord( long id, RECORD record, RecordLoad mode, PageCursor cursor ) throws IOException
{
// Mark the record with this id regardless of whether or not we load the contents of it.
// This is done in this method since there are multiple call sites and they all want the id
Expand Down Expand Up @@ -1123,6 +1123,18 @@ public Collection<RECORD> getRecords( long firstId, RecordLoad mode )
}
}

public PageCursor newPageCursor()
{
try
{
return storeFile.io( getNumberOfReservedLowIds(), PF_SHARED_READ_LOCK );
}
catch ( IOException e )
{
throw new UnderlyingStorageException( e );
}
}

@Override
public RecordCursor<RECORD> newRecordCursor( final RECORD record )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,6 @@ public static long[] get( NodeRecord node, NodeStore nodeStore )
return getDynamicLabelsArray( node.getUsedDynamicLabelRecords(), nodeStore.getDynamicLabelStore() );
}

public static long[] get( NodeRecord node, RecordCursor<DynamicRecord> dynamicLabelCursor )
{
if ( node.isLight() )
{
NodeStore.ensureHeavy( node, dynamicLabelCursor );
}
return getDynamicLabelsArrayFromHeavyRecords( node.getUsedDynamicLabelRecords() );
}

@Override
public long[] getIfLoaded()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,6 @@ public static long[] get( NodeRecord node, NodeStore nodeStore )
: InlineNodeLabels.get( node );
}

public static long[] get( NodeRecord node, RecordCursor<DynamicRecord> dynamicLabelCursor )
{
return fieldPointsToDynamicRecordOfLabels( node.getLabelField() )
? DynamicNodeLabels.get( node, dynamicLabelCursor )
: InlineNodeLabels.get( node );
}

public static boolean fieldPointsToDynamicRecordOfLabels( long labelField )
{
return (labelField & 0x8000000000L) != 0;
Expand Down

0 comments on commit 48c374c

Please sign in to comment.