diff --git a/pkg/forward/forward.go b/pkg/forward/forward.go
index b230d7f4b..3cd1c7e0e 100644
--- a/pkg/forward/forward.go
+++ b/pkg/forward/forward.go
@@ -275,6 +275,10 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
 		return
 	}
+	// activeWatermarkBuffers records the buffers that the publisher has published
+	// a watermark in this batch processing cycle.
+	// it's used to determine which buffers should receive an idle watermark.
+	var activeWatermarkBuffers = make(map[string]bool)
 	// forward the highest watermark to all the edges to avoid idle edge problem
 	// TODO: sort and get the highest value
 	for bufferName, offsets := range writeOffsets {
@@ -283,11 +287,22 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
 				isdf.opts.vertexType == dfv1.VertexTypeReduceUDF {
 				if len(offsets) > 0 {
 					publisher.PublishWatermark(processorWM, offsets[len(offsets)-1])
+					activeWatermarkBuffers[bufferName] = true
 				}
 				// This (len(offsets) == 0) happens at conditional forwarding, there's no data written to the buffer
 				// TODO: Should also publish to those edges without writing (fall out of conditional forwarding)
 			} else { // For Sink vertex, and it does not care about the offset during watermark publishing
 				publisher.PublishWatermark(processorWM, nil)
+				activeWatermarkBuffers[bufferName] = true
+			}
+		}
+	}
+	if len(activeWatermarkBuffers) < len(isdf.publishWatermark) {
+		// if there are any buffers that haven't received a watermark during this
+		// batch processing cycle, send an idle watermark
+		for bufferName := range isdf.publishWatermark {
+			if !activeWatermarkBuffers[bufferName] {
+				isdf.publishWatermark[bufferName].PublishIdleWatermark()
 			}
 		}
 	}
diff --git a/pkg/forward/forward_test.go b/pkg/forward/forward_test.go
index 686e6f910..f005429bc 100644
--- a/pkg/forward/forward_test.go
+++ b/pkg/forward/forward_test.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -30,15 +31,47 @@ import (
 	"github.com/numaproj/numaflow/pkg/shared/logging"
 	udfapplier "github.com/numaproj/numaflow/pkg/udf/function"
 	"github.com/numaproj/numaflow/pkg/watermark/generic"
+	"github.com/numaproj/numaflow/pkg/watermark/ot"
+	"github.com/numaproj/numaflow/pkg/watermark/processor"
+	"github.com/numaproj/numaflow/pkg/watermark/publish"
+	wmstore "github.com/numaproj/numaflow/pkg/watermark/store"
+	"github.com/numaproj/numaflow/pkg/watermark/store/inmem"
 	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/stretchr/testify/assert"
 )
 
+const (
+	testPipelineName    = "testPipeline"
+	testProcessorEntity = "publisherTestPod"
+	publisherHBKeyspace = testPipelineName + "_" + testProcessorEntity + "_%s_" + "PROCESSORS"
+	publisherOTKeyspace = testPipelineName + "_" + testProcessorEntity + "_%s_" + "OT"
+)
+
 var (
 	testStartTime = time.Unix(1636470000, 0).UTC()
 )
 
+type testForwardFetcher struct {
+	// for forward_test.go only
+}
+
+func (t testForwardFetcher) Close() error {
+	// won't be used
+	return nil
+}
+
+// GetWatermark uses the current time as the watermark because we want to make sure
+// the test publisher is publishing watermarks
+func (t testForwardFetcher) GetWatermark(_ isb.Offset) processor.Watermark {
+	return processor.Watermark(time.Now())
+}
+
+func (t testForwardFetcher) GetHeadWatermark() processor.Watermark {
+	// won't be used
+	return processor.Watermark{}
+}
+
 type myForwardTest struct {
 }
 
@@ -101,18 +134,8 @@ func TestNewInterStepDataForward(t *testing.T) {
 	<-stopped
 }
 
-type myForwardDropTest struct {
-}
-
-func (f myForwardDropTest) 
WhereTo(_ string) ([]string, error) { - return []string{"__DROP__"}, nil -} - -func (f myForwardDropTest) ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.Message, error) { - return testutils.CopyUDFTestApply(ctx, message) -} - -func TestNewInterStepDataForward_drop(t *testing.T) { +// TestWriteToBufferError explicitly tests the case of retrying failed messages +func TestWriteToBufferError(t *testing.T) { fromStep := simplebuffer.NewInMemoryBuffer("from", 25) to1 := simplebuffer.NewInMemoryBuffer("to1", 10) toSteps := map[string]isb.BufferWriter{ @@ -130,53 +153,47 @@ func TestNewInterStepDataForward_drop(t *testing.T) { }, }} fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps) - f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardDropTest{}, myForwardDropTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2)) + f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(10)) assert.NoError(t, err) assert.False(t, to1.IsFull()) assert.True(t, to1.IsEmpty()) stopped := f.Start() - // write some data - _, errs := fromStep.Write(ctx, writeMessages[0:5]) - assert.Equal(t, make([]error, 5), errs) - - // nothing to read some data, this is a dropping queue - assert.Equal(t, true, to1.IsEmpty()) + go func() { + for !to1.IsFull() { + select { + case <-ctx.Done(): + logging.FromContext(ctx).Fatalf("not full, %s", ctx.Err()) + default: + time.Sleep(1 * time.Millisecond) + } + } + // stop will cancel the contexts + f.Stop() + }() - // write some data - _, errs = fromStep.Write(ctx, writeMessages[5:20]) - assert.Equal(t, make([]error, 15), errs) + // try to write to buffer after it is full. This causes write to error and fail. + var messageToStep = make(map[string][]isb.Message) + messageToStep["to1"] = make([]isb.Message, 0) + messageToStep["to1"] = append(messageToStep["to1"], writeMessages[0:11]...) 
- // since this is a dropping WhereTo, the buffer can never be full - f.Stop() + // asserting the number of failed messages + _, err = f.writeToBuffers(ctx, messageToStep) + assert.True(t, strings.Contains(err.Error(), "with failed messages:1")) <-stopped -} -type myForwardApplyErrTest struct { -} - -func (f myForwardApplyErrTest) WhereTo(_ string) ([]string, error) { - return []string{"to1"}, nil } -func (f myForwardApplyErrTest) ApplyMap(_ context.Context, _ *isb.ReadMessage) ([]*isb.Message, error) { - return nil, udfapplier.ApplyUDFErr{ - UserUDFErr: false, - InternalErr: struct { - Flag bool - MainCarDown bool - }{Flag: true, MainCarDown: false}, - Message: "InternalErr test", - } -} - -func TestNewInterStepDataForward_WithInternalError(t *testing.T) { +// TestNewInterStepDataForwardToOneStep explicitly tests the case where we forward to only one buffer +func TestNewInterStepDataForwardToOneStep(t *testing.T) { fromStep := simplebuffer.NewInMemoryBuffer("from", 25) to1 := simplebuffer.NewInMemoryBuffer("to1", 10) + to2 := simplebuffer.NewInMemoryBuffer("to2", 10) toSteps := map[string]isb.BufferWriter{ "to1": to1, + "to2": to2, } ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() @@ -190,8 +207,9 @@ func TestNewInterStepDataForward_WithInternalError(t *testing.T) { }, }} - fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps) - f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardApplyErrTest{}, myForwardApplyErrTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2)) + fetchWatermark := testForwardFetcher{} + publishWatermark, otStores := buildPublisherMapAndOTStore(toSteps) + f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2), WithVertexType(dfv1.VertexTypeMapUDF)) assert.NoError(t, err) assert.False(t, to1.IsFull()) assert.True(t, to1.IsEmpty()) @@ -201,28 +219,77 @@ func TestNewInterStepDataForward_WithInternalError(t *testing.T) { _, errs := fromStep.Write(ctx, writeMessages[0:5]) assert.Equal(t, make([]error, 5), errs) + // read some data + readMessages, err := to1.Read(ctx, 2) + assert.NoError(t, err, "expected no error") + assert.Len(t, readMessages, 2) + assert.Equal(t, []interface{}{writeMessages[0].Header, writeMessages[1].Header}, []interface{}{readMessages[0].Header, readMessages[1].Header}) + assert.Equal(t, []interface{}{writeMessages[0].Body, writeMessages[1].Body}, []interface{}{readMessages[0].Body, readMessages[1].Body}) + + // write some data + _, errs = fromStep.Write(ctx, writeMessages[5:20]) + assert.Equal(t, make([]error, 15), errs) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + otKeys1, _ := otStores["to1"].GetAllKeys(ctx) + for otKeys1 == nil { + otKeys1, _ = otStores["to1"].GetAllKeys(ctx) + time.Sleep(time.Millisecond * 100) + } + }() + wg.Wait() + // NOTE: in this test we only have one processor to publish + // so len(otKeys) should always be 1 + otKeys1, _ := otStores["to1"].GetAllKeys(ctx) + otValue1, _ := otStores["to1"].GetValue(ctx, otKeys1[0]) + otDecode1, _ := ot.DecodeToOTValue(otValue1) + assert.False(t, otDecode1.Idle) + + wg.Add(1) + go func() { + defer wg.Done() + otKeys2, _ := otStores["to2"].GetAllKeys(ctx) + for otKeys2 == nil { + otKeys2, _ = otStores["to2"].GetAllKeys(ctx) + time.Sleep(time.Millisecond * 100) + } + }() + wg.Wait() + // NOTE: in this test we only have one processor to publish + // so len(otKeys) 
should always be 1
+	otKeys2, _ := otStores["to2"].GetAllKeys(ctx)
+	otValue2, _ := otStores["to2"].GetValue(ctx, otKeys2[0])
+	otDecode2, _ := ot.DecodeToOTValue(otValue2)
+	assert.True(t, otDecode2.Idle)
+
+	// stop will cancel the contexts and therefore the forwarder stops without waiting
 	f.Stop()
-	time.Sleep(1 * time.Millisecond)
+
 	<-stopped
 }
 
