Use header_names instead of column_N if available #72
It looks like Travis had a transient failure. The only tracebacks in the output are:
Exception in thread "unVocity-parsers input reading thread" java.lang.IllegalStateException: Error closing input
    at com.univocity.parsers.common.input.concurrent.ConcurrentCharLoader.stopReading(ConcurrentCharLoader.java:181)
    at com.univocity.parsers.common.input.concurrent.ConcurrentCharLoader.run(ConcurrentCharLoader.java:101)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:475)
    at org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:656)
    at java.io.FilterInputStream.close(FilterInputStream.java:181)
    at sun.nio.cs.StreamDecoder.implClose(StreamDecoder.java:378)
    at sun.nio.cs.StreamDecoder.close(StreamDecoder.java:193)
    at java.io.InputStreamReader.close(InputStreamReader.java:199)
    at com.univocity.parsers.common.input.concurrent.ConcurrentCharLoader.stopReading(ConcurrentCharLoader.java:178)
    ... 2 more
I don't think my code change would cause this. If you know why it failed, that would be very helpful! Maybe re-running Travis will succeed.
Thanks for the interest @kylecbrodie!
Couldn't this be done using the file_reader.delimited.settings.header and file_reader.delimited.settings.header_names properties?
@mmolimar Setting This PR changes the value of the |
Ok, thanks.
...
} else if (settings.getHeaders() != null && settings.getHeaders().length > 0) {
    List<Schema> dataTypes = getDataTypes(config, settings.getHeaders());
    IntStream.range(0, settings.getHeaders().length)
            .forEach(index -> builder.field(settings.getHeaders()[index], dataTypes.get(index)));
}
...
Could you also add a test to validate this change? On the other hand, this change would be included in the next release and the new features/fixes will be in the |
and remove hasHeader parameter
to match the formatting of the previous version of this method
The PR does not compile. Method |
Definitely! I'll change it tomorrow (July 30th).
since ifPresentOrElse is not available in Java 8
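For context on the commit note above: Optional.ifPresentOrElse was added in Java 9, so code that must build on Java 8 needs an explicit isPresent() check instead. The following is a minimal sketch of such a stand-in; the class name OptionalCompat and its helper are hypothetical and are not taken from this PR.

```java
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical helper class, not part of the PR.
public class OptionalCompat {

    // Java 8 stand-in for Optional.ifPresentOrElse (available from Java 9).
    public static <T> void ifPresentOrElse(Optional<T> value,
                                           Consumer<? super T> action,
                                           Runnable emptyAction) {
        if (value.isPresent()) {
            action.accept(value.get());
        } else {
            emptyAction.run();
        }
    }

    public static void main(String[] args) {
        String[] extracted = null; // pretend no header row was extracted
        StringBuilder out = new StringBuilder();
        ifPresentOrElse(Optional.ofNullable(extracted),
                headers -> out.append("headers: ").append(String.join(",", headers)),
                () -> out.append("no headers"));
        System.out.println(out); // prints "no headers"
    }
}
```

A plain if/else on isPresent() at the call site works just as well; the helper only keeps the two-branch shape of the Java 9 call.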
and header extraction off
@mmolimar I added a test case and formatted my change so it is easier to see what has changed. I wasn't able to get JUnit working in VS Code, so I'm hoping Travis can run the test case to see whether it passes.
It looks like the tests don't pass.
Something like this would be simpler:
private Schema buildSchema(Map<String, Object> config) {
    SchemaBuilder builder = SchemaBuilder.struct();
    if (iterator.hasNext() && !settings.isHeaderExtractionEnabled()) {
        String[] headers;
        if (settings.getHeaders() == null || settings.getHeaders().length == 0) {
            Record first = iterator.next();
            headers = new String[first.getValues().length];
            IntStream.range(0, headers.length)
                    .forEach(index -> headers[index] = DEFAULT_COLUMN_NAME + (index + 1));
            seek(0);
        } else {
            headers = settings.getHeaders();
        }
        List<Schema> dataTypes = getDataTypes(config, headers);
        IntStream.range(0, headers.length)
                .forEach(index -> builder.field(headers[index], dataTypes.get(index)));
    } else if (settings.isHeaderExtractionEnabled()) {
        Optional.ofNullable(iterator.getContext().headers()).ifPresent(headers -> {
            List<Schema> dataTypes = getDataTypes(config, headers);
            IntStream.range(0, headers.length)
                    .forEach(index -> builder.field(headers[index], dataTypes.get(index)));
        });
    }
    return builder.build();
}
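The header-selection order in the snippet above can be tried out on its own, without the parser or Kafka Connect classes. The sketch below is only an illustration under stated assumptions: the class HeaderResolution and the method resolveHeaders are hypothetical, and the value "column_" for DEFAULT_COLUMN_NAME is inferred from the PR title rather than read from the source.

```java
import java.util.Arrays;

// Hypothetical, dependency-free illustration of the header-selection order.
public class HeaderResolution {

    // Assumed value, inferred from the "column_N" naming in the PR title.
    static final String DEFAULT_COLUMN_NAME = "column_";

    public static String[] resolveHeaders(boolean headerExtractionEnabled,
                                          String[] extractedHeaders,
                                          String[] configuredHeaders,
                                          int columnCount) {
        if (headerExtractionEnabled) {
            // Headers come from the file's first row; none found means no fields.
            return extractedHeaders == null ? new String[0] : extractedHeaders;
        }
        if (configuredHeaders != null && configuredHeaders.length > 0) {
            // User-provided header_names take precedence over defaults.
            return configuredHeaders;
        }
        // Fall back to column_1 .. column_N based on the first record's width.
        String[] defaults = new String[columnCount];
        for (int i = 0; i < columnCount; i++) {
            defaults[i] = DEFAULT_COLUMN_NAME + (i + 1);
        }
        return defaults;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(
                resolveHeaders(false, null, new String[] {"id", "name"}, 2))); // [id, name]
        System.out.println(Arrays.toString(
                resolveHeaders(false, null, null, 3))); // [column_1, column_2, column_3]
    }
}
```

Unlike the real method, this sketch takes the column count as a parameter instead of reading the first record and seeking back to the start of the file.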
And rearrange so the case of extracted or user-provided headers is handled first and the default headers are handled second. This stems from the potentially incorrect assumption that wanting to use provided or extracted headers is more common than wanting to use the default headers.
The tests don't pass.
You can use the snippet I shared with you ;-)
@mmolimar I applied your suggestion and it is passing tests now!
Thanks @kylecbrodie!
Closes #53
This allows header_names to be specified and used as the field names in the Avro schema for files that lack a header row.