Skip to content

Commit

Permalink
Made batch inserter directly use lucene for legacy indexes
Browse files Browse the repository at this point in the history
Currently it uses lucene through a thin wrapper around lucene's IndexWriter
called ObsoleteLuceneIndexWriter. This is not needed since legacy indexes
can't use partitioned lucene index infrastructure.

Previously batch inserter used thin wrapper around lucene's IndexWriter that
was able to track number of documents in the index and throw meaningful
exceptions when 2B limit was reached. Previously used lucene version 3.6.2 did
not track this limit and failed during query execution with cryptic exceptions.
Currently such manual tracking functionality is not needed because updated
lucene version 5.3.1 tracks the 2B limit on its own.

This commit makes batch inserter use lucene APIs directly for legacy indexes
which are not partitioned index aware. ObsoleteLuceneIndexWriter class is
removed and its test that models a commit-close race on IndexWriter is
rewritten using AbstractLuceneIndex.
  • Loading branch information
lutovich authored and MishaDemianenko committed Jan 21, 2016
1 parent e480e9b commit 159f24c
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 230 deletions.
Expand Up @@ -21,10 +21,12 @@

import org.apache.lucene.document.Document;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
Expand All @@ -38,29 +40,25 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.locks.LockSupport;

import org.neo4j.collection.primitive.PrimitiveLongCollections;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.index.IndexHits;
import org.neo4j.helpers.collection.LruCache;
import org.neo4j.index.lucene.ValueContext;
import org.neo4j.io.IOUtils;
import org.neo4j.kernel.api.LegacyIndexHits;
import org.neo4j.kernel.api.impl.index.DocValuesCollector;
import org.neo4j.kernel.api.impl.index.IndexWriterFactories;
import org.neo4j.kernel.api.impl.index.ObsoleteLuceneIndexWriter;
import org.neo4j.kernel.impl.index.IndexEntityType;
import org.neo4j.kernel.impl.util.IoPrimitiveUtils;
import org.neo4j.unsafe.batchinsert.BatchInserterIndex;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

class LuceneBatchInserterIndex implements BatchInserterIndex
{
private final IndexIdentifier identifier;
private final IndexType type;

private ObsoleteLuceneIndexWriter writer;
private IndexWriter writer;
private SearcherManager searcherManager;
private final boolean createdNow;
private Map<String, LruCache<String, Collection<EntityId>>> cache;
Expand Down Expand Up @@ -250,17 +248,19 @@ private void removeFromCache( long entityId, String key, Object value )
}
}

private ObsoleteLuceneIndexWriter instantiateWriter( File directory )
private IndexWriter instantiateWriter( File folder )
{
Directory dir = null;
try
{
dir = LuceneDataSource.getDirectory( folder, identifier );
IndexWriterConfig writerConfig = new IndexWriterConfig( type.analyzer );
writerConfig.setRAMBufferSizeMB( determineGoodBufferSize( writerConfig.getRAMBufferSizeMB() ) );
Directory luceneDir = LuceneDataSource.getDirectory( directory, identifier );
return IndexWriterFactories.batchInsert( writerConfig ).create( luceneDir );
return new IndexWriter( dir, writerConfig );
}
catch ( IOException e )
{
IOUtils.closeAllSilently( dir );
throw new RuntimeException( e );
}
}
Expand All @@ -272,11 +272,11 @@ private double determineGoodBufferSize( double atLeast )
return Math.min( result, 700 );
}