-type myForwardApplyWhereToErrTest struct {
+type myForwardDropTest struct {
 }
 
-func (f myForwardApplyWhereToErrTest) WhereTo(_ string) ([]string, error) {
-	return []string{"to1"}, fmt.Errorf("whereToStep failed")
+func (f myForwardDropTest) WhereTo(_ string) ([]string, error) {
+	return []string{dfv1.MessageKeyDrop}, nil
 }
 
-func (f myForwardApplyWhereToErrTest) ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.Message, error) {
+func (f myForwardDropTest) ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.Message, error) {
 	return testutils.CopyUDFTestApply(ctx, message)
 }
 
-// TestNewInterStepDataForward_WhereToError is used to test the scenario with error
-func TestNewInterStepDataForward_WhereToError(t *testing.T) {
+// TestNewInterStepDataForward_dropAll explicitly tests the case where we drop all events
+func TestNewInterStepDataForward_dropAll(t *testing.T) {
 	fromStep := simplebuffer.NewInMemoryBuffer("from", 25)
 	to1 := simplebuffer.NewInMemoryBuffer("to1", 10)
+	to2 := simplebuffer.NewInMemoryBuffer("to2", 10)
 	toSteps := map[string]isb.BufferWriter{
 		"to1": to1,
+		"to2": to2,
 	}
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 	defer cancel()
@@ -236,40 +303,86 @@ func TestNewInterStepDataForward_WhereToError(t *testing.T) {
 		},
 	}}
 
-	fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
-	f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardApplyWhereToErrTest{}, myForwardApplyWhereToErrTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2))
+	fetchWatermark := testForwardFetcher{}
+	publishWatermark, otStores := buildPublisherMapAndOTStore(toSteps)
+	f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardDropTest{}, myForwardDropTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2), WithVertexType(dfv1.VertexTypeMapUDF))
 	assert.NoError(t, err)
+	assert.False(t, to1.IsFull())
 	assert.True(t, to1.IsEmpty())
 
 	stopped := f.Start()
+
 	// write some data
 	_, errs := fromStep.Write(ctx, writeMessages[0:5])
 	assert.Equal(t, make([]error, 5), errs)
 
+	// nothing to read some data, this is a dropping queue
+	assert.Equal(t, true, to1.IsEmpty())
+
+	// write some data
+	_, errs = fromStep.Write(ctx, writeMessages[5:20])
+	assert.Equal(t, make([]error, 15), errs)
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		otKeys1, _ := otStores["to1"].GetAllKeys(ctx)
+		for otKeys1 == nil {
+			otKeys1, _ = otStores["to1"].GetAllKeys(ctx)
+			time.Sleep(time.Millisecond * 100)
+		}
+	}()
+	wg.Wait()
+	// NOTE: in this test we only have one processor to publish
+	// so len(otKeys) should always be 1
+	otKeys1, _ := otStores["to1"].GetAllKeys(ctx)
+	otValue1, _ := otStores["to1"].GetValue(ctx, otKeys1[0])
+	otDecode1, _ := ot.DecodeToOTValue(otValue1)
+	assert.True(t, otDecode1.Idle)
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		otKeys2, _ := otStores["to2"].GetAllKeys(ctx)
+		for otKeys2 == nil {
+			otKeys2, _ = otStores["to2"].GetAllKeys(ctx)
+			time.Sleep(time.Millisecond * 100)
+		}
+	}()
+	wg.Wait()
+	// NOTE: in this test we only have one processor to publish
+	// so len(otKeys) should always be 1
+	otKeys2, _ 
:= otStores["to2"].GetAllKeys(ctx)
+	otValue2, _ := otStores["to2"].GetValue(ctx, otKeys2[0])
+	otDecode2, _ := ot.DecodeToOTValue(otValue2)
+	assert.True(t, otDecode2.Idle)
+
+	// since this is a dropping WhereTo, the buffer can never be full
 	f.Stop()
-	time.Sleep(1 * time.Millisecond)
-	assert.True(t, to1.IsEmpty())
 	<-stopped
 }
 
-type myForwardApplyUDFErrTest struct {
+type myForwardToAllTest struct {
 }
 
-func (f myForwardApplyUDFErrTest) WhereTo(_ string) ([]string, error) {
-	return []string{"to1"}, nil
+func (f myForwardToAllTest) WhereTo(_ string) ([]string, error) {
+	return []string{dfv1.MessageKeyAll}, nil
 }
 
-func (f myForwardApplyUDFErrTest) ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.Message, error) {
-	return nil, fmt.Errorf("UDF error")
+func (f myForwardToAllTest) ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.Message, error) {
+	return testutils.CopyUDFTestApply(ctx, message)
 }
 
-// TestNewInterStepDataForward_UDFError is used to test the scenario with UDF error
-func TestNewInterStepDataForward_UDFError(t *testing.T) {
+// TestNewInterStepData_forwardToAll explicitly tests the case where we forward to all buffers
+func TestNewInterStepData_forwardToAll(t *testing.T) {
 	fromStep := simplebuffer.NewInMemoryBuffer("from", 25)
 	to1 := simplebuffer.NewInMemoryBuffer("to1", 10)
+	to2 := simplebuffer.NewInMemoryBuffer("to2", 10)
 	toSteps := map[string]isb.BufferWriter{
 		"to1": to1,
+		"to2": to2,
 	}
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 	defer cancel()
@@ -282,10 +395,11 @@ func TestNewInterStepDataForward_UDFError(t *testing.T) {
 		Name: "testVertex",
 		},
 	}}
-
-	fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
-	f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardApplyUDFErrTest{}, myForwardApplyUDFErrTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2))
+	fetchWatermark := testForwardFetcher{}
+	publishWatermark, otStores := buildPublisherMapAndOTStore(toSteps)
+	f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardToAllTest{}, myForwardToAllTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2), WithVertexType(dfv1.VertexTypeMapUDF))
 	assert.NoError(t, err)
