Skip to content

Commit

Permalink
Split sampling and insertion during index population
Browse files Browse the repository at this point in the history
Currently index sampling is done implicitly during index population inside
IndexPopulator#add() method. This is true for unique, non-unique and test
in-memory index. All three respect this implicit contract. At the same time
method IndexPopulator#sampleResult() exists and thus sampling during population
relies on both explicit and implicit parts of the IndexPopulator interface.

Glued sampling and insertion turned out to be a problem for multi-threaded
index population. Index samplers are not thread safe but they were updated
inside IndexPopulator#add() which is executed concurrently by different threads.

This commit splits sampling and addition to the index. New method
IndexPopulator#includeSample() is added. It is called from the thread
that performs store scan during population. This removes need to make index
samplers (UniqueIndexSampler and NonUniqueIndexSampler) thread safe.
  • Loading branch information
lutovich committed Feb 15, 2016
1 parent 496f268 commit 78e5d21
Show file tree
Hide file tree
Showing 12 changed files with 63 additions and 31 deletions.
Expand Up @@ -108,6 +108,8 @@ void add( List<NodePropertyUpdate> updates )
*/ */
void markAsFailed( String failure ) throws IOException; void markAsFailed( String failure ) throws IOException;


void includeSample( NodePropertyUpdate update );

long sampleResult( DoubleLong.Out result ); long sampleResult( DoubleLong.Out result );


class Adapter implements IndexPopulator class Adapter implements IndexPopulator
Expand Down Expand Up @@ -148,6 +150,11 @@ public void markAsFailed( String failure )
{ {
} }


@Override
public void includeSample( NodePropertyUpdate update )
{
}

@Override @Override
public long sampleResult( DoubleLong.Out result ) public long sampleResult( DoubleLong.Out result )
{ {
Expand Down
Expand Up @@ -307,6 +307,12 @@ public void markAsFailed( String failure ) throws IOException
throw new UnsupportedOperationException( "Multiple index populator can't be marked as failed." ); throw new UnsupportedOperationException( "Multiple index populator can't be marked as failed." );
} }


@Override
public void includeSample( NodePropertyUpdate update )
{
throw new UnsupportedOperationException( "Multiple index populator can't perform index sampling." );
}

@Override @Override
public long sampleResult( Out result ) public long sampleResult( Out result )
{ {
Expand Down Expand Up @@ -503,6 +509,7 @@ private void add( NodePropertyUpdate update )
{ {
if ( isApplicable( update ) ) if ( isApplicable( update ) )
{ {
populator.includeSample( update );
addApplicable( update ); addApplicable( update );
} }
} }
Expand Down
Expand Up @@ -221,7 +221,6 @@ public void add( List<NodePropertyUpdate> updates )
{ {
ReadOperations statement = ctxSupplier.get().readOperations(); ReadOperations statement = ctxSupplier.get().readOperations();
updatesCommitted.add( update ); updatesCommitted.add( update );
addValueToSample( update.getNodeId(), update.getValueAfter() );
} }
} }


Expand Down Expand Up @@ -265,6 +264,12 @@ public void markAsFailed( String failure )
{ {
} }


@Override
public void includeSample( NodePropertyUpdate update )
{
addValueToSample( update.getNodeId(), update.getValueAfter() );
}

