Skip to content

Commit

Permalink
Simplified creation of DeserializerFactory
Browse files Browse the repository at this point in the history
into common implementations in DeserializerFactories, so as to deduplicate the
implementations found throughout main and test code and make that code more readable
  • Loading branch information
tinwelint committed Oct 15, 2016
1 parent df16fca commit 49f050d
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 27 deletions.
Expand Up @@ -34,6 +34,9 @@
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.input.csv.InputGroupsDeserializer.DeserializerFactory; import org.neo4j.unsafe.impl.batchimport.input.csv.InputGroupsDeserializer.DeserializerFactory;


import static org.neo4j.unsafe.impl.batchimport.input.csv.DeserializerFactories.defaultNodeDeserializer;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DeserializerFactories.defaultRelationshipDeserializer;

/** /**
* Provides {@link Input} from data contained in tabular/csv form. Expects factories for instantiating * Provides {@link Input} from data contained in tabular/csv form. Expects factories for instantiating
* the {@link CharSeeker} objects seeking values in the csv data and header factories for how to * the {@link CharSeeker} objects seeking values in the csv data and header factories for how to
Expand Down Expand Up @@ -108,13 +111,7 @@ public InputIterable<InputNode> nodes()
@Override @Override
public InputIterator<InputNode> iterator() public InputIterator<InputNode> iterator()
{ {
DeserializerFactory<InputNode> factory = (dataHeader, dataStream, decorator, validator) -> DeserializerFactory<InputNode> factory = defaultNodeDeserializer( groups, config, idType, badCollector );
{
InputNodeDeserialization deserialization =
new InputNodeDeserialization( dataHeader, dataStream, groups, idType.idsAreExternal() );
return new InputEntityDeserializer<>( dataHeader, dataStream, config.delimiter(),
deserialization, decorator, validator, badCollector );
};
return new InputGroupsDeserializer<>( nodeDataFactory.iterator(), nodeHeaderFactory, config, return new InputGroupsDeserializer<>( nodeDataFactory.iterator(), nodeHeaderFactory, config,
idType, maxProcessors, 1, factory, Validators.<InputNode>emptyValidator(), InputNode.class ); idType, maxProcessors, 1, factory, Validators.<InputNode>emptyValidator(), InputNode.class );
} }
Expand All @@ -135,13 +132,8 @@ public InputIterable<InputRelationship> relationships()
@Override @Override
public InputIterator<InputRelationship> iterator() public InputIterator<InputRelationship> iterator()
{ {
DeserializerFactory<InputRelationship> factory = (dataHeader, dataStream, decorator, validator) -> DeserializerFactory<InputRelationship> factory =
{ defaultRelationshipDeserializer( groups, config, idType, badCollector );
InputRelationshipDeserialization deserialization =
new InputRelationshipDeserialization( dataHeader, dataStream, groups );
return new InputEntityDeserializer<>( dataHeader, dataStream, config.delimiter(),
deserialization, decorator, validator, badCollector );
};
return new InputGroupsDeserializer<>( relationshipDataFactory.iterator(), relationshipHeaderFactory, return new InputGroupsDeserializer<>( relationshipDataFactory.iterator(), relationshipHeaderFactory,
config, idType, maxProcessors, 1, factory, new InputRelationshipValidator(), config, idType, maxProcessors, 1, factory, new InputRelationshipValidator(),
InputRelationship.class ); InputRelationship.class );
Expand Down
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport.input.csv;

import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.Groups;
import org.neo4j.unsafe.impl.batchimport.input.InputNode;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.input.csv.InputGroupsDeserializer.DeserializerFactory;

/**
 * Common {@link DeserializerFactory} implementations, gathered in one place so that main and test
 * code don't each have to spell out the same factory lambdas.
 *
 * This class only holds static factory methods and is not meant to be instantiated.
 */
public class DeserializerFactories
{
    private DeserializerFactories()
    {
        // Static utility holder: suppress the implicit public constructor
    }

