Skip to content

Commit

Permalink
Revert "Merge remote-tracking branch 'upstream/3.3' into 3.4"
Browse files Browse the repository at this point in the history
This reverts commit 36e16d4, reversing
changes made to be5c7e8.
  • Loading branch information
tinwelint committed Aug 29, 2018
1 parent 0577418 commit d04c28c
Show file tree
Hide file tree
Showing 15 changed files with 286 additions and 643 deletions.
Expand Up @@ -33,6 +33,7 @@

import org.neo4j.function.Predicates;
import org.neo4j.helpers.Exceptions;
import org.neo4j.kernel.api.exceptions.index.IndexPopulationFailedKernelException;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.impl.api.SchemaState;
import org.neo4j.logging.LogProvider;
Expand Down Expand Up @@ -102,10 +103,20 @@ public class BatchingMultipleIndexPopulator extends MultipleIndexPopulator
}

@Override
protected void flushAll()
public StoreScan<IndexPopulationFailedKernelException> indexAllNodes()
{
super.flushAll();
StoreScan<IndexPopulationFailedKernelException> storeScan = super.indexAllNodes();
return new BatchingStoreScan<>( storeScan );
}

@Override
protected void populateFromQueue( long currentlyIndexedNodeId )
{
log.debug( "Populating from queue." + EOL + this );
flushAll();
awaitCompletion();
super.populateFromQueue( currentlyIndexedNodeId );
log.debug( "Drained queue and all batched updates." + EOL + this );
}

@Override
Expand Down Expand Up @@ -170,13 +181,6 @@ protected void flush( IndexPopulation population )
} );
}

@Override
public void close( boolean populationCompletedSuccessfully )
{
super.close( populationCompletedSuccessfully );
shutdownExecutor( !populationCompletedSuccessfully );
}

/**
* Shuts down the executor waiting {@link #AWAIT_TIMEOUT_MINUTES} minutes for it's termination.
* Restores the interrupted status and exits normally when interrupted during waiting.
Expand Down Expand Up @@ -257,4 +261,43 @@ private int getNumberOfPopulationWorkers()
{
return Math.max( 2, MAXIMUM_NUMBER_OF_WORKERS );
}

/**
* A delegating {@link StoreScan} implementation that flushes all pending updates and terminates the executor after
* the delegate store scan completes.
*
* @param <E> type of the exception this store scan might get.
*/
private class BatchingStoreScan<E extends Exception> extends DelegatingStoreScan<E>
{
BatchingStoreScan( StoreScan<E> delegate )
{
super( delegate );
}

@Override
public void run() throws E
{
try
{
super.run();
log.info( "Completed node store scan. " +
"Flushing all pending updates." + EOL + BatchingMultipleIndexPopulator.this );
flushAll();
}
catch ( Throwable scanError )
{
try
{
shutdownExecutor( true );
}
catch ( Throwable error )
{
scanError.addSuppressed( error );
}
throw scanError;
}
shutdownExecutor( false );
}
}
}
Expand Up @@ -23,6 +23,7 @@

import org.neo4j.collection.primitive.PrimitiveIntSet;
import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.PropertyAccessor;
import org.neo4j.kernel.api.labelscan.NodeLabelUpdate;
import org.neo4j.register.Register.DoubleLongRegister;
Expand Down Expand Up @@ -82,6 +83,12 @@ public void stop()
{
}

@Override
public void acceptUpdate( MultipleIndexPopulator.MultipleIndexUpdater updater, IndexEntryUpdate update,
long currentlyIndexedNodeId )
{
}

@Override
public PopulationProgress getProgress()
{
Expand Down
Expand Up @@ -351,9 +351,22 @@ private boolean removeFromOngoingPopulations( IndexPopulation indexPopulation )
return populations.remove( indexPopulation );
}

void populateFromQueueBatched()
void populateFromQueueBatched( long currentlyIndexedNodeId )
{
populateFromQueue( QUEUE_THRESHOLD );
if ( isQueueThresholdReached() )
{
populateFromQueue( currentlyIndexedNodeId );
}
}

private boolean isQueueThresholdReached()
{
return queue.size() >= QUEUE_THRESHOLD;
}

protected void populateFromQueue( long currentlyIndexedNodeId )
{
populateFromQueueIfAvailable( currentlyIndexedNodeId );
}

void flushAll()
Expand All @@ -373,23 +386,17 @@ protected void flush( IndexPopulation population )
}
}