@Override @Override
public long sampleResult( Register.DoubleLong.Out result ) public long sampleResult( Register.DoubleLong.Out result )
{ {
Expand Down
Expand Up @@ -117,10 +117,13 @@ public void shouldPopulateIndexWithOneNode() throws Exception
job.run(); job.run();


// THEN // THEN
NodePropertyUpdate update = add( nodeId, 0, value, new long[]{0} );

verify( populator ).create(); verify( populator ).create();
verify( populator ).add( singletonList( add( nodeId, 0, value, new long[]{0} ) ) ); verify( populator ).includeSample( update );
verify( populator ).add( singletonList( update ) );
verify( populator ).verifyDeferredConstraints( indexStoreView ); verify( populator ).verifyDeferredConstraints( indexStoreView );
verify( populator ).sampleResult( any( DoubleLong.Out.class) ); verify( populator ).sampleResult( any( DoubleLong.Out.class ) );
verify( populator ).close( true ); verify( populator ).close( true );


verifyNoMoreInteractions( populator ); verifyNoMoreInteractions( populator );
Expand Down Expand Up @@ -160,11 +163,16 @@ public void shouldPopulateIndexWithASmallDataset() throws Exception
job.run(); job.run();


// THEN // THEN
NodePropertyUpdate update1 = add( node1, 0, value, new long[]{0} );
NodePropertyUpdate update2 = add( node4, 0, value, new long[]{0} );

verify( populator ).create(); verify( populator ).create();
verify( populator ).add( Collections.singletonList( add( node1, 0, value, new long[]{0} ) ) ); verify( populator ).includeSample( update1 );
verify( populator ).add( Collections.singletonList( add( node4, 0, value, new long[]{0} ) ) ); verify( populator ).add( Collections.singletonList( update1 ) );
verify( populator ).includeSample( update2 );
verify( populator ).add( Collections.singletonList( update2 ) );
verify( populator ).verifyDeferredConstraints( indexStoreView ); verify( populator ).verifyDeferredConstraints( indexStoreView );
verify( populator ).sampleResult( any( DoubleLong.Out.class) ); verify( populator ).sampleResult( any( DoubleLong.Out.class ) );
verify( populator ).close( true ); verify( populator ).close( true );


verifyNoMoreInteractions( populator ); verifyNoMoreInteractions( populator );
Expand Down
Expand Up @@ -237,6 +237,7 @@ public void shouldDeliverUpdatesThatOccurDuringPopulationToPopulator() throws Ex
assertEquals( InternalIndexState.ONLINE, proxy.getState() ); assertEquals( InternalIndexState.ONLINE, proxy.getState() );
InOrder order = inOrder( populator, accessor, updater); InOrder order = inOrder( populator, accessor, updater);
order.verify( populator ).create(); order.verify( populator ).create();
order.verify( populator ).includeSample( add( 1, "value1" ) );
order.verify( populator ).add( singletonList( add( 1, "value1" ) ) ); order.verify( populator ).add( singletonList( add( 1, "value1" ) ) );




Expand Down
Expand Up @@ -173,6 +173,11 @@ public void markAsFailed( String failureString )
state = InternalIndexState.FAILED; state = InternalIndexState.FAILED;
} }


@Override
public void includeSample( NodePropertyUpdate update )
{
}

@Override @Override
public long sampleResult( DoubleLong.Out result ) public long sampleResult( DoubleLong.Out result )
{ {
Expand Down
Expand Up @@ -59,8 +59,6 @@ public void drop() throws IOException
@Override @Override
public void add( List<NodePropertyUpdate> updates ) throws IndexEntryConflictException, IOException public void add( List<NodePropertyUpdate> updates ) throws IndexEntryConflictException, IOException
{ {
includeSamples( updates );

Iterator<Document> documents = updates.stream() Iterator<Document> documents = updates.stream()
.map( LuceneIndexPopulator::updateAsDocument ) .map( LuceneIndexPopulator::updateAsDocument )
.iterator(); .iterator();
Expand Down Expand Up @@ -93,8 +91,6 @@ public void markAsFailed( String failure ) throws IOException


protected abstract void flush() throws IOException; protected abstract void flush() throws IOException;


protected abstract void includeSamples( List<NodePropertyUpdate> updates );

private static Document updateAsDocument( NodePropertyUpdate update ) private static Document updateAsDocument( NodePropertyUpdate update )
{ {
return LuceneDocumentStructure.documentRepresentingProperty( update.getNodeId(), update.getValueAfter() ); return LuceneDocumentStructure.documentRepresentingProperty( update.getNodeId(), update.getValueAfter() );
Expand Down
Expand Up @@ -48,15 +48,6 @@ public NonUniqueLuceneIndexPopulator( LuceneSchemaIndex luceneIndex, IndexSampli
this.sampler = new NonUniqueIndexSampler( samplingConfig.bufferSize() ); this.sampler = new NonUniqueIndexSampler( samplingConfig.bufferSize() );
} }


@Override
protected void includeSamples( List<NodePropertyUpdate> updates )
{
for ( NodePropertyUpdate update : updates )
{
sampler.include( LuceneDocumentStructure.encodedStringValue( update.getValueAfter() ) );
}
}

@Override @Override
public void verifyDeferredConstraints( PropertyAccessor accessor ) throws IndexEntryConflictException, IOException public void verifyDeferredConstraints( PropertyAccessor accessor ) throws IndexEntryConflictException, IOException
{ {
Expand Down Expand Up @@ -115,6 +106,12 @@ public void remove( PrimitiveLongSet nodeIds ) throws IOException
}; };
} }


@Override
public void includeSample( NodePropertyUpdate update )
{
sampler.include( LuceneDocumentStructure.encodedStringValue( update.getValueAfter() ) );
}

@Override @Override
public long sampleResult( Register.DoubleLong.Out result ) public long sampleResult( Register.DoubleLong.Out result )
{ {
Expand Down
Expand Up @@ -52,12 +52,6 @@ protected void flush() throws IOException
// no need to do anything yet. // no need to do anything yet.
} }


@Override
protected void includeSamples( List<NodePropertyUpdate> updates )
{
sampler.increment( updates.size() );
}

@Override @Override
public void verifyDeferredConstraints( PropertyAccessor accessor ) throws IndexEntryConflictException, IOException public void verifyDeferredConstraints( PropertyAccessor accessor ) throws IndexEntryConflictException, IOException
{ {
Expand Down Expand Up @@ -117,6 +111,12 @@ public void remove( PrimitiveLongSet nodeIds )
}; };
} }


@Override
public void includeSample( NodePropertyUpdate update )
{
sampler.increment( 1 );
}

@Override @Override
public long sampleResult( DoubleLong.Out result ) public long sampleResult( DoubleLong.Out result )
{ {
Expand Down
Expand Up @@ -95,7 +95,7 @@ public void sampleEmptyIndex() throws IOException
} }


@Test @Test
public void sampleAddedUpdates() throws Exception public void sampleIncludedUpdates() throws Exception
{ {
populator = newPopulator(); populator = newPopulator();


Expand All @@ -104,7 +104,7 @@ public void sampleAddedUpdates() throws Exception
NodePropertyUpdate.add( 2, 1, "bbb", new long[]{1} ), NodePropertyUpdate.add( 2, 1, "bbb", new long[]{1} ),
NodePropertyUpdate.add( 3, 1, "ccc", new long[]{1} ) ); NodePropertyUpdate.add( 3, 1, "ccc", new long[]{1} ) );


populator.add( updates ); updates.forEach( populator::includeSample );


Register.DoubleLongRegister register = Registers.newDoubleLongRegister(); Register.DoubleLongRegister register = Registers.newDoubleLongRegister();
long indexSize = populator.sampleResult( register ); long indexSize = populator.sampleResult( register );
Expand All @@ -117,7 +117,7 @@ public void sampleAddedUpdates() throws Exception
} }


