Skip to content

Commit

Permalink
Label index reader can be kept up to date with concurrent updates
Browse files Browse the repository at this point in the history
This is important so that index population can see entities created
right in front of it while it is scanning and pausing to apply
external updates. Internally, each time a batch of updates
has been applied, the label index reader is restarted from the
current idRange and already-seen ids within that idRange are skipped.

This allows populators to have the guarantee that updates coming
in from the scan haven't been indexed before and can therefore
be indexed as efficiently as possible.
  • Loading branch information
tinwelint committed Sep 10, 2018
1 parent d42b5bb commit ad4a33f
Show file tree
Hide file tree
Showing 22 changed files with 205 additions and 64 deletions.
Expand Up @@ -36,7 +36,6 @@
import java.util.function.IntPredicate;
import java.util.stream.Collectors;

import org.neo4j.collection.PrimitiveLongResourceIterator;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
Expand Down Expand Up @@ -79,6 +78,7 @@
import org.neo4j.kernel.impl.store.SchemaStorage;
import org.neo4j.kernel.impl.transaction.state.DirectIndexUpdates;
import org.neo4j.kernel.impl.transaction.state.storeview.DynamicIndexStoreView;
import org.neo4j.kernel.impl.transaction.state.storeview.EntityIdIterator;
import org.neo4j.kernel.impl.transaction.state.storeview.LabelScanViewNodeStoreScan;
import org.neo4j.kernel.impl.transaction.state.storeview.NeoStoreIndexStoreView;
import org.neo4j.logging.NullLogProvider;
Expand Down Expand Up @@ -529,20 +529,20 @@ private class LabelScanViewNodeStoreWrapper<FAILURE extends Exception> extends L
}

@Override
public PrimitiveLongResourceIterator getEntityIdIterator()
public EntityIdIterator getEntityIdIterator()
{
PrimitiveLongResourceIterator originalIterator = delegate.getEntityIdIterator();
return new DelegatingPrimitiveLongResourceIterator( originalIterator, customAction );
EntityIdIterator originalIterator = delegate.getEntityIdIterator();
return new DelegatingEntityIdIterator( originalIterator, customAction );
}
}

private class DelegatingPrimitiveLongResourceIterator implements PrimitiveLongResourceIterator
private class DelegatingEntityIdIterator implements EntityIdIterator
{
private final Runnable customAction;
private final PrimitiveLongResourceIterator delegate;
private final EntityIdIterator delegate;

DelegatingPrimitiveLongResourceIterator(
PrimitiveLongResourceIterator delegate,
DelegatingEntityIdIterator(
EntityIdIterator delegate,
Runnable customAction )
{
this.delegate = delegate;
Expand Down Expand Up @@ -571,6 +571,12 @@ public void close()
{
delegate.close();
}

/**
 * Forwards the cache invalidation request to the wrapped {@code delegate} iterator.
 */
@Override
public void invalidateCache()
{
delegate.invalidateCache();
}
}

private class UpdateGenerator implements Runnable
Expand Down
Expand Up @@ -531,6 +531,11 @@ public void stop()
latch.finish();
}

/**
 * No-op: this implementation does nothing with the given update.
 *
 * @param updater ignored.
 * @param update ignored.
 * @param currentlyIndexedNodeId ignored.
 */
@Override
public void acceptUpdate( MultipleIndexPopulator.MultipleIndexUpdater updater, IndexEntryUpdate<?> update, long currentlyIndexedNodeId )
{
// empty body, the update is dropped
}

@Override
public PopulationProgress getProgress()
{
Expand Down
Expand Up @@ -42,10 +42,11 @@ public interface LabelScanReader extends Resource
void nodesWithLabel( IndexProgressor.NodeLabelClient client, int labelId );

/**
* @param startId the entity id to start after: when greater than 0, ids less than or equal to it are skipped; 0 starts from the beginning.
* @param labelIds label token ids.
* @return node ids with any of the given label ids.
*/
PrimitiveLongResourceIterator nodesWithAnyOfLabels( int... labelIds );
PrimitiveLongResourceIterator nodesWithAnyOfLabels( long startId, int... labelIds );

/**
* @param labelIds label token ids.
Expand Down
Expand Up @@ -348,9 +348,9 @@ private boolean removeFromOngoingPopulations( IndexPopulation indexPopulation )
return populations.remove( indexPopulation );
}

void populateFromQueueBatched( long currentlyIndexedNodeId )
boolean populateFromQueueBatched( long currentlyIndexedNodeId )
{
populateFromQueue( QUEUE_THRESHOLD, currentlyIndexedNodeId );
return populateFromQueue( QUEUE_THRESHOLD, currentlyIndexedNodeId );
}

void flushAll()
Expand All @@ -372,8 +372,10 @@ protected void flush( IndexPopulation population )

/**
* Populates external updates from the update queue if there are {@code queueThreshold} or more queued updates.
*
* @return whether or not there were external updates applied.
*/
void populateFromQueue( int queueThreshold, long currentlyIndexedNodeId )
boolean populateFromQueue( int queueThreshold, long currentlyIndexedNodeId )
{
int queueSize = updatesQueue.size();
if ( queueSize > 0 && queueSize >= queueThreshold )
Expand All @@ -392,7 +394,9 @@ void populateFromQueue( int queueThreshold, long currentlyIndexedNodeId )
}
while ( !updatesQueue.isEmpty() );
}
return true;
}
return false;
}

