Skip to content

Commit

Permalink
introduces InputCache for caching Input data streams to disk
Browse files Browse the repository at this point in the history
for Input implementations that cannot provide the data streams multiple
times, something that the parallel batch importer requires. The cache u
es a binary format stored on disk. InputIterable can tell the importer
whether or not it supports multiple passes, if not then the importer will
cache the data streams the first run and use for consecutive runs.
  • Loading branch information
tinwelint committed Mar 9, 2015
1 parent b83db67 commit fa5d4a3
Show file tree
Hide file tree
Showing 30 changed files with 1,966 additions and 72 deletions.
Expand Up @@ -61,7 +61,6 @@
import org.neo4j.test.TargetDirectory; import org.neo4j.test.TargetDirectory;
import org.neo4j.test.TestGraphDatabaseFactory; import org.neo4j.test.TestGraphDatabaseFactory;
import org.neo4j.tooling.GlobalGraphOperations; import org.neo4j.tooling.GlobalGraphOperations;
import org.neo4j.unsafe.impl.batchimport.cache.AvailableMemoryCalculator;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
import org.neo4j.unsafe.impl.batchimport.input.InputNode; import org.neo4j.unsafe.impl.batchimport.input.InputNode;
Expand Down Expand Up @@ -112,24 +111,28 @@ public int denseNodeThreshold()
private final InputIdGenerator inputIdGenerator; private final InputIdGenerator inputIdGenerator;
private final IdMapper idMapper; private final IdMapper idMapper;
private final IdGenerator idGenerator; private final IdGenerator idGenerator;
private final boolean multiPassIterators;


@Parameterized.Parameters(name = "{0},{1},{2},{4}") @Parameterized.Parameters(name = "{0},{1},{2},{4}")
public static Collection<Object[]> data() public static Collection<Object[]> data()
{ {
return Arrays.<Object[]>asList( return Arrays.<Object[]>asList(
// synchronous I/O, actual node id input // synchronous I/O, actual node id input
new Object[]{SYNCHRONOUS, new LongInputIdGenerator(), actual(), fromInput()}, new Object[]{SYNCHRONOUS, new LongInputIdGenerator(), actual(), fromInput(), true},
// synchronous I/O, string id input // synchronous I/O, string id input
new Object[]{SYNCHRONOUS, new StringInputIdGenerator(), strings( AUTO ), startingFromTheBeginning()}, new Object[]{SYNCHRONOUS, new StringInputIdGenerator(), strings( AUTO ), startingFromTheBeginning(), true},
// synchronous I/O, string id input
new Object[]{SYNCHRONOUS, new StringInputIdGenerator(), strings( AUTO ), startingFromTheBeginning(), false},
// extra slow parallel I/O, actual node id input // extra slow parallel I/O, actual node id input
new Object[]{new IoQueue( 4, 4, 30, synchronousSlowWriterFactory ), new Object[]{new IoQueue( 4, 4, 30, synchronousSlowWriterFactory ),
new LongInputIdGenerator(), actual(), fromInput()} new LongInputIdGenerator(), actual(), fromInput(), false}
); );
} }