@Test @Test
public void sampleAddedUpdatesWithDuplicates() throws Exception public void sampleIncludedUpdatesWithDuplicates() throws Exception
{ {
populator = newPopulator(); populator = newPopulator();


Expand All @@ -126,7 +126,7 @@ public void sampleAddedUpdatesWithDuplicates() throws Exception
NodePropertyUpdate.add( 2, 1, "bar", new long[]{1} ), NodePropertyUpdate.add( 2, 1, "bar", new long[]{1} ),
NodePropertyUpdate.add( 3, 1, "foo", new long[]{1} ) ); NodePropertyUpdate.add( 3, 1, "foo", new long[]{1} ) );


populator.add( updates ); updates.forEach( populator::includeSample );


Register.DoubleLongRegister register = Registers.newDoubleLongRegister(); Register.DoubleLongRegister register = Registers.newDoubleLongRegister();
long indexSize = populator.sampleResult( register ); long indexSize = populator.sampleResult( register );
Expand Down
Expand Up @@ -508,7 +508,7 @@ public void sampleEmptyIndex() throws Exception
} }


@Test @Test
public void sampleAddedUpdates() throws Exception public void sampleIncludedUpdates() throws Exception
{ {
populator = newPopulator(); populator = newPopulator();
List<NodePropertyUpdate> updates = Arrays.asList( List<NodePropertyUpdate> updates = Arrays.asList(
Expand All @@ -517,7 +517,7 @@ public void sampleAddedUpdates() throws Exception
NodePropertyUpdate.add( 3, 1, "baz", new long[]{1} ), NodePropertyUpdate.add( 3, 1, "baz", new long[]{1} ),
NodePropertyUpdate.add( 4, 1, "qux", new long[]{1} ) ); NodePropertyUpdate.add( 4, 1, "qux", new long[]{1} ) );


populator.add( updates ); updates.forEach( populator::includeSample );


Register.DoubleLongRegister register = Registers.newDoubleLongRegister(); Register.DoubleLongRegister register = Registers.newDoubleLongRegister();
long indexSize = populator.sampleResult( register ); long indexSize = populator.sampleResult( register );
Expand Down
Expand Up @@ -470,6 +470,12 @@ public void markAsFailed( String failure ) throws IOException
delegate.markAsFailed( failure ); delegate.markAsFailed( failure );
} }


@Override
public void includeSample( NodePropertyUpdate update )
{
delegate.includeSample( update );
}

@Override @Override
public long sampleResult( DoubleLong.Out result ) public long sampleResult( DoubleLong.Out result )
{ {
Expand Down

0 comments on commit 78e5d21

Please sign in to comment.