Skip to content

Commit

Permalink
Encapsulate the FulltextFactory for the bloom FTS add-on
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed Oct 5, 2017
1 parent 0e6ac85 commit a903fd0
Show file tree
Hide file tree
Showing 10 changed files with 220 additions and 175 deletions.
Expand Up @@ -25,11 +25,9 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Set;


import org.neo4j.function.Factory; import org.neo4j.function.Factory;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.api.exceptions.InvalidArgumentsException;
import org.neo4j.kernel.api.impl.index.IndexWriterConfigs; import org.neo4j.kernel.api.impl.index.IndexWriterConfigs;
import org.neo4j.kernel.api.impl.index.builder.LuceneIndexStorageBuilder; import org.neo4j.kernel.api.impl.index.builder.LuceneIndexStorageBuilder;
import org.neo4j.kernel.api.impl.index.partition.WritableIndexPartitionFactory; import org.neo4j.kernel.api.impl.index.partition.WritableIndexPartitionFactory;
Expand All @@ -38,11 +36,10 @@
/** /**
* Used for creating {@link LuceneFulltext} and registering those to a {@link FulltextProvider}. * Used for creating {@link LuceneFulltext} and registering those to a {@link FulltextProvider}.
*/ */
public class FulltextFactory class FulltextFactory
{ {
public static final String INDEX_DIR = "fulltext"; public static final String INDEX_DIR = "fulltext";
private final FileSystemAbstraction fileSystem; private final FileSystemAbstraction fileSystem;
private final FulltextProvider provider;
private final WritableIndexPartitionFactory partitionFactory; private final WritableIndexPartitionFactory partitionFactory;
private final File indexDir; private final File indexDir;
private final Analyzer analyzer; private final Analyzer analyzer;
Expand All @@ -53,15 +50,12 @@ public class FulltextFactory
* @param fileSystem The filesystem to use. * @param fileSystem The filesystem to use.
* @param storeDir Store directory of the database. * @param storeDir Store directory of the database.
* @param analyzerClassName The Lucene analyzer to use for the {@link LuceneFulltext} created by this factory. * @param analyzerClassName The Lucene analyzer to use for the {@link LuceneFulltext} created by this factory.
* @param provider The {@link FulltextProvider} to register the indexes with.
* @throws IOException * @throws IOException
*/ */
public FulltextFactory( FileSystemAbstraction fileSystem, File storeDir, String analyzerClassName, FulltextFactory( FileSystemAbstraction fileSystem, File storeDir, String analyzerClassName ) throws IOException
FulltextProvider provider ) throws IOException
{ {
this.analyzer = getAnalyzer( analyzerClassName ); this.analyzer = getAnalyzer( analyzerClassName );
this.fileSystem = fileSystem; this.fileSystem = fileSystem;
this.provider = provider;
Factory<IndexWriterConfig> indexWriterConfigFactory = () -> IndexWriterConfigs.standard( analyzer ); Factory<IndexWriterConfig> indexWriterConfigFactory = () -> IndexWriterConfigs.standard( analyzer );
partitionFactory = new WritableIndexPartitionFactory( indexWriterConfigFactory ); partitionFactory = new WritableIndexPartitionFactory( indexWriterConfigFactory );
indexDir = new File( storeDir, INDEX_DIR ); indexDir = new File( storeDir, INDEX_DIR );
Expand All @@ -82,48 +76,21 @@ private Analyzer getAnalyzer( String analyzerClassName )
return analyzer; return analyzer;
} }


/** LuceneFulltext createFulltextIndex( String identifier, FulltextIndexType type, List<String> properties ) throws IOException
* Creates an instance of {@link LuceneFulltext} and registers it with the supplied {@link FulltextProvider}.
*
* @param identifier The identifier of the new fulltext index
* @param type The type of the new fulltext index
* @param properties The properties to index
* @throws IOException
*/
public void createFulltextIndex( String identifier, FulltextProvider.FulltextIndexType type, List<String> properties ) throws IOException
{ {
File indexRootFolder = new File( indexDir, identifier ); File indexRootFolder = new File( indexDir, identifier );
LuceneIndexStorageBuilder storageBuilder = LuceneIndexStorageBuilder.create(); LuceneIndexStorageBuilder storageBuilder = LuceneIndexStorageBuilder.create();
storageBuilder.withFileSystem( fileSystem ).withIndexFolder( indexRootFolder ); storageBuilder.withFileSystem( fileSystem ).withIndexFolder( indexRootFolder );
PartitionedIndexStorage storage = storageBuilder.build(); PartitionedIndexStorage storage = storageBuilder.build();
LuceneFulltext index = new LuceneFulltext( storage, partitionFactory, properties, analyzer, identifier, type ); return new LuceneFulltext( storage, partitionFactory, properties, analyzer, identifier, type );

provider.register( index );
} }


public void openFulltextIndex( String identifier, FulltextProvider.FulltextIndexType type ) throws IOException LuceneFulltext openFulltextIndex( String identifier, FulltextIndexType type ) throws IOException
{ {
File indexRootFolder = new File( indexDir, identifier ); File indexRootFolder = new File( indexDir, identifier );
LuceneIndexStorageBuilder storageBuilder = LuceneIndexStorageBuilder.create(); LuceneIndexStorageBuilder storageBuilder = LuceneIndexStorageBuilder.create();
storageBuilder.withFileSystem( fileSystem ).withIndexFolder( indexRootFolder ); storageBuilder.withFileSystem( fileSystem ).withIndexFolder( indexRootFolder );
PartitionedIndexStorage storage = storageBuilder.build(); PartitionedIndexStorage storage = storageBuilder.build();
LuceneFulltext index = new LuceneFulltext( storage, partitionFactory, analyzer, identifier, type ); return new LuceneFulltext( storage, partitionFactory, analyzer, identifier, type );

provider.register( index );
}

public void changeIndexedProperties( String identifier, FulltextProvider.FulltextIndexType type, List<String> propertyKeys )
throws IOException, InvalidArgumentsException
{
if ( propertyKeys.stream().anyMatch( s -> s.startsWith( FulltextProvider.LUCENE_FULLTEXT_ADDON_PREFIX ) ) )
{
throw new InvalidArgumentsException( "It is not possible to index property keys starting with " + FulltextProvider.LUCENE_FULLTEXT_ADDON_PREFIX );
}
Set<String> currentProperties = provider.getProperties( identifier, type );
if ( !currentProperties.containsAll( propertyKeys ) || !propertyKeys.containsAll( currentProperties ) )
{
provider.drop( identifier, type );
createFulltextIndex( identifier, type, propertyKeys );
}
} }
} }
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.api.impl.fulltext;

