Skip to content

Commit

Permalink
BlockBasedIndexPopulator verify uniqueness after applying scan and ex…
Browse files Browse the repository at this point in the history
…ternal

While inserting keys into the GBPTree, both from scan and external
updates, BlockBasedIndexPopulator records all KEYs that look like
duplicates at the time of insert.

After the tree has been completely populated, all of the potential
duplicates are verified.
  • Loading branch information
burqen committed Mar 28, 2019
1 parent f30ff66 commit 8c0fc28
Showing 1 changed file with 84 additions and 50 deletions.
Expand Up @@ -19,12 +19,16 @@
*/ */
package org.neo4j.kernel.impl.index.schema; package org.neo4j.kernel.impl.index.schema;


import org.eclipse.collections.api.list.MutableList;
import org.eclipse.collections.impl.factory.Lists;

import java.io.Closeable; import java.io.Closeable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -248,11 +252,18 @@ public void scanCompleted( PhaseTracker phaseTracker ) throws IndexEntryConflict


// Build the tree from the scan updates // Build the tree from the scan updates
phaseTracker.enterPhase( PhaseTracker.Phase.BUILD ); phaseTracker.enterPhase( PhaseTracker.Phase.BUILD );
writeScanUpdatesToTree(); RecordingConflictDetector<KEY,VALUE> recordingConflictDetector = new RecordingConflictDetector<>( !descriptor.isUnique() );
writeScanUpdatesToTree( recordingConflictDetector );


// Apply the external updates // Apply the external updates
phaseTracker.enterPhase( PhaseTracker.Phase.APPLY_EXTERNAL ); phaseTracker.enterPhase( PhaseTracker.Phase.APPLY_EXTERNAL );
writeExternalUpdatesToTree(); writeExternalUpdatesToTree( recordingConflictDetector );

// Verify uniqueness
if ( descriptor.isUnique() )
{
verifyUniqueKeys( recordingConflictDetector.allConflicts() );
}
} }
catch ( IOException e ) catch ( IOException e )
{ {
Expand Down Expand Up @@ -312,7 +323,7 @@ private void mergeScanUpdates() throws InterruptedException, ExecutionException
* @throws IOException If something goes wrong while reading from index. * @throws IOException If something goes wrong while reading from index.
* @throws IndexEntryConflictException If a duplicate is found. * @throws IndexEntryConflictException If a duplicate is found.
*/ */
private void writeExternalUpdatesToTree() throws IOException, IndexEntryConflictException private void writeExternalUpdatesToTree( RecordingConflictDetector<KEY,VALUE> recordingConflictDetector ) throws IOException, IndexEntryConflictException
{ {
try ( Writer<KEY,VALUE> writer = tree.writer(); try ( Writer<KEY,VALUE> writer = tree.writer();
IndexUpdateCursor<KEY,VALUE> updates = externalUpdates.reader() ) IndexUpdateCursor<KEY,VALUE> updates = externalUpdates.reader() )
Expand All @@ -322,62 +333,31 @@ private void writeExternalUpdatesToTree() throws IOException, IndexEntryConflict
switch ( updates.updateMode() ) switch ( updates.updateMode() )
{ {
case ADDED: case ADDED:
writer.put( updates.key(), updates.value() ); writeToTree( writer, recordingConflictDetector, updates.key(), updates.value() );
break; break;
case REMOVED: case REMOVED:
writer.remove( updates.key() ); writer.remove( updates.key() );
break; break;
case CHANGED: case CHANGED:
writer.remove( updates.key() ); writer.remove( updates.key() );
writer.put( updates.key2(), updates.value() ); writeToTree( writer, recordingConflictDetector, updates.key2(), updates.value() );
break; break;
default: default:
throw new IllegalArgumentException( "Unknown update mode " + updates.updateMode() ); throw new IllegalArgumentException( "Unknown update mode " + updates.updateMode() );
} }
numberOfAppliedExternalUpdates++; numberOfAppliedExternalUpdates++;
} }
} }

if ( descriptor.isUnique() )
{
verifyUniquenessOnExternalUpdates();
}
} }


