Skip to content

Commit

Permalink
A cancelled index population stays in POPULATING state
Browse files Browse the repository at this point in the history
This also matches the behaviour of the Lucene populator
  • Loading branch information
tinwelint committed May 27, 2019
1 parent 52db613 commit b26dfcb
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 22 deletions.
@@ -0,0 +1,115 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.graphdb.schema;

import org.junit.Rule;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.test.Barrier;
import org.neo4j.test.TestLabels;
import org.neo4j.test.rule.EmbeddedDatabaseRule;

import static org.junit.Assert.assertEquals;
import static org.neo4j.helpers.collection.Iterables.first;

public class CancelIndexPopulationIT
{
private static final Label LABEL = TestLabels.LABEL_ONE;
private static final String KEY = "key";

@Rule
public final EmbeddedDatabaseRule db = new EmbeddedDatabaseRule();

@Test
public void shouldKeepIndexInPopulatingStateBetweenRestarts() throws InterruptedException, IOException
{
// given
Monitors monitors = db.getDependencyResolver().resolveDependency( Monitors.class );
Barrier.Control barrier = new Barrier.Control();
monitors.addMonitorListener( populationCompletionBlocker( barrier ) );

// when
createRelevantNode();
createIndex();
barrier.await();
// This call will eventually make a call to populationCancelled on the monitor below
db.restartDatabase();

// then
assertEquals( Schema.IndexState.ONLINE, awaitAndGetIndexState() );
}

private Schema.IndexState awaitAndGetIndexState()
{
try ( Transaction tx = db.beginTx() )
{
IndexDefinition indexDefinition = first( db.schema().getIndexes( LABEL ) );
db.schema().awaitIndexOnline( indexDefinition, 1, TimeUnit.MINUTES );
Schema.IndexState indexState = db.schema().getIndexState( indexDefinition );
tx.success();
return indexState;
}
}

private void createIndex()
{
try ( Transaction tx = db.beginTx() )
{
db.schema().indexFor( LABEL ).on( KEY ).create();
tx.success();
}
}

private void createRelevantNode()
{
try ( Transaction tx = db.beginTx() )
{
db.createNode( LABEL ).setProperty( KEY, "value" );
tx.success();
}
}

private IndexingService.MonitorAdapter populationCompletionBlocker( Barrier.Control barrier )
{
return new IndexingService.MonitorAdapter()
{
@Override
public void indexPopulationScanComplete()
{
barrier.reached();
}

@Override
public void populationCancelled()
{
// When we get this call we know that the population is still active (due to being blocked in indexPopulationScanComplete())
// and have just gotten a call to being cancelled, which should now be known to index populators.
barrier.release();
}
};
}
}
Expand Up @@ -112,10 +112,12 @@ public interface IndexPopulator extends IndexConfigProvider
* {@link IndexProvider#getInitialState(StoreIndexDescriptor)} also returns {@link InternalIndexState#ONLINE}.
*
* @param populationCompletedSuccessfully {@code true} if the index population was successful, where the index should
* be marked as {@link InternalIndexState#ONLINE}, otherwise {@code false} where index should be marked as
* {@link InternalIndexState#FAILED} and the failure, previously handed to this populator using {@link #markAsFailed(String)}
* should be stored and made available for later requests from {@link IndexProvider#getPopulationFailure(StoreIndexDescriptor)}.
* @throws UncheckedIOException on I/O error.
* be marked as {@link InternalIndexState#ONLINE}. Supplying {@code false} can have two meanings:
* <ul>
* <li>if {@link #markAsFailed(String)} have been called the end state should be {@link InternalIndexState#FAILED}.
* This method call should also make sure that the failure message gets stored for retrieval the next open too.</li>
* <li>if {@link #markAsFailed(String)} have NOT been called the end state should be {@link InternalIndexState#POPULATING}</li>
* </ul>
*/
void close( boolean populationCompletedSuccessfully );

Expand Down
Expand Up @@ -148,6 +148,7 @@ public Future<Void> cancel()
{
cancelled = true;
storeScan.stop();
monitor.populationCancelled();
}

return latchGuardedValue( Suppliers.singleton( null ), doneSignal, "Index population job cancel" );
Expand Down
Expand Up @@ -144,6 +144,8 @@ public interface Monitor
void indexPopulationScanComplete();

void awaitingPopulationOfRecoveredIndex( StoreIndexDescriptor descriptor );

void populationCancelled();
}

public static class MonitorAdapter implements Monitor
Expand Down Expand Up @@ -172,6 +174,11 @@ public void indexPopulationScanComplete()
public void awaitingPopulationOfRecoveredIndex( StoreIndexDescriptor descriptor )
{ // Do nothing
}

@Override
public void populationCancelled()
{ // Do nothing
}
}

public static final Monitor NO_MONITOR = new MonitorAdapter();
Expand Down
Expand Up @@ -19,8 +19,6 @@
*/
package org.neo4j.kernel.impl.index.schema;

import org.apache.commons.lang3.ArrayUtils;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
Expand All @@ -45,6 +43,7 @@
import org.neo4j.storageengine.api.NodePropertyAccessor;
import org.neo4j.storageengine.api.schema.IndexSample;
import org.neo4j.storageengine.api.schema.StoreIndexDescriptor;
import org.neo4j.util.Preconditions;
import org.neo4j.values.storable.Value;

import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER;
Expand Down Expand Up @@ -190,17 +189,20 @@ public synchronized void close( boolean populationCompletedSuccessfully )

try
{
assertNotDropped();
if ( populationCompletedSuccessfully )
{
// Successful and completed population
assertPopulatorOpen();
markTreeAsOnline();
}
else
else if ( failureBytes != null )
{
assertNotDropped();
// Failed population
ensureTreeInstantiated();
markTreeAsFailed();
}
// else cancelled population. Here we simply close the tree w/o checkpointing it and it will look like POPULATING state on next open
}
finally
{
Expand Down Expand Up @@ -249,10 +251,7 @@ private void assertPopulatorOpen()

private void markTreeAsFailed()
{
if ( failureBytes == null )
{
failureBytes = ArrayUtils.EMPTY_BYTE_ARRAY;
}
Preconditions.checkState( failureBytes != null, "markAsFailed hasn't been called, populator not actually failed?" );
tree.checkpoint( IOLimiter.UNLIMITED, new FailureHeaderWriter( failureBytes ) );
}

Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.neo4j.helpers.Exceptions;
import org.neo4j.index.internal.gbptree.GBPTree;
import org.neo4j.index.internal.gbptree.RecoveryCleanupWorkCollector;
import org.neo4j.internal.kernel.api.InternalIndexState;
import org.neo4j.io.fs.OpenMode;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.pagecache.PagedFile;
Expand All @@ -58,9 +59,13 @@
import static org.junit.Assert.fail;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_READER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER;
import static org.neo4j.internal.kernel.api.InternalIndexState.FAILED;
import static org.neo4j.internal.kernel.api.InternalIndexState.ONLINE;
import static org.neo4j.internal.kernel.api.InternalIndexState.POPULATING;
import static org.neo4j.kernel.impl.api.index.PhaseTracker.nullInstance;
import static org.neo4j.kernel.impl.index.schema.NativeIndexPopulator.BYTE_FAILED;
import static org.neo4j.kernel.impl.index.schema.NativeIndexPopulator.BYTE_ONLINE;
import static org.neo4j.kernel.impl.index.schema.NativeIndexPopulator.BYTE_POPULATING;
import static org.neo4j.kernel.impl.index.schema.ValueCreatorUtil.countUniqueValues;

public abstract class NativeIndexPopulatorTests<KEY extends NativeIndexKey<KEY>,VALUE extends NativeIndexValue>
Expand Down Expand Up @@ -268,7 +273,7 @@ public void successfulCloseMustMarkIndexAsOnline() throws Exception
populator.close( true );

// then
assertHeader( true, null, false );
assertHeader( ONLINE, null, false );
}

@Test
Expand Down Expand Up @@ -314,7 +319,7 @@ public void unsuccessfulCloseMustNotMarkIndexAsOnline() throws Exception
populator.close( false );

// then
assertHeader( false, "", false );
assertHeader( POPULATING, null, false );
}

@Test
Expand All @@ -329,7 +334,7 @@ public void closeMustWriteFailureMessageAfterMarkedAsFailed() throws Exception
populator.close( false );

// then
assertHeader( false, failureMessage, false );
assertHeader( FAILED, failureMessage, false );
}

@Test
Expand All @@ -344,7 +349,7 @@ public void closeMustWriteFailureMessageAfterMarkedAsFailedWithLongMessage() thr
populator.close( false );

// then
assertHeader( false, failureMessage, true );
assertHeader( FAILED, failureMessage, true );
}

@Test
Expand Down Expand Up @@ -448,7 +453,7 @@ public void unsuccessfulCloseMustSucceedWithoutSuccessfulPriorCreate() throws Ex
populator.close( false );

// then
assertHeader( false, failureMessage, false );
assertHeader( FAILED, failureMessage, false );
}

@Test
Expand Down Expand Up @@ -692,19 +697,19 @@ private int interleaveLargeAmountOfUpdates( Random updaterRandom,
return count;
}

private void assertHeader( boolean online, String failureMessage, boolean messageTruncated ) throws IOException
private void assertHeader( InternalIndexState expectedState, String failureMessage, boolean messageTruncated ) throws IOException
{
NativeIndexHeaderReader headerReader = new NativeIndexHeaderReader( NO_HEADER_READER );
try ( GBPTree<KEY,VALUE> ignored = new GBPTree<>( pageCache, getIndexFile(), layout, 0, GBPTree.NO_MONITOR,
headerReader, NO_HEADER_WRITER, RecoveryCleanupWorkCollector.immediate() ) )
{
if ( online )
switch ( expectedState )
{
case ONLINE:
assertEquals( "Index was not marked as online when expected not to be.", BYTE_ONLINE, headerReader.state );
assertNull( "Expected failure message to be null when marked as online.", headerReader.failureMessage );
}
else
{
break;
case FAILED:
assertEquals( "Index was marked as online when expected not to be.", BYTE_FAILED, headerReader.state );
if ( messageTruncated )
{
Expand All @@ -715,6 +720,13 @@ private void assertHeader( boolean online, String failureMessage, boolean messag
{
assertEquals( failureMessage, headerReader.failureMessage );
}
break;
case POPULATING:
assertEquals( "Index was not left as populating when expected to be.", BYTE_POPULATING, headerReader.state );
assertNull( "Expected failure message to be null when marked as populating.", headerReader.failureMessage );
break;
default:
throw new UnsupportedOperationException( "Unexpected index state " + expectedState );
}
}
}
Expand Down

0 comments on commit b26dfcb

Please sign in to comment.