/**
* Fulltext index type.
*/
public enum FulltextIndexType
{
NODES
{
@Override
public String toString()
{
return "Nodes";
}
},
RELATIONSHIPS
{
@Override
public String toString()
{
return "Relationships";
}
}
}
Expand Up @@ -19,10 +19,12 @@
*/ */
package org.neo4j.kernel.api.impl.fulltext; package org.neo4j.kernel.api.impl.fulltext;


import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -34,7 +36,9 @@
import java.util.function.Function; import java.util.function.Function;


import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.api.exceptions.InvalidArgumentsException;
import org.neo4j.kernel.api.index.InternalIndexState; import org.neo4j.kernel.api.index.InternalIndexState;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
Expand All @@ -57,25 +61,33 @@ public class FulltextProvider implements AutoCloseable
private final Map<String,WritableFulltext> writableNodeIndices; private final Map<String,WritableFulltext> writableNodeIndices;
private final Map<String,WritableFulltext> writableRelationshipIndices; private final Map<String,WritableFulltext> writableRelationshipIndices;
private final FulltextUpdateApplier applier; private final FulltextUpdateApplier applier;
private final FulltextFactory factory;
private final ReadWriteLock configurationLock; private final ReadWriteLock configurationLock;


/** /**
* Creates a provider of fulltext indices for the given database. This is the entry point for all fulltext index operations. * Creates a provider of fulltext indices for the given database. This is the entry point for all fulltext index operations.
*
*
* @param db Database that this provider should work with. * @param db Database that this provider should work with.
* @param log For logging errors. * @param log For logging errors.
* @param availabilityGuard Used for waiting with populating the index until the database is available. * @param availabilityGuard Used for waiting with populating the index until the database is available.
* @param scheduler For background work. * @param scheduler For background work.
* @param transactionIdStore Used for checking if the store has had transactions applied to it, while the fulltext * @param transactionIdStore Used for checking if the store has had transactions applied to it, while the fulltext
* indexes have been disabled. If so, then the indexes will be rebuilt. * @param fileSystem The filesystem to use.
* @param storeDir Store directory of the database.
* @param analyzerClassName The Lucene analyzer to use for the {@link LuceneFulltext} created by this factory.
*/ */
public FulltextProvider( GraphDatabaseService db, Log log, AvailabilityGuard availabilityGuard, public FulltextProvider( GraphDatabaseService db, Log log, AvailabilityGuard availabilityGuard,
JobScheduler scheduler, TransactionIdStore transactionIdStore ) JobScheduler scheduler, TransactionIdStore transactionIdStore,
FileSystemAbstraction fileSystem, File storeDir,
String analyzerClassName ) throws IOException
{ {
this.db = db; this.db = db;
this.log = log; this.log = log;
this.transactionIdStore = transactionIdStore; this.transactionIdStore = transactionIdStore;
applier = new FulltextUpdateApplier( log, availabilityGuard, scheduler ); applier = new FulltextUpdateApplier( log, availabilityGuard, scheduler );
applier.start(); applier.start();
factory = new FulltextFactory( fileSystem, storeDir, analyzerClassName );
fulltextTransactionEventUpdater = new FulltextTransactionEventUpdater( this, log, applier ); fulltextTransactionEventUpdater = new FulltextTransactionEventUpdater( this, log, applier );
nodeProperties = ConcurrentHashMap.newKeySet(); nodeProperties = ConcurrentHashMap.newKeySet();
relationshipProperties = ConcurrentHashMap.newKeySet(); relationshipProperties = ConcurrentHashMap.newKeySet();
Expand All @@ -84,7 +96,7 @@ public FulltextProvider( GraphDatabaseService db, Log log, AvailabilityGuard ava
configurationLock = new ReentrantReadWriteLock( true ); configurationLock = new ReentrantReadWriteLock( true );
} }


public void init() throws IOException public void registerTransactionEventHandler() throws IOException
{ {
db.registerTransactionEventHandler( fulltextTransactionEventUpdater ); db.registerTransactionEventHandler( fulltextTransactionEventUpdater );
} }
Expand All @@ -108,7 +120,7 @@ private boolean matchesConfiguration( WritableFulltext index ) throws IOExceptio
* *
* Such population, where the entire store is scanned for data to write to the index, will be started if the index * Such population, where the entire store is scanned for data to write to the index, will be started if the index
* needs to recover after an unclean shut-down, or a configuration change. * needs to recover after an unclean shut-down, or a configuration change.
* @throws IOException If it was not possible to wait for the population to finish, for some reason. * @throws RuntimeException If it was not possible to wait for the population to finish, for some reason.
*/ */
public void awaitPopulation() public void awaitPopulation()
{ {
Expand All @@ -126,31 +138,20 @@ public void awaitPopulation()
} }
} }


