Skip to content

Commit

Permalink
Shutdown index population jobs during indexing service shutdown
Browse files Browse the repository at this point in the history
Shutdown indexing population jobs by canceling them and waiting to
finish ongoing population before closing all indexes down.
This change prevent ongoing index populations to throw exceptions because
of already closed indexes.
Also change order of stop/shutdown for label scan and indexing service
since indexes can use label scan store for indexes population
  • Loading branch information
MishaDemianenko committed Oct 12, 2017
1 parent f3150b0 commit 2a3fb78
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 11 deletions.
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.api.index;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;

import org.neo4j.scheduler.JobScheduler;

import static org.neo4j.scheduler.JobScheduler.Groups.indexPopulation;

public class IndexPopulationJobController
{
private final Set<IndexPopulationJob> populationJobs =
Collections.newSetFromMap( new ConcurrentHashMap<IndexPopulationJob,Boolean>() );
private final JobScheduler scheduler;

IndexPopulationJobController( JobScheduler scheduler )
{
this.scheduler = scheduler;
}

public void stop() throws ExecutionException, InterruptedException
{
for ( IndexPopulationJob job : populationJobs )
{
job.cancel().get();
}
}

public void startIndexPopulation( IndexPopulationJob job )
{
populationJobs.add( job );
scheduler.schedule( indexPopulation, new IndexPopulationJobWrapper( job, this ) );
}

public void indexPopulationCompleted( IndexPopulationJob populationJob )
{
populationJobs.remove( populationJob );
}

Set<IndexPopulationJob> getPopulationJobs()
{
return populationJobs;
}

private static class IndexPopulationJobWrapper implements Runnable
{
private IndexPopulationJob indexPopulationJob;
private IndexPopulationJobController jobController;

IndexPopulationJobWrapper( IndexPopulationJob indexPopulationJob,
IndexPopulationJobController jobController )
{
this.indexPopulationJob = indexPopulationJob;
this.jobController = jobController;
}

@Override
public void run()
{
try
{
indexPopulationJob.run();
}
finally
{
jobController.indexPopulationCompleted( indexPopulationJob );
}
}
}
}
Expand Up @@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

Expand Down Expand Up @@ -66,7 +67,6 @@
import static org.neo4j.helpers.collection.Iterables.asList;
import static org.neo4j.kernel.api.index.InternalIndexState.FAILED;
import static org.neo4j.kernel.impl.api.index.IndexPopulationFailure.failure;
import static org.neo4j.scheduler.JobScheduler.Groups.indexPopulation;

