Skip to content

Commit

Permalink
More testing and multiple blocks working
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint authored and burqen committed Feb 18, 2019
1 parent 9b3b08c commit 201ce6b
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 109 deletions.
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import java.io.Closeable;
import java.io.IOException;

import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.io.pagecache.PageCursor;

import static org.neo4j.index.internal.gbptree.DynamicSizeUtil.extractKeySize;
import static org.neo4j.index.internal.gbptree.DynamicSizeUtil.extractValueSize;
import static org.neo4j.index.internal.gbptree.DynamicSizeUtil.readKeyValueSize;

public class BlockReader<KEY,VALUE> implements Closeable
{
private final long entryCount;
private final PageCursor pageCursor;
private final KEY key;
private final VALUE value;
private final Layout<KEY,VALUE> layout;
private long readEntries;

BlockReader( PageCursor pageCursor, Layout<KEY,VALUE> layout )
{
this.pageCursor = pageCursor;
this.entryCount = pageCursor.getLong();
this.layout = layout;
this.key = layout.newKey();
this.value = layout.newValue();
}

public boolean next() throws IOException
{
if ( readEntries >= entryCount )
{
return false;
}

long entrySize = readKeyValueSize( pageCursor );
layout.readKey( pageCursor, key, extractKeySize( entrySize ) );
layout.readValue( pageCursor, value, extractValueSize( entrySize ) );

readEntries++;
return true;
}

public long entryCount()
{
return entryCount;
}

KEY key()
{
return key;
}

VALUE value()
{
return value;
}

@Override
public void close() throws IOException
{
pageCursor.close();
}
}
Expand Up @@ -31,17 +31,11 @@
import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.io.IOUtils;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.OpenMode;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.pagecache.ByteArrayPageCursor;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.impl.transaction.log.ReadAheadChannel;

import static org.neo4j.index.internal.gbptree.DynamicSizeUtil.extractKeySize;
import static org.neo4j.index.internal.gbptree.DynamicSizeUtil.extractValueSize;
import static org.neo4j.index.internal.gbptree.DynamicSizeUtil.getOverhead;
import static org.neo4j.index.internal.gbptree.DynamicSizeUtil.putKeyValueSize;
import static org.neo4j.index.internal.gbptree.DynamicSizeUtil.readKeyValueSize;

class BlockStorage<KEY, VALUE> implements Closeable
{
Expand All @@ -58,7 +52,6 @@ class BlockStorage<KEY, VALUE> implements Closeable
private final int bufferSize;
private final ByteBufferFactory bufferFactory;
private int currentBufferSize;
private long currentKeyCount;
private int mergeIteration;

BlockStorage( Layout<KEY,VALUE> layout, ByteBufferFactory bufferFactory, FileSystemAbstraction fs, File blockFile, Monitor monitor, int bufferSize )
Expand Down Expand Up @@ -92,13 +85,12 @@ public void add( KEY key, VALUE value ) throws IOException

bufferedEntries.add( new BlockEntry<>( key, value ) );
currentBufferSize += entrySize;
currentKeyCount++;
monitor.entryAdded( entrySize );
}

public void doneAdding() throws IOException
{
if ( currentKeyCount > 0 )
if ( !bufferedEntries.isEmpty() )
{
flushAndResetBuffer();
}
Expand All @@ -109,7 +101,6 @@ private void resetBuffer()
byteBuffer.clear();
bufferedEntries.clear();
currentBufferSize = BLOCK_HEADER_SIZE;
currentKeyCount = 0;
}

private void flushAndResetBuffer() throws IOException
Expand All @@ -118,7 +109,7 @@ private void flushAndResetBuffer() throws IOException
ByteArrayPageCursor pageCursor = new ByteArrayPageCursor( byteBuffer );

// Header
pageCursor.putLong( currentKeyCount );
pageCursor.putLong( bufferedEntries.size() );

