Skip to content

Commit 7fea775

Browse files
committed
feat(dataformat): add config to specify a prefix used to prepend XML attributes (#176)
Resolves: #176
1 parent 4fc2cb9 commit 7fea775

File tree

13 files changed

+121
-22
lines changed

13 files changed

+121
-22
lines changed

connect-file-pulse-dataformat/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,11 @@
3030
<groupId>org.apache.kafka</groupId>
3131
<artifactId>connect-api</artifactId>
3232
</dependency>
33+
<dependency>
34+
<groupId>junit</groupId>
35+
<artifactId>junit</artifactId>
36+
<version>${junit.version}</version>
37+
<scope>test</scope>
38+
</dependency>
3339
</dependencies>
3440
</project>

connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/xml/XMLCommonConfig.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,10 @@ public class XMLCommonConfig extends AbstractConfig {
5252
private static final String XML_EXCLUDE_NODE_ATTRIBUTES_IN_NAMESPACES_DOC = "Specifies that the reader should only exclude node attributes in the defined list of namespaces.";
5353

5454
public static final String XML_DATA_TYPE_INFERENCE_ENABLED_CONFIG = "xml.data.type.inference.enabled";
55-
private static final String XML_DATA_TYPE_INFERENCE_ENABLED_DOC = "Specifies that the reader should try to infer the type of data nodes. (default: false).";
55+
private static final String XML_DATA_TYPE_INFERENCE_ENABLED_DOC = "Specifies that the reader should try to infer the type of data nodes (default: false).";
56+
57+
public static final String XML_ATTRIBUTE_PREFIX_CONFIG = "xml.attribute.prefix";
58+
public static final String XML_ATTRIBUTE_PREFIX_DOC = "If set, the name of attributes will be prepended with the specified prefix when they are added to a record (default: '').";
5659

5760
private final String keyPrefix;
5861

@@ -92,6 +95,10 @@ public Set<String> getExcludeNodeAttributesInNamespaces() {
9295
return new HashSet<>(getList(withKeyPrefix(XML_EXCLUDE_NODE_ATTRIBUTES_IN_NAMESPACES_CONFIG)));
9396
}
9497

98+
/**
 * Returns the configured value of the {@code xml.attribute.prefix} property,
 * looked up through {@code withKeyPrefix(...)}.
 *
 * @return the prefix to prepend to XML attribute names; the config definition
 *         declares an empty string as the default value.
 */
public String getAttributePrefix() {
99+
return getString(withKeyPrefix(XML_ATTRIBUTE_PREFIX_CONFIG));
100+
}
101+
95102
/**
 * Returns the configured value of the {@code xml.data.type.inference.enabled}
 * property, looked up through {@code withKeyPrefix(...)}.
 *
 * @return {@code true} if the reader should try to infer the type of data nodes
 *         (the property's documentation states the default is {@code false}).
 */
public boolean isDataTypeInferenceEnabled() {
96103
return getBoolean(withKeyPrefix(XML_DATA_TYPE_INFERENCE_ENABLED_CONFIG));
97104
}
@@ -186,6 +193,17 @@ public static ConfigDef buildConfigDefWith(final String group,
186193
filterGroupCounter++,
187194
ConfigDef.Width.NONE,
188195
keyPrefix + XML_EXCLUDE_EMPTY_ELEMENTS_CONFIG
196+
)
197+
.define(
198+
keyPrefix + XML_ATTRIBUTE_PREFIX_CONFIG,
199+
ConfigDef.Type.STRING,
200+
"",
201+
ConfigDef.Importance.LOW,
202+
XML_ATTRIBUTE_PREFIX_DOC,
203+
group,
204+
filterGroupCounter++,
205+
ConfigDef.Width.NONE,
206+
keyPrefix + XML_ATTRIBUTE_PREFIX_CONFIG
189207
);
190208

191209
for (ConfigDef.ConfigKey configKey : additional) {

connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/xml/XMLNodeToStructConverter.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ public final class XMLNodeToStructConverter implements Function<Node, TypedStruc
6868

6969
private Set<String> excludeAttributesInNamespaces = Collections.emptySet();
7070

71+
private String attributePrefix = "";
72+
7173
private FieldPaths forceArrayFields = FieldPaths.empty();
7274

7375
public XMLNodeToStructConverter setExcludeEmptyElement(boolean excludeEmptyElement) {
@@ -85,6 +87,11 @@ public XMLNodeToStructConverter setExcludeAttributesInNamespaces(final Set<Strin
8587
return this;
8688
}
8789

90+
/**
 * Sets the prefix that is concatenated in front of each attribute name when
 * node attributes are collected.
 *
 * NOTE(review): the argument is stored as-is; a {@code null} value is not guarded
 * here and would be concatenated as the literal text "null" - confirm callers
 * never pass null.
 *
 * @param attributePrefix the prefix to prepend to attribute names.
 * @return this converter, so that setter calls can be chained.
 */
public XMLNodeToStructConverter setAttributePrefix(final String attributePrefix) {
91+
this.attributePrefix = attributePrefix;
92+
return this;
93+
}
94+
8895
public XMLNodeToStructConverter setForceArrayFields(final FieldPaths forceArrayFields) {
8996
this.forceArrayFields = forceArrayFields;
9097
return this;
@@ -193,7 +200,7 @@ private Map<String, String> getNotExcludedNodeAttributes(final Node node) {
193200
if (!excludeAttributesInNamespaces.contains(attr.getNamespaceURI())) {
194201
String attrName = determineNodeName(attr);
195202
if (isNotXmlNamespace(attr)) {
196-
attributes.put(attrName, attr.getNodeValue());
203+
attributes.put(attributePrefix + attrName, attr.getNodeValue());
197204
}
198205
}
199206
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.xml;
20+
21+
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
22+
import org.junit.Assert;
23+
import org.junit.Test;
24+
import org.w3c.dom.Document;
25+
import org.xml.sax.SAXException;
26+
27+
import java.io.ByteArrayInputStream;
28+
import java.io.IOException;
29+
30+
/**
 * Unit tests checking how {@link XMLNodeToStructConverter} names XML attributes
 * in the resulting {@link TypedStruct}, with and without an attribute prefix.
 */
public class XMLNodeToStructConverterTest {
31+
32+
// NOTE(review): the meaning of the two boolean constructor flags is not visible here - confirm against XMLDocumentReader.
private final XMLDocumentReader reader = new XMLDocumentReader(true, true);
33+
34+
/**
 * With the prefix set to "prefix_", the attribute "attr" of the root element
 * must be exposed at path "root.prefix_attr" and keep its value.
 */
@Test
35+
public void should_prefix_node_attributes() throws IOException, SAXException {
36+
37+
// Given
38+
final XMLNodeToStructConverter converter = new XMLNodeToStructConverter()
39+
.setAttributePrefix("prefix_");
40+
41+
// When
42+
// NOTE(review): getBytes() without an explicit charset uses the platform default; the fixture is ASCII-only.
final Document document = reader.parse(new ByteArrayInputStream("<root attr=\"text\">test</root>".getBytes()));
43+
final TypedStruct result = converter.apply(document);
44+
45+
// Then
46+
final String path = "root.prefix_attr";
47+
Assert.assertTrue(result.exists(path));
48+
Assert.assertEquals("text", result.find(path).getString());
49+
}
50+
51+
/**
 * Without calling setAttributePrefix, the attribute "attr" of the root element
 * must be exposed under its own name at path "root.attr".
 */
@Test
52+
public void should_not_prefix_node_attributes() throws IOException, SAXException {
53+
// Given
54+
final XMLNodeToStructConverter converter = new XMLNodeToStructConverter();
55+
56+
// When
57+
// NOTE(review): getBytes() without an explicit charset uses the platform default; the fixture is ASCII-only.
final Document document = reader.parse(new ByteArrayInputStream("<root attr=\"text\">test</root>".getBytes()));
58+
final TypedStruct result = converter.apply(document);
59+
60+
// Then
61+
final String path = "root.attr";
62+
Assert.assertTrue(result.exists(path));
63+
Assert.assertEquals("text", result.find(path).getString());
64+
}
65+
}

connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3XMLFileInputReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
package io.streamthoughts.kafka.connect.filepulse.fs.reader;
2020

2121
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
22-
import io.streamthoughts.kafka.connect.filepulse.xml.XMLFileInputIteratorFactory;
23-
import io.streamthoughts.kafka.connect.filepulse.xml.XMLFileInputReaderConfig;
22+
import io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputIteratorFactory;
23+
import io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputReaderConfig;
2424
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
2525
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
2626

connect-file-pulse-filesystems/filepulse-azure-storage-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AzureBlobStorageXMLFileInputReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
package io.streamthoughts.kafka.connect.filepulse.fs.reader;
2020

2121
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
22-
import io.streamthoughts.kafka.connect.filepulse.xml.XMLFileInputIteratorFactory;
23-
import io.streamthoughts.kafka.connect.filepulse.xml.XMLFileInputReaderConfig;
22+
import io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputIteratorFactory;
23+
import io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputReaderConfig;
2424
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
2525
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
2626

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
package io.streamthoughts.kafka.connect.filepulse.xml;
19+
package io.streamthoughts.kafka.connect.filepulse.fs.reader.xml;
2020

2121
import io.streamthoughts.kafka.connect.filepulse.data.FieldPaths;
2222
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
@@ -29,6 +29,8 @@
2929
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectOffset;
3030
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
3131
import io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord;
32+
import io.streamthoughts.kafka.connect.filepulse.xml.XMLDocumentReader;
33+
import io.streamthoughts.kafka.connect.filepulse.xml.XMLNodeToStructConverter;
3234
import net.sf.saxon.lib.NamespaceConstant;
3335
import org.slf4j.Logger;
3436
import org.slf4j.LoggerFactory;
@@ -85,12 +87,8 @@ public XMLFileInputIterator(final XMLFileInputReaderConfig config,
8587
.setExcludeAllAttributes(config.isNodeAttributesExcluded())
8688
.setExcludeAttributesInNamespaces(config.getExcludeNodeAttributesInNamespaces())
8789
.setForceArrayFields(FieldPaths.from(config.forceArrayFields()))
88-
.setTypeInferenceEnabled(config.isDataTypeInferenceEnabled());
89-
90-
System.setProperty(
91-
"javax.xml.xpath.XPathFactory:" + NamespaceConstant.OBJECT_MODEL_SAXON,
92-
"net.sf.saxon.xpath.XPathFactoryImpl"
93-
);
90+
.setTypeInferenceEnabled(config.isDataTypeInferenceEnabled())
91+
.setAttributePrefix(config.getAttributePrefix());
9492

9593
final QName qName = new QName("http://www.w3.org/1999/XSL/Transform", config.resultType());
9694

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
package io.streamthoughts.kafka.connect.filepulse.xml;
19+
package io.streamthoughts.kafka.connect.filepulse.fs.reader.xml;
2020

2121
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
2222
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIteratorFactory;
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
package io.streamthoughts.kafka.connect.filepulse.xml;
19+
package io.streamthoughts.kafka.connect.filepulse.fs.reader.xml;
2020

21+
import io.streamthoughts.kafka.connect.filepulse.xml.XMLCommonConfig;
2122
import org.apache.kafka.common.config.ConfigDef;
2223

2324
import javax.xml.xpath.XPathConstants;
Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,19 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
package io.streamthoughts.kafka.connect.filepulse.xml;
19+
package io.streamthoughts.kafka.connect.filepulse.fs.reader.xml;
2020

2121
import io.streamthoughts.kafka.connect.filepulse.data.Type;
2222
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
2323
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
2424
import io.streamthoughts.kafka.connect.filepulse.fs.reader.IteratorManager;
25+
import io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputIterator;
26+
import io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputReaderConfig;
2527
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
2628
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
2729
import io.streamthoughts.kafka.connect.filepulse.source.LocalFileObjectMeta;
2830
import io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord;
31+
import io.streamthoughts.kafka.connect.filepulse.xml.XMLCommonConfig;
2932
import org.junit.Assert;
3033
import org.junit.Rule;
3134
import org.junit.Test;
@@ -42,7 +45,7 @@
4245
import java.util.HashMap;
4346
import java.util.List;
4447

45-
import static io.streamthoughts.kafka.connect.filepulse.xml.XMLFileInputReaderConfig.withKeyPrefix;
48+
import static io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputReaderConfig.withKeyPrefix;
4649

4750
public class XMLFileInputIteratorTest {
4851

0 commit comments

Comments
 (0)