Skip to content

Commit

Permalink
Refactor lucene index populators
Browse files Browse the repository at this point in the history
Refactoring includes two things:
 - remove batching from NonUniqueLuceneIndexPopulator because similar batching
   already happens in BatchingMultipleIndexPopulator higher in the stack;
 - pull out populating updaters from index populators, make them top-level
   classes and add tests; this is done to avoid code duplication and for
   better testability.
  • Loading branch information
lutovich committed Feb 22, 2016
1 parent f0ef821 commit ef74c27
Show file tree
Hide file tree
Showing 8 changed files with 741 additions and 143 deletions.
@@ -0,0 +1,101 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.api.impl.schema.populator;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.Term;

import java.io.IOException;

import org.neo4j.collection.primitive.PrimitiveLongSet;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.impl.schema.LuceneDocumentStructure;
import org.neo4j.kernel.api.impl.schema.writer.LuceneIndexWriter;
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.NodePropertyUpdate;
import org.neo4j.kernel.impl.api.index.UpdateMode;

/**
 * Base {@link IndexUpdater} for updates that arrive while index population is still in progress.
 * <p>
 * Every processed update first triggers the matching subclass hook ({@link #added(NodePropertyUpdate)},
 * {@link #changed(NodePropertyUpdate)} or {@link #removed(NodePropertyUpdate)}) and is then applied to the
 * {@link LuceneIndexWriter}. Additions and changes are both written via
 * {@link LuceneIndexWriter#updateDocument(Term, Document)} to make sure no duplicated documents are inserted;
 * removals delete the documents matching the node's term.
 */
public abstract class LuceneIndexPopulatingUpdater implements IndexUpdater
{
    private final LuceneIndexWriter writer;

    /**
     * @param writer the index writer that receives all document updates and deletions.
     */
    public LuceneIndexPopulatingUpdater( LuceneIndexWriter writer )
    {
        this.writer = writer;
    }

    /**
     * Dispatches the given update on its {@link UpdateMode}: invokes the corresponding hook and then applies
     * the change to the writer.
     *
     * @param update the update to process.
     * @throws IllegalStateException if the update mode is not one of ADDED, CHANGED or REMOVED.
     */
    @Override
    public void process( NodePropertyUpdate update ) throws IOException, IndexEntryConflictException
    {
        long nodeId = update.getNodeId();

        switch ( update.getUpdateMode() )
        {
        case ADDED:
            added( update );
            writeValueAfter( nodeId, update );
            break;
        case CHANGED:
            changed( update );
            writeValueAfter( nodeId, update );
            break;
        case REMOVED:
            removed( update );
            writer.deleteDocuments( LuceneDocumentStructure.newTermForChangeOrRemove( nodeId ) );
            break;
        default:
            throw new IllegalStateException( "Unknown update mode " + update.getUpdateMode() );
        }
    }

    /**
     * Always rejects the call; bulk removal is not supported by a populating updater.
     *
     * @throws UnsupportedOperationException always.
     */
    @Override
    public final void remove( PrimitiveLongSet nodeIds )
    {
        throw new UnsupportedOperationException( "Should not remove from populating index" );
    }

    /**
     * Hook called before the index is touched for an update with {@link UpdateMode#ADDED}.
     *
     * @param update the update being processed.
     */
    protected abstract void added( NodePropertyUpdate update );

    /**
     * Hook called before the index is touched for an update with {@link UpdateMode#CHANGED}.
     *
     * @param update the update being processed.
     */
    protected abstract void changed( NodePropertyUpdate update );

    /**
     * Hook called before the index is touched for an update with {@link UpdateMode#REMOVED}.
     *
     * @param update the update being processed.
     */
    protected abstract void removed( NodePropertyUpdate update );

