Skip to content

Commit

Permalink
MergingBlockEntryReader can merge join multiple BlockEntryReaders
Browse files Browse the repository at this point in the history
Name change:
BlockReader -> BlockEntryReader
BlockStorageReader -> BlockReader
  • Loading branch information
burqen committed Feb 18, 2019
1 parent 898dadb commit 2c903c9
Show file tree
Hide file tree
Showing 7 changed files with 441 additions and 103 deletions.
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import java.io.Closeable;
import java.io.IOException;

public interface BlockEntryCursor<KEY,VALUE> extends Closeable
{
boolean next() throws IOException;

KEY key();

VALUE value();
}
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import java.io.IOException;

import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.io.pagecache.PageCursor;

public class BlockEntryReader<KEY,VALUE> implements BlockEntryCursor<KEY,VALUE>
{
private final long entryCount;
private final PageCursor pageCursor;
private final KEY key;
private final VALUE value;
private final Layout<KEY,VALUE> layout;
private long readEntries;

BlockEntryReader( PageCursor pageCursor, Layout<KEY,VALUE> layout )
{
this.pageCursor = pageCursor;
this.entryCount = pageCursor.getLong();
this.layout = layout;
this.key = layout.newKey();
this.value = layout.newValue();
}

public boolean next() throws IOException
{
if ( readEntries >= entryCount )
{
return false;
}
BlockEntry.read( pageCursor, layout, key, value );
readEntries++;
return true;
}

public long entryCount()
{
return entryCount;
}

public KEY key()
{
return key;
}

public VALUE value()
{
return value;
}

@Override
public void close() throws IOException
{
pageCursor.close();
}
}
Expand Up @@ -20,58 +20,50 @@
package org.neo4j.kernel.impl.index.schema;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;

import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.OpenMode;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.impl.transaction.log.ReadAheadChannel;

public class BlockReader<KEY,VALUE> implements Closeable
{
private final long entryCount;
private final PageCursor pageCursor;
private final KEY key;
private final VALUE value;
private final StoreChannel channel;
private final long blockSize;
private final FileSystemAbstraction fs;
private final File file;
private final Layout<KEY,VALUE> layout;
private long readEntries;

BlockReader( PageCursor pageCursor, Layout<KEY,VALUE> layout )
BlockReader( FileSystemAbstraction fs, File file, long blockSize, Layout<KEY,VALUE> layout ) throws IOException
{
this.pageCursor = pageCursor;
this.entryCount = pageCursor.getLong();
this.fs = fs;
this.file = file;
this.layout = layout;
this.key = layout.newKey();
this.value = layout.newValue();
this.channel = fs.open( file, OpenMode.READ );
this.blockSize = blockSize;
}

public boolean next() throws IOException
BlockEntryReader<KEY,VALUE> nextBlock() throws IOException
{
if ( readEntries >= entryCount )
long position = channel.position();
if ( position >= channel.size() )
{
return false;
return null;
}
BlockEntry.read( pageCursor, layout, key, value );
readEntries++;
return true;
}

public long entryCount()
{
return entryCount;
}

KEY key()
{
return key;
}

VALUE value()
{
return value;
StoreChannel blockChannel = fs.open( file, OpenMode.READ );
blockChannel.position( position );
channel.position( position + blockSize );
PageCursor pageCursor = new ReadableChannelPageCursor( new ReadAheadChannel<>( blockChannel ) );
return new BlockEntryReader<>( pageCursor, layout );
}

@Override
public void close() throws IOException
{
pageCursor.close();
channel.close();
}
}

This file was deleted.

@@ -0,0 +1,87 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.index.schema;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.io.IOUtils;

public class MergingBlockEntryReader<KEY,VALUE> implements BlockEntryCursor<KEY,VALUE>
{
// todo more or less expensive compared to iterating over heads?
private final PriorityQueue<BlockEntryCursor<KEY,VALUE>> sortedReaders;
private List<BlockEntryCursor<KEY,VALUE>> readersToClose = new ArrayList<>();
private BlockEntryCursor<KEY,VALUE> lastReturned;

MergingBlockEntryReader( Layout<KEY,VALUE> layout )
{
this.sortedReaders = new PriorityQueue<>( ( o1, o2 ) -> layout.compare( o1.key(), o2.key() ) );
}

void addSource( BlockEntryCursor<KEY,VALUE> source ) throws IOException
{
readersToClose.add( source );
if ( source.next() )
{
sortedReaders.add( source );
}
}

@Override
public boolean next() throws IOException
{
if ( lastReturned != null )
{
if ( lastReturned.next() )
{
sortedReaders.add( lastReturned );
}
}

if ( sortedReaders.isEmpty() )
{
return false;
}
lastReturned = sortedReaders.poll();
return true;
}

@Override
public KEY key()
{
return lastReturned.key();
}

@Override
public VALUE value()
{
return lastReturned.value();
}

@Override
public void close() throws IOException
{
IOUtils.closeAll( readersToClose );
}
}
Expand Up @@ -215,11 +215,11 @@ private final void assertContents( SimpleLongLayout layout, BlockStorage<Mutable
List<Pair<MutableLong,MutableLong>>... expectedBlocks )
throws IOException
{
try ( BlockStorageReader<MutableLong,MutableLong> reader = storage.reader() )
try ( BlockReader<MutableLong,MutableLong> reader = storage.reader() )
{
for ( List<Pair<MutableLong,MutableLong>> expectedBlock : expectedBlocks )
{
try ( BlockReader<MutableLong,MutableLong> block = reader.nextBlock() )
try ( BlockEntryReader<MutableLong,MutableLong> block = reader.nextBlock() )
{
assertNotNull( block );
assertEquals( expectedBlock.size(), block.entryCount() );
Expand Down

0 comments on commit 2c903c9

Please sign in to comment.