Skip to content

Commit

Permalink
Introduce IndexKeyStorage
Browse files Browse the repository at this point in the history
IndexKeyStorage will be used to offload the storage of duplicate keys to
a file instead of keeping them all in memory.

Extract SimpleEntryStorage as a super class from IndexUpdateStorage.
SimpleEntryStorage takes responsibility for all resource management and
subclasses IndexUpdateStorage and the new IndexKeyStorage can focus
only on serializing and deserializing the entries that they want to
store.

Improved SimpleEntryStorage to create file and allocate ByteBuffer
lazily so that we're not throwing memory away for no reason. This is
mostly a concern when populating many indexes simultaneously.
  • Loading branch information
burqen committed Mar 28, 2019
1 parent 8c0fc28 commit e5a5268
Show file tree
Hide file tree
Showing 5 changed files with 526 additions and 71 deletions.
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import java.io.File;
import java.io.IOException;

import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCursor;

import static java.lang.String.format;

/**
 * {@link SimpleEntryStorage} that stores index keys only (no values).
 * Each stored entry is a single type byte, {@link #KEY_TYPE}, followed by the key as written by {@link BlockEntry}.
 * Keys are read back through a {@link KeyEntryCursor}.
 *
 * @param <KEY> type of key to store.
 */
class IndexKeyStorage<KEY extends NativeIndexKey<KEY>> extends SimpleEntryStorage<KEY,IndexKeyStorage.KeyEntryCursor<KEY>>
{
    /** Type marker written in front of every key, distinguishing it from the stop marker. */
    private static final byte KEY_TYPE = 1;
    private final Layout<KEY,?> layout;

    IndexKeyStorage( FileSystemAbstraction fs, File file, ByteBufferFactory byteBufferFactory, int blockSize, Layout<KEY,?> layout ) throws IOException
    {
        super( fs, file, byteBufferFactory, blockSize );
        this.layout = layout;
    }

    /**
     * Serializes the key as: type byte followed by the key itself.
     * Room for the complete entry is reserved through {@code prepareWrite} before anything is written.
     */
    @Override
    void add( KEY key, PageCursor pageCursor ) throws IOException
    {
        prepareWrite( TYPE_SIZE + BlockEntry.keySize( layout, key ) );
        pageCursor.putByte( KEY_TYPE );
        BlockEntry.write( pageCursor, layout, key );
    }

    @Override
    KeyEntryCursor<KEY> reader( PageCursor pageCursor )
    {
        return new KeyEntryCursor<>( pageCursor, layout );
    }

    /**
     * Cursor that deserializes keys from a {@link PageCursor}. There are no values, so {@link #value()} is always {@code null}.
     * A single key instance is created up front and overwritten by every successful {@link #next()}.
     */
    static class KeyEntryCursor<KEY> implements BlockEntryCursor<KEY,Void>
    {
        private final PageCursor pageCursor;
        private final Layout<KEY,?> layout;
        private final KEY key;

        KeyEntryCursor( PageCursor pageCursor, Layout<KEY,?> layout )
        {
            this.pageCursor = pageCursor;
            this.layout = layout;
            this.key = layout.newKey();
        }

        /**
         * Reads the next type byte and, if it marks a key, reads that key into the instance returned by {@link #key()}.
         *
         * @return {@code true} if a key was read, {@code false} if the stop marker was read.
         * @throws RuntimeException if the type byte is neither the key type nor the stop marker.
         */
        @Override
        public boolean next() throws IOException
        {
            byte entryType = pageCursor.getByte();
            if ( entryType == KEY_TYPE )
            {
                BlockEntry.read( pageCursor, layout, key );
                return true;
            }
            if ( entryType == STOP_TYPE )
            {
                return false;
            }
            throw new RuntimeException( format( "Unexpected entry type. Expected %d or %d, but was %d.", STOP_TYPE, KEY_TYPE, entryType ) );
        }

        @Override
        public KEY key()
        {
            return key;
        }

        /** Keys have no associated value, always {@code null}. */
        @Override
        public Void value()
        {
            return null;
        }

