Skip to content

Commit

Permalink
feat: no operation watermark progressor (#90)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
vigith authored and whynowy committed Jul 8, 2022
1 parent 5268e3d commit 5d2f90e
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 3 deletions.
7 changes: 4 additions & 3 deletions pkg/udf/udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ func (u *UDFProcessor) Start(ctx context.Context) error {
if err != nil {
return err
}
if sharedutil.IsWatermarkEnabled() {
fetchWatermark, publishWatermark = generic.BuildJetStreamWatermarkProgressors(ctx, u.VertexInstance)
}

// build watermark progressors
fetchWatermark, publishWatermark = generic.BuildJetStreamWatermarkProgressors(ctx, u.VertexInstance)

for _, e := range u.VertexInstance.Vertex.Spec.ToEdges {
writeOpts := []jetstreamisb.WriteOption{}
if x := e.Limits; x != nil && x.BufferMaxLength != nil {
Expand Down
12 changes: 12 additions & 0 deletions pkg/watermark/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/numaproj/numaflow/pkg/isbsvc"
"github.com/numaproj/numaflow/pkg/isbsvc/clients"
"github.com/numaproj/numaflow/pkg/shared/logging"
sharedutil "github.com/numaproj/numaflow/pkg/shared/util"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/publish"
Expand Down Expand Up @@ -93,6 +94,17 @@ func (g *GenericProgress) StopPublisher() {
// and it can write to many.
// The function is used only when watermarking is enabled on the pipeline.
func BuildJetStreamWatermarkProgressors(ctx context.Context, vertexInstance *v1alpha1.VertexInstance) (fetchWatermark fetch.Fetcher, publishWatermark map[string]publish.Publisher) {
// if watermark is not enabled, use no-op.
if !sharedutil.IsWatermarkEnabled() {
fetchWatermark = NewNoOpWMProgressor()
publishWatermark = make(map[string]publish.Publisher)
for _, buffer := range vertexInstance.Vertex.GetToBuffers() {
streamName := isbsvc.JetStreamName(vertexInstance.Vertex.Spec.PipelineName, buffer.Name)
publishWatermark[streamName] = NewNoOpWMProgressor()
}

return fetchWatermark, publishWatermark
}

log := logging.FromContext(ctx)
publishWatermark = make(map[string]publish.Publisher)
Expand Down
39 changes: 39 additions & 0 deletions pkg/watermark/generic/noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package generic

import (
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/publish"
)

// NoOpWMProgressor is a no-op watermark progressor. As the name suggests, it does not do anything, no watermark is
// progressed.
type NoOpWMProgressor struct {
}

var _ fetch.Fetcher = (*NoOpWMProgressor)(nil)
var _ publish.Publisher = (*NoOpWMProgressor)(nil)

// NewNoOpWMProgressor returns NoOpWMProgressor.
func NewNoOpWMProgressor() *NoOpWMProgressor {
return &NoOpWMProgressor{}
}

// GetWatermark returns the default watermark.
func (n NoOpWMProgressor) GetWatermark(_ isb.Offset) processor.Watermark {
return processor.Watermark{}
}

// PublishWatermark does a no-op publish.
func (n NoOpWMProgressor) PublishWatermark(_ processor.Watermark, _ isb.Offset) {
}

// GetLatestWatermark returns the default watermark as the latest watermark.
func (n NoOpWMProgressor) GetLatestWatermark() processor.Watermark {
return processor.Watermark{}
}

// StopPublisher stops the no-op publisher.
func (n NoOpWMProgressor) StopPublisher() {
}

0 comments on commit 5d2f90e

Please sign in to comment.