public ParallelBatchImporterTest( WriterFactory writerFactory, InputIdGenerator inputIdGenerator, public ParallelBatchImporterTest( WriterFactory writerFactory, InputIdGenerator inputIdGenerator,
IdMapper idMapper, IdGenerator idGenerator ) IdMapper idMapper, IdGenerator idGenerator, boolean multiPassIterators )
{ {
this.multiPassIterators = multiPassIterators;
this.writerFactory = constant( writerFactory ); this.writerFactory = constant( writerFactory );
this.inputIdGenerator = inputIdGenerator; this.inputIdGenerator = inputIdGenerator;
this.idMapper = idMapper; this.idMapper = idMapper;
Expand All @@ -149,8 +152,10 @@ public void shouldImportCsvData() throws Exception
try try
{ {
// WHEN // WHEN
inserter.doImport( Inputs.input( nodes( NODE_COUNT, inputIdGenerator ), inserter.doImport( Inputs.input(
relationships( relationshipCount, inputIdGenerator ), idMapper, idGenerator, false ) ); nodes( NODE_COUNT, inputIdGenerator ),
relationships( relationshipCount, inputIdGenerator ),
idMapper, idGenerator, false ) );
// THEN // THEN
GraphDatabaseService db = new TestGraphDatabaseFactory().newEmbeddedDatabase( directory.absolutePath() ); GraphDatabaseService db = new TestGraphDatabaseFactory().newEmbeddedDatabase( directory.absolutePath() );
try ( Transaction tx = db.beginTx() ) try ( Transaction tx = db.beginTx() )
Expand Down Expand Up @@ -329,9 +334,17 @@ private InputIterable<InputRelationship> relationships( final long count, final
{ {
return new InputIterable<InputRelationship>() return new InputIterable<InputRelationship>()
{ {
private int calls;

@Override @Override
public InputIterator<InputRelationship> iterator() public InputIterator<InputRelationship> iterator()
{ {
calls++;
assertTrue( "Unexpected use of input iterator " + multiPassIterators + ", " + calls,
multiPassIterators || (!multiPassIterators && calls == 1) );

// we still do the reset, even if tell the batch importer to not use use this iterable multiple times,
// since we use it to compare the imported data against after the import has been completed.
random.reset(); random.reset();
return new SimpleInputIterator<InputRelationship>( "test relationships" ) return new SimpleInputIterator<InputRelationship>( "test relationships" )
{ {
Expand Down Expand Up @@ -369,16 +382,28 @@ protected InputRelationship fetchNextOrNull()
} }
}; };
} }

@Override
public boolean supportsMultiplePasses()
{
return multiPassIterators;
}
}; };
} }


private static InputIterable<InputNode> nodes( final long count, final InputIdGenerator inputIdGenerator ) private InputIterable<InputNode> nodes( final long count, final InputIdGenerator inputIdGenerator )
{ {
return new InputIterable<InputNode>() return new InputIterable<InputNode>()
{ {
private int calls;

@Override @Override
public InputIterator<InputNode> iterator() public InputIterator<InputNode> iterator()
{ {
calls++;
assertTrue( "Unexpected use of input iterator " + multiPassIterators + ", " + calls,
multiPassIterators || (!multiPassIterators && calls == 1) );

return new SimpleInputIterator<InputNode>( "test nodes" ) return new SimpleInputIterator<InputNode>( "test nodes" )
{ {
private int cursor; private int cursor;
Expand Down Expand Up @@ -410,29 +435,14 @@ protected InputNode fetchNextOrNull()
} }
}; };
} }

@Override
public boolean supportsMultiplePasses()
{
return multiPassIterators;
}
}; };
} }


private static final AvailableMemoryCalculator LOW_MEMORY = new AvailableMemoryCalculator()
{
@Override
public long availableOffHeapMemory()
{
return 0;
}

@Override
public long availableHeapMemory()
{
return 0;
}

@Override
public String toString()
{
return "low memory";
}
};

public static final @ClassRule RandomRule random = new RandomRule(); public static final @ClassRule RandomRule random = new RandomRule();
} }
Expand Up @@ -83,6 +83,12 @@ public InputIterator<InputNode> iterator()
{ {
return nodeData(); return nodeData();
} }

@Override
public boolean supportsMultiplePasses()
{
return true;
}
}; };
} }


Expand All @@ -96,6 +102,12 @@ public InputIterator<InputRelationship> iterator()
{ {
return relationshipData(); return relationshipData();
} }

@Override
public boolean supportsMultiplePasses()
{
return true;
}
}; };
} }


Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.neo4j.csv.reader.SourceTraceability; import org.neo4j.csv.reader.SourceTraceability;
import org.neo4j.function.Function; import org.neo4j.function.Function;
import org.neo4j.helpers.collection.PrefetchingIterator; import org.neo4j.helpers.collection.PrefetchingIterator;
import org.neo4j.test.Randoms;
import org.neo4j.unsafe.impl.batchimport.InputIterator; import org.neo4j.unsafe.impl.batchimport.InputIterator;
import org.neo4j.unsafe.impl.batchimport.input.InputEntity; import org.neo4j.unsafe.impl.batchimport.input.InputEntity;
import org.neo4j.unsafe.impl.batchimport.input.csv.Deserialization; import org.neo4j.unsafe.impl.batchimport.input.csv.Deserialization;
Expand All @@ -40,6 +41,7 @@ public class RandomDataIterator<T> extends PrefetchingIterator<T> implements Inp
private final Header header; private final Header header;
private final long limit; private final long limit;
private final Random random; private final Random random;
private final Randoms randoms;
private final Deserialization<T> deserialization; private final Deserialization<T> deserialization;
private final long nodeCount; private final long nodeCount;
private final Distribution<String> labels; private final Distribution<String> labels;
Expand All @@ -56,6 +58,7 @@ public RandomDataIterator( Header header, long limit, Random random,
this.header = header; this.header = header;
this.limit = limit; this.limit = limit;
this.random = random; this.random = random;
this.randoms = new Randoms( random, Randoms.DEFAULT );
this.deserialization = deserialization.apply( this ); this.deserialization = deserialization.apply( this );
this.nodeCount = nodeCount; this.nodeCount = nodeCount;
this.labels = new Distribution<>( tokens( "Label", labelCount ) ); this.labels = new Distribution<>( tokens( "Label", labelCount ) );
Expand Down Expand Up @@ -149,7 +152,7 @@ private Object randomProperty( Entry entry, Random random )
String type = entry.extractor().toString(); String type = entry.extractor().toString();
if ( type.equals( "String" ) ) if ( type.equals( "String" ) )
{ {
return randomString( random ); return randoms.string( 5, 20, Randoms.CSA_LETTERS_AND_DIGITS );
} }
else if ( type.equals( "long" ) ) else if ( type.equals( "long" ) )
{ {
Expand Down Expand Up @@ -184,17 +187,6 @@ private String[] randomLabels( Random random )
return result; return result;
} }


private String randomString( Random random )
{
char[] chars = new char[random.nextInt( 10 )+5];
for ( int i = 0; i < chars.length; i++ )
{
chars[i] = (char) ('a' + random.nextInt( 20 ));
}
position += chars.length;
return String.valueOf( chars );
}

@Override @Override
public void close() public void close()
{ // Nothing to close { // Nothing to close
Expand Down
11 changes: 10 additions & 1 deletion community/kernel/src/main/java/org/neo4j/helpers/ArrayUtil.java
Expand Up @@ -219,8 +219,17 @@ public static <T> boolean containsAll( T[] array, T[] contains )
*/ */
public static <T> boolean contains( T[] array, T contains ) public static <T> boolean contains( T[] array, T contains )
{ {
for ( T item : array ) return contains( array, array.length, contains );
}

/**
* @return {@code true} if {@code contains} exists in {@code array}, otherwise {@code false}.
*/
public static <T> boolean contains( T[] array, int arrayLength, T contains )
{
for ( int i = 0; i < arrayLength; i++ )
{ {
T item = array[i];
if ( nullSafeEquals( item, contains ) ) if ( nullSafeEquals( item, contains ) )
{ {
return true; return true;
Expand Down
Expand Up @@ -603,6 +603,12 @@ public void close()
} }
}; };
} }

@Override
public boolean supportsMultiplePasses()
{
return true;
}
}; };
} }


