Skip to content

Commit

Permalink
Start the fulltext index applier background thread via the JobScheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed Sep 19, 2017
1 parent 2881a4a commit 65acc5b
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 72 deletions.
Expand Up @@ -29,6 +29,7 @@
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.logging.Log;
import org.neo4j.scheduler.JobScheduler;

/**
* Provider class that manages and provides fulltext indices. This is the main entry point for the fulltext addon.
Expand All @@ -52,12 +53,14 @@ public class FulltextProvider implements AutoCloseable
* @param db Database that this provider should work with.
* @param log For logging errors.
* @param availabilityGuard Used for waiting with populating the index until the database is available.
* @param scheduler
*/
public FulltextProvider( GraphDatabaseService db, Log log, AvailabilityGuard availabilityGuard )
public FulltextProvider( GraphDatabaseService db, Log log, AvailabilityGuard availabilityGuard,
JobScheduler scheduler )
{
this.db = db;
this.log = log;
applier = new FulltextUpdateApplier( log, availabilityGuard );
applier = new FulltextUpdateApplier( log, availabilityGuard, scheduler );
applier.start();
fulltextTransactionEventUpdater = new FulltextTransactionEventUpdater( this, log, applier );
nodeProperties = new HashSet<>();
Expand Down
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
Expand All @@ -45,6 +46,7 @@
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.api.impl.schema.writer.PartitionedIndexWriter;
import org.neo4j.logging.Log;
import org.neo4j.scheduler.JobScheduler;

import static org.neo4j.kernel.api.impl.fulltext.LuceneFulltextDocumentStructure.documentForPopulation;
import static org.neo4j.kernel.api.impl.fulltext.LuceneFulltextDocumentStructure.documentRepresentingProperties;
Expand All @@ -54,16 +56,20 @@ class FulltextUpdateApplier
{
private static final FulltextIndexUpdate STOP_SIGNAL = () -> null;
private static final int POPULATING_BATCH_SIZE = 10_000;
private static final JobScheduler.Group UPDATE_APPLIER = new JobScheduler.Group( "FulltextIndexUpdateApplier" );
private static final String APPLIER_THREAD_NAME = "Fulltext Index Add-On Applier Thread";

private final LinkedBlockingQueue<FulltextIndexUpdate> workQueue;
private final Log log;
private final AvailabilityGuard availabilityGuard;
private ApplierThread workerThread;
private final JobScheduler scheduler;
private JobScheduler.JobHandle workerThread;

FulltextUpdateApplier( Log log, AvailabilityGuard availabilityGuard )
FulltextUpdateApplier( Log log, AvailabilityGuard availabilityGuard, JobScheduler scheduler )
{
this.log = log;
this.availabilityGuard = availabilityGuard;
this.scheduler = scheduler;
workQueue = new LinkedBlockingQueue<>();
}

Expand Down Expand Up @@ -203,10 +209,9 @@ void start()
{
if ( workerThread != null )
{
throw new IllegalStateException( workerThread.getName() + " already started." );
throw new IllegalStateException( APPLIER_THREAD_NAME + " already started." );
}
workerThread = new ApplierThread( workQueue, log, availabilityGuard );
workerThread.start();
workerThread = scheduler.schedule( UPDATE_APPLIER, new ApplierWorker( workQueue, log, availabilityGuard ) );
}

void stop()
Expand All @@ -220,12 +225,16 @@ void stop()

try
{
workerThread.join();
workerThread.waitTermination();
workerThread = null;
}
catch ( InterruptedException e )
{
log.error( "Interrupted before " + workerThread.getName() + " could shut down.", e );
log.error( "Interrupted before " + APPLIER_THREAD_NAME + " could shut down.", e );
}
catch ( ExecutionException e )
{
log.error( "Exception while waiting for " + APPLIER_THREAD_NAME + " to shut down.", e );
}
}

Expand All @@ -234,25 +243,24 @@ private interface FulltextIndexUpdate
Pair<WritableFulltext,BinaryLatch> applyUpdateAndReturnIndex() throws IOException;
}