/**
* Manages the indexes that were introduced in 2.0. These indexes depend on the normal neo4j logical log for
Expand Down Expand Up @@ -96,8 +96,8 @@ public class IndexingService extends LifecycleAdapter implements IndexingUpdateS
private final MultiPopulatorFactory multiPopulatorFactory;
private final LogProvider logProvider;
private final Monitor monitor;
private final JobScheduler scheduler;
private final SchemaState schemaState;
private final IndexPopulationJobController populationJobController;

enum State
{
Expand Down Expand Up @@ -158,11 +158,11 @@ public void awaitingPopulationOfRecoveredIndex( long indexId, IndexDescriptor de
this.indexRules = indexRules;
this.samplingController = samplingController;
this.tokenNameLookup = tokenNameLookup;
this.scheduler = scheduler;
this.schemaState = schemaState;
this.multiPopulatorFactory = multiPopulatorFactory;
this.logProvider = logProvider;
this.monitor = monitor;
this.populationJobController = new IndexPopulationJobController( scheduler );
this.log = logProvider.getLog( getClass() );
}

Expand Down Expand Up @@ -340,10 +340,11 @@ private void awaitOnline( IndexProxy proxy ) throws InterruptedException
// We need to stop indexing service on shutdown since we can have transactions that are ongoing/finishing
// after we start stopping components and those transactions should be able to finish successfully
@Override
public void shutdown()
public void shutdown() throws ExecutionException, InterruptedException
{
state = State.STOPPED;
samplingController.stop();
populationJobController.stop();
closeAllIndexes();
}

Expand Down Expand Up @@ -557,6 +558,7 @@ private void dropRecoveringIndexes( IndexMap indexMap, Iterable<Long> indexesToR
for ( long indexId : indexesToRebuild )
{
IndexProxy indexProxy = indexMap.removeIndexProxy( indexId );
assert indexProxy != null;
indexProxy.drop();
}
}
Expand Down Expand Up @@ -691,7 +693,7 @@ private IndexPopulationJob newIndexPopulationJob()

private void startIndexPopulation( IndexPopulationJob job )
{
scheduler.schedule( indexPopulation, job );
populationJobController.startIndexPopulation( job );
}

private String indexStateInfo( String tag, Long indexId, InternalIndexState state, IndexDescriptor descriptor )
Expand Down
Expand Up @@ -440,16 +440,16 @@ public void clearBufferedIds()
@Override
public void stop() throws Throwable
{
labelScanStore.stop();
indexingService.stop();
labelScanStore.stop();
idController.stop();
}

@Override
public void shutdown() throws Throwable
{
labelScanStore.shutdown();
indexingService.shutdown();
labelScanStore.shutdown();
neoStores.close();
}

Expand Down
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.api.index;


import org.junit.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.neo4j.test.OnDemandJobScheduler;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class IndexPopulationJobControllerTest
{

private final OnDemandJobScheduler executer = new OnDemandJobScheduler();
private final IndexPopulationJobController jobController = new IndexPopulationJobController( executer );

@Test
public void trackPopulationJobs()
{
assertThat( jobController.getPopulationJobs(), is( empty() ) );

IndexPopulationJob populationJob = mock( IndexPopulationJob.class );
jobController.startIndexPopulation( populationJob );
assertThat( jobController.getPopulationJobs(), hasSize( 1 ) );

IndexPopulationJob populationJob2 = mock( IndexPopulationJob.class );
jobController.startIndexPopulation( populationJob2 );
assertThat( jobController.getPopulationJobs(), hasSize( 2 ) );
}

@Test
public void stopOngoingPopulationJobs() throws ExecutionException, InterruptedException
{
IndexPopulationJob populationJob = getIndexPopulationJob();
IndexPopulationJob populationJob2 = getIndexPopulationJob();
jobController.startIndexPopulation( populationJob );
jobController.startIndexPopulation( populationJob2 );

jobController.stop();

verify( populationJob ).cancel();
verify( populationJob2 ).cancel();
}

@Test
public void untrackFinishedPopulations()
{
IndexPopulationJob populationJob = getIndexPopulationJob();
jobController.startIndexPopulation( populationJob );

assertThat( jobController.getPopulationJobs(), hasSize( 1 ) );

executer.runJob();

assertThat( jobController.getPopulationJobs(), hasSize( 0 ) );
verify( populationJob ).run();
}

private IndexPopulationJob getIndexPopulationJob()
{
IndexPopulationJob populationJob = mock( IndexPopulationJob.class );
when( populationJob.cancel() ).thenReturn( CompletableFuture.completedFuture( null ) );
return populationJob;
}
}
Expand Up @@ -21,6 +21,7 @@

import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.StringDescription;
import org.junit.rules.TestRule;
import org.junit.runners.model.Statement;
Expand Down Expand Up @@ -303,17 +304,16 @@ protected Log buildLog( String context )
// TEST TOOLS
//

private static final Matcher<Class> ANY_CLASS_MATCHER = any( Class.class );
private static final Matcher<Level> DEBUG_LEVEL_MATCHER = equalTo( Level.DEBUG );
private static final Matcher<Level> INFO_LEVEL_MATCHER = equalTo( Level.INFO );
private static final Matcher<Level> WARN_LEVEL_MATCHER = equalTo( Level.WARN );
private static final Matcher<Level> ERROR_LEVEL_MATCHER = equalTo( Level.ERROR );
private static final Matcher<Level> ANY_LEVEL_MATCHER = any( Level.class );
private static final Matcher<String> ANY_MESSAGE_MATCHER = any( String.class );
private static final Matcher<String> ANY_MESSAGE_MATCHER = anyOf( any( String.class ), nullValue() );
private static final Matcher<Object[]> NULL_ARGUMENTS_MATCHER = nullValue( Object[].class );
private static final Matcher<Object[]> ANY_ARGUMENTS_MATCHER = any( Object[].class );
private static final Matcher<Object[]> ANY_ARGUMENTS_MATCHER = anyOf( any( Object[].class ), nullValue() );
private static final Matcher<Throwable> NULL_THROWABLE_MATCHER = nullValue( Throwable.class );
private static final Matcher<Throwable> ANY_THROWABLE_MATCHER = any( Throwable.class );
private static final Matcher<Throwable> ANY_THROWABLE_MATCHER = anyOf( any( Throwable.class ), nullValue() );

public static final class LogMatcher
{
Expand Down Expand Up @@ -454,6 +454,12 @@ public LogMatcher warn( Matcher<String> format, Object... arguments )
arrayContaining( ensureMatchers( arguments ) ), NULL_THROWABLE_MATCHER );
}

public LogMatcher anyError()
{
return new LogMatcher( contextMatcher, ERROR_LEVEL_MATCHER, Matchers.any( String.class ),
ANY_ARGUMENTS_MATCHER, ANY_THROWABLE_MATCHER );
}

public LogMatcher error( String message )
{
return new LogMatcher( contextMatcher, ERROR_LEVEL_MATCHER, equalTo( message ), NULL_ARGUMENTS_MATCHER,
Expand Down
41 changes: 41 additions & 0 deletions community/neo4j/src/test/java/schema/IndexPopulationIT.java
Expand Up @@ -19,11 +19,13 @@
*/
package schema;