/** private void verifyUniqueKeys( List<KEY> allConflictingKeys ) throws IOException, IndexEntryConflictException
* When this method is called, all updates have been applied to the tree. Here we loop over all updates again and for each update we verify that in the
* end there are no duplicate entries for the value of the update update value in the tree. If intermediate duplicates was seen while applying the
* updates, that is fine as long as the tree is completely unique now. Note that only updates that result in adding a new key to the tree can possible
* cause a duplication to appear.
* @throws IOException If something goes wrong while reading from index.
* @throws IndexEntryConflictException If a duplicate is found.
*/
private void verifyUniquenessOnExternalUpdates() throws IOException, IndexEntryConflictException
{ {
try ( IndexUpdateCursor<KEY,VALUE> updates = externalUpdates.reader() ) Iterator<KEY> iter = allConflictingKeys.iterator();
while ( iter.hasNext() && !cancellation.cancelled() )
{ {
while ( updates.next() && !cancellation.cancelled() ) KEY key = iter.next();
{ key.setCompareId( false );
RawCursor<Hit<KEY,VALUE>,IOException> seek; verifyUniqueSeek( tree.seek( key, key ) );
switch ( updates.updateMode() )
{
case ADDED:
updates.key().setCompareId( false );
seek = tree.seek( updates.key(), updates.key() );
break;
case CHANGED:
updates.key2().setCompareId( false );
seek = tree.seek( updates.key2(), updates.key2() );
break;
case REMOVED:
// Can't cause uniqueness conflict
seek = null;
break;
default:
throw new IllegalArgumentException( "Unknown update mode " + updates.updateMode() );
}
verifyUniqueSeek( seek );
}
} }
} }


Expand All @@ -397,9 +377,8 @@ private void verifyUniqueSeek( RawCursor<Hit<KEY,VALUE>,IOException> seek ) thro
} }
} }