private static SearcherManager instantiateSearcherManager( ObsoleteLuceneIndexWriter writer )
private static SearcherManager instantiateSearcherManager( IndexWriter writer )
{
try
{
return writer.createSearcherManager();
return new SearcherManager( writer, true, new SearcherFactory() );
}
catch ( IOException e )
{
Expand All @@ -288,7 +288,10 @@ private void closeSearcher()
{
try
{
this.searcherManager.close();
if ( searcherManager != null )
{
this.searcherManager.close();
}
}
catch ( IOException e )
{
Expand All @@ -306,7 +309,6 @@ private void closeWriter()
{
if ( this.writer != null )
{
this.writer.optimize();
this.writer.close();
}
}
Expand Down Expand Up @@ -483,10 +485,7 @@ public void flush()
{
try
{
while ( !searcherManager.maybeRefresh() )
{
LockSupport.parkNanos( MILLISECONDS.toNanos( 100 ) );
}
searcherManager.maybeRefreshBlocking();
}
catch ( IOException e )
{
Expand Down
Expand Up @@ -77,6 +77,11 @@ public void open() throws IOException
open = true;
}

/**
 * Tells whether this index has been opened.
 *
 * @return {@code true} if {@link #open()} has been called and the index is open
 */
boolean isOpen()
{
    return this.open;
}

public boolean exists() throws IOException
{
List<File> folders = indexStorage.listFolders();
Expand Down
Expand Up @@ -26,34 +26,22 @@
import org.neo4j.index.impl.lucene.legacy.MultipleBackupDeletionPolicy;
import org.neo4j.unsafe.impl.internal.dragons.FeatureToggles;

public final class IndexWriterFactories
public final class IndexWriterConfigs
{

private static final int MAX_BUFFERED_DOCS =
FeatureToggles.getInteger( IndexWriterFactories.class, "max_buffered_docs", 100000 );
FeatureToggles.getInteger( IndexWriterConfigs.class, "max_buffered_docs", 100000 );
private static final int MERGE_POLICY_MERGE_FACTOR =
FeatureToggles.getInteger( IndexWriterFactories.class, "merge.factor", 2 );
FeatureToggles.getInteger( IndexWriterConfigs.class, "merge.factor", 2 );
private static final double MERGE_POLICY_NO_CFS_RATIO =
FeatureToggles.getDouble( IndexWriterFactories.class, "nocfs.ratio", 1.0 );
FeatureToggles.getDouble( IndexWriterConfigs.class, "nocfs.ratio", 1.0 );
private static final double MERGE_POLICY_MIN_MERGE_MB =
FeatureToggles.getDouble( IndexWriterFactories.class, "min.merge", 0.1 );
FeatureToggles.getDouble( IndexWriterConfigs.class, "min.merge", 0.1 );

private IndexWriterFactories()
private IndexWriterConfigs()
{
throw new AssertionError( "Not for instantiation!" );
}


public static IndexWriterFactory<ObsoleteLuceneIndexWriter> standard()
{
return directory -> new ObsoleteLuceneIndexWriter( directory, standardConfig() );
}

public static IndexWriterFactory<ObsoleteLuceneIndexWriter> batchInsert( final IndexWriterConfig config )
{
return directory -> new ObsoleteLuceneIndexWriter( directory, config );
}

public static IndexWriterConfig standardConfig()
{
IndexWriterConfig writerConfig = new IndexWriterConfig( LuceneDataSource.KEYWORD_ANALYZER );
Expand Down

This file was deleted.

This file was deleted.

Expand Up @@ -30,7 +30,7 @@

import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.io.IOUtils;
import org.neo4j.kernel.api.impl.index.IndexWriterFactories;
import org.neo4j.kernel.api.impl.index.IndexWriterConfigs;
import org.neo4j.kernel.api.impl.index.backup.LuceneIndexSnapshotFileIterator;

public class IndexPartition implements Closeable
/**
 * Creates a partition over the given lucene {@link Directory}, opening an
 * {@link IndexWriter} with the standard configuration and a {@link SearcherManager}
 * on top of that writer so readers see committed and uncommitted changes.
 *
 * @param indexDirectory the on-disk folder backing this partition
 * @param directory the lucene directory abstraction to write to
 * @throws IOException if the writer or searcher manager cannot be created
 */
public IndexPartition( File indexDirectory, Directory directory ) throws IOException
{
    this.indexDirectory = indexDirectory;
    this.directory = directory;
    this.indexWriter = new IndexWriter( directory, IndexWriterConfigs.standardConfig() );
    // applyAllDeletes=true: deleted documents are not visible through acquired searchers.
    this.searcherManager = new SearcherManager( indexWriter, true, new SearcherFactory() );
}

Expand Down

0 comments on commit 159f24c

Please sign in to comment.