Skip to content

Commit

Permalink
A new IndexUpdateStorage for external updates
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint authored and burqen committed Feb 18, 2019
1 parent be6d2b3 commit c860fac
Show file tree
Hide file tree
Showing 7 changed files with 481 additions and 9 deletions.
Expand Up @@ -63,6 +63,12 @@ static <VALUE, KEY> int entrySize( Layout<KEY,VALUE> layout, KEY key, VALUE valu
return keySize + valueSize + getOverhead( keySize, valueSize );
}

static <VALUE, KEY> int keySize( Layout<KEY,VALUE> layout, KEY key )
{
int keySize = layout.keySize( key );
return keySize + getOverhead( keySize, 0 );
}

static <KEY, VALUE> BlockEntry<KEY,VALUE> read( PageCursor pageCursor, Layout<KEY,VALUE> layout )
{
KEY key = layout.newKey();
Expand All @@ -78,6 +84,12 @@ static <KEY, VALUE> void read( PageCursor pageCursor, Layout<KEY,VALUE> layout,
layout.readValue( pageCursor, value, extractValueSize( entrySize ) );
}

static <KEY, VALUE> void read( PageCursor pageCursor, Layout<KEY,VALUE> layout, KEY key )
{
long entrySize = readKeyValueSize( pageCursor );
layout.readKey( pageCursor, key, extractKeySize( entrySize ) );
}

static <KEY, VALUE> void write( PageCursor pageCursor, Layout<KEY,VALUE> layout, BlockEntry<KEY,VALUE> entry )
{
write( pageCursor, layout, entry.key(), entry.value() );
Expand All @@ -91,4 +103,11 @@ static <KEY, VALUE> void write( PageCursor pageCursor, Layout<KEY,VALUE> layout,
layout.writeKey( pageCursor, key );
layout.writeValue( pageCursor, value );
}

static <KEY, VALUE> void write( PageCursor pageCursor, Layout<KEY,VALUE> layout, KEY key )
{
int keySize = layout.keySize( key );
putKeyValueSize( pageCursor, keySize, 0 );
layout.writeKey( pageCursor, key );
}
}
@@ -0,0 +1,94 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import java.io.IOException;

import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.impl.api.index.UpdateMode;
import org.neo4j.kernel.impl.transaction.log.ReadAheadChannel;

import static org.neo4j.kernel.impl.index.schema.IndexUpdateStorage.STOP_TYPE;

public class IndexUpdateCursor<KEY, VALUE> implements BlockEntryCursor<KEY,VALUE>
{
private final ReadAheadChannel channel;
private final PageCursor cursor;
private final Layout<KEY,VALUE> layout;

// Fields for the last entry
private UpdateMode updateMode;
private KEY key1;
private KEY key2;
private VALUE value;

public IndexUpdateCursor( ReadAheadChannel channel, PageCursor cursor, Layout<KEY,VALUE> layout )
{
this.channel = channel;
this.cursor = cursor;
this.layout = layout;
this.key1 = layout.newKey();
this.key2 = layout.newKey();
this.value = layout.newValue();
}

@Override
public boolean next() throws IOException
{
byte updateModeType = cursor.getByte();
if ( updateModeType == STOP_TYPE )
{
return false;
}

updateMode = UpdateMode.MODES[updateModeType];
IndexUpdateEntry.read( cursor, layout, updateMode, key1, key2, value );
return true;
}

@Override
public KEY key()
{
return key1;
}

@Override
public VALUE value()
{
return value;
}

public KEY key2()
{
return key2;
}

public UpdateMode updateMode()
{
return updateMode;
}

@Override
public void close() throws IOException
{
cursor.close();
}
}
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.impl.api.index.UpdateMode;

public class IndexUpdateEntry
{
public static <KEY, VALUE> void read( PageCursor cursor, Layout<KEY,VALUE> layout, UpdateMode updateMode, KEY key1, KEY key2, VALUE value )
{
switch ( updateMode )
{
case ADDED:
BlockEntry.read( cursor, layout, key1, value );
break;
case REMOVED:
BlockEntry.read( cursor, layout, key1 );
break;
case CHANGED:
BlockEntry.read( cursor, layout, key1 );
BlockEntry.read( cursor, layout, key2, value );
break;
default:
throw new IllegalArgumentException( "Unknown update mode " + updateMode );
}
}

public static <KEY,VALUE> void write( PageCursor cursor, Layout<KEY,VALUE> layout, UpdateMode updateMode, KEY key1, KEY key2, VALUE value )
{
switch ( updateMode )
{
case ADDED:
BlockEntry.write( cursor, layout, key1, value );
break;
case REMOVED:
BlockEntry.write( cursor, layout, key1 );
break;
case CHANGED:
BlockEntry.write( cursor, layout, key1 );
BlockEntry.write( cursor, layout, key2, value );
break;
default:
throw new IllegalArgumentException( "Unknown update mode " + updateMode );
}
}
}
@@ -0,0 +1,129 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.OpenMode;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.pagecache.ByteArrayPageCursor;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.impl.api.index.UpdateMode;
import org.neo4j.kernel.impl.transaction.log.ReadAheadChannel;

import static org.neo4j.kernel.impl.index.schema.NativeIndexUpdater.initializeKeyAndValueFromUpdate;
import static org.neo4j.kernel.impl.index.schema.NativeIndexUpdater.initializeKeyFromUpdate;

public class IndexUpdateStorage<KEY extends NativeIndexKey<KEY>,VALUE extends NativeIndexValue> implements Closeable
{
private static final int TYPE_SIZE = Byte.BYTES;
static final byte STOP_TYPE = -1;

private final Layout<KEY,VALUE> layout;
private final FileSystemAbstraction fs;
private final File file;
private final ByteBuffer buffer;
private final ByteArrayPageCursor pageCursor;
private final StoreChannel storeChannel;
private final KEY key1;
private final KEY key2;
private final VALUE value;

IndexUpdateStorage( Layout<KEY,VALUE> layout, FileSystemAbstraction fs, File file, ByteBuffer buffer ) throws IOException
{
this.layout = layout;
this.fs = fs;
this.file = file;
this.buffer = buffer;
this.pageCursor = new ByteArrayPageCursor( buffer );
this.storeChannel = fs.create( file );
this.key1 = layout.newKey();
this.key2 = layout.newKey();
this.value = layout.newValue();
}

public void add( IndexEntryUpdate<?> update ) throws IOException
{
int entrySize = TYPE_SIZE;
UpdateMode updateMode = update.updateMode();
switch ( updateMode )
{
case ADDED:
initializeKeyAndValueFromUpdate( key1, value, update.getEntityId(), update.values() );
entrySize += BlockEntry.entrySize( layout, key1, value );
break;
case REMOVED:
initializeKeyFromUpdate( key1, update.getEntityId(), update.values() );
entrySize += BlockEntry.keySize( layout, key1 );
break;
case CHANGED:
initializeKeyFromUpdate( key1, update.getEntityId(), update.beforeValues() );
initializeKeyAndValueFromUpdate( key2, value, update.getEntityId(), update.values() );
entrySize += BlockEntry.keySize( layout, key1 ) + BlockEntry.entrySize( layout, key2, value );
break;
default:
throw new IllegalArgumentException( "Unknown update mode " + updateMode );
}

if ( entrySize > buffer.remaining() )
{
flush();
}

pageCursor.putByte( (byte) updateMode.ordinal() );
IndexUpdateEntry.write( pageCursor, layout, updateMode, key1, key2, value );
}

public void doneAdding() throws IOException
{
if ( buffer.remaining() < TYPE_SIZE )
{
flush();
}
pageCursor.putByte( STOP_TYPE );
flush();
}

public IndexUpdateCursor<KEY,VALUE> reader() throws IOException
{
ReadAheadChannel<StoreChannel> channel = new ReadAheadChannel<>( fs.open( file, OpenMode.READ ) );
PageCursor pageCursor = new ReadableChannelPageCursor( channel );
return new IndexUpdateCursor<>( channel, pageCursor, layout );
}

private void flush() throws IOException
{
buffer.flip();
storeChannel.write( buffer );
buffer.clear();
}

@Override
public void close() throws IOException
{
storeChannel.close();
}
}
Expand Up @@ -127,13 +127,19 @@ private static <KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue>
Writer<KEY,VALUE> writer, ConflictDetectingValueMerger<KEY,VALUE> conflictDetectingValueMerger )
throws IndexEntryConflictException
{
initializeKeyFromUpdate( treeKey, update.getEntityId(), update.values() );
treeValue.from( update.values() );
initializeKeyAndValueFromUpdate( treeKey, treeValue, update.getEntityId(), update.values() );
conflictDetectingValueMerger.controlConflictDetection( treeKey );
writer.merge( treeKey, treeValue, conflictDetectingValueMerger );
conflictDetectingValueMerger.checkConflict( update.values() );
}

static <KEY extends NativeIndexKey<KEY>, VALUE extends NativeIndexValue> void initializeKeyAndValueFromUpdate( KEY treeKey, VALUE treeValue,
long entityId, Value[] values )
{
initializeKeyFromUpdate( treeKey, entityId, values );
treeValue.from( values );
}

static <KEY extends NativeIndexKey<KEY>> void initializeKeyFromUpdate( KEY treeKey, long entityId, Value[] values )
{
treeKey.initialize( entityId );
Expand Down
Expand Up @@ -134,22 +134,27 @@ public void putInt( int offset, int value )
@Override
public void getBytes( byte[] data )
{
getBytes( data, 0, data.length );
}

@Override
public void getBytes( byte[] data, int arrayOffset, int length )
{
if ( arrayOffset != 0 )
{
throw new UnsupportedOperationException();
}

try
{
channel.get( data, data.length );
channel.get( data, length );
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
}

@Override
public void getBytes( byte[] data, int arrayOffset, int length )
{
throw new UnsupportedOperationException();
}

@Override
public void putBytes( byte[] data )
{
Expand Down

0 comments on commit c860fac

Please sign in to comment.