/**
* Populates external updates from the update queue if there are {@code queueThreshold} or more queued updates.
*/
protected void populateFromQueue( int queueThreshold )
private void populateFromQueueIfAvailable( long currentlyIndexedNodeId )
{
int queueSize = queue.size();
if ( queueSize > 0 && queueSize >= queueThreshold )
if ( !queue.isEmpty() )
{
// Before applying updates from the updates queue any pending scan updates needs to be applied, i.e. flushed.
// This is because 'currentlyIndexedNodeId' is based on how far the scan has come.
flushAll();

try ( MultipleIndexUpdater updater = newPopulatingUpdater( storeView ) )
{
do
{
updater.process( queue.poll() );
// no need to check for null as nobody else is emptying this queue
IndexEntryUpdate<?> update = queue.poll();
storeScan.acceptUpdate( updater, update, currentlyIndexedNodeId );
}
while ( !queue.isEmpty() );
}
Expand Down Expand Up @@ -566,7 +573,7 @@ void flip() throws FlipFailedKernelException
if ( populationOngoing )
{
populator.add( takeCurrentBatch() );
populateFromQueue( 0 );
populateFromQueueIfAvailable( Long.MAX_VALUE );
if ( populations.contains( IndexPopulation.this ) )
{
IndexSample sample = populator.sampleResult();
Expand Down Expand Up @@ -633,7 +640,7 @@ private class NodePopulationVisitor implements Visitor<NodeUpdates,
public boolean visit( NodeUpdates updates )
{
add( updates );
populateFromQueueBatched();
populateFromQueueBatched( updates.getNodeId() );
return false;
}

Expand Down Expand Up @@ -669,6 +676,12 @@ public void stop()
delegate.stop();
}

@Override
public void acceptUpdate( MultipleIndexUpdater updater, IndexEntryUpdate<?> update, long currentlyIndexedNodeId )
{
delegate.acceptUpdate( updater, update, currentlyIndexedNodeId );
}

@Override
public PopulationProgress getProgress()
{
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.kernel.impl.api.index;

import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.storageengine.api.schema.PopulationProgress;

public interface StoreScan<FAILURE extends Exception>
Expand All @@ -27,5 +28,8 @@ public interface StoreScan<FAILURE extends Exception>

void stop();

void acceptUpdate( MultipleIndexPopulator.MultipleIndexUpdater updater, IndexEntryUpdate<?> update,
long currentlyIndexedNodeId );

PopulationProgress getProgress();
}
Expand Up @@ -27,7 +27,9 @@
import org.neo4j.collection.primitive.PrimitiveLongResourceIterator;
import org.neo4j.helpers.collection.PrefetchingIterator;
import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.labelscan.NodeLabelUpdate;
import org.neo4j.kernel.impl.api.index.MultipleIndexPopulator;
import org.neo4j.kernel.impl.api.index.NodeUpdates;
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.store.NodeStore;
Expand Down Expand Up @@ -141,6 +143,16 @@ private static boolean containsAnyLabel( int[] labelIdFilter, long[] labels )
return false;
}

@Override
public void acceptUpdate( MultipleIndexPopulator.MultipleIndexUpdater updater, IndexEntryUpdate<?> update,
long currentlyIndexedNodeId )
{
if ( update.getEntityId() <= currentlyIndexedNodeId )
{
updater.process( update );
}
}

private class PropertyBlockIterator extends PrefetchingIterator<PropertyBlock>
{
private final Iterator<PropertyRecord> records;
Expand Down
Expand Up @@ -101,7 +101,7 @@ public void populateFromQueueDoesNothingIfThresholdNotReached() throws Exception
batchingPopulator.queue( update1 );
batchingPopulator.queue( update2 );

batchingPopulator.populateFromQueueBatched();
batchingPopulator.populateFromQueueBatched( 42 );

verify( updater, never() ).process( any() );
verify( populator, never() ).newPopulatingUpdater( any() );
Expand Down Expand Up @@ -137,7 +137,7 @@ public void populateFromQueuePopulatesWhenThresholdReached() throws Exception
batchingPopulator.queue( update2 );
batchingPopulator.queue( update3 );

batchingPopulator.populateFromQueueBatched();
batchingPopulator.populateFromQueue( 42 );

verify( updater1 ).process( update1 );
verify( updater1 ).process( update3 );
Expand All @@ -160,9 +160,6 @@ public void executorShutdownAfterStoreScanCompletes() throws Exception
verify( executor, never() ).shutdown();

storeScan.run();
verify( executor, never() ).shutdown();
verify( executor, never() ).awaitTermination( anyLong(), any() );
batchingPopulator.close( true );
verify( executor ).shutdown();
verify( executor ).awaitTermination( anyLong(), any() );
}
Expand Down Expand Up @@ -196,9 +193,6 @@ public void executorForcefullyShutdownIfStoreScanFails() throws Exception
assertSame( scanError, t );
}

verify( executor, never() ).shutdown();
verify( executor, never() ).awaitTermination( anyLong(), any() );
batchingPopulator.close( false );
verify( executor ).shutdownNow();
verify( executor ).awaitTermination( anyLong(), any() );
}
Expand Down Expand Up @@ -442,6 +436,12 @@ public void stop()
stop = true;
}

@Override
public void acceptUpdate( MultipleIndexPopulator.MultipleIndexUpdater updater, IndexEntryUpdate<?> update,
long currentlyIndexedNodeId )
{
}

@Override
public PopulationProgress getProgress()
{
Expand Down
Expand Up @@ -452,6 +452,13 @@ public void stop()
latch.finish();
}

@Override
public void acceptUpdate( MultipleIndexPopulator.MultipleIndexUpdater updater, IndexEntryUpdate<?> update,
long currentlyIndexedNodeId )
{
// no-op
}

@Override
public PopulationProgress getProgress()
{
Expand Down

0 comments on commit d04c28c

Please sign in to comment.