Skip to content

Commit

Permalink
passing window information inside the context (#341)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
  • Loading branch information
yhl25 committed Nov 11, 2022
1 parent 8951627 commit 0d8f659
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 4 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/hashicorp/golang-lru v0.5.4
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats.go v1.19.1
github.com/numaproj/numaflow-go v0.2.3
github.com/numaproj/numaflow-go v0.2.4
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/common v0.32.1
github.com/soheilhy/cmux v0.1.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -690,8 +690,8 @@ github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.2.3 h1:DDyLK6SehUmXUq/RwQemawPd3TfXqJLepA1fwYUrN0U=
github.com/numaproj/numaflow-go v0.2.3/go.mod h1:jwhmgurhkWs3YMzpPPl4qXf3OohdE18jc0YKM7Ux2iQ=
github.com/numaproj/numaflow-go v0.2.4 h1:a/O/NJe8sGmRt9UAwYDV2NPvdxnz7VUQ/69rjq/d4Rg=
github.com/numaproj/numaflow-go v0.2.4/go.mod h1:TOawJdyf1C4V98zKnjjFhbHLBtg/TDyzZM+1MsfZuPo=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Expand Down
10 changes: 9 additions & 1 deletion pkg/udf/function/uds_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package function
import (
"context"
"fmt"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -129,7 +130,14 @@ func (u *udsGRPCBasedUDF) Reduce(ctx context.Context, partitionID *partition.ID,
var result []*functionpb.Datum
var err error

ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{functionsdk.DatumKey: partitionID.Key}))
// pass key and window information inside the context
mdMap := map[string]string{
functionsdk.DatumKey: partitionID.Key,
functionsdk.WinStartTime: strconv.FormatInt(partitionID.Start.UnixMilli(), 10),
functionsdk.WinEndTime: strconv.FormatInt(partitionID.End.UnixMilli(), 10),
}

ctx = metadata.NewOutgoingContext(ctx, metadata.New(mdMap))

// invoke the reduceFn method with datumCh channel
wg.Add(1)
Expand Down

0 comments on commit 0d8f659

Please sign in to comment.