Skip to content

Commit

Permalink
Delete RecordCursor.next( long ) API
Browse files Browse the repository at this point in the history
  • Loading branch information
davidegrohmann committed May 8, 2017
1 parent 741ca65 commit 5fd0468
Show file tree
Hide file tree
Showing 20 changed files with 197 additions and 357 deletions.
Expand Up @@ -26,10 +26,12 @@
import org.neo4j.consistency.checking.RecordCheck;
import org.neo4j.consistency.checking.cache.CacheAccess;
import org.neo4j.consistency.report.ConsistencyReport;
import org.neo4j.kernel.impl.store.RecordCursor;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.impl.store.InvalidRecordException;
import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.StoreType;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.RecordLoad;
import org.neo4j.test.rule.NeoStoresRule;

import static org.mockito.Matchers.any;
Expand Down Expand Up @@ -78,21 +80,16 @@ public void shouldStopProcessingRecordsWhenSignalledToStop() throws Exception
reporter, Stage.SEQUENTIAL_FORWARD, CacheAccess.EMPTY );
RecordStore<NodeRecord> nodeStore = new RecordStore.Delegator<NodeRecord>( stores.open().getNodeStore() )
{

@Override
public RecordCursor<NodeRecord> newRecordCursor( NodeRecord record )
public NodeRecord readRecord( long id, NodeRecord target, RecordLoad mode, PageCursor cursor )
throws InvalidRecordException
{
return new RecordCursor.Delegator<NodeRecord>( super.newRecordCursor( record ) )
if ( id == 3 )
{
@Override
public boolean next( long id )
{
if ( id == 3 )
{
processor.stop();
}
return super.next( id );
}
};
processor.stop();
}
return super.readRecord( id, target, mode, cursor );
}
};
nodeStore.updateRecord( node( 0, false, 0, 0 ) );
Expand Down
Expand Up @@ -1028,6 +1028,7 @@ public RECORD getRecord( long id, RECORD record, RecordLoad mode )
}
}

@Override
public RECORD readRecord( long id, RECORD record, RecordLoad mode, PageCursor cursor )
{
try
Expand Down Expand Up @@ -1112,15 +1113,16 @@ public void prepareForCommit( RECORD record )
@Override
public <EXCEPTION extends Exception> void scanAllRecords( Visitor<RECORD,EXCEPTION> visitor ) throws EXCEPTION
{
try ( RecordCursor<RECORD> cursor = newRecordCursor( newRecord() ) )
try ( PageCursor cursor = newPageCursor() )
{
long highId = getHighId();
cursor.acquire( getNumberOfReservedLowIds(), CHECK );
RECORD record = newRecord();
for ( long id = getNumberOfReservedLowIds(); id < highId; id++ )
{
if ( cursor.next( id ) )
readRecord( id, record, CHECK, cursor );
if ( record.inUse() )
{
visitor.visit( cursor.get() );
visitor.visit( record );
}
}
}
Expand All @@ -1136,6 +1138,7 @@ public Collection<RECORD> getRecords( long firstId, RecordLoad mode )
}
}

