Skip to content

Commit 355b6e4

Browse files
committed
feat(expression): add UnixTimestamp expression function
1 parent 5a62f03 commit 355b6e4

File tree

2 files changed

+45
-3
lines changed

2 files changed

+45
-3
lines changed

connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/ExpressionFunctionExecutors.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.Split;
3939
import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.StartsWith;
4040
import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.Trim;
41+
import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.UnixTimestamp;
4142
import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.Uppercase;
4243
import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.Uuid;
4344
import org.slf4j.Logger;
@@ -64,7 +65,7 @@ public static ExpressionFunctionExecutor resolve(final String functionName, fina
6465
* Creates a new {@link ExpressionFunctionExecutors} instance.
6566
*/
6667
private ExpressionFunctionExecutors() {
67-
// TODO function registration is hard-coded
68+
// List of built-in expression functions to register.
6869
register(new Lowercase());
6970
register(new Uppercase());
7071
register(new Converts());
@@ -85,6 +86,7 @@ private ExpressionFunctionExecutors() {
8586
register(new Hash());
8687
register(new Md5());
8788
register(new Split());
89+
register(new UnixTimestamp());
8890
}
8991

9092
@SuppressWarnings("unchecked")
@@ -106,8 +108,9 @@ private ExpressionFunctionExecutor make(final String functionName, final Express
106108
return new ExpressionFunctionExecutor(functionName, function, prepared);
107109
}
108110

109-
private void register(final ExpressionFunction function) {
110-
LOG.info("Registered expression function '" + function.name() + "'");
111+
public void register(final ExpressionFunction function) {
112+
Objects.requireNonNull(function, "'function' should not be null");
113+
LOG.info("Registered built-in expression function '{}'", function.name() );
111114
functions.put(function.name(), function);
112115
}
113116
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package io.streamthoughts.kafka.connect.filepulse.expression.function.impl;
21+
22+
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
23+
import io.streamthoughts.kafka.connect.filepulse.expression.function.Arguments;
24+
import io.streamthoughts.kafka.connect.filepulse.expression.function.ExpressionFunction;
25+
import io.streamthoughts.kafka.connect.filepulse.expression.function.GenericArgument;
26+
27+
/**
28+
* Function to return the current Unix timestamp in seconds.
29+
*/
30+
public class UnixTimestamp implements ExpressionFunction {
31+
32+
/**
33+
* {@inheritDoc}
34+
*/
35+
@Override
36+
public TypedValue apply(final Arguments<GenericArgument> args) {
37+
return TypedValue.int64(System.currentTimeMillis());
38+
}
39+
}

0 commit comments

Comments
 (0)