Skip to content

Commit 4fe77fd

Browse files
committed
feat(api): add access to error stacktrace in filter chain
1 parent b9c0a40 commit 4fe77fd

File tree

3 files changed

+79
-23
lines changed

3 files changed

+79
-23
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipeline.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ public List<FileRecord<TypedStruct>> apply(final FilterContext context,
195195

196196
if (filter.onFailure() != null) {
197197
final FilterContext errorContext = FilterContextBuilder.newBuilder(context)
198-
.withException(new FilterError(e.getLocalizedMessage(), filter.label()))
198+
.withError(FilterError.of(e, filter.label()))
199199
.build();
200200
filtered.addAll(filter.onFailure().apply(errorContext, record, hasNext));
201201
} else {

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/FilterContextBuilder.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
*/
3131
public class FilterContextBuilder {
3232

33-
private FilterError exception;
33+
private FilterError error;
3434
private FileObjectMeta metadata;
3535
private FileRecordOffset offset;
3636
private String topic;
@@ -65,7 +65,7 @@ static FilterContextBuilder newBuilder(final FilterContext original) {
6565
.withOffset(original.offset())
6666
.withVariables(original.variables())
6767
.withHeaders(original.headers())
68-
.withException(original.error());
68+
.withError(original.error());
6969
}
7070

7171
FilterContextBuilder withVariables(final Map<String, Object> variables) {
@@ -108,8 +108,8 @@ FilterContextBuilder withOffset(final FileRecordOffset offset) {
108108
return this;
109109
}
110110

111-
FilterContextBuilder withException(final FilterError exception) {
112-
this.exception = exception;
111+
FilterContextBuilder withError(final FilterError exception) {
112+
this.error = exception;
113113
return this;
114114
}
115115

@@ -125,7 +125,7 @@ public FilterContext build() {
125125
timestamp,
126126
key,
127127
headers,
128-
exception,
128+
error,
129129
variables);
130130
}
131131
}

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/FilterError.java

Lines changed: 73 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,40 +18,96 @@
1818
*/
1919
package io.streamthoughts.kafka.connect.filepulse.filter;
2020

21+
import java.io.PrintWriter;
22+
import java.io.StringWriter;
2123
import java.util.Objects;
2224

2325
public class FilterError {
2426

25-
private final String message;
27+
private final String exceptionMessage;
28+
29+
private final String exceptionClassName;
30+
31+
private final String exceptionStacktrace;
2632

2733
private final String filter;
2834

2935
/**
30-
* Creates a new {@link FilterError} instance.
31-
*
32-
* @param error the error message message.
33-
* @param filter the filter name that failed.
36+
* Helper method to create a new {@link FilterError} object from a given {@link Throwable}.
3437
*/
35-
FilterError(final String error, final String filter) {
36-
Objects.requireNonNull(error, "The message message");
37-
Objects.requireNonNull(filter, "The filter message");
38-
this.message = error;
39-
this.filter = filter;
38+
public static FilterError of(final Throwable throwable, final String filter) {
39+
final Throwable cause;
40+
if (throwable instanceof FilterException && throwable.getCause() != null) {
41+
cause = throwable.getCause();
42+
} else {
43+
cause = throwable;
44+
}
45+
return new FilterError(cause.getMessage(), cause.getClass().getName(), getStacktrace(cause), filter);
4046
}
4147

42-
public String message() {
43-
return message;
48+
/**
49+
* Creates a new {@link FilterError} instance.
50+
*
51+
* @param message the exception message.
52+
* @param classname the exception className.
53+
* @param stacktrace the exception stacktrace.
54+
* @param filter the failed filter name.
55+
*/
56+
public FilterError(final String message,
57+
final String classname,
58+
final String stacktrace,
59+
final String filter) {
60+
this.exceptionMessage = Objects.requireNonNull(message, "'message' should not be null");
61+
this.exceptionClassName = Objects.requireNonNull(classname, "'classname' should not be null");
62+
this.exceptionStacktrace = Objects.requireNonNull(stacktrace, "'stacktrace' should not be null");
63+
this.filter = Objects.requireNonNull(filter, "'filter' should not be null");
4464
}
4565

4666
public String filter() {
4767
return filter;
4868
}
4969

70+
public String exceptionMessage() {
71+
return exceptionMessage;
72+
}
73+
74+
public String exceptionStacktrace() {
75+
return exceptionStacktrace;
76+
}
77+
78+
public String getExceptionClassName() {
79+
return exceptionClassName;
80+
}
81+
82+
/**
83+
* {@inheritDoc}
84+
*/
85+
@Override
86+
public boolean equals(Object o) {
87+
if (this == o) return true;
88+
if (o == null || getClass() != o.getClass()) return false;
89+
FilterError that = (FilterError) o;
90+
return Objects.equals(exceptionMessage, that.exceptionMessage) &&
91+
Objects.equals(exceptionClassName, that.exceptionClassName) &&
92+
Objects.equals(exceptionStacktrace, that.exceptionStacktrace) &&
93+
Objects.equals(filter, that.filter);
94+
}
95+
96+
/**
97+
* {@inheritDoc}
98+
*/
5099
@Override
51-
public String toString() {
52-
return "[" +
53-
"message=" + message +
54-
", filter=" + filter +
55-
']';
100+
public int hashCode() {
101+
return Objects.hash(exceptionMessage, exceptionClassName, exceptionStacktrace, filter);
102+
}
103+
104+
/**
105+
* @return the stacktrace representation for the given {@link Throwable}.
106+
*/
107+
private static String getStacktrace(final Throwable throwable) {
108+
final StringWriter sw = new StringWriter();
109+
final PrintWriter pw = new PrintWriter(sw, true);
110+
throwable.printStackTrace(pw);
111+
return sw.getBuffer().toString();
56112
}
57113
}

0 commit comments

Comments
 (0)