Skip to content

Commit

Permalink
Start implementation of BlockBasedIndexPopulator
Browse files Browse the repository at this point in the history
BlockBasedIndexPopulator extends NativeIndexPopulator to reuse functionality
typical for native populator such as creating GBPTree and cleaning up files.

Important things that are missing:
- Drop
- VerifyDeferredConstraints
- Sampling
- Close
- newReader
- Handle external updates after building the tree from block storage.
  • Loading branch information
tinwelint authored and burqen committed Feb 18, 2019
1 parent c860fac commit d6ec1ee
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 43 deletions.
Expand Up @@ -48,5 +48,7 @@ public boolean forLabel( long[] before, long[] after, long label )
}
};

public static UpdateMode[] MODES = UpdateMode.values();

public abstract boolean forLabel( long[] before, long[] after, long label );
}
Expand Up @@ -19,84 +19,192 @@
*/
package org.neo4j.kernel.impl.index.schema;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Collection;

import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.gis.spatial.index.curves.SpaceFillingCurveConfiguration;
import org.neo4j.index.internal.gbptree.Writer;
import org.neo4j.io.ByteUnit;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.index.IndexDirectoryStructure;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.IndexPopulator;
import org.neo4j.kernel.api.index.IndexProvider;
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.NodePropertyAccessor;
import org.neo4j.kernel.impl.index.schema.config.IndexSpecificSpaceFillingCurveSettingsCache;
import org.neo4j.kernel.impl.index.schema.config.SpaceFillingCurveSettingsWriter;
import org.neo4j.storageengine.api.schema.IndexSample;
import org.neo4j.storageengine.api.schema.StoreIndexDescriptor;
import org.neo4j.util.FeatureToggles;
import org.neo4j.util.Preconditions;
import org.neo4j.values.storable.Value;

import static org.neo4j.kernel.impl.index.schema.NativeIndexUpdater.initializeKeyFromUpdate;
import static org.neo4j.kernel.impl.index.schema.NativeIndexes.deleteIndex;

