
Open schema and label indexes in read only mode in case database started in read only mode

Using a Lucene index in write mode can cause index corruption, or even complete deletion of the index, if user code uses or relies on interrupts.
To avoid this as much as possible, indexes are now opened in read-only mode when the database was started in read-only mode (except in the HA and core/edge cases, since we still need to write to the index while catching up).
As a result, after an interruption Lucene will raise a ClosedByInterruptException as soon as code tries to read from the index again, but the index itself will be untouched.
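To make the intent concrete, here is a minimal sketch of the selection logic, using hypothetical names (IndexModeSelectionSketch, ReadOnlyLuceneIndex, WritableLuceneIndex) rather than the actual Neo4j types touched by this commit: the index is opened read-only only when the database itself was started read-only and runs as a single instance; HA and core/edge members keep a writable index because they must apply updates while catching up.

// A minimal, self-contained sketch of the selection logic described above.
// All class and method names here are hypothetical illustrations, not the
// actual Neo4j API changed by this commit.
public final class IndexModeSelectionSketch
{
    enum OperationalMode { single, ha, core, edge }

    /**
     * A read-only index is only safe when nothing will ever write to it.
     * HA and core/edge members still apply index updates while catching up,
     * so they keep a writable index even if the database was started read-only.
     */
    static boolean useReadOnlyIndex( boolean databaseReadOnly, OperationalMode mode )
    {
        return databaseReadOnly && mode == OperationalMode.single;
    }

    static LuceneIndex openIndex( boolean databaseReadOnly, OperationalMode mode )
    {
        return useReadOnlyIndex( databaseReadOnly, mode )
               ? new ReadOnlyLuceneIndex()    // never opens an IndexWriter
               : new WritableLuceneIndex();   // normal read/write behaviour
    }

    // Stand-ins so the sketch compiles on its own.
    interface LuceneIndex {}
    static final class ReadOnlyLuceneIndex implements LuceneIndex {}
    static final class WritableLuceneIndex implements LuceneIndex {}

    public static void main( String[] args )
    {
        // Single-instance database started read-only: open indexes read-only.
        System.out.println( openIndex( true, OperationalMode.single ).getClass().getSimpleName() );
        // HA member started read-only: still needs a writable index to catch up.
        System.out.println( openIndex( true, OperationalMode.ha ).getClass().getSimpleName() );
    }
}

With the read-only wrapper in place, an interrupt can still close the underlying file channel, but the next read simply fails with ClosedByInterruptException instead of leaving a half-written index behind.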
MishaDemianenko committed Jul 29, 2016
1 parent 229c37b commit 2770c54
Showing 63 changed files with 2,047 additions and 424 deletions.
@@ -50,6 +50,7 @@
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.Settings;
import org.neo4j.kernel.impl.api.index.IndexStoreView;
import org.neo4j.kernel.impl.factory.OperationalMode;
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.pagecache.ConfiguringPageCacheFactory;
import org.neo4j.kernel.impl.store.NeoStores;
@@ -145,13 +146,14 @@ public Result runFullConsistencyCheck( final File storeDir, Config tuningConfigu
try
{
IndexStoreView indexStoreView = new NeoStoreIndexStoreView( LockService.NO_LOCK_SERVICE, neoStores );
OperationalMode operationalMode = OperationalMode.single;
labelScanStore = new LuceneLabelScanStoreBuilder(
storeDir, fullStoreLabelUpdateStream( () -> indexStoreView ),
fileSystem, logProvider ).build();
fileSystem, consistencyCheckerConfig, operationalMode, logProvider ).build();
SchemaIndexProvider indexes = new LuceneSchemaIndexProvider(
fileSystem,
DirectoryFactory.PERSISTENT,
storeDir );
storeDir, consistencyCheckerConfig, operationalMode );

int numberOfThreads = defaultConsistencyCheckThreadsNumber();
Statistics statistics;
@@ -46,9 +46,11 @@
import org.neo4j.kernel.api.impl.index.storage.DirectoryFactory;
import org.neo4j.kernel.api.impl.schema.LuceneSchemaIndexProvider;
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.api.index.IndexStoreView;
import org.neo4j.kernel.impl.factory.OperationalMode;
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageEngine;
import org.neo4j.kernel.impl.store.NeoStores;
@@ -148,23 +150,28 @@ public DirectStoreAccess directStoreAccess()
nativeStores.initialize();
IndexStoreView indexStoreView =
new NeoStoreIndexStoreView( LockService.NO_LOCK_SERVICE, nativeStores.getRawNeoStores() );
Config config = Config.empty();
OperationalMode operationalMode = OperationalMode.single;
directStoreAccess = new DirectStoreAccess(
nativeStores,
new LuceneLabelScanStoreBuilder(
directory,
fullStoreLabelUpdateStream( () -> indexStoreView ),
fileSystem,
config,
operationalMode,
FormattedLogProvider.toOutputStream( System.out )
).build(),
createIndexes( fileSystem )
createIndexes( fileSystem, config, operationalMode )
);
}
return directStoreAccess;
}

private SchemaIndexProvider createIndexes( FileSystemAbstraction fileSystem )
private SchemaIndexProvider createIndexes( FileSystemAbstraction fileSystem, Config config, OperationalMode operationalMode )
{
return new LuceneSchemaIndexProvider( fileSystem, DirectoryFactory.PERSISTENT, directory );
return new LuceneSchemaIndexProvider( fileSystem, DirectoryFactory.PERSISTENT, directory, config,
operationalMode );
}

public File directory()
@@ -23,12 +23,14 @@
import java.io.IOException;

import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.api.impl.labelscan.LuceneLabelScanIndex;
import org.neo4j.kernel.api.impl.labelscan.LabelScanIndex;
import org.neo4j.kernel.api.impl.labelscan.LuceneLabelScanIndexBuilder;
import org.neo4j.kernel.api.impl.labelscan.LuceneLabelScanStore;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider;
import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider.FullStoreChangeStream;
import org.neo4j.kernel.impl.factory.OperationalMode;
import org.neo4j.logging.LogProvider;

/**
@@ -43,16 +45,20 @@ public class LuceneLabelScanStoreBuilder
private final File storeDir;
private final FullStoreChangeStream fullStoreStream;
private final FileSystemAbstraction fileSystem;
private final Config config;
private final OperationalMode operationalMode;
private final LogProvider logProvider;

private LuceneLabelScanStore labelScanStore;

public LuceneLabelScanStoreBuilder( File storeDir, FullStoreChangeStream fullStoreStream,
FileSystemAbstraction fileSystem, LogProvider logProvider )
FileSystemAbstraction fileSystem, Config config, OperationalMode operationalMode, LogProvider logProvider )
{
this.storeDir = storeDir;
this.fullStoreStream = fullStoreStream;
this.fileSystem = fileSystem;
this.config = config;
this.operationalMode = operationalMode;
this.logProvider = logProvider;
}

@@ -61,9 +67,11 @@ public LabelScanStore build()
if ( null == labelScanStore )
{
// TODO: Replace with kernel extension based lookup
LuceneLabelScanIndex index = LuceneLabelScanIndexBuilder.create()
LabelScanIndex index = LuceneLabelScanIndexBuilder.create()
.withFileSystem( fileSystem )
.withIndexRootFolder( LabelScanStoreProvider.getStoreDirectory( storeDir ) )
.withConfig( config )
.withOperationalMode( operationalMode )
.build();
labelScanStore = new LuceneLabelScanStore( index, fullStoreStream,
logProvider, LuceneLabelScanStore.Monitor.EMPTY );
@@ -21,10 +21,8 @@

import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -33,15 +31,14 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;

import org.neo4j.function.Factory;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.helpers.ArrayUtil;
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.io.IOUtils;
import org.neo4j.kernel.api.impl.index.partition.IndexPartition;
import org.neo4j.kernel.api.impl.index.partition.AbstractIndexPartition;
import org.neo4j.kernel.api.impl.index.partition.IndexPartitionFactory;
import org.neo4j.kernel.api.impl.index.partition.PartitionSearcher;
import org.neo4j.kernel.api.impl.index.storage.PartitionedIndexStorage;

@@ -50,24 +47,24 @@
/**
* Abstract implementation of a partitioned index.
* Such index may consist of one or multiple separate Lucene indexes that are represented as independent
* {@link IndexPartition partitions}.
* {@link AbstractIndexPartition partitions}.
Class and its subclasses should not be directly used, instead please use corresponding writable or read only
* wrapper.
* @see WritableAbstractLuceneIndex
* @see ReadOnlyAbstractLuceneIndex
*/
public abstract class AbstractLuceneIndex implements Closeable
public abstract class AbstractLuceneIndex
{
// lock used to guard commits and close of lucene indexes from separate threads
protected final ReentrantLock commitCloseLock = new ReentrantLock();
// lock guard concurrent creation of new partitions
protected final ReentrantLock partitionsLock = new ReentrantLock();

protected final PartitionedIndexStorage indexStorage;
private final Factory<IndexWriterConfig> writerConfigFactory;
private List<IndexPartition> partitions = new CopyOnWriteArrayList<>();
private final IndexPartitionFactory partitionFactory;
private List<AbstractIndexPartition> partitions = new CopyOnWriteArrayList<>();
private volatile boolean open;

public AbstractLuceneIndex( PartitionedIndexStorage indexStorage, Factory<IndexWriterConfig> writerConfigFactory )
public AbstractLuceneIndex( PartitionedIndexStorage indexStorage, IndexPartitionFactory partitionFactory )
{
this.indexStorage = indexStorage;
this.writerConfigFactory = writerConfigFactory;
this.partitionFactory = partitionFactory;
}

/**
@@ -97,8 +94,7 @@ public void open() throws IOException
Map<File,Directory> indexDirectories = indexStorage.openIndexDirectories();
for ( Map.Entry<File,Directory> indexDirectory : indexDirectories.entrySet() )
{
partitions.add( new IndexPartition( indexDirectory.getKey(), indexDirectory.getValue(),
writerConfigFactory.newInstance() ) );
partitions.add( partitionFactory.createPartition( indexDirectory.getKey(), indexDirectory.getValue() ) );
}
open = true;
}
@@ -194,35 +190,18 @@ public void drop() throws IOException
*/
public void flush() throws IOException
{
commitCloseLock.lock();
try
List<AbstractIndexPartition> partitions = getPartitions();
for ( AbstractIndexPartition partition : partitions )
{
List<IndexPartition> partitions = getPartitions();
for ( IndexPartition partition : partitions )
{
partition.getIndexWriter().commit();
}
}
finally
{
commitCloseLock.unlock();
partition.getIndexWriter().commit();
}
}

@Override
public void close() throws IOException
{
commitCloseLock.lock();
try
{
IOUtils.closeAll( partitions );
partitions.clear();
open = false;
}
finally
{
commitCloseLock.unlock();
}
IOUtils.closeAll( partitions );
partitions.clear();
open = false;
}

/**
@@ -233,32 +212,24 @@ public void close() throws IOException
public LuceneAllDocumentsReader allDocumentsReader()
{
ensureOpen();
partitionsLock.lock();
List<PartitionSearcher> searchers = new ArrayList<>( partitions.size() );
try
{
List<PartitionSearcher> searchers = new ArrayList<>( partitions.size() );
try
for ( AbstractIndexPartition partition : partitions )
{
for ( IndexPartition partition : partitions )
{
searchers.add( partition.acquireSearcher() );
}
searchers.add( partition.acquireSearcher() );
}

List<LucenePartitionAllDocumentsReader> partitionReaders = searchers.stream()
.map( LucenePartitionAllDocumentsReader::new )
.collect( toList() );
List<LucenePartitionAllDocumentsReader> partitionReaders = searchers.stream()
.map( LucenePartitionAllDocumentsReader::new )
.collect( toList() );

return new LuceneAllDocumentsReader( partitionReaders );
}
catch ( IOException e )
{
IOUtils.closeAllSilently( searchers );
throw new UncheckedIOException( e );
}
return new LuceneAllDocumentsReader( partitionReaders );
}
finally
catch ( IOException e )
{
partitionsLock.unlock();
IOUtils.closeAllSilently( searchers );
throw new UncheckedIOException( e );
}
}

@@ -272,13 +243,12 @@ public LuceneAllDocumentsReader allDocumentsReader()
public ResourceIterator<File> snapshot() throws IOException
{
ensureOpen();
commitCloseLock.lock();
List<ResourceIterator<File>> snapshotIterators = null;
try
{
List<IndexPartition> partitions = getPartitions();
List<AbstractIndexPartition> partitions = getPartitions();
snapshotIterators = new ArrayList<>( partitions.size() );
for ( IndexPartition partition : partitions )
for ( AbstractIndexPartition partition : partitions )
{
snapshotIterators.add( partition.snapshot() );
}
@@ -299,10 +269,6 @@ public ResourceIterator<File> snapshot() throws IOException
}
throw e;
}
finally
{
commitCloseLock.unlock();
}
}

/**
@@ -312,7 +278,6 @@ public ResourceIterator<File> snapshot() throws IOException
*/
public void maybeRefreshBlocking() throws IOException
{
partitionsLock.lock();
try
{
getPartitions().parallelStream().forEach( this::maybeRefreshPartition );
@@ -321,13 +286,9 @@ public void maybeRefreshBlocking() throws IOException
{
throw e.getCause();
}
finally
{
partitionsLock.unlock();
}
}

private void maybeRefreshPartition( IndexPartition partition )
private void maybeRefreshPartition( AbstractIndexPartition partition )
{
try
{
@@ -339,18 +300,18 @@ private void maybeRefreshPartition( IndexPartition partition )
}
}

public List<IndexPartition> getPartitions()
public List<AbstractIndexPartition> getPartitions()
{
ensureOpen();
return partitions;
}

public boolean hasSinglePartition( List<IndexPartition> partitions )
public boolean hasSinglePartition( List<AbstractIndexPartition> partitions )
{
return partitions.size() == 1;
}

public IndexPartition getFirstPartition( List<IndexPartition> partitions )
public AbstractIndexPartition getFirstPartition( List<AbstractIndexPartition> partitions )
{
return partitions.get( 0 );
}
@@ -361,23 +322,14 @@ public IndexPartition getFirstPartition( List<IndexPartition> partitions )
* @return newly created partition
* @throws IOException
*/
public IndexPartition addNewPartition() throws IOException
AbstractIndexPartition addNewPartition() throws IOException
{
ensureOpen();
partitionsLock.lock();
try
{
File partitionFolder = createNewPartitionFolder();
Directory directory = indexStorage.openDirectory( partitionFolder );
IndexPartition indexPartition = new IndexPartition( partitionFolder, directory,
writerConfigFactory.newInstance() );
partitions.add( indexPartition );
return indexPartition;
}
finally
{
partitionsLock.unlock();
}
File partitionFolder = createNewPartitionFolder();
Directory directory = indexStorage.openDirectory( partitionFolder );
AbstractIndexPartition indexPartition = partitionFactory.createPartition( partitionFolder, directory );
partitions.add( indexPartition );
return indexPartition;
}

protected void ensureOpen()
Expand All @@ -397,12 +349,12 @@ protected void ensureNotOpen()
}
}

protected static List<PartitionSearcher> acquireSearchers( List<IndexPartition> partitions ) throws IOException
protected static List<PartitionSearcher> acquireSearchers( List<AbstractIndexPartition> partitions ) throws IOException
{
List<PartitionSearcher> searchers = new ArrayList<>( partitions.size() );
try
{
for ( IndexPartition partition : partitions )
for ( AbstractIndexPartition partition : partitions )
{
searchers.add( partition.acquireSearcher() );
}