        /** Closes the underlying {@link PageCursor}. */
        @Override
        public void close() throws IOException
        {
            pageCursor.close();
        }
    }
}
Expand Up @@ -19,61 +19,40 @@
*/ */
package org.neo4j.kernel.impl.index.schema; package org.neo4j.kernel.impl.index.schema;


import java.io.Closeable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;


import org.neo4j.index.internal.gbptree.Layout; import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.OpenMode;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.pagecache.ByteArrayPageCursor;
import org.neo4j.io.pagecache.PageCursor; import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.api.index.IndexEntryUpdate; import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.impl.api.index.UpdateMode; import org.neo4j.kernel.impl.api.index.UpdateMode;
import org.neo4j.kernel.impl.transaction.log.ReadAheadChannel;


import static org.neo4j.kernel.impl.index.schema.NativeIndexUpdater.initializeKeyAndValueFromUpdate; import static org.neo4j.kernel.impl.index.schema.NativeIndexUpdater.initializeKeyAndValueFromUpdate;
import static org.neo4j.kernel.impl.index.schema.NativeIndexUpdater.initializeKeyFromUpdate; import static org.neo4j.kernel.impl.index.schema.NativeIndexUpdater.initializeKeyFromUpdate;


/** /**
* Buffer {@link IndexEntryUpdate} by writing them out to a file. Can be read back in insert order through {@link #reader()}. * Buffer {@link IndexEntryUpdate} by writing them out to a file. Can be read back in insert order through {@link #reader()}.
*/ */
public class IndexUpdateStorage<KEY extends NativeIndexKey<KEY>,VALUE extends NativeIndexValue> implements Closeable public class IndexUpdateStorage<KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue>
extends SimpleEntryStorage<IndexEntryUpdate<?>,IndexUpdateCursor<KEY,VALUE>>
{ {
private static final int TYPE_SIZE = Byte.BYTES;
static final byte STOP_TYPE = -1;

private final Layout<KEY,VALUE> layout; private final Layout<KEY,VALUE> layout;
private final FileSystemAbstraction fs;
private final File file;
private final ByteBufferFactory byteBufferFactory;
private final int blockSize;
private final ByteBuffer buffer;
private final ByteArrayPageCursor pageCursor;
private final StoreChannel storeChannel;
private final KEY key1; private final KEY key1;
private final KEY key2; private final KEY key2;
private final VALUE value; private final VALUE value;
private volatile long count;


IndexUpdateStorage( Layout<KEY,VALUE> layout, FileSystemAbstraction fs, File file, ByteBufferFactory byteBufferFactory, int blockSize ) throws IOException IndexUpdateStorage( FileSystemAbstraction fs, File file, ByteBufferFactory byteBufferFactory, int blockSize, Layout<KEY,VALUE> layout ) throws IOException
{ {
super( fs, file, byteBufferFactory, blockSize );
this.layout = layout; this.layout = layout;
this.fs = fs;
this.file = file;
this.byteBufferFactory = byteBufferFactory;
this.blockSize = blockSize;
this.buffer = byteBufferFactory.newBuffer( blockSize );
this.pageCursor = new ByteArrayPageCursor( buffer );
this.storeChannel = fs.create( file );
this.key1 = layout.newKey(); this.key1 = layout.newKey();
this.key2 = layout.newKey(); this.key2 = layout.newKey();
this.value = layout.newValue(); this.value = layout.newValue();
} }


public void add( IndexEntryUpdate<?> update ) throws IOException @Override
public void add( IndexEntryUpdate<?> update, PageCursor pageCursor ) throws IOException
{ {
int entrySize = TYPE_SIZE; int entrySize = TYPE_SIZE;
UpdateMode updateMode = update.updateMode(); UpdateMode updateMode = update.updateMode();
Expand All @@ -96,50 +75,15 @@ public void add( IndexEntryUpdate<?> update ) throws IOException
throw new IllegalArgumentException( "Unknown update mode " + updateMode ); throw new IllegalArgumentException( "Unknown update mode " + updateMode );
} }


if ( entrySize > buffer.remaining() ) prepareWrite( entrySize );
{
flush();
}


pageCursor.putByte( (byte) updateMode.ordinal() ); pageCursor.putByte( (byte) updateMode.ordinal() );
IndexUpdateEntry.write( pageCursor, layout, updateMode, key1, key2, value ); IndexUpdateEntry.write( pageCursor, layout, updateMode, key1, key2, value );
// a single thread, and the same thread every time, increments this count
count++;
}

void doneAdding() throws IOException
{
if ( buffer.remaining() < TYPE_SIZE )
{
flush();
}
pageCursor.putByte( STOP_TYPE );
flush();
}

public IndexUpdateCursor<KEY,VALUE> reader() throws IOException
{
ReadAheadChannel<StoreChannel> channel = new ReadAheadChannel<>( fs.open( file, OpenMode.READ ), byteBufferFactory.newBuffer( blockSize ) );
PageCursor pageCursor = new ReadableChannelPageCursor( channel );
return new IndexUpdateCursor<>( pageCursor, layout );
}

private void flush() throws IOException
{
buffer.flip();
storeChannel.write( buffer );
buffer.clear();
}

long count()
{
return count;
} }


