Skip to content

Commit

Permalink
BlockBasedIndexPopulator offload duplicate values to file instead of …
Browse files Browse the repository at this point in the history
…keeping in memory

By letting RecordingConflictDetector use IndexKeyStorage instead of
List to store the duplicate keys.
  • Loading branch information
burqen committed Mar 28, 2019
1 parent e5a5268 commit b384053
Showing 1 changed file with 39 additions and 26 deletions.
Expand Up @@ -19,16 +19,12 @@
*/
package org.neo4j.kernel.impl.index.schema;

import org.eclipse.collections.api.list.MutableList;
import org.eclipse.collections.impl.factory.Lists;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -175,8 +171,8 @@ public void create()
super.create();
try
{
externalUpdates = new IndexUpdateStorage<>( layout, fileSystem, new File( storeFile.getParent(), storeFile.getName() + ".ext" ),
bufferFactory, blockSize );
File externalUpdatesFile = new File( storeFile.getParent(), storeFile.getName() + ".ext" );
externalUpdates = new IndexUpdateStorage<>( fileSystem, externalUpdatesFile, bufferFactory, blockSize, layout );
}
catch ( IOException e )
{
Expand Down Expand Up @@ -252,17 +248,21 @@ public void scanCompleted( PhaseTracker phaseTracker ) throws IndexEntryConflict

// Build the tree from the scan updates
phaseTracker.enterPhase( PhaseTracker.Phase.BUILD );
RecordingConflictDetector<KEY,VALUE> recordingConflictDetector = new RecordingConflictDetector<>( !descriptor.isUnique() );
writeScanUpdatesToTree( recordingConflictDetector );
File duplicatesFile = new File( storeFile.getParentFile(), storeFile.getName() + ".dup" );
try ( IndexKeyStorage<KEY> indexKeyStorage = new IndexKeyStorage<>( fileSystem, duplicatesFile, bufferFactory, blockSize, layout ) )
{
RecordingConflictDetector<KEY,VALUE> recordingConflictDetector = new RecordingConflictDetector<>( !descriptor.isUnique(), indexKeyStorage );
writeScanUpdatesToTree( recordingConflictDetector );

// Apply the external updates
phaseTracker.enterPhase( PhaseTracker.Phase.APPLY_EXTERNAL );
writeExternalUpdatesToTree( recordingConflictDetector );
// Apply the external updates
phaseTracker.enterPhase( PhaseTracker.Phase.APPLY_EXTERNAL );
writeExternalUpdatesToTree( recordingConflictDetector );

// Verify uniqueness
if ( descriptor.isUnique() )
{
verifyUniqueKeys( recordingConflictDetector.allConflicts() );
// Verify uniqueness
if ( descriptor.isUnique() )
{
verifyUniqueKeys( recordingConflictDetector.allConflicts() );
}
}
}
catch ( IOException e )
Expand Down Expand Up @@ -350,12 +350,11 @@ private void writeExternalUpdatesToTree( RecordingConflictDetector<KEY,VALUE> re
}
}

private void verifyUniqueKeys( List<KEY> allConflictingKeys ) throws IOException, IndexEntryConflictException
private void verifyUniqueKeys( IndexKeyStorage.KeyEntryCursor<KEY> allConflictingKeys ) throws IOException, IndexEntryConflictException
{
Iterator<KEY> iter = allConflictingKeys.iterator();
while ( iter.hasNext() && !cancellation.cancelled() )
while ( allConflictingKeys.next() && !cancellation.cancelled() )
{
KEY key = iter.next();
KEY key = allConflictingKeys.key();
key.setCompareId( false );
verifyUniqueSeek( tree.seek( key, key ) );
}
Expand Down Expand Up @@ -639,30 +638,44 @@ public boolean cancelled()
}

private static class RecordingConflictDetector<KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue>
extends ConflictDetectingValueMerger<KEY,VALUE,KEY>
extends ConflictDetectingValueMerger<KEY,VALUE,KEY> implements Closeable
{
private final MutableList<KEY> allConflictingKeys;
private final IndexKeyStorage<KEY> allConflictingKeys;

RecordingConflictDetector( boolean compareEntityIds )
RecordingConflictDetector( boolean compareEntityIds, IndexKeyStorage<KEY> indexKeyStorage )
{
super( compareEntityIds );
allConflictingKeys = Lists.mutable.empty();
allConflictingKeys = indexKeyStorage;
}

@Override
void doReportConflict( long existingNodeId, long addedNodeId, KEY conflictingKey )
{
allConflictingKeys.add( conflictingKey );
try
{
allConflictingKeys.add( conflictingKey );
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
}

List<KEY> allConflicts()
IndexKeyStorage.KeyEntryCursor<KEY> allConflicts() throws IOException
{
return allConflictingKeys;
allConflictingKeys.doneAdding();
return allConflictingKeys.reader();
}

void relaxUniqueness( KEY key )
{
key.setCompareId( true );
}

@Override
public void close() throws IOException
{
allConflictingKeys.close();
}
}
}

0 comments on commit b384053

Please sign in to comment.