@Override
public PageCursor newPageCursor()
{
try
Expand Down
Expand Up @@ -67,16 +67,6 @@ public interface RecordCursor<R extends AbstractBaseRecord> extends Cursor<R>
@Override
boolean next();

/**
* An additional way of placing this cursor at an arbitrary record id. This is useful when stride,
* as opposed to following record chains, is controlled from the outside.
* The read record can be accessed using {@link #get()}.
*
* @param id record id to place cursor at.
* @return whether or not that record is in use.
*/
boolean next( long id );

/**
* Read all records in the chain starting from the id this cursor is positioned at using either
* {@link #acquire(long, RecordLoad)} or {@link #placeAt(long, RecordLoad)}. Each next record in the chain is
Expand Down Expand Up @@ -135,11 +125,5 @@ public RecordCursor<R> acquire( long id, RecordLoad mode )
actual.acquire( id, mode );
return this;
}

@Override
public boolean next( long id )
{
return actual.next( id );
}
}
}
Expand Up @@ -116,7 +116,7 @@ public interface RecordStore<RECORD extends AbstractBaseRecord> extends IdSequen

/**
* Same as {@link RecordStore#getRecord(long, RECORD, RecordLoad)} but accept a {@link PageCursor} in input to use
* for the read.
* for the read rather then allocating a new one.
*/
RECORD readRecord( long id, RECORD target, RecordLoad mode, PageCursor cursor ) throws InvalidRecordException;

Expand Down Expand Up @@ -152,6 +152,13 @@ public interface RecordStore<RECORD extends AbstractBaseRecord> extends IdSequen
*/
RecordCursor<RECORD> newRecordCursor( RECORD record );

/**
* Instantiates a new page cursor capable of iterating over records in this store.
*
* @return a new {@link PageCursor} instance capable of reading records in this store.
*/
PageCursor newPageCursor();

/**
* Returns another record id which the given {@code record} references and which a {@link RecordCursor}
* would follow and read next.
Expand Down Expand Up @@ -290,6 +297,12 @@ public RecordCursor<R> newRecordCursor( R record )
return actual.newRecordCursor( record );
}

@Override
public PageCursor newPageCursor()
{
return actual.newPageCursor();
}

@Override
public long getNextRecordReference( R record )
{
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.neo4j.graphdb.ResourceIterable;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.helpers.collection.PrefetchingResourceIterator;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
import org.neo4j.kernel.impl.store.record.RecordLoad;

Expand Down Expand Up @@ -56,25 +57,28 @@ public static <R extends AbstractBaseRecord> ResourceIterable<R> scan( final Rec
private static class Scan<R extends AbstractBaseRecord> extends PrefetchingResourceIterator<R>
{
private final PrimitiveLongIterator ids;
private final RecordCursor<R> cursor;
private final RecordStore<R> store;
private final R record;
private final PageCursor cursor;
private final Predicate<? super R>[] filters;

Scan( RecordStore<R> store, boolean forward, final Predicate<? super R>... filters )
{
this.store = store;
this.filters = filters;
this.ids = new StoreIdIterator( store, forward );
this.cursor = store.newRecordCursor( store.newRecord() );
cursor.acquire( 0, RecordLoad.CHECK );
this.record = store.newRecord();
this.cursor = store.newPageCursor();
}

@Override
protected R fetchNextOrNull()
{
while ( ids.hasNext() )
{
if ( cursor.next( ids.next() ) )
store.readRecord( ids.next(), record, RecordLoad.CHECK, cursor );
if ( record.inUse() )
{
R record = cursor.get();
if ( passesFilters( record ) )
{
return record;
Expand Down
Expand Up @@ -47,7 +47,16 @@ public boolean next()
{
try
{
return next( currentId );
assert pageCursor != null : "Not initialized";
if ( NULL_REFERENCE.is( currentId ) )
{
record.clear();
record.setId( NULL_REFERENCE.intValue() );
return false;
}

store.readRecord( currentId, record, mode, pageCursor );
return record.inUse();
}
finally
{
Expand All @@ -60,21 +69,6 @@ public boolean next()
}
}

@Override
public boolean next( long id )
{
assert pageCursor != null : "Not initialized";
if ( NULL_REFERENCE.is( id ) )
{
record.clear();
record.setId( NULL_REFERENCE.intValue() );
return false;
}

store.readRecord( id, record, mode, pageCursor );
return record.inUse();
}

@Override
public void placeAt( long id, RecordLoad mode )
{
Expand Down
Expand Up @@ -19,7 +19,7 @@
*/
package org.neo4j.kernel.impl.storemigration.participant;

import org.neo4j.kernel.impl.store.RecordCursor;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
import org.neo4j.unsafe.impl.batchimport.InputIterable;
Expand All @@ -38,22 +38,21 @@ abstract class StoreScanAsInputIterable<INPUT extends InputEntity,RECORD extends
implements InputIterable<INPUT>
{
private final RecordStore<RECORD> store;
private final RecordCursor<RECORD> cursor;
private final StoreSourceTraceability traceability;

StoreScanAsInputIterable( RecordStore<RECORD> store )
{
this.store = store;
this.cursor = store.newRecordCursor( store.newRecord() );
this.traceability = new StoreSourceTraceability( store.toString(), store.getRecordSize() );
}

@Override
public InputIterator<INPUT> iterator()
{
cursor.acquire( 0, CHECK );
return new InputIterator.Adapter<INPUT>()
{
private final RECORD record = store.newRecord();
private final PageCursor cursor = store.newPageCursor();
private final long highId = store.getHighId();
private long id;

Expand Down Expand Up @@ -86,9 +85,8 @@ protected INPUT fetchNextOrNull()
{
while ( id < highId )
{
if ( cursor.next( id++ ) )
if ( store.readRecord( id++, record, CHECK, cursor ).inUse() )
{
RECORD record = cursor.get();
traceability.atId( record.getId() );
return inputEntityOf( record );
}
Expand Down
Expand Up @@ -20,12 +20,13 @@
package org.neo4j.unsafe.impl.batchimport;

import java.io.IOException;

import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.api.labelscan.LabelScanWriter;
import org.neo4j.kernel.impl.store.NodeLabelsField;
import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.store.RecordCursor;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.unsafe.impl.batchimport.staging.LonelyProcessingStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
Expand All @@ -37,6 +38,8 @@
public class DeleteDuplicateNodesStep extends LonelyProcessingStep
{
private final NodeStore nodeStore;
private final NodeRecord record;
private final PageCursor cursor;
private final PrimitiveLongIterator nodeIds;
private final LabelScanWriter labelScanWriter;

Expand All @@ -45,19 +48,19 @@ public DeleteDuplicateNodesStep( StageControl control, Configuration config, Pri
{
super( control, "DEDUP", config );
this.nodeStore = nodeStore;
this.record = nodeStore.newRecord();
this.cursor = nodeStore.newPageCursor();
this.nodeIds = nodeIds;
this.labelScanWriter = labelScanStore.newWriter();
}

@Override
protected void process() throws IOException
{
NodeRecord record = nodeStore.newRecord();
RecordCursor<NodeRecord> cursor = nodeStore.newRecordCursor( record ).acquire( 0, NORMAL );
while ( nodeIds.hasNext() )
{
long duplicateNodeId = nodeIds.next();
cursor.next( duplicateNodeId );
nodeStore.readRecord( duplicateNodeId, record, NORMAL, cursor );
long[] labels = NodeLabelsField.get( record, nodeStore );
record.setInUse( false );
nodeStore.updateRecord( record );
Expand All @@ -71,6 +74,7 @@ protected void process() throws IOException
@Override
public void close() throws Exception
{
cursor.close();
labelScanWriter.close();
super.close();
}
Expand Down
Expand Up @@ -19,7 +19,7 @@
*/
package org.neo4j.unsafe.impl.batchimport;

import org.neo4j.kernel.impl.store.RecordCursor;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
Expand All @@ -37,7 +37,9 @@ public class NodeSetFirstGroupStep extends ProcessorStep<RelationshipGroupRecord
{
private final int batchSize;
private final ByteArray cache;
private final RecordCursor<NodeRecord> nodeRecordCursor;
private final RecordStore<NodeRecord> nodeStore;
private final PageCursor nodeCursor;
private final NodeRecord nodeRecord;

private NodeRecord[] current;
private int cursor;
Expand All @@ -48,17 +50,12 @@ public NodeSetFirstGroupStep( StageControl control, Configuration config,
super( control, "FIRST", config, 1 );
this.cache = cache;
this.batchSize = config.batchSize();
this.nodeRecordCursor = nodeStore.newRecordCursor( nodeStore.newRecord() );
this.nodeStore = nodeStore;
this.nodeRecord = nodeStore.newRecord();
this.nodeCursor = nodeStore.newPageCursor();
newBatch();
}

@Override
public void start( int orderingGuarantees )
{
nodeRecordCursor.acquire( 0, NORMAL );
super.start( orderingGuarantees );
}

private void newBatch()
{
current = new NodeRecord[batchSize];
Expand All @@ -75,10 +72,8 @@ protected void process( RelationshipGroupRecord[] batch, BatchSender sender ) th
if ( cache.getByte( nodeId, 0 ) == 0 )
{
cache.setByte( nodeId, 0, (byte) 1 );
nodeRecordCursor.next( nodeId );
NodeRecord node = nodeRecordCursor.get().clone();
NodeRecord node = nodeStore.readRecord( nodeId, nodeRecord, NORMAL, nodeCursor ).clone();
node.setNextRel( group.getId() );

current[cursor++] = node;
if ( cursor == batchSize )
{
Expand All @@ -101,7 +96,7 @@ protected void lastCallForEmittingOutstandingBatches( BatchSender sender )
@Override
public void close() throws Exception
{
nodeRecordCursor.close();
nodeCursor.close();
super.close();
}
}

0 comments on commit 5fd0468

Please sign in to comment.