@Override @Override
public void close() throws IOException public IndexUpdateCursor<KEY,VALUE> reader( PageCursor pageCursor )
{ {
storeChannel.close(); return new IndexUpdateCursor<>( pageCursor, layout );
fs.deleteFile( file );
} }
} }
@@ -0,0 +1,171 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.OpenMode;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.pagecache.ByteArrayPageCursor;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.impl.transaction.log.ReadAheadChannel;

import static org.neo4j.io.IOUtils.closeAllUnchecked;
import static org.neo4j.util.concurrent.Runnables.runAll;

/**
 * Storage that stores {@code ENTRY entries} in a file by simply appending them.
 * Entries can be read back, in the order they were added, through a {@code CURSOR}.
 * This storage is useful when we don't want to hold all entries in memory.
 * <p>
 * Not thread safe, with one exception: the entry count is kept in a volatile field which is only ever incremented by the
 * single thread calling {@link #add(Object)}, so {@link #count()} can be read by another thread while entries are being added.
 * <p>
 * The file, the write buffer and the write channel are allocated lazily by the first call to {@link #add(Object)},
 * so a storage that never receives an entry doesn't allocate them.
 * <p>
 * Extending classes are responsible for serializing and deserializing entries.
 * <p>
 * On close, file will be deleted but provided {@link ByteBufferFactory} will not be closed.
 *
 * @param <ENTRY> Type of entry we are storing.
 * @param <CURSOR> Cursor type responsible for deserializing what we have stored.
 */
public abstract class SimpleEntryStorage<ENTRY, CURSOR> implements Closeable
{
    /** Size in bytes of the type marker that precedes every entry. */
    static final int TYPE_SIZE = Byte.BYTES;
    /** Type marker written by {@link #doneAdding()} to signal that there are no more entries. */
    static final byte STOP_TYPE = -1;
    /** Content handed to the reader when nothing was ever added: only the stop marker. */
    private static final byte[] NO_ENTRIES = {STOP_TYPE};
    private final File file;
    private final FileSystemAbstraction fs;
    private final int blockSize;
    private final ByteBufferFactory byteBufferFactory;

    // Resources allocated lazily upon add
    private boolean allocated;
    private ByteBuffer buffer;
    private ByteArrayPageCursor pageCursor;
    private StoreChannel storeChannel;

    private volatile long count;

    SimpleEntryStorage( FileSystemAbstraction fs, File file, ByteBufferFactory byteBufferFactory, int blockSize ) throws IOException
    {
        this.fs = fs;
        this.file = file;
        this.byteBufferFactory = byteBufferFactory;
        this.blockSize = blockSize;
    }

    /**
     * Appends an entry to this storage, allocating file and buffer first if this is the first entry.
     *
     * @param entry entry to store.
     * @throws IOException on failure to create the file or to write to it.
     */
    void add( ENTRY entry ) throws IOException
    {
        allocateResources();
        add( entry, pageCursor );
        // a single thread, and the same thread every time, increments this count
        count++;
    }