import org.apache.commons.lang3.RandomStringUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

import java.io.File;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand All @@ -41,6 +43,9 @@
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.kernel.impl.api.index.IndexPopulationJob;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.test.TestGraphDatabaseFactory;
import org.neo4j.test.rule.TestDirectory;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -132,6 +137,42 @@ public void indexCreationDoNotBlockWritesOnOtherLabel() throws ExecutionExceptio
}
}

@Test
public void shutdownDatabaseDuringIndexPopulations()
{
AssertableLogProvider assertableLogProvider = new AssertableLogProvider( true );
File storeDir = directory.directory( "shutdownDbTest" );
Label testLabel = Label.label( "testLabel" );
String propertyName = "testProperty";
GraphDatabaseService shutDownDb = new TestGraphDatabaseFactory().setInternalLogProvider( assertableLogProvider )
.newEmbeddedDatabase( storeDir );
prePopulateDatabase( shutDownDb, testLabel, propertyName );

try ( Transaction transaction = shutDownDb.beginTx() )
{
shutDownDb.schema().indexFor( testLabel ).on( propertyName ).create();
transaction.success();
}
shutDownDb.shutdown();
assertableLogProvider.assertNone( AssertableLogProvider.inLog( IndexPopulationJob.class ).anyError() );
}

private void prePopulateDatabase( GraphDatabaseService database, Label testLabel, String propertyName )
{
for ( int j = 0; j < 100; j++ )
{
for ( int i = 0; i < 100; i++ )
{
try ( Transaction transaction = database.beginTx() )
{
Node node = database.createNode( testLabel );
node.setProperty( propertyName, RandomStringUtils.randomAlphabetic( 10 ) );
transaction.success();
}
}
}
}

private Runnable createNodeWithLabel( Label label )
{
return () ->
Expand Down

0 comments on commit 2a3fb78

Please sign in to comment.