    /**
     * Writes the document built from the update's "after" value, keyed by the node's term.
     * Shared by the ADDED and CHANGED branches, which apply the update in exactly the same way.
     */
    private void writeValueAfter( long nodeId, NodePropertyUpdate update ) throws IOException
    {
        Term nodeTerm = LuceneDocumentStructure.newTermForChangeOrRemove( nodeId );
        Document document = LuceneDocumentStructure.documentRepresentingProperty( nodeId, update.getValueAfter() );
        writer.updateDocument( nodeTerm, document );
    }
}
Expand Up @@ -25,13 +25,17 @@
import java.util.Iterator;
import java.util.List;

import org.neo4j.io.IOUtils;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.impl.schema.LuceneDocumentStructure;
import org.neo4j.kernel.api.impl.schema.LuceneSchemaIndex;
import org.neo4j.kernel.api.impl.schema.writer.LuceneIndexWriter;
import org.neo4j.kernel.api.index.IndexPopulator;
import org.neo4j.kernel.api.index.NodePropertyUpdate;

/**
* An {@link IndexPopulator} used to create, populate and mark as online a Lucene schema index.
*/
public abstract class LuceneIndexPopulator implements IndexPopulator
{
protected LuceneSchemaIndex luceneIndex;
Expand Down Expand Up @@ -75,13 +79,12 @@ public void close( boolean populationCompletedSuccessfully ) throws IOException
{
if ( populationCompletedSuccessfully )
{
flush();
luceneIndex.markAsOnline();
}
}
finally
{
luceneIndex.close();
IOUtils.closeAllSilently( luceneIndex );
}
}

Expand All @@ -91,8 +94,6 @@ public void markAsFailed( String failure ) throws IOException
luceneIndex.markAsFailed( failure );
}

protected abstract void flush() throws IOException;

