Skip to content

Commit

Permalink
GBPTreeConcurrencyIT is generic in <KEY,VALUE>
Browse files Browse the repository at this point in the history
  • Loading branch information
burqen committed Jan 16, 2018
1 parent 47526fd commit f0a1406
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 32 deletions.
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.index.internal.gbptree;

import org.apache.commons.lang3.mutable.MutableLong;

import org.neo4j.test.rule.RandomRule;

public class GBPTreeConcurrencyFIxedSizeIT extends GBPTreeConcurrencyITBase<MutableLong,MutableLong>
{
@Override
protected TestLayout<MutableLong,MutableLong> getLayout( RandomRule random )
{
return new SimpleLongLayout( random.intBetween( 0, 10 ) );
}
}
Expand Up @@ -19,7 +19,6 @@
*/ */
package org.neo4j.index.internal.gbptree; package org.neo4j.index.internal.gbptree;


import org.apache.commons.lang3.mutable.MutableLong;
import org.junit.After; import org.junit.After;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
Expand Down Expand Up @@ -60,7 +59,6 @@
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.junit.rules.RuleChain.outerRule; import static org.junit.rules.RuleChain.outerRule;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_MONITOR;
import static org.neo4j.test.rule.PageCacheRule.config; import static org.neo4j.test.rule.PageCacheRule.config;


/** /**
Expand All @@ -77,7 +75,7 @@
* toAdd and toRemove, prepare the GB+Tree with entries and serve readers and writer with information * toAdd and toRemove, prepare the GB+Tree with entries and serve readers and writer with information
* about what they should do next. * about what they should do next.
*/ */
public class GBPTreeConcurrencyIT public abstract class GBPTreeConcurrencyITBase<KEY,VALUE>
{ {
private final DefaultFileSystemRule fs = new DefaultFileSystemRule(); private final DefaultFileSystemRule fs = new DefaultFileSystemRule();
private final TestDirectory directory = TestDirectory.testDirectory( getClass(), fs.get() ); private final TestDirectory directory = TestDirectory.testDirectory( getClass(), fs.get() );
Expand All @@ -87,27 +85,21 @@ public class GBPTreeConcurrencyIT
@Rule @Rule
public final RuleChain rules = outerRule( fs ).around( directory ).around( pageCacheRule ).around( random ); public final RuleChain rules = outerRule( fs ).around( directory ).around( pageCacheRule ).around( random );


private Layout<MutableLong,MutableLong> layout; private TestLayout<KEY,VALUE> layout;
private GBPTree<MutableLong,MutableLong> index; private GBPTree<KEY,VALUE> index;
private final ExecutorService threadPool = private final ExecutorService threadPool =
Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() ); Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() );


private GBPTree<MutableLong,MutableLong> createIndex() private GBPTree<KEY,VALUE> createIndex() throws IOException
throws IOException
{
return createIndex( NO_MONITOR );
}

private GBPTree<MutableLong,MutableLong> createIndex( GBPTree.Monitor monitor )
throws IOException
{ {
int pageSize = 512; int pageSize = 512;
layout = new SimpleLongLayout( random.intBetween( 0, 10 ) ); layout = getLayout( random );
PageCache pageCache = PageCache pageCache = pageCacheRule.getPageCache( fs.get(), config().withPageSize( pageSize ).withAccessChecks( true ) );
pageCacheRule.getPageCache( fs.get(), config().withPageSize( pageSize ).withAccessChecks( true ) );
return index = new GBPTreeBuilder<>( pageCache, directory.file( "index" ), layout ).build(); return index = new GBPTreeBuilder<>( pageCache, directory.file( "index" ), layout ).build();
} }


protected abstract TestLayout<KEY,VALUE> getLayout( RandomRule random );

@After @After
public void consistencyCheckAndClose() throws IOException public void consistencyCheckAndClose() throws IOException
{ {
Expand Down Expand Up @@ -200,6 +192,7 @@ private void shouldReadCorrectlyWithConcurrentUpdates( TestCoordinator testCoord
threadPool.awaitTermination( 10, TimeUnit.SECONDS ); threadPool.awaitTermination( 10, TimeUnit.SECONDS );
if ( readerError.get() != null ) if ( readerError.get() != null )
{ {
//noinspection ThrowFromFinallyBlock
throw readerError.get(); throw readerError.get();
} }
} }
Expand Down Expand Up @@ -249,25 +242,25 @@ List<Long> shuffleToNewList( List<Long> sourceList, Random random )
return shuffledList; return shuffledList;
} }


void prepare( GBPTree<MutableLong,MutableLong> index ) throws IOException void prepare( GBPTree<KEY,VALUE> index ) throws IOException
{ {
prepareIndex( index, readersShouldSee, toRemove, toAdd, random ); prepareIndex( index, readersShouldSee, toRemove, toAdd, random );
iterationFinished(); iterationFinished();
} }