+	assert.False(t, to1.IsFull())
 	assert.True(t, to1.IsEmpty())
 	stopped := f.Start()
@@ -293,32 +407,80 @@ func TestNewInterStepDataForward_UDFError(t *testing.T) {
 	_, errs := fromStep.Write(ctx, writeMessages[0:5])
 	assert.Equal(t, make([]error, 5), errs)
 
-	assert.True(t, to1.IsEmpty())
+	// read some data
+	readMessages, err := to1.Read(ctx, 2)
+	assert.NoError(t, err, "expected no error")
+	assert.Len(t, readMessages, 2)
+	assert.Equal(t, []interface{}{writeMessages[0].Header, writeMessages[1].Header}, []interface{}{readMessages[0].Header, readMessages[1].Header})
+	assert.Equal(t, []interface{}{writeMessages[0].Body, writeMessages[1].Body}, []interface{}{readMessages[0].Body, readMessages[1].Body})
+
+	// write some data
+	_, errs = fromStep.Write(ctx, writeMessages[5:20])
+	assert.Equal(t, make([]error, 15), errs)
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		otKeys1, _ := otStores["to1"].GetAllKeys(ctx)
+		for otKeys1 == nil {
+			otKeys1, _ = otStores["to1"].GetAllKeys(ctx)
+			time.Sleep(time.Millisecond * 100)
+		}
+	}()
+	wg.Wait()
+	// NOTE: in this test we only have one processor to publish
+	// so len(otKeys) should always be 1
+	otKeys1, _ := otStores["to1"].GetAllKeys(ctx)
+	otValue1, _ := otStores["to1"].GetValue(ctx, otKeys1[0])
+	
otDecode1, _ := ot.DecodeToOTValue(otValue1) + assert.False(t, otDecode1.Idle) + + wg.Add(1) + go func() { + defer wg.Done() + otKeys2, _ := otStores["to2"].GetAllKeys(ctx) + for otKeys2 == nil { + otKeys2, _ = otStores["to2"].GetAllKeys(ctx) + time.Sleep(time.Millisecond * 100) + } + }() + wg.Wait() + // NOTE: in this test we only have one processor to publish + // so len(otKeys) should always be 1 + otKeys2, _ := otStores["to2"].GetAllKeys(ctx) + otValue2, _ := otStores["to2"].GetValue(ctx, otKeys2[0]) + otDecode2, _ := ot.DecodeToOTValue(otValue2) + assert.False(t, otDecode2.Idle) f.Stop() - time.Sleep(1 * time.Millisecond) <-stopped } -type myForwardToAllTest struct { +type myForwardInternalErrTest struct { } -func (f myForwardToAllTest) WhereTo(_ string) ([]string, error) { - return []string{dfv1.MessageKeyAll}, nil +func (f myForwardInternalErrTest) WhereTo(_ string) ([]string, error) { + return []string{"to1"}, nil } -func (f myForwardToAllTest) ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.Message, error) { - return testutils.CopyUDFTestApply(ctx, message) +func (f myForwardInternalErrTest) ApplyMap(_ context.Context, _ *isb.ReadMessage) ([]*isb.Message, error) { + return nil, udfapplier.ApplyUDFErr{ + UserUDFErr: false, + InternalErr: struct { + Flag bool + MainCarDown bool + }{Flag: true, MainCarDown: false}, + Message: "InternalErr test", + } } -func TestNewInterStepData_forwardToAll(t *testing.T) { +func TestNewInterStepDataForward_WithInternalError(t *testing.T) { fromStep := simplebuffer.NewInMemoryBuffer("from", 25) to1 := simplebuffer.NewInMemoryBuffer("to1", 10) - to2 := simplebuffer.NewInMemoryBuffer("to2", 10) toSteps := map[string]isb.BufferWriter{ "to1": to1, - "to2": to2, } ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() @@ -331,8 +493,9 @@ func TestNewInterStepData_forwardToAll(t *testing.T) { Name: "testVertex", }, }} + fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps) - f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardToAllTest{}, myForwardToAllTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2)) + f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardInternalErrTest{}, myForwardInternalErrTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2)) assert.NoError(t, err) assert.False(t, to1.IsFull()) assert.True(t, to1.IsEmpty()) @@ -342,30 +505,28 @@ func TestNewInterStepData_forwardToAll(t *testing.T) { _, errs := fromStep.Write(ctx, writeMessages[0:5]) assert.Equal(t, make([]error, 5), errs) - // read some data - readMessages, err := to1.Read(ctx, 2) - assert.NoError(t, err, "expected no error") - assert.Len(t, readMessages, 2) - assert.Equal(t, []interface{}{writeMessages[0].Header, writeMessages[1].Header}, []interface{}{readMessages[0].Header, readMessages[1].Header}) - assert.Equal(t, []interface{}{writeMessages[0].Body, writeMessages[1].Body}, []interface{}{readMessages[0].Body, readMessages[1].Body}) + f.Stop() + time.Sleep(1 * time.Millisecond) + <-stopped +} - // write some data - _, errs = fromStep.Write(ctx, writeMessages[5:20]) - assert.Equal(t, make([]error, 15), errs) +type myForwardApplyWhereToErrTest struct { +} - f.Stop() +func (f myForwardApplyWhereToErrTest) WhereTo(_ string) ([]string, error) { + return []string{"to1"}, fmt.Errorf("whereToStep failed") +} - <-stopped +func (f myForwardApplyWhereToErrTest) ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.Message, error) { + 
return testutils.CopyUDFTestApply(ctx, message) } -// TestNewInterStepDataForwardToOneStep explicitly tests the case where we forward to only one step -func TestNewInterStepDataForwardToOneStep(t *testing.T) { +// TestNewInterStepDataForward_WhereToError is used to test the scenario with error +func TestNewInterStepDataForward_WhereToError(t *testing.T) { fromStep := simplebuffer.NewInMemoryBuffer("from", 25) to1 := simplebuffer.NewInMemoryBuffer("to1", 10) - to2 := simplebuffer.NewInMemoryBuffer("to2", 10) toSteps := map[string]isb.BufferWriter{ "to1": to1, - "to2": to2, } ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() @@ -380,9 +541,8 @@ func TestNewInterStepDataForwardToOneStep(t *testing.T) { }} fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps) - f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2)) + f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardApplyWhereToErrTest{}, myForwardApplyWhereToErrTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2)) assert.NoError(t, err) - assert.False(t, to1.IsFull()) assert.True(t, to1.IsEmpty()) stopped := f.Start() @@ -390,25 +550,26 @@ func TestNewInterStepDataForwardToOneStep(t *testing.T) { _, errs := fromStep.Write(ctx, writeMessages[0:5]) assert.Equal(t, make([]error, 5), errs) - // read some data - readMessages, err := to1.Read(ctx, 2) - assert.NoError(t, err, "expected no error") - assert.Len(t, readMessages, 2) - assert.Equal(t, []interface{}{writeMessages[0].Header, writeMessages[1].Header}, []interface{}{readMessages[0].Header, readMessages[1].Header}) - assert.Equal(t, []interface{}{writeMessages[0].Body, writeMessages[1].Body}, []interface{}{readMessages[0].Body, readMessages[1].Body}) - - // write some data - _, errs = fromStep.Write(ctx, writeMessages[5:20]) - assert.Equal(t, make([]error, 15), errs) - - // stop will cancel the contexts and therefore the forwarder stops without waiting f.Stop() + time.Sleep(1 * time.Millisecond) + assert.True(t, to1.IsEmpty()) <-stopped } -// TestWriteToBufferError explicitly tests the case of retrying failed messages -func TestWriteToBufferError(t *testing.T) { +type myForwardApplyUDFErrTest struct { +} + +func (f myForwardApplyUDFErrTest) WhereTo(_ string) ([]string, error) { + return []string{"to1"}, nil +} + +func (f myForwardApplyUDFErrTest) ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.Message, error) { + return nil, fmt.Errorf("UDF error") +} + +// TestNewInterStepDataForward_UDFError is used to test the scenario with UDF error +func TestNewInterStepDataForward_UDFError(t *testing.T) { fromStep := simplebuffer.NewInMemoryBuffer("from", 25) to1 := simplebuffer.NewInMemoryBuffer("to1", 10) toSteps := map[string]isb.BufferWriter{ @@ -425,38 +586,23 @@ func TestWriteToBufferError(t *testing.T) { Name: "testVertex", }, }} + fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps) - f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(10)) + f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardApplyUDFErrTest{}, myForwardApplyUDFErrTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2)) assert.NoError(t, err) - assert.False(t, to1.IsFull()) assert.True(t, to1.IsEmpty()) stopped := f.Start() + // write some data + _, 
errs := fromStep.Write(ctx, writeMessages[0:5]) + assert.Equal(t, make([]error, 5), errs) - go func() { - for !to1.IsFull() { - select { - case <-ctx.Done(): - logging.FromContext(ctx).Fatalf("not full, %s", ctx.Err()) - default: - time.Sleep(1 * time.Millisecond) - } - } - // stop will cancel the contexts - f.Stop() - }() - - // try to write to buffer after it is full. This causes write to error and fail. - var messageToStep = make(map[string][]isb.Message) - messageToStep["to1"] = make([]isb.Message, 0) - messageToStep["to1"] = append(messageToStep["to1"], writeMessages[0:11]...) + assert.True(t, to1.IsEmpty()) - // asserting the number of failed messages - _, err = f.writeToBuffers(ctx, messageToStep) - assert.True(t, strings.Contains(err.Error(), "with failed messages:1")) + f.Stop() + time.Sleep(1 * time.Millisecond) <-stopped - } func validateMetrics(t *testing.T) { @@ -500,3 +646,19 @@ func validateMetrics(t *testing.T) { } } + +// buildPublisherMap builds OTStore and publisher for each toBuffer +func buildPublisherMapAndOTStore(toBuffers map[string]isb.BufferWriter) (map[string]publish.Publisher, map[string]wmstore.WatermarkKVStorer) { + var ctx = context.Background() + processorEntity := processor.NewProcessorEntity("publisherTestPod") + publishers := make(map[string]publish.Publisher) + otStores := make(map[string]wmstore.WatermarkKVStorer) + for key := range toBuffers { + heartbeatKV, _, _ := inmem.NewKVInMemKVStore(ctx, testPipelineName, fmt.Sprintf(publisherHBKeyspace, key)) + otKV, _, _ := inmem.NewKVInMemKVStore(ctx, testPipelineName, fmt.Sprintf(publisherOTKeyspace, key)) + otStores[key] = otKV + p := publish.NewPublish(ctx, processorEntity, wmstore.BuildWatermarkStore(heartbeatKV, otKV), publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1)) + publishers[key] = p + } + return publishers, otStores +} diff --git a/pkg/reduce/data_forward_test.go b/pkg/reduce/data_forward_test.go index 83ac2b26a..60908cea5 100644 --- a/pkg/reduce/data_forward_test.go +++ b/pkg/reduce/data_forward_test.go @@ -77,6 +77,10 @@ func (e *EventTypeWMProgressor) PublishWatermark(watermark processor.Watermark, e.watermarks[offset.String()] = watermark } +func (e *EventTypeWMProgressor) PublishIdleWatermark() { + // TODO +} + func (e *EventTypeWMProgressor) GetLatestWatermark() processor.Watermark { return processor.Watermark{} } diff --git a/pkg/reduce/pnf/processandforward.go b/pkg/reduce/pnf/processandforward.go index 80883389b..2a8079989 100644 --- a/pkg/reduce/pnf/processandforward.go +++ b/pkg/reduce/pnf/processandforward.go @@ -251,10 +251,24 @@ func (p *ProcessAndForward) writeToBuffer(ctx context.Context, bufferID string, // publishWM publishes the watermark to each edge. func (p *ProcessAndForward) publishWM(wm processor.Watermark, writeOffsets map[string][]isb.Offset) { + // activeWatermarkBuffers records the buffers that the publisher has published + // a watermark in this batch processing cycle. + // it's used to determine which buffers should receive an idle watermark. 
+	var activeWatermarkBuffers = make(map[string]bool)
 	for bufferName, offsets := range writeOffsets {
 		if publisher, ok := p.publishWatermark[bufferName]; ok {
 			if len(offsets) > 0 {
 				publisher.PublishWatermark(wm, offsets[len(offsets)-1])
+				activeWatermarkBuffers[bufferName] = true
+			}
+		}
+	}
+	if len(activeWatermarkBuffers) < len(p.publishWatermark) {
+		// if there are any buffers that haven't received a watermark during this
+		// batch processing cycle, send an idle watermark
+		for bufferName := range p.publishWatermark {
+			if !activeWatermarkBuffers[bufferName] {
+				p.publishWatermark[bufferName].PublishIdleWatermark()
 			}
 		}
 	}
diff --git a/pkg/reduce/pnf/processandforward_test.go b/pkg/reduce/pnf/processandforward_test.go
index e04227836..8e8295688 100644
--- a/pkg/reduce/pnf/processandforward_test.go
+++ b/pkg/reduce/pnf/processandforward_test.go
@@ -19,6 +19,7 @@ package pnf
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"strings"
 	"testing"
 	"time"
 
@@ -30,6 +31,7 @@ import (
 	"github.com/numaproj/numaflow/pkg/reduce/pbq/store/memory"
 	"github.com/numaproj/numaflow/pkg/shared/logging"
 	"github.com/numaproj/numaflow/pkg/watermark/generic"
+	"github.com/numaproj/numaflow/pkg/watermark/ot"
 	"github.com/numaproj/numaflow/pkg/watermark/processor"
 	"github.com/numaproj/numaflow/pkg/watermark/publish"
 	"github.com/numaproj/numaflow/pkg/watermark/store/inmem"
@@ -46,6 +48,13 @@ import (
 	wmstore "github.com/numaproj/numaflow/pkg/watermark/store"
 )
 
+const (
+	testPipelineName    = "testPipeline"
+	testProcessorEntity = "publisherTestPod"
+	publisherHBKeyspace = testPipelineName + "_" + testProcessorEntity + "_%s_" + "PROCESSORS"
+	publisherOTKeyspace = testPipelineName + "_" + testProcessorEntity + "_%s_" + "OT"
+)
+
 type myForwardTest struct {
 }
 
@@ -157,6 +166,8 @@ func TestProcessAndForward_Forward(t *testing.T) {
 		"buffer2": test1Buffer2,
 	}
 
+	pf1, otStores1 := createProcessAndForwardAndOTStore(ctx, "test-forward-one", pbqManager, toBuffers1)
+
 	test2Buffer1 := simplebuffer.NewInMemoryBuffer("buffer1", 10)
 	test2Buffer2 := simplebuffer.NewInMemoryBuffer("buffer2", 10)
 
@@ -165,6 +176,8 @@ func TestProcessAndForward_Forward(t *testing.T) {
 		"buffer2": test2Buffer2,
 	}
 
+	pf2, otStores2 := createProcessAndForwardAndOTStore(ctx, "test-forward-all", pbqManager, toBuffers2)
+
 	test3Buffer1 := simplebuffer.NewInMemoryBuffer("buffer1", 10)
 	test3Buffer2 := simplebuffer.NewInMemoryBuffer("buffer2", 10)
 
@@ -173,13 +186,16 @@ func TestProcessAndForward_Forward(t *testing.T) {
 		"buffer2": test3Buffer2,
 	}
 
+	pf3, otStores3 := createProcessAndForwardAndOTStore(ctx, "test-drop-all", pbqManager, toBuffers3)
+
 	tests := []struct {
 		name       string
 		id         partition.ID
 		buffers    []*simplebuffer.InMemoryBuffer
 		pf         ProcessAndForward
+		otStores   map[string]wmstore.WatermarkKVStorer
 		expected   []bool
-		wmExpected map[string]int64
+		wmExpected map[string]ot.Value
 	}{
 		{
 			name: "test-forward-one",
@@ -189,11 +205,20 @@ func TestProcessAndForward_Forward(t *testing.T) {
 			Key:   "test-forward-one",
 			},
 			buffers:  []*simplebuffer.InMemoryBuffer{test1Buffer1, test1Buffer2},
-			pf:       createProcessAndForward(ctx, "test-forward-one", pbqManager, toBuffers1),
+			pf:       pf1,
+			otStores: otStores1,
 			expected: []bool{false, true},
-			wmExpected: map[string]int64{
-				"buffer1": 120000,
-				"buffer2": -1,
+			wmExpected: map[string]ot.Value{
+				"buffer1": {
+					Offset:    0,
+					Watermark: 120000,
+					Idle:      false,
+				},
+				"buffer2": {
+					Offset:    0,
+					Watermark: 0,
+					Idle:      true,
+				},
 			},
 		},
 		{
			name: "test-forward-all",
@@ -204,11 +229,20 @@ func TestProcessAndForward_Forward(t *testing.T) {
 			Key:   "test-forward-all",
 			},
 			buffers: 
[]*simplebuffer.InMemoryBuffer{test2Buffer1, test2Buffer2}, - pf: createProcessAndForward(ctx, "test-forward-all", pbqManager, toBuffers2), + pf: pf2, + otStores: otStores2, expected: []bool{false, false}, - wmExpected: map[string]int64{ - "buffer1": 120000, - "buffer2": 120000, + wmExpected: map[string]ot.Value{ + "buffer1": { + Offset: 0, + Watermark: 120000, + Idle: false, + }, + "buffer2": { + Offset: 0, + Watermark: 120000, + Idle: false, + }, }, }, { @@ -219,11 +253,20 @@ func TestProcessAndForward_Forward(t *testing.T) { Key: "test-drop-all", }, buffers: []*simplebuffer.InMemoryBuffer{test3Buffer1, test3Buffer2}, - pf: createProcessAndForward(ctx, "test-drop-all", pbqManager, toBuffers3), + pf: pf3, + otStores: otStores3, expected: []bool{true, true}, - wmExpected: map[string]int64{ - "buffer1": -1, - "buffer2": -1, + wmExpected: map[string]ot.Value{ + "buffer1": { + Offset: 0, + Watermark: 0, + Idle: true, + }, + "buffer2": { + Offset: 0, + Watermark: 0, + Idle: true, + }, }, }, } @@ -235,17 +278,21 @@ func TestProcessAndForward_Forward(t *testing.T) { assert.Equal(t, []bool{value.buffers[0].IsEmpty(), value.buffers[1].IsEmpty()}, value.expected) // pbq entry from the manager will be removed after forwarding assert.Equal(t, pbqManager.GetPBQ(value.id), nil) - index := 0 - for k, v := range value.pf.publishWatermark { - // expected watermark should be equal to window end time - assert.Equal(t, v.GetLatestWatermark().UnixMilli(), value.wmExpected[k]) - index += 1 + for bufferName := range value.pf.publishWatermark { + // NOTE: in this test we only have one processor to publish + // so len(otKeys) should always be 1 + otKeys, _ := value.otStores[bufferName].GetAllKeys(ctx) + for _, otKey := range otKeys { + otValue, _ := value.otStores[bufferName].GetValue(ctx, otKey) + ot, _ := ot.DecodeToOTValue(otValue) + assert.Equal(t, ot, value.wmExpected[bufferName]) + } } }) } } -func createProcessAndForward(ctx context.Context, key string, pbqManager *pbq.Manager, toBuffers map[string]isb.BufferWriter) ProcessAndForward { +func createProcessAndForwardAndOTStore(ctx context.Context, key string, pbqManager *pbq.Manager, toBuffers map[string]isb.BufferWriter) (ProcessAndForward, map[string]wmstore.WatermarkKVStorer) { testPartition := partition.ID{ Start: time.UnixMilli(60000), @@ -254,7 +301,7 @@ func createProcessAndForward(ctx context.Context, key string, pbqManager *pbq.Ma } // create a pbq for a partition - pw := buildPublisherMap(toBuffers) + pw, otStore := buildPublisherMapAndOTStore(toBuffers) var simplePbq pbq.Reader simplePbq, _ = pbqManager.CreateNewPBQ(ctx, testPartition) @@ -286,25 +333,21 @@ func createProcessAndForward(ctx context.Context, key string, pbqManager *pbq.Ma publishWatermark: pw, } - return pf + return pf, otStore } -// buildPublisherMap builds publisher for each toBuffer -func buildPublisherMap(toBuffers map[string]isb.BufferWriter) map[string]publish.Publisher { +// buildPublisherMap builds OTStore and publisher for each toBuffer +func buildPublisherMapAndOTStore(toBuffers map[string]isb.BufferWriter) (map[string]publish.Publisher, map[string]wmstore.WatermarkKVStorer) { var ctx = context.Background() - - var publisherHBKeyspace = "publisherTest_PROCESSORS" - - var publisherOTKeyspace = "publisherTest_OT_publisherTestPod1" - - heartbeatKV, _, _ := inmem.NewKVInMemKVStore(ctx, "testPublisher", publisherHBKeyspace) - otKV, _, _ := inmem.NewKVInMemKVStore(ctx, "testPublisher", publisherOTKeyspace) - + processorEntity := processor.NewProcessorEntity("publisherTestPod") 
publishers := make(map[string]publish.Publisher) + otStores := make(map[string]wmstore.WatermarkKVStorer) for key := range toBuffers { - publishEntity := processor.NewProcessorEntity(key) - p := publish.NewPublish(ctx, publishEntity, wmstore.BuildWatermarkStore(heartbeatKV, otKV), publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1)) + heartbeatKV, _, _ := inmem.NewKVInMemKVStore(ctx, testPipelineName, fmt.Sprintf(publisherHBKeyspace, key)) + otKV, _, _ := inmem.NewKVInMemKVStore(ctx, testPipelineName, fmt.Sprintf(publisherOTKeyspace, key)) + otStores[key] = otKV + p := publish.NewPublish(ctx, processorEntity, wmstore.BuildWatermarkStore(heartbeatKV, otKV), publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1)) publishers[key] = p } - return publishers + return publishers, otStores } diff --git a/pkg/sources/kafka/reader.go b/pkg/sources/kafka/reader.go index 08988b190..8173417cc 100644 --- a/pkg/sources/kafka/reader.go +++ b/pkg/sources/kafka/reader.go @@ -144,14 +144,14 @@ loop: } } for p, t := range oldestTimestamps { - publisher := r.loadSourceWartermarkPublisher(p) + publisher := r.loadSourceWatermarkPublisher(p) publisher.PublishWatermark(processor.Watermark(t), nil) // Source publisher does not care about the offset } return msgs, nil } -// loadSourceWartermarkPublisher does a lazy load on the wartermark publisher -func (r *KafkaSource) loadSourceWartermarkPublisher(partitionID int32) publish.Publisher { +// loadSourceWatermarkPublisher does a lazy load on the watermark publisher +func (r *KafkaSource) loadSourceWatermarkPublisher(partitionID int32) publish.Publisher { r.lock.Lock() defer r.lock.Unlock() if p, ok := r.sourcePublishWMs[partitionID]; ok { diff --git a/pkg/watermark/fetch/offset_timeline.go b/pkg/watermark/fetch/offset_timeline.go index 2b803d3c3..144d9c60b 100644 --- a/pkg/watermark/fetch/offset_timeline.go +++ b/pkg/watermark/fetch/offset_timeline.go @@ -93,12 +93,16 @@ func (t *OffsetTimeline) Put(node OffsetWatermark) { } return } else { - // TODO put panic: the new input offset should never be smaller than the existing offset t.log.Errorw("The new input offset should never be smaller than the existing offset", zap.Int64("watermark", node.watermark), - zap.Int64("existing offset", elementNode.offset), zap.Int64("input offset", node.offset)) + zap.Int64("existingOffset", elementNode.offset), zap.Int64("inputOffset", node.offset)) return } } else if node.watermark > elementNode.watermark { + if node.offset < elementNode.offset { + t.log.Errorw("The new input offset should never be smaller than the existing offset", zap.Int64("watermark", node.watermark), + zap.Int64("existingOffset", elementNode.offset), zap.Int64("inputOffset", node.offset)) + return + } // our list is sorted by event time from highest to lowest t.watermarks.InsertBefore(node, e) // remove the last event time @@ -132,14 +136,11 @@ func (t *OffsetTimeline) GetHeadWatermark() int64 { return t.watermarks.Front().Value.(OffsetWatermark).watermark } -// GetTailOffset returns the smallest offset with the smallest watermark. -func (t *OffsetTimeline) GetTailOffset() int64 { +// GetHeadOffsetWatermark returns the largest offset with the largest watermark. 
+func (t *OffsetTimeline) GetHeadOffsetWatermark() OffsetWatermark { t.lock.RLock() defer t.lock.RUnlock() - if t.watermarks.Len() == 0 { - return -1 - } - return t.watermarks.Back().Value.(OffsetWatermark).offset + return t.watermarks.Front().Value.(OffsetWatermark) } // GetOffset will return the offset for the given event-time. @@ -170,18 +171,14 @@ func (t *OffsetTimeline) GetEventTimeFromInt64(inputOffsetInt64 int64) int64 { t.lock.RLock() defer t.lock.RUnlock() - var ( - offset int64 = -1 - eventTime int64 = -1 - ) + var eventTime int64 = -1 for e := t.watermarks.Front(); e != nil; e = e.Next() { // get the event time has the closest offset to the input offset // exclude the same offset because this offset may not finish processing yet - // offset < e.Value.(OffsetWatermark).offset: use < because we want the largest possible timestamp - if offset < e.Value.(OffsetWatermark).offset && e.Value.(OffsetWatermark).offset < inputOffsetInt64 { - offset = e.Value.(OffsetWatermark).offset + if e.Value.(OffsetWatermark).offset < inputOffsetInt64 { eventTime = e.Value.(OffsetWatermark).watermark + break } } diff --git a/pkg/watermark/fetch/offset_timeline_test.go b/pkg/watermark/fetch/offset_timeline_test.go index 7f370d244..2935cd3e3 100644 --- a/pkg/watermark/fetch/offset_timeline_test.go +++ b/pkg/watermark/fetch/offset_timeline_test.go @@ -22,9 +22,10 @@ import ( "testing" "github.com/numaproj/numaflow/pkg/isb" + "github.com/stretchr/testify/assert" ) -func TestTimeline_GetEventTime(t1 *testing.T) { +func TestTimeline_GetEventTime(t *testing.T) { var ( ctx = context.Background() emptyTimeline = NewOffsetTimeline(ctx, 5) @@ -114,7 +115,7 @@ func TestTimeline_GetEventTime(t1 *testing.T) { }, } for _, tt := range tests { - t1.Run(tt.name, func(t1 *testing.T) { + t.Run(tt.name, func(t1 *testing.T) { if got := tt.args.timeline.GetEventTime(isb.SimpleStringOffset(func() string { return strconv.FormatInt(tt.args.inputOffset, 10) })); got != tt.want { t1.Errorf("GetEventTime() = %v, want %v", got, tt.want) } @@ -122,7 +123,7 @@ func TestTimeline_GetEventTime(t1 *testing.T) { } } -func TestOffsetTimeline_GetOffset(t1 *testing.T) { +func TestOffsetTimeline_GetOffset(t *testing.T) { var ( ctx = context.Background() testTimeline = NewOffsetTimeline(ctx, 10) @@ -210,10 +211,52 @@ func TestOffsetTimeline_GetOffset(t1 *testing.T) { }, } for _, tt := range tests { - t1.Run(tt.name, func(t1 *testing.T) { + t.Run(tt.name, func(t1 *testing.T) { if got := tt.args.timeline.GetOffset(tt.args.inputEventTime); got != tt.want { t1.Errorf("GetOffset() = %v, want %v", got, tt.want) } }) } } + +func TestOffsetTimeline(t *testing.T) { + var ( + ctx = context.Background() + testTimeline = NewOffsetTimeline(ctx, 10) + testwatermarks = []OffsetWatermark{ + {watermark: 10, offset: 9}, + {watermark: 12, offset: 20}, + {watermark: 13, offset: 21}, + {watermark: 15, offset: 24}, + {watermark: 15, offset: 25}, // will overwrite the previous one + {watermark: 20, offset: 26}, + {watermark: 23, offset: 27}, + {watermark: 28, offset: 30}, + {watermark: 29, offset: 35}, + {watermark: 32, offset: 36}, + } + ) + + for _, watermark := range testwatermarks { + testTimeline.Put(watermark) + assert.Equal(t, watermark, testTimeline.GetHeadOffsetWatermark()) + assert.Equal(t, watermark.watermark, testTimeline.GetHeadWatermark(), watermark.watermark) + assert.Equal(t, watermark.offset, testTimeline.GetHeadOffset()) + } + assert.Equal(t, "[32:36] -> [29:35] -> [28:30] -> [23:27] -> [20:26] -> [15:25] -> [13:21] -> [12:20] -> [10:9] -> 
[-1:-1]", testTimeline.Dump()) + + testTimeline.Put(OffsetWatermark{watermark: 33, offset: 36}) + assert.Equal(t, "[33:36] -> [32:36] -> [29:35] -> [28:30] -> [23:27] -> [20:26] -> [15:25] -> [13:21] -> [12:20] -> [10:9]", testTimeline.Dump()) + + testTimeline.Put(OffsetWatermark{watermark: 33, offset: 35}) + // should be ignored + assert.Equal(t, "[33:36] -> [32:36] -> [29:35] -> [28:30] -> [23:27] -> [20:26] -> [15:25] -> [13:21] -> [12:20] -> [10:9]", testTimeline.Dump()) + + testTimeline.Put(OffsetWatermark{watermark: 30, offset: 33}) + // should be ignored + assert.Equal(t, "[33:36] -> [32:36] -> [29:35] -> [28:30] -> [23:27] -> [20:26] -> [15:25] -> [13:21] -> [12:20] -> [10:9]", testTimeline.Dump()) + + testTimeline.Put(OffsetWatermark{watermark: 30, offset: 35}) + assert.Equal(t, "[33:36] -> [32:36] -> [30:35] -> [29:35] -> [28:30] -> [23:27] -> [20:26] -> [15:25] -> [13:21] -> [12:20]", testTimeline.Dump()) + +} diff --git a/pkg/watermark/fetch/processor_manager.go b/pkg/watermark/fetch/processor_manager.go index 00bdb4cfd..4c71eef82 100644 --- a/pkg/watermark/fetch/processor_manager.go +++ b/pkg/watermark/fetch/processor_manager.go @@ -249,10 +249,28 @@ func (v *ProcessorManager) startTimeLineWatcher() { v.log.Errorw("Unable to decode the value", zap.String("processorEntity", p.entity.GetName()), zap.Error(err)) continue } - p.offsetTimeline.Put(OffsetWatermark{ - watermark: otValue.Watermark, - offset: otValue.Offset, - }) + if otValue.Idle { + var processors = v.GetAllProcessors() + for processorName, processor := range processors { + // skip the processor itself, only use other processors as reference + if processorName != value.Key() { + // any other Vn-1 processor's non-empty head offsetWatermark can replace the idle watermark + // if all tail offsetWatermarks are empty, then it means there's no data flowing into + // this Vn processor, so it's safe to do nothing + watermarkOffset := processor.offsetTimeline.GetHeadOffsetWatermark() + if watermarkOffset.offset != -1 { + p.offsetTimeline.Put(watermarkOffset) + break + } + } + } + } else { + // NOTE: currently, for source edges, the otValue.Idle is always false + p.offsetTimeline.Put(OffsetWatermark{ + watermark: otValue.Watermark, + offset: otValue.Offset, + }) + } v.log.Debugw("TimelineWatcher- Updates", zap.String("bucket", v.otWatcher.GetKVName()), zap.Int64("watermark", otValue.Watermark), zap.Int64("offset", otValue.Offset)) case store.KVDelete: // we do not care about Delete events because the timeline bucket is meant to grow and the TTL will diff --git a/pkg/watermark/fetch/processor_manager_inmem_test.go b/pkg/watermark/fetch/processor_manager_inmem_test.go index 28977f0f5..14cea8a0c 100644 --- a/pkg/watermark/fetch/processor_manager_inmem_test.go +++ b/pkg/watermark/fetch/processor_manager_inmem_test.go @@ -32,10 +32,11 @@ import ( "github.com/numaproj/numaflow/pkg/watermark/store" ) -func otValueToBytes(offset int64, watermark int64) ([]byte, error) { +func otValueToBytes(offset int64, watermark int64, idle bool) ([]byte, error) { otValue := ot.Value{ Offset: offset, Watermark: watermark, + Idle: idle, } otValueByte, err := otValue.EncodeToBytes() return otValueByte, err @@ -61,14 +62,14 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) { assert.NoError(t, err) defer otStore.Close() - otValueByte, err := otValueToBytes(testOffset, epoch) + otValueByte, err := otValueToBytes(testOffset, epoch, false) assert.NoError(t, err) err = otStore.PutKV(ctx, "p1", otValueByte) assert.NoError(t, err) epoch += 60000 - 
otValueByte, err = otValueToBytes(testOffset+5, epoch) + otValueByte, err = otValueToBytes(testOffset+5, epoch, false) assert.NoError(t, err) err = otStore.PutKV(ctx, "p2", otValueByte) assert.NoError(t, err) @@ -191,7 +192,7 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) { assert.Equal(t, int64(-1), p1.offsetTimeline.GetHeadOffset()) // publish a new watermark 101 - otValueByte, err = otValueToBytes(testOffset+1, epoch) + otValueByte, err = otValueToBytes(testOffset+1, epoch, false) assert.NoError(t, err) err = otStore.PutKV(ctx, "p1", otValueByte) assert.NoError(t, err) diff --git a/pkg/watermark/fetch/processor_manager_test.go b/pkg/watermark/fetch/processor_manager_test.go index ee13d9fa8..2708ff86c 100644 --- a/pkg/watermark/fetch/processor_manager_test.go +++ b/pkg/watermark/fetch/processor_manager_test.go @@ -101,24 +101,24 @@ func TestFetcherWithSameOTBucket(t *testing.T) { // put values into otStore // this first entry should not be in the offset timeline because we set the ot bucket history to 2 - otValueByte, err := otValueToBytes(testOffset, epoch+100) + otValueByte, err := otValueToBytes(testOffset, epoch+100, false) assert.NoError(t, err) err = otStore.PutKV(ctx, "p1", otValueByte) assert.NoError(t, err) - otValueByte, err = otValueToBytes(testOffset+1, epoch+200) + otValueByte, err = otValueToBytes(testOffset+1, epoch+200, false) assert.NoError(t, err) err = otStore.PutKV(ctx, "p1", otValueByte) assert.NoError(t, err) - otValueByte, err = otValueToBytes(testOffset+2, epoch+300) + otValueByte, err = otValueToBytes(testOffset+2, epoch+300, false) assert.NoError(t, err) err = otStore.PutKV(ctx, "p1", otValueByte) assert.NoError(t, err) epoch += 60000 - otValueByte, err = otValueToBytes(testOffset+5, epoch) + otValueByte, err = otValueToBytes(testOffset+5, epoch+500, false) assert.NoError(t, err) err = otStore.PutKV(ctx, "p2", otValueByte) assert.NoError(t, err) @@ -253,7 +253,7 @@ func TestFetcherWithSameOTBucket(t *testing.T) { assert.Equal(t, int64(-1), p1.offsetTimeline.GetHeadOffset()) // publish a new watermark 103 - otValueByte, err = otValueToBytes(testOffset+3, epoch) + otValueByte, err = otValueToBytes(testOffset+3, epoch+500, false) assert.NoError(t, err) err = otStore.PutKV(ctx, "p1", otValueByte) assert.NoError(t, err) @@ -278,7 +278,7 @@ func TestFetcherWithSameOTBucket(t *testing.T) { go func() { defer wg.Done() var err error - for i := 0; i < 3; i++ { + for i := 0; i < 10; i++ { err = hbStore.PutKV(ctx, "p1", []byte(fmt.Sprintf("%d", time.Now().Unix()))) assert.NoError(t, err) time.Sleep(1 * time.Second) @@ -299,11 +299,30 @@ func TestFetcherWithSameOTBucket(t *testing.T) { // added 103 in the previous steps for p1, so the head should be 103 after resume assert.Equal(t, int64(103), p1.offsetTimeline.GetHeadOffset()) - for allProcessors["p1"].offsetTimeline.Dump() != "[1651161660000:103] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" { + for allProcessors["p1"].offsetTimeline.Dump() != "[1651161660500:103] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" { + select { + case <-ctx.Done(): + if ctx.Err() == context.DeadlineExceeded { + t.Fatalf("expected p1 has the offset timeline [1651161660500:103] -> [-1:-1] -> [-1:-1] -> [-1:-1]..., got %s: %s", allProcessors["p1"].offsetTimeline.Dump(), ctx.Err()) + } + default: + time.Sleep(1 * time.Millisecond) + allProcessors = testBuffer.processorManager.GetAllProcessors() + } + } + + // publish an idle watermark 
+ otValueByte, err = otValueToBytes(0, 0, true) + assert.NoError(t, err) + err = otStore.PutKV(ctx, "p1", otValueByte) + assert.NoError(t, err) + + // p1 should get the head offset watermark from p2 + for allProcessors["p1"].offsetTimeline.Dump() != "[1651161660500:105] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" { select { case <-ctx.Done(): if ctx.Err() == context.DeadlineExceeded { - t.Fatalf("expected p1 has the offset timeline [1651161660000:103] -> [-1:-1] -> [-1:-1] -> [-1:-1]..., got %s: %s", allProcessors["p1"].offsetTimeline.Dump(), ctx.Err()) + t.Fatalf("expected p1 has the offset timeline [1651161660500:105] -> [-1:-1] -> [-1:-1] -> [-1:-1]..., got %s: %s", allProcessors["p1"].offsetTimeline.Dump(), ctx.Err()) } default: time.Sleep(1 * time.Millisecond) diff --git a/pkg/watermark/generic/noop.go b/pkg/watermark/generic/noop.go index efc6c9c4b..7913ef920 100644 --- a/pkg/watermark/generic/noop.go +++ b/pkg/watermark/generic/noop.go @@ -42,10 +42,14 @@ func (n NoOpWMProgressor) GetWatermark(_ isb.Offset) processor.Watermark { return processor.Watermark{} } -// PublishWatermark does a no-op publish. +// PublishWatermark does a no-op watermark publish. func (n NoOpWMProgressor) PublishWatermark(_ processor.Watermark, _ isb.Offset) { } +// PublishIdleWatermark does a no-op idle watermark publish. +func (n NoOpWMProgressor) PublishIdleWatermark() { +} + // GetLatestWatermark returns the default watermark as the latest watermark. func (n NoOpWMProgressor) GetLatestWatermark() processor.Watermark { return processor.Watermark{} diff --git a/pkg/watermark/ot/ot.go b/pkg/watermark/ot/ot.go index c853ba04e..468109d7a 100644 --- a/pkg/watermark/ot/ot.go +++ b/pkg/watermark/ot/ot.go @@ -26,6 +26,10 @@ import ( type Value struct { Offset int64 Watermark int64 + // Idle is set to true if the given processor entity hasn't published anything + // to the offset timeline bucket in a batch processing cycle. + // Idle is used to signal an idle watermark. + Idle bool } // EncodeToBytes encodes a Value object into byte array. 
diff --git a/pkg/watermark/ot/ot_test.go b/pkg/watermark/ot/ot_test.go index 40c798b51..9b15b901e 100644 --- a/pkg/watermark/ot/ot_test.go +++ b/pkg/watermark/ot/ot_test.go @@ -40,6 +40,7 @@ func TestDecodeToOTValue(t *testing.T) { v := Value{ Offset: 100, Watermark: 1667495100000, + Idle: false, } buf := new(bytes.Buffer) _ = binary.Write(buf, binary.LittleEndian, v) @@ -49,6 +50,7 @@ func TestDecodeToOTValue(t *testing.T) { want: Value{ Offset: 100, Watermark: 1667495100000, + Idle: false, }, wantErr: false, }, @@ -70,7 +72,7 @@ func TestDecodeToOTValue(t *testing.T) { wantErr: true, }, { - name: "decode_success_using_2_field_struct", + name: "decode_failure_using_2_field_struct", args: args{ b: func() []byte { v := struct { @@ -85,24 +87,48 @@ func TestDecodeToOTValue(t *testing.T) { return buf.Bytes() }(), }, + want: Value{}, + wantErr: true, + }, + { + name: "decode_success_using_3_field_struct", + args: args{ + b: func() []byte { + v := struct { + Test0 int64 + Test1 int64 + Test2 bool + }{ + Test0: 0, + Test1: 0, + Test2: true, + } + buf := new(bytes.Buffer) + _ = binary.Write(buf, binary.LittleEndian, v) + return buf.Bytes() + }(), + }, want: Value{ - Offset: 100, - Watermark: 1667495100000, + Offset: 0, + Watermark: 0, + Idle: true, }, wantErr: false, }, { - name: "decode_success_using_3_field_struct", + name: "decode_success_using_4_field_struct", args: args{ b: func() []byte { v := struct { Test0 int64 Test1 int64 - Test2 int64 // should be ignored + Test2 bool + Test3 int64 // should be ignored }{ Test0: 100, Test1: 1667495100000, - Test2: 200, + Test2: false, + Test3: 20, } buf := new(bytes.Buffer) _ = binary.Write(buf, binary.LittleEndian, v) @@ -112,6 +138,7 @@ func TestDecodeToOTValue(t *testing.T) { want: Value{ Offset: 100, Watermark: 1667495100000, + Idle: false, }, wantErr: false, }, @@ -135,6 +162,7 @@ func TestOTValue_EncodeToBytes(t *testing.T) { type fields struct { Offset int64 Watermark int64 + Idle bool } tests := []struct { name string @@ -147,8 +175,9 @@ func TestOTValue_EncodeToBytes(t *testing.T) { fields: fields{ Offset: 100, Watermark: 1667495100000, + Idle: false, }, - want: []byte{100, 0, 0, 0, 0, 0, 0, 0, 96, 254, 115, 62, 132, 1, 0, 0}, + want: []byte{100, 0, 0, 0, 0, 0, 0, 0, 96, 254, 115, 62, 132, 1, 0, 0, 0}, wantErr: false, }, } diff --git a/pkg/watermark/publish/publisher.go b/pkg/watermark/publish/publisher.go index 0a69fba64..c3c050ec0 100644 --- a/pkg/watermark/publish/publisher.go +++ b/pkg/watermark/publish/publisher.go @@ -36,6 +36,8 @@ type Publisher interface { io.Closer // PublishWatermark publishes the watermark. PublishWatermark(processor.Watermark, isb.Offset) + // PublishIdleWatermark publishes the idle watermark. + PublishIdleWatermark() // GetLatestWatermark returns the latest published watermark. GetLatestWatermark() processor.Watermark } @@ -141,6 +143,32 @@ func (p *publish) PublishWatermark(wm processor.Watermark, offset isb.Offset) { } } +// PublishIdleWatermark publishes the idle watermark and will retry until it can succeed. 
+func (p *publish) PublishIdleWatermark() { + var key = p.entity.GetName() + var otValue = ot.Value{ + Offset: 0, + Watermark: 0, + Idle: true, + } + value, err := otValue.EncodeToBytes() + if err != nil { + p.log.Errorw("Unable to publish idle watermark", zap.String("HB", p.heartbeatStore.GetStoreName()), zap.String("OT", p.otStore.GetStoreName()), zap.String("key", key), zap.Error(err)) + } + + for { + err := p.otStore.PutKV(p.ctx, key, value) + if err != nil { + p.log.Errorw("Unable to publish idle watermark", zap.String("HB", p.heartbeatStore.GetStoreName()), zap.String("OT", p.otStore.GetStoreName()), zap.String("key", key), zap.Error(err)) + // TODO: better exponential backoff + time.Sleep(time.Millisecond * 250) + } else { + p.log.Debugw("New idle watermark published", zap.String("HB", p.heartbeatStore.GetStoreName()), zap.String("OT", p.otStore.GetStoreName()), zap.String("key", key)) + break + } + } +} + // loadLatestFromStore loads the latest watermark stored in the watermark store. // TODO: how to repopulate if the processing unit is down for a really long time? func (p *publish) loadLatestFromStore() processor.Watermark { diff --git a/pkg/watermark/publish/publisher_test.go b/pkg/watermark/publish/publisher_test.go index 013052c7d..0a6ae6291 100644 --- a/pkg/watermark/publish/publisher_test.go +++ b/pkg/watermark/publish/publisher_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/nats-io/nats.go" + "github.com/numaproj/numaflow/pkg/watermark/ot" "github.com/stretchr/testify/assert" "github.com/numaproj/numaflow/pkg/isb" @@ -95,6 +96,16 @@ func TestPublisherWithSharedOTBucket(t *testing.T) { head := p.GetLatestWatermark() assert.Equal(t, processor.Watermark(time.UnixMilli(epoch-60000).In(location)).String(), head.String()) + p.PublishIdleWatermark() + keys, err = p.otStore.GetAllKeys(p.ctx) + assert.NoError(t, err) + assert.Equal(t, []string{"publisherTestPod1"}, keys) + otValue, err := p.otStore.GetValue(p.ctx, keys[0]) + assert.NoError(t, err) + otDecode, err := ot.DecodeToOTValue(otValue) + assert.NoError(t, err) + assert.True(t, otDecode.Idle) + _ = p.Close() _, err = p.heartbeatStore.GetValue(ctx, publishEntity.GetName())