Skip to content

Commit

Permalink
feat: combine built-in UDTransformers for filter and eventTime assign…
Browse files Browse the repository at this point in the history
…ment (#783)

Signed-off-by: Dillen Padhiar <dillen_padhiar@intuit.com>
  • Loading branch information
dpadhiar committed Jun 13, 2023
1 parent 2392b6a commit a9204fb
Show file tree
Hide file tree
Showing 12 changed files with 334 additions and 2 deletions.
1 change: 1 addition & 0 deletions config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4416,6 +4416,7 @@ spec:
enum:
- eventTimeExtractor
- filter
- timeExtractionFilter
type: string
required:
- name
Expand Down
1 change: 1 addition & 0 deletions config/base/crds/full/numaflow.numaproj.io_vertices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2913,6 +2913,7 @@ spec:
enum:
- eventTimeExtractor
- filter
- timeExtractionFilter
type: string
required:
- name
Expand Down
2 changes: 2 additions & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6861,6 +6861,7 @@ spec:
enum:
- eventTimeExtractor
- filter
- timeExtractionFilter
type: string
required:
- name
Expand Down Expand Up @@ -11037,6 +11038,7 @@ spec:
enum:
- eventTimeExtractor
- filter
- timeExtractionFilter
type: string
required:
- name
Expand Down
2 changes: 2 additions & 0 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6861,6 +6861,7 @@ spec:
enum:
- eventTimeExtractor
- filter
- timeExtractionFilter
type: string
required:
- name
Expand Down Expand Up @@ -11037,6 +11038,7 @@ spec:
enum:
- eventTimeExtractor
- filter
- timeExtractionFilter
type: string
required:
- name
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Time Extraction Filter

A `timeExtractionFilter` built-in transformer implements both the `eventTimeExtractor` and `filter` built-in functions. It evaluates a message on a pipeline and if valid, extracts event time from the payload of the messsage.

`filterExpr` is used to evaluate and drop invalid messages.

`eventTimeExpr` is used to compile the payload to a string representation of the event time.

`format` is used to convert the event time in string format to a `time.Time` object.

## Expression (required)

The expressions for the filter and event time extractor are implemented with `expr` and `sprig` libraries.

### Data conversion functions

These function can be accessed directly in expression. `payload` keyword represents the message object. It will be the root element to represent the message object in expression.

- `json` - Convert payload in JSON object. e.g: `json(payload)`
- `int` - Convert element/payload into `int` value. e.g: `int(json(payload).id)`
- `string` - Convert element/payload into `string` value. e.g: `string(json(payload).amount)`

### Sprig functions

`Sprig` library has 70+ functions. `sprig` prefix need to be added to access the sprig functions.

[sprig functions](http://masterminds.github.io/sprig/)

E.g:

- `sprig.trim(string(json(payload).timestamp))` # Remove spaces from either side of the value of field `timestamp`

## Format (optional)

Depending on whether a `format` is specified, the Event Time Extractor uses different approaches to convert the event time string to a `time.Time` object.

## Time Extraction Filter Spec

```yaml
spec:
vertices:
- name: in
source:
http: {}
transformer:
builtin:
name: timeExtractionFilter
kwargs:
filterExpr: int(json(payload).id) < 100
eventTimeExpr: json(payload).item[1].time
format: 2006-01-02T15:04:05Z07:00
```
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type UDTransformer struct {
}

type Transformer struct {
// +kubebuilder:validation:Enum=eventTimeExtractor;filter
// +kubebuilder:validation:Enum=eventTimeExtractor;filter;timeExtractionFilter
Name string `json:"name" protobuf:"bytes,1,opt,name=name"`
// +optional
Args []string `json:"args,omitempty" protobuf:"bytes,2,rep,name=args"`
Expand Down
3 changes: 3 additions & 0 deletions pkg/sources/transformer/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/numaproj/numaflow/pkg/shared/logging"
eventtime "github.com/numaproj/numaflow/pkg/sources/transformer/builtin/event_time"
"github.com/numaproj/numaflow/pkg/sources/transformer/builtin/filter"
timeextractionfilter "github.com/numaproj/numaflow/pkg/sources/transformer/builtin/time_extraction_filter"
)

type Builtin struct {
Expand Down Expand Up @@ -54,6 +55,8 @@ func (b *Builtin) executor() (functionsdk.MapTFunc, error) {
return filter.New(b.KWArgs)
case "eventTimeExtractor":
return eventtime.New(b.KWArgs)
case "timeExtractionFilter":
return timeextractionfilter.New(b.KWArgs)
default:
return nil, fmt.Errorf("unrecognized transformer %q", b.Name)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package timeextractionfilter

import (
"context"
"fmt"
"time"

"github.com/araddon/dateparse"
functionsdk "github.com/numaproj/numaflow-go/pkg/function"
"github.com/numaproj/numaflow/pkg/shared/expr"
"github.com/numaproj/numaflow/pkg/shared/logging"
)

type expressions struct {
filterExpr string
eventTimeExpr string
format string
}

func New(args map[string]string) (functionsdk.MapTFunc, error) {

filterExpr, existing := args["filterExpr"]
if !existing {
return nil, fmt.Errorf(`missing "filterExpr"`)
}

eventTimeExpr, existing := args["eventTimeExpr"]
if !existing {
return nil, fmt.Errorf(`missing "eventTimeExpr"`)
}

var format string
if format, existing = args["format"]; !existing {
format = ""
}

e := expressions{
filterExpr: filterExpr,
eventTimeExpr: eventTimeExpr,
format: format,
}

return func(ctx context.Context, keys []string, datum functionsdk.Datum) functionsdk.MessageTs {
log := logging.FromContext(ctx)
resultMsg, err := e.apply(datum.EventTime(), datum.Value())
if err != nil {
log.Errorf("Filter or event time extractor got an error: %v", err)
}
return functionsdk.MessageTsBuilder().Append(resultMsg)
}, nil

}

func (e expressions) apply(et time.Time, payload []byte) (functionsdk.MessageT, error) {
result, err := expr.EvalBool(e.filterExpr, payload)
if err != nil {
return functionsdk.MessageTToDrop(), err
}
if result {
timeStr, err := expr.EvalStr(e.eventTimeExpr, payload)
if err != nil {
return functionsdk.NewMessageT(payload, et), err
}
var newEventTime time.Time
time.Local, _ = time.LoadLocation("UTC")
if e.format != "" {
newEventTime, err = time.Parse(e.format, timeStr)
} else {
newEventTime, err = dateparse.ParseStrict(timeStr)
}
if err != nil {
return functionsdk.NewMessageT(payload, et), err
} else {
return functionsdk.NewMessageT(payload, newEventTime), nil
}
}
return functionsdk.MessageTToDrop(), nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package timeextractionfilter

import (
"context"
"testing"
"time"

functionsdk "github.com/numaproj/numaflow-go/pkg/function"
"github.com/stretchr/testify/assert"
)

type testDatum struct {
value []byte
eventTime time.Time
watermark time.Time
metadata testDatumMetadata
}

func (h *testDatum) Metadata() functionsdk.DatumMetadata {
return h.metadata
}

func (h *testDatum) Value() []byte {
return h.value
}

func (h *testDatum) EventTime() time.Time {
return h.eventTime
}

func (h *testDatum) Watermark() time.Time {
return h.watermark
}

type testDatumMetadata struct {
id string
numDelivered uint64
}

func (t testDatumMetadata) ID() string {
return t.id
}

func (t testDatumMetadata) NumDelivered() uint64 {
return t.numDelivered
}

var (
testJsonMsg = `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "2022-02-18T21:54:42.123Z"},{"id": 2, "name": "numa", "time": "2021-02-18T21:54:42.123Z"}]}`
)

func TestFilterEventTime(t *testing.T) {

t.Run("Missing both expressions, return error", func(t *testing.T) {
_, err := New(map[string]string{})
assert.Error(t, err)
assert.Contains(t, err.Error(), "missing \"filterExpr\"")
})

t.Run("Missing eventTime expr, return error", func(t *testing.T) {
_, err := New(map[string]string{"filterExpr": ""})
assert.Error(t, err)
assert.Contains(t, err.Error(), "missing \"eventTimeExpr\"")
})

t.Run("Missing filter expr, return error", func(t *testing.T) {
_, err := New(map[string]string{"eventTimeExpr": ""})
assert.Error(t, err)
assert.Contains(t, err.Error(), "missing \"filterExpr\"")
})

t.Run("Valid JSON expression for filter and eventTimeExtractor", func(t *testing.T) {
handle, err := New(map[string]string{"filterExpr": "int(json(payload).item[1].id) == 2", "eventTimeExpr": "json(payload).item[1].time", "format": time.RFC3339})
assert.NoError(t, err)

result := handle(context.Background(), []string{"test-key"}, &testDatum{
value: []byte(testJsonMsg),
eventTime: time.Time{},
watermark: time.Time{},
})

// check that messsage has not changed
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))

// check that event time has changed
time.Local, _ = time.LoadLocation("UTC")
expected, _ := time.Parse(time.RFC3339, "2021-02-18T21:54:42.123Z")
assert.True(t, expected.Equal(result.Items()[0].EventTime()))
})

t.Run("Invalid JSON expression for filter", func(t *testing.T) {
handle, err := New(map[string]string{"filterExpr": "int(json(payload).item[1].id) == 3", "eventTimeExpr": "json(payload).item[1].time", "format": time.RFC3339})
assert.NoError(t, err)

result := handle(context.Background(), []string{"test-key"}, &testDatum{
value: []byte(testJsonMsg),
eventTime: time.Time{},
watermark: time.Time{},
})

assert.Equal(t, "", string(result.Items()[0].Value()))
})

t.Run("Valid JSON expression for filter, incorrect format to eventTime", func(t *testing.T) {
handle, err := New(map[string]string{"filterExpr": "int(json(payload).item[1].id) == 2", "eventTimeExpr": "json(payload).item[1].time", "format": time.ANSIC})
assert.NoError(t, err)

testInputEventTime := time.Date(2022, 1, 4, 2, 3, 4, 5, time.UTC)
result := handle(context.Background(), []string{"test-key"}, &testDatum{
value: []byte(testJsonMsg),
eventTime: testInputEventTime,
watermark: time.Time{},
})

assert.Equal(t, testInputEventTime, result.Items()[0].EventTime())
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))
})

}
19 changes: 19 additions & 0 deletions test/e2e-suite-1/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,25 @@ func (s *FunctionalSuite) TestSourceFiltering() {
w.Expect().SinkNotContains("out", expect2)
}

func (s *FunctionalSuite) TestTimeExtractionFilter() {
w := s.Given().Pipeline("@testdata/time-extraction-filter.yaml").
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()
pipelineName := "time-extraction-filter"

// wait for all the pods to come up
w.Expect().VertexPodsRunning()

testMsgOne := `{"id": 80, "msg": "hello", "time": "2021-01-18T21:54:42.123Z", "desc": "A good ID."}`
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte(testMsgOne)))
w.Expect().VertexPodLogContains("out", fmt.Sprintf("EventTime - %d", time.Date(2021, 1, 18, 21, 54, 42, 123000000, time.UTC).UnixMilli()), PodLogCheckOptionWithCount(1), PodLogCheckOptionWithContainer("numa"))

testMsgTwo := `{"id": 101, "msg": "test", "time": "2021-01-18T21:54:42.123Z", "desc": "A bad ID."}`
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte(testMsgTwo)))
w.Expect().SinkNotContains("out", testMsgTwo)
}

func (s *FunctionalSuite) TestBuiltinEventTimeExtractor() {
w := s.Given().Pipeline("@testdata/extract-event-time-from-payload.yaml").
When().
Expand Down
Loading

0 comments on commit a9204fb

Please sign in to comment.