From 2f94a915be7627a6ef3349f5e47b30f47dd63561 Mon Sep 17 00:00:00 2001 From: Yashash H L <109710325+yhl25@users.noreply.github.com> Date: Tue, 29 Nov 2022 14:28:30 +0530 Subject: [PATCH] fix: unit tests for replay. Closes #373 (#377) Signed-off-by: Yashash H L --- pkg/isb/stores/simplebuffer/buffer.go | 17 ++- pkg/isb/stores/simplebuffer/options.go | 19 +++ pkg/pbq/pbqmanager.go | 1 - pkg/pbq/store/memory/stores.go | 21 ++- pkg/reduce/readloop/readloop.go | 3 + pkg/reduce/readloop/readloop_test.go | 197 +++++++++++++++++++++++++ pkg/reduce/reduce_test.go | 87 +++++++++++ pkg/sources/generator/tickgen_test.go | 2 +- 8 files changed, 338 insertions(+), 9 deletions(-) create mode 100644 pkg/isb/stores/simplebuffer/options.go create mode 100644 pkg/reduce/readloop/readloop_test.go diff --git a/pkg/isb/stores/simplebuffer/buffer.go b/pkg/isb/stores/simplebuffer/buffer.go index b8999deb5..6833072cf 100644 --- a/pkg/isb/stores/simplebuffer/buffer.go +++ b/pkg/isb/stores/simplebuffer/buffer.go @@ -38,6 +38,7 @@ type InMemoryBuffer struct { buffer []elem writeIdx int64 readIdx int64 + options *options rwlock *sync.RWMutex } @@ -54,7 +55,16 @@ type elem struct { } // NewInMemoryBuffer returns a new buffer. -func NewInMemoryBuffer(name string, size int64) *InMemoryBuffer { +func NewInMemoryBuffer(name string, size int64, opts ...Option) *InMemoryBuffer { + + bufferOptions := &options{ + readTimeOut: time.Second, // default read time out + } + + for _, o := range opts { + _ = o(bufferOptions) + } + sb := &InMemoryBuffer{ name: name, size: size, @@ -62,6 +72,7 @@ func NewInMemoryBuffer(name string, size int64) *InMemoryBuffer { writeIdx: int64(0), readIdx: int64(0), rwlock: new(sync.RWMutex), + options: bufferOptions, } return sb } @@ -161,9 +172,11 @@ func (b *InMemoryBuffer) blockIfEmpty(ctx context.Context) error { func (b *InMemoryBuffer) Read(ctx context.Context, count int64) ([]*isb.ReadMessage, error) { var readMessages = make([]*isb.ReadMessage, 0, count) + cctx, cancel := context.WithTimeout(ctx, b.options.readTimeOut) + defer cancel() for i := int64(0); i < count; i++ { // wait till we have data - if err := b.blockIfEmpty(ctx); err != nil { + if err := b.blockIfEmpty(cctx); err != nil { if errors.Is(err, context.Canceled) { return readMessages, nil } diff --git a/pkg/isb/stores/simplebuffer/options.go b/pkg/isb/stores/simplebuffer/options.go new file mode 100644 index 000000000..d083926d0 --- /dev/null +++ b/pkg/isb/stores/simplebuffer/options.go @@ -0,0 +1,19 @@ +package simplebuffer + +import "time" + +// Options for simple buffer +type options struct { + // readTimeOut is the timeout needed for read timeout + readTimeOut time.Duration +} + +type Option func(options *options) error + +// WithReadTimeOut is used to set read timeout option +func WithReadTimeOut(timeout time.Duration) Option { + return func(o *options) error { + o.readTimeOut = timeout + return nil + } +} diff --git a/pkg/pbq/pbqmanager.go b/pkg/pbq/pbqmanager.go index 21d155ec9..210de411c 100644 --- a/pkg/pbq/pbqmanager.go +++ b/pkg/pbq/pbqmanager.go @@ -152,7 +152,6 @@ func (m *Manager) ShutDown(ctx context.Context) { // iterate through the map of pbq // close all the pbq var wg sync.WaitGroup - var PBQCloseBackOff = wait.Backoff{ Steps: math.MaxInt, Duration: 100 * time.Millisecond, diff --git a/pkg/pbq/store/memory/stores.go b/pkg/pbq/store/memory/stores.go index 084ecdb0c..4b4f61eff 100644 --- a/pkg/pbq/store/memory/stores.go +++ b/pkg/pbq/store/memory/stores.go @@ -28,11 +28,21 @@ import ( type memoryStores struct { storeSize int64 discoverFunc func(ctx context.Context) ([]partition.ID, error) + partitions map[partition.ID]store.Store } func NewMemoryStores(opts ...Option) store.StoreProvider { s := &memoryStores{ - storeSize: 100, + storeSize: 100, + partitions: make(map[partition.ID]store.Store), + } + // default discover function + s.discoverFunc = func(ctx context.Context) ([]partition.ID, error) { + partitionsIds := make([]partition.ID, 0) + for key := range s.partitions { + partitionsIds = append(partitionsIds, key) + } + return partitionsIds, nil } for _, o := range opts { o(s) @@ -41,6 +51,9 @@ func NewMemoryStores(opts ...Option) store.StoreProvider { } func (ms *memoryStores) CreateStore(ctx context.Context, partitionID partition.ID) (store.Store, error) { + if memStore, ok := ms.partitions[partitionID]; ok { + return memStore, nil + } memStore := &memoryStore{ writePos: 0, readPos: 0, @@ -50,13 +63,11 @@ func (ms *memoryStores) CreateStore(ctx context.Context, partitionID partition.I log: logging.FromContext(ctx).With("pbqStore", "Memory").With("partitionID", partitionID), partitionID: partitionID, } + ms.partitions[partitionID] = memStore return memStore, nil } func (ms *memoryStores) DiscoverPartitions(ctx context.Context) ([]partition.ID, error) { - if ms.discoverFunc != nil { - return ms.discoverFunc(ctx) - } - return []partition.ID{}, nil + return ms.discoverFunc(ctx) } diff --git a/pkg/reduce/readloop/readloop.go b/pkg/reduce/readloop/readloop.go index 9dafa03c2..6d8da0cf1 100644 --- a/pkg/reduce/readloop/readloop.go +++ b/pkg/reduce/readloop/readloop.go @@ -103,6 +103,9 @@ func (rl *ReadLoop) Startup(ctx context.Context) error { // crosses the window. alignedKeyedWindow := keyed.NewKeyedWindow(p.Start, p.End) + // add key to the window, so that when a new message with the watermark greater than + // the window end time comes, key will not be lost and the windows will be closed as expected + alignedKeyedWindow.AddKey(p.Key) // These windows have to be recreated as they are completely in-memory rl.windower.CreateWindow(alignedKeyedWindow) diff --git a/pkg/reduce/readloop/readloop_test.go b/pkg/reduce/readloop/readloop_test.go new file mode 100644 index 000000000..5679bd078 --- /dev/null +++ b/pkg/reduce/readloop/readloop_test.go @@ -0,0 +1,197 @@ +package readloop + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/numaproj/numaflow/pkg/isb" + "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" + "github.com/numaproj/numaflow/pkg/pbq" + "github.com/numaproj/numaflow/pkg/pbq/partition" + "github.com/numaproj/numaflow/pkg/pbq/store/memory" + "github.com/numaproj/numaflow/pkg/watermark/generic" + "github.com/numaproj/numaflow/pkg/window/strategy/fixed" + "github.com/stretchr/testify/assert" +) + +// PayloadForTest is a dummy payload for testing. +type PayloadForTest struct { + Key string + Value int +} + +type SumReduceTest struct { +} + +func (s *SumReduceTest) WhereTo(s2 string) ([]string, error) { + return []string{"reduce-buffer"}, nil +} + +func (s *SumReduceTest) ApplyReduce(ctx context.Context, partitionID *partition.ID, messageStream <-chan *isb.ReadMessage) ([]*isb.Message, error) { + sum := 0 + for msg := range messageStream { + var payload PayloadForTest + _ = json.Unmarshal(msg.Payload, &payload) + sum += payload.Value + } + + payload := PayloadForTest{Key: "sum", Value: sum} + b, _ := json.Marshal(payload) + ret := &isb.Message{ + Header: isb.Header{ + PaneInfo: isb.PaneInfo{ + StartTime: partitionID.Start, + EndTime: partitionID.End, + EventTime: partitionID.End, + }, + ID: "msgID", + Key: "result", + }, + Body: isb.Body{Payload: b}, + } + return []*isb.Message{ + ret, + }, nil +} + +// testing startup code with replay included using in-memory pbq +func TestReadLoop_Startup(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + // partitions to be replayed + partitionIds := []partition.ID{ + { + Start: time.Unix(60, 0), + End: time.Unix(120, 0), + Key: "even", + }, + { + Start: time.Unix(120, 0), + End: time.Unix(180, 0), + Key: "odd", + }, + { + Start: time.Unix(180, 0), + End: time.Unix(240, 0), + Key: "even", + }, + } + + memStoreProvider := memory.NewMemoryStores(memory.WithStoreSize(100)) + + for _, id := range partitionIds { + memStore, err := memStoreProvider.CreateStore(ctx, id) + assert.NoError(t, err) + + var msgVal int + if id.Key == "even" { + msgVal = 2 + } else { + msgVal = 3 + } + + // write messages to the store, which will be replayed + storeMessages := createStoreMessages(ctx, id.Key, msgVal, id.Start, 10) + for _, msg := range storeMessages { + err = memStore.Write(msg) + assert.NoError(t, err) + } + } + + pManager, _ := pbq.NewManager(ctx, memStoreProvider, pbq.WithChannelBufferSize(10)) + + to1 := simplebuffer.NewInMemoryBuffer("reduce-buffer", 3) + toSteps := map[string]isb.BufferWriter{ + "reduce-buffer": to1, + } + + _, pw := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps) + + window := fixed.NewFixed(60 * time.Second) + + rl := NewReadLoop(ctx, &SumReduceTest{}, pManager, window, toSteps, &SumReduceTest{}, pw) + + err := rl.Startup(ctx) + assert.NoError(t, err) + + // send a message with the higher watermark so that the windows will be closed + latestMessage := &isb.ReadMessage{ + Message: isb.Message{ + Header: isb.Header{ + PaneInfo: isb.PaneInfo{ + EventTime: time.Unix(300, 0), + StartTime: time.Time{}, + EndTime: time.Time{}, + IsLate: false, + }, + ID: "", + Key: "", + }, + Body: isb.Body{}, + }, + ReadOffset: isb.SimpleStringOffset(func() string { return "simple-offset" }), + Watermark: time.Unix(300, 0), + } + + rl.Process(ctx, []*isb.ReadMessage{latestMessage}) + for !to1.IsFull() { + select { + case <-ctx.Done(): + assert.Fail(t, ctx.Err().Error()) + return + default: + time.Sleep(100 * time.Millisecond) + } + } + + msgs, readErr := to1.Read(ctx, 3) + assert.Nil(t, readErr) + assert.Len(t, msgs, 3) + + // since we have 3 partitions we should have 3 different outputs + var readMessagePayload1 PayloadForTest + var readMessagePayload2 PayloadForTest + var readMessagePayload3 PayloadForTest + _ = json.Unmarshal(msgs[0].Payload, &readMessagePayload1) + _ = json.Unmarshal(msgs[1].Payload, &readMessagePayload2) + _ = json.Unmarshal(msgs[2].Payload, &readMessagePayload3) + // since we had 10 messages in the store with value 2 and 3 + // the expected value is 20 and 30, since the reduce operation is sum + assert.Contains(t, []int{20, 30, 20}, readMessagePayload1.Value) + assert.Contains(t, []int{20, 30, 20}, readMessagePayload2.Value) + assert.Contains(t, []int{20, 30, 20}, readMessagePayload3.Value) + assert.Equal(t, "sum", readMessagePayload1.Key) + assert.Equal(t, "sum", readMessagePayload2.Key) + assert.Equal(t, "sum", readMessagePayload3.Key) + +} + +func createStoreMessages(ctx context.Context, key string, value int, eventTime time.Time, count int) []*isb.ReadMessage { + readMessages := make([]*isb.ReadMessage, count) + for j := 0; j < count; j++ { + result, _ := json.Marshal(PayloadForTest{ + Key: key, + Value: value, + }) + readMessage := &isb.ReadMessage{ + Message: isb.Message{ + Header: isb.Header{ + PaneInfo: isb.PaneInfo{ + EventTime: eventTime, + }, + ID: fmt.Sprintf("%d", value+1), + Key: key, + }, + Body: isb.Body{Payload: result}, + }, + ReadOffset: isb.SimpleStringOffset(func() string { return "simple-offset" }), + } + eventTime = eventTime.Add(time.Second) + readMessages[j] = readMessage + } + return readMessages +} diff --git a/pkg/reduce/reduce_test.go b/pkg/reduce/reduce_test.go index cc719d468..7ffd5ddd4 100644 --- a/pkg/reduce/reduce_test.go +++ b/pkg/reduce/reduce_test.go @@ -544,6 +544,93 @@ func TestReduceDataForward_SumWithDifferentKeys(t *testing.T) { } +func TestDataForward_WithContextClose(t *testing.T) { + var ( + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + fromBufferSize = int64(100000) + toBufferSize = int64(10) + messages = []int{100, 99} + startTime = 0 // time in millis + fromBufferName = "source-reduce-buffer" + toBufferName = "reduce-to-buffer" + err error + ) + + cctx, childCancel := context.WithCancel(ctx) + + defer cancel() + defer childCancel() + + // create from buffers + fromBuffer := simplebuffer.NewInMemoryBuffer(fromBufferName, fromBufferSize) + + // create to buffers + buffer := simplebuffer.NewInMemoryBuffer(toBufferName, toBufferSize) + toBuffer := map[string]isb.BufferWriter{ + toBufferName: buffer, + } + + // create a store provider + storeProvider := memory.NewMemoryStores(memory.WithStoreSize(1000)) + + // create pbq manager + var pbqManager *pbq.Manager + pbqManager, err = pbq.NewManager(cctx, storeProvider, + pbq.WithReadTimeout(1*time.Second), pbq.WithChannelBufferSize(10)) + assert.NoError(t, err) + + // create in memory watermark publisher and fetcher + f, p := fetcherAndPublisher(cctx, toBuffer, fromBuffer, t.Name()) + + // create a fixed window of 5 minutes + window := fixed.NewFixed(5 * time.Minute) + + var reduceDataForward *DataForward + reduceDataForward, err = NewDataForward(cctx, SumReduceTest{}, fromBuffer, toBuffer, pbqManager, CounterReduceTest{}, f, p, + window, WithReadBatchSize(1)) + assert.NoError(t, err) + + // start the forwarder + go reduceDataForward.Start(cctx) + // window duration is 300s, we are sending only 200 messages with event time less than window end time, so the window will not be closed + publishMessages(cctx, startTime, messages, 200, 10, p[fromBuffer.GetName()], fromBuffer) + // wait for the partitions to be created + for { + partitionsList := pbqManager.ListPartitions() + if len(partitionsList) > 0 { + childCancel() + break + } + select { + case <-ctx.Done(): + assert.Fail(t, ctx.Err().Error()) + return + default: + time.Sleep(100 * time.Millisecond) + } + } + + var discoveredPartitions []partition.ID + for { + discoveredPartitions, _ = storeProvider.DiscoverPartitions(ctx) + + if len(discoveredPartitions) > 0 { + break + } + select { + case <-ctx.Done(): + assert.Fail(t, ctx.Err().Error()) + return + default: + time.Sleep(100 * time.Millisecond) + } + } + + // since we have 2 different keys + assert.Len(t, discoveredPartitions, 2) + +} + // fetcherAndPublisher creates watermark fetcher and publishers, and keeps the processors alive by sending heartbeats func fetcherAndPublisher(ctx context.Context, toBuffers map[string]isb.BufferWriter, fromBuffer *simplebuffer.InMemoryBuffer, key string) (fetch.Fetcher, map[string]publish.Publisher) { diff --git a/pkg/sources/generator/tickgen_test.go b/pkg/sources/generator/tickgen_test.go index b9e1d72d6..40f77c7a7 100644 --- a/pkg/sources/generator/tickgen_test.go +++ b/pkg/sources/generator/tickgen_test.go @@ -36,7 +36,7 @@ import ( ) func TestRead(t *testing.T) { - dest := simplebuffer.NewInMemoryBuffer("writer", 20) + dest := simplebuffer.NewInMemoryBuffer("writer", 20, simplebuffer.WithReadTimeOut(10*time.Second)) ctx := context.Background() vertex := &dfv1.Vertex{ ObjectMeta: v1.ObjectMeta{