Skip to content

Commit

Permalink
feat: pipeline watermark (#416)
Browse files Browse the repository at this point in the history
Signed-off-by: DevilAeron <guptavedant2312@gmail.com>
  • Loading branch information
veds-g authored and whynowy committed Jan 13, 2023
1 parent 33e9399 commit e7021c9
Show file tree
Hide file tree
Showing 10 changed files with 733 additions and 103 deletions.
530 changes: 477 additions & 53 deletions pkg/apis/proto/daemon/daemon.pb.go

Large diffs are not rendered by default.

101 changes: 101 additions & 0 deletions pkg/apis/proto/daemon/daemon.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions pkg/apis/proto/daemon/daemon.proto
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ message GetVertexWatermarkRequest {
required string vertex = 2;
}

// GetPipelineWatermarksResponse is the response of GetPipelineWatermarks.
// It carries a list of VertexWatermark entries.
message GetPipelineWatermarksResponse {
repeated VertexWatermark pipelineWatermarks = 1;
}

// GetPipelineWatermarksRequest requests the watermarks of a pipeline.
message GetPipelineWatermarksRequest {
// pipeline identifies the pipeline being queried; it is bound to the
// {pipeline} segment of the HTTP route.
required string pipeline = 1;
}

// DaemonService is a gRPC service that provides APIs for querying pipeline information.
service DaemonService {

Expand All @@ -112,4 +121,9 @@ service DaemonService {
rpc GetVertexWatermark (GetVertexWatermarkRequest) returns (GetVertexWatermarkResponse) {
option (google.api.http).get = "/api/v1/pipelines/{pipeline}/vertices/{vertex}/watermark";
};

// GetPipelineWatermarks returns the watermarks of the given pipeline.
// It is exposed over HTTP as GET /api/v1/pipelines/{pipeline}/watermarks.
rpc GetPipelineWatermarks (GetPipelineWatermarksRequest) returns (GetPipelineWatermarksResponse) {
option (google.api.http).get = "/api/v1/pipelines/{pipeline}/watermarks";
};
}
11 changes: 11 additions & 0 deletions pkg/daemon/client/daemon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,14 @@ func (dc *DaemonClient) GetVertexWatermark(ctx context.Context, pipeline, vertex
return rspn.VertexWatermark, nil
}
}

// GetPipelineWatermarks sends a GetPipelineWatermarksRequest for the given
// pipeline and returns the PipelineWatermarks slice of the response.
// If the underlying client call fails, its error is returned unchanged
// together with a nil slice.
func (dc *DaemonClient) GetPipelineWatermarks(ctx context.Context, pipeline string) ([]*daemon.VertexWatermark, error) {
	rspn, err := dc.client.GetPipelineWatermarks(ctx, &daemon.GetPipelineWatermarksRequest{
		Pipeline: &pipeline,
	})
	if err != nil {
		return nil, err
	}
	return rspn.PipelineWatermarks, nil
}
55 changes: 51 additions & 4 deletions pkg/daemon/server/service/pipeline_watermark_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ func (ps *pipelineMetadataQuery) GetVertexWatermark(ctx context.Context, request
log := logging.FromContext(ctx)
resp := new(daemon.GetVertexWatermarkResponse)
vertexName := request.GetVertex()
retFalse := false
retTrue := true
isWatermarkEnabled := !ps.pipeline.Spec.Watermark.Disabled

// If watermark is not enabled, return time zero
if ps.pipeline.Spec.Watermark.Disabled {
Expand All @@ -81,7 +80,7 @@ func (ps *pipelineMetadataQuery) GetVertexWatermark(ctx context.Context, request
Pipeline: &ps.pipeline.Name,
Vertex: request.Vertex,
Watermark: &timeZero,
IsWatermarkEnabled: &retFalse,
IsWatermarkEnabled: &isWatermarkEnabled,
}
resp.VertexWatermark = v
return resp, nil
Expand All @@ -108,8 +107,56 @@ func (ps *pipelineMetadataQuery) GetVertexWatermark(ctx context.Context, request
Pipeline: &ps.pipeline.Name,
Vertex: request.Vertex,
Watermark: &latestWatermark,
IsWatermarkEnabled: &retTrue,
IsWatermarkEnabled: &isWatermarkEnabled,
}
resp.VertexWatermark = v
return resp, nil
}

// GetPipelineWatermarks is used to return the head watermarks for a given pipeline.
//
// One VertexWatermark is produced per entry of ps.watermarkFetchers; the order
// of the entries follows the iteration order of that collection.
//   - When the pipeline's watermark is disabled, every entry reports the Unix
//     epoch (0 ms) as its watermark.
//   - Otherwise every entry reports the largest head watermark (in Unix
//     milliseconds) among that vertex's fetchers, or -1 if no fetcher reports
//     a larger value.
//
// The returned error is always nil.
func (ps *pipelineMetadataQuery) GetPipelineWatermarks(ctx context.Context, request *daemon.GetPipelineWatermarksRequest) (*daemon.GetPipelineWatermarksResponse, error) {
	enabled := !ps.pipeline.Spec.Watermark.Disabled
	// Shared value reported for every vertex when watermarking is disabled.
	epoch := time.Unix(0, 0).UnixMilli()

	result := make([]*daemon.VertexWatermark, 0, len(ps.watermarkFetchers))
	for name, fetchers := range ps.watermarkFetchers {
		// Copy the loop variable so each entry points at its own string.
		vertex := name
		wm := &epoch
		if enabled {
			head := int64(-1)
			for _, f := range fetchers {
				if ts := f.GetHeadWatermark().UnixMilli(); ts > head {
					head = ts
				}
			}
			wm = &head
		}
		result = append(result, &daemon.VertexWatermark{
			Pipeline:           &ps.pipeline.Name,
			Vertex:             &vertex,
			Watermark:          wm,
			IsWatermarkEnabled: &enabled,
		})
	}

	return &daemon.GetPipelineWatermarksResponse{PipelineWatermarks: result}, nil
}
20 changes: 20 additions & 0 deletions server/apis/v1/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,26 @@ func (h *handler) GetVertexWatermark(c *gin.Context) {
c.JSON(http.StatusOK, l)
}

// GetPipelineWatermarks is used to provide the head watermarks for a given pipeline.
//
// It reads the "namespace" and "pipeline" path parameters, opens a daemon
// service client for that pipeline, and writes the watermarks as JSON with
// status 200. If creating the client or querying the daemon fails, the error
// message is written as JSON with status 500.
func (h *handler) GetPipelineWatermarks(c *gin.Context) {
	ns := c.Param("namespace")
	pipeline := c.Param("pipeline")
	client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline))
	if err != nil {
		c.JSON(http.StatusInternalServerError, err.Error())
		return
	}
	defer func() {
		// Best-effort close: the response has already been decided by now.
		_ = client.Close()
	}()
	// Use the request's context so the daemon call is cancelled when the
	// HTTP request is aborted, instead of running detached from it.
	l, err := client.GetPipelineWatermarks(c.Request.Context(), pipeline)
	if err != nil {
		c.JSON(http.StatusInternalServerError, err.Error())
		return
	}
	c.JSON(http.StatusOK, l)
}