private static class ApplierThread extends Thread
private static class ApplierWorker implements Runnable
{
private LinkedBlockingQueue<FulltextIndexUpdate> workQueue;
private final Log log;
private final AvailabilityGuard availabilityGuard;

ApplierThread( LinkedBlockingQueue<FulltextIndexUpdate> workQueue, Log log,
ApplierWorker( LinkedBlockingQueue<FulltextIndexUpdate> workQueue, Log log,
AvailabilityGuard availabilityGuard )
{
super( "Fulltext Index Add-On Applier Thread" );
this.workQueue = workQueue;
this.log = log;
this.availabilityGuard = availabilityGuard;
setDaemon( true );
}

@Override
public void run()
{
Thread.currentThread().setName( APPLIER_THREAD_NAME );
waitForDatabaseToBeAvailable();
Set<WritableFulltext> refreshableSet = Collections.newSetFromMap( new IdentityHashMap<>() );
List<BinaryLatch> latches = new ArrayList<>();
Expand Down Expand Up @@ -324,7 +332,7 @@ private FulltextIndexUpdate getNextUpdate()
}
catch ( InterruptedException e )
{
log.debug( getName() + " decided to ignore an interrupt.", e );
log.debug( APPLIER_THREAD_NAME + " decided to ignore an interrupt.", e );
}
}
while ( update == null );
Expand Down Expand Up @@ -358,7 +366,7 @@ private void refreshIndex( WritableFulltext index )
}
catch ( IOException e )
{
log.error( "Failed to refresh fulltext after updates", e );
log.error( "Failed to refresh fulltext after updates.", e );
}
}
}
Expand Down
Expand Up @@ -35,6 +35,8 @@
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.scheduler.JobScheduler;

class BloomKernelExtension extends LifecycleAdapter
{
Expand All @@ -47,11 +49,13 @@ class BloomKernelExtension extends LifecycleAdapter
private final Procedures procedures;
private LogService logService;
private final AvailabilityGuard availabilityGuard;
private final JobScheduler scheduler;
private FulltextProvider provider;

BloomKernelExtension( FileSystemAbstraction fileSystemAbstraction, File storeDir, Config config,
GraphDatabaseService db, Procedures procedures,
LogService logService, AvailabilityGuard availabilityGuard )
LogService logService, AvailabilityGuard availabilityGuard,
JobScheduler scheduler )
{
this.storeDir = storeDir;
this.config = config;
Expand All @@ -60,6 +64,7 @@ class BloomKernelExtension extends LifecycleAdapter
this.procedures = procedures;
this.logService = logService;
this.availabilityGuard = availabilityGuard;
this.scheduler = scheduler;
}

@Override
Expand All @@ -68,7 +73,8 @@ public void init() throws IOException, KernelException
List<String> properties = config.get( LoadableBloomFulltextConfig.bloom_indexed_properties );
Analyzer analyzer = getAnalyzer();

provider = new FulltextProvider( db, logService.getInternalLog( FulltextProvider.class ), availabilityGuard );
Log log = logService.getInternalLog( FulltextProvider.class );
provider = new FulltextProvider( db, log, availabilityGuard, scheduler );
FulltextFactory fulltextFactory = new FulltextFactory( fileSystemAbstraction, storeDir, analyzer );
fulltextFactory.createFulltextIndex( BLOOM_NODES, FulltextProvider.FulltextIndexType.NODES, properties, provider );
fulltextFactory.createFulltextIndex( BLOOM_RELATIONSHIPS, FulltextProvider.FulltextIndexType.RELATIONSHIPS, properties, provider );
Expand Down
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.kernel.api.impl.fulltext.integrations.bloom;

import java.io.File;

import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.AvailabilityGuard;
Expand All @@ -28,11 +30,12 @@
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.impl.spi.KernelContext;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.scheduler.JobScheduler;

/**
* A {@link KernelExtensionFactory} for the bloom fulltext addon.
*
* @see BloomProcedure
* @see BloomProcedures
* @see LoadableBloomFulltextConfig
*/
public class BloomKernelExtensionFactory extends KernelExtensionFactory<BloomKernelExtensionFactory.Dependencies>
Expand All @@ -53,6 +56,8 @@ public interface Dependencies
LogService logService();

AvailabilityGuard availabilityGuard();

JobScheduler scheduler();
}

