
Commit 5314e20

fix(filters): DelimitedRowFileInputFilter should compute schema for each record (#171)

1 parent: 608d1c2

2 files changed: +43 -7

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DelimitedRowFilter.java

Lines changed: 16 additions & 5 deletions

@@ -88,8 +88,8 @@ public void configure(final Map<String, ?> configs) {

     private boolean isMandatoryConfigsMissing() {
         return configs.schema() == null &&
-            configs.extractColumnName() == null &&
-            !configs.isAutoGenerateColumnNames();
+               configs.extractColumnName() == null &&
+               !configs.isAutoGenerateColumnNames();
     }

     /**

@@ -111,13 +111,22 @@ public RecordsIterable<TypedStruct> apply(final FilterContext context,
         final String source = record.first(DEFAULT_SOURCE_FIELD).getString();

         String[] columnValues = splitColumnValues(source);
-        if (schema == null) {
+
+        if (schema == null || isSchemaDynamic()) {
             inferSchemaFromRecord(record, columnValues.length);
         }
         final TypedStruct struct = buildStructForFields(columnValues);
         return RecordsIterable.of(struct);
     }

+    public boolean isSchemaDynamic() {
+        // The schema SHOULD be inferred for each record when column names are auto-generated.
+        // This rule handles cases where records may have a different number of columns.
+        return configs.extractColumnName() == null &&
+               configs.schema() == null &&
+               configs.isAutoGenerateColumnNames();
+    }
+
     private void inferSchemaFromRecord(final TypedStruct record, int numColumns) {
         schema = Schema.struct();

@@ -126,7 +135,8 @@ private void inferSchemaFromRecord(final TypedStruct record, int numColumns) {
             String field = record.first(fieldName).getString();
             if (field == null) {
                 throw new FilterException(
-                    "Can't found field for name '" + fieldName + "' to determine columns names");
+                    "Cannot find field for name '" + fieldName + "' to determine columns names"
+                );
             }
             final List<String> columns = Arrays
                 .stream(splitColumnValues(field))

@@ -168,7 +178,8 @@ private String[] splitColumnValues(final String value) {
     private TypedStruct buildStructForFields(final String[] fieldValues) {
         if (fieldValues.length > columnsTypesByIndex.size()) {
             throw new FilterException(
-                "Error while reading delimited input row. Too large number of fields (" + fieldValues.length + ")");
+                "Error while reading delimited input row. Too large number of fields (" + fieldValues.length + ")"
+            );
         }

         TypedStruct struct = TypedStruct.create();
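For context, a minimal sketch of the behavior this change enables. It mirrors the new test below, including its test-style wiring via configure(configs, alias -> null); the variable names and two-row scenario are illustrative only, not part of the commit:

    // Sketch of dynamic schema inference, assuming the test-style setup shown below.
    Map<String, String> configs = new HashMap<>();
    configs.put(READER_AUTO_GENERATE_COLUMN_NAME_CONFIG, "true");

    DelimitedRowFilter filter = new DelimitedRowFilter();
    filter.configure(configs, alias -> null);

    // Two rows with different column counts. Before this fix, the schema was
    // inferred once from the first row and then reused, so the third column
    // of a wider row was never materialized.
    filter.apply(null, TypedStruct.create().put("message", "a;b"), false);   // infers column1, column2
    filter.apply(null, TypedStruct.create().put("message", "a;b;c"), false); // re-infers column1..column3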

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/DelimitedRowFileInputFilterTest.java

Lines changed: 27 additions & 2 deletions

@@ -31,6 +31,7 @@
 import java.util.HashMap;
 import java.util.Map;

+import static io.streamthoughts.kafka.connect.filepulse.config.DelimitedRowFilterConfig.READER_AUTO_GENERATE_COLUMN_NAME_CONFIG;
 import static io.streamthoughts.kafka.connect.filepulse.config.DelimitedRowFilterConfig.READER_EXTRACT_COLUMN_NAME_CONFIG;
 import static io.streamthoughts.kafka.connect.filepulse.config.DelimitedRowFilterConfig.READER_FIELD_COLUMNS_CONFIG;
 import static io.streamthoughts.kafka.connect.filepulse.config.DelimitedRowFilterConfig.READER_FIELD_DUPLICATE_COLUMNS_AS_ARRAY_CONFIG;

@@ -42,12 +43,10 @@ public class DelimitedRowFileInputFilterTest {

     private DelimitedRowFilter filter;

-
     private static final TypedStruct DEFAULT_STRUCT = TypedStruct.create()
         .put("message", "value1;2;true")
         .put("headers", Collections.singletonList("col1;col2;col3"));

-
     @Before
     public void setUp() {
         filter = new DelimitedRowFilter();

@@ -101,6 +100,32 @@ public void should_extract_repeated_columns_names_from_given_field() {
         Assert.assertEquals("value3", output.getString("col3"));
     }

+    @Test
+    public void should_generate_column_names_given_records_with_different_size() {
+        configs.put(READER_AUTO_GENERATE_COLUMN_NAME_CONFIG, "true");
+        filter.configure(configs, alias -> null);
+
+        TypedStruct input, output;
+
+        input = TypedStruct.create().put("message", "value1;value2;");
+        RecordsIterable<TypedStruct> iterable1 = filter.apply(null, input, false);
+        Assert.assertNotNull(iterable1);
+        Assert.assertEquals(1, iterable1.size());
+
+        output = iterable1.iterator().next();
+        Assert.assertNotNull(output.schema().field("column1"));
+        Assert.assertNotNull(output.schema().field("column2"));
+
+        input = TypedStruct.create().put("message", "value1;value2;value3");
+        RecordsIterable<TypedStruct> iterable2 = filter.apply(null, input, false);
+        Assert.assertNotNull(iterable2);
+        Assert.assertEquals(1, iterable2.size());
+
+        output = iterable2.iterator().next();
+        Assert.assertNotNull(output.schema().field("column1"));
+        Assert.assertNotNull(output.schema().field("column2"));
+        Assert.assertNotNull(output.schema().field("column3"));
+    }

     @Test(expected = DataException.class)
     public void should_fail_given_repeated_columns_names_and_duplicate_not_allowed() {
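One design note: the schema is only re-inferred when isSchemaDynamic() holds, i.e. when column names are auto-generated and neither an explicit schema nor extractColumnName is configured. With a static schema, a row carrying more fields than the schema defines still fails fast in buildStructForFields. A hedged sketch of that path (the two-column filter setup is assumed, not shown in this diff):

    // With a static two-column schema, a wider row is rejected, not re-inferred.
    try {
        filter.apply(null, TypedStruct.create().put("message", "v1;v2;v3"), false);
    } catch (FilterException e) {
        // "Error while reading delimited input row. Too large number of fields (3)"
    }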