private void writeScanUpdatesToTree() throws IOException, IndexEntryConflictException private void writeScanUpdatesToTree( RecordingConflictDetector<KEY,VALUE> recordingConflictDetector ) throws IOException, IndexEntryConflictException
{ {
ConflictDetectingValueMerger<KEY,VALUE> conflictDetector = getMainConflictDetector();
try ( MergingBlockEntryReader<KEY,VALUE> allEntries = new MergingBlockEntryReader<>( layout ) ) try ( MergingBlockEntryReader<KEY,VALUE> allEntries = new MergingBlockEntryReader<>( layout ) )
{ {
for ( ThreadLocalBlockStorage part : allScanUpdates ) for ( ThreadLocalBlockStorage part : allScanUpdates )
Expand All @@ -423,12 +402,7 @@ private void writeScanUpdatesToTree() throws IOException, IndexEntryConflictExce
{ {
while ( allEntries.next() && !cancellation.cancelled() ) while ( allEntries.next() && !cancellation.cancelled() )
{ {
conflictDetector.controlConflictDetection( allEntries.key() ); writeToTree( writer, recordingConflictDetector, allEntries.key(), allEntries.value() );
writer.merge( allEntries.key(), allEntries.value(), conflictDetector );
if ( conflictDetector.wasConflicting() )
{
conflictDetector.reportConflict( allEntries.key().asValues() );
}
numberOfAppliedScanUpdates++; numberOfAppliedScanUpdates++;
} }
} }
Expand Down Expand Up @@ -580,6 +554,38 @@ public PopulationProgress progress( PopulationProgress scanProgress )
return builder.build(); return builder.build();
} }


/**
* Write key and value to tree and record duplicates if any.
* <p>
* The entry is first merged into the tree with {@code recordingConflictDetector} acting as the value merger. Whatever the outcome of that
* merge, {@link #handleMergeConflict(Writer, RecordingConflictDetector, NativeIndexKey, NativeIndexValue)} is invoked right after; it only
* acts if the detector flagged the merge as conflicting.
*
* @param writer tree writer that the entry is merged into.
* @param recordingConflictDetector detector that is consulted during the merge and that keeps track of conflicting keys.
* @param key key of the entry to write.
* @param value value of the entry to write.
* @throws IndexEntryConflictException declared by this method; which of the calls below can raise it is not visible here
* (NOTE(review): confirm against ConflictDetectingValueMerger).
*/
private void writeToTree( Writer<KEY,VALUE> writer, RecordingConflictDetector<KEY,VALUE> recordingConflictDetector, KEY key, VALUE value )
throws IndexEntryConflictException
{
// Must happen before the merge. Presumably prepares the key/detector for the uniqueness comparison -- TODO confirm in ConflictDetectingValueMerger.
recordingConflictDetector.controlConflictDetection( key );
writer.merge( key, value, recordingConflictDetector );
// Inspects the detector state left behind by the merge above, so it has to run directly after it.
handleMergeConflict( writer, recordingConflictDetector, key, value );
}

/**
 * Follow-up to a merge done with {@code recordingConflictDetector}. If the detector did not flag the merge as conflicting this method
 * does nothing. Otherwise a copy of the key is reported to the detector, so that uniqueness of that key can be verified later on, and
 * the entry is written to the tree again, this time with {@link Writer#put(Object, Object)} after uniqueness has been relaxed for the key
 * (the key is switched to compare on id as well).
 *
 * @param writer tree writer to re-insert the entry with.
 * @param recordingConflictDetector detector holding the outcome of the preceding merge; receives the conflicting key.
 * @param key key of the entry that was just merged. Its compare-id flag is changed if there was a conflict.
 * @param value value of the entry that was just merged.
 * @throws IndexEntryConflictException declared by this method; presumably propagated from reporting the conflict -- TODO confirm.
 */
private void handleMergeConflict( Writer<KEY,VALUE> writer, RecordingConflictDetector<KEY,VALUE> recordingConflictDetector, KEY key, VALUE value )
        throws IndexEntryConflictException
{
    if ( !recordingConflictDetector.wasConflicting() )
    {
        // The merge went through without conflict, the entry is already in place.
        return;
    }

    // Hand a separate copy to the detector, the passed in key instance is modified right below.
    KEY recordedKey = layout.newKey();
    layout.copyKey( key, recordedKey );
    recordingConflictDetector.reportConflict( recordedKey );

    // Write the entry once more, now with relaxed uniqueness so that put overwrites instead of conflicting.
    recordingConflictDetector.relaxUniqueness( key );
    writer.put( key, value );
}

/** /**
* Keeps track of a {@link BlockStorage} instance as well as monitoring some aspects of it to be able to provide a fairly accurate * Keeps track of a {@link BlockStorage} instance as well as monitoring some aspects of it to be able to provide a fairly accurate
* progress report from {@link BlockBasedIndexPopulator#progress(PopulationProgress)}. * progress report from {@link BlockBasedIndexPopulator#progress(PopulationProgress)}.
Expand Down Expand Up @@ -631,4 +637,32 @@ public boolean cancelled()
return cancelled; return cancelled;
} }
} }

/**
 * A {@link ConflictDetectingValueMerger} whose conflict report consists of remembering the conflicting key. All remembered keys
 * can be retrieved with {@link #allConflicts()}, in the order they were reported.
 *
 * @param <KEY> type of index key.
 * @param <VALUE> type of index value.
 */
private static class RecordingConflictDetector<KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue>
        extends ConflictDetectingValueMerger<KEY,VALUE,KEY>
{
    /** Every key handed to {@link #doReportConflict(long, long, NativeIndexKey)} so far. */
    private final MutableList<KEY> recordedKeys = Lists.mutable.empty();

    /**
     * @param compareEntityIds passed on unchanged to the {@link ConflictDetectingValueMerger} constructor.
     */
    RecordingConflictDetector( boolean compareEntityIds )
    {
        super( compareEntityIds );
    }

    /**
     * Makes the given key take its id into account when compared.
     *
     * @param key key to switch to id-comparing mode.
     */
    void relaxUniqueness( KEY key )
    {
        key.setCompareId( true );
    }

    /**
     * @return the list of recorded conflicting keys. This is the backing list itself, not a copy.
     */
    List<KEY> allConflicts()
    {
        return recordedKeys;
    }

    /**
     * Remembers the conflicting key. The two node ids are not used.
     */
    @Override
    void doReportConflict( long existingNodeId, long addedNodeId, KEY conflictingKey )
    {
        recordedKeys.add( conflictingKey );
    }
}
} }

0 comments on commit 8c0fc28

Please sign in to comment.