Skip to content

Commit

Permalink
Removes usages of RecordCursor
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Jul 2, 2018
1 parent 678780a commit 3382a96
Show file tree
Hide file tree
Showing 21 changed files with 123 additions and 171 deletions.
Expand Up @@ -55,11 +55,6 @@ public abstract class AbstractStoreProcessor extends RecordStore.Processor<Runti
private final RecordCheck<LabelTokenRecord,LabelTokenConsistencyReport> labelTokenChecker; private final RecordCheck<LabelTokenRecord,LabelTokenConsistencyReport> labelTokenChecker;
private final RecordCheck<RelationshipGroupRecord,RelationshipGroupConsistencyReport> relationshipGroupChecker; private final RecordCheck<RelationshipGroupRecord,RelationshipGroupConsistencyReport> relationshipGroupChecker;


public AbstractStoreProcessor()
{
this( CheckDecorator.NONE );
}

public AbstractStoreProcessor( CheckDecorator decorator ) public AbstractStoreProcessor( CheckDecorator decorator )
{ {
this.sparseNodeChecker = decorator.decorateNodeChecker( NodeRecordCheck.forSparseNodes() ); this.sparseNodeChecker = decorator.decorateNodeChecker( NodeRecordCheck.forSparseNodes() );
Expand Down
Expand Up @@ -76,11 +76,6 @@ public Stage getStage()
return stage; return stage;
} }


public int getStageIndex()
{
return stage.ordinal();
}

@Override @Override
public void processNode( RecordStore<NodeRecord> store, NodeRecord node ) public void processNode( RecordStore<NodeRecord> store, NodeRecord node )
{ {
Expand Down
Expand Up @@ -26,10 +26,12 @@
import org.neo4j.consistency.checking.RecordCheck; import org.neo4j.consistency.checking.RecordCheck;
import org.neo4j.consistency.checking.cache.CacheAccess; import org.neo4j.consistency.checking.cache.CacheAccess;
import org.neo4j.consistency.report.ConsistencyReport; import org.neo4j.consistency.report.ConsistencyReport;
import org.neo4j.kernel.impl.store.RecordCursor; import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.impl.store.InvalidRecordException;
import org.neo4j.kernel.impl.store.RecordStore; import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.StoreType; import org.neo4j.kernel.impl.store.StoreType;
import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.RecordLoad;
import org.neo4j.test.rule.NeoStoresRule; import org.neo4j.test.rule.NeoStoresRule;


import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -80,20 +82,13 @@ public void shouldStopProcessingRecordsWhenSignalledToStop() throws Exception
stores.builder().build().getNodeStore() ) stores.builder().build().getNodeStore() )
{ {
@Override @Override
public RecordCursor<NodeRecord> newRecordCursor( NodeRecord record ) public void getRecordByCursor( long id, NodeRecord target, RecordLoad mode, PageCursor cursor ) throws InvalidRecordException
{ {
return new RecordCursor.Delegator<NodeRecord>( super.newRecordCursor( record ) ) if ( id == 3 )
{ {
@Override processor.stop();
public boolean next( long id ) }
{ super.getRecordByCursor( id, target, mode, cursor );
if ( id == 3 )
{
processor.stop();
}
return super.next( id );
}
};
} }
}; };
nodeStore.updateRecord( node( 0, false, 0, 0 ) ); nodeStore.updateRecord( node( 0, false, 0, 0 ) );
Expand Down
Expand Up @@ -23,16 +23,13 @@
import org.neo4j.kernel.impl.newapi.RelationshipReferenceEncoding; import org.neo4j.kernel.impl.newapi.RelationshipReferenceEncoding;
import org.neo4j.kernel.impl.store.NodeLabelsField; import org.neo4j.kernel.impl.store.NodeLabelsField;
import org.neo4j.kernel.impl.store.NodeStore; import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.store.RecordCursor;
import org.neo4j.kernel.impl.store.record.DynamicRecord;
import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.RecordLoad; import org.neo4j.kernel.impl.store.record.RecordLoad;
import org.neo4j.storageengine.api.StorageNodeCursor; import org.neo4j.storageengine.api.StorageNodeCursor;