private static Document updateAsDocument( NodePropertyUpdate update )
{
return LuceneDocumentStructure.documentRepresentingProperty( update.getNodeId(), update.getValueAfter() );
Expand Down
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.api.impl.schema.populator;

import org.neo4j.kernel.api.impl.schema.LuceneDocumentStructure;
import org.neo4j.kernel.api.impl.schema.writer.LuceneIndexWriter;
import org.neo4j.kernel.api.index.NodePropertyUpdate;
import org.neo4j.kernel.impl.api.index.sampling.NonUniqueIndexSampler;

/**
 * {@link LuceneIndexPopulatingUpdater} for non-unique Lucene schema indexes.
 * <p>
 * Mirrors every processed update in the supplied {@link NonUniqueIndexSampler}: encoded values that enter the
 * index are included in the sample, encoded values that leave it are excluded.
 */
public class NonUniqueLuceneIndexPopulatingUpdater extends LuceneIndexPopulatingUpdater
{
    private final NonUniqueIndexSampler sampler;

    /**
     * @param writer the index writer handed to the base updater.
     * @param sampler the sampler to keep in step with the processed updates.
     */
    public NonUniqueLuceneIndexPopulatingUpdater( LuceneIndexWriter writer, NonUniqueIndexSampler sampler )
    {
        super( writer );
        this.sampler = sampler;
    }

    /**
     * Includes the encoded new value in the sample.
     */
    @Override
    protected void added( NodePropertyUpdate update )
    {
        sampler.include( LuceneDocumentStructure.encodedStringValue( update.getValueAfter() ) );
    }

    /**
     * Replaces the old value with the new one in the sample: the encoded "before" value is excluded first,
     * then the encoded "after" value is included.
     */
    @Override
    protected void changed( NodePropertyUpdate update )
    {
        sampler.exclude( LuceneDocumentStructure.encodedStringValue( update.getValueBefore() ) );
        sampler.include( LuceneDocumentStructure.encodedStringValue( update.getValueAfter() ) );
    }

    /**
     * Excludes the encoded previous value from the sample.
     */
    @Override
    protected void removed( NodePropertyUpdate update )
    {
        sampler.exclude( LuceneDocumentStructure.encodedStringValue( update.getValueBefore() ) );
    }

    /**
     * Intentionally does nothing.
     */
    @Override
    public void close()
    {
    }
}
Expand Up @@ -20,10 +20,7 @@
package org.neo4j.kernel.api.impl.schema.populator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.neo4j.collection.primitive.PrimitiveLongSet;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.impl.schema.LuceneDocumentStructure;
import org.neo4j.kernel.api.impl.schema.LuceneSchemaIndex;
Expand All @@ -33,14 +30,14 @@
import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig;
import org.neo4j.kernel.impl.api.index.sampling.NonUniqueIndexSampler;
import org.neo4j.storageengine.api.schema.IndexSample;
import org.neo4j.unsafe.impl.internal.dragons.FeatureToggles;

/**
* A {@link LuceneIndexPopulator} used for non-unique Lucene schema indexes.
* Performs sampling using {@link NonUniqueIndexSampler}.
*/
public class NonUniqueLuceneIndexPopulator extends LuceneIndexPopulator
{
private final int queueThreshold = FeatureToggles.getInteger( NonUniqueLuceneIndexPopulator.class,
"queueThreshold", 10000 );
private final NonUniqueIndexSampler sampler;
private final List<NodePropertyUpdate> updates = new ArrayList<>();

public NonUniqueLuceneIndexPopulator( LuceneSchemaIndex luceneIndex, IndexSamplingConfig samplingConfig )
{
Expand All @@ -57,53 +54,7 @@ public void verifyDeferredConstraints( PropertyAccessor accessor ) throws IndexE
@Override
public IndexUpdater newPopulatingUpdater( PropertyAccessor propertyAccessor ) throws IOException
{
return new IndexUpdater()
{
@Override
public void process( NodePropertyUpdate update ) throws IOException, IndexEntryConflictException
{
switch ( update.getUpdateMode() )
{
case ADDED:
// We don't look at the "before" value, so adding and changing idempotently is done the same way.
String encodedValue = LuceneDocumentStructure.encodedStringValue( update.getValueAfter() );
sampler.include( encodedValue );
break;
case CHANGED:
// We don't look at the "before" value, so adding and changing idempotently is done the same way.
String encodedValueBefore = LuceneDocumentStructure.encodedStringValue( update.getValueBefore() );
sampler.exclude( encodedValueBefore );
String encodedValueAfter = LuceneDocumentStructure.encodedStringValue( update.getValueAfter() );
sampler.include( encodedValueAfter );
break;
case REMOVED:
String removedValue = LuceneDocumentStructure.encodedStringValue( update.getValueBefore() );
sampler.exclude( removedValue );
break;
default:
throw new IllegalStateException( "Unknown update mode " + update.getUpdateMode() );
}

updates.add( update );
}

@Override
public void close() throws IOException, IndexEntryConflictException
{
if ( updates.size() > queueThreshold )
{
flush();
updates.clear();
}

}

@Override
public void remove( PrimitiveLongSet nodeIds ) throws IOException
{
throw new UnsupportedOperationException( "Should not remove() from populating index." );
}
};
return new NonUniqueLuceneIndexPopulatingUpdater( writer, sampler );
}

@Override
Expand All @@ -117,27 +68,4 @@ public IndexSample sampleResult()
{
return sampler.result();
}

@Override
protected void flush() throws IOException
{
for ( NodePropertyUpdate update : this.updates )
{
long nodeId = update.getNodeId();
switch ( update.getUpdateMode() )
{
case ADDED:
case CHANGED:
// We don't look at the "before" value, so adding and changing idempotently is done the same way.
writer.updateDocument( LuceneDocumentStructure.newTermForChangeOrRemove( nodeId ),
LuceneDocumentStructure.documentRepresentingProperty( nodeId, update.getValueAfter() ) );
break;
case REMOVED:
writer.deleteDocuments( LuceneDocumentStructure.newTermForChangeOrRemove( nodeId ) );
break;
default:
throw new IllegalStateException( "Unknown update mode " + update.getUpdateMode() );
}
}
}
}

0 comments on commit ef74c27

Please sign in to comment.