Skip to content

Commit 4d13731

Browse files
committed
refactor(filters): cleanup classes
1 parent 1fa3e0e commit 4d13731

File tree

7 files changed

+14
-11
lines changed

7 files changed

+14
-11
lines changed

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/CommonFilterConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public String source() {
111111
return getString(CommonFilterConfig.FILTER_SOURCE_FIELD_CONFIG);
112112
}
113113

114-
public static ConfigDef.ConfigKey withOverwrite(final String group, final int groupCounter) {
114+
public static ConfigDef.ConfigKey getOverwriteConfigKey(final String group, final int groupCounter) {
115115
return new ConfigDef.ConfigKey(
116116
CommonFilterConfig.FILTER_OVERWRITE_CONFIG,
117117
ConfigDef.Type.LIST,

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/ExplodeFilterConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public ExplodeFilterConfig(final Map<?, ?> originals) {
3939
public static ConfigDef configDef() {
4040
int filterGroupCounter = 0;
4141
return new ConfigDef(CommonFilterConfig.configDef())
42-
.define(withOverwrite(FILTER_EXPLODE, filterGroupCounter++))
42+
.define(getOverwriteConfigKey(FILTER_EXPLODE, filterGroupCounter++))
4343
.define(getSourceConfigKey(FILTER_EXPLODE, filterGroupCounter++));
4444
}
4545
}

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/GrokFilterConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public static ConfigDef configDef() {
4646
int filterGroupCounter = 0;
4747
final ConfigDef def = new ConfigDef(CommonFilterConfig.configDef())
4848
.define(getSourceConfigKey(GROK_FILTER, filterGroupCounter++))
49-
.define(withOverwrite(GROK_FILTER, filterGroupCounter++));
49+
.define(getOverwriteConfigKey(GROK_FILTER, filterGroupCounter++));
5050
for (ConfigDef.ConfigKey configKey : GrokConfig.configDef().configKeys().values()) {
5151
def.define(new ConfigDef.ConfigKey(
5252
configKey.name,

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/JSONFilterConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public Charset charset() {
7676
public static ConfigDef configDef() {
7777
int filterGroupCounter = 0;
7878
return new ConfigDef(CommonFilterConfig.configDef())
79-
.define(withOverwrite(JSON_FILTER, filterGroupCounter++))
79+
.define(getOverwriteConfigKey(JSON_FILTER, filterGroupCounter++))
8080
.define(getSourceConfigKey(JSON_FILTER, filterGroupCounter++))
8181
.define(
8282
JSON_TARGET_CONFIG,

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/MoveFilterConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public String source() {
5858
public static ConfigDef configDef() {
5959
int filterGroupCounter = 0;
6060
return new ConfigDef(CommonFilterConfig.configDef())
61-
.define(withOverwrite(GROUP_MOVE, filterGroupCounter++))
61+
.define(getOverwriteConfigKey(GROUP_MOVE, filterGroupCounter++))
6262
.define(getSourceConfigKey(GROUP_MOVE, filterGroupCounter++))
6363
.define(
6464
MOVE_TARGET_CONFIG,

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/XmlToJsonFilterConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public XmlToJsonFilterConfig(final Map<?, ?> originals) {
5555
public static ConfigDef configDef() {
5656
int filterGroupCounter = 0;
5757
return new ConfigDef(CommonFilterConfig.configDef())
58-
.define(withOverwrite(FILTER, filterGroupCounter++))
58+
.define(getOverwriteConfigKey(FILTER, filterGroupCounter++))
5959
.define(getSourceConfigKey(FILTER, filterGroupCounter++))
6060
.define(
6161
XML_PARSER_KEEP_STRINGS_CONFIG,

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AbstractRecordFilter.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public abstract class AbstractRecordFilter<T extends AbstractRecordFilter<T>> im
3636

3737
private boolean ignoreFailure;
3838

39-
private CommonFilterConfig config;
39+
protected CommonFilterConfig config;
4040

4141
/**
4242
* {@inheritDoc}
@@ -58,12 +58,15 @@ public void configure(final Map<String, ?> configs) {
5858
@Override
5959
public void configure(final Map<String, ?> props, final RecordFilterProvider provider) {
6060
config = new CommonFilterConfig(props);
61-
condition = config.condition();
62-
ignoreFailure = config.ignoreFailure();
61+
withOnCondition(config.condition());
62+
withIgnoreFailure(config.ignoreFailure());
6363
if (!config.onFailure().isEmpty()) {
64-
failurePipeline = new DefaultRecordFilterPipeline(config.onFailure().stream()
64+
withOnFailure(
65+
new DefaultRecordFilterPipeline(config.onFailure().stream()
6566
.map(provider::getRecordForAlias)
66-
.collect(Collectors.toList()));
67+
.collect(Collectors.toList())
68+
)
69+
);
6770
}
6871
configure(props);
6972
}

0 commit comments

Comments
 (0)