Handle extra columns in CSV files using bad file
Now either fails with an exception or logs to the bad file.
spacecowboy committed Oct 29, 2015
1 parent b73f7f9 commit d820493
Showing 18 changed files with 197 additions and 96 deletions.
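The new behavior is easiest to see from the command line. A hedged sketch of an invocation that logs extra columns to the bad file instead of failing — the flag names mirror the test added in this commit, the paths are hypothetical, and the two-argument main overload is the one visible in the ImportTool hunks below:

// Hedged sketch; paths are made up, flags mirror the new test below.
String[] args = {
        "--into", "/tmp/graph.db",           // target store directory
        "--nodes", "/tmp/nodes.csv",         // node input file
        "--relationships", "/tmp/rels.csv",  // relationship input file
        "--bad-tolerance", "1000",           // give up once 1000 entries have gone bad
        "--ignore-extra-columns"             // log extra columns instead of throwing
};
ImportTool.main( args, false /*defaultSettingsSuitableForTests*/ );

Without --ignore-extra-columns, a data row that is wider than its header now aborts the import with an InputException.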
@@ -93,6 +93,7 @@
import static org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerators.startingFromTheBeginning;
import static org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMappers.longs;
import static org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMappers.strings;
import static org.neo4j.unsafe.impl.batchimport.input.Collectors.silentBadCollector;
import static org.neo4j.unsafe.impl.batchimport.staging.ProcessorAssignmentStrategies.eagerRandomSaturation;
import static org.neo4j.unsafe.impl.batchimport.store.BatchingPageCache.SYNCHRONOUS;

@@ -178,7 +179,8 @@ public void shouldImportCsvData() throws Exception
nodes( nodeRandomSeed, NODE_COUNT, inputIdGenerator, groups ),
relationships( relationshipRandomSeed, RELATIONSHIP_COUNT, inputIdGenerator, groups ),
idMapper, idGenerator, false,
RELATIONSHIP_COUNT/*insanely high bad tolerance, but it will actually never be that many*/ ) );
/*insanely high bad tolerance, but it will actually never be that many*/
silentBadCollector( RELATIONSHIP_COUNT ) ) );

// THEN
GraphDatabaseService db = new TestGraphDatabaseFactory().newEmbeddedDatabase( directory.absolutePath() );
@@ -19,8 +19,10 @@
*/
package org.neo4j.tooling;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.lang.Thread.UncaughtExceptionHandler;
import java.nio.charset.Charset;
@@ -53,6 +55,7 @@
import org.neo4j.unsafe.impl.batchimport.BatchImporter;
import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.string.DuplicateInputIdException;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.Input;
import org.neo4j.unsafe.impl.batchimport.input.InputNode;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
@@ -176,7 +179,12 @@ enum Options
+ "nodes within the same group having the same id, the first encountered will be imported "
+ "whereas consecutive such nodes will be skipped. "
+ "Skipped nodes will be logged"
+ ", containing at most number of entities specified by " + BAD_TOLERANCE.key() + "." );
+ ", containing at most number of entities specified by " + BAD_TOLERANCE.key() + "." ),
IGNORE_EXTRA_COLUMNS( "ignore-extra-columns", Boolean.FALSE,
"<true/false>",
"Whether or not to ignore extra columns in the data not specified by the header. "
+ "Skipped columns will be logged, containing at most number of entities specified by "
+ BAD_TOLERANCE.key() + "." );

private final String key;
private final Object defaultValue;
@@ -293,12 +301,15 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit
Input input = null;
int badTolerance;
Charset inputEncoding;
boolean skipBadRelationships, skipDuplicateNodes;
boolean skipBadRelationships, skipDuplicateNodes, ignoreExtraColumns;

try
{
storeDir = args.interpretOption( Options.STORE_DIR.key(), Converters.<File>mandatory(),
Converters.toFile(), Validators.DIRECTORY_IS_WRITABLE, Validators.CONTAINS_NO_EXISTING_DATABASE );

File badFile = new File( storeDir, BAD_FILE_NAME );
OutputStream badOutput = new BufferedOutputStream( fs.openAsOutputStream( badFile, false ) );
nodesFiles = INPUT_FILES_EXTRACTOR.apply( args, Options.NODE_DATA.key() );
relationshipsFiles = INPUT_FILES_EXTRACTOR.apply( args, Options.RELATIONSHIP_DATA.key() );
validateInputFiles( nodesFiles, relationshipsFiles );
@@ -313,16 +324,24 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit
(Boolean)Options.SKIP_BAD_RELATIONSHIPS.defaultValue(), true );
skipDuplicateNodes = args.getBoolean( Options.SKIP_DUPLICATE_NODES.key(),
(Boolean)Options.SKIP_DUPLICATE_NODES.defaultValue(), true );
input = new CsvInput(
nodeData( inputEncoding, nodesFiles ), defaultFormatNodeFileHeader(),
ignoreExtraColumns = args.getBoolean( Options.IGNORE_EXTRA_COLUMNS.key(),
(Boolean)Options.IGNORE_EXTRA_COLUMNS.defaultValue(), true );

Collector badCollector = badCollector( badOutput, badTolerance, collect( skipBadRelationships,
skipDuplicateNodes, ignoreExtraColumns ) );

input = new CsvInput( nodeData( inputEncoding, nodesFiles ), defaultFormatNodeFileHeader(),
relationshipData( inputEncoding, relationshipsFiles ), defaultFormatRelationshipFileHeader(),
idType, csvConfiguration( args, defaultSettingsSuitableForTests ),
badCollector( badTolerance, collect( skipBadRelationships, skipDuplicateNodes ) ) );
idType, csvConfiguration( args, defaultSettingsSuitableForTests ), badCollector );
}
catch ( IllegalArgumentException e )
{
throw andPrintError( "Input error", e, false );
}
catch ( IOException e )
{
throw andPrintError( "File error", e, false );
}

LifeSupport life = new LifeSupport();
Logging logging = life.add( new ClassicLoggingService(
@@ -347,11 +366,16 @@ idType, csvConfiguration( args, defaultSettingsSuitableForTests ),
}
finally
{
File badFile = new File( storeDir, BAD_FILE_NAME );
if ( badFile.exists() )
input.badCollector().close();

if ( input.badCollector().badEntries() > 0 )
{
out.println( "There were bad entries which were skipped and logged into " +
badFile.getAbsolutePath() );
File badFile = new File( storeDir, BAD_FILE_NAME );
if ( badFile.exists() )
{
out.println(
"There were bad entries which were skipped and logged into " + badFile.getAbsolutePath() );
}
}

life.shutdown();
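Taken together, the ImportTool hunks above amount to one collector lifecycle: open the bad file up front, fold the tolerance and the three skip flags into a single Collector, hand it to CsvInput, then close it and report in the finally block. A condensed sketch — a restatement of the hunks, not a verbatim excerpt; argument parsing is elided and fs, importer, nodeData, relationshipData and csvConfig are assumed in scope:

// Condensed from the hunks above; not verbatim.
OutputStream badOutput = new BufferedOutputStream( fs.openAsOutputStream( badFile, false ) );
Collector badCollector = badCollector( badOutput, badTolerance,
        collect( skipBadRelationships, skipDuplicateNodes, ignoreExtraColumns ) );
Input input = new CsvInput( nodeData, defaultFormatNodeFileHeader(),
        relationshipData, defaultFormatRelationshipFileHeader(),
        idType, csvConfig, badCollector );
try
{
    importer.doImport( input );
}
finally
{
    input.badCollector().close();                 // flush the bad file
    if ( input.badCollector().badEntries() > 0 )  // mention the file only if it was used
    {
        out.println( "There were bad entries which were skipped and logged into "
                + badFile.getAbsolutePath() );
    }
}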
@@ -47,10 +47,11 @@
public class CsvDataGeneratorInput extends CsvDataGenerator<InputNode,InputRelationship> implements Input
{
private final IdType idType;
private final Collector badCollector;

public CsvDataGeneratorInput( final Header nodeHeader, final Header relationshipHeader,
Configuration config, long nodes, long relationships, final Groups groups, final IdType idType,
int numberOfLabels, int numberOfRelationshipTypes )
int numberOfLabels, int numberOfRelationshipTypes, Collector badCollector )
{
super( nodeHeader, relationshipHeader, config, nodes, relationships,
new Function<SourceTraceability,Deserialization<InputNode>>()
@@ -71,6 +72,7 @@ public Deserialization<InputRelationship> apply( SourceTraceability from ) throw
},
numberOfLabels, numberOfRelationshipTypes );
this.idType = idType;
this.badCollector = badCollector;
}

@Override
@@ -130,8 +132,8 @@ public boolean specificRelationshipIds()
}

@Override
public Collector badCollector( OutputStream out )
public Collector badCollector()
{
return Collectors.badCollector( out, 0 );
return badCollector;
}
}
@@ -193,16 +193,50 @@ public void import4097Labels() throws Exception
}

@Test
public void shouldPrintWarningsIfHeaderHasLessColumnsThanData() throws Exception
public void shouldFailIfHeaderHasLessColumnsThanData() throws Exception
{
// GIVEN
List<String> nodeIds = nodeIds();
Configuration config = Configuration.TABS;

// WHEN data file contains more columns than header file
int extraColumns = 3;
String output = executeImportAndCatchOutput(
try
{
executeImportAndCatchOutput(
"--into", dbRule.getStoreDirAbsolutePath(),
"--delimiter", "TAB",
"--array-delimiter", String.valueOf( config.arrayDelimiter() ),
"--nodes", nodeHeader( config ).getAbsolutePath() + MULTI_FILE_DELIMITER +
nodeData( false, config, nodeIds, alwaysTrue(), Charset.defaultCharset(), extraColumns )
.getAbsolutePath(),
"--relationships", relationshipHeader( config ).getAbsolutePath() + MULTI_FILE_DELIMITER +
relationshipData( false, config, nodeIds, alwaysTrue(), true ).getAbsolutePath() );

fail( "Should have thrown exception" );
}
catch ( InputException e )
{
// THEN
assertTrue( e.getMessage().contains( "Extra column not present in header on line" ) );
}
}

@Test
public void shouldWarnIfHeaderHasLessColumnsThanDataWhenToldTo() throws Exception
{
// GIVEN
List<String> nodeIds = nodeIds();
Configuration config = Configuration.TABS;
File bad = file( "bad.log" );

// WHEN data file contains more columns than header file
int extraColumns = 3;
executeImportAndCatchOutput(
"--into", dbRule.getStoreDirAbsolutePath(),
"--bad", bad.getAbsolutePath(),
"--bad-tolerance", Integer.toString( nodeIds.size() * extraColumns ),
"--ignore-extra-columns",
"--delimiter", "TAB",
"--array-delimiter", String.valueOf( config.arrayDelimiter() ),
"--nodes", nodeHeader( config ).getAbsolutePath() + MULTI_FILE_DELIMITER +
@@ -212,7 +246,8 @@ public void shouldPrintWarningsIfHeaderHasLessColumnsThanData() throws Exception
relationshipData( false, config, nodeIds, alwaysTrue(), true ).getAbsolutePath() );

// THEN
assertTrue( "Should warn when columns are ignored:\n" + output, output.contains( "Warning: ignored columns" ) );
String badContents = FileUtils.readTextFile( bad, Charset.defaultCharset() );
assertTrue( badContents.contains( "Extra column not present in header on line" ) );
}

@Test
@@ -40,6 +40,7 @@
import static org.neo4j.tooling.CsvDataGenerator.bareboneNodeHeader;
import static org.neo4j.tooling.CsvDataGenerator.bareboneRelationshipHeader;
import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT;
import static org.neo4j.unsafe.impl.batchimport.input.Collectors.silentBadCollector;
import static org.neo4j.unsafe.impl.batchimport.input.csv.Configuration.COMMAS;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.defaultFormatNodeFileHeader;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.defaultFormatRelationshipFileHeader;
@@ -78,7 +79,8 @@ public static void main( String[] arguments ) throws IOException

Input input = new CsvDataGeneratorInput(
nodeHeader, relationshipHeader,
COMMAS, nodeCount, relationshipCount, new Groups(), idType, labelCount, relationshipTypeCount );
COMMAS, nodeCount, relationshipCount, new Groups(), idType, labelCount, relationshipTypeCount,
silentBadCollector( 0 ));
BatchImporter importer = new ParallelBatchImporter( dir, DEFAULT, new SystemOutLogging(), defaultVisible() );
importer.doImport( input );
}
@@ -19,9 +19,12 @@
*/
package org.neo4j.kernel.impl.storemigration;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import java.util.ArrayList;
@@ -85,6 +88,7 @@
import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerators;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMappers;
import org.neo4j.unsafe.impl.batchimport.input.Collectors;
import org.neo4j.unsafe.impl.batchimport.input.InputEntity;
import org.neo4j.unsafe.impl.batchimport.input.InputNode;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
@@ -339,13 +343,16 @@ private void migrateWithBatchImporter( File storeDir, File migrationDir, long la
}

Configuration importConfig = new Configuration.Overridden( config );
File badFile = new File( storeDir, Configuration.BAD_FILE_NAME );
OutputStream badOutput = new BufferedOutputStream( new FileOutputStream( badFile, false ) );
BatchImporter importer = new ParallelBatchImporter( migrationDir.getAbsolutePath(), fileSystem,
importConfig, logging, withDynamicProcessorAssignment( migrationBatchImporterMonitor(
legacyStore, progressMonitor ), importConfig ),
parallel(), readAdditionalIds( storeDir, lastTxId, lastTxChecksum ) );
InputIterable<InputNode> nodes = legacyNodesAsInput( legacyStore );
InputIterable<InputRelationship> relationships = legacyRelationshipsAsInput( legacyStore );
importer.doImport( Inputs.input( nodes, relationships, IdMappers.actual(), IdGenerators.fromInput(), true, 0 ) );
importer.doImport( Inputs.input( nodes, relationships, IdMappers.actual(), IdGenerators.fromInput(), true,
Collectors.badCollector( badOutput, 0 )) );

// During migration the batch importer only writes node, relationship, relationship group and counts stores.
// Delete the property store files from the batch import migration so that even if we won't
@@ -128,12 +128,11 @@ public void doImport( Input input ) throws IOException
CountingStoreUpdateMonitor storeUpdateMonitor = new CountingStoreUpdateMonitor();
try ( BatchingNeoStore neoStore = new BatchingNeoStore( fileSystem, storeDir, config,
writeMonitor, logging, monitors, writerFactory, additionalInitialIds );
OutputStream badOutput = new BufferedOutputStream( fileSystem.openAsOutputStream( badFile, false ) );
Collector badCollector = input.badCollector( badOutput );
CountsAccessor.Updater countsUpdater = neoStore.getCountsStore().reset(
neoStore.getLastCommittedTransactionId() );
InputCache inputCache = new InputCache( fileSystem, storeDir ) )
{
Collector badCollector = input.badCollector();
// Some temporary caches and indexes in the import
IdMapper idMapper = input.idMapper();
IdGenerator idGenerator = input.idGenerator();
@@ -48,7 +48,8 @@ private interface ProblemReporter

public static final int BAD_RELATIONSHIPS = 0x1;
public static final int DUPLICATE_NODES = 0x2;
public static final int COLLECT_ALL = BAD_RELATIONSHIPS | DUPLICATE_NODES;
public static final int EXTRA_COLUMNS = 0x4;
public static final int COLLECT_ALL = BAD_RELATIONSHIPS | DUPLICATE_NODES | EXTRA_COLUMNS;

private final PrintStream out;
private final int tolerance;
@@ -114,6 +115,28 @@ public InputException exception()
leftOverDuplicateNodeIds[leftOverDuplicateNodeIdsCursor++] = actualId;
}

@Override
public void collectExtraColumns( final String source, final long row, final String value )
{
checkTolerance( EXTRA_COLUMNS, new ProblemReporter()
{
private final String message = format( "Extra column not present in header on line %d in %s with value %s",
row, source, value );

@Override
public String message()
{
return message;
}

@Override
public InputException exception()
{
return new InputException( message );
}
} );
}

@Override
public PrimitiveLongIterator leftOverDuplicateNodesIds()
{
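For orientation, a hedged sketch of the producer side: a hypothetical deserializer helper that reports every value beyond the header width. Only the collectExtraColumns call reflects the interface added in this commit; the surrounding method is illustrative.

import org.neo4j.unsafe.impl.batchimport.input.Collector;

// Hypothetical helper, not part of the commit: any value without a matching
// header entry is handed to the collector, which either logs it to the bad
// file or throws an InputException, depending on mode and tolerance.
static void reportExtraColumns( String sourceFile, long lineNumber, String[] values,
        int headerWidth, Collector badCollector )
{
    for ( int i = headerWidth; i < values.length; i++ )
    {
        badCollector.collectExtraColumns( sourceFile, lineNumber, values[i] );
    }
}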
@@ -31,6 +31,8 @@ public interface Collector extends AutoCloseable

void collectDuplicateNode( Object id, long actualId, String group, String firstSource, String otherSource );

void collectExtraColumns( final String source, final long row, final String value );

int badEntries();

/**
@@ -19,6 +19,7 @@
*/
package org.neo4j.unsafe.impl.batchimport.input;

import java.io.IOException;
import java.io.OutputStream;

import org.neo4j.function.Function;
@@ -28,6 +29,21 @@
*/
public class Collectors
{
public static Collector silentBadCollector( int tolerance ) {
return silentBadCollector( tolerance, BadCollector.COLLECT_ALL );
}

public static Collector silentBadCollector( int tolerance, int collect ) {
return badCollector( new OutputStream()
{
@Override
public void write( int i ) throws IOException
{
// ignored
}
}, tolerance, collect );
}

public static Collector badCollector( OutputStream out, int tolerance )
{
return badCollector( out, tolerance, BadCollector.COLLECT_ALL );
@@ -55,9 +71,10 @@ public Collector apply( OutputStream out ) throws RuntimeException
};
}

public static int collect( boolean skipBadRelationships, boolean skipDuplicateNodes )
public static int collect( boolean skipBadRelationships, boolean skipDuplicateNodes, boolean ignoreExtraColumns )
{
return (skipBadRelationships ? BadCollector.BAD_RELATIONSHIPS : 0 ) |
(skipDuplicateNodes ? BadCollector.DUPLICATE_NODES : 0 );
(skipDuplicateNodes ? BadCollector.DUPLICATE_NODES : 0 ) |
(ignoreExtraColumns ? BadCollector.EXTRA_COLUMNS : 0 );
}
}
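The mask returned by collect(...) is a plain OR of the three mode bits, which BadCollector's checkTolerance consults per problem kind. For example — stream and tolerance values are illustrative — skipping duplicate nodes and extra columns while still failing fast on bad relationships:

// Illustrative: DUPLICATE_NODES (0x2) | EXTRA_COLUMNS (0x4) == 0x6, so bad
// relationships (0x1) fall outside the mask and presumably fail the import
// immediately instead of being written to the bad file.
int mask = Collectors.collect( false /*skipBadRelationships*/,
        true /*skipDuplicateNodes*/, true /*ignoreExtraColumns*/ );
Collector collector = Collectors.badCollector( System.out, 100 /*tolerance*/, mask );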
@@ -71,5 +71,5 @@ public interface Input
* @return a {@link Collector} capable of writing {@link InputRelationship bad relationships}
* and {@link InputNode duplicate nodes} to an output stream for later handling.
*/
Collector badCollector( OutputStream out );
Collector badCollector();
}
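Since an Input now owns its Collector instead of being handed an OutputStream by the importer, callers build the collector themselves and remain responsible for closing it (the ParallelBatchImporter hunk above drops it from the try-with-resources for exactly that reason). A minimal sketch following the StoreMigrator hunk, with nodes, relationships, badOutput and importer assumed in scope:

// Sketch under assumptions: the factory call matches the StoreMigrator hunk above.
Collector collector = Collectors.badCollector( badOutput, 0 /*fail on the first bad entry*/ );
Input input = Inputs.input( nodes, relationships,
        IdMappers.actual(), IdGenerators.fromInput(), true, collector );
importer.doImport( input );
input.badCollector().close(); // the caller, not the importer, closes the bad output now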