/** public void openIndex( String identifier, FulltextIndexType type ) throws IOException
* Closes the provider and all associated resources.
*/
@Override
public void close()
{ {
db.unregisterTransactionEventHandler( fulltextTransactionEventUpdater ); LuceneFulltext index = factory.openFulltextIndex( identifier, type );
applier.stop(); register( index );
Consumer<WritableFulltext> fulltextCloser = luceneFulltextIndex ->
{
try
{
luceneFulltextIndex.saveConfiguration( transactionIdStore.getLastCommittedTransactionId() );
luceneFulltextIndex.close();
}
catch ( IOException e )
{
log.error( "Unable to close fulltext index.", e );
}
};
writableNodeIndices.values().forEach( fulltextCloser );
writableRelationshipIndices.values().forEach( fulltextCloser );
} }


void register( LuceneFulltext fulltextIndex ) throws IOException public void createIndex( String identifier, FulltextIndexType type, List<String> properties )
throws IOException
{
LuceneFulltext index = factory.createFulltextIndex( identifier, type, properties );
register( index );
}

private void register( LuceneFulltext fulltextIndex ) throws IOException
{ {
configurationLock.writeLock().lock(); configurationLock.writeLock().lock();
try try
Expand Down Expand Up @@ -222,13 +223,21 @@ Collection<WritableFulltext> writableRelationshipIndices()
*/ */
public ReadOnlyFulltext getReader( String identifier, FulltextIndexType type ) throws IOException public ReadOnlyFulltext getReader( String identifier, FulltextIndexType type ) throws IOException
{ {
if ( type == FulltextIndexType.NODES ) WritableFulltext writableFulltext = getIndexMap( type ).get( identifier );
if ( writableFulltext == null )
{ {
return writableNodeIndices.get( identifier ).getIndexReader(); throw new IllegalArgumentException( "No such " + type + " index '" + identifier + "'." );
} }
else return writableFulltext.getIndexReader();
}

private Map<String,WritableFulltext> getIndexMap( FulltextIndexType type )
{
switch ( type )
{ {
return writableRelationshipIndices.get( identifier ).getIndexReader(); case NODES: return writableNodeIndices;
case RELATIONSHIPS: return writableRelationshipIndices;
default: throw new IllegalArgumentException( "No such fulltext index type: " + type );
} }
} }


Expand Down Expand Up @@ -292,26 +301,51 @@ Lock readLockIndexConfiguration()
return lock; return lock;
} }


