Skip to content

Commit

Permalink
fix: unit tests for replay. Closes #373 (#377)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
  • Loading branch information
yhl25 committed Nov 29, 2022
1 parent b8a5c8a commit 2f94a91
Show file tree
Hide file tree
Showing 8 changed files with 338 additions and 9 deletions.
17 changes: 15 additions & 2 deletions pkg/isb/stores/simplebuffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type InMemoryBuffer struct {
buffer []elem
writeIdx int64
readIdx int64
options *options
rwlock *sync.RWMutex
}

Expand All @@ -54,14 +55,24 @@ type elem struct {
}

// NewInMemoryBuffer returns a new buffer.
func NewInMemoryBuffer(name string, size int64) *InMemoryBuffer {
func NewInMemoryBuffer(name string, size int64, opts ...Option) *InMemoryBuffer {

bufferOptions := &options{
readTimeOut: time.Second, // default read time out
}

for _, o := range opts {
_ = o(bufferOptions)
}

sb := &InMemoryBuffer{
name: name,
size: size,
buffer: make([]elem, size),
writeIdx: int64(0),
readIdx: int64(0),
rwlock: new(sync.RWMutex),
options: bufferOptions,
}
return sb
}
Expand Down Expand Up @@ -161,9 +172,11 @@ func (b *InMemoryBuffer) blockIfEmpty(ctx context.Context) error {

func (b *InMemoryBuffer) Read(ctx context.Context, count int64) ([]*isb.ReadMessage, error) {
var readMessages = make([]*isb.ReadMessage, 0, count)
cctx, cancel := context.WithTimeout(ctx, b.options.readTimeOut)
defer cancel()
for i := int64(0); i < count; i++ {
// wait till we have data
if err := b.blockIfEmpty(ctx); err != nil {
if err := b.blockIfEmpty(cctx); err != nil {
if errors.Is(err, context.Canceled) {
return readMessages, nil
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/isb/stores/simplebuffer/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package simplebuffer

import "time"

// Options for simple buffer
type options struct {
// readTimeOut is the timeout needed for read timeout
readTimeOut time.Duration
}

type Option func(options *options) error

// WithReadTimeOut is used to set read timeout option
func WithReadTimeOut(timeout time.Duration) Option {
return func(o *options) error {
o.readTimeOut = timeout
return nil
}
}
1 change: 0 additions & 1 deletion pkg/pbq/pbqmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ func (m *Manager) ShutDown(ctx context.Context) {
// iterate through the map of pbq
// close all the pbq
var wg sync.WaitGroup

var PBQCloseBackOff = wait.Backoff{
Steps: math.MaxInt,
Duration: 100 * time.Millisecond,
Expand Down
21 changes: 16 additions & 5 deletions pkg/pbq/store/memory/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,21 @@ import (
type memoryStores struct {
storeSize int64
discoverFunc func(ctx context.Context) ([]partition.ID, error)
partitions map[partition.ID]store.Store
}

func NewMemoryStores(opts ...Option) store.StoreProvider {
s := &memoryStores{
storeSize: 100,
storeSize: 100,
partitions: make(map[partition.ID]store.Store),
}
// default discover function
s.discoverFunc = func(ctx context.Context) ([]partition.ID, error) {
partitionsIds := make([]partition.ID, 0)
for key := range s.partitions {
partitionsIds = append(partitionsIds, key)
}
return partitionsIds, nil
}
for _, o := range opts {
o(s)
Expand All @@ -41,6 +51,9 @@ func NewMemoryStores(opts ...Option) store.StoreProvider {
}

func (ms *memoryStores) CreateStore(ctx context.Context, partitionID partition.ID) (store.Store, error) {
if memStore, ok := ms.partitions[partitionID]; ok {
return memStore, nil
}
memStore := &memoryStore{
writePos: 0,
readPos: 0,
Expand All @@ -50,13 +63,11 @@ func (ms *memoryStores) CreateStore(ctx context.Context, partitionID partition.I
log: logging.FromContext(ctx).With("pbqStore", "Memory").With("partitionID", partitionID),
partitionID: partitionID,
}
ms.partitions[partitionID] = memStore

return memStore, nil
}

func (ms *memoryStores) DiscoverPartitions(ctx context.Context) ([]partition.ID, error) {
if ms.discoverFunc != nil {
return ms.discoverFunc(ctx)
}
return []partition.ID{}, nil
return ms.discoverFunc(ctx)
}
3 changes: 3 additions & 0 deletions pkg/reduce/readloop/readloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ func (rl *ReadLoop) Startup(ctx context.Context) error {
// crosses the window.

alignedKeyedWindow := keyed.NewKeyedWindow(p.Start, p.End)
// add key to the window, so that when a new message with the watermark greater than
// the window end time comes, key will not be lost and the windows will be closed as expected
alignedKeyedWindow.AddKey(p.Key)

// These windows have to be recreated as they are completely in-memory
rl.windower.CreateWindow(alignedKeyedWindow)
Expand Down
197 changes: 197 additions & 0 deletions pkg/reduce/readloop/readloop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package readloop

import (
"context"
"encoding/json"
"fmt"
"testing"
"time"

"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer"
"github.com/numaproj/numaflow/pkg/pbq"
"github.com/numaproj/numaflow/pkg/pbq/partition"
"github.com/numaproj/numaflow/pkg/pbq/store/memory"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/window/strategy/fixed"
"github.com/stretchr/testify/assert"
)

// PayloadForTest is a dummy payload for testing.
type PayloadForTest struct {
Key string
Value int
}

type SumReduceTest struct {
}

func (s *SumReduceTest) WhereTo(s2 string) ([]string, error) {
return []string{"reduce-buffer"}, nil
}

func (s *SumReduceTest) ApplyReduce(ctx context.Context, partitionID *partition.ID, messageStream <-chan *isb.ReadMessage) ([]*isb.Message, error) {
sum := 0
for msg := range messageStream {
var payload PayloadForTest
_ = json.Unmarshal(msg.Payload, &payload)
sum += payload.Value
}

payload := PayloadForTest{Key: "sum", Value: sum}
b, _ := json.Marshal(payload)
ret := &isb.Message{
Header: isb.Header{
PaneInfo: isb.PaneInfo{
StartTime: partitionID.Start,
EndTime: partitionID.End,
EventTime: partitionID.End,
},
ID: "msgID",
Key: "result",
},
Body: isb.Body{Payload: b},
}
return []*isb.Message{
ret,
}, nil
}

// testing startup code with replay included using in-memory pbq
func TestReadLoop_Startup(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

// partitions to be replayed
partitionIds := []partition.ID{
{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
Key: "even",
},
{
Start: time.Unix(120, 0),
End: time.Unix(180, 0),
Key: "odd",
},
{
Start: time.Unix(180, 0),
End: time.Unix(240, 0),
Key: "even",
},
}

memStoreProvider := memory.NewMemoryStores(memory.WithStoreSize(100))

for _, id := range partitionIds {
memStore, err := memStoreProvider.CreateStore(ctx, id)
assert.NoError(t, err)

var msgVal int
if id.Key == "even" {
msgVal = 2
} else {
msgVal = 3
}

// write messages to the store, which will be replayed
storeMessages := createStoreMessages(ctx, id.Key, msgVal, id.Start, 10)
for _, msg := range storeMessages {
err = memStore.Write(msg)
assert.NoError(t, err)
}
}

pManager, _ := pbq.NewManager(ctx, memStoreProvider, pbq.WithChannelBufferSize(10))

to1 := simplebuffer.NewInMemoryBuffer("reduce-buffer", 3)
toSteps := map[string]isb.BufferWriter{
"reduce-buffer": to1,
}

_, pw := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)

window := fixed.NewFixed(60 * time.Second)

rl := NewReadLoop(ctx, &SumReduceTest{}, pManager, window, toSteps, &SumReduceTest{}, pw)

err := rl.Startup(ctx)
assert.NoError(t, err)

// send a message with the higher watermark so that the windows will be closed
latestMessage := &isb.ReadMessage{
Message: isb.Message{
Header: isb.Header{
PaneInfo: isb.PaneInfo{
EventTime: time.Unix(300, 0),
StartTime: time.Time{},
EndTime: time.Time{},
IsLate: false,
},
ID: "",
Key: "",
},
Body: isb.Body{},
},
ReadOffset: isb.SimpleStringOffset(func() string { return "simple-offset" }),
Watermark: time.Unix(300, 0),
}

rl.Process(ctx, []*isb.ReadMessage{latestMessage})
for !to1.IsFull() {
select {
case <-ctx.Done():
assert.Fail(t, ctx.Err().Error())
return
default:
time.Sleep(100 * time.Millisecond)
}
}

msgs, readErr := to1.Read(ctx, 3)
assert.Nil(t, readErr)
assert.Len(t, msgs, 3)

// since we have 3 partitions we should have 3 different outputs
var readMessagePayload1 PayloadForTest
var readMessagePayload2 PayloadForTest
var readMessagePayload3 PayloadForTest
_ = json.Unmarshal(msgs[0].Payload, &readMessagePayload1)
_ = json.Unmarshal(msgs[1].Payload, &readMessagePayload2)
_ = json.Unmarshal(msgs[2].Payload, &readMessagePayload3)
// since we had 10 messages in the store with value 2 and 3
// the expected value is 20 and 30, since the reduce operation is sum
assert.Contains(t, []int{20, 30, 20}, readMessagePayload1.Value)
assert.Contains(t, []int{20, 30, 20}, readMessagePayload2.Value)
assert.Contains(t, []int{20, 30, 20}, readMessagePayload3.Value)
assert.Equal(t, "sum", readMessagePayload1.Key)
assert.Equal(t, "sum", readMessagePayload2.Key)
assert.Equal(t, "sum", readMessagePayload3.Key)

}

func createStoreMessages(ctx context.Context, key string, value int, eventTime time.Time, count int) []*isb.ReadMessage {
readMessages := make([]*isb.ReadMessage, count)
for j := 0; j < count; j++ {
result, _ := json.Marshal(PayloadForTest{
Key: key,
Value: value,
})
readMessage := &isb.ReadMessage{
Message: isb.Message{
Header: isb.Header{
PaneInfo: isb.PaneInfo{
EventTime: eventTime,
},
ID: fmt.Sprintf("%d", value+1),
Key: key,
},
Body: isb.Body{Payload: result},
},
ReadOffset: isb.SimpleStringOffset(func() string { return "simple-offset" }),
}
eventTime = eventTime.Add(time.Second)
readMessages[j] = readMessage
}
return readMessages
}

0 comments on commit 2f94a91

Please sign in to comment.