From ae8a6f57c15f7b8f220715ee1f0d97a44637693b Mon Sep 17 00:00:00 2001 From: Mattias Persson Date: Mon, 13 Jun 2016 12:47:06 +0200 Subject: [PATCH] QuickImport generates input data in multiple threads also simplified the way data can be generated this way be a great deal so that the only thing required is to implement: - one Function - one Function where Range is a start id and number of entities, and the rest will be taken care of. Introducing TicketedProcessing which simplifies writing of parallelization like this. --- .../org/neo4j/tooling/CsvDataGenerator.java | 207 ------------------ .../neo4j/tooling/CsvDataGeneratorInput.java | 113 ---------- .../org/neo4j/tooling/DataGeneratorInput.java | 169 ++++++++++++++ .../neo4j/tooling/EntityDataGenerator.java | 108 +++++++++ .../java/org/neo4j/tooling/QuickImport.java | 29 +-- .../neo4j/tooling/SimpleDataGenerator.java | 87 ++++++++ ...tor.java => SimpleDataGeneratorBatch.java} | 93 ++------ .../collection/ContinuableArrayCursor.java | 73 ++++++ .../unsafe/impl/batchimport/IdRangeInput.java | 90 ++++++++ .../executor/DynamicTaskExecutor.java | 33 ++- .../batchimport/executor/ParkStrategy.java | 7 +- .../impl/batchimport/executor/Task.java | 1 + .../batchimport/executor/TaskExecutor.java | 15 +- .../batchimport/staging/AbstractStep.java | 31 +-- .../impl/batchimport/staging/Processing.java | 59 +++++ .../batchimport/staging/ProcessorStep.java | 20 +- .../staging/TicketedProcessing.java | 203 +++++++++++++++++ .../ContinuableArrayCursorTest.java | 134 ++++++++++++ .../executor/DynamicTaskExecutorTest.java | 41 ++-- .../staging/ProcessorStepTest.java | 10 +- .../staging/TicketedProcessingTest.java | 87 ++++++++ 21 files changed, 1123 insertions(+), 487 deletions(-) delete mode 100644 community/import-tool/src/test/java/org/neo4j/tooling/CsvDataGenerator.java delete mode 100644 community/import-tool/src/test/java/org/neo4j/tooling/CsvDataGeneratorInput.java create mode 100644 community/import-tool/src/test/java/org/neo4j/tooling/DataGeneratorInput.java create mode 100644 community/import-tool/src/test/java/org/neo4j/tooling/EntityDataGenerator.java create mode 100644 community/import-tool/src/test/java/org/neo4j/tooling/SimpleDataGenerator.java rename community/import-tool/src/test/java/org/neo4j/tooling/{RandomDataIterator.java => SimpleDataGeneratorBatch.java} (68%) create mode 100644 community/kernel/src/main/java/org/neo4j/kernel/impl/util/collection/ContinuableArrayCursor.java create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/IdRangeInput.java create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/Processing.java create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java create mode 100644 community/kernel/src/test/java/org/neo4j/kernel/impl/util/collection/ContinuableArrayCursorTest.java create mode 100644 community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java diff --git a/community/import-tool/src/test/java/org/neo4j/tooling/CsvDataGenerator.java b/community/import-tool/src/test/java/org/neo4j/tooling/CsvDataGenerator.java deleted file mode 100644 index f8bb78aae6bfe..0000000000000 --- a/community/import-tool/src/test/java/org/neo4j/tooling/CsvDataGenerator.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.tooling; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.io.Writer; -import java.util.Iterator; -import java.util.Random; -import java.util.function.Function; - -import org.neo4j.csv.reader.Extractors; -import org.neo4j.csv.reader.SourceTraceability; -import org.neo4j.helpers.Args; -import org.neo4j.helpers.progress.ProgressListener; -import org.neo4j.unsafe.impl.batchimport.InputIterator; -import org.neo4j.unsafe.impl.batchimport.input.csv.Configuration; -import org.neo4j.unsafe.impl.batchimport.input.csv.Deserialization; -import org.neo4j.unsafe.impl.batchimport.input.csv.Header; -import org.neo4j.unsafe.impl.batchimport.input.csv.Header.Entry; -import org.neo4j.unsafe.impl.batchimport.input.csv.IdType; -import org.neo4j.unsafe.impl.batchimport.input.csv.Type; - -import static java.lang.System.currentTimeMillis; -import static org.neo4j.helpers.progress.ProgressMonitorFactory.textual; - -/** - * Utility for generating a nodes.csv and relationships.csv, with random data structured according - * to supplied headers. Mostly for testing and trying out the batch importer tool. - */ -public class CsvDataGenerator -{ - private final long nodesSeed, relationshipsSeed; - private final Header nodeHeader; - private final Header relationshipHeader; - private final Configuration config; - private final long nodes; - private final long relationships; - private final Function> nodeDeserialization; - private final Function> relDeserialization; - private final int numberOfLabels; - private final int numberOfRelationshipTypes; - - public CsvDataGenerator( Header nodeHeader, Header relationshipHeader, Configuration config, - long nodes, long relationships, - Function> nodeDeserialization, - Function> relDeserialization, - int numberOfLabels, int numberOfRelationshipTypes ) - { - this.nodeHeader = nodeHeader; - this.relationshipHeader = relationshipHeader; - this.config = config; - this.nodes = nodes; - this.relationships = relationships; - this.nodeDeserialization = nodeDeserialization; - this.relDeserialization = relDeserialization; - this.numberOfLabels = numberOfLabels; - this.numberOfRelationshipTypes = numberOfRelationshipTypes; - this.nodesSeed = currentTimeMillis(); - this.relationshipsSeed = nodesSeed+1; - } - - public String serializeNodeHeader() - { - return serializeHeader( nodeHeader ); - } - - private String serializeHeader( Header header ) - { - StringBuilder builder = new StringBuilder(); - for ( Entry entry : header.entries() ) - { - if ( builder.length() > 0 ) - { - builder.append( config.delimiter() ); - } - serializeHeaderEntry( builder, entry ); - } - return builder.toString(); - } - - private void serializeHeaderEntry( StringBuilder builder, Entry entry ) - { - String value; - switch ( entry.type() ) - { - case PROPERTY: - value = entry.name() + ":" + entry.extractor().toString(); - break; - case IGNORE: - return; - default: - value = (entry.name() != null ? entry.name() : "") + ":" + entry.type().name(); - break; - } - builder.append( value ); - } - - public String serializeRelationshipHeader() - { - return serializeHeader( relationshipHeader ); - } - - public InputIterator nodeData() - { - return new RandomDataIterator<>( nodeHeader, nodes, new Random( nodesSeed ), nodeDeserialization, nodes, - numberOfLabels, numberOfRelationshipTypes ); - } - - public InputIterator relationshipData() - { - return new RandomDataIterator<>( relationshipHeader, relationships, new Random( relationshipsSeed ), - relDeserialization, nodes, numberOfLabels, numberOfRelationshipTypes ); - } - - public static void main( String[] arguments ) throws IOException - { - Args args = Args.parse( arguments ); - int nodeCount = args.getNumber( "nodes", null ).intValue(); - int relationshipCount = args.getNumber( "relationships", null ).intValue(); - int labelCount = args.getNumber( "labels", 4 ).intValue(); - int relationshipTypeCount = args.getNumber( "relationship-types", 4 ).intValue(); - - Configuration config = Configuration.COMMAS; - Extractors extractors = new Extractors( config.arrayDelimiter() ); - IdType idType = IdType.ACTUAL; - Header nodeHeader = sillyNodeHeader( idType, extractors ); - Header relationshipHeader = bareboneRelationshipHeader( idType, extractors ); - - ProgressListener progress = textual( System.out ).singlePart( "Generating", nodeCount + relationshipCount ); - Function> deserialization = StringDeserialization.factory( config ); - CsvDataGenerator generator = new CsvDataGenerator<>( - nodeHeader, relationshipHeader, - config, nodeCount, relationshipCount, - deserialization, deserialization, - labelCount, relationshipTypeCount ); - writeData( generator.serializeNodeHeader(), generator.nodeData(), - new File( "target", "nodes.csv" ), progress ); - writeData( generator.serializeRelationshipHeader(), generator.relationshipData(), - new File( "target", "relationships.csv" ), progress ); - progress.done(); - } - - public static Header sillyNodeHeader( IdType idType, Extractors extractors ) - { - return new Header( new Entry[] { - new Entry( null, Type.ID, null, idType.extractor( extractors ) ), - new Entry( "name", Type.PROPERTY, null, extractors.string() ), - new Entry( "age", Type.PROPERTY, null, extractors.int_() ), - new Entry( "something", Type.PROPERTY, null, extractors.string() ), - new Entry( null, Type.LABEL, null, extractors.stringArray() ), - } ); - } - - public static Header bareboneNodeHeader( IdType idType, Extractors extractors ) - { - return new Header( new Entry[] { - new Entry( null, Type.ID, null, idType.extractor( extractors ) ), - new Entry( null, Type.LABEL, null, extractors.stringArray() ), - } ); - } - - public static Header bareboneRelationshipHeader( IdType idType, Extractors extractors ) - { - return new Header( new Entry[] { - new Entry( null, Type.START_ID, null, idType.extractor( extractors ) ), - new Entry( null, Type.END_ID, null, idType.extractor( extractors ) ), - new Entry( null, Type.TYPE, null, extractors.string() ) - } ); - } - - private static void writeData( String header, Iterator iterator, File file, - ProgressListener progress ) throws IOException - { - System.out.println( "Writing " + file.getAbsolutePath() ); - try ( Writer out = new BufferedWriter( new FileWriter( file ), 102*1024*10 ) ) - { - out.write( header ); - out.append( '\n' ); - while ( iterator.hasNext() ) - { - out.write( iterator.next() ); - out.append( '\n' ); - progress.add( 1 ); - } - } - } -} diff --git a/community/import-tool/src/test/java/org/neo4j/tooling/CsvDataGeneratorInput.java b/community/import-tool/src/test/java/org/neo4j/tooling/CsvDataGeneratorInput.java deleted file mode 100644 index a6424f8929581..0000000000000 --- a/community/import-tool/src/test/java/org/neo4j/tooling/CsvDataGeneratorInput.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.tooling; - -import org.neo4j.unsafe.impl.batchimport.BatchImporter; -import org.neo4j.unsafe.impl.batchimport.InputIterable; -import org.neo4j.unsafe.impl.batchimport.InputIterator; -import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator; -import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; -import org.neo4j.unsafe.impl.batchimport.input.Collector; -import org.neo4j.unsafe.impl.batchimport.input.Groups; -import org.neo4j.unsafe.impl.batchimport.input.Input; -import org.neo4j.unsafe.impl.batchimport.input.InputNode; -import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; -import org.neo4j.unsafe.impl.batchimport.input.csv.Configuration; -import org.neo4j.unsafe.impl.batchimport.input.csv.Header; -import org.neo4j.unsafe.impl.batchimport.input.csv.IdType; -import org.neo4j.unsafe.impl.batchimport.input.csv.InputNodeDeserialization; -import org.neo4j.unsafe.impl.batchimport.input.csv.InputRelationshipDeserialization; - -/** - * Uses {@link CsvDataGenerator} as an {@link Input} directly into a {@link BatchImporter}. - */ -public class CsvDataGeneratorInput extends CsvDataGenerator implements Input -{ - private final IdType idType; - private final Collector badCollector; - - public CsvDataGeneratorInput( final Header nodeHeader, final Header relationshipHeader, - Configuration config, long nodes, long relationships, final Groups groups, final IdType idType, - int numberOfLabels, int numberOfRelationshipTypes, Collector badCollector ) - { - super( nodeHeader, relationshipHeader, config, nodes, relationships, - source -> new InputNodeDeserialization( source, nodeHeader, groups, idType.idsAreExternal() ), - from -> new InputRelationshipDeserialization( from, relationshipHeader, groups ), - numberOfLabels, numberOfRelationshipTypes ); - this.idType = idType; - this.badCollector = badCollector; - } - - @Override - public InputIterable nodes() - { - return new InputIterable() - { - @Override - public InputIterator iterator() - { - return nodeData(); - } - - @Override - public boolean supportsMultiplePasses() - { - return true; - } - }; - } - - @Override - public InputIterable relationships() - { - return new InputIterable() - { - @Override - public InputIterator iterator() - { - return relationshipData(); - } - - @Override - public boolean supportsMultiplePasses() - { - return true; - } - }; - } - - @Override - public IdMapper idMapper() - { - return idType.idMapper(); - } - - @Override - public IdGenerator idGenerator() - { - return idType.idGenerator(); - } - - @Override - public Collector badCollector() - { - return badCollector; - } -} diff --git a/community/import-tool/src/test/java/org/neo4j/tooling/DataGeneratorInput.java b/community/import-tool/src/test/java/org/neo4j/tooling/DataGeneratorInput.java new file mode 100644 index 0000000000000..50a58282597ab --- /dev/null +++ b/community/import-tool/src/test/java/org/neo4j/tooling/DataGeneratorInput.java @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.tooling; + +import java.util.function.Function; +import org.neo4j.csv.reader.Extractors; +import org.neo4j.unsafe.impl.batchimport.IdRangeInput.Range; +import org.neo4j.unsafe.impl.batchimport.InputIterable; +import org.neo4j.unsafe.impl.batchimport.InputIterator; +import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator; +import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; +import org.neo4j.unsafe.impl.batchimport.input.Collector; +import org.neo4j.unsafe.impl.batchimport.input.Input; +import org.neo4j.unsafe.impl.batchimport.input.InputNode; +import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; +import org.neo4j.unsafe.impl.batchimport.input.csv.Header; +import org.neo4j.unsafe.impl.batchimport.input.csv.IdType; +import org.neo4j.unsafe.impl.batchimport.input.csv.Type; +import org.neo4j.unsafe.impl.batchimport.input.csv.Header.Entry; + +/** + * {@link Input} which generates data on the fly. This input wants to know number of nodes and relationships + * and then a function for generating {@link InputNode} and another for generating {@link InputRelationship}. + * Data can be generated in parallel and so those generator functions accepts a {@link Range} for which + * an array of input objects are generated, everything else will be taken care of. So typical usage would be: + * + *
+ * {@code
+ * BatchImporter importer = ...
+ * Input input = new DataGeneratorInput( 10_000_000, 1_000_000_000,
+ *      batch -> {
+ *          InputNode[] nodes = new InputNode[batch.getSize()];
+ *          for ( int i = 0; i < batch.getSize(); i++ ) {
+ *              long id = batch.getStart() + i;
+ *              nodes[i] = new InputNode( .... );
+ *          }
+ *          return nodes;
+ *      },
+ *      batch -> {
+ *          InputRelationship[] relationships = new InputRelationship[batch.getSize()];
+ *          ....
+ *          return relationships;
+ *      } );
+ * }
+ * 
+ */ +public class DataGeneratorInput implements Input +{ + private final long nodes; + private final long relationships; + private final Function nodeGenerator; + private final Function relGenerator; + private final IdType idType; + private final Collector badCollector; + + public DataGeneratorInput( long nodes, long relationships, + Function nodeGenerator, + Function relGenerator, + IdType idType, Collector badCollector ) + { + this.nodes = nodes; + this.relationships = relationships; + this.nodeGenerator = nodeGenerator; + this.relGenerator = relGenerator; + this.idType = idType; + this.badCollector = badCollector; + } + + @Override + public InputIterable nodes() + { + return new InputIterable() + { + @Override + public InputIterator iterator() + { + return new EntityDataGenerator<>( nodeGenerator, nodes ); + } + + @Override + public boolean supportsMultiplePasses() + { + return true; + } + }; + } + + @Override + public InputIterable relationships() + { + return new InputIterable() + { + @Override + public InputIterator iterator() + { + return new EntityDataGenerator<>( relGenerator, relationships ); + } + + @Override + public boolean supportsMultiplePasses() + { + return true; + } + }; + } + + @Override + public IdMapper idMapper() + { + return idType.idMapper(); + } + + @Override + public IdGenerator idGenerator() + { + return idType.idGenerator(); + } + + @Override + public Collector badCollector() + { + return badCollector; + } + + public static Header sillyNodeHeader( IdType idType, Extractors extractors ) + { + return new Header( new Entry[] { + new Entry( null, Type.ID, null, idType.extractor( extractors ) ), + new Entry( "name", Type.PROPERTY, null, extractors.string() ), + new Entry( "age", Type.PROPERTY, null, extractors.int_() ), + new Entry( "something", Type.PROPERTY, null, extractors.string() ), + new Entry( null, Type.LABEL, null, extractors.stringArray() ), + } ); + } + + public static Header bareboneNodeHeader( IdType idType, Extractors extractors ) + { + return new Header( new Entry[] { + new Entry( null, Type.ID, null, idType.extractor( extractors ) ), + new Entry( null, Type.LABEL, null, extractors.stringArray() ), + } ); + } + + public static Header bareboneRelationshipHeader( IdType idType, Extractors extractors ) + { + return new Header( new Entry[] { + new Entry( null, Type.START_ID, null, idType.extractor( extractors ) ), + new Entry( null, Type.END_ID, null, idType.extractor( extractors ) ), + new Entry( null, Type.TYPE, null, extractors.string() ) + } ); + } +} diff --git a/community/import-tool/src/test/java/org/neo4j/tooling/EntityDataGenerator.java b/community/import-tool/src/test/java/org/neo4j/tooling/EntityDataGenerator.java new file mode 100644 index 0000000000000..c5102cb9d7886 --- /dev/null +++ b/community/import-tool/src/test/java/org/neo4j/tooling/EntityDataGenerator.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.tooling; + +import java.util.function.BiFunction; +import java.util.function.Function; + +import org.neo4j.kernel.impl.util.collection.ContinuableArrayCursor; +import org.neo4j.unsafe.impl.batchimport.IdRangeInput.Range; +import org.neo4j.unsafe.impl.batchimport.InputIterator; +import org.neo4j.unsafe.impl.batchimport.staging.Configuration; +import org.neo4j.unsafe.impl.batchimport.staging.TicketedProcessing; + +import static org.neo4j.unsafe.impl.batchimport.IdRangeInput.idRangeInput; + +/** + * Data generator as {@link InputIterator}, parallelizable + */ +public class EntityDataGenerator extends InputIterator.Adapter +{ + private final String sourceDescription; + private final TicketedProcessing processing; + + private long cursor; + private final ContinuableArrayCursor itemCursor; + + public EntityDataGenerator( Function generator, long count ) + { + this.sourceDescription = getClass().getSimpleName(); + BiFunction processor = (batch,ignore) -> generator.apply( batch ); + this.processing = new TicketedProcessing<>( getClass().getName(), + Runtime.getRuntime().availableProcessors(), processor, () -> null ); + this.processing.slurp( idRangeInput( count, Configuration.DEFAULT.batchSize() ), true ); + this.itemCursor = new ContinuableArrayCursor<>( () -> processing.next() ); + } + + @Override + protected T fetchNextOrNull() + { + if ( itemCursor.next() ) + { + cursor++; + return itemCursor.get(); + } + return null; + } + + @Override + public String sourceDescription() + { + return sourceDescription; + } + + @Override + public long lineNumber() + { + return cursor; + } + + @Override + public long position() + { + return 0; + } + + @Override + public void close() + { + super.close(); + processing.shutdown( false ); + } + + // We have to let our processing framework know about changes in processor count assigned to us + @Override + public int numberOfProcessors() + { + return processing.numberOfProcessors(); + } + + @Override + public boolean incrementNumberOfProcessors() + { + return processing.incrementNumberOfProcessors(); + } + + @Override + public boolean decrementNumberOfProcessors() + { + return processing.decrementNumberOfProcessors(); + } +} diff --git a/community/import-tool/src/test/java/org/neo4j/tooling/QuickImport.java b/community/import-tool/src/test/java/org/neo4j/tooling/QuickImport.java index 18b6934856cb3..a8dca18d10d73 100644 --- a/community/import-tool/src/test/java/org/neo4j/tooling/QuickImport.java +++ b/community/import-tool/src/test/java/org/neo4j/tooling/QuickImport.java @@ -32,20 +32,16 @@ import org.neo4j.kernel.impl.logging.SimpleLogService; import org.neo4j.logging.FormattedLogProvider; import org.neo4j.unsafe.impl.batchimport.BatchImporter; +import org.neo4j.unsafe.impl.batchimport.Configuration.Default; import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter; -import org.neo4j.unsafe.impl.batchimport.input.Groups; import org.neo4j.unsafe.impl.batchimport.input.Input; import org.neo4j.unsafe.impl.batchimport.input.csv.Configuration; import org.neo4j.unsafe.impl.batchimport.input.csv.Header; import org.neo4j.unsafe.impl.batchimport.input.csv.IdType; - import static org.neo4j.graphdb.factory.GraphDatabaseSettings.dense_node_threshold; - -import org.neo4j.unsafe.impl.batchimport.Configuration.Default; - import static org.neo4j.kernel.configuration.Settings.parseLongWithUnit; -import static org.neo4j.tooling.CsvDataGenerator.bareboneNodeHeader; -import static org.neo4j.tooling.CsvDataGenerator.bareboneRelationshipHeader; +import static org.neo4j.tooling.DataGeneratorInput.bareboneNodeHeader; +import static org.neo4j.tooling.DataGeneratorInput.bareboneRelationshipHeader; import static org.neo4j.unsafe.impl.batchimport.input.Collectors.silentBadCollector; import static org.neo4j.unsafe.impl.batchimport.input.csv.Configuration.COMMAS; import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.defaultFormatNodeFileHeader; @@ -60,10 +56,10 @@ * Quick comes from gaming terminology where you sometimes just want to play a quick game, without * any settings or hazzle, just play. * - * Uses {@link CsvDataGeneratorInput} as random data {@link Input}. + * Uses {@link DataGeneratorInput} as random data {@link Input}. * * For the time being the node/relationship data can't be controlled via command-line arguments, - * only through changing the code. The {@link CsvDataGeneratorInput} accepts two {@link Header headers} + * only through changing the code. The {@link DataGeneratorInput} accepts two {@link Header headers} * describing which sort of data it should generate. */ public class QuickImport @@ -98,12 +94,17 @@ public int denseNodeThreshold() return args.getNumber( dense_node_threshold.name(), super.denseNodeThreshold() ).intValue(); } }; - Input input = new CsvDataGeneratorInput( - nodeHeader, relationshipHeader, - COMMAS, nodeCount, relationshipCount, new Groups(), idType, labelCount, relationshipTypeCount, - silentBadCollector( 0 )); + + SimpleDataGenerator generator = new SimpleDataGenerator( nodeHeader, relationshipHeader, relationshipCount, + nodeCount, labelCount, relationshipTypeCount, idType ); + Input input = new DataGeneratorInput( + nodeCount, relationshipCount, + generator.nodes(), generator.relationships(), + idType, silentBadCollector( 0 ) ); BatchImporter importer = new ParallelBatchImporter( dir, importConfig, - new SimpleLogService( sysoutLogProvider, sysoutLogProvider ), defaultVisible(), Config.defaults() ); + new SimpleLogService( sysoutLogProvider, sysoutLogProvider ), + defaultVisible(), + Config.defaults() ); importer.doImport( input ); } diff --git a/community/import-tool/src/test/java/org/neo4j/tooling/SimpleDataGenerator.java b/community/import-tool/src/test/java/org/neo4j/tooling/SimpleDataGenerator.java new file mode 100644 index 0000000000000..c480b49bb1c70 --- /dev/null +++ b/community/import-tool/src/test/java/org/neo4j/tooling/SimpleDataGenerator.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.tooling; + +import java.util.function.Function; +import org.neo4j.csv.reader.SourceTraceability; +import org.neo4j.unsafe.impl.batchimport.IdRangeInput.Range; +import org.neo4j.unsafe.impl.batchimport.input.Groups; +import org.neo4j.unsafe.impl.batchimport.input.InputNode; +import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; +import org.neo4j.unsafe.impl.batchimport.input.csv.Header; +import org.neo4j.unsafe.impl.batchimport.input.csv.IdType; +import org.neo4j.unsafe.impl.batchimport.input.csv.InputNodeDeserialization; +import org.neo4j.unsafe.impl.batchimport.input.csv.InputRelationshipDeserialization; + +public class SimpleDataGenerator extends SourceTraceability.Adapter +{ + private final Header nodeHeader; + private final Header relationshipHeader; + private final long randomSeed; + private final long nodeCount; + private final Distribution labels; + private final Distribution relationshipTypes; + private final Groups groups = new Groups(); + private final IdType idType; + + public SimpleDataGenerator( Header nodeHeader, Header relationshipHeader, long randomSeed, + long nodeCount, int labelCount, int relationshipTypeCount, IdType idType ) + { + this.nodeHeader = nodeHeader; + this.relationshipHeader = relationshipHeader; + this.randomSeed = randomSeed; + this.nodeCount = nodeCount; + this.idType = idType; + this.labels = new Distribution<>( tokens( "Label", labelCount ) ); + this.relationshipTypes = new Distribution<>( tokens( "TYPE", relationshipTypeCount ) ); + } + + public Function nodes() + { + return batch -> new SimpleDataGeneratorBatch<>( nodeHeader, batch.getStart(), randomSeed + batch.getStart(), + nodeCount, labels, relationshipTypes, + new InputNodeDeserialization( SimpleDataGenerator.this, nodeHeader, groups, idType.idsAreExternal() ), + new InputNode[batch.getSize()] ).get(); + } + + public Function relationships() + { + return batch -> new SimpleDataGeneratorBatch<>( relationshipHeader, batch.getStart(), + randomSeed + batch.getStart(), nodeCount, labels, relationshipTypes, + new InputRelationshipDeserialization( SimpleDataGenerator.this, relationshipHeader, groups ), + new InputRelationship[batch.getSize()] ).get(); + } + + private static String[] tokens( String prefix, int count ) + { + String[] result = new String[count]; + for ( int i = 0; i < count; i++ ) + { + result[i] = prefix + (i+1); + } + return result; + } + + @Override + public String sourceDescription() + { + return getClass().getSimpleName(); + } +} diff --git a/community/import-tool/src/test/java/org/neo4j/tooling/RandomDataIterator.java b/community/import-tool/src/test/java/org/neo4j/tooling/SimpleDataGeneratorBatch.java similarity index 68% rename from community/import-tool/src/test/java/org/neo4j/tooling/RandomDataIterator.java rename to community/import-tool/src/test/java/org/neo4j/tooling/SimpleDataGeneratorBatch.java index d253d7727e525..69f8be949d005 100644 --- a/community/import-tool/src/test/java/org/neo4j/tooling/RandomDataIterator.java +++ b/community/import-tool/src/test/java/org/neo4j/tooling/SimpleDataGeneratorBatch.java @@ -20,13 +20,8 @@ package org.neo4j.tooling; import java.util.Random; -import java.util.function.Function; - -import org.neo4j.csv.reader.SourceTraceability; import org.neo4j.helpers.ArrayUtil; -import org.neo4j.helpers.collection.PrefetchingIterator; import org.neo4j.test.Randoms; -import org.neo4j.unsafe.impl.batchimport.InputIterator; import org.neo4j.unsafe.impl.batchimport.input.InputEntity; import org.neo4j.unsafe.impl.batchimport.input.csv.Deserialization; import org.neo4j.unsafe.impl.batchimport.input.csv.Header; @@ -34,76 +29,56 @@ import static java.lang.Math.abs; -/** - * Generates random data based on a {@link Header} and some statistics, such as label/relationship type counts. - */ -public class RandomDataIterator extends PrefetchingIterator implements InputIterator +class SimpleDataGeneratorBatch { private final Header header; - private final long limit; private final Random random; private final Randoms randoms; - private final Deserialization deserialization; private final long nodeCount; + private final long start; private final Distribution labels; private final Distribution relationshipTypes; - private final String sourceDescription; + private final Deserialization deserialization; + private final T[] target; private long cursor; private long position; - public RandomDataIterator( Header header, long limit, Random random, - Function> deserialization, long nodeCount, - int labelCount, int relationshipTypeCount ) + SimpleDataGeneratorBatch( + Header header, long start, long randomSeed, long nodeCount, + Distribution labels, Distribution relationshipTypes, + Deserialization deserialization, T[] target ) { this.header = header; - this.limit = limit; - this.random = random; - this.randoms = new Randoms( random, Randoms.DEFAULT ); - this.deserialization = deserialization.apply( this ); + this.start = start; this.nodeCount = nodeCount; - this.labels = new Distribution<>( tokens( "Label", labelCount ) ); - this.relationshipTypes = new Distribution<>( tokens( "TYPE", relationshipTypeCount ) ); - this.sourceDescription = getClass().getSimpleName() + ":" + header; + this.labels = labels; + this.relationshipTypes = relationshipTypes; + this.target = target; + this.random = new Random( randomSeed ); + this.randoms = new Randoms( random, Randoms.DEFAULT ); + this.deserialization = deserialization; - this.deserialization.initialize(); + deserialization.initialize(); } - private String[] tokens( String prefix, int count ) + T[] get() { - String[] result = new String[count]; - for ( int i = 0; i < count; i++ ) + for ( int i = 0; i < target.length; i++ ) { - result[i] = prefix + (i+1); + target[i] = next(); } - return result; + return target; } - @Override - protected T fetchNextOrNull() - { - if ( cursor < limit ) - { - try - { - return generateDataLine(); - } - finally - { - cursor++; - } - } - return null; - } - - private T generateDataLine() + private T next() { for ( Entry entry : header.entries() ) { switch ( entry.type() ) { case ID: - deserialization.handle( entry, idValue( entry, cursor ) ); + deserialization.handle( entry, idValue( entry, start + cursor ) ); break; case PROPERTY: deserialization.handle( entry, randomProperty( entry, random ) ); @@ -128,6 +103,7 @@ private T generateDataLine() finally { deserialization.clear(); + cursor++; } } @@ -191,27 +167,4 @@ private String[] randomLabels( Random random ) position += length * 6; return result; } - - @Override - public void close() - { // Nothing to close - } - - @Override - public String sourceDescription() - { - return sourceDescription; - } - - @Override - public long lineNumber() - { - return cursor; - } - - @Override - public long position() - { - return position; - } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/util/collection/ContinuableArrayCursor.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/util/collection/ContinuableArrayCursor.java new file mode 100644 index 0000000000000..8c780dc386a0a --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/util/collection/ContinuableArrayCursor.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.util.collection; + +import java.util.function.Supplier; + +import org.neo4j.cursor.Cursor; + +/** + * {@link Cursor} which moves over one or more arrays, automatically transitioning to the next + * array when one runs out of items. + */ +public class ContinuableArrayCursor implements Cursor +{ + private final Supplier supplier; + private T[] current; + private int cursor; + + public ContinuableArrayCursor( Supplier supplier ) + { + this.supplier = supplier; + } + + @Override + public boolean next() + { + while ( current == null || cursor >= current.length ) + { + current = supplier.get(); + if ( current == null ) + { // End reached + return false; + } + + cursor = 0; + } + cursor++; + return true; + } + + @Override + public void close() + { + // Do nothing + } + + @Override + public T get() + { + if ( current == null ) + { + throw new IllegalStateException(); + } + return current[cursor-1]; + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/IdRangeInput.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/IdRangeInput.java new file mode 100644 index 0000000000000..24448ec7fdfac --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/IdRangeInput.java @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import java.util.Iterator; + +import org.neo4j.helpers.collection.PrefetchingIterator; +import org.neo4j.unsafe.impl.batchimport.staging.TicketedProcessing; + +import static java.lang.Long.min; + +/** + * Useful utility in conjunction with {@link TicketedProcessing} where an id range is divided up + * in chunks and given as input to be processed. + */ +public class IdRangeInput extends PrefetchingIterator +{ + private final long max; + private final int batchSize; + private long start; + + public IdRangeInput( long max, int batchSize ) + { + this.max = max; + this.batchSize = batchSize; + } + + @Override + protected Range fetchNextOrNull() + { + int count = (int) min( batchSize, (max - start) ); + if ( count == 0 ) + { + return null; + } + + try + { + return new Range( start, count ); + } + finally + { + start += count; + } + } + + public static Iterator idRangeInput( long max, int batchSize ) + { + return new IdRangeInput( max, batchSize ); + } + + public static class Range + { + private final long start; + private final int size; + + Range( long start, int size ) + { + this.start = start; + this.size = size; + } + + public long getStart() + { + return start; + } + + public int getSize() + { + return size; + } + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java index 4189c19705b77..2bbb38342c287 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java @@ -27,8 +27,7 @@ import java.util.function.Supplier; import org.neo4j.function.Suppliers; - -import static java.lang.Math.min; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.neo4j.helpers.Exceptions.launderedException; /** @@ -37,7 +36,7 @@ */ public class DynamicTaskExecutor implements TaskExecutor { - public static final ParkStrategy DEFAULT_PARK_STRATEGY = new ParkStrategy.Park( 10 ); + public static final ParkStrategy DEFAULT_PARK_STRATEGY = new ParkStrategy.Park( 10, MILLISECONDS ); private final BlockingQueue> queue; private final ParkStrategy parkStrategy; @@ -45,6 +44,7 @@ public class DynamicTaskExecutor implements TaskExecutor @SuppressWarnings( "unchecked" ) private volatile Processor[] processors = (Processor[]) Array.newInstance( Processor.class, 0 ); private volatile boolean shutDown; + private volatile boolean abortQueued; private volatile Throwable panic; private final Supplier initialLocalState; private final int maxProcessorCount; @@ -132,10 +132,14 @@ public void assertHealthy() { if ( shutDown ) { - String message = "Executor has been shut down"; - throw panic != null - ? new IllegalStateException( message, panic ) - : new IllegalStateException( message ); + if ( panic != null ) + { + throw new IllegalStateException( "Executor has been shut down in panic", panic ); + } + if ( abortQueued ) + { + throw new IllegalStateException( "Executor has been shut down, aborting queued" ); + } } } @@ -148,7 +152,7 @@ private void notifyProcessors() } @Override - public synchronized void shutdown( boolean awaitAllCompleted ) + public synchronized void shutdown( int flags ) { if ( shutDown ) { @@ -156,10 +160,12 @@ public synchronized void shutdown( boolean awaitAllCompleted ) } this.shutDown = true; + boolean awaitAllCompleted = (flags & TaskExecutor.SF_AWAIT_ALL_COMPLETED) != 0; while ( awaitAllCompleted && !queue.isEmpty() && panic == null /*all bets are off in the event of panic*/ ) { parkAWhile(); } + this.abortQueued = (flags & TaskExecutor.SF_ABORT_QUEUED) != 0; for ( Processor processor : processors ) { processor.shutDown = true; @@ -198,7 +204,6 @@ public void uncaughtException( Thread t, Throwable e ) private class Processor extends Thread { private volatile boolean shutDown; - private final LOCAL threadLocalState = initialLocalState.get(); Processor( String name ) { @@ -210,7 +215,9 @@ private class Processor extends Thread @Override public void run() { - while ( !shutDown ) + // Initialized here since it's the thread itself that needs to call it + final LOCAL threadLocalState = initialLocalState.get(); + while ( !abortQueued && !shutDown ) { Task task = queue.poll(); if ( task != null ) @@ -222,12 +229,16 @@ public void run() catch ( Throwable e ) { panic = e; - shutdown( false ); + shutdown( TaskExecutor.SF_ABORT_QUEUED ); throw launderedException( e ); } } else { + if ( shutDown ) + { + break; + } parkAWhile(); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/ParkStrategy.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/ParkStrategy.java index 0e8a791a669ae..f2b54d798ddc8 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/ParkStrategy.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/ParkStrategy.java @@ -19,9 +19,10 @@ */ package org.neo4j.unsafe.impl.batchimport.executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; -import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; /** * Strategy for waiting a while, given a certain {@link Thread}. @@ -36,9 +37,9 @@ class Park implements ParkStrategy { private final long nanos; - public Park( int millis ) + public Park( long time, TimeUnit unit ) { - this.nanos = MILLISECONDS.toNanos( millis ); + this.nanos = NANOSECONDS.convert( time, unit ); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/Task.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/Task.java index 05e6b8274c0f1..2b1c1515dab20 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/Task.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/Task.java @@ -28,6 +28,7 @@ * * @param thread-local state provided by the {@link TaskExecutor} executing this task. */ +@FunctionalInterface public interface Task { void run( LOCAL threadLocalState ) throws Exception; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutor.java index 9bad1edaba48a..4e1842c24fe92 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutor.java @@ -32,12 +32,8 @@ */ public interface TaskExecutor extends Parallelizable { - /** - * Sets the processor count for this executor, i.e. number of threads executing tasks in parallel. - * - * @param count number of processors executing tasks. - */ - void setNumberOfProcessors( int count ); + int SF_AWAIT_ALL_COMPLETED = 0x1; + int SF_ABORT_QUEUED = 0x2; /** * Submits a task to be executed by one of the processors in this {@link TaskExecutor}. Tasks will be @@ -50,10 +46,11 @@ public interface TaskExecutor extends Parallelizable /** * Shuts down this {@link TaskExecutor}, disallowing new tasks to be {@link #submit(Task) submitted}. * - * @param awaitAllCompleted if {@code true} will wait for all queued or already executing tasks to be - * executed and completed, before returning from this method. + * @param flags {@link #SF_AWAIT_ALL_COMPLETED} will wait for all queued or already executing tasks to be + * executed and completed, before returning from this method. {@link #SF_ABORT_QUEUED} will have + * submitted tasks which haven't started executing yet cancelled, never to be executed. */ - void shutdown( boolean awaitAllCompleted ); + void shutdown( int flags ); /** * Asserts that this {@link TaskExecutor} is healthy. Useful to call when deciding to wait on a condition diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java index 9d1bdbde65008..273e563a107e1 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java @@ -34,7 +34,6 @@ import static java.lang.String.format; import static java.lang.System.currentTimeMillis; -import static java.lang.System.nanoTime; import static java.util.Arrays.asList; /** @@ -73,6 +72,7 @@ public boolean test( long ticket ) protected final MovingAverage totalProcessingTime; protected long startTime, endTime; private final List additionalStatsProvider; + protected final Runnable healthChecker = () -> assertHealthy(); public AbstractStep( StageControl control, String name, Configuration config, StatsProvider... additionalStatsProvider ) @@ -148,35 +148,6 @@ protected void issuePanic( Throwable cause, boolean rethrow ) } } - protected long await( LongPredicate predicate, long value ) - { - if ( predicate.test( value ) ) - { - return 0; - } - - long startTime = nanoTime(); - for ( int i = 0; i < 1_000_000 && !predicate.test( value ); i++ ) - { // Busy loop a while - } - - while ( !predicate.test( value ) ) - { - // Sleeping wait - try - { - Thread.sleep( 1 ); - Thread.yield(); - } - catch ( InterruptedException e ) - { // It's OK - } - - assertHealthy(); - } - return nanoTime() - startTime; - } - protected void assertHealthy() { if ( isPanic() ) diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/Processing.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/Processing.java new file mode 100644 index 0000000000000..9eb0ef7a66a23 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/Processing.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport.staging; + +import java.util.function.LongPredicate; + +import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy; + +import static java.lang.System.nanoTime; + +public class Processing +{ + /** + * Awaits a condition, parking if needed and will abort of health check doesn't pass. + * + * @param goalPredicate condition which will end the wait, if returning {@code true}. + * @param goal to feed into the {@code goalPredicate}. + * @param healthCheck to check as to not continue waiting if not passing. + * @param park {@link ParkStrategy} for each tiny little wait. + * @return how long time was spent in here, in nanos. + */ + public static long await( LongPredicate goalPredicate, long goal, Runnable healthCheck, ParkStrategy park ) + { + if ( goalPredicate.test( goal ) ) + { + return 0; + } + + long startTime = nanoTime(); + for ( int i = 0; i < 1_000_000 && !goalPredicate.test( goal ); i++ ) + { // Busy loop a while + } + + Thread thread = Thread.currentThread(); + while ( !goalPredicate.test( goal ) ) + { + park.park( thread ); + healthCheck.run(); + } + return nanoTime()-startTime; + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java index 96de063c3f732..b68a07508c328 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java @@ -24,19 +24,28 @@ import org.neo4j.graphdb.Resource; import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor; +import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy; import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor; import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider; import static java.lang.System.currentTimeMillis; import static java.lang.System.nanoTime; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor.DEFAULT_PARK_STRATEGY; +import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_ABORT_QUEUED; +import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_AWAIT_ALL_COMPLETED; +import static org.neo4j.unsafe.impl.batchimport.staging.Processing.await; /** * {@link Step} that uses {@link TaskExecutor} as a queue and execution mechanism. * Supports an arbitrary number of threads to execute batches in parallel. * Subclasses implement {@link #process(Object, BatchSender)} receiving the batch to process * and an {@link BatchSender} for sending the modified batch, or other batches downstream. + * + * There's an overlap of functionality in {@link TicketedProcessing}, however the fit isn't perfect + * for using it as the engine in a {@link ProcessorStep} because the queuing of processed results + * works a bit differently. Perhaps sometimes this can be addressed. */ public abstract class ProcessorStep extends AbstractStep { @@ -51,6 +60,7 @@ public abstract class ProcessorStep extends AbstractStep // Time stamp for when we processed the last queued batch received from upstream. // Useful for tracking how much time we spend waiting for batches from upstream. private final AtomicLong lastBatchEndTime = new AtomicLong(); + private final ParkStrategy park = new ParkStrategy.Park( 1, MILLISECONDS ); protected ProcessorStep( StageControl control, String name, Configuration config, int maxProcessors, StatsProvider... additionalStatsProviders ) @@ -77,9 +87,8 @@ private int theoreticalMaxProcessors() public long receive( final long ticket, final T batch ) { // Don't go too far ahead - long idleTime = await( catchUp, executor.numberOfProcessors() ); + long idleTime = await( catchUp, executor.numberOfProcessors(), healthChecker, park ); incrementQueue(); - executor.submit( sender -> { assertHealthy(); sender.initialize( ticket ); @@ -89,7 +98,7 @@ public long receive( final long ticket, final T batch ) // since grabbing a permit may include locking. if ( guarantees( ORDER_PROCESS ) ) { - await( rightBeginTicket, ticket ); + await( rightBeginTicket, ticket, healthChecker, park ); } try ( Resource precondition = permit( batch ) ) { @@ -113,7 +122,6 @@ public long receive( final long ticket, final T batch ) issuePanic( e ); } } ); - return idleTime; } @@ -170,7 +178,7 @@ private void incrementQueue() public void close() throws Exception { super.close(); - executor.shutdown( panic == null ); + executor.shutdown( panic == null ? SF_AWAIT_ALL_COMPLETED : SF_ABORT_QUEUED ); } @Override @@ -196,7 +204,7 @@ private void sendDownstream( long ticket, Object batch ) { if ( guarantees( ORDER_SEND_DOWNSTREAM ) ) { - await( rightDoneTicket, ticket ); + await( rightDoneTicket, ticket, healthChecker, park ); } downstreamIdleTime.addAndGet( downstream.receive( ticket, batch ) ); doneBatches.incrementAndGet(); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java new file mode 100644 index 0000000000000..32c7401e13cdc --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java @@ -0,0 +1,203 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport.staging; + +import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; +import java.util.function.LongPredicate; +import java.util.function.Supplier; + +import org.neo4j.unsafe.impl.batchimport.Parallelizable; +import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor; +import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy; +import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import static org.neo4j.helpers.FutureAdapter.future; +import static org.neo4j.unsafe.impl.batchimport.staging.Processing.await; + +/** + * Accepts jobs and processes them, potentially in parallel. Each task is given a ticket, an incrementing + * integer where results of processed tasks are {@link #next() returned} in the order of ticket, + * i.e. in order of submission. Number of threads processing jobs can be controlled via methods from + * {@link Parallelizable}. This little ASCII image will make an attempt to visualize this simple flow. + * + *
+ *                      Processors...
+ *                        ┌────┐
+ *                     ┌─>│Job ├──┐
+ *                     │  └────┘  │
+ * ┌────────────────┐  │  ┌────┐  │  ┌─────────────────┐
+ * │Submitted jobs  ├──┼─>│Job ├──┼─>│Processed jobs   │
+ * └────────────────┘  │  └────┘  │  └─────────────────┘
+ *                     │  ┌────┐  │
+ *                     └─>│Job ├──┘
+ *                        └────┘
+ * 
+ * + * For easily spawning a thread sitting and submitting jobs to be processed from an {@link Iterator}, + * see {@link #slurp(Iterator, boolean)}. + * + * @param raw material to process + * @param thread local state that each processing thread will share between jobs + * @param result that a raw material will be processed into + */ +public class TicketedProcessing implements Parallelizable +{ + private static final ParkStrategy park = new ParkStrategy.Park( 10, MILLISECONDS ); + + private final TaskExecutor executor; + private final BiFunction processor; + private final ArrayBlockingQueue processed; + private final AtomicLong submittedTicket = new AtomicLong( -1 ); + private final AtomicLong processedTicket = new AtomicLong( -1 ); + private final LongPredicate myTurnToAddToProcessedQueue = new LongPredicate() + { + @Override + public boolean test( long ticket ) + { + return processedTicket.get() == ticket - 1; + } + }; + private final Runnable healthCheck; + private volatile boolean done; + + public TicketedProcessing( String name, int maxProcessors, BiFunction processor, + Supplier threadLocalStateSupplier ) + { + this.processor = processor; + this.executor = new DynamicTaskExecutor<>( 1, maxProcessors, maxProcessors, park, name, + threadLocalStateSupplier ); + this.healthCheck = executor::assertHealthy; + this.processed = new ArrayBlockingQueue<>( maxProcessors ); + } + + public void submit( long ticket, FROM job ) + { + submittedTicket.incrementAndGet(); + executor.submit( threadLocalState -> + { + // Process this job (we're now in one of the processing threads) + TO result = processor.apply( job, threadLocalState ); + + // Wait until it's my turn to add this result to the result queue, we have to add it in the + // correct order so that we preserve the ticket order. We want to wait as short periods of time + // as possible here because every cycle we wait adding this result, we're also blocking + // other results from being added to the result queue + await( myTurnToAddToProcessedQueue, ticket, healthCheck, park ); + + // OK now it's my turn to add this result to the result queue which user will pull from later on + while ( !processed.offer( result, 10, MILLISECONDS ) ); + + // Signal that this ticket has been processed and added to the result queue + processedTicket.incrementAndGet(); + } ); + } + + /** + * Essentially starting a thread {@link #submit(long, Object) submitting} a stream of inputs which will + * each be processed and asynchronically made available in order of processing ticket by later calling + * {@link #next()}. + * + * @param input {@link Iterator} of input to process. + * @param shutdownAfterAllSubmitted will call {@link #shutdown(boolean)} after all jobs submitted if {@code true}. + * @return {@link Future} representing the work of submitting the inputs to be processed. When the future + * is completed all items from the {@code input} {@link Iterator} have been submitted, but some items + * may still be queued and processed. + */ + public Future slurp( Iterator input, boolean shutdownAfterAllSubmitted ) + { + return future( () -> + { + for ( long ticket = 0; input.hasNext(); ticket++ ) + { + submit( ticket, input.next() ); + } + if ( shutdownAfterAllSubmitted ) + { + shutdown( true ); + } + return null; + } ); + } + + /** + * Tells this processor that there will be no more submissions and so {@link #next()} will stop blocking, + * waiting for new processed results. + * + * @param awaitAllProcessed if {@code true} will block until all submitted jobs have been processed, + * otherwise if {@code false} will return immediately, where processing will still commence and complete. + */ + public void shutdown( boolean awaitAllProcessed ) + { + done = true; + executor.shutdown( awaitAllProcessed ? TaskExecutor.SF_AWAIT_ALL_COMPLETED : 0 ); + } + + /** + * @return next processed job (blocking call), or {@code null} if all jobs have been processed + * and {@link #shutdown(boolean)} has been called. + */ + public TO next() + { + while ( !done || processedTicket.get() < submittedTicket.get() || !processed.isEmpty() ) + { + try + { + TO next = processed.poll( 10, MILLISECONDS ); + if ( next != null ) + { + return next; + } + } + catch ( InterruptedException e ) + { + // Someone wants us to abort this thing + Thread.currentThread().interrupt(); + return null; + } + healthCheck.run(); + } + // We've reached the end of the line + return null; + } + + @Override + public int numberOfProcessors() + { + return executor.numberOfProcessors(); + } + + @Override + public boolean incrementNumberOfProcessors() + { + return executor.incrementNumberOfProcessors(); + } + + @Override + public boolean decrementNumberOfProcessors() + { + return executor.decrementNumberOfProcessors(); + } +} diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/util/collection/ContinuableArrayCursorTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/util/collection/ContinuableArrayCursorTest.java new file mode 100644 index 0000000000000..89ea7b70d2e98 --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/util/collection/ContinuableArrayCursorTest.java @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.util.collection; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.function.Supplier; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ContinuableArrayCursorTest +{ + @Rule + public final ExpectedException thrown = ExpectedException.none(); + + @Test + public void shouldNotReturnAnyElementOnEmptySupplier() throws Exception + { + // given + ContinuableArrayCursor cursor = new ContinuableArrayCursor<>( () -> null ); + + // then + assertFalse( cursor.next() ); + } + + @Test + public void shouldNotReturnAnyElementOnSupplierWithOneEmptyArray() throws Exception + { + // given + ContinuableArrayCursor cursor = new ContinuableArrayCursor( supply( new Integer[0] ) ); + + // then + assertFalse( cursor.next() ); + } + + @Test + public void shouldMoveCursorOverSingleArray() throws Exception + { + // given + Integer[] array = new Integer[]{1, 2, 3}; + ContinuableArrayCursor cursor = new ContinuableArrayCursor<>( supply( array ) ); + + // then + assertCursor( cursor, array ); + } + + @Test + public void shouldMoveCursorOverMultipleArrays() throws Exception + { + // given + Integer[][] arrays = new Integer[][]{ + new Integer[]{1, 2, 3}, + new Integer[]{4, 5, 6}, + new Integer[]{7} + }; + ContinuableArrayCursor cursor = new ContinuableArrayCursor<>( supply( arrays ) ); + + // then + assertCursor( cursor, arrays ); + } + + @Test + public void callGetBeforeNextShouldThrowIllegalStateException() throws Exception + { + // given + ContinuableArrayCursor cursor = new ContinuableArrayCursor( supply( new Integer[0] ) ); + + // then + thrown.expect( IllegalStateException.class ); + cursor.get(); + } + + @Test + public void callGetAfterNextReturnsFalseShouldThrowIllegalStateException() throws Exception + { + // given + ContinuableArrayCursor cursor = new ContinuableArrayCursor<>( supply( new Integer[0] ) ); + + // when + assertFalse( cursor.next() ); + + // then + thrown.expect( IllegalStateException.class ); + cursor.get(); + } + + private Supplier supply( Integer[] array ) + { + return supply( new Integer[][]{ array } ); + } + + private Supplier supply( Integer[][] arrays ) + { + Iterator iterator = Arrays.asList( arrays ).iterator(); + return () -> iterator.hasNext() ? + iterator.next() : null; + } + + private void assertCursor( ContinuableArrayCursor cursor, Object[]... arrays ) + { + for ( Object[] array : arrays ) + { + for ( Object obj : array ) + { + assertTrue( cursor.next() ); + assertEquals( obj, cursor.get() ); + } + } + assertFalse( cursor.next() ); + } +} diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java index 4a74594e28758..a94792d73c8cb 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java @@ -23,12 +23,12 @@ import java.io.IOException; import java.util.concurrent.Future; - import org.neo4j.helpers.Exceptions; import org.neo4j.test.Barrier; import org.neo4j.test.DoubleLatch; import org.neo4j.test.OtherThreadExecutor; import org.neo4j.test.OtherThreadExecutor.WorkerCommand; +import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy.Park; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -36,13 +36,20 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_ABORT_QUEUED; +import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_AWAIT_ALL_COMPLETED; + public class DynamicTaskExecutorTest { + private static final Park PARK = new ParkStrategy.Park( 1, MILLISECONDS ); + @Test public void shouldExecuteTasksInParallel() throws Exception { // GIVEN - TaskExecutor executor = new DynamicTaskExecutor<>( 2, 0, 5, new ParkStrategy.Park( 1 ), + TaskExecutor executor = new DynamicTaskExecutor<>( 2, 0, 5, PARK, getClass().getSimpleName() ); ControlledTask task1 = new ControlledTask(); TestTask task2 = new TestTask(); @@ -58,7 +65,7 @@ public void shouldExecuteTasksInParallel() throws Exception while ( task1.executed == 0 ) { // Busy loop } - executor.shutdown( true ); + executor.shutdown( SF_AWAIT_ALL_COMPLETED ); // THEN assertEquals( 1, task1.executed ); @@ -69,7 +76,7 @@ public void shouldExecuteTasksInParallel() throws Exception public void shouldIncrementNumberOfProcessorsWhenRunning() throws Exception { // GIVEN - TaskExecutor executor = new DynamicTaskExecutor<>( 1, 0, 5, new ParkStrategy.Park( 1 ), + TaskExecutor executor = new DynamicTaskExecutor<>( 1, 0, 5, PARK, getClass().getSimpleName() ); ControlledTask task1 = new ControlledTask(); TestTask task2 = new TestTask(); @@ -86,7 +93,7 @@ public void shouldIncrementNumberOfProcessorsWhenRunning() throws Exception while ( task1.executed == 0 ) { // Busy loop } - executor.shutdown( true ); + executor.shutdown( SF_AWAIT_ALL_COMPLETED ); // THEN assertEquals( 1, task1.executed ); @@ -97,7 +104,7 @@ public void shouldIncrementNumberOfProcessorsWhenRunning() throws Exception public void shouldDecrementNumberOfProcessorsWhenRunning() throws Exception { // GIVEN - TaskExecutor executor = new DynamicTaskExecutor<>( 2, 0, 5, new ParkStrategy.Park( 1 ), + TaskExecutor executor = new DynamicTaskExecutor<>( 2, 0, 5, PARK, getClass().getSimpleName() ); ControlledTask task1 = new ControlledTask(); ControlledTask task2 = new ControlledTask(); @@ -118,7 +125,7 @@ public void shouldDecrementNumberOfProcessorsWhenRunning() throws Exception Thread.sleep( 200 ); // gosh, a Thread.sleep... assertEquals( 0, task4.executed ); task3.latch.finish(); - executor.shutdown( true ); + executor.shutdown( SF_AWAIT_ALL_COMPLETED ); // THEN assertEquals( 1, task1.executed ); @@ -131,7 +138,7 @@ public void shouldDecrementNumberOfProcessorsWhenRunning() throws Exception public void shouldExecuteMultipleTasks() throws Exception { // GIVEN - TaskExecutor executor = new DynamicTaskExecutor<>( 30, 0, 5, new ParkStrategy.Park( 1 ), + TaskExecutor executor = new DynamicTaskExecutor<>( 30, 0, 5, PARK, getClass().getSimpleName() ); ExpensiveTask[] tasks = new ExpensiveTask[1000]; @@ -140,7 +147,7 @@ public void shouldExecuteMultipleTasks() throws Exception { executor.submit( tasks[i] = new ExpensiveTask( 10 ) ); } - executor.shutdown( true ); + executor.shutdown( SF_AWAIT_ALL_COMPLETED ); // THEN for ( ExpensiveTask task : tasks ) @@ -153,7 +160,7 @@ public void shouldExecuteMultipleTasks() throws Exception public void shouldShutDownOnTaskFailure() throws Exception { // GIVEN - TaskExecutor executor = new DynamicTaskExecutor<>( 30, 0, 5, new ParkStrategy.Park( 1 ), + TaskExecutor executor = new DynamicTaskExecutor<>( 30, 0, 5, PARK, getClass().getSimpleName() ); // WHEN @@ -171,7 +178,7 @@ public void shouldShutDownOnTaskFailure() throws Exception public void shouldShutDownOnTaskFailureEvenIfOtherTasksArePending() throws Exception { // GIVEN - TaskExecutor executor = new DynamicTaskExecutor<>( 2, 0, 10, new ParkStrategy.Park( 1 ), + TaskExecutor executor = new DynamicTaskExecutor<>( 2, 0, 10, PARK, getClass().getSimpleName() ); IOException exception = new IOException( "Test message" ); ControlledTask firstBlockingTask = new ControlledTask(); @@ -194,14 +201,14 @@ public void shouldShutDownOnTaskFailureEvenIfOtherTasksArePending() throws Excep // THEN assertExceptionOnSubmit( executor, exception ); - executor.shutdown( false ); // call would block if the shutdown as part of failure doesn't complete properly + executor.shutdown( SF_ABORT_QUEUED ); // call would block if the shutdown as part of failure doesn't complete properly } @Test public void shouldSurfaceTaskErrorInAssertHealthy() throws Exception { // GIVEN - TaskExecutor executor = new DynamicTaskExecutor<>( 2, 0, 10, new ParkStrategy.Park( 1 ), + TaskExecutor executor = new DynamicTaskExecutor<>( 2, 0, 10, PARK, getClass().getSimpleName() ); IOException exception = new IOException( "Failure" ); @@ -233,7 +240,7 @@ public void shouldSurfaceTaskErrorInAssertHealthy() throws Exception public void shouldLetShutdownCompleteInEventOfPanic() throws Exception { // GIVEN - final TaskExecutor executor = new DynamicTaskExecutor<>( 2, 0, 10, new ParkStrategy.Park( 1 ), + final TaskExecutor executor = new DynamicTaskExecutor<>( 2, 0, 10, PARK, getClass().getSimpleName() ); IOException exception = new IOException( "Failure" ); @@ -250,7 +257,7 @@ public void shouldLetShutdownCompleteInEventOfPanic() throws Exception @Override public Void doWork( Void state ) throws Exception { - executor.shutdown( true ); + executor.shutdown( SF_AWAIT_ALL_COMPLETED ); return null; } } ); @@ -273,7 +280,7 @@ public Void doWork( Void state ) throws Exception public void shouldRespectMaxProcessors() throws Exception { // GIVEN - final TaskExecutor executor = new DynamicTaskExecutor<>( 1, 4, 10, new ParkStrategy.Park( 1 ), + final TaskExecutor executor = new DynamicTaskExecutor<>( 1, 4, 10, PARK, getClass().getSimpleName() ); // WHEN/THEN @@ -283,7 +290,7 @@ public void shouldRespectMaxProcessors() throws Exception assertEquals( 2, executor.numberOfProcessors() ); executor.setNumberOfProcessors( 10 ); assertEquals( 4, executor.numberOfProcessors() ); - executor.shutdown( true ); + executor.shutdown( SF_AWAIT_ALL_COMPLETED ); } private void assertExceptionOnSubmit( TaskExecutor executor, IOException exception ) diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStepTest.java index bf7d8241c073a..0e68378927e0f 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStepTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStepTest.java @@ -47,10 +47,7 @@ public void shouldUpholdProcessOrderingGuarantee() throws Exception StageControl control = mock( StageControl.class ); MyProcessorStep step = new MyProcessorStep( control, 0 ); step.start( ORDER_PROCESS ); - while ( step.numberOfProcessors() < 5 ) - { - step.incrementNumberOfProcessors(); - } + step.setNumberOfProcessors( 5 ); // WHEN int batches = 10; @@ -78,7 +75,7 @@ public void shouldHaveTaskQueueSizeEqualToNumberOfProcessorsIfSpecificallySet() final int processors = 2; final ProcessorStep step = new BlockingProcessorStep( control, processors, latch ); step.start( ORDER_PROCESS ); - step.incrementNumberOfProcessors(); // now at 2 + step.setNumberOfProcessors( 2 ); // adding two should be fine for ( int i = 0; i < processors+1 /* +1 since we allow queueing one more*/; i++ ) { @@ -102,8 +99,7 @@ public void shouldHaveTaskQueueSizeEqualToCurrentNumberOfProcessorsIfNotSpecific final CountDownLatch latch = new CountDownLatch( 1 ); final ProcessorStep step = new BlockingProcessorStep( control, 0, latch ); step.start( ORDER_PROCESS ); - step.incrementNumberOfProcessors(); // now at 2 - step.incrementNumberOfProcessors(); // now at 3 + step.setNumberOfProcessors( 3 ); // adding two should be fine for ( int i = 0; i < step.numberOfProcessors()+1 /* +1 since we allow queueing one more*/; i++ ) { diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java new file mode 100644 index 0000000000000..9aed766eb5c64 --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport.staging; + +import org.junit.Rule; +import org.junit.Test; + +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.BiFunction; +import org.neo4j.test.OtherThreadExecutor.WorkerCommand; +import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy; +import org.neo4j.test.OtherThreadRule; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +public class TicketedProcessingTest +{ + @Rule + public final OtherThreadRule asserter = new OtherThreadRule<>(); + + @Test + public void shouldReturnTicketsInOrder() throws Exception + { + // GIVEN + int items = 1_000; + ParkStrategy park = new ParkStrategy.Park( 2, MILLISECONDS ); + BiFunction processor = (from,ignore) -> + { + if ( ThreadLocalRandom.current().nextFloat() < 0.01f ) + { + park.park( Thread.currentThread() ); + } + return from*2; + }; + int processorCount = Runtime.getRuntime().availableProcessors(); + TicketedProcessing processing = new TicketedProcessing<>( + "Doubler", processorCount, processor, () -> null ); + processing.setNumberOfProcessors( processorCount ); + + // WHEN + Future assertions = asserter.execute( new WorkerCommand() + { + @Override + public Void doWork( Void state ) throws Exception + { + for ( int i = 0; i < items; i++ ) + { + Integer next = processing.next(); + assertNotNull( next ); + assertEquals( i*2, next.intValue() ); + } + assertNull( processing.next() ); + return null; + } + } ); + for ( int i = 0; i < items; i++ ) + { + processing.submit( i, i ); + } + processing.shutdown( true ); + + // THEN + assertions.get(); + } +}