    /**
     * @return cursor over the stored entries. If nothing has been added the cursor reads from an in-memory
     * array containing only the stop marker, otherwise it reads from the head of the file.
     * @throws IOException on failure to open the file.
     */
    CURSOR reader() throws IOException
    {
        if ( !allocated )
        {
            return reader( new ByteArrayPageCursor( NO_ENTRIES ) );
        }
        // Allocate the read buffer before opening the file: if the allocation fails there is no open channel to leak.
        ByteBuffer readBuffer = byteBufferFactory.newBuffer( blockSize );
        ReadAheadChannel<StoreChannel> channel = new ReadAheadChannel<>( fs.open( file, OpenMode.READ ), readBuffer );
        PageCursor readCursor = new ReadableChannelPageCursor( channel );
        return reader( readCursor );
    }

    /**
     * @return number of entries added so far.
     */
    long count()
    {
        return count;
    }

    /**
     * Terminates the stored entries with a stop marker and flushes the buffer to file.
     * Does nothing if no entry has been added, {@link #reader()} handles that case without a file.
     */
    void doneAdding() throws IOException
    {
        if ( !allocated )
        {
            return;
        }
        if ( buffer.remaining() < TYPE_SIZE )
        {
            flush();
        }
        pageCursor.putByte( STOP_TYPE );
        flush();
    }

    /**
     * Closes the write cursor and channel, if they were allocated, and deletes the file.
     */
    @Override
    public void close() throws IOException
    {
        if ( allocated )
        {
            // runAll makes sure we attempt to delete the file even if closing the resources fails
            runAll( "Failed while trying to close " + getClass().getSimpleName(),
                    () -> closeAllUnchecked( pageCursor, storeChannel ),
                    () -> fs.deleteFile( file )
            );
        }
        else
        {
            fs.deleteFile( file );
        }
    }

    /**
     * DON'T CALL THIS METHOD DIRECTLY. Instead, use {@link #add(Object)}.
     * Write entry to pageCursor. Implementor of this method is responsible for calling {@link #prepareWrite(int)} before actually start writing.
     */
    abstract void add( ENTRY entry, PageCursor pageCursor ) throws IOException;

    /**
     * DON'T CALL THIS METHOD DIRECTLY. Instead use {@link #reader()}.
     * Return {@code CURSOR} responsible for deserializing wrapping provided {@link PageCursor}, pointing to head of file.
     */
    abstract CURSOR reader( PageCursor pageCursor ) throws IOException;

    /**
     * DON'T CALL THIS METHOD DIRECTLY. Only used by subclasses.
     * Makes sure there is room for an entry of the given size in the write buffer, flushing the buffer to file if needed.
     *
     * @param entrySize total size in bytes, type marker included, of the entry about to be written.
     * @throws IllegalArgumentException if the entry is larger than the whole buffer and so can never fit.
     */
    void prepareWrite( int entrySize ) throws IOException
    {
        int capacity = buffer.capacity();
        if ( entrySize > capacity )
        {
            // Flushing can't help here, fail before anything is written instead of overflowing the buffer half way through the entry.
            throw new IllegalArgumentException( "Entry of size " + entrySize + " bytes does not fit in buffer of size " + capacity + " bytes" );
        }
        if ( entrySize > buffer.remaining() )
        {
            flush();
        }
    }

    /** Writes buffered content to file and makes the whole buffer available for writing again. */
    private void flush() throws IOException
    {
        buffer.flip();
        storeChannel.write( buffer );
        buffer.clear();
    }

    /** Creates write buffer, write cursor and file the first time it is called, later calls do nothing. */
    private void allocateResources() throws IOException
    {
        if ( !allocated )
        {
            this.buffer = byteBufferFactory.newBuffer( blockSize );
            this.pageCursor = new ByteArrayPageCursor( buffer );
            this.storeChannel = fs.create( file );
            this.allocated = true;
        }
    }
}

0 comments on commit e5a5268

Please sign in to comment.