diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockEntryCursor.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockEntryCursor.java new file mode 100644 index 0000000000000..7983c53f6e811 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockEntryCursor.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2002-2019 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.index.schema; + +import java.io.Closeable; +import java.io.IOException; + +public interface BlockEntryCursor extends Closeable +{ + boolean next() throws IOException; + + KEY key(); + + VALUE value(); +} diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockEntryReader.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockEntryReader.java new file mode 100644 index 0000000000000..b5e5ab69e5f6c --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockEntryReader.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2002-2019 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.index.schema; + +import java.io.IOException; + +import org.neo4j.index.internal.gbptree.Layout; +import org.neo4j.io.pagecache.PageCursor; + +public class BlockEntryReader implements BlockEntryCursor +{ + private final long entryCount; + private final PageCursor pageCursor; + private final KEY key; + private final VALUE value; + private final Layout layout; + private long readEntries; + + BlockEntryReader( PageCursor pageCursor, Layout layout ) + { + this.pageCursor = pageCursor; + this.entryCount = pageCursor.getLong(); + this.layout = layout; + this.key = layout.newKey(); + this.value = layout.newValue(); + } + + public boolean next() throws IOException + { + if ( readEntries >= entryCount ) + { + return false; + } + BlockEntry.read( pageCursor, layout, key, value ); + readEntries++; + return true; + } + + public long entryCount() + { + return entryCount; + } + + public KEY key() + { + return key; + } + + public VALUE value() + { + return value; + } + + @Override + public void close() throws IOException + { + pageCursor.close(); + } +} diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockReader.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockReader.java index 721d6343c51a4..a3656e89cb86a 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockReader.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockReader.java @@ -20,58 +20,50 @@ package org.neo4j.kernel.impl.index.schema; import java.io.Closeable; +import java.io.File; import java.io.IOException; import org.neo4j.index.internal.gbptree.Layout; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.fs.OpenMode; +import org.neo4j.io.fs.StoreChannel; import org.neo4j.io.pagecache.PageCursor; +import org.neo4j.kernel.impl.transaction.log.ReadAheadChannel; public class BlockReader implements Closeable { - private final long entryCount; - private final PageCursor pageCursor; - private final KEY key; - private final VALUE value; + private final StoreChannel channel; + private final long blockSize; + private final FileSystemAbstraction fs; + private final File file; private final Layout layout; - private long readEntries; - BlockReader( PageCursor pageCursor, Layout layout ) + BlockReader( FileSystemAbstraction fs, File file, long blockSize, Layout layout ) throws IOException { - this.pageCursor = pageCursor; - this.entryCount = pageCursor.getLong(); + this.fs = fs; + this.file = file; this.layout = layout; - this.key = layout.newKey(); - this.value = layout.newValue(); + this.channel = fs.open( file, OpenMode.READ ); + this.blockSize = blockSize; } - public boolean next() throws IOException + BlockEntryReader nextBlock() throws IOException { - if ( readEntries >= entryCount ) + long position = channel.position(); + if ( position >= channel.size() ) { - return false; + return null; } - BlockEntry.read( pageCursor, layout, key, value ); - readEntries++; - return true; - } - - public long entryCount() - { - return entryCount; - } - - KEY key() - { - return key; - } - - VALUE value() - { - return value; + StoreChannel blockChannel = fs.open( file, OpenMode.READ ); + blockChannel.position( position ); + channel.position( position + blockSize ); + PageCursor pageCursor = new ReadableChannelPageCursor( new ReadAheadChannel<>( blockChannel ) ); + return new BlockEntryReader<>( pageCursor, layout ); } @Override public void close() throws IOException { - pageCursor.close(); + channel.close(); } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockStorageReader.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockStorageReader.java deleted file mode 100644 index fda34c148a6c6..0000000000000 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockStorageReader.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2002-2019 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.kernel.impl.index.schema; - -import java.io.Closeable; -import java.io.File; -import java.io.IOException; - -import org.neo4j.index.internal.gbptree.Layout; -import org.neo4j.io.fs.FileSystemAbstraction; -import org.neo4j.io.fs.OpenMode; -import org.neo4j.io.fs.StoreChannel; -import org.neo4j.io.pagecache.PageCursor; -import org.neo4j.kernel.impl.transaction.log.ReadAheadChannel; - -public class BlockStorageReader implements Closeable -{ - private final StoreChannel channel; - private final long blockSize; - private final FileSystemAbstraction fs; - private final File file; - private final Layout layout; - - BlockStorageReader( FileSystemAbstraction fs, File file, long blockSize, Layout layout ) throws IOException - { - this.fs = fs; - this.file = file; - this.layout = layout; - this.channel = fs.open( file, OpenMode.READ ); - this.blockSize = blockSize; - } - - BlockReader nextBlock() throws IOException - { - long position = channel.position(); - if ( position >= channel.size() ) - { - return null; - } - StoreChannel blockChannel = fs.open( file, OpenMode.READ ); - blockChannel.position( position ); - channel.position( position + blockSize ); - PageCursor pageCursor = new ReadableChannelPageCursor( new ReadAheadChannel<>( blockChannel ) ); - return new BlockReader<>( pageCursor, layout ); - } - - @Override - public void close() throws IOException - { - channel.close(); - } -} diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/MergingBlockEntryReader.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/MergingBlockEntryReader.java new file mode 100644 index 0000000000000..becdd209258ff --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/MergingBlockEntryReader.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2002-2019 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.index.schema; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.PriorityQueue; + +import org.neo4j.index.internal.gbptree.Layout; +import org.neo4j.io.IOUtils; + +public class MergingBlockEntryReader implements BlockEntryCursor +{ + // todo more or less expensive compared to iterating over heads? + private final PriorityQueue> sortedReaders; + private List> readersToClose = new ArrayList<>(); + private BlockEntryCursor lastReturned; + + MergingBlockEntryReader( Layout layout ) + { + this.sortedReaders = new PriorityQueue<>( ( o1, o2 ) -> layout.compare( o1.key(), o2.key() ) ); + } + + void addSource( BlockEntryCursor source ) throws IOException + { + readersToClose.add( source ); + if ( source.next() ) + { + sortedReaders.add( source ); + } + } + + @Override + public boolean next() throws IOException + { + if ( lastReturned != null ) + { + if ( lastReturned.next() ) + { + sortedReaders.add( lastReturned ); + } + } + + if ( sortedReaders.isEmpty() ) + { + return false; + } + lastReturned = sortedReaders.poll(); + return true; + } + + @Override + public KEY key() + { + return lastReturned.key(); + } + + @Override + public VALUE value() + { + return lastReturned.value(); + } + + @Override + public void close() throws IOException + { + IOUtils.closeAll( readersToClose ); + } +} diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockStorageTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockStorageTest.java index 07127a461c477..66d4431c53550 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockStorageTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockStorageTest.java @@ -215,11 +215,11 @@ private final void assertContents( SimpleLongLayout layout, BlockStorage>... expectedBlocks ) throws IOException { - try ( BlockStorageReader reader = storage.reader() ) + try ( BlockReader reader = storage.reader() ) { for ( List> expectedBlock : expectedBlocks ) { - try ( BlockReader block = reader.nextBlock() ) + try ( BlockEntryReader block = reader.nextBlock() ) { assertNotNull( block ); assertEquals( expectedBlock.size(), block.entryCount() ); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/MergingBlockEntryReaderTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/MergingBlockEntryReaderTest.java new file mode 100644 index 0000000000000..e26c0a44e617d --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/MergingBlockEntryReaderTest.java @@ -0,0 +1,220 @@ +/* + * Copyright (c) 2002-2019 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.index.schema; + +import org.apache.commons.lang3.mutable.MutableLong; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +import org.neo4j.index.internal.gbptree.SimpleLongLayout; +import org.neo4j.test.extension.Inject; +import org.neo4j.test.extension.RandomExtension; +import org.neo4j.test.rule.RandomRule; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singleton; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ExtendWith( RandomExtension.class ) +class MergingBlockEntryReaderTest +{ + @Inject + RandomRule rnd; + + private static SimpleLongLayout layout = SimpleLongLayout.longLayout().build(); + private static Comparator> blockEntryComparator = ( b1, b2 ) -> layout.compare( b1.key(), b2.key() ); + + @Test + void shouldMergeSingleReader() throws IOException + { + // given + MergingBlockEntryReader merger = new MergingBlockEntryReader<>( layout ); + List> data = someBlockEntries(); + + // when + merger.addSource( newReader( data ) ); + + // then + List> expected = sortAll( singleton( data ) ); + verifyMerged( expected, merger ); + } + + @Test + void shouldMergeSingleEmptyReader() throws IOException + { + // given + MergingBlockEntryReader merger = new MergingBlockEntryReader<>( layout ); + List> data = emptyList(); + + // when + merger.addSource( newReader( data ) ); + + // then + assertFalse( merger.next() ); + } + + @Test + void shouldMergeMultipleReaders() throws IOException + { + // given + MergingBlockEntryReader merger = new MergingBlockEntryReader<>( layout ); + List>> datas = new ArrayList<>(); + int nbrOfReaders = rnd.nextInt( 10 ) + 1; + for ( int i = 0; i < nbrOfReaders; i++ ) + { + // when + List> data = someBlockEntries(); + datas.add( data ); + merger.addSource( newReader( data ) ); + } + + // then + List> expected = sortAll( datas ); + verifyMerged( expected, merger ); + } + + @Test + void shouldCloseAllReaderEvenEmpty() throws IOException + { + // given + MergingBlockEntryReader merger = new MergingBlockEntryReader<>( layout ); + ListBasedBlockEntryCursor empty = newReader( emptyList() ); + ListBasedBlockEntryCursor nonEmpty = newReader( someBlockEntries() ); + merger.addSource( empty ); + merger.addSource( nonEmpty ); + + // when + merger.close(); + + // then + assertTrue( empty.closed ); + assertTrue( nonEmpty.closed ); + } + + @Test + void shouldCloseAllReaderEvenEmptyAndExhausted() throws IOException + { + // given + MergingBlockEntryReader merger = new MergingBlockEntryReader<>( layout ); + ListBasedBlockEntryCursor empty = newReader( emptyList() ); + ListBasedBlockEntryCursor nonEmpty = newReader( someBlockEntries() ); + merger.addSource( empty ); + merger.addSource( nonEmpty ); + + // when + while ( merger.next() ) + { // exhaust + } + merger.close(); + + // then + assertTrue( empty.closed ); + assertTrue( nonEmpty.closed ); + } + + private void verifyMerged( List> expected, MergingBlockEntryReader merger ) throws IOException + { + for ( BlockEntry expectedEntry : expected ) + { + assertTrue( merger.next() ); + assertEquals( 0, layout.compare( expectedEntry.key(), merger.key() ) ); + assertEquals( expectedEntry.value(), merger.value() ); + } + assertFalse( merger.next() ); + } + + private List> sortAll( Iterable>> data ) + { + List> result = new ArrayList<>(); + for ( List> list : data ) + { + result.addAll( list ); + } + result.sort( blockEntryComparator ); + return result; + } + + private ListBasedBlockEntryCursor newReader( List> expected ) + { + return new ListBasedBlockEntryCursor<>( expected ); + } + + private List> someBlockEntries() + { + List> entries = new ArrayList<>(); + for ( int i = 0; i < rnd.nextInt( 10 ); i++ ) + { + MutableLong key = layout.key( rnd.nextLong( 10_000 ) ); + MutableLong value = layout.value( rnd.nextLong( 10_000 ) ); + entries.add( new BlockEntry<>( key, value ) ); + } + entries.sort( blockEntryComparator ); + return entries; + } + + private class ListBasedBlockEntryCursor implements BlockEntryCursor + { + private Iterator> entries; + private BlockEntry next; + private boolean closed; + + ListBasedBlockEntryCursor( List> entries ) + { + this.entries = entries.iterator(); + } + + @Override + public boolean next() + { + if ( entries.hasNext() ) + { + next = entries.next(); + return true; + } + return false; + } + + @Override + public KEY key() + { + return next.key(); + } + + @Override + public VALUE value() + { + return next.value(); + } + + @Override + public void close() + { + closed = true; + } + } +}