feat: update watermark offset bucket implementation (#307)
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
jy4096 authored and whynowy committed Nov 7, 2022
1 parent f293301 commit 1198a60
Showing 19 changed files with 340 additions and 257 deletions.
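
The central change is in how offset-timeline (OT) entries are stored: instead of encoding the offset as a raw little-endian uint64 under a "<processor>_<epoch>" key, publishers now write an ot.Value carrying both the offset and the watermark under the plain processor-name key, and watchers decode it with ot.DecodeToOTValue. Below is a minimal sketch of that round trip, built only from calls that appear in this diff; the pipeline and bucket names and the numeric values are illustrative, not taken from the commit.

// Sketch (not part of this commit): the new OT value round trip.
// Uses the in-memory KV store from pkg/watermark/store/inmem and the ot package.
func demoOTValueRoundTrip(ctx context.Context) error {
	otStore, _, err := inmem.NewKVInMemKVStore(ctx, "testPipeline", "testPipeline_OT")
	if err != nil {
		return err
	}
	defer otStore.Close()

	otValue := ot.Value{
		Offset:    100,           // highest offset processed so far (illustrative)
		Watermark: 1651161600000, // event-time watermark in milliseconds (illustrative)
	}
	b, err := otValue.EncodeToBytes()
	if err != nil {
		return err
	}
	// The key is now just the processor name; the watermark travels in the value,
	// not in a "<processor>_<epoch>" key as before.
	if err := otStore.PutKV(ctx, "p1", b); err != nil {
		return err
	}

	decoded, err := ot.DecodeToOTValue(b) // what the OT watcher does on each update
	if err != nil {
		return err
	}
	_ = decoded.Offset    // 100
	_ = decoded.Watermark // 1651161600000
	return nil
}
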
2 changes: 1 addition & 1 deletion config/advanced-install/namespaced-controller-wo-crds.yaml
@@ -144,7 +144,7 @@ data:
maxAckPending: 20000
otBucket:
maxValueSize: 0
history: 1
history: 5
ttl: 3h
maxBytes: 0
replicas: 3
@@ -60,7 +60,7 @@ data:
maxAckPending: 20000
otBucket:
maxValueSize: 0
history: 1
history: 5
ttl: 3h
maxBytes: 0
replicas: 3
2 changes: 1 addition & 1 deletion config/install.yaml
@@ -9274,7 +9274,7 @@ data:
maxAckPending: 20000
otBucket:
maxValueSize: 0
history: 1
history: 5
ttl: 3h
maxBytes: 0
replicas: 3
2 changes: 1 addition & 1 deletion config/namespace-install.yaml
@@ -9185,7 +9185,7 @@ data:
maxAckPending: 20000
otBucket:
maxValueSize: 0
history: 1
history: 5
ttl: 3h
maxBytes: 0
replicas: 3
2 changes: 1 addition & 1 deletion docs/numaflow-controller-config.yaml
@@ -60,7 +60,7 @@ data:
maxAckPending: 20000
otBucket:
maxValueSize: 0
history: 1
history: 5
ttl: 72h
maxBytes: 0
replicas: 3
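
The history bump from 1 to 5 in the controller configs above matters because JetStream keeps the last N revisions of each key, and a newly created OT watcher replays those revisions to rebuild a processor's offset timeline (the updated test further down relies on this with History: 2). A minimal sketch of creating such a bucket directly with the NATS client follows; it assumes a locally reachable JetStream server, and the bucket name is made up.

// Sketch: an OT bucket whose per-key history lets watchers replay recent
// watermark revisions. Bucket name and connection details are assumptions.
package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	_, err = js.CreateKeyValue(&nats.KeyValueConfig{
		Bucket:  "my_pipeline_OT", // hypothetical bucket name
		History: 5,                // keep the last 5 revisions per processor key
		TTL:     3 * time.Hour,    // mirrors the ttl in the configs above
		Storage: nats.FileStorage,
	})
	if err != nil {
		log.Fatal(err)
	}
}
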
7 changes: 3 additions & 4 deletions pkg/reduce/pnf/processandforward_test.go
@@ -19,7 +19,6 @@ package pnf
import (
"context"
"encoding/json"
"math"
"strings"
"testing"
"time"
@@ -193,7 +192,7 @@ func TestProcessAndForward_Forward(t *testing.T) {
expected: []bool{false, true},
wmExpected: map[string]int64{
"buffer1": 120000,
"buffer2": math.MinInt64,
"buffer2": -1,
},
},
{
@@ -222,8 +221,8 @@ func TestProcessAndForward_Forward(t *testing.T) {
pf: createProcessAndForward(ctx, "test-drop-all", pbqManager, toBuffers3),
expected: []bool{true, true},
wmExpected: map[string]int64{
"buffer1": math.MinInt64,
"buffer2": math.MinInt64,
"buffer1": -1,
"buffer2": -1,
},
},
}
6 changes: 3 additions & 3 deletions pkg/watermark/fetch/offset_timeline.go
@@ -44,20 +44,20 @@ type OffsetTimeline struct {
func NewOffsetTimeline(ctx context.Context, c int) *OffsetTimeline {
// Initialize a new empty watermarks DLL with nil values of the size capacity.
// This is to avoid length check: when a new element is added, the tail element will be deleted.
ot := OffsetTimeline{
offsetTimeline := OffsetTimeline{
ctx: ctx,
capacity: c,
log: logging.FromContext(ctx),
}

for i := 0; i < c; i++ {
ot.watermarks.PushBack(OffsetWatermark{
offsetTimeline.watermarks.PushBack(OffsetWatermark{
watermark: -1,
offset: -1,
})
}

return &ot
return &offsetTimeline
}

// OffsetWatermark stores the maximum offset for the given event time
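
As the renamed constructor above shows, NewOffsetTimeline pre-fills the doubly linked list with capacity-many {-1, -1} entries so that Put can always drop the tail instead of checking the length. A small usage sketch follows; it has to live inside the fetch package because OffsetWatermark's fields are unexported, and the values are illustrative.

// Sketch: exercising the timeline the way the tests below do.
timeline := NewOffsetTimeline(context.Background(), 10)
fmt.Println(timeline.GetHeadOffset()) // -1, nothing published yet

timeline.Put(OffsetWatermark{watermark: 1651161600000, offset: 100})
timeline.Put(OffsetWatermark{watermark: 1651161600100, offset: 101})

fmt.Println(timeline.GetHeadOffset()) // 101
// Dump renders newest-first entries as "[watermark:offset] -> ...", e.g.
// "[1651161600100:101] -> [1651161600000:100] -> [-1:-1] -> ..."
fmt.Println(timeline.Dump())
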
2 changes: 1 addition & 1 deletion pkg/watermark/fetch/processor_manager.go
@@ -39,7 +39,7 @@ type ProcessorManager struct {
otWatcher store.WatermarkKVWatcher
// heartbeat just tracks the heartbeat of each processing unit. we use it to mark a processing unit's status (e.g, inactive)
heartbeat *ProcessorHeartbeat
// processors has reference to the actual processing unit (ProcessorEntity) which includes offset timeline which is
// processors has reference to the actual processing unit (ProcessorEntitier) which includes offset timeline which is
// used for tracking watermark.
processors map[string]*ProcessorToFetch
lock sync.RWMutex
48 changes: 29 additions & 19 deletions pkg/watermark/fetch/processor_manager_inmem_test.go
@@ -18,20 +18,29 @@ package fetch

import (
"context"
"encoding/binary"
"fmt"
"strconv"
"sync"
"testing"
"time"

"github.com/numaproj/numaflow/pkg/watermark/ot"
"github.com/numaproj/numaflow/pkg/watermark/store/inmem"
"github.com/stretchr/testify/assert"

"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/watermark/store"
)

func otValueToBytes(offset int64, watermark int64) ([]byte, error) {
otValue := ot.Value{
Offset: offset,
Watermark: watermark,
}
otValueByte, err := otValue.EncodeToBytes()
return otValueByte, err
}

func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
var (
err error
@@ -45,23 +54,23 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
hb, hbWatcherCh, err := inmem.NewKVInMemKVStore(ctx, pipelineName, hbBucketName)
hbStore, hbWatcherCh, err := inmem.NewKVInMemKVStore(ctx, pipelineName, hbBucketName)
assert.NoError(t, err)
defer hb.Close()
ot, otWatcherCh, err := inmem.NewKVInMemKVStore(ctx, pipelineName, otBucketName)
defer hbStore.Close()
otStore, otWatcherCh, err := inmem.NewKVInMemKVStore(ctx, pipelineName, otBucketName)
assert.NoError(t, err)
defer ot.Close()
defer otStore.Close()

b := make([]byte, 8)
binary.LittleEndian.PutUint64(b, uint64(testOffset))
// this key format is meant for non-separate OT watcher
err = ot.PutKV(ctx, fmt.Sprintf("%s%s%d", "p1", "_", epoch), b)
otValueByte, err := otValueToBytes(testOffset, epoch)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

epoch += 60000
binary.LittleEndian.PutUint64(b, uint64(testOffset+5))
// this key format is meant for non-separate OT watcher
err = ot.PutKV(ctx, fmt.Sprintf("%s%s%d", "p2", "_", epoch), b)

otValueByte, err = otValueToBytes(testOffset+5, epoch)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p2", otValueByte)
assert.NoError(t, err)

hbWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_PROCESSORS", hbWatcherCh)
@@ -77,11 +86,11 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
defer wg.Done()
var err error
for i := 0; i < 3; i++ {
err = hb.PutKV(ctx, "p1", []byte(fmt.Sprintf("%d", time.Now().Unix())))
err = hbStore.PutKV(ctx, "p1", []byte(fmt.Sprintf("%d", time.Now().Unix())))
assert.NoError(t, err)
time.Sleep(time.Duration(testVertex.opts.podHeartbeatRate) * time.Second)
}
err = hb.DeleteKey(ctx, "p1")
err = hbStore.DeleteKey(ctx, "p1")
assert.NoError(t, err)
}()

@@ -91,7 +100,7 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
defer wg.Done()
var err error
for i := 0; i < 20; i++ {
err = hb.PutKV(ctx, "p2", []byte(fmt.Sprintf("%d", time.Now().Unix())))
err = hbStore.PutKV(ctx, "p2", []byte(fmt.Sprintf("%d", time.Now().Unix())))
assert.NoError(t, err)
time.Sleep(time.Duration(testVertex.opts.podHeartbeatRate) * time.Second)
}
@@ -151,7 +160,7 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
defer wg.Done()
var err error
for i := 0; i < 5; i++ {
err = hb.PutKV(ctx, "p1", []byte(fmt.Sprintf("%d", time.Now().Unix())))
err = hbStore.PutKV(ctx, "p1", []byte(fmt.Sprintf("%d", time.Now().Unix())))
assert.NoError(t, err)
time.Sleep(time.Duration(testVertex.opts.podHeartbeatRate) * time.Second)
}
@@ -197,8 +206,9 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
assert.Equal(t, int64(100), p1.offsetTimeline.GetHeadOffset())

// publish a new watermark 101
binary.LittleEndian.PutUint64(b, uint64(testOffset+1))
err = ot.PutKV(ctx, fmt.Sprintf("%s%s%d", "p1", "_", epoch), b)
otValueByte, err = otValueToBytes(testOffset+1, epoch)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

// "p1" becomes inactive after 5 loops
Expand All @@ -222,7 +232,7 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
defer wg.Done()
var err error
for i := 0; i < 3; i++ {
err = hb.PutKV(ctx, "p1", []byte(fmt.Sprintf("%d", time.Now().Unix())))
err = hbStore.PutKV(ctx, "p1", []byte(fmt.Sprintf("%d", time.Now().Unix())))
assert.NoError(t, err)
time.Sleep(time.Duration(testVertex.opts.podHeartbeatRate) * time.Second)
}
73 changes: 56 additions & 17 deletions pkg/watermark/fetch/processor_manager_test.go
@@ -20,7 +20,6 @@ package fetch

import (
"context"
"encoding/binary"
"fmt"
"strconv"
"sync"
@@ -75,7 +74,7 @@ func TestFetcherWithSameOTBucket(t *testing.T) {
Bucket: keyspace + "_OT",
Description: "",
MaxValueSize: 0,
History: 0,
History: 2,
TTL: 0,
MaxBytes: 0,
Storage: nats.MemoryStorage,
@@ -99,13 +98,28 @@ func TestFetcherWithSameOTBucket(t *testing.T) {
defer otStore.Close()

// put values into otStore
b := make([]byte, 8)
binary.LittleEndian.PutUint64(b, uint64(testOffset))
err = otStore.PutKV(ctx, fmt.Sprintf("%s%s%d", "p1", "_", epoch), b)

// this first entry should not be in the offset timeline because we set the ot bucket history to 2
otValueByte, err := otValueToBytes(testOffset, epoch+100)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

otValueByte, err = otValueToBytes(testOffset+1, epoch+200)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

otValueByte, err = otValueToBytes(testOffset+2, epoch+300)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

epoch += 60000
binary.LittleEndian.PutUint64(b, uint64(testOffset+5))
err = otStore.PutKV(ctx, fmt.Sprintf("%s%s%d", "p2", "_", epoch), b)

otValueByte, err = otValueToBytes(testOffset+5, epoch)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p2", otValueByte)
assert.NoError(t, err)

// create watchers for heartbeat and offset timeline
@@ -154,6 +168,18 @@ func TestFetcherWithSameOTBucket(t *testing.T) {
}
}

for allProcessors["p1"].offsetTimeline.Dump() != "[1651161600300:102] -> [1651161600200:101] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
t.Fatalf("expected p1 has the offset timeline [1651161600300:102] -> [1651161600200:101] -> [-1:-1]..., got %s: %s", allProcessors["p1"].offsetTimeline.Dump(), ctx.Err())
}
default:
time.Sleep(1 * time.Millisecond)
allProcessors = testBuffer.processorManager.GetAllProcessors()
}
}

assert.True(t, allProcessors["p1"].IsActive())
assert.True(t, allProcessors["p2"].IsActive())

@@ -181,7 +207,7 @@ func TestFetcherWithSameOTBucket(t *testing.T) {
assert.True(t, allProcessors["p1"].IsDeleted())
assert.True(t, allProcessors["p2"].IsActive())
// "p1" should be deleted after this GetWatermark offset=101
// because "p1" offsetTimeline's head offset=100, which is < inputOffset 103
// because "p1" offsetTimeline's head offset=102, which is < inputOffset 103
_ = testBuffer.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset+3, 10) }))
allProcessors = testBuffer.processorManager.GetAllProcessors()
assert.Equal(t, 1, len(allProcessors))
@@ -226,21 +252,22 @@ func TestFetcherWithSameOTBucket(t *testing.T) {
assert.NotNil(t, p1.offsetTimeline)
// wait till the offsetTimeline has been populated
newP1head := p1.offsetTimeline.GetHeadOffset()
for newP1head == -1 {
// because the bucket hasn't been cleaned up, the new watcher will read all the history data to create this new offsetTimeline
for newP1head != 102 {
fmt.Println(p1.offsetTimeline.Dump())
select {
case <-ctx.Done():
t.Fatalf("expected head offset to not be equal to -1, %s", ctx.Err())
t.Fatalf("expected head offset to be 102, %s", ctx.Err())
default:
time.Sleep(1 * time.Millisecond)
newP1head = p1.offsetTimeline.GetHeadOffset()
}
}
// because the bucket hasn't been cleaned up, the new watcher will read all the history data to create this new offsetTimeline
assert.Equal(t, int64(100), p1.offsetTimeline.GetHeadOffset())

// publish a new watermark 101
binary.LittleEndian.PutUint64(b, uint64(testOffset+1))
err = otStore.PutKV(ctx, fmt.Sprintf("%s%s%d", "p1", "_", epoch), b)
// publish a new watermark 103
otValueByte, err = otValueToBytes(testOffset+3, epoch)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

// "p1" becomes inactive after 5 loops
@@ -281,8 +308,20 @@ func TestFetcherWithSameOTBucket(t *testing.T) {
}
}

// added 101 in the previous steps for p1, so the head should be 101 after resume
assert.Equal(t, int64(101), p1.offsetTimeline.GetHeadOffset())
// added 103 in the previous steps for p1, so the head should be 103 after resume
assert.Equal(t, int64(103), p1.offsetTimeline.GetHeadOffset())

for allProcessors["p1"].offsetTimeline.Dump() != "[1651161660000:103] -> [1651161600300:102] -> [1651161600200:101] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
t.Fatalf("expected p1 has the offset timeline [1651161660000:103] -> [1651161600300:102] -> [1651161600200:101] -> [-1:-1]..., got %s: %s", allProcessors["p1"].offsetTimeline.Dump(), ctx.Err())
}
default:
time.Sleep(1 * time.Millisecond)
allProcessors = testBuffer.processorManager.GetAllProcessors()
}
}

wg.Wait()
cancel()
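
The expectation above that the first "p1" entry never shows up in the rebuilt timeline follows from the bucket being created with History: 2: JetStream retains only the last two revisions of each key, so a restarted watcher can only replay those. A small sketch of that retention behavior with the plain NATS client; the js handle, bucket name, and value variables are assumptions.

// Sketch: per-key history retention on a JetStream KV bucket created with History: 2.
kv, err := js.KeyValue("my_pipeline_OT") // bucket assumed to exist with History: 2
if err != nil {
	log.Fatal(err)
}
_, _ = kv.Put("p1", firstValue)  // revision 1, evicted once two newer revisions exist
_, _ = kv.Put("p1", secondValue) // revision 2
_, _ = kv.Put("p1", thirdValue)  // revision 3

entries, err := kv.History("p1") // only the retained revisions are returned
if err != nil {
	log.Fatal(err)
}
fmt.Println(len(entries)) // 2
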
18 changes: 8 additions & 10 deletions pkg/watermark/fetch/processor_to_fetch.go
@@ -18,10 +18,10 @@ package fetch

import (
"context"
"encoding/binary"
"fmt"
"sync"

"github.com/numaproj/numaflow/pkg/watermark/ot"
"go.uber.org/zap"

"github.com/numaproj/numaflow/pkg/shared/logging"
@@ -134,21 +129,19 @@ func (p *ProcessorToFetch) startTimeLineWatcher() {
}
switch value.Operation() {
case store.KVPut:
epoch, skip, err := p.entity.ParseOTWatcherKey(value.Key())
if err != nil {
p.log.Errorw("Unable to convert value.PartitionID() to int64", zap.String("received", value.Key()), zap.Error(err))
if value.Key() != p.entity.BuildOTWatcherKey() {
continue
}
// if skip is set to true, it means the key update we received is for a different processor
if skip {
otValue, err := ot.DecodeToOTValue(value.Value())
if err != nil {
p.log.Errorw("Unable to decode the value", zap.String("processorEntity", p.entity.GetID()), zap.Error(err))
continue
}
uint64Value := binary.LittleEndian.Uint64(value.Value())
p.offsetTimeline.Put(OffsetWatermark{
watermark: epoch,
offset: int64(uint64Value),
watermark: otValue.Watermark,
offset: otValue.Offset,
})
p.log.Debugw("TimelineWatcher- Updates", zap.String("bucket", p.otWatcher.GetKVName()), zap.Int64("epoch", epoch), zap.Uint64("value", uint64Value))
p.log.Debugw("TimelineWatcher- Updates", zap.String("bucket", p.otWatcher.GetKVName()), zap.Int64("watermark", otValue.Watermark), zap.Int64("offset", otValue.Offset))
case store.KVDelete:
// we do not care about Delete events because the timeline bucket is meant to grow and the TTL will
// naturally trim the KV store.