Skip to content

Commit 760d98b

Browse files
committed
fix(filters): XmlToJson should support bytes input
1 parent 7003141 commit 760d98b

File tree

3 files changed

+73
-11
lines changed

3 files changed

+73
-11
lines changed

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/XmlToJsonFilterConfig.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020

2121
import org.apache.kafka.common.config.ConfigDef;
2222

23+
import java.nio.charset.Charset;
24+
import java.nio.charset.StandardCharsets;
2325
import java.util.Map;
26+
import java.util.Optional;
2427

2528
public class XmlToJsonFilterConfig extends CommonFilterConfig {
2629

@@ -36,6 +39,9 @@ public class XmlToJsonFilterConfig extends CommonFilterConfig {
3639
public static final String XML_PARSER_CDATA_TAG_NAME_DOC = "The name of the key in a JSON Object that indicates " +
3740
"a CDATA section (default: '" + XML_PARSER_CDATA_TAG_NAME_DEFAULT + "').";
3841

42+
public static final String XML_PARSER_SOURCE_CHARSET_CONFIG = "source.charset";
43+
public static final String XML_PARSER_SOURCE_CHARSET_DOC = "The charset to be used for reading the source " +
44+
" field (default: UTF-8)";
3945

4046
/**
4147
* Creates a new {@link XmlToJsonFilterConfig} instance.
@@ -72,6 +78,17 @@ public static ConfigDef configDef() {
7278
filterGroupCounter++,
7379
ConfigDef.Width.NONE,
7480
XML_PARSER_CDATA_TAG_NAME_CONFIG
81+
)
82+
.define(
83+
XML_PARSER_SOURCE_CHARSET_CONFIG,
84+
ConfigDef.Type.STRING,
85+
null,
86+
ConfigDef.Importance.MEDIUM,
87+
XML_PARSER_SOURCE_CHARSET_DOC,
88+
FILTER,
89+
filterGroupCounter++,
90+
ConfigDef.Width.NONE,
91+
XML_PARSER_SOURCE_CHARSET_CONFIG
7592
);
7693
}
7794

@@ -82,4 +99,10 @@ public boolean getXmlParserKeepStrings() {
8299
public String getCDataTagName() {
83100
return getString(XML_PARSER_CDATA_TAG_NAME_CONFIG);
84101
}
102+
103+
public Charset charset() {
104+
return Optional.ofNullable(getString(XML_PARSER_SOURCE_CHARSET_CONFIG))
105+
.map(Charset::forName)
106+
.orElse(StandardCharsets.UTF_8);
107+
}
85108
}

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/XmlToJsonFilter.java

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,20 @@
2929
import org.json.XML;
3030
import org.json.XMLParserConfiguration;
3131

32+
import java.io.ByteArrayInputStream;
33+
import java.io.InputStreamReader;
34+
import java.io.Reader;
35+
import java.io.StringReader;
3236
import java.util.Collections;
3337
import java.util.Map;
3438
import java.util.Set;
3539

36-
public class XmlToJsonFilter extends AbstractMergeRecordFilter<XmlToJsonFilter> {
40+
public class XmlToJsonFilter extends AbstractMergeRecordFilter<XmlToJsonFilter> {
3741

3842
private XmlToJsonFilterConfig config;
3943

4044
private XMLParserConfiguration xmlParserConfiguration;
45+
4146
/**
4247
* {@inheritDoc}
4348
*/
@@ -55,8 +60,8 @@ public void configure(final Map<String, ?> props) {
5560
config = new XmlToJsonFilterConfig(props);
5661

5762
xmlParserConfiguration = new XMLParserConfiguration()
58-
.withKeepStrings(config.getXmlParserKeepStrings())
59-
.withcDataTagName(config.getCDataTagName());
63+
.withKeepStrings(config.getXmlParserKeepStrings())
64+
.withcDataTagName(config.getCDataTagName());
6065
}
6166

6267
/**
@@ -65,14 +70,33 @@ public void configure(final Map<String, ?> props) {
6570
@Override
6671
protected RecordsIterable<TypedStruct> apply(final FilterContext context,
6772
final TypedStruct record) throws FilterException {
68-
try {
69-
final String payload = checkIsNotNull(record.get(config.source())).getString();
7073

71-
if (StringUtils.isBlank(payload)) {
72-
return RecordsIterable.empty();
73-
}
74+
final TypedValue value = checkIsNotNull(record.get(config.source()));
75+
switch (value.type()) {
76+
case STRING:
77+
final String xml = value.getString();
78+
if (StringUtils.isBlank(value.getString())) {
79+
return RecordsIterable.empty();
80+
}
81+
return xmlToJson(new StringReader(xml));
82+
case BYTES:
83+
byte[] bytes = value.getBytes();
84+
if (bytes.length == 0) {
85+
return RecordsIterable.empty();
86+
}
87+
return xmlToJson(new InputStreamReader(new ByteArrayInputStream(bytes), config.charset()));
88+
default:
89+
throw new FilterException(
90+
"Invalid field '" + config.source() + "' was passed through the " +
91+
"connector's configuration'. " +
92+
"Cannot parse field of type '" + value.type() + "' to XML."
93+
);
94+
}
95+
}
7496

75-
final JSONObject xmlJSONObj = XML.toJSONObject(payload, xmlParserConfiguration);
97+
private RecordsIterable<TypedStruct> xmlToJson(final Reader xml) {
98+
try {
99+
final JSONObject xmlJSONObj = XML.toJSONObject(xml, xmlParserConfiguration);
76100
final String jsonString = xmlJSONObj.toString(0);
77101
return RecordsIterable.of(TypedStruct.create().put(config.source(), jsonString));
78102
} catch (JSONException e) {
@@ -92,7 +116,7 @@ private TypedValue checkIsNotNull(final TypedValue value) {
92116
if (value.isNull()) {
93117
throw new FilterException(
94118
"Invalid field '" + config.source() + "' was passed through the connector's configuration'. " +
95-
"Cannot parse null or empty value to XML."
119+
"Cannot parse null or empty value to XML."
96120
);
97121
}
98122
return value;

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/XmlToJsonFilterTest.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.junit.Assert;
2323
import org.junit.Test;
2424

25+
import java.nio.charset.StandardCharsets;
2526
import java.util.Collections;
2627

2728
public class XmlToJsonFilterTest {
@@ -37,7 +38,7 @@ public class XmlToJsonFilterTest {
3738
"</root>";
3839

3940
@Test
40-
public void should_success_to_convert_xml_to_json() {
41+
public void should_success_to_convert_xml_to_json_given_input_string() {
4142
final XmlToJsonFilter filter = new XmlToJsonFilter();
4243
filter.configure(Collections.emptyMap());
4344
final TypedStruct input = TypedStruct.create().put("message", XML);
@@ -49,4 +50,18 @@ public void should_success_to_convert_xml_to_json() {
4950
output.getString("message")
5051
);
5152
}
53+
54+
@Test
55+
public void should_success_to_convert_xml_to_json_given_input_bytes() {
56+
final XmlToJsonFilter filter = new XmlToJsonFilter();
57+
filter.configure(Collections.emptyMap());
58+
final TypedStruct input = TypedStruct.create().put("message", XML.getBytes(StandardCharsets.UTF_8));
59+
final TypedStruct output = filter.apply(null, input).iterator().next();
60+
61+
Assert.assertNotNull(output);
62+
Assert.assertEquals(
63+
"{\"root\":{\"element1\":{\"attr\":\"foo\",\"value\":\"bar\"},\"element2\":\"value\",\"element3\":\"value\",\"element4\":[\"value1\",\"value2\"],\"element5\":\"\"}}",
64+
output.getString("message")
65+
);
66+
}
5267
}

0 commit comments

Comments
 (0)