public class RecordNodeCursor extends NodeRecord implements StorageNodeCursor public class RecordNodeCursor extends NodeRecord implements StorageNodeCursor
{ {
private NodeStore read; private NodeStore read;
private RecordCursor<DynamicRecord> labelCursor;
private PageCursor pageCursor; private PageCursor pageCursor;
private long next; private long next;
private long highMark; private long highMark;
Expand Down Expand Up @@ -89,14 +86,14 @@ public long nodeReference()
@Override @Override
public long[] labels() public long[] labels()
{ {
return NodeLabelsField.get( this, labelCursor() ); return NodeLabelsField.get( this, read );
} }


@Override @Override
public boolean hasLabel( int label ) public boolean hasLabel( int label )
{ {
//Get labels from store and put in intSet, unfortunately we get longs back //Get labels from store and put in intSet, unfortunately we get longs back
long[] longs = NodeLabelsField.get( this, labelCursor() ); long[] longs = NodeLabelsField.get( this, read );
for ( long labelToken : longs ) for ( long labelToken : longs )
{ {
if ( labelToken == label ) if ( labelToken == label )
Expand Down Expand Up @@ -204,15 +201,6 @@ private void reset()
clear(); clear();
} }


private RecordCursor<DynamicRecord> labelCursor()
{
if ( labelCursor == null )
{
labelCursor = read.newLabelCursor();
}
return labelCursor;
}

private boolean isSingle() private boolean isSingle()
{ {
return highMark == NO_ID; return highMark == NO_ID;
Expand All @@ -237,12 +225,6 @@ public String toString()
@Override @Override
public void release() public void release()
{ {
if ( labelCursor != null )
{
labelCursor.close();
labelCursor = null;
}

if ( pageCursor != null ) if ( pageCursor != null )
{ {
pageCursor.close(); pageCursor.close();
Expand Down
Expand Up @@ -186,7 +186,7 @@ public DynamicRecord nextRecord()
return StandardDynamicRecordAllocator.allocateRecord( nextId() ); return StandardDynamicRecordAllocator.allocateRecord( nextId() );
} }


public void allocateRecordsFromBytes( Collection<DynamicRecord> target, byte[] src ) void allocateRecordsFromBytes( Collection<DynamicRecord> target, byte[] src )
{ {
allocateRecordsFromBytes( target, src, this ); allocateRecordsFromBytes( target, src, this );
} }
Expand All @@ -198,8 +198,7 @@ public String toString()
", blockSize:" + getRecordDataSize() + "]"; ", blockSize:" + getRecordDataSize() + "]";
} }


public Pair<byte[]/*header in the first record*/, byte[]/*all other bytes*/> readFullByteArray( Pair<byte[]/*header in the first record*/, byte[]/*all other bytes*/> readFullByteArray( Iterable<DynamicRecord> records, PropertyType propertyType )
Iterable<DynamicRecord> records, PropertyType propertyType )
{ {
for ( DynamicRecord record : records ) for ( DynamicRecord record : records )
{ {
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.nio.file.NoSuchFileException; import java.nio.file.NoSuchFileException;
import java.nio.file.OpenOption; import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;


import org.neo4j.graphdb.config.Setting; import org.neo4j.graphdb.config.Setting;
Expand Down Expand Up @@ -415,6 +416,7 @@ public boolean isInUse( long id )
/** /**
* DANGER: make sure to always close this cursor. * DANGER: make sure to always close this cursor.
*/ */
@Override
public PageCursor openPageCursorForReading( long id ) public PageCursor openPageCursorForReading( long id )
{ {
try try
Expand Down Expand Up @@ -1159,15 +1161,16 @@ public void prepareForCommit( RECORD record, IdSequence idSequence )
@Override @Override
public <EXCEPTION extends Exception> void scanAllRecords( Visitor<RECORD,EXCEPTION> visitor ) throws EXCEPTION public <EXCEPTION extends Exception> void scanAllRecords( Visitor<RECORD,EXCEPTION> visitor ) throws EXCEPTION
{ {
try ( RecordCursor<RECORD> cursor = newRecordCursor( newRecord() ) ) try ( PageCursor cursor = openPageCursorForReading( 0 ) )
{ {
RECORD record = newRecord();
long highId = getHighId(); long highId = getHighId();
cursor.acquire( getNumberOfReservedLowIds(), CHECK );
for ( long id = getNumberOfReservedLowIds(); id < highId; id++ ) for ( long id = getNumberOfReservedLowIds(); id < highId; id++ )
{ {
if ( cursor.next( id ) ) getRecordByCursor( id, record, CHECK, cursor );
if ( record.inUse() )
{ {
visitor.visit( cursor.get() ); visitor.visit( record );
} }
} }
} }
Expand All @@ -1176,11 +1179,24 @@ public <EXCEPTION extends Exception> void scanAllRecords( Visitor<RECORD,EXCEPTI
@Override @Override
public Collection<RECORD> getRecords( long firstId, RecordLoad mode ) public Collection<RECORD> getRecords( long firstId, RecordLoad mode )
{ {
try ( RecordCursor<RECORD> cursor = newRecordCursor( newRecord() ) ) Collection<RECORD> records = new ArrayList<>();
long id = firstId;
try ( PageCursor cursor = openPageCursorForReading( firstId ) )
{ {
cursor.acquire( firstId, mode ); RECORD record;
return cursor.getAll(); do
{
record = newRecord();
getRecordByCursor( id, record, mode, cursor );
if ( record.inUse() )
{
records.add( record );
}
id = getNextRecordReference( record );
}
while ( record.inUse() && !Record.NULL_REFERENCE.is( id ) );
} }
return records;
} }


@Override @Override
Expand All @@ -1193,7 +1209,6 @@ private void verifyAfterNotRead( RECORD record, RecordLoad mode )
{ {
record.clear(); record.clear();
mode.verify( record ); mode.verify( record );

} }


final void checkForDecodingErrors( PageCursor cursor, long recordId, RecordLoad mode ) final void checkForDecodingErrors( PageCursor cursor, long recordId, RecordLoad mode )
Expand Down
Expand Up @@ -64,15 +64,6 @@ public static long[] get( NodeRecord node, NodeStore nodeStore )
return getDynamicLabelsArray( node.getUsedDynamicLabelRecords(), nodeStore.getDynamicLabelStore() ); return getDynamicLabelsArray( node.getUsedDynamicLabelRecords(), nodeStore.getDynamicLabelStore() );
} }


public static long[] get( NodeRecord node, RecordCursor<DynamicRecord> dynamicLabelCursor )
{
if ( node.isLight() )
{
NodeStore.ensureHeavy( node, dynamicLabelCursor );
}
return getDynamicLabelsArrayFromHeavyRecords( node.getUsedDynamicLabelRecords() );
}

@Override @Override
public long[] getIfLoaded() public long[] getIfLoaded()
{ {
Expand All @@ -91,8 +82,7 @@ public Collection<DynamicRecord> put( long[] labelIds, NodeStore nodeStore, Dyna
return putSorted( node, labelIds, nodeStore, allocator ); return putSorted( node, labelIds, nodeStore, allocator );
} }


public static Collection<DynamicRecord> putSorted( NodeRecord node, long[] labelIds, NodeStore nodeStore, static Collection<DynamicRecord> putSorted( NodeRecord node, long[] labelIds, NodeStore nodeStore, DynamicRecordAllocator allocator )
DynamicRecordAllocator allocator )
{ {
long existingLabelsField = node.getLabelField(); long existingLabelsField = node.getLabelField();
long existingLabelsBits = parseLabelsBody( existingLabelsField ); long existingLabelsBits = parseLabelsBody( existingLabelsField );
Expand Down
Expand Up @@ -19,7 +19,6 @@
*/ */
package org.neo4j.kernel.impl.store; package org.neo4j.kernel.impl.store;


import org.neo4j.kernel.impl.store.record.DynamicRecord;
import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.kernel.impl.store.record.NodeRecord;


/** /**
Expand Down Expand Up @@ -59,13 +58,6 @@ public static long[] get( NodeRecord node, NodeStore nodeStore )
: InlineNodeLabels.get( node ); : InlineNodeLabels.get( node );
} }


public static long[] get( NodeRecord node, RecordCursor<DynamicRecord> dynamicLabelCursor )
{
return fieldPointsToDynamicRecordOfLabels( node.getLabelField() )
? DynamicNodeLabels.get( node, dynamicLabelCursor )
: InlineNodeLabels.get( node );
}

public static boolean fieldPointsToDynamicRecordOfLabels( long labelField ) public static boolean fieldPointsToDynamicRecordOfLabels( long labelField )
{ {
return (labelField & 0x8000000000L) != 0; return (labelField & 0x8000000000L) != 0;
Expand Down
Expand Up @@ -22,7 +22,6 @@
import java.io.File; import java.io.File;
import java.nio.file.OpenOption; import java.nio.file.OpenOption;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;


import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
Expand Down Expand Up @@ -107,14 +106,6 @@ public void ensureHeavy( NodeRecord node, long firstDynamicLabelRecord )
node.setLabelField( node.getLabelField(), dynamicLabelStore.getRecords( firstDynamicLabelRecord, RecordLoad.NORMAL ) ); node.setLabelField( node.getLabelField(), dynamicLabelStore.getRecords( firstDynamicLabelRecord, RecordLoad.NORMAL ) );
} }


public static void ensureHeavy( NodeRecord node, RecordCursor<DynamicRecord> dynamicLabelCursor )
{
long firstDynamicLabelId = NodeLabelsField.firstDynamicLabelRecordId( node.getLabelField() );
dynamicLabelCursor.placeAt( firstDynamicLabelId, RecordLoad.NORMAL );
List<DynamicRecord> dynamicLabelRecords = dynamicLabelCursor.getAll();
node.setLabelField( node.getLabelField(), dynamicLabelRecords );
}

@Override @Override
public void updateRecord( NodeRecord record ) public void updateRecord( NodeRecord record )
{ {
Expand Down
Expand Up @@ -115,6 +115,16 @@ public interface RecordStore<RECORD extends AbstractBaseRecord> extends IdSequen
*/ */
RECORD getRecord( long id, RECORD target, RecordLoad mode ) throws InvalidRecordException; RECORD getRecord( long id, RECORD target, RecordLoad mode ) throws InvalidRecordException;


/**
* Opens a {@link PageCursor} on this store, capable of reading records using
* {@link #getRecordByCursor(long, AbstractBaseRecord, RecordLoad, PageCursor)}.
* The caller is responsible for closing it when done with it.
*
* @param id cursor will initially be placed at the page containing this record id.
* @return PageCursor for reading records.
*/
PageCursor openPageCursorForReading( long id );

/** /**
* Reads a record from the store into {@code target}, see * Reads a record from the store into {@code target}, see
* {@link RecordStore#getRecord(long, AbstractBaseRecord, RecordLoad)}. * {@link RecordStore#getRecord(long, AbstractBaseRecord, RecordLoad)}.
Expand Down Expand Up @@ -304,6 +314,12 @@ public R getRecord( long id, R target, RecordLoad mode ) throws InvalidRecordExc
return actual.getRecord( id, target, mode ); return actual.getRecord( id, target, mode );
} }


@Override
public PageCursor openPageCursorForReading( long id )
{
return actual.openPageCursorForReading( id );
}

@Override @Override
public void getRecordByCursor( long id, R target, RecordLoad mode, PageCursor cursor ) throws InvalidRecordException public void getRecordByCursor( long id, R target, RecordLoad mode, PageCursor cursor ) throws InvalidRecordException
{ {
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.neo4j.graphdb.ResourceIterable; import org.neo4j.graphdb.ResourceIterable;
import org.neo4j.graphdb.ResourceIterator; import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.helpers.collection.PrefetchingResourceIterator; import org.neo4j.helpers.collection.PrefetchingResourceIterator;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord; import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
import org.neo4j.kernel.impl.store.record.RecordLoad; import org.neo4j.kernel.impl.store.record.RecordLoad;


Expand Down Expand Up @@ -57,25 +58,28 @@ public static <R extends AbstractBaseRecord> ResourceIterable<R> scan( final Rec
private static class Scan<R extends AbstractBaseRecord> extends PrefetchingResourceIterator<R> private static class Scan<R extends AbstractBaseRecord> extends PrefetchingResourceIterator<R>
{ {
private final LongIterator ids; private final LongIterator ids;
private final RecordCursor<R> cursor; private final RecordStore<R> store;
private final PageCursor cursor;
private final R record;
private final Predicate<? super R>[] filters; private final Predicate<? super R>[] filters;


Scan( RecordStore<R> store, boolean forward, final Predicate<? super R>... filters ) Scan( RecordStore<R> store, boolean forward, final Predicate<? super R>... filters )
{ {
this.filters = filters; this.filters = filters;
this.ids = new StoreIdIterator( store, forward ); this.ids = new StoreIdIterator( store, forward );
this.cursor = store.newRecordCursor( store.newRecord() ); this.store = store;
cursor.acquire( 0, RecordLoad.CHECK ); this.cursor = store.openPageCursorForReading( 0 );
this.record = store.newRecord();
} }


@Override @Override
protected R fetchNextOrNull() protected R fetchNextOrNull()
{ {
while ( ids.hasNext() ) while ( ids.hasNext() )
{ {
if ( cursor.next( ids.next() ) ) store.getRecordByCursor( ids.next(), record, RecordLoad.CHECK, cursor );
if ( record.inUse() )
{ {
R record = cursor.get();
if ( passesFilters( record ) ) if ( passesFilters( record ) )
{ {
return record; return record;
Expand Down
Expand Up @@ -21,6 +21,7 @@


import org.eclipse.collections.api.iterator.LongIterator; import org.eclipse.collections.api.iterator.LongIterator;


import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.impl.store.NodeStore; import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.store.PropertyStore; import org.neo4j.kernel.impl.store.PropertyStore;
import org.neo4j.kernel.impl.store.RecordCursor; import org.neo4j.kernel.impl.store.RecordCursor;
Expand Down Expand Up @@ -59,13 +60,13 @@ protected void process()
{ {
NodeRecord nodeRecord = nodeStore.newRecord(); NodeRecord nodeRecord = nodeStore.newRecord();
PropertyRecord propertyRecord = propertyStore.newRecord(); PropertyRecord propertyRecord = propertyStore.newRecord();
try ( RecordCursor<NodeRecord> cursor = nodeStore.newRecordCursor( nodeRecord ).acquire( 0, NORMAL ); try ( PageCursor cursor = nodeStore.openPageCursorForReading( 0 );
RecordCursor<PropertyRecord> propertyCursor = propertyStore.newRecordCursor( propertyRecord ).acquire( 0, NORMAL ) ) PageCursor propertyCursor = propertyStore.openPageCursorForReading( 0 ) )
{ {
while ( nodeIds.hasNext() ) while ( nodeIds.hasNext() )
{ {
long duplicateNodeId = nodeIds.next(); long duplicateNodeId = nodeIds.next();
cursor.next( duplicateNodeId ); nodeStore.getRecordByCursor( duplicateNodeId, nodeRecord, NORMAL, cursor );
assert nodeRecord.inUse() : nodeRecord; assert nodeRecord.inUse() : nodeRecord;
// Ensure heavy so that the dynamic label records gets loaded (and then deleted) too // Ensure heavy so that the dynamic label records gets loaded (and then deleted) too
nodeStore.ensureHeavy( nodeRecord ); nodeStore.ensureHeavy( nodeRecord );
Expand All @@ -74,7 +75,7 @@ protected void process()
long nextProp = nodeRecord.getNextProp(); long nextProp = nodeRecord.getNextProp();
while ( !Record.NULL_REFERENCE.is( nextProp ) ) while ( !Record.NULL_REFERENCE.is( nextProp ) )
{ {
propertyCursor.next( nextProp ); propertyStore.getRecordByCursor( nextProp, propertyRecord, NORMAL, propertyCursor );
assert propertyRecord.inUse() : propertyRecord + " for " + nodeRecord; assert propertyRecord.inUse() : propertyRecord + " for " + nodeRecord;
propertyStore.ensureHeavy( propertyRecord ); propertyStore.ensureHeavy( propertyRecord );
propertiesRemoved += propertyRecord.numberOfProperties(); propertiesRemoved += propertyRecord.numberOfProperties();
Expand Down

0 comments on commit 3382a96

Please sign in to comment.