// Entries
for ( BlockEntry<KEY,VALUE> entry : bufferedEntries )
Expand All @@ -130,13 +121,14 @@ private void flushAndResetBuffer() throws IOException
layout.writeValue( pageCursor, entry.value() );
}

// TODO solve the BIG padding problem
// Zero pad
pageCursor.putBytes( bufferSize - currentBufferSize, (byte) 0 );

// Append to file
byteBuffer.flip();
storeChannel.writeAll( byteBuffer );
monitor.blockFlushed( currentKeyCount, currentBufferSize, storeChannel.position() );
monitor.blockFlushed( bufferedEntries.size(), currentBufferSize, storeChannel.position() );
resetBuffer();
}

Expand All @@ -151,93 +143,9 @@ private long calculateBlockSize()
return (long) Math.pow( 2, mergeIteration ) * bufferSize;
}

public BlockReader reader() throws IOException
public BlockStorageReader<KEY,VALUE> reader() throws IOException
{
return new BlockReader( fs.open( blockFile, OpenMode.READ ) );
}

public class BlockReader implements Closeable
{
private final StoreChannel channel;

BlockReader( StoreChannel channel )
{
this.channel = channel;
}

public EntryReader nextBlock() throws IOException
{
long position = channel.position();
if ( position >= channel.size() )
{
return null;
}
StoreChannel blockChannel = fs.open( blockFile, OpenMode.READ );
blockChannel.position( position );
channel.position( position + calculateBlockSize() );
PageCursor pageCursor = new ReadableChannelPageCursor( new ReadAheadChannel<>( blockChannel ) );
EntryReader entryReader = new EntryReader( pageCursor );
return entryReader;
}

@Override
public void close() throws IOException
{
channel.close();
}
}

public class EntryReader implements Closeable
{
private final long entryCount;
private final PageCursor pageCursor;
private final KEY key;
private final VALUE value;
private long readEntries;

EntryReader( PageCursor pageCursor )
{
this.pageCursor = pageCursor;
this.entryCount = pageCursor.getLong();
this.key = layout.newKey();
this.value = layout.newValue();
}

public boolean next() throws IOException
{
if ( readEntries >= entryCount )
{
return false;
}

long entrySize = readKeyValueSize( pageCursor );
layout.readKey( pageCursor, key, extractKeySize( entrySize ) );
layout.readValue( pageCursor, value, extractValueSize( entrySize ) );

readEntries++;
return true;
}

public long entryCount()
{
return entryCount;
}

KEY key()
{
return key;
}

VALUE value()
{
return value;
}

@Override
public void close() throws IOException
{
pageCursor.close();
}
return new BlockStorageReader<>( fs, blockFile, calculateBlockSize(), layout );
}

public interface Monitor
Expand Down
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;

import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.OpenMode;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.impl.transaction.log.ReadAheadChannel;

public class BlockStorageReader<KEY,VALUE> implements Closeable
{
private final StoreChannel channel;
private final long blockSize;
private final FileSystemAbstraction fs;
private final File file;
private final Layout<KEY,VALUE> layout;

BlockStorageReader( FileSystemAbstraction fs, File file, long blockSize, Layout<KEY,VALUE> layout ) throws IOException
{
this.fs = fs;
this.file = file;
this.layout = layout;
this.channel = fs.open( file, OpenMode.READ );
this.blockSize = blockSize;
}

public BlockReader<KEY,VALUE> nextBlock() throws IOException
{
long position = channel.position();
if ( position >= channel.size() )
{
return null;
}
StoreChannel blockChannel = fs.open( file, OpenMode.READ );
blockChannel.position( position );
channel.position( position + blockSize );
PageCursor pageCursor = new ReadableChannelPageCursor( new ReadAheadChannel<>( blockChannel ) );
return new BlockReader<>( pageCursor, layout );
}

@Override
public void close() throws IOException
{
channel.close();
}
}

0 comments on commit 201ce6b

Please sign in to comment.