Skip to content

Commit

Permalink
chore: fix newMessageT order (#702)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
  • Loading branch information
yhl25 authored Apr 21, 2023
1 parent 2967962 commit 15a01a6
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 10 deletions.
2 changes: 1 addition & 1 deletion docs/user-guide/sources/transformer/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func Handle(_ context.Context, keys []string, data functionsdk.Datum) functionsd
if filterOut {
return functionsdk.MessageTsBuilder().Append(functionsdk.MessageTToDrop())
} else {
return functionsdk.MessageTsBuilder().Append(functionsdk.NewMessageT(eventTime, data.Value()).WithKeys(keys))
return functionsdk.MessageTsBuilder().Append(functionsdk.NewMessageT(data.Value(), eventTime).WithKeys(keys))
}
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.7.5-0.20220415000625-a6b62f61a703
github.com/nats-io/nats.go v1.24.0
github.com/numaproj/numaflow-go v0.4.3
github.com/numaproj/numaflow-go v0.4.4
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/common v0.32.1
github.com/soheilhy/cmux v0.1.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -694,8 +694,8 @@ github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.4.3 h1:jnoWG04zqk6gb+0FHsnw17INBCYkHHbWad7g+MBGRm8=
github.com/numaproj/numaflow-go v0.4.3/go.mod h1:XN05IPD7SC9Gy0OwemyFtvpo+WvZsBul48Fji2vQJP4=
github.com/numaproj/numaflow-go v0.4.4 h1:dnVIVIz65s69PIZiTZbWmQCpqdbXoxsNbgWkfdK0Qvk=
github.com/numaproj/numaflow-go v0.4.4/go.mod h1:+z3jw/cCg69hvufEvFUJiK4VDNlI5IfkYsYUh9OJr6E=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func New(args map[string]string) (functionsdk.MapTFunc, error) {

return func(ctx context.Context, keys []string, datum functionsdk.Datum) functionsdk.MessageTs {
log := logging.FromContext(ctx)
resultMsg, err := e.apply(datum.EventTime(), datum.Value())
resultMsg, err := e.apply(datum.Value(), datum.EventTime())
if err != nil {
log.Warnf("event time extractor got an error: %v, skip updating event time...", err)
}
Expand All @@ -66,10 +66,10 @@ func New(args map[string]string) (functionsdk.MapTFunc, error) {

// apply compiles the payload to extract the new event time. If there is any error during extraction,
// we pass on the original input event time. Otherwise, we assign the new event time to the message.
func (e eventTimeExtractor) apply(et time.Time, payload []byte) (functionsdk.MessageT, error) {
func (e eventTimeExtractor) apply(payload []byte, et time.Time) (functionsdk.MessageT, error) {
timeStr, err := expr.EvalStr(e.expression, payload)
if err != nil {
return functionsdk.NewMessageT(et, payload), err
return functionsdk.NewMessageT(payload, et), err
}

var newEventTime time.Time
Expand All @@ -80,8 +80,8 @@ func (e eventTimeExtractor) apply(et time.Time, payload []byte) (functionsdk.Mes
newEventTime, err = dateparse.ParseStrict(timeStr)
}
if err != nil {
return functionsdk.NewMessageT(et, payload), err
return functionsdk.NewMessageT(payload, et), err
} else {
return functionsdk.NewMessageT(newEventTime, payload), nil
return functionsdk.NewMessageT(payload, newEventTime), nil
}
}
2 changes: 1 addition & 1 deletion pkg/sources/transformer/builtin/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (f filter) apply(et time.Time, msg []byte) (functionsdk.MessageT, error) {
return functionsdk.MessageTToDrop(), err
}
if result {
return functionsdk.NewMessageT(et, msg), nil
return functionsdk.NewMessageT(msg, et), nil
}
return functionsdk.MessageTToDrop(), nil
}

0 comments on commit 15a01a6

Please sign in to comment.