BloomKernelExtensionFactory()
Expand All @@ -63,7 +68,15 @@ public interface Dependencies
@Override
public Lifecycle newInstance( KernelContext context, Dependencies dependencies ) throws Throwable
{
return new BloomKernelExtension( dependencies.fileSystem(), context.storeDir(), dependencies.getConfig(), dependencies.db(),
dependencies.procedures(), dependencies.logService(), dependencies.availabilityGuard() );
FileSystemAbstraction fs = dependencies.fileSystem();
File storeDir = context.storeDir();
Config config = dependencies.getConfig();
GraphDatabaseService db = dependencies.db();
Procedures procedures = dependencies.procedures();
LogService logService = dependencies.logService();
AvailabilityGuard availabilityGuard = dependencies.availabilityGuard();
JobScheduler scheduler = dependencies.scheduler();
return new BloomKernelExtension(
fs, storeDir, config, db, procedures, logService, availabilityGuard, scheduler );
}
}
Expand Up @@ -21,7 +21,6 @@

import org.apache.lucene.analysis.en.EnglishAnalyzer;
import org.apache.lucene.analysis.sv.SwedishAnalyzer;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

Expand All @@ -31,14 +30,13 @@
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.NullLog;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.test.rule.DatabaseRule;
import org.neo4j.test.rule.EmbeddedDatabaseRule;
import org.neo4j.test.rule.TestDirectory;
import org.neo4j.test.rule.fs.DefaultFileSystemRule;
import org.neo4j.test.rule.fs.FileSystemRule;

import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
Expand All @@ -49,10 +47,7 @@ public class FulltextAnalyzerTest
{
private static final Label LABEL = Label.label( "label" );
private static final NullLog LOG = NullLog.getInstance();
@ClassRule
public static FileSystemRule fileSystemRule = new DefaultFileSystemRule();
@ClassRule
public static TestDirectory testDirectory = TestDirectory.testDirectory( fileSystemRule );

@Rule
public DatabaseRule dbRule = new EmbeddedDatabaseRule().startLazily();

Expand All @@ -62,10 +57,12 @@ public class FulltextAnalyzerTest
public void shouldBeAbleToSpecifyEnglishAnalyzer() throws Exception
{
GraphDatabaseAPI db = dbRule.getGraphDatabaseAPI();
File storeDir = testDirectory.graphDbDir();
FulltextFactory fulltextFactory = new FulltextFactory( fileSystemRule, storeDir, new EnglishAnalyzer() );
JobScheduler scheduler = dbRule.resolveDependency( JobScheduler.class );
FileSystemAbstraction fs = dbRule.resolveDependency( FileSystemAbstraction.class );
File storeDir = dbRule.getStoreDir();
FulltextFactory fulltextFactory = new FulltextFactory( fs, storeDir, new EnglishAnalyzer() );

try ( FulltextProvider provider = new FulltextProvider( db, LOG, availabilityGuard ) )
try ( FulltextProvider provider = new FulltextProvider( db, LOG, availabilityGuard, scheduler ) )
{
fulltextFactory.createFulltextIndex( "bloomNodes", NODES, singletonList( "prop" ), provider );
provider.init();
Expand Down Expand Up @@ -101,10 +98,12 @@ public void shouldBeAbleToSpecifyEnglishAnalyzer() throws Exception
public void shouldBeAbleToSpecifySwedishAnalyzer() throws Exception
{
GraphDatabaseAPI db = dbRule.getGraphDatabaseAPI();
File storeDir = testDirectory.graphDbDir();
FulltextFactory fulltextFactory = new FulltextFactory( fileSystemRule, storeDir, new SwedishAnalyzer() );
JobScheduler scheduler = dbRule.resolveDependency( JobScheduler.class );
FileSystemAbstraction fs = dbRule.resolveDependency( FileSystemAbstraction.class );
File storeDir = dbRule.getStoreDir();
FulltextFactory fulltextFactory = new FulltextFactory( fs, storeDir, new SwedishAnalyzer() );

try ( FulltextProvider provider = new FulltextProvider( db, LOG, availabilityGuard ); )
try ( FulltextProvider provider = new FulltextProvider( db, LOG, availabilityGuard, scheduler ); )
{
fulltextFactory.createFulltextIndex( "bloomNodes", NODES, singletonList( "prop" ), provider );
provider.init();
Expand Down

0 comments on commit 65acc5b

Please sign in to comment.