Expand Down Expand Up @@ -650,6 +656,12 @@ public void close()
} }
}; };
} }

@Override
public boolean supportsMultiplePasses()
{
return true;
}
}; };
} }


Expand Down
Expand Up @@ -44,4 +44,14 @@ public LogPosition newPosition()
{ {
return specified ? new LogPosition( logVersion, byteOffset ) : LogPosition.UNSPECIFIED; return specified ? new LogPosition( logVersion, byteOffset ) : LogPosition.UNSPECIFIED;
} }

public long getLogVersion()
{
return logVersion;
}

public long getByteOffset()
{
return byteOffset;
}
} }
Expand Up @@ -29,11 +29,17 @@
public class PhysicalWritableLogChannel implements WritableLogChannel public class PhysicalWritableLogChannel implements WritableLogChannel
{ {
private LogVersionedStoreChannel channel; private LogVersionedStoreChannel channel;
private final ByteBuffer buffer = ByteBuffer.allocate( 512*KB ); private final ByteBuffer buffer;


public PhysicalWritableLogChannel( LogVersionedStoreChannel channel ) public PhysicalWritableLogChannel( LogVersionedStoreChannel channel )
{
this( channel, 512*KB );
}

public PhysicalWritableLogChannel( LogVersionedStoreChannel channel, int bufferSize )
{ {
this.channel = channel; this.channel = channel;
this.buffer = ByteBuffer.allocate( bufferSize );
} }


@Override @Override
Expand Down
@@ -0,0 +1,64 @@
/**
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport;

import java.io.IOException;

import org.neo4j.kernel.impl.store.record.PrimitiveRecord;
import org.neo4j.unsafe.impl.batchimport.input.InputEntity;
import org.neo4j.unsafe.impl.batchimport.input.Receiver;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutorServiceStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

/**
* Caches the incoming {@link InputEntity} to disk, for later use.
*/
public class InputEntityCacherStep<INPUT extends InputEntity>
extends ExecutorServiceStep<Batch<INPUT,? extends PrimitiveRecord>>
{
private final Receiver<INPUT[],IOException> cacher;

public InputEntityCacherStep( StageControl control, int workAheadSize, int movingAverageSize,
Receiver<INPUT[],IOException> cacher )
{
super( control, "CACHE", workAheadSize, movingAverageSize, 1 );
this.cacher = cacher;
}

@Override
protected Object process( long ticket, Batch<INPUT,? extends PrimitiveRecord> batch ) throws IOException
{
cacher.receive( batch.input );
return batch;
}

@Override
protected void done()
{
try
{
cacher.close();
}
catch ( IOException e )
{
throw new RuntimeException( "Couldn't close input cacher", e );
}
}
}
Expand Up @@ -27,5 +27,11 @@
public interface InputIterable<T> extends ResourceIterable<T> public interface InputIterable<T> extends ResourceIterable<T>
{ {
@Override @Override
public InputIterator<T> iterator(); InputIterator<T> iterator();

/**
* @return whether or not multiple calls to {@link #iterator()} and therefore multiple passes
* over its data is supported.
*/
boolean supportsMultiplePasses();
} }

0 comments on commit fa5d4a3

Please sign in to comment.