Skip to content

Commit

Permalink
chore: fix watermark components ownership (#963)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
Signed-off-by: Vigith Maurice <vigith@gmail.com>
Signed-off-by: Derek Wang <whynowy@gmail.com>
Co-authored-by: Vigith Maurice <vigith@gmail.com>
Co-authored-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
3 people committed Aug 19, 2023
1 parent d57e139 commit ed53342
Show file tree
Hide file tree
Showing 23 changed files with 342 additions and 251 deletions.
15 changes: 9 additions & 6 deletions pkg/daemon/server/daemon_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,20 @@ func (ds *daemonServer) Run(ctx context.Context) error {
default:
return fmt.Errorf("unsupported isbsvc buffer type %q", ds.isbSvcType)
}
wmFetchers, err := service.GetUXEdgeWatermarkFetchers(ctx, ds.pipeline, isbSvcClient)
processorManagers, err := service.GetProcessorManagers(ctx, ds.pipeline, isbSvcClient)
if err != nil {
return fmt.Errorf("failed to get processor managers, %w", err)
}
wmFetchers, err := service.GetUXEdgeWatermarkFetchers(ctx, ds.pipeline, processorManagers)
if err != nil {
return fmt.Errorf("failed to get watermark fetchers, %w", err)
}

// Stop all the processor managers, it will stop watching for offset and heartbeat updates.
defer func() {
for _, fetcherList := range wmFetchers {
for _, f := range fetcherList {
if err := f.Close(); err != nil {
log.Errorw("Failed to close watermark fetcher", zap.Error(err))
}
for _, pms := range processorManagers {
for _, pm := range pms {
pm.Close()
}
}
}()
Expand Down
4 changes: 2 additions & 2 deletions pkg/daemon/server/service/pipeline_metrics_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/apis/proto/daemon"
"github.com/numaproj/numaflow/pkg/isbsvc"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/processor"
)

type mockGetType func(url string) (*http.Response, error)
Expand Down Expand Up @@ -67,7 +67,7 @@ func (ms *mockIsbSvcClient) ValidateBuffersAndBuckets(ctx context.Context, buffe
return nil
}

func (ms *mockIsbSvcClient) CreateUXWatermarkFetcher(ctx context.Context, bucketName string, partitions int, isReduce bool) ([]fetch.UXFetcher, error) {
func (ms *mockIsbSvcClient) CreateProcessorManagers(ctx context.Context, bucketName string, partitions int, isReduce bool) ([]*processor.ProcessorManager, error) {
return nil, nil
}

Expand Down
29 changes: 24 additions & 5 deletions pkg/daemon/server/service/pipeline_watermark_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,46 @@ import (
"github.com/numaproj/numaflow/pkg/apis/proto/daemon"
"github.com/numaproj/numaflow/pkg/isbsvc"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/processor"
)

// GetUXEdgeWatermarkFetchers returns a map of the watermark fetchers, where key is the buffer name,
// value is a list of fetchers to the buffers.
func GetUXEdgeWatermarkFetchers(ctx context.Context, pipeline *v1alpha1.Pipeline, isbSvcClient isbsvc.ISBService) (map[v1alpha1.Edge][]fetch.UXFetcher, error) {
func GetUXEdgeWatermarkFetchers(ctx context.Context, pipeline *v1alpha1.Pipeline, processorManagers map[v1alpha1.Edge][]*processor.ProcessorManager) (map[v1alpha1.Edge][]fetch.UXFetcher, error) {
var wmFetchers = make(map[v1alpha1.Edge][]fetch.UXFetcher)
if pipeline.Spec.Watermark.Disabled {
return wmFetchers, nil
}

for edge, pms := range processorManagers {
var fetchers []fetch.UXFetcher
for _, pm := range pms {
fetchers = append(fetchers, fetch.NewEdgeFetcher(ctx, pm, pipeline.GetVertex(edge.To).GetPartitionCount()))
}
wmFetchers[edge] = fetchers
}

return wmFetchers, nil
}

// GetProcessorManagers returns a map of ProcessorManager per edge.
func GetProcessorManagers(ctx context.Context, pipeline *v1alpha1.Pipeline, isbsvcClient isbsvc.ISBService) (map[v1alpha1.Edge][]*processor.ProcessorManager, error) {
var processorManagers = make(map[v1alpha1.Edge][]*processor.ProcessorManager)
if pipeline.Spec.Watermark.Disabled {
return processorManagers, nil
}

for _, edge := range pipeline.ListAllEdges() {
bucketName := v1alpha1.GenerateEdgeBucketName(pipeline.Namespace, pipeline.Name, edge.From, edge.To)
isReduce := pipeline.GetVertex(edge.To).IsReduceUDF()
partitionCount := pipeline.GetVertex(edge.To).GetPartitionCount()
wmFetcherList, err := isbSvcClient.CreateUXWatermarkFetcher(ctx, bucketName, partitionCount, isReduce)
pms, err := isbsvcClient.CreateProcessorManagers(ctx, bucketName, partitionCount, isReduce)
if err != nil {
return nil, fmt.Errorf("failed to create watermark fetcher %w", err)
return nil, fmt.Errorf("failed to create processor manager %w", err)
}
wmFetchers[edge] = wmFetcherList
processorManagers[edge] = pms
}
return wmFetchers, nil
return processorManagers, nil
}

// GetPipelineWatermarks is used to return the head watermarks for a given pipeline.
Expand Down
22 changes: 0 additions & 22 deletions pkg/forward/forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,6 @@ type testForwardFetcher struct {
// for forward_test.go only
}

func (t *testForwardFetcher) Close() error {
// won't be used
return nil
}

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
Expand Down Expand Up @@ -227,14 +222,6 @@ func TestNewInterStepDataForward(t *testing.T) {
fetchWatermark := &testForwardFetcher{}
publishWatermark, otStores := buildPublisherMapAndOTStore(toSteps)

// close the fetcher and publishers
defer func() {
_ = fetchWatermark.Close()
for _, p := range publishWatermark {
_ = p.Close()
}
}()

f, err := NewInterStepDataForward(vertex, fromStep, toSteps, &myForwardToAllTest{}, &myForwardToAllTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(batchSize), WithVertexType(dfv1.VertexTypeMapUDF), WithUDFStreaming(tt.streamEnabled))

assert.NoError(t, err)
Expand Down Expand Up @@ -384,7 +371,6 @@ func TestNewInterStepDataForward(t *testing.T) {

// close the fetcher and publishers
defer func() {
_ = fetchWatermark.Close()
for _, p := range publishWatermark {
_ = p.Close()
}
Expand Down Expand Up @@ -551,7 +537,6 @@ func TestNewInterStepDataForward(t *testing.T) {

// close the fetcher and publishers
defer func() {
_ = fetchWatermark.Close()
for _, p := range publishWatermark {
_ = p.Close()
}
Expand Down Expand Up @@ -798,11 +783,6 @@ func (t *testWMBFetcher) RevertBoolValue() {
t.WMBTestDiffHeadWMB = !t.WMBTestDiffHeadWMB
}

func (t *testWMBFetcher) Close() error {
// won't be used
return nil
}

func (t *testWMBFetcher) ComputeWatermark(offset isb.Offset, partition int32) wmb.Watermark {
return t.getWatermark()
}
Expand Down Expand Up @@ -885,7 +865,6 @@ func TestNewInterStepDataForwardIdleWatermark(t *testing.T) {

// close the fetcher and publishers
defer func() {
_ = fetchWatermark.Close()
for _, p := range publishWatermark {
_ = p.Close()
}
Expand Down Expand Up @@ -1046,7 +1025,6 @@ func TestNewInterStepDataForwardIdleWatermark_Reset(t *testing.T) {

// close the fetcher and publishers
defer func() {
_ = fetchWatermark.Close()
for _, p := range publishWatermark {
_ = p.Close()
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/isbsvc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package isbsvc
import (
"context"

"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/processor"
)

// ISBService is an interface used to do the operations on ISBSvc
Expand All @@ -28,7 +28,7 @@ type ISBService interface {
DeleteBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string) error
ValidateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string) error
GetBufferInfo(ctx context.Context, buffer string) (*BufferInfo, error)
CreateUXWatermarkFetcher(ctx context.Context, bucketName string, partitions int, isReduce bool) ([]fetch.UXFetcher, error)
CreateProcessorManagers(ctx context.Context, bucketName string, partitions int, isReduce bool) ([]*processor.ProcessorManager, error)
}

// createOptions describes the options for creating buffers and buckets
Expand Down
12 changes: 5 additions & 7 deletions pkg/isbsvc/jetstream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats"
"github.com/numaproj/numaflow/pkg/shared/kvs/jetstream"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/store"
)
Expand Down Expand Up @@ -316,9 +315,9 @@ func (jss *jetStreamSvc) GetBufferInfo(ctx context.Context, buffer string) (*Buf
return bufferInfo, nil
}

// CreateUXWatermarkFetcher is used to create watermark fetcher for the given bucket.
func (jss *jetStreamSvc) CreateUXWatermarkFetcher(ctx context.Context, bucketName string, fromBufferPartitionCount int, isReduce bool) ([]fetch.UXFetcher, error) {
var watermarkFetchers []fetch.UXFetcher
// CreateProcessorManagers is used to create processor manager for the given bucket.
func (jss *jetStreamSvc) CreateProcessorManagers(ctx context.Context, bucketName string, fromBufferPartitionCount int, isReduce bool) ([]*processor.ProcessorManager, error) {
var processorManagers []*processor.ProcessorManager
fetchers := 1
if isReduce {
fetchers = fromBufferPartitionCount
Expand All @@ -342,10 +341,9 @@ func (jss *jetStreamSvc) CreateUXWatermarkFetcher(ctx context.Context, bucketNam
} else {
pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount))
}
watermarkFetcher := fetch.NewEdgeFetcher(ctx, pm, fromBufferPartitionCount)
watermarkFetchers = append(watermarkFetchers, watermarkFetcher)
processorManagers = append(processorManagers, pm)
}
return watermarkFetchers, nil
return processorManagers, nil
}

func JetStreamName(bufferName string) string {
Expand Down
12 changes: 5 additions & 7 deletions pkg/isbsvc/redis_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

redisclient "github.com/numaproj/numaflow/pkg/shared/clients/redis"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
)

type isbsRedisSvc struct {
Expand Down Expand Up @@ -140,10 +139,10 @@ func (r *isbsRedisSvc) GetBufferInfo(ctx context.Context, buffer string) (*Buffe
return bufferInfo, nil
}

// CreateUXWatermarkFetcher is used to create watermark fetcher for the given bucket
func (r *isbsRedisSvc) CreateUXWatermarkFetcher(ctx context.Context, bucketName string, fromBufferPartitionCount int, isReduce bool) ([]fetch.UXFetcher, error) {
// CreateProcessorManagers is used to create the processor managers for the given bucket.
func (r *isbsRedisSvc) CreateProcessorManagers(ctx context.Context, bucketName string, fromBufferPartitionCount int, isReduce bool) ([]*processor.ProcessorManager, error) {
// Watermark fetching is not supported for Redis ATM. Creating noop watermark fetcher.
var watermarkFetchers []fetch.UXFetcher
var processorManagers []*processor.ProcessorManager
fetchers := 1
if isReduce {
fetchers = fromBufferPartitionCount
Expand All @@ -158,9 +157,8 @@ func (r *isbsRedisSvc) CreateUXWatermarkFetcher(ctx context.Context, bucketName
} else {
pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount))
}
watermarkFetcher := fetch.NewEdgeFetcher(ctx, pm, fromBufferPartitionCount)
watermarkFetchers = append(watermarkFetchers, watermarkFetcher)
processorManagers = append(processorManagers, pm)
}

return watermarkFetchers, nil
return processorManagers, nil
}
9 changes: 0 additions & 9 deletions pkg/reduce/data_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,6 @@ func TestReduceDataForward_IdleWM(t *testing.T) {

// close the fetcher and publishers
defer func() {
_ = f.Close()
for _, p := range publisherMap {
_ = p.Close()
}
Expand Down Expand Up @@ -594,7 +593,6 @@ func TestReduceDataForward_Count(t *testing.T) {

// close the fetcher and publishers
defer func() {
_ = f.Close()
for _, p := range publisherMap {
_ = p.Close()
}
Expand Down Expand Up @@ -676,7 +674,6 @@ func TestReduceDataForward_AllowedLatencyCount(t *testing.T) {

// close the fetcher and publishers
defer func() {
_ = f.Close()
for _, p := range publisherMap {
_ = p.Close()
}
Expand Down Expand Up @@ -762,7 +759,6 @@ func TestReduceDataForward_Sum(t *testing.T) {

// close the fetcher and publishers
defer func() {
_ = f.Close()
for _, p := range publishersMap {
_ = p.Close()
}
Expand Down Expand Up @@ -845,7 +841,6 @@ func TestReduceDataForward_Max(t *testing.T) {

// close the fetcher and publishers
defer func() {
_ = f.Close()
for _, p := range publishersMap {
_ = p.Close()
}
Expand Down Expand Up @@ -928,7 +923,6 @@ func TestReduceDataForward_SumWithDifferentKeys(t *testing.T) {

// close the fetcher and publishers
defer func() {
_ = f.Close()
for _, p := range publishersMap {
_ = p.Close()
}
Expand Down Expand Up @@ -1032,7 +1026,6 @@ func TestReduceDataForward_NonKeyed(t *testing.T) {

// close the fetcher and publishers
defer func() {
_ = f.Close()
for _, p := range publishersMap {
_ = p.Close()
}
Expand Down Expand Up @@ -1123,7 +1116,6 @@ func TestDataForward_WithContextClose(t *testing.T) {

// close the fetcher and publishers
defer func() {
_ = f.Close()
for _, p := range publishersMap {
_ = p.Close()
}
Expand Down Expand Up @@ -1218,7 +1210,6 @@ func TestReduceDataForward_SumMultiPartitions(t *testing.T) {

// close the fetcher and publishers
defer func() {
_ = f.Close()
for _, p := range publishersMap {
_ = p.Close()
}
Expand Down
Loading

0 comments on commit ed53342

Please sign in to comment.