    /**
     * Creates a {@link DeserializerFactory} producing {@link InputEntityDeserializer} instances for
     * {@link InputNode nodes}. Each created deserializer is backed by a new
     * {@link InputNodeDeserialization} for the header and data stream handed to the factory.
     *
     * @param groups {@link Groups} passed on to every created {@link InputNodeDeserialization}.
     * @param config {@link Configuration} whose {@link Configuration#delimiter() delimiter} is read
     * every time the returned factory creates a deserializer.
     * @param idType {@link IdType} whose {@link IdType#idsAreExternal()} is passed on to every created
     * {@link InputNodeDeserialization}.
     * @param badCollector {@link Collector} passed on to every created {@link InputEntityDeserializer}.
     * @return a factory creating node deserializers from a header, data stream, decorator and validator.
     */
    public static DeserializerFactory<InputNode> defaultNodeDeserializer(
            Groups groups, Configuration config, IdType idType, Collector badCollector )
    {
        return (header,stream,decorator,validator) ->
        {
            InputNodeDeserialization deserialization =
                    new InputNodeDeserialization( header, stream, groups, idType.idsAreExternal() );
            return new InputEntityDeserializer<>( header, stream, config.delimiter(),
                    deserialization, decorator, validator, badCollector );
        };
    }

    /**
     * Creates a {@link DeserializerFactory} producing {@link InputEntityDeserializer} instances for
     * {@link InputRelationship relationships}. Each created deserializer is backed by a new
     * {@link InputRelationshipDeserialization} for the header and data stream handed to the factory.
     *
     * @param groups {@link Groups} passed on to every created {@link InputRelationshipDeserialization}.
     * @param config {@link Configuration} whose {@link Configuration#delimiter() delimiter} is read
     * every time the returned factory creates a deserializer.
     * @param idType currently not used by this factory; the parameter mirrors the signature of
     * {@link #defaultNodeDeserializer(Groups, Configuration, IdType, Collector)}.
     * @param badCollector {@link Collector} passed on to every created {@link InputEntityDeserializer}.
     * @return a factory creating relationship deserializers from a header, data stream, decorator
     * and validator.
     */
    public static DeserializerFactory<InputRelationship> defaultRelationshipDeserializer(
            Groups groups, Configuration config, IdType idType, Collector badCollector )
    {
        return (header,stream,decorator,validator) ->
        {
            InputRelationshipDeserialization deserialization =
                    new InputRelationshipDeserialization( header, stream, groups );
            return new InputEntityDeserializer<>( header, stream, config.delimiter(),
                    deserialization, decorator, validator, badCollector );
        };
    }
}
Expand Up @@ -45,7 +45,7 @@ class InputGroupsDeserializer<ENTITY extends InputEntity>
private final IdType idType; private final IdType idType;
private InputIterator<ENTITY> currentInput = new InputIterator.Empty<>(); private InputIterator<ENTITY> currentInput = new InputIterator.Empty<>();
private long previousInputsCollectivePositions; private long previousInputsCollectivePositions;
private int previousInputProcessors = 1; private int previousInputProcessors;
private boolean currentInputOpen; private boolean currentInputOpen;
private final int maxProcessors; private final int maxProcessors;
private final DeserializerFactory<ENTITY> factory; private final DeserializerFactory<ENTITY> factory;
Expand All @@ -60,15 +60,15 @@ InputEntityDeserializer<ENTITY> create( Header dataHeader, CharSeeker dataStream
} }