public class BlockBasedIndexPopulator<KEY extends NativeIndexKey<KEY>,VALUE extends NativeIndexValue> implements IndexPopulator
public class BlockBasedIndexPopulator<KEY extends NativeIndexKey<KEY>,VALUE extends NativeIndexValue> extends NativeIndexPopulator<KEY,VALUE>
{
private static final String BLOCK_SIZE = FeatureToggles.getString( BlockBasedIndexPopulator.class, "blockSize", "1M" );

private final Layout<KEY,VALUE> layout;
// TODO some better ByteBuffers, right?
private static final ByteBufferFactory BYTE_BUFFER_FACTORY = ByteBuffer::allocate;

private final IndexSpecificSpaceFillingCurveSettingsCache spatialSettings;
private final IndexDirectoryStructure directoryStructure;
private final SpaceFillingCurveConfiguration configuration;
private final boolean archiveFailedIndex;
private final int blockSize;
private BlockStorage<KEY,VALUE> blockStorage;
private BlockStorage<KEY,VALUE> scanUpdates;
private IndexUpdateStorage<KEY,VALUE> externalUpdates;

BlockBasedIndexPopulator( Layout<KEY,VALUE> layout )
BlockBasedIndexPopulator( PageCache pageCache, FileSystemAbstraction fs, File file, IndexLayout<KEY,VALUE> layout, IndexProvider.Monitor monitor,
StoreIndexDescriptor descriptor, IndexSpecificSpaceFillingCurveSettingsCache spatialSettings,
IndexDirectoryStructure directoryStructure, SpaceFillingCurveConfiguration configuration, boolean archiveFailedIndex )
{
this.layout = layout;
super( pageCache, fs, file, layout, monitor, descriptor, new SpaceFillingCurveSettingsWriter( spatialSettings ) );
this.spatialSettings = spatialSettings;
this.directoryStructure = directoryStructure;
this.configuration = configuration;
this.archiveFailedIndex = archiveFailedIndex;
this.blockSize = parseBlockSize();
}

long parse = ByteUnit.parse( BLOCK_SIZE );
if ( (parse & ~0xFFFF_FFFFL) != 0 )
{
throw new IllegalArgumentException( "Block size need to fit in int. Was " + parse );
}
blockSize = (int) parse;
private static int parseBlockSize()
{
long blockSize = ByteUnit.parse( BLOCK_SIZE );
Preconditions.checkArgument( blockSize >= 20 && blockSize < Integer.MAX_VALUE, "Block size need to fit in int. Was " + blockSize );
return (int) blockSize;
}

@Override
public void create()
{
// Clean directory
try
{
deleteIndex( fileSystem, directoryStructure, descriptor.getId(), archiveFailedIndex );
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
super.create();
try
{
scanUpdates = new BlockStorage<>( layout, BYTE_BUFFER_FACTORY, fileSystem, new File( storeFile.getParentFile(), storeFile.getName() + ".temp" ),
BlockStorage.Monitor.NO_MONITOR, blockSize );
externalUpdates = new IndexUpdateStorage<>( layout, fileSystem, new File( storeFile.getParent(), storeFile.getName() + ".ext" ),
BYTE_BUFFER_FACTORY.newBuffer( blockSize ) );
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
}

@Override
public void drop()
{
// Make responsive
// TODO Make responsive
}

@Override
public void add( Collection<? extends IndexEntryUpdate<?>> updates )
{
try
for ( IndexEntryUpdate<?> update : updates )
{
for ( IndexEntryUpdate<?> update : updates )
{
KEY key = layout.newKey();
VALUE value = layout.newValue();
initializeKeyFromUpdate( key, update.getEntityId(), update.values() );
value.from( update.values() );
storeUpdate( update, scanUpdates );
}
}

blockStorage.add( key, value );
}
private void storeUpdate( long entityId, Value[] values, BlockStorage<KEY,VALUE> blockStorage )
{
try
{
KEY key = layout.newKey();
VALUE value = layout.newValue();
initializeKeyFromUpdate( key, entityId, values );
value.from( values );
blockStorage.add( key, value );
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
}

void finishUp() throws IOException
private void storeUpdate( IndexEntryUpdate<?> update, BlockStorage<KEY,VALUE> blockStorage )
{
storeUpdate( update.getEntityId(), update.values(), blockStorage );
}

void finishUp() throws IOException, IndexEntryConflictException
{
scanUpdates.doneAdding();
scanUpdates.merge();
externalUpdates.doneAdding();
// don't merge and sort the external updates

// Build the tree from the scan updates
writeScanUpdatesToTree();

// Apply the external updates
writeExternalUpdatesToTree();
}

private void writeExternalUpdatesToTree() throws IOException
{
try ( Writer<KEY,VALUE> writer = tree.writer();
IndexUpdateCursor<KEY,VALUE> updates = externalUpdates.reader() )
{
while ( updates.next() )
{
switch ( updates.updateMode() )
{
case ADDED:
writer.put( updates.key(), updates.value() );
break;
case REMOVED:
writer.remove( updates.key() );
break;
case CHANGED:
writer.remove( updates.key() );
writer.put( updates.key(), updates.value() );
break;
default:
throw new IllegalArgumentException( "Unknown update mode " + updates.updateMode() );
}
}
}
}

private void writeScanUpdatesToTree() throws IOException, IndexEntryConflictException
{
blockStorage.doneAdding();
blockStorage.merge();
// for ( ... ) // Multi thread
// blockStorage.merge();
// Iterator<KEY,VALUE> iter = blockStorage.read();
// buildTree();
// ...
ConflictDetectingValueMerger<KEY,VALUE> conflictDetector = getMainConflictDetector();
try ( Writer<KEY,VALUE> writer = tree.writer();
BlockReader<KEY,VALUE> reader = scanUpdates.reader() )
{
BlockEntryReader<KEY,VALUE> maybeBlock = reader.nextBlock();
if ( maybeBlock != null )
{
try ( BlockEntryReader<KEY,VALUE> block = maybeBlock )
{
while ( block.next() )
{
conflictDetector.controlConflictDetection( block.key() );
writer.merge( block.key(), block.value(), conflictDetector );
if ( conflictDetector.wasConflicting() )
{
conflictDetector.reportConflict( block.key().asValues() );
}
}
}
}
}
}

@Override
Expand All @@ -108,8 +216,32 @@ public void verifyDeferredConstraints( NodePropertyAccessor nodePropertyAccessor
@Override
public IndexUpdater newPopulatingUpdater( NodePropertyAccessor accessor )
{
// Updater that buffers all updates and apply them after tree is built
return null;
return new IndexUpdater()
{
@Override
public void process( IndexEntryUpdate<?> update )
{
try
{
externalUpdates.add( update );
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
}

@Override
public void close()
{
}
};
}

@Override
NativeIndexReader<KEY,VALUE> newReader()
{
throw new UnsupportedOperationException( "Should not be needed because we're overriding the populating updater anyway" );
}

@Override
Expand Down
Expand Up @@ -35,7 +35,6 @@
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.pagecache.ByteArrayPageCursor;
import org.neo4j.util.Preconditions;
import org.neo4j.util.VisibleForTesting;

// TODO potentially remove padding altogether!!!!!
class BlockStorage<KEY, VALUE> implements Closeable
Expand Down Expand Up @@ -246,12 +245,57 @@ public void close() throws IOException
IOUtils.closeAll( storeChannel );
}

@VisibleForTesting
BlockReader<KEY,VALUE> reader() throws IOException
{
return reader( blockFile );
}

BlockEntryCursor<KEY,VALUE> stream() throws IOException
{
BlockReader<KEY,VALUE> reader = reader();
return new BlockEntryCursor<KEY,VALUE>()
{
private BlockEntryCursor<KEY,VALUE> block = reader.nextBlock();

@Override
public boolean next() throws IOException
{
while ( block != null )
{
if ( block.next() )
{
return true;
}
block.close();
block = reader.nextBlock();
}
return false;
}

@Override
public KEY key()
{
return block.key();
}

@Override
public VALUE value()
{
return block.value();
}

@Override
public void close() throws IOException
{
if ( block != null )
{
block.close();
block = null;
}
}
};
}

private BlockReader<KEY,VALUE> reader( File file ) throws IOException
{
return new BlockReader<>( fs, file, layout );
Expand Down
Expand Up @@ -23,5 +23,5 @@

interface ByteBufferFactory
{
ByteBuffer newBuffer( long bufferSize );
ByteBuffer newBuffer( int bufferSize );
}
Expand Up @@ -71,12 +71,22 @@ void controlConflictDetection( KEY key )
key.setCompareId( compareEntityIds );
}

boolean wasConflicting()
{
return conflict;
}

void reportConflict( Value[] values ) throws IndexEntryConflictException
{
conflict = false;
throw new IndexEntryConflictException( existingNodeId, addedNodeId, ValueTuple.of( values ) );
}

void checkConflict( Value[] values ) throws IndexEntryConflictException
{
if ( conflict )
if ( wasConflicting() )
{
conflict = false;
throw new IndexEntryConflictException( existingNodeId, addedNodeId, ValueTuple.of( values ) );
reportConflict( values );
}
}
}
Expand Up @@ -163,9 +163,10 @@ protected IndexPopulator newIndexPopulator( File storeFile, GenericLayout layout
}
else
{
return new WorkSyncedNativeIndexPopulator<>(
new GenericNativeIndexPopulator( pageCache, fs, storeFile, layout, monitor, descriptor, layout.getSpaceFillingCurveSettings(),
directoryStructure(), configuration, archiveFailedIndex, false ) );
NativeIndexPopulator actualPopulator =
new BlockBasedIndexPopulator( pageCache, fs, storeFile, layout, monitor, descriptor, layout.getSpaceFillingCurveSettings(),
directoryStructure(), configuration, archiveFailedIndex );
return new WorkSyncedNativeIndexPopulator<>( actualPopulator );
}
}

Expand Down

0 comments on commit d6ec1ee

Please sign in to comment.