diff --git a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/TypedValue.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/TypedValue.java index 4e5747eac..a17bc783d 100644 --- a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/TypedValue.java +++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/TypedValue.java @@ -358,7 +358,11 @@ public boolean isEmpty() { return getArray().isEmpty(); } - throw new DataException("Cannot check empty-value on non-string, non-array and non-map type"); + if (Type.STRUCT == type) { + return getStruct().schema().fields().isEmpty(); + } + + throw new DataException("Cannot check empty-value on non-string, non-array, non-struct, and non-map type"); } public TypedValue as(final Type type) { diff --git a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/xml/XMLFileInputIterator.java b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/xml/XMLFileInputIterator.java index 690d7d7ad..51ab27c5d 100644 --- a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/xml/XMLFileInputIterator.java +++ b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/xml/XMLFileInputIterator.java @@ -277,17 +277,22 @@ private TypedValue convertObjectTree(final Node node, final String childNodeName = isTextNode(child) ? nodeName : determineNodeName(child); Optional optional = readNodeObject(child, currentForceArrayFields); if (optional.isPresent()) { - TypedValue nodeValue = optional.get(); + final TypedValue nodeValue = optional.get(); + if (excludeEmptyElement && + nodeValue.type() == Type.STRUCT && + nodeValue.isEmpty()) { + LOG.debug("Empty XML element excluded: '{}'", node.getNodeName()); + continue; + } final boolean isArray = currentForceArrayFields.anyMatches(childNodeName); container = enrichStructWithObject(container, childNodeName, nodeValue, isArray); - } } return TypedValue.struct(container); } private Optional readNodeObject(final Node node, - final FieldPaths forceArrayFields) { + final FieldPaths forceArrayFields) { if (isWhitespaceOrNewLineNodeElement(node)) { return Optional.empty(); } @@ -297,13 +302,7 @@ private Optional readNodeObject(final Node node, } if (isElementNode(node)) { - - if (excludeEmptyElement && isEmptyNode(node)) { - LOG.debug("Empty XML element excluded: '{}'", node.getNodeName()); - return Optional.empty(); - } - - Optional childTextContent = peekChildNodeTextContent(node); + final Optional childTextContent = peekChildNodeTextContent(node); return childTextContent .map(s -> readTextNode(node, s)) .orElseGet(() -> Optional.of(convertObjectTree(node, forceArrayFields))); @@ -341,11 +340,6 @@ private static TypedStruct enrichStructWithObject(final TypedStruct container, return container.put(nodeName, value); } - private static boolean isEmptyNode(final Node node) { - return node.getChildNodes().getLength() == 0 && - node.getAttributes().getLength() == 0; - } - private static Optional readTextNode(final Node node, final String text) { final NamedNodeMap attributes = node.getAttributes();