Skip to content

Commit

Permalink
feat: allowedLateness to support late data ingestion (#703)
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <vigith@gmail.com>
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
Co-authored-by: jyu6 <juanlu_yu@intuit.com>
  • Loading branch information
vigith and jyu6 committed Apr 25, 2023
1 parent 15a01a6 commit f518d99
Show file tree
Hide file tree
Showing 18 changed files with 684 additions and 399 deletions.
4 changes: 4 additions & 0 deletions api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -18063,6 +18063,10 @@
"io.numaproj.numaflow.v1alpha1.GroupBy": {
"description": "GroupBy indicates it is a reducer UDF",
"properties": {
"allowedLateness": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "AllowedLateness allows late data to be included for the Reduce operation as long as the late data is not later than (Watermark - AllowedLateness)."
},
"keyed": {
"type": "boolean"
},
Expand Down
4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -18071,6 +18071,10 @@
"window"
],
"properties": {
"allowedLateness": {
"description": "AllowedLateness allows late data to be included for the Reduce operation as long as the late data is not later than (Watermark - AllowedLateness).",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
},
"keyed": {
"type": "boolean"
},
Expand Down
2 changes: 2 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4843,6 +4843,8 @@ spec:
type: object
groupBy:
properties:
allowedLateness:
type: string
keyed:
type: boolean
storage:
Expand Down
2 changes: 2 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_vertices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3349,6 +3349,8 @@ spec:
type: object
groupBy:
properties:
allowedLateness:
type: string
keyed:
type: boolean
storage:
Expand Down
4 changes: 4 additions & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7288,6 +7288,8 @@ spec:
type: object
groupBy:
properties:
allowedLateness:
type: string
keyed:
type: boolean
storage:
Expand Down Expand Up @@ -11462,6 +11464,8 @@ spec:
type: object
groupBy:
properties:
allowedLateness:
type: string
keyed:
type: boolean
storage:
Expand Down
4 changes: 4 additions & 0 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7288,6 +7288,8 @@ spec:
type: object
groupBy:
properties:
allowedLateness:
type: string
keyed:
type: boolean
storage:
Expand Down Expand Up @@ -11462,6 +11464,8 @@ spec:
type: object
groupBy:
properties:
allowedLateness:
type: string
keyed:
type: boolean
storage:
Expand Down
15 changes: 15 additions & 0 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1780,6 +1780,21 @@ Window describes the windowing strategy.
</tr>
<tr>
<td>
<code>allowedLateness</code></br> <em>
<a href="https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Duration">
Kubernetes meta/v1.Duration </a> </em>
</td>
<td>
<em>(Optional)</em>
<p>
AllowedLateness allows late data to be included for the Reduce operation
as long as the late data is not later than (Watermark -
AllowedLateness).
</p>
</td>
</tr>
<tr>
<td>
<code>storage</code></br> <em>
<a href="#numaflow.numaproj.io/v1alpha1.PBQStorage"> PBQStorage </a>
</em>
Expand Down
829 changes: 442 additions & 387 deletions pkg/apis/numaflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion pkg/apis/numaflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion pkg/apis/numaflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion pkg/apis/numaflow/v1alpha1/udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,12 @@ type GroupBy struct {
Window Window `json:"window" protobuf:"bytes,1,opt,name=window"`
// +optional
Keyed bool `json:"keyed" protobuf:"bytes,2,opt,name=keyed"`
// AllowedLateness allows late data to be included for the Reduce operation as long as the late data is not later
// than (Watermark - AllowedLateness).
// +optional
AllowedLateness *metav1.Duration `json:"allowedLateness,omitempty" protobuf:"bytes,3,opt,name=allowedLateness"`
// Storage is used to define the PBQ storage for a reduce vertex.
Storage *PBQStorage `json:"storage,omitempty" protobuf:"bytes,3,opt,name=storage"`
Storage *PBQStorage `json:"storage,omitempty" protobuf:"bytes,4,opt,name=storage"`
}

// Window describes windowing strategy
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/numaflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/reduce/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func NewDataForward(ctx context.Context,
idleManager := wmb.NewIdleManager(len(toBuffers))

rl, err := readloop.NewReadLoop(ctx, vertexInstance.Vertex.Spec.Name, vertexInstance.Vertex.Spec.PipelineName,
vertexInstance.Replica, udf, pbqManager, windowingStrategy, toBuffers, whereToDecider, watermarkPublishers, idleManager)
vertexInstance.Replica, udf, pbqManager, windowingStrategy, toBuffers, whereToDecider, watermarkPublishers,
idleManager, options.allowedLateness)

df := &DataForward{
ctx: ctx,
Expand Down
152 changes: 152 additions & 0 deletions pkg/reduce/data_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,82 @@ func TestReduceDataForward_Count(t *testing.T) {
// since the window duration is 60s and tps is 1, the count should be 60
assert.Equal(t, int64(60), int64(readMessagePayload.Value))
assert.Equal(t, "count", readMessagePayload.Key)
}

func TestReduceDataForward_AllowedLatencyCount(t *testing.T) {
var (
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
batchSize = 1
fromBufferSize = int64(100000)
toBufferSize = int64(10)
messageValue = 7
startTime = 60000 // time in millis
fromBufferName = "source-reduce-buffer"
toBufferName = "reduce-to-buffer"
pipelineName = "test-reduce-pipeline"
err error
)

defer cancel()

// create from buffers
fromBuffer := simplebuffer.NewInMemoryBuffer(fromBufferName, fromBufferSize)

// create to buffers
buffer := simplebuffer.NewInMemoryBuffer(toBufferName, toBufferSize)
toBuffer := map[string]isb.BufferWriter{
toBufferName: buffer,
}

// create pbq manager
var pbqManager *pbq.Manager
pbqManager, err = pbq.NewManager(ctx, "reduce", pipelineName, 0, memory.NewMemoryStores(memory.WithStoreSize(1000)),
pbq.WithReadTimeout(1*time.Second), pbq.WithChannelBufferSize(10))
assert.NoError(t, err)

// create in memory watermark publisher and fetcher
f, p := fetcherAndPublisher(ctx, fromBuffer, t.Name())
publisherMap, _ := buildPublisherMapAndOTStore(ctx, toBuffer, pipelineName)

// create a fixed window of 10s
window := fixed.NewFixed(5 * time.Second)

var reduceDataForward *DataForward
allowedLatency := 1000
reduceDataForward, err = NewDataForward(ctx, CounterReduceTest{}, keyedVertex, fromBuffer, toBuffer, pbqManager, CounterReduceTest{}, f, publisherMap,
window, WithReadBatchSize(int64(batchSize)),
WithAllowedLateness(time.Duration(allowedLatency)*time.Millisecond),
)
assert.NoError(t, err)

// start the forwarder
go reduceDataForward.Start()

// start the producer
go publishMessagesAllowedLatency(ctx, startTime, messageValue, allowedLatency, 5, batchSize, p, fromBuffer)

// we are reading only one message here but the count should be equal to
// the number of keyed windows that closed
msgs, readErr := buffer.Read(ctx, 1)
assert.Nil(t, readErr)
for len(msgs) == 0 || msgs[0].Header.Kind == isb.WMB {
select {
case <-ctx.Done():
assert.Fail(t, ctx.Err().Error())
return
default:
time.Sleep(100 * time.Millisecond)
msgs, readErr = buffer.Read(ctx, 1)
assert.Nil(t, readErr)
}
}

// assert the output of reduce
var readMessagePayload PayloadForTest
_ = json.Unmarshal(msgs[0].Payload, &readMessagePayload)
// without allowedLatency the value would be 4
assert.Equal(t, int64(5), int64(readMessagePayload.Value))
assert.Equal(t, "count", readMessagePayload.Key)
}

// Sum operation with 2 minutes window
Expand Down Expand Up @@ -1165,3 +1240,80 @@ func buildIsbMessage(messageValue int, eventTime time.Time) isb.Message {
Body: isb.Body{Payload: result},
}
}

// publishMessagesAllowedLatency is only used for xxxAllowedLatency test
func publishMessagesAllowedLatency(ctx context.Context, startTime int, message int, allowedLatency int, windowSize int, batchSize int, publish publish.Publisher, fromBuffer *simplebuffer.InMemoryBuffer) {
counter := 0
inputMsgs := make([]isb.Message, batchSize)
for i := 0; i <= windowSize; i++ {
// to simulate real usage
time.Sleep(time.Second)
et := startTime + i*1000
// normal cases, eventTime = watermark
wm := wmb.Watermark(time.UnixMilli(int64(et)))
inputMsg := buildIsbMessage(message, time.UnixMilli(int64(et)))
if i == windowSize-1 {
// et is now (window.EndTime - 1000)
et = et + 1000
// set wm to be the window.EndTime
// because we've set allowedLatency, so COB is not triggered
wm = wmb.Watermark(time.UnixMilli(int64(et)))
// set the eventTime = wm
inputMsg = buildIsbMessage(message, time.UnixMilli(int64(et)))
}
if i == windowSize {
// et is now window.EndTime
// set wm to be the window.EndTime + allowedLatency to trigger COB
wm = wmb.Watermark(time.UnixMilli(int64(et + allowedLatency)))
// set message eventTime to still be inside the window
// so the eventTime is now smaller than wm, the message isLate is set to true
et = et - 100
inputMsg = buildIsbMessageAllowedLatency(message, time.UnixMilli(int64(et)))
}
inputMsgs[counter] = inputMsg
counter += 1

if counter >= batchSize {
offsets, _ := fromBuffer.Write(ctx, inputMsgs)
if len(offsets) > 0 {
publish.PublishWatermark(wm, offsets[len(offsets)-1])
}
counter = 0
}
}

// dummy message to start a new window
// COB won't happen for this window
et := 6000000
wm := wmb.Watermark(time.UnixMilli(int64(et)))
inputMsgs = []isb.Message{buildIsbMessage(message, time.UnixMilli(int64(et)))}
offsets, _ := fromBuffer.Write(ctx, inputMsgs)
if len(offsets) > 0 {
publish.PublishWatermark(wm, offsets[len(offsets)-1])
}
}

func buildIsbMessageAllowedLatency(messageValue int, eventTime time.Time) isb.Message {
var messageKey []string
if messageValue%2 == 0 {
messageKey = []string{"even"}
} else {
messageKey = []string{"odd"}
}

result, _ := json.Marshal(PayloadForTest{
Key: messageKey[0],
Value: messageValue,
})
return isb.Message{
Header: isb.Header{
MessageInfo: isb.MessageInfo{
EventTime: eventTime,
IsLate: true,
},
ID: fmt.Sprintf("%d", messageValue),
Keys: messageKey,
},
Body: isb.Body{Payload: result},
}
}
15 changes: 14 additions & 1 deletion pkg/reduce/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,25 @@ limitations under the License.
package reduce

import (
"time"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
)

// Options for forwarding the message
type Options struct {
// readBatchSize is the default batch size
readBatchSize int64
// allowedLateness is the time.Duration it waits after the watermark has progressed for late-date to be included
allowedLateness time.Duration
}

type Option func(*Options) error

func DefaultOptions() *Options {
return &Options{
readBatchSize: dfv1.DefaultReadBatchSize,
readBatchSize: dfv1.DefaultReadBatchSize,
allowedLateness: time.Duration(0),
}
}

Expand All @@ -41,3 +46,11 @@ func WithReadBatchSize(f int64) Option {
return nil
}
}

// WithAllowedLateness sets allowedLateness
func WithAllowedLateness(t time.Duration) Option {
return func(o *Options) error {
o.allowedLateness = t
return nil
}
}

0 comments on commit f518d99

Please sign in to comment.