void prepareIndex( GBPTree<MutableLong,MutableLong> index, TreeSet<Long> dataInIndex, void prepareIndex( GBPTree<KEY,VALUE> index, TreeSet<Long> dataInIndex,
Queue<Long> toRemove, Queue<Long> toAdd, Random random ) throws IOException Queue<Long> toRemove, Queue<Long> toAdd, Random random ) throws IOException
{ {
List<Long> fullRange = LongStream.range( minRange, maxRange ).boxed().collect( Collectors.toList() ); List<Long> fullRange = LongStream.range( minRange, maxRange ).boxed().collect( Collectors.toList() );
List<Long> rangeOutOfOrder = shuffleToNewList( fullRange, random ); List<Long> rangeOutOfOrder = shuffleToNewList( fullRange, random );
try ( Writer<MutableLong, MutableLong> writer = index.writer() ) try ( Writer<KEY,VALUE> writer = index.writer() )
{ {
for ( Long key : rangeOutOfOrder ) for ( Long key : rangeOutOfOrder )
{ {
boolean addForRemoval = random.nextDouble() > writePercentage; boolean addForRemoval = random.nextDouble() > writePercentage;
if ( addForRemoval ) if ( addForRemoval )
{ {
writer.put( new MutableLong( key ), new MutableLong( key ) ); writer.put( key( key ),value( key ) );
dataInIndex.add( key ); dataInIndex.add( key );
toRemove.add( key ); toRemove.add( key );
} }
Expand All @@ -282,8 +275,7 @@ void prepareIndex( GBPTree<MutableLong,MutableLong> index, TreeSet<Long> dataInI
void iterationFinished() void iterationFinished()
{ {
// Create new set to not modify set that readers use concurrently // Create new set to not modify set that readers use concurrently
TreeSet<Long> tmp = new TreeSet<>( readersShouldSee ); readersShouldSee = new TreeSet<>( readersShouldSee );
readersShouldSee = tmp;
updateRecentlyInsertedData( readersShouldSee, updatesForNextIteration ); updateRecentlyInsertedData( readersShouldSee, updatesForNextIteration );
updatesForNextIteration = generateUpdatesForNextIteration(); updatesForNextIteration = generateUpdatesForNextIteration();
updateWithSoonToBeRemovedData( readersShouldSee, updatesForNextIteration ); updateWithSoonToBeRemovedData( readersShouldSee, updatesForNextIteration );
Expand Down Expand Up @@ -384,7 +376,7 @@ private abstract static class UpdateOperation
this.key = key; this.key = key;
} }


abstract void apply( Writer<MutableLong,MutableLong> writer ) throws IOException; abstract void apply( Writer<KEY,VALUE> writer ) throws IOException;


abstract void applyToSet( Set<Long> set ); abstract void applyToSet( Set<Long> set );


Expand All @@ -399,9 +391,9 @@ private static class PutOperation extends UpdateOperation
} }


@Override @Override
void apply( Writer<MutableLong,MutableLong> writer ) throws IOException void apply( Writer<KEY,VALUE> writer ) throws IOException
{ {
writer.put( new MutableLong( key ), new MutableLong( key ) ); writer.put( key( key ), value( key ) );
} }


@Override @Override
Expand All @@ -425,9 +417,9 @@ private static class RemoveOperation extends UpdateOperation
} }


@Override @Override
void apply( Writer<MutableLong,MutableLong> writer ) throws IOException void apply( Writer<KEY,VALUE> writer ) throws IOException
{ {
writer.remove( new MutableLong( key ) ); writer.remove( key( key ) );
} }


@Override @Override
Expand Down Expand Up @@ -465,7 +457,7 @@ private void writeOneIteration( TestCoordinator testCoordinator,
Iterator<UpdateOperation> toWriteIterator = toWrite.iterator(); Iterator<UpdateOperation> toWriteIterator = toWrite.iterator();
while ( toWriteIterator.hasNext() ) while ( toWriteIterator.hasNext() )
{ {
try ( Writer<MutableLong,MutableLong> writer = index.writer() ) try ( Writer<KEY,VALUE> writer = index.writer() )
{ {
int inBatch = 0; int inBatch = 0;
while ( toWriteIterator.hasNext() && inBatch < batchSize ) while ( toWriteIterator.hasNext() && inBatch < batchSize )
Expand Down Expand Up @@ -532,17 +524,16 @@ private void doRead() throws IOException
long start = readerInstruction.start(); long start = readerInstruction.start();
long end = readerInstruction.end(); long end = readerInstruction.end();
boolean forward = start <= end; boolean forward = start <= end;
try ( RawCursor<Hit<MutableLong,MutableLong>,IOException> cursor = try ( RawCursor<Hit<KEY,VALUE>,IOException> cursor = index.seek( key( start ), key( end ) ) )
index.seek( new MutableLong( start ), new MutableLong( end ) ) )
{ {
if ( expectToSee.hasNext() ) if ( expectToSee.hasNext() )
{ {
long nextToSee = expectToSee.next(); long nextToSee = expectToSee.next();
while ( cursor.next() ) while ( cursor.next() )
{ {
// Actual // Actual
long lastSeenKey = cursor.get().key().longValue(); long lastSeenKey = keySeed( cursor.get().key() );
long lastSeenValue = cursor.get().value().longValue(); long lastSeenValue = valueSeed( cursor.get().value() );


if ( lastSeenKey != lastSeenValue ) if ( lastSeenKey != lastSeenValue )
{ {
Expand Down Expand Up @@ -634,4 +625,24 @@ TreeSet<Long> expectToSee()
return expectToSee; return expectToSee;
} }
} }

private KEY key( long seed )
{
return layout.key( seed );
}

private VALUE value( long seed )
{
return layout.value( seed );
}

private long keySeed( KEY key )
{
return layout.keySeed( key );
}

private long valueSeed( VALUE value )
{
return layout.valueSeed( value );
}
} }

0 comments on commit f0a1406

Please sign in to comment.