Skip to content

Commit

Permalink
Use the availability guard to wait for the database to start before p…
Browse files Browse the repository at this point in the history
…opulating the fulltext index, since the populating transaction is otherwise likely to time out
  • Loading branch information
chrisvest committed Sep 19, 2017
1 parent b4078be commit 9c398d9
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 73 deletions.
Expand Up @@ -27,6 +27,7 @@
import java.util.Set;

import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.logging.Log;

/**
Expand All @@ -50,12 +51,13 @@ public class FulltextProvider implements AutoCloseable
* Creates a provider of fulltext indices for the given database. This is the entry point for all fulltext index operations.
* @param db Database that this provider should work with.
* @param log For logging errors.
* @param availabilityGuard Used for waiting with populating the index until the database is available.
*/
public FulltextProvider( GraphDatabaseService db, Log log )
public FulltextProvider( GraphDatabaseService db, Log log, AvailabilityGuard availabilityGuard )
{
this.db = db;
this.log = log;
applier = new FulltextUpdateApplier( log );
applier = new FulltextUpdateApplier( log, availabilityGuard );
applier.start();
fulltextTransactionEventUpdater = new FulltextTransactionEventUpdater( this, log, applier );
nodeProperties = new HashSet<>();
Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.event.PropertyEntry;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.api.impl.schema.writer.PartitionedIndexWriter;
import org.neo4j.logging.Log;

Expand All @@ -53,11 +54,13 @@ class FulltextUpdateApplier
private static final FulltextIndexUpdate STOP_SIGNAL = () -> null;
private final LinkedBlockingQueue<FulltextIndexUpdate> workQueue;
private final Log log;
private final AvailabilityGuard availabilityGuard;
private ApplierThread workerThread;

FulltextUpdateApplier( Log log )
FulltextUpdateApplier( Log log, AvailabilityGuard availabilityGuard )
{
this.log = log;
this.availabilityGuard = availabilityGuard;
workQueue = new LinkedBlockingQueue<>();
}

Expand Down Expand Up @@ -153,7 +156,7 @@ private BinaryLatch enqueuePopulateIndex( WritableFulltext index, GraphDatabaseS
{
PartitionedIndexWriter indexWriter = index.getIndexWriter();
String[] indexedPropertyKeys = index.properties().toArray( new String[0] );
try ( Transaction ignore = db.beginTx( 10, TimeUnit.HOURS ) )
try ( Transaction ignore = db.beginTx( 1, TimeUnit.DAYS ) )
{
ResourceIterable<? extends Entity> entities = entitySupplier.get();
for ( Entity entity : entities )
Expand Down Expand Up @@ -192,7 +195,7 @@ void start()
{
throw new IllegalStateException( workerThread.getName() + " already started." );
}
workerThread = new ApplierThread( workQueue, log );
workerThread = new ApplierThread( workQueue, log, availabilityGuard );
workerThread.start();
}

Expand Down Expand Up @@ -225,18 +228,22 @@ private static class ApplierThread extends Thread
{
private LinkedBlockingQueue<FulltextIndexUpdate> workQueue;
private final Log log;
private final AvailabilityGuard availabilityGuard;

ApplierThread( LinkedBlockingQueue<FulltextIndexUpdate> workQueue, Log log )
ApplierThread( LinkedBlockingQueue<FulltextIndexUpdate> workQueue, Log log,
AvailabilityGuard availabilityGuard )
{
super( "Fulltext Index Add-On Applier Thread" );
this.workQueue = workQueue;
this.log = log;
this.availabilityGuard = availabilityGuard;
setDaemon( true );
}

@Override
public void run()
{
waitForDatabaseToBeAvailable();
Set<WritableFulltext> refreshableSet = Collections.newSetFromMap( new IdentityHashMap<>() );
List<BinaryLatch> latches = new ArrayList<>();

Expand All @@ -254,6 +261,16 @@ public void run()
}
}

private void waitForDatabaseToBeAvailable()
{
boolean isAvailable;
do
{
isAvailable = availabilityGuard.isAvailable( 100 );
}
while ( !isAvailable && !availabilityGuard.isShutdown() );
}

private FulltextIndexUpdate drainQueueAndApplyUpdates(
FulltextIndexUpdate update,
Set<WritableFulltext> refreshableSet,
Expand Down
Expand Up @@ -27,6 +27,7 @@

import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.impl.fulltext.FulltextFactory;
import org.neo4j.kernel.api.impl.fulltext.FulltextProvider;
Expand All @@ -45,17 +46,20 @@ class BloomKernelExtension extends LifecycleAdapter
private final GraphDatabaseService db;
private final Procedures procedures;
private LogService logService;
private final AvailabilityGuard availabilityGuard;
private FulltextProvider provider;

BloomKernelExtension( FileSystemAbstraction fileSystemAbstraction, File storeDir, Config config, GraphDatabaseService db, Procedures procedures,
LogService logService )
BloomKernelExtension( FileSystemAbstraction fileSystemAbstraction, File storeDir, Config config,
GraphDatabaseService db, Procedures procedures,
LogService logService, AvailabilityGuard availabilityGuard )
{
this.storeDir = storeDir;
this.config = config;
this.fileSystemAbstraction = fileSystemAbstraction;
this.db = db;
this.procedures = procedures;
this.logService = logService;
this.availabilityGuard = availabilityGuard;
}

@Override
Expand All @@ -64,7 +68,7 @@ public void init() throws IOException, KernelException
List<String> properties = config.get( LoadableBloomFulltextConfig.bloom_indexed_properties );
Analyzer analyzer = getAnalyzer();

provider = new FulltextProvider( db, logService.getInternalLog( FulltextProvider.class ) );
provider = new FulltextProvider( db, logService.getInternalLog( FulltextProvider.class ), availabilityGuard );
FulltextFactory fulltextFactory = new FulltextFactory( fileSystemAbstraction, storeDir, analyzer );
fulltextFactory.createFulltextIndex( BLOOM_NODES, FulltextProvider.FulltextIndexType.NODES, properties, provider );
fulltextFactory.createFulltextIndex( BLOOM_RELATIONSHIPS, FulltextProvider.FulltextIndexType.RELATIONSHIPS, properties, provider );
Expand Down
Expand Up @@ -21,6 +21,7 @@

import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.extension.KernelExtensionFactory;
import org.neo4j.kernel.impl.logging.LogService;
Expand Down Expand Up @@ -50,6 +51,8 @@ public interface Dependencies
Procedures procedures();

LogService logService();

AvailabilityGuard availabilityGuard();
}

BloomKernelExtensionFactory()
Expand All @@ -61,6 +64,6 @@ public interface Dependencies
public Lifecycle newInstance( KernelContext context, Dependencies dependencies ) throws Throwable
{
return new BloomKernelExtension( dependencies.fileSystem(), context.storeDir(), dependencies.getConfig(), dependencies.db(),
dependencies.procedures(), dependencies.logService() );
dependencies.procedures(), dependencies.logService(), dependencies.availabilityGuard() );
}
}
Expand Up @@ -25,44 +25,49 @@
import org.junit.Rule;
import org.junit.Test;

import java.util.Arrays;
import java.io.File;
import java.time.Clock;

import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.NullLog;
import org.neo4j.test.rule.DatabaseRule;
import org.neo4j.test.rule.EmbeddedDatabaseRule;
import org.neo4j.test.rule.TestDirectory;
import org.neo4j.test.rule.fs.DefaultFileSystemRule;
import org.neo4j.test.rule.fs.FileSystemRule;

import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.neo4j.kernel.api.impl.fulltext.FulltextProvider.FulltextIndexType;
import static org.neo4j.kernel.api.impl.fulltext.FulltextProvider.FulltextIndexType.NODES;

public class FulltextAnalyzerTest
{
private static final Label LABEL = Label.label( "label" );
private static final LogService LOG_SERVICE = NullLogService.getInstance();
private static final NullLog LOG = NullLog.getInstance();
@ClassRule
public static FileSystemRule fileSystemRule = new DefaultFileSystemRule();
@ClassRule
public static TestDirectory testDirectory = TestDirectory.testDirectory( fileSystemRule );
@Rule
public DatabaseRule dbRule = new EmbeddedDatabaseRule().startLazily();

private AvailabilityGuard availabilityGuard = new AvailabilityGuard( Clock.systemDefaultZone(), LOG );

@Test
public void shouldBeAbleToSpecifyEnglishAnalyzer() throws Exception
{
GraphDatabaseAPI db = dbRule.getGraphDatabaseAPI();
FulltextFactory fulltextFactory = new FulltextFactory( fileSystemRule, testDirectory.graphDbDir(), new EnglishAnalyzer() );
File storeDir = testDirectory.graphDbDir();
FulltextFactory fulltextFactory = new FulltextFactory( fileSystemRule, storeDir, new EnglishAnalyzer() );

try ( FulltextProvider provider = new FulltextProvider( db, LOG_SERVICE.getInternalLog( FulltextProvider.class ) ); )
try ( FulltextProvider provider = new FulltextProvider( db, LOG, availabilityGuard ) )
{
fulltextFactory.createFulltextIndex( "bloomNodes", FulltextIndexType.NODES, Arrays.asList( "prop" ), provider );
fulltextFactory.createFulltextIndex( "bloomNodes", NODES, singletonList( "prop" ), provider );
provider.init();

long firstID;
Expand All @@ -79,7 +84,7 @@ public void shouldBeAbleToSpecifyEnglishAnalyzer() throws Exception
tx.success();
}

try ( ReadOnlyFulltext reader = provider.getReader( "bloomNodes", FulltextIndexType.NODES ) )
try ( ReadOnlyFulltext reader = provider.getReader( "bloomNodes", NODES ) )
{

assertFalse( reader.query( "and" ).hasNext() );
Expand All @@ -96,11 +101,12 @@ public void shouldBeAbleToSpecifyEnglishAnalyzer() throws Exception
public void shouldBeAbleToSpecifySwedishAnalyzer() throws Exception
{
GraphDatabaseAPI db = dbRule.getGraphDatabaseAPI();
FulltextFactory fulltextFactory = new FulltextFactory( fileSystemRule, testDirectory.graphDbDir(), new SwedishAnalyzer() );
File storeDir = testDirectory.graphDbDir();
FulltextFactory fulltextFactory = new FulltextFactory( fileSystemRule, storeDir, new SwedishAnalyzer() );

try ( FulltextProvider provider = new FulltextProvider( db, LOG_SERVICE.getInternalLog( FulltextProvider.class ) ); )
try ( FulltextProvider provider = new FulltextProvider( db, LOG, availabilityGuard ); )
{
fulltextFactory.createFulltextIndex( "bloomNodes", FulltextIndexType.NODES, Arrays.asList( "prop" ), provider );
fulltextFactory.createFulltextIndex( "bloomNodes", NODES, singletonList( "prop" ), provider );
provider.init();

long firstID;
Expand All @@ -117,7 +123,7 @@ public void shouldBeAbleToSpecifySwedishAnalyzer() throws Exception
tx.success();
}

try ( ReadOnlyFulltext reader = provider.getReader( "bloomNodes", FulltextIndexType.NODES ) )
try ( ReadOnlyFulltext reader = provider.getReader( "bloomNodes", NODES ) )
{
assertEquals( firstID, reader.query( "and" ).next() );
assertEquals( firstID, reader.query( "in" ).next() );
Expand Down

0 comments on commit 9c398d9

Please sign in to comment.