From 0d8f659e3d83eec1b07420439299134361fe58b2 Mon Sep 17 00:00:00 2001 From: Yashash H L <109710325+yhl25@users.noreply.github.com> Date: Fri, 11 Nov 2022 21:23:29 +0530 Subject: [PATCH] passing window information inside the context (#341) Signed-off-by: Yashash H L --- go.mod | 2 +- go.sum | 4 ++-- pkg/udf/function/uds_grpc.go | 10 +++++++++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 2a0dcc892..ccab7593d 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/hashicorp/golang-lru v0.5.4 github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe github.com/nats-io/nats.go v1.19.1 - github.com/numaproj/numaflow-go v0.2.3 + github.com/numaproj/numaflow-go v0.2.4 github.com/prometheus/client_golang v1.12.1 github.com/prometheus/common v0.32.1 github.com/soheilhy/cmux v0.1.5 diff --git a/go.sum b/go.sum index 18403ba4a..89328de42 100644 --- a/go.sum +++ b/go.sum @@ -690,8 +690,8 @@ github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/numaproj/numaflow-go v0.2.3 h1:DDyLK6SehUmXUq/RwQemawPd3TfXqJLepA1fwYUrN0U= -github.com/numaproj/numaflow-go v0.2.3/go.mod h1:jwhmgurhkWs3YMzpPPl4qXf3OohdE18jc0YKM7Ux2iQ= +github.com/numaproj/numaflow-go v0.2.4 h1:a/O/NJe8sGmRt9UAwYDV2NPvdxnz7VUQ/69rjq/d4Rg= +github.com/numaproj/numaflow-go v0.2.4/go.mod h1:TOawJdyf1C4V98zKnjjFhbHLBtg/TDyzZM+1MsfZuPo= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= diff --git a/pkg/udf/function/uds_grpc.go b/pkg/udf/function/uds_grpc.go index f1e01ee70..e5866f7db 100644 --- a/pkg/udf/function/uds_grpc.go +++ b/pkg/udf/function/uds_grpc.go @@ -19,6 +19,7 @@ package function import ( "context" "fmt" + "strconv" "sync" "time" @@ -129,7 +130,14 @@ func (u *udsGRPCBasedUDF) Reduce(ctx context.Context, partitionID *partition.ID, var result []*functionpb.Datum var err error - ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{functionsdk.DatumKey: partitionID.Key})) + // pass key and window information inside the context + mdMap := map[string]string{ + functionsdk.DatumKey: partitionID.Key, + functionsdk.WinStartTime: strconv.FormatInt(partitionID.Start.UnixMilli(), 10), + functionsdk.WinEndTime: strconv.FormatInt(partitionID.End.UnixMilli(), 10), + } + + ctx = metadata.NewOutgoingContext(ctx, metadata.New(mdMap)) // invoke the reduceFn method with datumCh channel wg.Add(1)