public void changeIndexedProperties( String identifier, FulltextIndexType type, List<String> propertyKeys )
throws IOException, InvalidArgumentsException
{
configurationLock.writeLock().lock();
try
{
if ( propertyKeys.stream().anyMatch( s -> s.startsWith( FulltextProvider.LUCENE_FULLTEXT_ADDON_PREFIX ) ) )
{
throw new InvalidArgumentsException( "It is not possible to index property keys starting with " + FulltextProvider.LUCENE_FULLTEXT_ADDON_PREFIX );
}
Set<String> currentProperties = getProperties( identifier, type );
if ( !currentProperties.containsAll( propertyKeys ) || !propertyKeys.containsAll( currentProperties ) )
{
drop( identifier, type );
createIndex( identifier, type, propertyKeys );
}
}
finally
{
configurationLock.writeLock().unlock();
}
}

/** /**
* Fulltext index type. * Closes the provider and all associated resources.
*/ */
public enum FulltextIndexType @Override
public void close()
{ {
NODES db.unregisterTransactionEventHandler( fulltextTransactionEventUpdater );
{ applier.stop();
@Override Consumer<WritableFulltext> fulltextCloser = luceneFulltextIndex ->
public String toString() {
{ try
return "Nodes"; {
} luceneFulltextIndex.saveConfiguration( transactionIdStore.getLastCommittedTransactionId() );
}, luceneFulltextIndex.close();
RELATIONSHIPS }
{ catch ( IOException e )
@Override {
public String toString() log.error( "Unable to close fulltext index.", e );
{ }
return "Relationships"; };
} writableNodeIndices.values().forEach( fulltextCloser );
} writableRelationshipIndices.values().forEach( fulltextCloser );
} }

} }
Expand Up @@ -42,13 +42,13 @@ class LuceneFulltext extends AbstractLuceneIndex
{ {
private final Analyzer analyzer; private final Analyzer analyzer;
private final String identifier; private final String identifier;
private final FulltextProvider.FulltextIndexType type; private final FulltextIndexType type;
private Set<String> properties; private Set<String> properties;
private volatile InternalIndexState state; private volatile InternalIndexState state;


LuceneFulltext( PartitionedIndexStorage indexStorage, IndexPartitionFactory partitionFactory, Collection<String> properties, Analyzer analyzer, LuceneFulltext( PartitionedIndexStorage indexStorage, IndexPartitionFactory partitionFactory, Collection<String> properties, Analyzer analyzer,
String identifier, String identifier,
FulltextProvider.FulltextIndexType type ) FulltextIndexType type )
{ {
super( indexStorage, partitionFactory ); super( indexStorage, partitionFactory );
this.properties = Collections.unmodifiableSet( new HashSet<>( properties ) ); this.properties = Collections.unmodifiableSet( new HashSet<>( properties ) );
Expand All @@ -59,7 +59,7 @@ class LuceneFulltext extends AbstractLuceneIndex
} }


LuceneFulltext( PartitionedIndexStorage indexStorage, WritableIndexPartitionFactory partitionFactory, Analyzer analyzer, String identifier, LuceneFulltext( PartitionedIndexStorage indexStorage, WritableIndexPartitionFactory partitionFactory, Analyzer analyzer, String identifier,
FulltextProvider.FulltextIndexType type ) throws IOException FulltextIndexType type ) throws IOException
{ {
this( indexStorage, partitionFactory, Collections.EMPTY_SET, analyzer, identifier, type ); this( indexStorage, partitionFactory, Collections.EMPTY_SET, analyzer, identifier, type );
this.properties = readProperties(); this.properties = readProperties();
Expand Down Expand Up @@ -126,7 +126,7 @@ ReadOnlyFulltext getIndexReader() throws IOException
return hasSinglePartition( partitions ) ? createSimpleReader( partitions ) : createPartitionedReader( partitions ); return hasSinglePartition( partitions ) ? createSimpleReader( partitions ) : createPartitionedReader( partitions );
} }


FulltextProvider.FulltextIndexType getType() FulltextIndexType getType()
{ {
return type; return type;
} }
Expand Down

0 comments on commit a903fd0

Please sign in to comment.