Skip to content

Commit f3179a7

Browse files
committed
refactor(expression): refactor expression function api
1 parent bf3fc31 commit f3179a7

38 files changed

+865
-862
lines changed

connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/accessor/MapAdaptablePropertyAccessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ private Method findGetterByKeyMethodForProperty(final Class target) {
123123

124124

125125
private Method findMethodForProperty(final Class target, final Predicate<Method> predicate) {
126-
Optional<Method> optional = Arrays.stream(target.getDeclaredMethods())
126+
Optional<Method> optional = Arrays.stream(target.getMethods())
127127
.filter(predicate)
128128
.findAny();
129129
return optional.orElse(null);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.expression.function;
20+
21+
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
22+
import io.streamthoughts.kafka.connect.filepulse.expression.Expression;
23+
import io.streamthoughts.kafka.connect.filepulse.expression.ExpressionException;
24+
25+
public abstract class AbstractTransformExpressionFunction implements ExpressionFunction {
26+
27+
private static final String FIELD_ARG = "field_expr";
28+
29+
public abstract TypedValue transform(final TypedValue value);
30+
31+
/**
32+
* {@inheritDoc}
33+
*/
34+
@Override
35+
public Instance get() {
36+
return new Instance() {
37+
38+
/**
39+
* {@inheritDoc}
40+
*/
41+
@Override
42+
public Arguments prepare(final Expression[] args) {
43+
if (args.length < 1) {
44+
throw new ExpressionException(String.format(
45+
"Missing required arguments: %s(<%s>)", name(), FIELD_ARG)
46+
);
47+
}
48+
49+
return Arguments.of(FIELD_ARG, args[0]);
50+
}
51+
52+
/**
53+
* {@inheritDoc}
54+
*/
55+
@Override
56+
public TypedValue invoke(final ExecutionContext context) throws ExpressionException {
57+
return transform(context.get(0));
58+
}
59+
};
60+
}
61+
}

connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/Argument.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,22 @@
1919

2020
package io.streamthoughts.kafka.connect.filepulse.expression.function;
2121

22+
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
2223
import io.streamthoughts.kafka.connect.filepulse.expression.EvaluationContext;
2324

24-
import java.util.List;
25-
2625
public interface Argument {
2726

27+
/**
28+
* @return the name of this argument.
29+
*/
2830
String name();
2931

32+
/**
33+
* @return the value of this argument.
34+
*/
3035
Object value();
3136

32-
List<String> errorMessages();
33-
34-
boolean isValid();
35-
36-
Object evaluate(final EvaluationContext context) ;
37+
default TypedValue evaluate(final EvaluationContext context) {
38+
throw new UnsupportedOperationException();
39+
}
3740
}

connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/Arguments.java

Lines changed: 38 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*/
1919
package io.streamthoughts.kafka.connect.filepulse.expression.function;
2020

21-
import io.streamthoughts.kafka.connect.filepulse.expression.EvaluationContext;
21+
import io.streamthoughts.kafka.connect.filepulse.expression.Expression;
2222

2323
import java.util.Arrays;
2424
import java.util.Collections;
@@ -27,19 +27,39 @@
2727
import java.util.List;
2828
import java.util.Objects;
2929
import java.util.Optional;
30-
import java.util.stream.StreamSupport;
3130

32-
public class Arguments<T extends Argument> implements Iterable<T> {
31+
public class Arguments implements Iterable<Argument> {
3332

34-
@SafeVarargs
35-
public static <T extends Argument> Arguments<T> of(final T... arguments) {
36-
return new Arguments<>(Arrays.asList(arguments));
33+
public static Arguments of(final Argument... arguments) {
34+
return new Arguments(Arrays.asList(arguments));
3735
}
3836

39-
public static <T extends Argument> Arguments<T> empty() {
40-
return new Arguments<T>() {
37+
public static Arguments of(final String name1, final Expression expression1) {
38+
return of(new ExpressionArgument(name1, expression1));
39+
}
40+
41+
public static Arguments of(final String name1, final Expression expression1,
42+
final String name2, final Expression expression2) {
43+
return of(
44+
new ExpressionArgument(name1, expression1),
45+
new ExpressionArgument(name2, expression2)
46+
);
47+
}
48+
49+
public static Arguments of(final String name1, final Expression expression1,
50+
final String name2, final Expression expression2,
51+
final String name3, final Expression expression3) {
52+
return of(
53+
new ExpressionArgument(name1, expression1),
54+
new ExpressionArgument(name2, expression2),
55+
new ExpressionArgument(name3, expression3)
56+
);
57+
}
58+
59+
public static Arguments empty() {
60+
return new Arguments() {
4161
@Override
42-
public Iterator<T> iterator() {
62+
public Iterator<Argument> iterator() {
4363
return Collections.emptyIterator();
4464
}
4565

@@ -50,7 +70,7 @@ public String toString() {
5070
};
5171
}
5272

53-
private final List<T> arguments;
73+
private final List<Argument> arguments;
5474

5575
/**
5676
* Creates a new {@link Arguments} instance.
@@ -64,7 +84,7 @@ public Arguments() {
6484
*
6585
* @param argument the single argument.
6686
*/
67-
public Arguments(final T argument) {
87+
public Arguments(final Argument argument) {
6888
this(Collections.singletonList(argument));
6989
}
7090

@@ -73,11 +93,11 @@ public Arguments(final T argument) {
7393
* @param arguments the list of arguments.
7494
*
7595
*/
76-
public Arguments(final List<T> arguments) {
96+
public Arguments(final List<Argument> arguments) {
7797
this.arguments = arguments;
7898
}
7999

80-
private Arguments<T> add(final T argument) {
100+
private Arguments add(final Argument argument) {
81101
arguments.add(argument);
82102
return this;
83103
}
@@ -90,11 +110,11 @@ private Arguments<T> add(final T argument) {
90110
* @throws IndexOutOfBoundsException if the index is out of range
91111
* ({@code index < 0 || index >= size()})
92112
*/
93-
public T get(final int index) {
113+
public Argument get(final int index) {
94114
return arguments.get(index);
95115
}
96116

97-
public List<T> get(final int index, final int to) {
117+
public List<Argument> get(final int index, final int to) {
98118
return arguments.subList(index, to);
99119
}
100120

@@ -103,59 +123,22 @@ public int size() {
103123
}
104124

105125
@SuppressWarnings("unchecked")
106-
public <V> V valueOf(final String name) {
126+
public <V> Optional<V> valueOf(final String name) {
107127
Objects.requireNonNull(name, "name cannot be null");
108128
Optional<Object> value = arguments
109129
.stream()
110130
.filter(a -> a.name().equals(name))
111131
.findFirst()
112132
.map(Argument::value);
113-
if (value.isPresent()) return (V) value.get();
114133

115-
throw new IllegalArgumentException("No argument with name '" + name + "'");
116-
}
117-
118-
Arguments<GenericArgument> evaluate(final EvaluationContext context) {
119-
Arguments<GenericArgument> evaluated = new Arguments<>();
120-
for (T arg : arguments) {
121-
Object value = arg.evaluate(context);
122-
evaluated.add(new GenericArgument<>(arg.name(), value));
123-
}
124-
return evaluated;
125-
}
126-
127-
public boolean valid() {
128-
return StreamSupport
129-
.stream(this.spliterator(), true)
130-
.allMatch(Argument::isValid);
131-
}
132-
133-
String buildErrorMessage() {
134-
final StringBuilder errors = new StringBuilder();
135-
for (T value : arguments) {
136-
if (!value.errorMessages().isEmpty()) {
137-
List<String> errorMessages = value.errorMessages();
138-
for (String error : errorMessages) {
139-
errors
140-
.append("\n\t")
141-
.append("Invalid argument with name='")
142-
.append(value.name()).append("'")
143-
.append(", value=")
144-
.append("'").append(value.value()).append("'")
145-
.append(" - ")
146-
.append(error)
147-
.append("\n\t");
148-
}
149-
}
150-
}
151-
return errors.toString();
134+
return (Optional<V>) value;
152135
}
153136

154137
/**
155138
* {@inheritDoc}
156139
*/
157140
@Override
158-
public Iterator<T> iterator() {
141+
public Iterator<Argument> iterator() {
159142
return arguments.iterator();
160143
}
161144

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.expression.function;
20+
21+
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
22+
23+
import java.util.ArrayList;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.Optional;
28+
29+
public class ExecutionContext {
30+
31+
private final Map<Integer, TypedValue> argumentByIndex;
32+
private final Map<String, TypedValue> argumentByName;
33+
34+
/**
35+
* Creates a new {@link ExecutionContext} instance.
36+
*/
37+
ExecutionContext() {
38+
this.argumentByName = new HashMap<>();
39+
this.argumentByIndex = new HashMap<>();
40+
}
41+
42+
void addArgument(final String name, final int index, final TypedValue value) {
43+
this.argumentByName.put(name, value);
44+
this.argumentByIndex.put(index, value);
45+
}
46+
47+
public TypedValue get(final int index) {
48+
return Optional.ofNullable(argumentByIndex.get(index)).orElseThrow(IndexOutOfBoundsException::new);
49+
}
50+
51+
public TypedValue get(final String name) {
52+
return argumentByName.get(name);
53+
}
54+
55+
public List<TypedValue> get(final int index, final int to) {
56+
return values().subList(index, to);
57+
}
58+
59+
60+
public int size() {
61+
return argumentByName.size();
62+
}
63+
64+
public List<TypedValue> values() {
65+
return new ArrayList<>(argumentByIndex.values());
66+
}
67+
}

connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/ExpressionArgument.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,26 @@
2323
import io.streamthoughts.kafka.connect.filepulse.expression.EvaluationContext;
2424
import io.streamthoughts.kafka.connect.filepulse.expression.Expression;
2525

26-
public class ExpressionArgument extends GenericArgument<Expression> {
26+
import java.util.Objects;
27+
28+
public class ExpressionArgument extends GenericArgument {
2729

2830
/**
2931
* Creates a new {@link ExpressionArgument} instance.
32+
*
3033
* @param name the argument name.
3134
* @param expression the argument expression.
3235
*/
33-
public ExpressionArgument(final String name, final Expression expression) {
34-
super(name, expression);
36+
public ExpressionArgument(final String name,
37+
final Expression expression) {
38+
super(name, Objects.requireNonNull(expression, "'expression should not be null"));
3539
}
3640

3741
/**
3842
* {@inheritDoc}
3943
*/
4044
@Override
41-
public TypedValue evaluate(EvaluationContext context) {
42-
return value().readValue(context, TypedValue.class);
45+
public TypedValue evaluate(final EvaluationContext context) {
46+
return ((Expression)value()).readValue(context, TypedValue.class);
4347
}
4448
}

0 commit comments

Comments
 (0)