
Commit

Uniqueness verification in partitioned index
Implemented PartitionedUniquenessVerifier, which is able to check value
uniqueness across multiple Lucene index partitions. Internally it uses
MultiTerms to combine terms from all partitions, then checks the document
frequency of each term in the same way SimpleUniquenessVerifier does.
lutovich committed Jan 25, 2016
1 parent 3f86685 commit b121045
Showing 10 changed files with 1,066 additions and 38 deletions.
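
The core idea from the commit message, combining the per-partition term dictionaries with MultiTerms and flagging every term whose document frequency exceeds one, can be sketched roughly as below. This is an illustration only, not part of the commit: the inputs partitionReaders and field, and the method name, are hypothetical, while the Lucene calls mirror the ones used in the new PartitionedUniquenessVerifier.

    // Sketch of the approach, using the same Lucene APIs as this commit.
    // `partitionReaders` (one LeafReader per index partition) and `field` are hypothetical inputs.
    static void printPotentialDuplicates( List<LeafReader> partitionReaders, String field ) throws IOException
    {
        List<Terms> termsPerPartition = new ArrayList<>();
        List<ReaderSlice> slices = new ArrayList<>();
        for ( LeafReader reader : partitionReaders )
        {
            Terms terms = reader.fields().terms( field );
            if ( terms != null )
            {
                termsPerPartition.add( terms );
                slices.add( new ReaderSlice( 0, Math.toIntExact( terms.size() ), 0 ) );
            }
        }
        // MultiTerms presents the per-partition term dictionaries as one merged dictionary.
        TermsEnum merged = new MultiTerms(
                termsPerPartition.toArray( new Terms[termsPerPartition.size()] ),
                slices.toArray( new ReaderSlice[slices.size()] ) ).iterator();
        BytesRef term;
        while ( (term = merged.next()) != null )
        {
            if ( merged.docFreq() > 1 )
            {
                // The term is indexed by more than one document across all partitions:
                // a potential duplicate, which the verifier checks against the property store.
                System.out.println( term.utf8ToString() );
            }
        }
    }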
@@ -19,14 +19,13 @@
*/
package org.neo4j.kernel.api.index;

import org.neo4j.helpers.Strings;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;

import static java.lang.String.format;

/**
* Thrown from update methods (eg. {@link IndexPopulator#add(long, Object)}, {@link IndexPopulator#update(Iterable)},
* and {@link IndexAccessor#updateAndCommit(Iterable)}) of an index that is unique when a conflicting entry (clashing
* with an existing value - violating uniqueness) is being added.
* Thrown when a conflicting entry (clashing with an existing value - violating uniqueness) is detected.
*/
public class PreexistingIndexEntryConflictException extends IndexEntryConflictException
{
@@ -103,9 +102,9 @@ public int hashCode()
public String toString()
{
return "PreexistingIndexEntryConflictException{" +
"propertyValue=" + propertyValue +
", addedNodeId=" + addedNodeId +
", existingNodeId=" + existingNodeId +
'}';
"propertyValue=" + Strings.prettyPrint( propertyValue ) +
", addedNodeId=" + addedNodeId +
", existingNodeId=" + existingNodeId +
'}';
}
}
71 changes: 46 additions & 25 deletions community/kernel/src/test/java/org/neo4j/test/Randoms.java
@@ -133,6 +133,19 @@ public String string( int minLength, int maxLength, int characterSets )
return String.valueOf( chars );
}

public Object array()
{
int length = intBetween( configuration.arrayMinLength(), configuration.arrayMaxLength() );
byte componentType = propertyType( false );
Object itemType = propertyValue( componentType );
Object array = Array.newInstance( itemType.getClass(), length );
for ( int i = 0; i < length; i++ )
{
Array.set( array, i, propertyValue( componentType ) );
}
return array;
}

public char character( int characterSets )
{
int setCount = bitCount( characterSets );
@@ -194,26 +207,28 @@ private Object propertyValue( byte type )
{
switch ( type )
{
case 0: return random.nextBoolean();
case 1: return (byte)random.nextInt();
case 2: return (short)random.nextInt();
case 3: return character( CSA_LETTERS_AND_DIGITS );
case 4: return random.nextInt();
case 5: return random.nextLong();
case 6: return random.nextFloat();
case 7: return random.nextDouble();
case 8: return string();
case 0:
return random.nextBoolean();
case 1:
return (byte) random.nextInt();
case 2:
return (short) random.nextInt();
case 3:
return character( CSA_LETTERS_AND_DIGITS );
case 4:
return random.nextInt();
case 5:
return random.nextLong();
case 6:
return random.nextFloat();
case 7:
return random.nextDouble();
case 8:
return string();
case 9:
int length = intBetween( configuration.arrayMinLength(), configuration.arrayMaxLength() );
byte componentType = propertyType( false );
Object itemType = propertyValue( componentType );
Object array = Array.newInstance( itemType.getClass(), length );
for ( int i = 0; i < length; i++ )
{
Array.set( array, i, propertyValue( componentType ) );
}
return array;
default: throw new IllegalArgumentException( "Unknown value type " + type );
return array();
default:
throw new IllegalArgumentException( "Unknown value type " + type );
}
}

@@ -222,12 +237,18 @@ private char symbol()
int range = random.nextInt( 5 );
switch ( range )
{
case 0: return (char) intBetween( 33, 47 );
case 1: return (char) intBetween( 58, 64 );
case 2: return (char) intBetween( 91, 96 );
case 3: return (char) intBetween( 123, 126 );
case 4: return ' ';
default: throw new IllegalArgumentException( "Unknown symbol range " + range );
case 0:
return (char) intBetween( 33, 47 );
case 1:
return (char) intBetween( 58, 64 );
case 2:
return (char) intBetween( 91, 96 );
case 3:
return (char) intBetween( 123, 126 );
case 4:
return ' ';
default:
throw new IllegalArgumentException( "Unknown symbol range " + range );
}
}
}
@@ -34,7 +34,7 @@
import org.neo4j.kernel.api.index.PropertyAccessor;
import org.neo4j.kernel.api.properties.Property;

class DuplicateCheckingCollector extends SimpleCollector
public class DuplicateCheckingCollector extends SimpleCollector
{
private final PropertyAccessor accessor;
private final int propertyKeyId;
@@ -19,14 +19,46 @@
*/
package org.neo4j.kernel.api.impl.schema.verification;

import org.apache.lucene.index.Fields;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.MultiTerms;
import org.apache.lucene.index.ReaderSlice;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.util.BytesRef;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.neo4j.helpers.collection.Iterables;
import org.neo4j.io.IOUtils;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.impl.index.partition.PartitionSearcher;
import org.neo4j.kernel.api.impl.schema.LuceneDocumentStructure;
import org.neo4j.kernel.api.index.PropertyAccessor;

import static java.util.stream.Collectors.toList;

/**
* A {@link UniquenessVerifier} that is able to verify value uniqueness across multiple index partitions using
* corresponding {@link PartitionSearcher}s.
* <p>
* This verifier reads all terms from all partitions using {@link MultiTerms}, checks document frequency for each term
* and verifies uniqueness of values from the property store if document frequency is greater than 1.
*
* @see MultiTerms
* @see PartitionSearcher
* @see DuplicateCheckingCollector
*/
public class PartitionedUniquenessVerifier implements UniquenessVerifier
{
private final List<PartitionSearcher> searchers;
@@ -39,19 +71,110 @@ public PartitionedUniquenessVerifier( List<PartitionSearcher> searchers )
@Override
public void verify( PropertyAccessor accessor, int propKeyId ) throws IndexEntryConflictException, IOException
{
throw new UnsupportedOperationException();
for ( String field : allFields() )
{
if ( LuceneDocumentStructure.NODE_ID_KEY.equals( field ) )
{
continue;
}

TermsEnum terms = termsForField( field ).iterator();
BytesRef termsRef;
while ( (termsRef = terms.next()) != null )
{
if ( terms.docFreq() > 1 )
{
TermQuery query = new TermQuery( new Term( field, termsRef ) );
searchForDuplicates( query, accessor, propKeyId );
}
}
}
}

@Override
public void verify( PropertyAccessor accessor, int propKeyId, List<Object> updatedPropertyValues )
throws IndexEntryConflictException, IOException
{
throw new UnsupportedOperationException();
for ( Object propertyValue : updatedPropertyValues )
{
Query query = LuceneDocumentStructure.newSeekQuery( propertyValue );
searchForDuplicates( query, accessor, propKeyId );
}
}

@Override
public void close() throws IOException
{
IOUtils.closeAll( searchers );
}

private Terms termsForField( String fieldName ) throws IOException
{
List<Terms> terms = new ArrayList<>();
List<ReaderSlice> readerSlices = new ArrayList<>();

for ( LeafReader leafReader : allLeafReaders() )
{
Fields fields = leafReader.fields();

Terms leafTerms = fields.terms( fieldName );
if ( leafTerms != null )
{
ReaderSlice readerSlice = new ReaderSlice( 0, Math.toIntExact( leafTerms.size() ), 0 );
terms.add( leafTerms );
readerSlices.add( readerSlice );
}
}

Terms[] termsArray = terms.toArray( new Terms[terms.size()] );
ReaderSlice[] readerSlicesArray = readerSlices.toArray( new ReaderSlice[readerSlices.size()] );

return new MultiTerms( termsArray, readerSlicesArray );
}

private void searchForDuplicates( Query query, PropertyAccessor accessor, int propertyKeyId )
throws IOException, IndexEntryConflictException
{
try
{
/**
* Here {@link DuplicateCheckingCollector#reset()} is deliberately not called to preserve accumulated
* state (knowledge about duplicates) across all {@link IndexSearcher#search(Query, Collector)} calls.
*/
DuplicateCheckingCollector collector = new DuplicateCheckingCollector( accessor, propertyKeyId );
for ( PartitionSearcher searcher : searchers )
{
searcher.getIndexSearcher().search( query, collector );
}
}
catch ( IOException e )
{
Throwable cause = e.getCause();
if ( cause instanceof IndexEntryConflictException )
{
throw (IndexEntryConflictException) cause;
}
throw e;
}
}

private Set<String> allFields() throws IOException
{
Set<String> allFields = new HashSet<>();
for ( LeafReader leafReader : allLeafReaders() )
{
Iterables.addAll( allFields, leafReader.fields() );
}
return allFields;
}

private List<LeafReader> allLeafReaders()
{
return searchers.stream()
.map( PartitionSearcher::getIndexSearcher )
.map( IndexSearcher::getIndexReader )
.flatMap( indexReader -> indexReader.leaves().stream() )
.map( LeafReaderContext::reader )
.collect( toList() );
}
}
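
For context, searchForDuplicates above relies on a convention of DuplicateCheckingCollector (whose body is unchanged by this commit): Lucene's Collector API only lets IOException escape a search, so a detected conflict has to travel out wrapped in an IOException and be unwrapped by the caller. A hypothetical sketch of that contract, not the actual collector:

    // Hypothetical collector illustrating the wrap-and-unwrap convention assumed by
    // searchForDuplicates; the real DuplicateCheckingCollector also resolves node ids
    // and property values via the PropertyAccessor.
    abstract class ConflictSignallingCollector extends SimpleCollector
    {
        @Override
        public void collect( int doc ) throws IOException
        {
            IndexEntryConflictException conflict = checkDocument( doc );
            if ( conflict != null )
            {
                // Collector.collect may only throw IOException, so wrap the conflict here;
                // it is unwrapped again by the caller that ran the search.
                throw new IOException( conflict );
            }
        }

        @Override
        public boolean needsScores()
        {
            return false;
        }

        /** Hypothetical hook: compare this document's value with previously seen values. */
        abstract IndexEntryConflictException checkDocument( int doc ) throws IOException;
    }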
@@ -36,8 +36,16 @@
import org.neo4j.kernel.api.impl.schema.LuceneDocumentStructure;
import org.neo4j.kernel.api.index.PropertyAccessor;

import static org.neo4j.kernel.api.impl.schema.LuceneDocumentStructure.NODE_ID_KEY;

/**
* A {@link UniquenessVerifier} that is able to verify value uniqueness inside a single index partition using
* its {@link PartitionSearcher}.
* <p>
* This verifier reads all terms, checks document frequency for each term and verifies uniqueness of values from the
* property store if document frequency is greater than 1.
*
* @see PartitionSearcher
* @see DuplicateCheckingCollector
*/
public class SimpleUniquenessVerifier implements UniquenessVerifier
{
private final PartitionSearcher partitionSearcher;
@@ -59,7 +67,7 @@ public void verify( PropertyAccessor accessor, int propKeyId ) throws IndexEntry
Fields fields = leafReaderContext.reader().fields();
for ( String field : fields )
{
if ( NODE_ID_KEY.equals( field ) )
if ( LuceneDocumentStructure.NODE_ID_KEY.equals( field ) )
{
continue;
}
@@ -26,10 +26,35 @@
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.index.PropertyAccessor;

/**
* A component that verifies uniqueness of values in a Lucene index.
* During uniqueness constraint creation we ensure that already existing data is unique using
* {@link #verify(PropertyAccessor, int)}.
* Since updates can be applied while the index is being populated, they need to be verified as well.
* Such updates are not verified automatically; they need to be collected in some way and then checked by
* {@link #verify(PropertyAccessor, int, List)}.
*/
public interface UniquenessVerifier extends Closeable
{
/**
* Verifies uniqueness of existing data.
*
* @param accessor the accessor to retrieve actual property values from the store.
* @param propKeyId the id of the property to verify.
* @throws IndexEntryConflictException if there are duplicates.
* @throws IOException when Lucene throws {@link IOException}.
*/
void verify( PropertyAccessor accessor, int propKeyId ) throws IndexEntryConflictException, IOException;

/**
* Verifies uniqueness of given values and existing data.
*
* @param accessor the accessor to retrieve actual property values from the store.
* @param propKeyId the id of the property to verify.
* @param updatedPropertyValues the values to check uniqueness for.
* @throws IndexEntryConflictException if there are duplicates.
* @throws IOException when Lucene throws {@link IOException}.
*/
void verify( PropertyAccessor accessor, int propKeyId, List<Object> updatedPropertyValues )
throws IndexEntryConflictException, IOException;
}
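
As a usage illustration of this interface, a constraint population flow might combine the two overloads roughly as follows. The method name and surrounding wiring are hypothetical; only the verifier calls mirror the interface above and the PartitionedUniquenessVerifier constructor shown in this commit.

    // Hypothetical wiring; only the UniquenessVerifier calls reflect the interface above.
    static void verifyUniqueness( List<PartitionSearcher> searchers, PropertyAccessor accessor,
            int propertyKeyId, List<Object> valuesUpdatedDuringPopulation )
            throws IOException, IndexEntryConflictException
    {
        try ( UniquenessVerifier verifier = new PartitionedUniquenessVerifier( searchers ) )
        {
            // 1. Data already present in the index must be unique.
            verifier.verify( accessor, propertyKeyId );
            // 2. Values touched by updates applied while the index was populating are
            //    collected by the caller and checked explicitly.
            verifier.verify( accessor, propertyKeyId, valuesUpdatedDuringPopulation );
        }
    }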