// daemonSvcAddress builds the "host:port" address of a pipeline's daemon
// service, in the form
// "<pipeline>-daemon-svc.<ns>.svc.cluster.local:<dfv1.DaemonServicePort>".
func daemonSvcAddress(ns, pipeline string) string {
	svcName := pipeline + "-daemon-svc"
	return fmt.Sprintf("%s.%s.svc.cluster.local:%d", svcName, ns, dfv1.DaemonServicePort)
}
1 change: 1 addition & 0 deletions server/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ func v1Routes(r gin.IRouter) {
r.GET("/namespaces/:namespace/pipelines/:pipeline/edges/:edge", handler.GetPipelineEdge)
r.GET("/namespaces/:namespace/pipelines/:pipeline/vertices/:vertex/metrics", handler.GetVertexMetrics)
r.GET("/namespaces/:namespace/pipelines/:pipeline/vertices/:vertex/watermark", handler.GetVertexWatermark)
r.GET("/namespaces/:namespace/pipelines/:pipeline/watermarks", handler.GetPipelineWatermarks)
}
50 changes: 28 additions & 22 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ package e2e
import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
daemonclient "github.com/numaproj/numaflow/pkg/daemon/client"
. "github.com/numaproj/numaflow/test/fixtures"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)

type FunctionalSuite struct {
Expand Down Expand Up @@ -217,33 +217,39 @@ func (s *FunctionalSuite) TestWatermarkEnabled() {
assert.Equal(s.T(), "input", *bufferInfo.FromVertex)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
var wg sync.WaitGroup
for _, vertex := range vertexList {
wg.Add(1)
go func(vertex string) {
defer wg.Done()
// timeout for checking watermark progression
isProgressing, err := isWatermarkProgressing(ctx, client, pipelineName, vertex, 3)
assert.NoError(s.T(), err, "TestWatermarkEnabled failed %s\n", err)
assert.Truef(s.T(), isProgressing, "isWatermarkProgressing\n")
}(vertex)
}
wg.Wait()
isProgressing, err := isWatermarkProgressing(ctx, client, pipelineName, vertexList, 3)
assert.NoError(s.T(), err, "TestWatermarkEnabled failed %s\n", err)
assert.Truef(s.T(), isProgressing, "isWatermarkProgressing\n")
}

// isWatermarkProgressing checks whether the watermark for a given vertex is progressing monotonically.
// isWatermarkProgressing checks whether the watermark for each vertex in a pipeline is progressing monotonically.
// progressCount is the number of progressions the watermark value should undertake within the timeout deadline for it
// to be considered as valid.
func isWatermarkProgressing(ctx context.Context, client *daemonclient.DaemonClient, pipelineName string, vertexName string, progressCount int) (bool, error) {
prevWatermark := int64(-1)
func isWatermarkProgressing(ctx context.Context, client *daemonclient.DaemonClient, pipelineName string, vertexList []string, progressCount int) (bool, error) {
prevWatermark := make([]int64, len(vertexList))
for i := 0; i < len(vertexList); i++ {
prevWatermark[i] = -1
}
for i := 0; i < progressCount; i++ {
currentWatermark := prevWatermark
for currentWatermark <= prevWatermark {
wm, err := client.GetVertexWatermark(ctx, pipelineName, vertexName)
for func(current []int64, prev []int64) bool {
for j := 0; j < len(current); j++ {
if current[j] > prev[j] {
return false
}
}
return true
}(currentWatermark, prevWatermark) {
wm, err := client.GetPipelineWatermarks(ctx, pipelineName)
if err != nil {
return false, err
}
currentWatermark = *wm.Watermark
pipelineWatermarks := make([]int64, len(vertexList))
idx := 0
for _, v := range wm {
pipelineWatermarks[idx] = *v.Watermark
idx++
}
currentWatermark = pipelineWatermarks
select {
case <-ctx.Done():
return false, ctx.Err()
Expand Down
Loading

0 comments on commit e7021c9

Please sign in to comment.