Skip to content

Commit

Permalink
Removes old remnants of subTypes in InputCache
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Jan 10, 2018
1 parent ecbf1e5 commit 07fde65
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 119 deletions.
Expand Up @@ -25,8 +25,6 @@
import org.neo4j.unsafe.impl.batchimport.InputIterable; import org.neo4j.unsafe.impl.batchimport.InputIterable;
import org.neo4j.unsafe.impl.batchimport.InputIterator; import org.neo4j.unsafe.impl.batchimport.InputIterator;


import static org.neo4j.unsafe.impl.batchimport.input.InputCache.MAIN;

class CachingInputIterable implements InputIterable class CachingInputIterable implements InputIterable
{ {
private final InputIterable actual; private final InputIterable actual;
Expand Down Expand Up @@ -60,11 +58,11 @@ public InputIterator iterator()
{ {
// wrap in an iterator which caches the data as it goes over it // wrap in an iterator which caches the data as it goes over it
firstTime = false; firstTime = false;
InputCacher cacher = cache.cacheNodes( MAIN ); InputCacher cacher = cache.cacheNodes();
return new CachingInputIterator( actual.iterator(), cacher ); return new CachingInputIterator( actual.iterator(), cacher );
} }
// for consecutive iterations just returned the cached data // for consecutive iterations just returned the cached data
return cache.nodes( MAIN, false ).iterator(); return cache.nodes().iterator();
} }
catch ( IOException e ) catch ( IOException e )
{ {
Expand Down
Expand Up @@ -23,10 +23,6 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;

import org.neo4j.concurrent.Runnables;
import org.neo4j.function.ThrowingSupplier; import org.neo4j.function.ThrowingSupplier;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.OpenMode; import org.neo4j.io.fs.OpenMode;
Expand Down Expand Up @@ -97,8 +93,6 @@
*/ */
public class InputCache implements Closeable public class InputCache implements Closeable
{ {
public static final String MAIN = "main";

private static final String HEADER = "-header"; private static final String HEADER = "-header";
private static final String NODES = "nodes"; private static final String NODES = "nodes";
private static final String RELATIONSHIPS = "relationships"; private static final String RELATIONSHIPS = "relationships";
Expand Down Expand Up @@ -126,7 +120,6 @@ public class InputCache implements Closeable
private final FileSystemAbstraction fs; private final FileSystemAbstraction fs;
private final File cacheDirectory; private final File cacheDirectory;
private final RecordFormats recordFormats; private final RecordFormats recordFormats;
private final Set<String> subTypes = new HashSet<>();
private final int chunkSize; private final int chunkSize;


/** /**
Expand All @@ -143,56 +136,36 @@ public InputCache( FileSystemAbstraction fs, File cacheDirectory, RecordFormats
this.chunkSize = chunkSize; this.chunkSize = chunkSize;
} }


public InputCacher cacheNodes( String subType ) throws IOException public InputCacher cacheNodes() throws IOException
{ {
return new InputNodeCacheWriter( channel( NODES, subType, READ_WRITE ), channel( NODES_HEADER, subType, READ_WRITE ), return new InputNodeCacheWriter( channel( NODES, READ_WRITE ), channel( NODES_HEADER, READ_WRITE ),
recordFormats, chunkSize ); recordFormats, chunkSize );
} }


public InputCacher cacheRelationships( String subType ) throws IOException public InputCacher cacheRelationships() throws IOException
{
return new InputRelationshipCacheWriter( channel( RELATIONSHIPS, subType, READ_WRITE ),
channel( RELATIONSHIPS_HEADER, subType, READ_WRITE ), recordFormats, chunkSize );
}

private StoreChannel channel( String type, String subType, OpenMode openMode ) throws IOException
{ {
return fs.open( file( type, subType ), openMode ); return new InputRelationshipCacheWriter( channel( RELATIONSHIPS, READ_WRITE ),
channel( RELATIONSHIPS_HEADER, READ_WRITE ), recordFormats, chunkSize );
} }


private File file( String type, String subType ) private StoreChannel channel( String type, OpenMode openMode ) throws IOException
{ {
subTypes.add( subType ); return fs.open( file( type ), openMode );
return new File( cacheDirectory, "input-" + type + "-" + subType );
} }


public InputIterable nodes( String subType, boolean deleteAfterUse ) private File file( String type )
{ {
return entities( () -> new InputNodeCacheReader( channel( NODES, subType, READ ), return new File( cacheDirectory, "input-" + type );
channel( NODES_HEADER, subType, READ ),
deleteAction( deleteAfterUse, NODES, NODES_HEADER, subType ) ) );
} }


public InputIterable relationships( String subType, boolean deleteAfterUse ) public InputIterable nodes()
{ {
return entities( () -> new InputRelationshipCacheReader( channel( RELATIONSHIPS, subType, READ ), return entities( () -> new InputNodeCacheReader( channel( NODES, READ ), channel( NODES_HEADER, READ ) ) );
channel( RELATIONSHIPS_HEADER, subType, READ ),
deleteAction( deleteAfterUse, RELATIONSHIPS, RELATIONSHIPS_HEADER, subType ) ) );
} }


protected Runnable deleteAction( boolean deleteAfterUse, String type, String header, String subType ) public InputIterable relationships()
{ {
if ( !deleteAfterUse ) return entities( () -> new InputRelationshipCacheReader( channel( RELATIONSHIPS, READ ), channel( RELATIONSHIPS_HEADER, READ ) ) );
{
return Runnables.EMPTY_RUNNABLE;
}

return () ->
{
fs.deleteFile( file( type, subType ) );
fs.deleteFile( file( header, subType ) );
subTypes.remove( subType );
};
} }


private InputIterable entities( final ThrowingSupplier<InputIterator, IOException> factory ) private InputIterable entities( final ThrowingSupplier<InputIterator, IOException> factory )
Expand Down Expand Up @@ -223,13 +196,10 @@ public boolean supportsMultiplePasses()
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
for ( String subType : subTypes ) fs.deleteFile( file( NODES ) );
{ fs.deleteFile( file( RELATIONSHIPS ) );
fs.deleteFile( file( NODES, subType ) ); fs.deleteFile( file( NODES_HEADER ) );
fs.deleteFile( file( RELATIONSHIPS, subType ) ); fs.deleteFile( file( RELATIONSHIPS_HEADER ) );
fs.deleteFile( file( NODES_HEADER, subType ) );
fs.deleteFile( file( RELATIONSHIPS_HEADER, subType ) );
}
} }


static ByteBuffer newChunkHeaderBuffer() static ByteBuffer newChunkHeaderBuffer()
Expand Down
Expand Up @@ -54,13 +54,12 @@ abstract class InputEntityCacheReader implements InputIterator
private final PrimitiveIntObjectMap<String>[] tokens; private final PrimitiveIntObjectMap<String>[] tokens;


// Not used by workers // Not used by workers
private final Runnable closeAction;
private final StoreChannel channel; private final StoreChannel channel;
private final ByteBuffer chunkHeaderBuffer = newChunkHeaderBuffer(); private final ByteBuffer chunkHeaderBuffer = newChunkHeaderBuffer();
private boolean end; private boolean end;


@SuppressWarnings( "unchecked" ) @SuppressWarnings( "unchecked" )
InputEntityCacheReader( StoreChannel channel, StoreChannel header, Runnable closeAction ) InputEntityCacheReader( StoreChannel channel, StoreChannel header )
throws IOException throws IOException
{ {
tokens = new PrimitiveIntObjectMap[HIGH_TOKEN_TYPE]; tokens = new PrimitiveIntObjectMap[HIGH_TOKEN_TYPE];
Expand All @@ -69,7 +68,6 @@ abstract class InputEntityCacheReader implements InputIterator
tokens[RELATIONSHIP_TYPE_TOKEN] = Primitive.intObjectMap(); tokens[RELATIONSHIP_TYPE_TOKEN] = Primitive.intObjectMap();
tokens[GROUP_TOKEN] = Primitive.intObjectMap(); tokens[GROUP_TOKEN] = Primitive.intObjectMap();
this.channel = channel; this.channel = channel;
this.closeAction = closeAction;
readHeader( header ); readHeader( header );
} }


Expand Down Expand Up @@ -130,7 +128,6 @@ public void close()
try try
{ {
channel.close(); channel.close();
closeAction.run();
} }
catch ( IOException e ) catch ( IOException e )
{ {
Expand Down
Expand Up @@ -35,9 +35,9 @@
*/ */
public class InputNodeCacheReader extends InputEntityCacheReader public class InputNodeCacheReader extends InputEntityCacheReader
{ {
public InputNodeCacheReader( StoreChannel channel, StoreChannel header, Runnable closeAction ) throws IOException public InputNodeCacheReader( StoreChannel channel, StoreChannel header ) throws IOException
{ {
super( channel, header, closeAction ); super( channel, header );
} }


@Override @Override
Expand Down
Expand Up @@ -33,9 +33,9 @@
*/ */
public class InputRelationshipCacheReader extends InputEntityCacheReader public class InputRelationshipCacheReader extends InputEntityCacheReader
{ {
public InputRelationshipCacheReader( StoreChannel channel, StoreChannel header, Runnable closeAction ) throws IOException public InputRelationshipCacheReader( StoreChannel channel, StoreChannel header ) throws IOException
{ {
super( channel, header, closeAction ); super( channel, header );
} }


@Override @Override
Expand Down
Expand Up @@ -42,14 +42,11 @@
import org.neo4j.test.rule.fs.DefaultFileSystemRule; import org.neo4j.test.rule.fs.DefaultFileSystemRule;
import org.neo4j.unsafe.impl.batchimport.InputIterator; import org.neo4j.unsafe.impl.batchimport.InputIterator;


import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;


import static java.lang.Math.abs; import static java.lang.Math.abs;


import static org.neo4j.helpers.collection.Iterators.asSet;
import static org.neo4j.io.ByteUnit.kibiBytes; import static org.neo4j.io.ByteUnit.kibiBytes;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.MAIN;


public class InputCacheTest public class InputCacheTest
{ {
Expand All @@ -60,7 +57,7 @@ public class InputCacheTest
private final RandomRule randomRule = new RandomRule(); private final RandomRule randomRule = new RandomRule();
private final int threads = Runtime.getRuntime().availableProcessors(); private final int threads = Runtime.getRuntime().availableProcessors();
private final ExecutorService executor = Executors.newFixedThreadPool( threads ); private final ExecutorService executor = Executors.newFixedThreadPool( threads );
private final List<Future> futures = new ArrayList<>(); private final List<Future<?>> futures = new ArrayList<>();
private final int totalCount = threads * countPerThread; private final int totalCount = threads * countPerThread;
@Rule @Rule
public RuleChain ruleChain = RuleChain.outerRule( dir ).around( randomRule ).around( fileSystemRule ); public RuleChain ruleChain = RuleChain.outerRule( dir ).around( randomRule ).around( fileSystemRule );
Expand All @@ -72,13 +69,13 @@ public void shouldCacheAndRetrieveNodes() throws Exception
try ( InputCache cache = try ( InputCache cache =
new InputCache( fileSystemRule.get(), dir.directory(), Standard.LATEST_RECORD_FORMATS, (int) kibiBytes( 8 ) ) ) new InputCache( fileSystemRule.get(), dir.directory(), Standard.LATEST_RECORD_FORMATS, (int) kibiBytes( 8 ) ) )
{ {
try ( InputCacher cacher = cache.cacheNodes( MAIN ) ) try ( InputCacher cacher = cache.cacheNodes() )
{ {
writeEntities( cacher, this::randomNode ); writeEntities( cacher, this::randomNode );
} }


// WHEN/THEN // WHEN/THEN
try ( InputIterator reader = cache.nodes( MAIN, true ).iterator() ) try ( InputIterator reader = cache.nodes().iterator() )
{ {
List<InputEntity> allReadEntities = readEntities( reader ); List<InputEntity> allReadEntities = readEntities( reader );
assertEquals( totalCount, allReadEntities.size() ); assertEquals( totalCount, allReadEntities.size() );
Expand All @@ -95,13 +92,13 @@ public void shouldCacheAndRetrieveRelationships() throws Exception
try ( InputCache cache = try ( InputCache cache =
new InputCache( fileSystemRule.get(), dir.directory(), Standard.LATEST_RECORD_FORMATS, 200 ) ) new InputCache( fileSystemRule.get(), dir.directory(), Standard.LATEST_RECORD_FORMATS, 200 ) )
{ {
try ( InputCacher cacher = cache.cacheRelationships( MAIN ) ) try ( InputCacher cacher = cache.cacheRelationships() )
{ {
writeEntities( cacher, this::randomRelationship ); writeEntities( cacher, this::randomRelationship );
} }


// WHEN/THEN // WHEN/THEN
try ( InputIterator reader = cache.relationships( MAIN, true ).iterator() ) try ( InputIterator reader = cache.relationships().iterator() )
{ {
List<InputEntity> allReadEntities = readEntities( reader ); List<InputEntity> allReadEntities = readEntities( reader );
assertEquals( totalCount, allReadEntities.size() ); assertEquals( totalCount, allReadEntities.size() );
Expand Down Expand Up @@ -167,35 +164,6 @@ private void assertNoFilesLeftBehind()
assertEquals( 0, fileSystemRule.get().listFiles( dir.directory() ).length ); assertEquals( 0, fileSystemRule.get().listFiles( dir.directory() ).length );
} }


private void assertRelationshipsEquals( InputEntity expectedRelationship, InputEntity relationship )
{
assertProperties( expectedRelationship, relationship );
assertEquals( expectedRelationship.startId(), relationship.startId() );
assertEquals( expectedRelationship.startIdGroup, relationship.startIdGroup );
assertEquals( expectedRelationship.endId(), relationship.endId() );
assertEquals( expectedRelationship.endIdGroup, relationship.endIdGroup );
if ( expectedRelationship.hasIntType )
{
assertEquals( expectedRelationship.intType, relationship.intType );
}
else
{
assertEquals( expectedRelationship.stringType, relationship.stringType );
}
}

private void assertProperties( InputEntity expected, InputEntity entity )
{
if ( expected.hasPropertyId )
{
assertEquals( expected.propertyId, entity.propertyId );
}
else
{
assertArrayEquals( expected.properties(), entity.properties() );
}
}

private void randomRelationship( Randoms random, InputEntityVisitor relationship ) private void randomRelationship( Randoms random, InputEntityVisitor relationship )
{ {
if ( random.random().nextFloat() < 0.1f ) if ( random.random().nextFloat() < 0.1f )
Expand Down Expand Up @@ -266,28 +234,6 @@ private String randomType( Randoms random )
return random.among( TOKENS ); return random.among( TOKENS );
} }


private void assertNodesEquals( InputEntity expectedNode, InputEntity node )
{
assertEquals( expectedNode.idGroup, node.idGroup );
assertEquals( expectedNode.id(), node.id() );
if ( expectedNode.hasPropertyId )
{
assertEquals( expectedNode.propertyId, node.propertyId );
}
else
{
assertArrayEquals( expectedNode.properties(), node.properties() );
}
if ( expectedNode.hasLabelField )
{
assertEquals( expectedNode.labelField, node.labelField );
}
else
{
assertEquals( asSet( expectedNode.labels() ), asSet( node.labels() ) );
}
}

private Group randomGroup( Randoms random ) private Group randomGroup( Randoms random )
{ {
return new Group.Adapter( random.nextInt( 100 ), random.string() ); return new Group.Adapter( random.nextInt( 100 ), random.string() );
Expand All @@ -308,9 +254,10 @@ private void submit( Callable<?> toRun )
futures.add( executor.submit( toRun ) ); futures.add( executor.submit( toRun ) );
} }


@SuppressWarnings( "unchecked" )
private <T> void results( Consumer<T> consumer ) throws Exception private <T> void results( Consumer<T> consumer ) throws Exception
{ {
for ( Future future : futures ) for ( Future<?> future : futures )
{ {
consumer.accept( (T) future.get() ); consumer.accept( (T) future.get() );
} }
Expand Down

0 comments on commit 07fde65

Please sign in to comment.