Skip to content

Commit 5a62f03

Browse files
committed
feat(filters): add new XmlToStructFilter
1 parent 03bab9a commit 5a62f03

File tree

26 files changed

+1123
-654
lines changed

26 files changed

+1123
-654
lines changed

checkstyle/suppressions.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,5 +61,5 @@
6161
<suppress checks="NPathComplexity" files="KafkaBasedLog.java"/>
6262
<!-- Classes used for configuration -->
6363
<suppress checks="(LineLength|CyclomaticComplexity|NPathComplexity|JavaNCSS)" files=".*Config.java"/>
64-
<suppress checks="(LineLength|CyclomaticComplexity|NPathComplexity|JavaNCSS)" files="SchemaUtils.java"/>
64+
<suppress checks="(LineLength|CyclomaticComplexity|NPathComplexity|JavaNCSS)" files="SchemaMerger.java"/>
6565
</suppressions>

connect-file-pulse-dataformat/pom.xml

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven descriptor of the "data format" module; it inherits from the kafka-connect-file-pulse-reactor parent. -->
5+
<parent>
6+
<artifactId>kafka-connect-file-pulse-reactor</artifactId>
7+
<groupId>io.streamthoughts</groupId>
8+
<version>2.4.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<name>Kafka Connect Source File Pulse Data Format</name>
13+
<artifactId>kafka-connect-file-pulse-dataformat</artifactId>
14+
15+
<properties>
<!-- Points the checkstyle configuration location at the parent project's base directory. -->
16+
<checkstyle.config.location>${project.parent.basedir}</checkstyle.config.location>
17+
</properties>
18+
19+
<dependencies>
<!-- NOTE(review): no <version> is declared for Saxon-HE; presumably it is managed by the parent POM - confirm. -->
20+
<dependency>
21+
<groupId>net.sf.saxon</groupId>
22+
<artifactId>Saxon-HE</artifactId>
23+
</dependency>
<!-- Sibling API module, resolved at the same version as this project. -->
24+
<dependency>
25+
<groupId>io.streamthoughts</groupId>
26+
<artifactId>kafka-connect-file-pulse-api</artifactId>
27+
<version>${project.version}</version>
28+
</dependency>
<!-- NOTE(review): no <version> or <scope> is declared for connect-api; presumably both come from the parent POM - confirm. -->
29+
<dependency>
30+
<groupId>org.apache.kafka</groupId>
31+
<artifactId>connect-api</artifactId>
32+
</dependency>
33+
</dependencies>
34+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Copyright 2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.xml;
20+
21+
import org.apache.kafka.common.config.AbstractConfig;
22+
import org.apache.kafka.common.config.ConfigDef;
23+
24+
import java.util.Arrays;
25+
import java.util.Collections;
26+
import java.util.List;
27+
import java.util.Map;
28+
29+
/**
30+
*
31+
*/
32+
public class XMLCommonConfig extends AbstractConfig {
33+
34+
public static final String XML_FORCE_ARRAY_ON_FIELDS_CONFIG = "xml.force.array.on.fields";
35+
private static final String XML_FORCE_ARRAY_ON_FIELDS_FORCE_DOC = "The comma-separated list of fields for which an array-type must be forced";
36+
37+
public static final String XML_PARSER_VALIDATING_ENABLED_CONFIG = "xml.parser.validating.enabled";
38+
private static final String XML_PARSER_VALIDATING_ENABLED_DOC = " Specifies that the parser will validate documents as they are parsed (default: false).";
39+
40+
public static final String XML_PARSER_NAMESPACE_AWARE_ENABLED_CONFIG = "xml.parser.namespace.aware.enabled";
41+
private static final String XML_PARSER_NAMESPACE_AWARE_ENABLED_DOC = "Specifies that the XML parser will provide support for XML namespaces (default: false).";
42+
43+
public static final String XML_EXCLUDE_EMPTY_ELEMENTS_CONFIG = "xml.exclude.empty.elements";
44+
private static final String XML_EXCLUDE_EMPTY_ELEMENTS_DOC = "Specifies that the reader should exclude element having no field (default: false).";
45+
46+
public static final String XML_DATA_TYPE_INFERENCE_ENABLED_CONFIG = "xml.data.type.inference.enabled";
47+
private static final String XML_DATA_TYPE_INFERENCE_ENABLED_DOC = "Specifies that the reader should try to infer the type of data nodes. (default: false).";
48+
49+
private final String keyPrefix;
50+
51+
/**
52+
* Creates a new {@link XMLCommonConfig} instance.
53+
*
54+
* @param originals the reader configuration.
55+
*/
56+
protected XMLCommonConfig(final String keyPrefix,
57+
final ConfigDef configDef,
58+
final Map<String, ?> originals) {
59+
super(configDef, originals);
60+
this.keyPrefix = keyPrefix;
61+
}
62+
63+
private String withKeyPrefix(final String configKey) {
64+
return keyPrefix + configKey;
65+
}
66+
67+
public boolean isValidatingEnabled() {
68+
return getBoolean(withKeyPrefix(XML_PARSER_VALIDATING_ENABLED_CONFIG));
69+
}
70+
71+
public boolean isNamespaceAwareEnabled() {
72+
return getBoolean(withKeyPrefix(XML_PARSER_NAMESPACE_AWARE_ENABLED_CONFIG));
73+
}
74+
75+
public boolean isEmptyElementExcluded() {
76+
return getBoolean(withKeyPrefix(XML_EXCLUDE_EMPTY_ELEMENTS_CONFIG));
77+
}
78+
79+
public boolean isDataTypeInferenceEnabled() {
80+
return getBoolean(withKeyPrefix(XML_DATA_TYPE_INFERENCE_ENABLED_CONFIG));
81+
}
82+
83+
public List<String> forceArrayFields() {
84+
return getList(withKeyPrefix(XML_FORCE_ARRAY_ON_FIELDS_CONFIG));
85+
}
86+
87+
public static ConfigDef buildConfigDefWith(final String group,
88+
final String keyPrefix,
89+
final ConfigDef.ConfigKey... additional) {
90+
return buildConfigDefWith(group, keyPrefix, Arrays.asList(additional));
91+
}
92+
public static ConfigDef buildConfigDefWith(final String group,
93+
final String keyPrefix,
94+
final Iterable<ConfigDef.ConfigKey> additional) {
95+
int filterGroupCounter = 0;
96+
final ConfigDef def = new ConfigDef()
97+
.define(
98+
keyPrefix + XML_FORCE_ARRAY_ON_FIELDS_CONFIG,
99+
ConfigDef.Type.LIST,
100+
Collections.emptyList(),
101+
ConfigDef.Importance.MEDIUM,
102+
XML_FORCE_ARRAY_ON_FIELDS_FORCE_DOC,
103+
group,
104+
filterGroupCounter++,
105+
ConfigDef.Width.NONE,
106+
XML_FORCE_ARRAY_ON_FIELDS_FORCE_DOC
107+
)
108+
.define(
109+
keyPrefix + XML_PARSER_VALIDATING_ENABLED_CONFIG,
110+
ConfigDef.Type.BOOLEAN,
111+
false,
112+
ConfigDef.Importance.LOW,
113+
XML_PARSER_VALIDATING_ENABLED_DOC,
114+
group,
115+
filterGroupCounter++,
116+
ConfigDef.Width.NONE,
117+
keyPrefix + XML_PARSER_VALIDATING_ENABLED_CONFIG
118+
)
119+
.define(
120+
keyPrefix + XML_PARSER_NAMESPACE_AWARE_ENABLED_CONFIG,
121+
ConfigDef.Type.BOOLEAN,
122+
false,
123+
ConfigDef.Importance.LOW,
124+
XML_PARSER_NAMESPACE_AWARE_ENABLED_DOC,
125+
group,
126+
filterGroupCounter++,
127+
ConfigDef.Width.NONE,
128+
keyPrefix + XML_PARSER_NAMESPACE_AWARE_ENABLED_CONFIG
129+
)
130+
.define(
131+
keyPrefix + XML_EXCLUDE_EMPTY_ELEMENTS_CONFIG,
132+
ConfigDef.Type.BOOLEAN,
133+
false,
134+
ConfigDef.Importance.LOW,
135+
XML_EXCLUDE_EMPTY_ELEMENTS_DOC,
136+
group,
137+
filterGroupCounter++,
138+
ConfigDef.Width.NONE,
139+
keyPrefix + XML_EXCLUDE_EMPTY_ELEMENTS_CONFIG
140+
)
141+
.define(
142+
keyPrefix + XML_DATA_TYPE_INFERENCE_ENABLED_CONFIG,
143+
ConfigDef.Type.BOOLEAN,
144+
false,
145+
ConfigDef.Importance.LOW,
146+
XML_DATA_TYPE_INFERENCE_ENABLED_DOC,
147+
group,
148+
filterGroupCounter++,
149+
ConfigDef.Width.NONE,
150+
keyPrefix + XML_EXCLUDE_EMPTY_ELEMENTS_CONFIG
151+
);
152+
153+
for (ConfigDef.ConfigKey configKey : additional) {
154+
def.define(
155+
configKey.name,
156+
configKey.type,
157+
configKey.defaultValue,
158+
configKey.validator,
159+
configKey.importance,
160+
configKey.documentation,
161+
group,
162+
filterGroupCounter++,
163+
configKey.width,
164+
configKey.displayName
165+
);
166+
}
167+
return def;
168+
}
169+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.xml;
20+
21+
import net.sf.saxon.lib.NamespaceConstant;
22+
import org.w3c.dom.Document;
23+
import org.xml.sax.ErrorHandler;
24+
import org.xml.sax.SAXException;
25+
26+
import javax.xml.parsers.DocumentBuilder;
27+
import javax.xml.parsers.DocumentBuilderFactory;
28+
import javax.xml.parsers.ParserConfigurationException;
29+
import java.io.IOException;
30+
import java.io.InputStream;
31+
32+
public class XMLDocumentReader {
33+
34+
static {
35+
System.setProperty(
36+
"javax.xml.xpath.XPathFactory:" + NamespaceConstant.OBJECT_MODEL_SAXON,
37+
"net.sf.saxon.xpath.XPathFactoryImpl"
38+
);
39+
}
40+
41+
private final DocumentBuilder documentBuilder;
42+
43+
/**
44+
* Creates a new {@link XMLDocumentReader} instance.
45+
*
46+
* @param isNamespaceAware Specifies that the parser produced by this code will provide support for XML namespaces.
47+
* @param isValidating Specifies that the parser produced by this code will validate documents as they are parsed.
48+
*/
49+
public XMLDocumentReader(final boolean isNamespaceAware,
50+
final boolean isValidating) {
51+
try {
52+
final DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance();
53+
builderFactory.setIgnoringElementContentWhitespace(true);
54+
builderFactory.setIgnoringComments(true);
55+
builderFactory.setNamespaceAware(isNamespaceAware);
56+
builderFactory.setValidating(isValidating);
57+
this.documentBuilder = builderFactory.newDocumentBuilder();
58+
} catch (ParserConfigurationException e) {
59+
throw new IllegalStateException(e);
60+
}
61+
}
62+
63+
public Document parse(final InputStream inputStream) throws IOException, SAXException {
64+
return parse(inputStream, null);
65+
}
66+
67+
public Document parse(final InputStream inputStream,
68+
final ErrorHandler errorHandler) throws IOException, SAXException {
69+
try {
70+
if (errorHandler != null) {
71+
documentBuilder.setErrorHandler(errorHandler);
72+
}
73+
return documentBuilder.parse(inputStream);
74+
} finally {
75+
documentBuilder.reset();
76+
documentBuilder.setErrorHandler(null);
77+
}
78+
}
79+
}

0 commit comments

Comments
 (0)