private void forEachPopulation( ThrowingConsumer<IndexPopulation,Exception> action )
Expand Down Expand Up @@ -632,8 +636,7 @@ private class EntityPopulationVisitor implements Visitor<EntityUpdates,
public boolean visit( EntityUpdates updates )
{
add( updates );
populateFromQueueBatched( updates.getEntityId() );
return false;
return populateFromQueueBatched( updates.getEntityId() );
}

private void add( EntityUpdates updates )
Expand Down
Expand Up @@ -39,10 +39,22 @@
*/
class LabelScanValueIterator extends LabelScanValueIndexAccessor implements PrimitiveLongResourceIterator
{
private long fromId;
private boolean hasNextDecided;
private boolean hasNext;
protected long next;

/**
* @param fromId entity to start from (exclusive). The cursor gives entries that are effectively small bit-sets and the fromId may
* be somewhere inside a bit-set range.
*/
LabelScanValueIterator( RawCursor<Hit<LabelScanKey,LabelScanValue>,IOException> cursor,
Collection<RawCursor<Hit<LabelScanKey,LabelScanValue>,IOException>> toRemoveFromWhenClosed, long fromId )
{
super( toRemoveFromWhenClosed, cursor );
this.fromId = fromId;
}

@Override
public boolean hasNext()
{
Expand All @@ -65,12 +77,6 @@ public long next()
return next;
}

LabelScanValueIterator( RawCursor<Hit<LabelScanKey,LabelScanValue>,IOException> cursor,
Collection<RawCursor<Hit<LabelScanKey,LabelScanValue>,IOException>> toRemoveFromWhenClosed )
{
super( toRemoveFromWhenClosed, cursor );
}

/**
* @return next node id in the current {@link LabelScanValue} or, if current value exhausted,
* goes to next {@link LabelScanValue} from {@link RawCursor}. Returns {@code true} if next node id
Expand Down Expand Up @@ -106,6 +112,15 @@ protected boolean fetchNext()
baseNodeId = hit.key().idRange * LabelScanValue.RANGE_SIZE;
bits = hit.value().bits;

if ( fromId > 0 )
{
    // If we've been told to start at a specific id then trim off ids in this range less than or equal to that id.
    // The mask is built with two shifts on purpose: Java only uses the low six bits of a long shift distance,
    // so for the last id of a 64-bit range (relativeStartId == 63) a single shift by (relativeStartId + 1)
    // would wrap around to a shift by 0 and nothing would be trimmed.
    // Assumes bits is a long and RANGE_SIZE is at most 64 -- TODO confirm against LabelScanValue.
    long relativeStartId = fromId % LabelScanValue.RANGE_SIZE;
    bits &= (-1L << relativeStartId) << 1;
    // ... and let's not do that again, only for the first idRange
    fromId = 0;
    // NOTE(review): fromId == 0 doubles as "no start id", so a restart right after id 0 was returned
    // will not trim id 0 -- confirm whether callers can hit this and whether a dedicated sentinel is needed.
}

//noinspection AssertWithSideEffects
assert keysInOrder( hit.key() );
}
Expand Down
Expand Up @@ -36,6 +36,8 @@
import org.neo4j.storageengine.api.schema.IndexProgressor;
import org.neo4j.storageengine.api.schema.LabelScanReader;

import static org.neo4j.kernel.impl.index.labelscan.NativeLabelScanWriter.rangeOf;

/**
* {@link LabelScanReader} for reading data from {@link NativeLabelScanStore}.
* Each {@link LongIterator} returned from each of the methods is backed by {@link RawCursor}
Expand Down Expand Up @@ -85,28 +87,28 @@ public PrimitiveLongResourceIterator nodesWithLabel( int labelId )
RawCursor<Hit<LabelScanKey,LabelScanValue>,IOException> cursor;
try
{
cursor = seekerForLabel( labelId );
cursor = seekerForLabel( 0, labelId );
openCursors.add( cursor );
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}

return new LabelScanValueIterator( cursor, openCursors );
return new LabelScanValueIterator( cursor, openCursors, 0 );
}

@Override
public PrimitiveLongResourceIterator nodesWithAnyOfLabels( int... labelIds )
public PrimitiveLongResourceIterator nodesWithAnyOfLabels( long fromId, int... labelIds )
{
List<PrimitiveLongResourceIterator> iterators = iteratorsForLabels( labelIds );
List<PrimitiveLongResourceIterator> iterators = iteratorsForLabels( fromId, labelIds );
return new CompositeLabelScanValueIterator( iterators, false );
}

@Override
public PrimitiveLongResourceIterator nodesWithAllLabels( int... labelIds )
{
List<PrimitiveLongResourceIterator> iterators = iteratorsForLabels( labelIds );
List<PrimitiveLongResourceIterator> iterators = iteratorsForLabels( 0, labelIds );
return new CompositeLabelScanValueIterator( iterators, true );
}

Expand All @@ -116,7 +118,7 @@ public void nodesWithLabel( IndexProgressor.NodeLabelClient client, int labelId
RawCursor<Hit<LabelScanKey,LabelScanValue>,IOException> cursor;
try
{
cursor = seekerForLabel( labelId );
cursor = seekerForLabel( 0, labelId );
openCursors.add( cursor );
}
catch ( IOException e )
Expand All @@ -127,16 +129,16 @@ public void nodesWithLabel( IndexProgressor.NodeLabelClient client, int labelId
client.scan( new LabelScanValueIndexProgressor( cursor, openCursors, client ), false, labelId );
}

private List<PrimitiveLongResourceIterator> iteratorsForLabels( int[] labelIds )
private List<PrimitiveLongResourceIterator> iteratorsForLabels( long fromId, int[] labelIds )
{
List<PrimitiveLongResourceIterator> iterators = new ArrayList<>();
try
{
for ( int labelId : labelIds )
{
RawCursor<Hit<LabelScanKey,LabelScanValue>,IOException> cursor = seekerForLabel( labelId );
RawCursor<Hit<LabelScanKey,LabelScanValue>,IOException> cursor = seekerForLabel( fromId, labelId );
openCursors.add( cursor );
iterators.add( new LabelScanValueIterator( cursor, openCursors ) );
iterators.add( new LabelScanValueIterator( cursor, openCursors, fromId ) );
}
}
catch ( IOException e )
Expand All @@ -146,9 +148,9 @@ private List<PrimitiveLongResourceIterator> iteratorsForLabels( int[] labelIds )
return iterators;
}

private RawCursor<Hit<LabelScanKey,LabelScanValue>,IOException> seekerForLabel( int labelId ) throws IOException
private RawCursor<Hit<LabelScanKey,LabelScanValue>,IOException> seekerForLabel( long startId, int labelId ) throws IOException
{
LabelScanKey from = new LabelScanKey( labelId, 0 );
LabelScanKey from = new LabelScanKey( labelId, rangeOf( startId ) );
LabelScanKey to = new LabelScanKey( labelId, Long.MAX_VALUE );
return index.seek( from, to );
}
Expand Down
Expand Up @@ -253,7 +253,7 @@ private void flushPendingRange() throws IOException
}
}

private static long rangeOf( long nodeId )
static long rangeOf( long nodeId )
{
return nodeId / RANGE_SIZE;
}
Expand Down
Expand Up @@ -240,7 +240,7 @@ public void nodeLabelUnionScan( NodeLabelIndexCursor cursor, int... labels )

DefaultNodeLabelIndexCursor client = (DefaultNodeLabelIndexCursor) cursor;
client.setRead( this );
client.unionScan( new NodeLabelIndexProgressor( labelScanReader().nodesWithAnyOfLabels( labels ), client ),
client.unionScan( new NodeLabelIndexProgressor( labelScanReader().nodesWithAnyOfLabels( 0, labels ), client ),
false, labels );
}

Expand Down
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.state.storeview;

import org.neo4j.collection.PrimitiveLongResourceIterator;

/**
 * A {@link PrimitiveLongResourceIterator} over entity ids which can additionally be asked to invalidate its cache.
 */
public interface EntityIdIterator extends PrimitiveLongResourceIterator
{
/**
 * Invalidates any cached state held by this iterator.
 * NOTE(review): presumably called after external updates have been applied, so that continued iteration
 * observes them -- confirm against implementations and callers.
 */
void invalidateCache();
}
Expand Up @@ -19,23 +19,25 @@
*/
package org.neo4j.kernel.impl.transaction.state.storeview;

import org.eclipse.collections.api.iterator.LongIterator;

import org.neo4j.collection.PrimitiveLongResourceIterator;
import org.neo4j.storageengine.api.schema.LabelScanReader;

/**
* Node id iterator used during index population when we go over node ids indexed in label scan store.
*/
class LabelScanViewIdIterator implements PrimitiveLongResourceIterator
class LabelScanViewIdIterator implements EntityIdIterator
{
private LabelScanReader labelScanReader;
private LongIterator idIterator;
private final int[] labelIds;
private final LabelScanReader labelScanReader;

private PrimitiveLongResourceIterator idIterator;
private long lastReturnedId = -1;

LabelScanViewIdIterator( LabelScanReader labelScanReader, int[] labelIds )
{
this.labelScanReader = labelScanReader;
this.idIterator = labelScanReader.nodesWithAnyOfLabels( labelIds );
this.idIterator = labelScanReader.nodesWithAnyOfLabels( 0, labelIds );
this.labelIds = labelIds;
}

@Override
Expand All @@ -53,6 +55,15 @@ public boolean hasNext()
@Override
public long next()
{
return idIterator.next();
long next = idIterator.next();
lastReturnedId = next;
return next;
}

/**
 * Closes the current id iterator and replaces it with a new one obtained from the label scan reader,
 * passing the id most recently returned by {@link #next()} as the start id together with the same label ids.
 * If {@link #next()} has not been called yet, the start id passed is the initial value -1.
 */
@Override
public void invalidateCache()
{
// release the old iterator before opening its replacement
this.idIterator.close();
this.idIterator = labelScanReader.nodesWithAnyOfLabels( lastReturnedId, labelIds );
}
}
Expand Up @@ -21,7 +21,6 @@

import java.util.function.IntPredicate;

import org.neo4j.collection.PrimitiveLongResourceIterator;
import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.api.labelscan.NodeLabelUpdate;
Expand Down Expand Up @@ -51,7 +50,7 @@ public LabelScanViewNodeStoreScan( NodeStore nodeStore, LockService locks,
}

@Override
public PrimitiveLongResourceIterator getEntityIdIterator()
public EntityIdIterator getEntityIdIterator()
{
return new LabelScanViewIdIterator( labelScanStore.newReader(), labelIds );
}
Expand Down

0 comments on commit ad4a33f

Please sign in to comment.