InputGroupsDeserializer( Iterator<DataFactory<ENTITY>> dataFactory, Header.Factory headerFactory, InputGroupsDeserializer( Iterator<DataFactory<ENTITY>> dataFactory, Header.Factory headerFactory,
Configuration config, IdType idType, int maxProcessors, int processors, DeserializerFactory<ENTITY> factory, Configuration config, IdType idType, int maxProcessors, int initialProcessors,
Validator<ENTITY> validator, Class<ENTITY> entityClass ) DeserializerFactory<ENTITY> factory, Validator<ENTITY> validator, Class<ENTITY> entityClass )
{ {
super( dataFactory ); super( dataFactory );
this.headerFactory = headerFactory; this.headerFactory = headerFactory;
this.config = config; this.config = config;
this.idType = idType; this.idType = idType;
this.maxProcessors = maxProcessors; this.maxProcessors = maxProcessors;
this.previousInputProcessors = processors; this.previousInputProcessors = initialProcessors;
this.factory = factory; this.factory = factory;
this.validator = validator; this.validator = validator;
this.entityClass = entityClass; this.entityClass = entityClass;
Expand Down
Expand Up @@ -68,7 +68,7 @@ public class ParallelInputEntityDeserializer<ENTITY extends InputEntity> extends


@SuppressWarnings( "unchecked" ) @SuppressWarnings( "unchecked" )
public ParallelInputEntityDeserializer( Data<ENTITY> data, Header.Factory headerFactory, Configuration config, public ParallelInputEntityDeserializer( Data<ENTITY> data, Header.Factory headerFactory, Configuration config,
IdType idType, int maxProcessors, int processors, DeserializerFactory<ENTITY> factory, IdType idType, int maxProcessors, int initialProcessors, DeserializerFactory<ENTITY> factory,
Validator<ENTITY> validator, Class<ENTITY> entityClass ) Validator<ENTITY> validator, Class<ENTITY> entityClass )
{ {
// Reader of chunks, characters aligning to nearest newline // Reader of chunks, characters aligning to nearest newline
Expand Down Expand Up @@ -110,7 +110,7 @@ public ParallelInputEntityDeserializer( Data<ENTITY> data, Header.Factory header
return entities.toArray( (ENTITY[]) Array.newInstance( entityClass, entities.size() ) ); return entities.toArray( (ENTITY[]) Array.newInstance( entityClass, entities.size() ) );
}, },
() -> dataHeader.clone() /*We need to clone the stateful header to each processing thread*/ ); () -> dataHeader.clone() /*We need to clone the stateful header to each processing thread*/ );
processing.processors( processors - processing.processors( 0 ) ); processing.processors( initialProcessors - processing.processors( 0 ) );


// Utility cursor which takes care of moving over processed results from chunk to chunk // Utility cursor which takes care of moving over processed results from chunk to chunk
Supplier<ENTITY[]> batchSupplier = rebaseBatches( processing ); Supplier<ENTITY[]> batchSupplier = rebaseBatches( processing );
Expand Down
Expand Up @@ -32,6 +32,8 @@
import org.neo4j.unsafe.impl.batchimport.input.Collector; import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.Groups; import org.neo4j.unsafe.impl.batchimport.input.Groups;
import org.neo4j.unsafe.impl.batchimport.input.InputNode; import org.neo4j.unsafe.impl.batchimport.input.InputNode;
import org.neo4j.unsafe.impl.batchimport.input.csv.InputGroupsDeserializer.DeserializerFactory;

import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
Expand All @@ -43,6 +45,7 @@
import static org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators.NO_NODE_DECORATOR; import static org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators.NO_NODE_DECORATOR;
import static org.neo4j.unsafe.impl.batchimport.input.csv.Configuration.COMMAS; import static org.neo4j.unsafe.impl.batchimport.input.csv.Configuration.COMMAS;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.defaultFormatNodeFileHeader; import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.defaultFormatNodeFileHeader;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DeserializerFactories.defaultNodeDeserializer;
import static org.neo4j.unsafe.impl.batchimport.input.csv.IdType.INTEGER; import static org.neo4j.unsafe.impl.batchimport.input.csv.IdType.INTEGER;


public class InputGroupsDeserializerTest public class InputGroupsDeserializerTest
Expand Down Expand Up @@ -103,15 +106,10 @@ public void shouldCoordinateGroupCreationForParallelProcessing() throws Exceptio
IdType idType = IdType.INTEGER; IdType idType = IdType.INTEGER;
Collector badCollector = mock( Collector.class ); Collector badCollector = mock( Collector.class );
Configuration config = lowBufferSize( COMMAS, false ); Configuration config = lowBufferSize( COMMAS, false );
DeserializerFactory<InputNode> factory = defaultNodeDeserializer( groups, config, idType, badCollector );
try ( InputGroupsDeserializer<InputNode> deserializer = new InputGroupsDeserializer<>( try ( InputGroupsDeserializer<InputNode> deserializer = new InputGroupsDeserializer<>(
data.iterator(), defaultFormatNodeFileHeader(), config, idType, data.iterator(), defaultFormatNodeFileHeader(), config, idType,
processors, processors, (header,stream,decorator,validator) -> processors, processors, factory, Validators.<InputNode>emptyValidator(), InputNode.class ) )
{
InputNodeDeserialization deserialization =
new InputNodeDeserialization( header, stream, groups, idType.idsAreExternal() );
return new InputEntityDeserializer<>( header, stream, config.delimiter(),
deserialization, decorator, validator, badCollector );
}, Validators.<InputNode>emptyValidator(), InputNode.class ) )
{ {
// WHEN // WHEN
count( deserializer ); count( deserializer );
Expand Down

0 comments on commit 49f050d

Please sign in to comment.