BlockStorage and IndexUpdateStorage clean up files on merge and close
Including:
- Naive index sampling. Sampling should really be done while building the tree.
- Naive exception handling in "finishUp".
- Delegate markAsFailed to NativeIndexPopulator.
tinwelint authored and burqen committed Feb 18, 2019
1 parent d6ec1ee commit 81119ff
Showing 4 changed files with 78 additions and 59 deletions.
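
The common thread across these files is that the temporary spill files written during index population now only live as long as the objects that own them: channels are closed and the backing files deleted in close(). A minimal, self-contained sketch of that pattern in plain JDK I/O (SpillFile and its methods are hypothetical stand-ins, not the Neo4j FileSystemAbstraction API):

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in for the spill-file storages touched by this commit:
// the backing file lives only as long as the storage object and is removed in close().
class SpillFile implements Closeable
{
    private final Path file;
    private final OutputStream out;

    SpillFile( Path file ) throws IOException
    {
        this.file = file;
        this.out = Files.newOutputStream( file );
    }

    void write( byte[] data ) throws IOException
    {
        out.write( data );
    }

    @Override
    public void close() throws IOException
    {
        out.close();
        Files.deleteIfExists( file );
    }
}

Deleting in close() keeps the cleanup responsibility in one place, so callers only need a try-with-resources or a finally block.
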
File 1 of 4: block-based native index populator
@@ -28,6 +28,7 @@
 import org.neo4j.gis.spatial.index.curves.SpaceFillingCurveConfiguration;
 import org.neo4j.index.internal.gbptree.Writer;
 import org.neo4j.io.ByteUnit;
+import org.neo4j.io.IOUtils;
 import org.neo4j.io.fs.FileSystemAbstraction;
 import org.neo4j.io.pagecache.PageCache;
 import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
@@ -106,12 +107,6 @@ public void create()
         }
     }
 
-    @Override
-    public void drop()
-    {
-        // TODO Make responsive
-    }
-
     @Override
     public void add( Collection<? extends IndexEntryUpdate<?>> updates )
     {
@@ -142,18 +137,39 @@ private void storeUpdate( IndexEntryUpdate<?> update, BlockStorage<KEY,VALUE> blockStorage )
         storeUpdate( update.getEntityId(), update.values(), blockStorage );
     }
 
-    void finishUp() throws IOException, IndexEntryConflictException
+    void finishUp() throws IndexEntryConflictException
     {
-        scanUpdates.doneAdding();
-        scanUpdates.merge();
-        externalUpdates.doneAdding();
-        // don't merge and sort the external updates
-
-        // Build the tree from the scan updates
-        writeScanUpdatesToTree();
-
-        // Apply the external updates
-        writeExternalUpdatesToTree();
+        try
+        {
+            scanUpdates.doneAdding();
+            scanUpdates.merge();
+            externalUpdates.doneAdding();
+            // don't merge and sort the external updates
+
+            // Build the tree from the scan updates
+            writeScanUpdatesToTree();
+
+            // Apply the external updates
+            writeExternalUpdatesToTree();
+        }
+        catch ( IOException e )
+        {
+            throw new UncheckedIOException( e );
+        }
     }
 
+    @Override
+    public IndexSample sampleResult()
+    {
+        try
+        {
+            finishUp();
+            return super.sampleResult();
+        }
+        catch ( IndexEntryConflictException e )
+        {
+            throw new IllegalStateException( e );
+        }
+    }
+
     private void writeExternalUpdatesToTree() throws IOException
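
finishUp() now catches the checked IOException itself and rethrows it as java.io.UncheckedIOException, which lets overrides such as sampleResult() call it without declaring IOException. A tiny generic sketch of that wrapping pattern, with made-up names (IoAction, runUnchecked) purely for illustration:

import java.io.IOException;
import java.io.UncheckedIOException;

class UncheckedIo
{
    interface IoAction
    {
        void run() throws IOException;
    }

    // Run an I/O action from a context whose signature cannot declare IOException.
    static void runUnchecked( IoAction action )
    {
        try
        {
            action.run();
        }
        catch ( IOException e )
        {
            throw new UncheckedIOException( e );
        }
    }
}
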
@@ -211,6 +227,8 @@ private void writeScanUpdatesToTree() throws IOException, IndexEntryConflictException
     public void verifyDeferredConstraints( NodePropertyAccessor nodePropertyAccessor ) throws IndexEntryConflictException
     {
         // On building tree
+        finishUp(); // TODO just kidding, perhaps not here
+        super.verifyDeferredConstraints( nodePropertyAccessor );
     }
 
     @Override
@@ -247,25 +265,8 @@ NativeIndexReader<KEY,VALUE> newReader()
     @Override
     public void close( boolean populationCompletedSuccessfully )
     {
-        // Make responsive
-    }
-
-    @Override
-    public void markAsFailed( String failure )
-    {
-
-    }
-
-    @Override
-    public void includeSample( IndexEntryUpdate<?> update )
-    {
-        // leave for now, can either sample when we build tree in the end or when updates come in
-    }
-
-    @Override
-    public IndexSample sampleResult()
-    {
-        // leave for now, look at what NativeIndexPopulator does
-        return null;
+        // TODO Make responsive
+        IOUtils.closeAllSilently( externalUpdates, scanUpdates );
+        super.close( populationCompletedSuccessfully );
     }
 }
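
close() now hands both storages to IOUtils.closeAllSilently before delegating to super.close(...). Assuming that helper behaves like a conventional close-everything-and-swallow-I/O-errors utility, the idea looks roughly like this in plain Java (closeAllSilently below is a stand-in, not the actual Neo4j implementation):

import java.io.Closeable;
import java.io.IOException;

class CloseSilently
{
    // Best-effort cleanup: close every resource, ignore nulls and I/O failures,
    // so that cleanup never masks the primary success or failure of the populator.
    static void closeAllSilently( Closeable... closeables )
    {
        for ( Closeable closeable : closeables )
        {
            if ( closeable == null )
            {
                continue;
            }
            try
            {
                closeable.close();
            }
            catch ( IOException ignored )
            {
            }
        }
    }
}
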
File 2 of 4: BlockReader
@@ -36,12 +36,14 @@ public class BlockReader<KEY,VALUE> implements Closeable
     private final FileSystemAbstraction fs;
     private final File file;
     private final Layout<KEY,VALUE> layout;
+    private final int blockSize;
 
-    BlockReader( FileSystemAbstraction fs, File file, Layout<KEY,VALUE> layout ) throws IOException
+    BlockReader( FileSystemAbstraction fs, File file, Layout<KEY,VALUE> layout, int blockSize ) throws IOException
     {
         this.fs = fs;
         this.file = file;
         this.layout = layout;
+        this.blockSize = blockSize;
         this.channel = fs.open( file, OpenMode.READ );
     }

@@ -54,7 +56,7 @@ BlockEntryReader<KEY,VALUE> nextBlock() throws IOException
         }
         StoreChannel blockChannel = fs.open( file, OpenMode.READ );
         blockChannel.position( position );
-        PageCursor pageCursor = new ReadableChannelPageCursor( new ReadAheadChannel<>( blockChannel ) );
+        PageCursor pageCursor = new ReadableChannelPageCursor( new ReadAheadChannel<>( blockChannel, blockSize ) );
         BlockEntryReader<KEY,VALUE> blockEntryReader = new BlockEntryReader<>( pageCursor, layout );
         long blockSize = blockEntryReader.blockSize();
         channel.position( position + blockSize );
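
Passing blockSize through to the ReadAheadChannel presumably sizes its read-ahead buffer to the storage's block size instead of relying on a default. The same idea expressed with plain JDK streams (openBlock is a hypothetical helper, not part of BlockReader):

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

class ReadAheadSketch
{
    // Open a block for reading with a buffer sized to the block, so a whole block
    // can be read ahead instead of falling back to the default buffer size.
    static InputStream openBlock( Path file, int blockSize ) throws IOException
    {
        return new BufferedInputStream( Files.newInputStream( file ), blockSize );
    }
}
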
File 3 of 4: BlockStorage
@@ -98,6 +98,7 @@ void doneAdding() throws IOException
             numberOfBlocksInCurrentFile++;
         }
         doneAdding = true;
+        storeChannel.close();
     }
 
     private void resetBufferedEntries()
@@ -120,34 +121,47 @@ private void flushAndResetBuffer() throws IOException
 
     public void merge() throws IOException
     {
-        int mergeFactor = 2;
         File sourceFile = blockFile;
-        File targetFile = new File( blockFile.getParent(), blockFile.getName() + ".b" );
-        while ( numberOfBlocksInCurrentFile > 1 )
-        {
-            // Perform one complete merge iteration, merging all blocks from source into target.
-            // After this step, target will contain fewer blocks than source, but may need another merge iteration.
-            try ( BlockReader<KEY,VALUE> reader = reader( sourceFile );
-                    StoreChannel targetChannel = fs.open( targetFile, OpenMode.READ_WRITE ) )
-            {
-                int blocksMergedSoFar = 0;
-                int blocksInMergedFile = 0;
-                while ( blocksMergedSoFar < numberOfBlocksInCurrentFile )
-                {
-                    blocksMergedSoFar += performSingleMerge( mergeFactor, reader, targetChannel );
-                    blocksInMergedFile++;
-                }
-                numberOfBlocksInCurrentFile = blocksInMergedFile;
-                monitor.mergeIterationFinished( blocksMergedSoFar, blocksInMergedFile );
-            }
-
-            // Flip and restore the channelz
-            File tmpSourceFile = sourceFile;
-            sourceFile = targetFile;
-            targetFile = tmpSourceFile;
-        }
-        blockFile = sourceFile;
-        // todo Clean away the other file
+        File tempFile = new File( blockFile.getParent(), blockFile.getName() + ".b" );
+        try
+        {
+            int mergeFactor = 10;
+            File targetFile = tempFile;
+            while ( numberOfBlocksInCurrentFile > 1 )
+            {
+                // Perform one complete merge iteration, merging all blocks from source into target.
+                // After this step, target will contain fewer blocks than source, but may need another merge iteration.
+                try ( BlockReader<KEY,VALUE> reader = reader( sourceFile ); StoreChannel targetChannel = fs.open( targetFile, OpenMode.READ_WRITE ) )
+                {
+                    int blocksMergedSoFar = 0;
+                    int blocksInMergedFile = 0;
+                    while ( blocksMergedSoFar < numberOfBlocksInCurrentFile )
+                    {
+                        blocksMergedSoFar += performSingleMerge( mergeFactor, reader, targetChannel );
+                        blocksInMergedFile++;
+                    }
+                    numberOfBlocksInCurrentFile = blocksInMergedFile;
+                    monitor.mergeIterationFinished( blocksMergedSoFar, blocksInMergedFile );
+                }
+
+                // Flip and restore the channelz
+                File tmpSourceFile = sourceFile;
+                sourceFile = targetFile;
+                targetFile = tmpSourceFile;
+            }
+        }
+        finally
+        {
+            if ( sourceFile == blockFile )
+            {
+                fs.deleteFile( tempFile );
+            }
+            else
+            {
+                fs.deleteFile( blockFile );
+                fs.renameFile( tempFile, blockFile );
+            }
+        }
     }
 
     private int performSingleMerge( int mergeFactor, BlockReader<KEY,VALUE> reader, StoreChannel targetChannel )
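
The reworked merge() writes each merge iteration into a scratch ".b" file, flips source and target between iterations, and in the finally block either deletes the scratch file or renames it over the original, so exactly one file remains afterwards. A self-contained sketch of that lifecycle using java.nio.file, where countBlocks and mergeIteration are placeholders for the real block bookkeeping and k-way merge:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class MergeSketch
{
    // Placeholder: in BlockStorage this is tracked as numberOfBlocksInCurrentFile.
    static long countBlocks( Path file ) throws IOException
    {
        return Files.size( file ) == 0 ? 0 : 1; // stub so the sketch compiles
    }

    // Placeholder for one merge pass that reads blocks from source and writes
    // fewer, larger sorted blocks into target.
    static void mergeIteration( Path source, Path target ) throws IOException
    {
        Files.copy( source, target, StandardCopyOption.REPLACE_EXISTING ); // stub
    }

    static void mergeInPlace( Path blockFile ) throws IOException
    {
        Path tempFile = blockFile.resolveSibling( blockFile.getFileName() + ".b" );
        Path source = blockFile;
        Path target = tempFile;
        try
        {
            while ( countBlocks( source ) > 1 )
            {
                mergeIteration( source, target );
                // Flip source and target for the next iteration.
                Path tmp = source;
                source = target;
                target = tmp;
            }
        }
        finally
        {
            if ( source.equals( blockFile ) )
            {
                // Result is already in the original file; just drop the scratch file.
                Files.deleteIfExists( tempFile );
            }
            else
            {
                // Result ended up in the scratch file; move it over the original.
                Files.move( tempFile, blockFile, StandardCopyOption.REPLACE_EXISTING );
            }
        }
    }
}

Doing the cleanup in finally means the scratch file is removed even when a merge iteration fails.
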
@@ -243,6 +257,7 @@ private static void writeLastEntriesWithPadding( StoreChannel channel, ByteBuffer
     public void close() throws IOException
     {
         IOUtils.closeAll( storeChannel );
+        fs.deleteFile( blockFile );
     }
 
     BlockReader<KEY,VALUE> reader() throws IOException
@@ -298,7 +313,7 @@ public void close() throws IOException
 
     private BlockReader<KEY,VALUE> reader( File file ) throws IOException
     {
-        return new BlockReader<>( fs, file, layout );
+        return new BlockReader<>( fs, file, layout, blockSize );
     }
 
     public interface Monitor
File 4 of 4: IndexUpdateStorage
@@ -125,5 +125,6 @@ private void flush() throws IOException
     public void close() throws IOException
     {
         storeChannel.close();
+        fs.deleteFile( file );
     }
 }
