From 1f33bf8b45039ce235b930047ab3b77e0f1d8635 Mon Sep 17 00:00:00 2001 From: Derek Wang Date: Sun, 20 Aug 2023 21:05:29 -0700 Subject: [PATCH] refactor: build wmstore and wmstorewatcher directly, and remove some unnecessary fields (#970) Signed-off-by: Derek Wang --- cmd/commands/daemon_server.go | 4 +- cmd/commands/isbsvc_create.go | 2 +- cmd/commands/isbsvc_delete.go | 2 +- cmd/commands/isbsvc_validate.go | 2 +- cmd/commands/processor.go | 2 +- cmd/commands/side_inputs_init.go | 3 +- cmd/commands/side_inputs_manager.go | 4 +- cmd/commands/side_inputs_watcher.go | 3 +- pkg/forward/forward_test.go | 11 +- pkg/isbsvc/jetstream_service.go | 40 ++---- pkg/isbsvc/redis_service.go | 16 +-- pkg/reduce/data_forward_test.go | 25 ++-- pkg/reduce/pnf/processandforward_test.go | 12 +- pkg/shared/kvs/inmem/kv_store.go | 26 ++-- pkg/shared/kvs/inmem/kv_watch.go | 6 +- pkg/shared/kvs/jetstream/kv_store.go | 4 +- pkg/shared/kvs/jetstream/kv_watch.go | 4 +- pkg/sideinputs/initializer/initializer.go | 2 +- .../initializer/initializer_test.go | 16 +-- pkg/sideinputs/synchronizer/synchronizer.go | 2 +- .../synchronizer/synchronizer_test.go | 9 +- pkg/sources/forward/data_forward_test.go | 12 +- pkg/sources/generator/tickgen_test.go | 7 +- pkg/sources/http/http_test.go | 3 +- pkg/sources/kafka/handler_test.go | 3 +- pkg/sources/kafka/reader_test.go | 12 +- pkg/sources/nats/nats_test.go | 3 +- pkg/sources/source.go | 13 +- pkg/watermark/fetch/edge_fetcher.go | 4 +- pkg/watermark/fetch/edge_fetcher_set_test.go | 41 +++--- pkg/watermark/fetch/edge_fetcher_test.go | 122 +++++++----------- pkg/watermark/fetch/source_fetcher.go | 2 +- pkg/watermark/generic/jetstream/generic.go | 93 ++++--------- pkg/watermark/processor/processor_manager.go | 14 +- .../processor/processor_manager_test.go | 48 +++---- pkg/watermark/processor/processor_to_fetch.go | 4 +- .../processor/processor_to_fetch_test.go | 2 +- pkg/watermark/publish/publisher_inmem_test.go | 14 +- pkg/watermark/publish/publisher_test.go | 7 +- pkg/watermark/store/watermark_store.go | 67 ++++++++-- .../store/watermark_store_watcher.go | 49 ++++++- pkg/watermark/timeline/offset_timeline.go | 4 +- .../timeline/offset_timeline_test.go | 8 +- 43 files changed, 325 insertions(+), 402 deletions(-) diff --git a/cmd/commands/daemon_server.go b/cmd/commands/daemon_server.go index e0edb08f5..8cc8bfdf2 100644 --- a/cmd/commands/daemon_server.go +++ b/cmd/commands/daemon_server.go @@ -39,13 +39,11 @@ func NewDaemonServerCommand() *cobra.Command { Use: "daemon-server", Short: "Start the daemon server", RunE: func(cmd *cobra.Command, args []string) error { - logger := logging.NewLogger().Named("daemon-server") - pl, err := decodePipeline() if err != nil { return fmt.Errorf("failed to decode the pipeline spec: %v", err) } - + logger := logging.NewLogger().Named("daemon-server").With("pipeline", pl.Name) ctx := logging.WithLogger(signals.SetupSignalHandler(), logger) server := server.NewDaemonServer(pl, v1alpha1.ISBSvcType(isbSvcType)) return server.Run(ctx) diff --git a/cmd/commands/isbsvc_create.go b/cmd/commands/isbsvc_create.go index b767099e0..a8999daac 100644 --- a/cmd/commands/isbsvc_create.go +++ b/cmd/commands/isbsvc_create.go @@ -45,11 +45,11 @@ func NewISBSvcCreateCommand() *cobra.Command { Use: "isbsvc-create", Short: "Create buffers, buckets and side inputs store", RunE: func(cmd *cobra.Command, args []string) error { - logger := logging.NewLogger().Named("isbsvc-create") pipelineName, defined := os.LookupEnv(v1alpha1.EnvPipelineName) if !defined { return 
fmt.Errorf("required environment variable '%s' not defined", v1alpha1.EnvPipelineName) } + logger := logging.NewLogger().Named("isbsvc-create").With("pipeline", pipelineName) isbSvcConfig := &v1alpha1.BufferServiceConfig{} encodedBufferServiceConfig := os.Getenv(v1alpha1.EnvISBSvcConfig) if len(encodedBufferServiceConfig) > 0 { diff --git a/cmd/commands/isbsvc_delete.go b/cmd/commands/isbsvc_delete.go index aaf431826..0952ab265 100644 --- a/cmd/commands/isbsvc_delete.go +++ b/cmd/commands/isbsvc_delete.go @@ -42,11 +42,11 @@ func NewISBSvcDeleteCommand() *cobra.Command { Use: "isbsvc-delete", Short: "Delete ISB Service buffers, buckets and side inputs store", RunE: func(cmd *cobra.Command, args []string) error { - logger := logging.NewLogger().Named("isbsvc-delete") pipelineName, defined := os.LookupEnv(v1alpha1.EnvPipelineName) if !defined { return fmt.Errorf("required environment variable '%s' not defined", v1alpha1.EnvPipelineName) } + logger := logging.NewLogger().Named("isbsvc-delete").With("pipeline", pipelineName) var isbsClient isbsvc.ISBService var err error ctx := logging.WithLogger(context.Background(), logger) diff --git a/cmd/commands/isbsvc_validate.go b/cmd/commands/isbsvc_validate.go index 84c1b699c..50ac8ec5f 100644 --- a/cmd/commands/isbsvc_validate.go +++ b/cmd/commands/isbsvc_validate.go @@ -45,11 +45,11 @@ func NewISBSvcValidateCommand() *cobra.Command { Use: "isbsvc-validate", Short: "Validate ISB Service buffers, buckets and side inputs store", RunE: func(cmd *cobra.Command, args []string) error { - logger := logging.NewLogger().Named("isbsvc-validate") pipelineName, existing := os.LookupEnv(v1alpha1.EnvPipelineName) if !existing { return fmt.Errorf("environment variable %q not existing", v1alpha1.EnvPipelineName) } + logger := logging.NewLogger().Named("isbsvc-validate").With("pipeline", pipelineName) var isbsClient isbsvc.ISBService var err error ctx := logging.WithLogger(context.Background(), logger) diff --git a/cmd/commands/processor.go b/cmd/commands/processor.go index 9f4cd130c..03bdfaffd 100644 --- a/cmd/commands/processor.go +++ b/cmd/commands/processor.go @@ -68,7 +68,7 @@ func NewProcessorCommand() *cobra.Command { if err != nil { return fmt.Errorf("invalid replica %q", replicaStr) } - log = log.With("vertex", vertex.Name) + log = log.With("pipeline", vertex.Spec.PipelineName).With("vertex", vertex.Spec.Name) vertexInstance := &dfv1.VertexInstance{ Vertex: vertex, Hostname: hostname, diff --git a/cmd/commands/side_inputs_init.go b/cmd/commands/side_inputs_init.go index 4233c07c2..54df1b52a 100644 --- a/cmd/commands/side_inputs_init.go +++ b/cmd/commands/side_inputs_init.go @@ -37,7 +37,6 @@ func NewSideInputsInitCommand() *cobra.Command { Use: "side-inputs-init", Short: "Start the Side Inputs init service", RunE: func(cmd *cobra.Command, args []string) error { - logger := logging.NewLogger().Named("side-inputs-init") pipelineName, defined := os.LookupEnv(dfv1.EnvPipelineName) if !defined { @@ -47,7 +46,7 @@ func NewSideInputsInitCommand() *cobra.Command { if len(sideInputs) == 0 { return fmt.Errorf("no side inputs are defined for this vertex") } - + logger := logging.NewLogger().Named("side-inputs-init").With("pipeline", pipelineName) ctx := logging.WithLogger(context.Background(), logger) sideInputsInitializer := initializer.NewSideInputsInitializer(dfv1.ISBSvcType(isbSvcType), pipelineName, sideInputsStore, sideInputs) return sideInputsInitializer.Run(ctx) diff --git a/cmd/commands/side_inputs_manager.go b/cmd/commands/side_inputs_manager.go index 
2d916a2ac..76d5cdaa9 100644 --- a/cmd/commands/side_inputs_manager.go +++ b/cmd/commands/side_inputs_manager.go @@ -39,8 +39,6 @@ func NewSideInputsManagerCommand() *cobra.Command { Use: "side-inputs-manager", Short: "Start a Side Inputs Manager", RunE: func(cmd *cobra.Command, args []string) error { - logger := logging.NewLogger().Named("side-inputs-manager") - encodedSiceInputSpec, defined := os.LookupEnv(dfv1.EnvSideInputObject) if !defined { return fmt.Errorf("environment %q is not defined", dfv1.EnvSideInputObject) @@ -59,6 +57,8 @@ func NewSideInputsManagerCommand() *cobra.Command { return fmt.Errorf("environment %q is not defined", dfv1.EnvPipelineName) } + logger := logging.NewLogger().Named("side-inputs-manager").With("pipeline", pipelineName) + ctx := logging.WithLogger(signals.SetupSignalHandler(), logger) sideInputManager := manager.NewSideInputsManager(dfv1.ISBSvcType(isbSvcType), pipelineName, sideInputsStore, sideInput) return sideInputManager.Start(ctx) diff --git a/cmd/commands/side_inputs_watcher.go b/cmd/commands/side_inputs_watcher.go index 8da54fe37..7be5889ee 100644 --- a/cmd/commands/side_inputs_watcher.go +++ b/cmd/commands/side_inputs_watcher.go @@ -38,8 +38,6 @@ func NewSideInputsWatcherCommand() *cobra.Command { Use: "side-inputs-watcher", Short: "Start the Side Inputs Watcher", RunE: func(cmd *cobra.Command, args []string) error { - logger := logging.NewLogger().Named("side-inputs-watcher") - pipelineName, defined := os.LookupEnv(dfv1.EnvPipelineName) if !defined { return fmt.Errorf("environment %q is not defined", dfv1.EnvPipelineName) @@ -49,6 +47,7 @@ func NewSideInputsWatcherCommand() *cobra.Command { return fmt.Errorf("no side inputs are defined for this vertex") } + logger := logging.NewLogger().Named("side-inputs-watcher").With("pipeline", pipelineName) ctx := logging.WithLogger(signals.SetupSignalHandler(), logger) sideInputsWatcher := synchronizer.NewSideInputsSynchronizer(dfv1.ISBSvcType(isbSvcType), pipelineName, sideInputsStore, sideInputs) return sideInputsWatcher.Start(ctx) diff --git a/pkg/forward/forward_test.go b/pkg/forward/forward_test.go index a007a5c3a..cfda10e92 100644 --- a/pkg/forward/forward_test.go +++ b/pkg/forward/forward_test.go @@ -34,7 +34,6 @@ import ( "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/isb/testutils" "github.com/numaproj/numaflow/pkg/shared/kvs" - "github.com/numaproj/numaflow/pkg/shared/kvs/inmem" "github.com/numaproj/numaflow/pkg/shared/logging" udfapplier "github.com/numaproj/numaflow/pkg/udf/function" "github.com/numaproj/numaflow/pkg/watermark/generic" @@ -47,8 +46,7 @@ import ( const ( testPipelineName = "testPipeline" testProcessorEntity = "publisherTestPod" - publisherHBKeyspace = testPipelineName + "_" + testProcessorEntity + "_%s_" + "PROCESSORS" - publisherOTKeyspace = testPipelineName + "_" + testProcessorEntity + "_%s_" + "OT" + publisherKeyspace = testPipelineName + "_" + testProcessorEntity + "_%s" ) var ( @@ -1603,10 +1601,9 @@ func buildPublisherMapAndOTStore(toBuffers map[string][]isb.BufferWriter) (map[s publishers := make(map[string]publish.Publisher) otStores := make(map[string]kvs.KVStorer) for key, partitionedBuffers := range toBuffers { - heartbeatKV, _, _ := inmem.NewKVInMemKVStore(ctx, testPipelineName, fmt.Sprintf(publisherHBKeyspace, key)) - otKV, _, _ := inmem.NewKVInMemKVStore(ctx, testPipelineName, fmt.Sprintf(publisherOTKeyspace, key)) - otStores[key] = otKV - p := publish.NewPublish(ctx, processorEntity, 
wmstore.BuildWatermarkStore(heartbeatKV, otKV), int32(len(partitionedBuffers)), publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1)) + store, _, _, _ := wmstore.BuildInmemWatermarkStore(ctx, fmt.Sprintf(publisherKeyspace, key)) + otStores[key] = store.OffsetTimelineStore() + p := publish.NewPublish(ctx, processorEntity, store, int32(len(partitionedBuffers)), publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1)) publishers[key] = p } return publishers, otStores diff --git a/pkg/isbsvc/jetstream_service.go b/pkg/isbsvc/jetstream_service.go index aa774311d..4ea8d016e 100644 --- a/pkg/isbsvc/jetstream_service.go +++ b/pkg/isbsvc/jetstream_service.go @@ -28,10 +28,9 @@ import ( "go.uber.org/zap" jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats" - "github.com/numaproj/numaflow/pkg/shared/kvs/jetstream" "github.com/numaproj/numaflow/pkg/shared/logging" "github.com/numaproj/numaflow/pkg/watermark/processor" - "github.com/numaproj/numaflow/pkg/watermark/store" + wmstore "github.com/numaproj/numaflow/pkg/watermark/store" ) type jetStreamSvc struct { @@ -146,7 +145,7 @@ func (jss *jetStreamSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, b for _, bucket := range buckets { // Create offset-timeline KV - otKVName := JetStreamOTKVName(bucket) + otKVName := wmstore.JetStreamOTKVName(bucket) if _, err := js.KeyValue(otKVName); err != nil { if !errors.Is(err, nats.ErrBucketNotFound) && !errors.Is(err, nats.ErrStreamNotFound) { return fmt.Errorf("failed to query information of bucket %q during buffer creating, %w", otKVName, err) @@ -165,7 +164,7 @@ func (jss *jetStreamSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, b } } // Create processor KV - procKVName := JetStreamProcessorKVName(bucket) + procKVName := wmstore.JetStreamProcessorKVName(bucket) if _, err := js.KeyValue(procKVName); err != nil { if !errors.Is(err, nats.ErrBucketNotFound) && !errors.Is(err, nats.ErrStreamNotFound) { return fmt.Errorf("failed to query information of bucket %q during buffer creating, %w", procKVName, err) @@ -209,12 +208,12 @@ func (jss *jetStreamSvc) DeleteBuffersAndBuckets(ctx context.Context, buffers, b log.Infow("Succeeded to delete a stream", zap.String("stream", streamName)) } for _, bucket := range buckets { - otKVName := JetStreamOTKVName(bucket) + otKVName := wmstore.JetStreamOTKVName(bucket) if err := js.DeleteKeyValue(otKVName); err != nil && !errors.Is(err, nats.ErrBucketNotFound) && !errors.Is(err, nats.ErrStreamNotFound) { return fmt.Errorf("failed to delete offset timeline KV %q, %w", otKVName, err) } log.Infow("Succeeded to delete an offset timeline KV", zap.String("kvName", otKVName)) - procKVName := JetStreamProcessorKVName(bucket) + procKVName := wmstore.JetStreamProcessorKVName(bucket) if err := js.DeleteKeyValue(procKVName); err != nil && !errors.Is(err, nats.ErrBucketNotFound) && !errors.Is(err, nats.ErrStreamNotFound) { return fmt.Errorf("failed to delete processor KV %q, %w", procKVName, err) } @@ -251,12 +250,12 @@ func (jss *jetStreamSvc) ValidateBuffersAndBuckets(ctx context.Context, buffers, } } for _, bucket := range buckets { - otKVName := JetStreamOTKVName(bucket) + otKVName := wmstore.JetStreamOTKVName(bucket) if _, err := js.KeyValue(otKVName); err != nil { return fmt.Errorf("failed to query OT KV %q, %w", otKVName, err) } - procKVName := JetStreamProcessorKVName(bucket) + procKVName := wmstore.JetStreamProcessorKVName(bucket) if _, err := js.KeyValue(procKVName); err != nil { return fmt.Errorf("failed to query 
processor KV %q, %w", procKVName, err) } @@ -317,6 +316,8 @@ func (jss *jetStreamSvc) GetBufferInfo(ctx context.Context, buffer string) (*Buf // CreateProcessorManagers is used to create processor manager for the given bucket. func (jss *jetStreamSvc) CreateProcessorManagers(ctx context.Context, bucketName string, fromBufferPartitionCount int, isReduce bool) ([]*processor.ProcessorManager, error) { + log := logging.FromContext(ctx).With("bucket", bucketName) + ctx = logging.WithLogger(ctx, log) var processorManagers []*processor.ProcessorManager fetchers := 1 if isReduce { @@ -324,22 +325,15 @@ func (jss *jetStreamSvc) CreateProcessorManagers(ctx context.Context, bucketName } // if it's not a reduce vertex, we don't need multiple watermark fetchers. We use common fetcher among all partitions. for i := 0; i < fetchers; i++ { - hbKVName := JetStreamProcessorKVName(bucketName) - hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, jss.pipelineName, hbKVName, jss.jsClient) + storeWatcher, err := wmstore.BuildJetStreamWatermarkStoreWatcher(ctx, bucketName, jss.jsClient) if err != nil { - return nil, err - } - otKVName := JetStreamOTKVName(bucketName) - otWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, jss.pipelineName, otKVName, jss.jsClient) - if err != nil { - return nil, err + return nil, fmt.Errorf("failed at new JetStream watermark store watcher, %w", err) } - storeWatcher := store.BuildWatermarkStoreWatcher(hbWatch, otWatch) var pm *processor.ProcessorManager if isReduce { - pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount), processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce)) + pm = processor.NewProcessorManager(ctx, storeWatcher, int32(fromBufferPartitionCount), processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce)) } else { - pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount)) + pm = processor.NewProcessorManager(ctx, storeWatcher, int32(fromBufferPartitionCount)) } processorManagers = append(processorManagers, pm) } @@ -350,14 +344,6 @@ func JetStreamName(bufferName string) string { return bufferName } -func JetStreamOTKVName(bucketName string) string { - return fmt.Sprintf("%s_OT", bucketName) -} - -func JetStreamProcessorKVName(bucketName string) string { - return fmt.Sprintf("%s_PROCESSORS", bucketName) -} - func JetStreamSideInputsStoreKVName(sideInputStoreName string) string { return fmt.Sprintf("%s_SIDE_INPUTS", sideInputStoreName) } diff --git a/pkg/isbsvc/redis_service.go b/pkg/isbsvc/redis_service.go index 733866cf4..3b58951a4 100644 --- a/pkg/isbsvc/redis_service.go +++ b/pkg/isbsvc/redis_service.go @@ -24,12 +24,10 @@ import ( "go.uber.org/zap" redis2 "github.com/numaproj/numaflow/pkg/isb/stores/redis" - "github.com/numaproj/numaflow/pkg/shared/kvs/noop" - "github.com/numaproj/numaflow/pkg/watermark/processor" - "github.com/numaproj/numaflow/pkg/watermark/store" - redisclient "github.com/numaproj/numaflow/pkg/shared/clients/redis" "github.com/numaproj/numaflow/pkg/shared/logging" + "github.com/numaproj/numaflow/pkg/watermark/processor" + "github.com/numaproj/numaflow/pkg/watermark/store" ) type isbsRedisSvc struct { @@ -141,6 +139,8 @@ func (r *isbsRedisSvc) GetBufferInfo(ctx context.Context, buffer string) (*Buffe // CreateProcessorManagers is used to create the processor managers for the given bucket. 
func (r *isbsRedisSvc) CreateProcessorManagers(ctx context.Context, bucketName string, fromBufferPartitionCount int, isReduce bool) ([]*processor.ProcessorManager, error) { + log := logging.FromContext(ctx).With("bucket", bucketName) + ctx = logging.WithLogger(ctx, log) // Watermark fetching is not supported for Redis ATM. Creating noop watermark fetcher. var processorManagers []*processor.ProcessorManager fetchers := 1 @@ -148,14 +148,12 @@ func (r *isbsRedisSvc) CreateProcessorManagers(ctx context.Context, bucketName s fetchers = fromBufferPartitionCount } for i := 0; i < fetchers; i++ { - hbWatcher := noop.NewKVOpWatch() - otWatcher := noop.NewKVOpWatch() - storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) + storeWatcher, _ := store.BuildNoOpWatermarkStoreWatcher() var pm *processor.ProcessorManager if isReduce { - pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount), processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce)) + pm = processor.NewProcessorManager(ctx, storeWatcher, int32(fromBufferPartitionCount), processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce)) } else { - pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount)) + pm = processor.NewProcessorManager(ctx, storeWatcher, int32(fromBufferPartitionCount)) } processorManagers = append(processorManagers, pm) } diff --git a/pkg/reduce/data_forward_test.go b/pkg/reduce/data_forward_test.go index 2285c97f1..c180ef4e0 100644 --- a/pkg/reduce/data_forward_test.go +++ b/pkg/reduce/data_forward_test.go @@ -37,7 +37,6 @@ import ( "github.com/numaproj/numaflow/pkg/reduce/pbq/store/memory" "github.com/numaproj/numaflow/pkg/reduce/pnf" "github.com/numaproj/numaflow/pkg/shared/kvs" - "github.com/numaproj/numaflow/pkg/shared/kvs/inmem" "github.com/numaproj/numaflow/pkg/watermark/fetch" "github.com/numaproj/numaflow/pkg/watermark/processor" "github.com/numaproj/numaflow/pkg/watermark/publish" @@ -1281,17 +1280,14 @@ func TestReduceDataForward_SumMultiPartitions(t *testing.T) { func fetcherAndPublisher(ctx context.Context, fromBuffer *simplebuffer.InMemoryBuffer, key string) (fetch.Fetcher, publish.Publisher) { var ( - keyspace = key - hbBucketName = keyspace + "_PROCESSORS" - otBucketName = keyspace + "_OT" + keyspace = key ) sourcePublishEntity := processor.NewProcessorEntity(fromBuffer.GetName()) - hb, hbWatcherCh, _ := inmem.NewKVInMemKVStore(ctx, pipelineName, hbBucketName) - ot, otWatcherCh, _ := inmem.NewKVInMemKVStore(ctx, pipelineName, otBucketName) + store, hbWatcherCh, otWatcherCh, _ := wmstore.BuildInmemWatermarkStore(ctx, keyspace) // publisher for source - sourcePublisher := publish.NewPublish(ctx, sourcePublishEntity, wmstore.BuildWatermarkStore(hb, ot), 1, publish.WithAutoRefreshHeartbeatDisabled()) + sourcePublisher := publish.NewPublish(ctx, sourcePublishEntity, store, 1, publish.WithAutoRefreshHeartbeatDisabled()) // publish heartbeat manually for the processor go func() { @@ -1300,16 +1296,14 @@ func fetcherAndPublisher(ctx context.Context, fromBuffer *simplebuffer.InMemoryB case <-ctx.Done(): return default: - _ = hb.PutKV(ctx, fromBuffer.GetName(), []byte(fmt.Sprintf("%d", time.Now().Unix()))) + _ = store.HeartbeatStore().PutKV(ctx, fromBuffer.GetName(), []byte(fmt.Sprintf("%d", time.Now().Unix()))) time.Sleep(time.Duration(1) * time.Second) } } }() - hbWatcher, _ := inmem.NewInMemWatch(ctx, pipelineName, keyspace+"_PROCESSORS", hbWatcherCh) - otWatcher, _ := 
inmem.NewInMemWatch(ctx, pipelineName, keyspace+"_OT", otWatcherCh) - storeWatcher := wmstore.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - pm := processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", 1, processor.WithIsReduce(true)) + storeWatcher, _ := wmstore.BuildInmemWatermarkStoreWatcher(ctx, keyspace, hbWatcherCh, otWatcherCh) + pm := processor.NewProcessorManager(ctx, storeWatcher, 1, processor.WithIsReduce(true)) for waitForReadyP := pm.GetProcessor(fromBuffer.GetName()); waitForReadyP == nil; waitForReadyP = pm.GetProcessor(fromBuffer.GetName()) { // wait until the test processor has been added to the processor list time.Sleep(time.Millisecond * 100) @@ -1336,10 +1330,9 @@ func buildPublisherMapAndOTStore(ctx context.Context, toBuffers map[string][]isb index := int32(0) for key, partitionedBuffers := range toBuffers { publishEntity := processor.NewProcessorEntity(key) - hb, hbKVEntry, _ := inmem.NewKVInMemKVStore(ctx, pipelineName, key+"_PROCESSORS") - ot, otKVEntry, _ := inmem.NewKVInMemKVStore(ctx, pipelineName, key+"_OT") - otStores[key] = ot - p := publish.NewPublish(ctx, publishEntity, wmstore.BuildWatermarkStore(hb, ot), int32(len(partitionedBuffers)), publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1)) + store, hbKVEntry, otKVEntry, _ := wmstore.BuildInmemWatermarkStore(ctx, key) + otStores[key] = store.OffsetTimelineStore() + p := publish.NewPublish(ctx, publishEntity, store, int32(len(partitionedBuffers)), publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1)) publishers[key] = p go func() { diff --git a/pkg/reduce/pnf/processandforward_test.go b/pkg/reduce/pnf/processandforward_test.go index ff6632fb5..10f18941e 100644 --- a/pkg/reduce/pnf/processandforward_test.go +++ b/pkg/reduce/pnf/processandforward_test.go @@ -19,7 +19,6 @@ package pnf import ( "context" "encoding/json" - "fmt" "io" "strings" "testing" @@ -34,7 +33,6 @@ import ( "github.com/numaproj/numaflow/pkg/reduce/pbq/store/memory" "github.com/numaproj/numaflow/pkg/sdkclient/udf/clienttest" "github.com/numaproj/numaflow/pkg/shared/kvs" - "github.com/numaproj/numaflow/pkg/shared/kvs/inmem" "github.com/numaproj/numaflow/pkg/shared/logging" "github.com/numaproj/numaflow/pkg/watermark/generic" "github.com/numaproj/numaflow/pkg/watermark/processor" @@ -55,8 +53,7 @@ import ( const ( testPipelineName = "testPipeline" testProcessorEntity = "publisherTestPod" - publisherHBKeyspace = testPipelineName + "_" + testProcessorEntity + "_%s_" + "PROCESSORS" - publisherOTKeyspace = testPipelineName + "_" + testProcessorEntity + "_%s_" + "OT" + publisherKeyspace = testPipelineName + "_" + testProcessorEntity + "_%s" ) type forwardTest struct { @@ -455,10 +452,9 @@ func buildPublisherMapAndOTStore(toBuffers map[string][]isb.BufferWriter) (map[s publishers := make(map[string]publish.Publisher) otStores := make(map[string]kvs.KVStorer) for key, partitionedBuffers := range toBuffers { - heartbeatKV, _, _ := inmem.NewKVInMemKVStore(ctx, testPipelineName, fmt.Sprintf(publisherHBKeyspace, key)) - otKV, _, _ := inmem.NewKVInMemKVStore(ctx, testPipelineName, fmt.Sprintf(publisherOTKeyspace, key)) - otStores[key] = otKV - p := publish.NewPublish(ctx, processorEntity, wmstore.BuildWatermarkStore(heartbeatKV, otKV), int32(len(partitionedBuffers)), publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1)) + store, _, _, _ := wmstore.BuildInmemWatermarkStore(ctx, publisherKeyspace) + otStores[key] = store.OffsetTimelineStore() + p := publish.NewPublish(ctx, 
processorEntity, store, int32(len(partitionedBuffers)), publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1)) publishers[key] = p } return publishers, otStores diff --git a/pkg/shared/kvs/inmem/kv_store.go b/pkg/shared/kvs/inmem/kv_store.go index e29abd46e..9627d0149 100644 --- a/pkg/shared/kvs/inmem/kv_store.go +++ b/pkg/shared/kvs/inmem/kv_store.go @@ -25,9 +25,9 @@ import ( "sort" "sync" - "github.com/numaproj/numaflow/pkg/shared/kvs" "go.uber.org/zap" + "github.com/numaproj/numaflow/pkg/shared/kvs" "github.com/numaproj/numaflow/pkg/shared/logging" ) @@ -55,25 +55,23 @@ func (k kvEntry) Operation() kvs.KVWatchOp { // inMemStore implements the watermark's KV store backed up by in mem store. type inMemStore struct { - pipelineName string - bucketName string - kv map[string][]byte - kvLock sync.RWMutex - kvEntryCh chan kvs.KVEntry - isClosed bool - log *zap.SugaredLogger + bucketName string + kv map[string][]byte + kvLock sync.RWMutex + kvEntryCh chan kvs.KVEntry + isClosed bool + log *zap.SugaredLogger } var _ kvs.KVStorer = (*inMemStore)(nil) // NewKVInMemKVStore returns inMemStore. -func NewKVInMemKVStore(ctx context.Context, pipelineName string, bucketName string) (kvs.KVStorer, chan kvs.KVEntry, error) { +func NewKVInMemKVStore(ctx context.Context, bucketName string) (kvs.KVStorer, chan kvs.KVEntry, error) { s := &inMemStore{ - pipelineName: pipelineName, - bucketName: bucketName, - kv: make(map[string][]byte), - kvEntryCh: make(chan kvs.KVEntry, 10), - log: logging.FromContext(ctx).With("pipeline", pipelineName).With("bucketName", bucketName), + bucketName: bucketName, + kv: make(map[string][]byte), + kvEntryCh: make(chan kvs.KVEntry, 10), + log: logging.FromContext(ctx).With("bucketName", bucketName), } return s, s.kvEntryCh, nil } diff --git a/pkg/shared/kvs/inmem/kv_watch.go b/pkg/shared/kvs/inmem/kv_watch.go index 329cf7b85..0a2720591 100644 --- a/pkg/shared/kvs/inmem/kv_watch.go +++ b/pkg/shared/kvs/inmem/kv_watch.go @@ -29,7 +29,6 @@ import ( // inMemWatch implements the watermark's KV store backed up by in memory store. type inMemWatch struct { - pipelineName string bucketName string kvEntryCh <-chan kvs.KVEntry kvHistory []kvs.KVEntry @@ -42,15 +41,14 @@ type inMemWatch struct { var _ kvs.KVWatcher = (*inMemWatch)(nil) // NewInMemWatch returns inMemWatch which implements the KVWatcher interface. -func NewInMemWatch(ctx context.Context, pipelineName string, bucketName string, kvEntryCh <-chan kvs.KVEntry) (kvs.KVWatcher, error) { +func NewInMemWatch(ctx context.Context, bucketName string, kvEntryCh <-chan kvs.KVEntry) (kvs.KVWatcher, error) { k := &inMemWatch{ - pipelineName: pipelineName, bucketName: bucketName, kvEntryCh: kvEntryCh, kvHistory: []kvs.KVEntry{}, updatesChMap: make(map[string]chan kvs.KVEntry), doneCh: make(chan struct{}), - log: logging.FromContext(ctx).With("pipeline", pipelineName).With("bucketName", bucketName), + log: logging.FromContext(ctx).With("bucketName", bucketName), } return k, nil } diff --git a/pkg/shared/kvs/jetstream/kv_store.go b/pkg/shared/kvs/jetstream/kv_store.go index 9f5930cab..47a523060 100644 --- a/pkg/shared/kvs/jetstream/kv_store.go +++ b/pkg/shared/kvs/jetstream/kv_store.go @@ -42,11 +42,11 @@ type jetStreamStore struct { var _ kvs.KVStorer = (*jetStreamStore)(nil) // NewKVJetStreamKVStore returns KVJetStreamStore. 
-func NewKVJetStreamKVStore(ctx context.Context, pipelineName string, kvName string, client *jsclient.NATSClient, opts ...JSKVStoreOption) (kvs.KVStorer, error) { +func NewKVJetStreamKVStore(ctx context.Context, kvName string, client *jsclient.NATSClient, opts ...JSKVStoreOption) (kvs.KVStorer, error) { var err error var jsStore = &jetStreamStore{ client: client, - log: logging.FromContext(ctx).With("pipeline", pipelineName).With("kvName", kvName), + log: logging.FromContext(ctx).With("kvName", kvName), } // for JetStream KeyValue store, the bucket should have been created in advance diff --git a/pkg/shared/kvs/jetstream/kv_watch.go b/pkg/shared/kvs/jetstream/kv_watch.go index 8b0a3b5dc..c742fb64e 100644 --- a/pkg/shared/kvs/jetstream/kv_watch.go +++ b/pkg/shared/kvs/jetstream/kv_watch.go @@ -45,7 +45,7 @@ type jetStreamWatch struct { var _ kvs.KVWatcher = (*jetStreamWatch)(nil) // NewKVJetStreamKVWatch returns KVJetStreamWatch specific to JetStream which implements the KVWatcher interface. -func NewKVJetStreamKVWatch(ctx context.Context, pipelineName string, kvName string, client *jsclient.NATSClient, opts ...Option) (kvs.KVWatcher, error) { +func NewKVJetStreamKVWatch(ctx context.Context, kvName string, client *jsclient.NATSClient, opts ...Option) (kvs.KVWatcher, error) { kvOpts := defaultOptions() @@ -66,7 +66,7 @@ func NewKVJetStreamKVWatch(ctx context.Context, pipelineName string, kvName stri kvwTimer: time.NewTimer(kvOpts.watcherCreationThreshold), opts: kvOpts, doneCh: make(chan struct{}), - log: logging.FromContext(ctx).With("pipeline", pipelineName).With("kvName", kvName), + log: logging.FromContext(ctx).With("kvName", kvName), } return jsw, nil } diff --git a/pkg/sideinputs/initializer/initializer.go b/pkg/sideinputs/initializer/initializer.go index 62a44b6a8..1b3e5dc44 100644 --- a/pkg/sideinputs/initializer/initializer.go +++ b/pkg/sideinputs/initializer/initializer.go @@ -80,7 +80,7 @@ func (sii *sideInputsInitializer) Run(ctx context.Context) error { } // Load the required KV bucket and create a sideInputWatcher for it kvName := isbsvc.JetStreamSideInputsStoreKVName(sii.sideInputsStore) - sideInputWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, sii.pipelineName, kvName, natsClient) + sideInputWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, kvName, natsClient) if err != nil { return fmt.Errorf("failed to create a sideInputWatcher, %w", err) } diff --git a/pkg/sideinputs/initializer/initializer_test.go b/pkg/sideinputs/initializer/initializer_test.go index 1b8e518a5..405f71b93 100644 --- a/pkg/sideinputs/initializer/initializer_test.go +++ b/pkg/sideinputs/initializer/initializer_test.go @@ -27,10 +27,9 @@ func cleanup(mountPath string) { // by reading from the side input bucket. 
func TestSideInputsInitializer_Success(t *testing.T) { var ( - keyspace = "sideInputTestWatch" - pipelineName = "testPipeline" - sideInputs = []string{"TEST", "TEST2"} - dataTest = []string{"HELLO", "HELLO2"} + keyspace = "sideInputTestWatch" + sideInputs = []string{"TEST", "TEST2"} + dataTest = []string{"HELLO", "HELLO2"} ) mountPath, err := os.MkdirTemp("", "side-input") assert.NoError(t, err) @@ -66,7 +65,7 @@ func TestSideInputsInitializer_Success(t *testing.T) { assert.NoError(t, err) bucketName := keyspace - sideInputWatcher, _ := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, bucketName, nc) + sideInputWatcher, _ := jetstream.NewKVJetStreamKVWatch(ctx, bucketName, nc) for x := range sideInputs { _, err = kv.Put(sideInputs[x], []byte(dataTest[x])) if err != nil { @@ -97,9 +96,8 @@ func TestSideInputsInitializer_Success(t *testing.T) { // write any values to the store func TestSideInputsTimeout(t *testing.T) { var ( - keyspace = "sideInputTestWatch" - pipelineName = "testPipeline" - sideInputs = []string{"TEST", "TEST2"} + keyspace = "sideInputTestWatch" + sideInputs = []string{"TEST", "TEST2"} ) mountPath, err := os.MkdirTemp("", "side-input") assert.NoError(t, err) @@ -136,7 +134,7 @@ func TestSideInputsTimeout(t *testing.T) { assert.NoError(t, err) bucketName := keyspace - sideInputWatcher, _ := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, bucketName, nc) + sideInputWatcher, _ := jetstream.NewKVJetStreamKVWatch(ctx, bucketName, nc) _ = startSideInputInitializer(ctx, sideInputWatcher, mountPath, sideInputs) assert.Equal(t, context.DeadlineExceeded, ctx.Err()) diff --git a/pkg/sideinputs/synchronizer/synchronizer.go b/pkg/sideinputs/synchronizer/synchronizer.go index abba3cb87..cd1b27ccf 100644 --- a/pkg/sideinputs/synchronizer/synchronizer.go +++ b/pkg/sideinputs/synchronizer/synchronizer.go @@ -80,7 +80,7 @@ func (sis *sideInputsSynchronizer) Start(ctx context.Context) error { } // Create a new watcher for the side input KV store kvName := isbsvc.JetStreamSideInputsStoreKVName(sis.sideInputsStore) - sideInputWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, sis.pipelineName, kvName, natsClient) + sideInputWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, kvName, natsClient) if err != nil { return fmt.Errorf("failed to create a sideInputWatcher, %w", err) } diff --git a/pkg/sideinputs/synchronizer/synchronizer_test.go b/pkg/sideinputs/synchronizer/synchronizer_test.go index 89ef0b084..7b12dc1b8 100644 --- a/pkg/sideinputs/synchronizer/synchronizer_test.go +++ b/pkg/sideinputs/synchronizer/synchronizer_test.go @@ -27,10 +27,9 @@ func cleanup(mountPath string) { // side input store path with updated values from the side input bucket. 
func TestSideInputsValueUpdates(t *testing.T) { var ( - keyspace = "sideInputTestWatch" - pipelineName = "testPipeline" - sideInputs = []string{"TEST", "TEST2"} - dataTest = []string{"HELLO", "HELLO2"} + keyspace = "sideInputTestWatch" + sideInputs = []string{"TEST", "TEST2"} + dataTest = []string{"HELLO", "HELLO2"} ) mountPath, err := os.MkdirTemp("/tmp", "side-input") assert.NoError(t, err) @@ -77,7 +76,7 @@ func TestSideInputsValueUpdates(t *testing.T) { } bucketName := keyspace - sideInputWatcher, _ := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, bucketName, nc) + sideInputWatcher, _ := jetstream.NewKVJetStreamKVWatch(ctx, bucketName, nc) go startSideInputSynchronizer(ctx, sideInputWatcher, mountPath) for x := range sideInputs { _, err = kv.Put(sideInputs[x], []byte(dataTest[x])) diff --git a/pkg/sources/forward/data_forward_test.go b/pkg/sources/forward/data_forward_test.go index 9c7ae8e4a..a5f09c192 100644 --- a/pkg/sources/forward/data_forward_test.go +++ b/pkg/sources/forward/data_forward_test.go @@ -31,8 +31,6 @@ import ( "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/isb/testutils" - "github.com/numaproj/numaflow/pkg/shared/kvs/inmem" - "github.com/numaproj/numaflow/pkg/shared/kvs/noop" "github.com/numaproj/numaflow/pkg/shared/logging" udfapplier "github.com/numaproj/numaflow/pkg/udf/function" "github.com/numaproj/numaflow/pkg/watermark/generic" @@ -46,8 +44,7 @@ import ( const ( testPipelineName = "testPipeline" testProcessorEntity = "publisherTestPod" - publisherHBKeyspace = testPipelineName + "_" + testProcessorEntity + "_%s_" + "PROCESSORS" - publisherOTKeyspace = testPipelineName + "_" + testProcessorEntity + "_%s_" + "OT" + publishKeyspace = testPipelineName + "_" + testProcessorEntity + "_%s" ) var ( @@ -1235,9 +1232,8 @@ func buildToVertexWatermarkStores(toBuffers map[string][]isb.BufferWriter) map[s var ctx = context.Background() otStores := make(map[string]wmstore.WatermarkStore) for key := range toBuffers { - heartbeatKV, _, _ := inmem.NewKVInMemKVStore(ctx, testPipelineName, fmt.Sprintf(publisherHBKeyspace, key)) - otKV, _, _ := inmem.NewKVInMemKVStore(ctx, testPipelineName, fmt.Sprintf(publisherOTKeyspace, key)) - otStores[key] = wmstore.BuildWatermarkStore(heartbeatKV, otKV) + store, _, _, _ := wmstore.BuildInmemWatermarkStore(ctx, fmt.Sprintf(publishKeyspace, key)) + otStores[key] = store } return otStores } @@ -1245,7 +1241,7 @@ func buildToVertexWatermarkStores(toBuffers map[string][]isb.BufferWriter) map[s func buildNoOpToVertexStores(toBuffers map[string][]isb.BufferWriter) map[string]wmstore.WatermarkStore { toVertexStores := make(map[string]wmstore.WatermarkStore) for key := range toBuffers { - store := wmstore.BuildWatermarkStore(noop.NewKVNoOpStore(), noop.NewKVNoOpStore()) + store, _ := wmstore.BuildNoOpWatermarkStore() toVertexStores[key] = store } return toVertexStores diff --git a/pkg/sources/generator/tickgen_test.go b/pkg/sources/generator/tickgen_test.go index eb1ecb45b..07e810898 100644 --- a/pkg/sources/generator/tickgen_test.go +++ b/pkg/sources/generator/tickgen_test.go @@ -26,7 +26,6 @@ import ( "github.com/numaproj/numaflow/pkg/forward/applier" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" - "github.com/numaproj/numaflow/pkg/shared/kvs/noop" "github.com/numaproj/numaflow/pkg/watermark/generic" "github.com/numaproj/numaflow/pkg/watermark/store" @@ -67,7 +66,7 @@ func TestRead(t *testing.T) { Replica: 0, } 
- publishWMStore := store.BuildWatermarkStore(noop.NewKVNoOpStore(), noop.NewKVNoOpStore()) + publishWMStore, _ := store.BuildNoOpWatermarkStore() toBuffers := map[string][]isb.BufferWriter{ "writer": {dest}, } @@ -126,7 +125,7 @@ func TestStop(t *testing.T) { Hostname: "TestRead", Replica: 0, } - publishWMStore := store.BuildWatermarkStore(noop.NewKVNoOpStore(), noop.NewKVNoOpStore()) + publishWMStore, _ := store.BuildNoOpWatermarkStore() toBuffers := map[string][]isb.BufferWriter{ "writer": {dest}, } @@ -230,7 +229,7 @@ func TestWatermark(t *testing.T) { "writer": {dest}, } - publishWMStore := store.BuildWatermarkStore(noop.NewKVNoOpStore(), noop.NewKVNoOpStore()) + publishWMStore, _ := store.BuildNoOpWatermarkStore() mgen, err := NewMemGen(m, toBuffers, myForwardToAllTest{}, applier.Terminal, nil, nil, publishWMStore, nil) assert.NoError(t, err) stop := mgen.Start() diff --git a/pkg/sources/http/http_test.go b/pkg/sources/http/http_test.go index 0cc13ed90..810cbc54d 100644 --- a/pkg/sources/http/http_test.go +++ b/pkg/sources/http/http_test.go @@ -20,7 +20,6 @@ import ( "testing" "time" - "github.com/numaproj/numaflow/pkg/shared/kvs/noop" "github.com/numaproj/numaflow/pkg/watermark/store" "github.com/stretchr/testify/assert" @@ -80,7 +79,7 @@ func Test_NewHTTP(t *testing.T) { toBuffers := map[string][]isb.BufferWriter{ "test": {dest}, } - publishWMStores := store.BuildWatermarkStore(noop.NewKVNoOpStore(), noop.NewKVNoOpStore()) + publishWMStores, _ := store.BuildNoOpWatermarkStore() fetchWatermark, _ := generic.BuildNoOpWatermarkProgressorsFromBufferMap(map[string][]isb.BufferWriter{}) toVertexWmStores := map[string]store.WatermarkStore{ "test": publishWMStores, diff --git a/pkg/sources/kafka/handler_test.go b/pkg/sources/kafka/handler_test.go index 025e3ce4c..962a314d0 100644 --- a/pkg/sources/kafka/handler_test.go +++ b/pkg/sources/kafka/handler_test.go @@ -30,7 +30,6 @@ import ( "github.com/numaproj/numaflow/pkg/forward/applier" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" - "github.com/numaproj/numaflow/pkg/shared/kvs/noop" "github.com/numaproj/numaflow/pkg/shared/logging" "github.com/numaproj/numaflow/pkg/watermark/generic" "github.com/numaproj/numaflow/pkg/watermark/store" @@ -76,7 +75,7 @@ func TestMessageHandling(t *testing.T) { Hostname: "test-host", Replica: 0, } - publishWMStore := store.BuildWatermarkStore(noop.NewKVNoOpStore(), noop.NewKVNoOpStore()) + publishWMStore, _ := store.BuildNoOpWatermarkStore() fetchWatermark, _ := generic.BuildNoOpWatermarkProgressorsFromBufferMap(map[string][]isb.BufferWriter{}) toVertexWmStores := map[string]store.WatermarkStore{ "test": publishWMStore, diff --git a/pkg/sources/kafka/reader_test.go b/pkg/sources/kafka/reader_test.go index b038cb0db..21c59d2c1 100644 --- a/pkg/sources/kafka/reader_test.go +++ b/pkg/sources/kafka/reader_test.go @@ -22,15 +22,13 @@ import ( "github.com/stretchr/testify/assert" - "github.com/numaproj/numaflow/pkg/shared/kvs/noop" - "github.com/numaproj/numaflow/pkg/watermark/store" - dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/forward/applier" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/shared/logging" "github.com/numaproj/numaflow/pkg/watermark/generic" + "github.com/numaproj/numaflow/pkg/watermark/store" ) func TestNewKafkasource(t *testing.T) { @@ -55,7 +53,7 @@ func TestNewKafkasource(t *testing.T) { Hostname: 
"test-host", Replica: 0, } - publishWMStore := store.BuildWatermarkStore(noop.NewKVNoOpStore(), noop.NewKVNoOpStore()) + publishWMStore, _ := store.BuildNoOpWatermarkStore() fetchWatermark, _ := generic.BuildNoOpWatermarkProgressorsFromBufferMap(map[string][]isb.BufferWriter{}) toVertexWmStores := map[string]store.WatermarkStore{ "testVertex": publishWMStore, @@ -98,7 +96,7 @@ func TestGroupNameOverride(t *testing.T) { Hostname: "test-host", Replica: 0, } - publishWMStore := store.BuildWatermarkStore(noop.NewKVNoOpStore(), noop.NewKVNoOpStore()) + publishWMStore, _ := store.BuildNoOpWatermarkStore() fetchWatermark, _ := generic.BuildNoOpWatermarkProgressorsFromBufferMap(map[string][]isb.BufferWriter{}) toVertexWmStores := map[string]store.WatermarkStore{ "testVertex": publishWMStore, @@ -131,7 +129,7 @@ func TestDefaultBufferSize(t *testing.T) { Hostname: "test-host", Replica: 0, } - publishWMStore := store.BuildWatermarkStore(noop.NewKVNoOpStore(), noop.NewKVNoOpStore()) + publishWMStore, _ := store.BuildNoOpWatermarkStore() fetchWatermark, _ := generic.BuildNoOpWatermarkProgressorsFromBufferMap(map[string][]isb.BufferWriter{}) toVertexWmStores := map[string]store.WatermarkStore{ "testVertex": publishWMStore, @@ -164,7 +162,7 @@ func TestBufferSizeOverrides(t *testing.T) { Hostname: "test-host", Replica: 0, } - publishWMStore := store.BuildWatermarkStore(noop.NewKVNoOpStore(), noop.NewKVNoOpStore()) + publishWMStore, _ := store.BuildNoOpWatermarkStore() fetchWatermark, _ := generic.BuildNoOpWatermarkProgressorsFromBufferMap(map[string][]isb.BufferWriter{}) toVertexWmStores := map[string]store.WatermarkStore{ "testVertex": publishWMStore, diff --git a/pkg/sources/nats/nats_test.go b/pkg/sources/nats/nats_test.go index 1c754d072..5960abe65 100644 --- a/pkg/sources/nats/nats_test.go +++ b/pkg/sources/nats/nats_test.go @@ -23,7 +23,6 @@ import ( "time" natslib "github.com/nats-io/nats.go" - "github.com/numaproj/numaflow/pkg/shared/kvs/noop" "github.com/stretchr/testify/assert" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" @@ -77,7 +76,7 @@ func newInstance(t *testing.T, vi *dfv1.VertexInstance) (*natsSource, error) { "test": {dest}, } - publishWMStores := store.BuildWatermarkStore(noop.NewKVNoOpStore(), noop.NewKVNoOpStore()) + publishWMStores, _ := store.BuildNoOpWatermarkStore() fetchWatermark, _ := generic.BuildNoOpWatermarkProgressorsFromBufferMap(map[string][]isb.BufferWriter{}) toVertexWmStores := map[string]store.WatermarkStore{ "testVertex": publishWMStores, diff --git a/pkg/sources/source.go b/pkg/sources/source.go index 5c0e4ec4b..94eeff7b7 100644 --- a/pkg/sources/source.go +++ b/pkg/sources/source.go @@ -23,7 +23,6 @@ import ( "go.uber.org/zap" - "github.com/numaproj/numaflow/pkg/shared/kvs/noop" "github.com/numaproj/numaflow/pkg/watermark/processor" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" @@ -58,11 +57,11 @@ type SourceProcessor struct { func (sp *SourceProcessor) Start(ctx context.Context) error { var ( - sourcePublisherStores = store.BuildWatermarkStore(noop.NewKVNoOpStore(), noop.NewKVNoOpStore()) - processorManagers map[string]*processor.ProcessorManager - toVertexWatermarkStores = make(map[string]store.WatermarkStore) - log = logging.FromContext(ctx) - writersMap = make(map[string][]isb.BufferWriter) + sourcePublisherStores, _ = store.BuildNoOpWatermarkStore() + processorManagers map[string]*processor.ProcessorManager + toVertexWatermarkStores = make(map[string]store.WatermarkStore) + log = logging.FromContext(ctx) + writersMap = 
make(map[string][]isb.BufferWriter) ) ctx, cancel := context.WithCancel(ctx) @@ -78,7 +77,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error { // create a no op publisher stores for _, e := range sp.VertexInstance.Vertex.Spec.ToEdges { - toVertexWatermarkStores[e.To] = store.BuildWatermarkStore(noop.NewKVNoOpStore(), noop.NewKVNoOpStore()) + toVertexWatermarkStores[e.To], _ = store.BuildNoOpWatermarkStore() } switch sp.ISBSvcType { diff --git a/pkg/watermark/fetch/edge_fetcher.go b/pkg/watermark/fetch/edge_fetcher.go index 11bc60ef0..e5f19c14f 100644 --- a/pkg/watermark/fetch/edge_fetcher.go +++ b/pkg/watermark/fetch/edge_fetcher.go @@ -47,7 +47,7 @@ type edgeFetcher struct { // NewEdgeFetcher returns a new edge fetcher. func NewEdgeFetcher(ctx context.Context, manager *processor.ProcessorManager, fromBufferPartitionCount int) *edgeFetcher { - log := logging.FromContext(ctx).With("bucketName", manager.GetBucket()) + log := logging.FromContext(ctx) log.Info("Creating a new edge watermark fetcher") var lastProcessedWm []int64 @@ -99,7 +99,7 @@ func (e *edgeFetcher) updateWatermark(inputOffset isb.Offset, fromPartitionIdx i // if the pod is not active and the head offset of all the timelines is less than the input offset, delete the processor // (this means we are processing data later than what the stale processor has processed) if p.IsDeleted() && (offset > headOffset) { - e.log.Info("Deleting processor because it's stale", zap.String("processor", p.GetEntity().GetName())) + e.log.Infow("Deleting processor because it's stale", zap.String("processor", p.GetEntity().GetName())) e.processorManager.DeleteProcessor(p.GetEntity().GetName()) } } diff --git a/pkg/watermark/fetch/edge_fetcher_set_test.go b/pkg/watermark/fetch/edge_fetcher_set_test.go index 8aa6f4956..1f9224585 100644 --- a/pkg/watermark/fetch/edge_fetcher_set_test.go +++ b/pkg/watermark/fetch/edge_fetcher_set_test.go @@ -24,9 +24,6 @@ import ( "time" "github.com/stretchr/testify/assert" - - "github.com/numaproj/numaflow/pkg/shared/kvs/noop" - "go.uber.org/zap" "go.uber.org/zap/zaptest" @@ -94,7 +91,7 @@ func Test_EdgeFetcherSet_ComputeWatermark(t *testing.T) { testPodsByVertex[vertex] = make([]*processor.ProcessorToFetch, numPods) for pod := 0; pod < numPods; pod++ { name := fmt.Sprintf("test-pod-%d-%d", vertex, pod) - testPodsByVertex[vertex][pod] = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity(name), "test-bucket", 5, partitionCount) + testPodsByVertex[vertex][pod] = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity(name), 5, partitionCount) for _, watermark := range testPodTimelines[vertex][pod] { testPodsByVertex[vertex][pod].GetOffsetTimelines()[watermark.Partition].Put(watermark) } @@ -187,16 +184,14 @@ func Test_EdgeFetcherSet_ComputeWatermark(t *testing.T) { } func createProcessorManager(ctx context.Context, partitionCount int32) *processor.ProcessorManager { - hbWatcher := noop.NewKVOpWatch() - otWatcher := noop.NewKVOpWatch() - storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - return processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", partitionCount) + storeWatcher, _ := store.BuildNoOpWatermarkStoreWatcher() + return processor.NewProcessorManager(ctx, storeWatcher, partitionCount) } func edge1Idle(ctx context.Context, processorManager *processor.ProcessorManager) { var ( - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, 2) - testPod1 = processor.NewProcessorToFetch(ctx, 
processor.NewProcessorEntity("testPod2"), "test-bucket", 5, 2) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, 2) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, 2) pod0Timeline = []wmb.WMB{ { Idle: true, @@ -239,8 +234,8 @@ func edge1Idle(ctx context.Context, processorManager *processor.ProcessorManager func edge1NonIdle(ctx context.Context, processorManager *processor.ProcessorManager) { var ( - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, 2) - testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), "test-bucket", 5, 2) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, 2) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, 2) pod0Timeline = []wmb.WMB{ { Idle: false, @@ -283,8 +278,8 @@ func edge1NonIdle(ctx context.Context, processorManager *processor.ProcessorMana func edge2Idle(ctx context.Context, processorManager *processor.ProcessorManager) { var ( - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, 2) - testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), "test-bucket", 5, 2) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, 2) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, 2) pod0Timeline = []wmb.WMB{ { Idle: true, @@ -327,8 +322,8 @@ func edge2Idle(ctx context.Context, processorManager *processor.ProcessorManager func edge2NonIdle(ctx context.Context, processorManager *processor.ProcessorManager) { var ( - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, 2) - testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), "test-bucket", 5, 2) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, 2) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, 2) pod0Timeline = []wmb.WMB{ { Idle: false, @@ -378,16 +373,14 @@ func Test_EdgeFetcherSet_GetHeadWMB(t *testing.T) { // 3. 
all publishers Idle but somehow the GetWatermark() of one of the EdgeFetchers is higher than the returned value var ( - ctx = context.Background() - hbWatcher = noop.NewKVOpWatch() - otWatcher = noop.NewKVOpWatch() - storeWatcher = store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) + ctx = context.Background() + storeWatcher, _ = store.BuildNoOpWatermarkStoreWatcher() - edge1ProcessorManagerIdle = processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", 2) - edge1ProcessorManagerNonIdle = processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", 2) + edge1ProcessorManagerIdle = processor.NewProcessorManager(ctx, storeWatcher, 2) + edge1ProcessorManagerNonIdle = processor.NewProcessorManager(ctx, storeWatcher, 2) - edge2ProcessorManagerIdle = processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", 2) - edge2ProcessorManagerNonIdle = processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", 2) + edge2ProcessorManagerIdle = processor.NewProcessorManager(ctx, storeWatcher, 2) + edge2ProcessorManagerNonIdle = processor.NewProcessorManager(ctx, storeWatcher, 2) ) edge1Idle(ctx, edge1ProcessorManagerIdle) diff --git a/pkg/watermark/fetch/edge_fetcher_test.go b/pkg/watermark/fetch/edge_fetcher_test.go index 47fc51930..d18d837a3 100644 --- a/pkg/watermark/fetch/edge_fetcher_test.go +++ b/pkg/watermark/fetch/edge_fetcher_test.go @@ -29,17 +29,13 @@ import ( "go.uber.org/zap/zaptest" "github.com/numaproj/numaflow/pkg/isb" + natstest "github.com/numaproj/numaflow/pkg/shared/clients/nats/test" "github.com/numaproj/numaflow/pkg/shared/kvs" "github.com/numaproj/numaflow/pkg/shared/kvs/inmem" "github.com/numaproj/numaflow/pkg/shared/kvs/jetstream" - "github.com/numaproj/numaflow/pkg/shared/kvs/noop" - - natstest "github.com/numaproj/numaflow/pkg/shared/clients/nats/test" - - "github.com/numaproj/numaflow/pkg/watermark/wmb" - "github.com/numaproj/numaflow/pkg/watermark/processor" "github.com/numaproj/numaflow/pkg/watermark/store" + "github.com/numaproj/numaflow/pkg/watermark/wmb" ) func TestBuffer_updateWatermarkWithOnePartition(t *testing.T) { @@ -47,14 +43,12 @@ func TestBuffer_updateWatermarkWithOnePartition(t *testing.T) { // We don't really need watcher because we manually call the `Put` function and the `addProcessor` function // so use no op watcher for testing - hbWatcher := noop.NewKVOpWatch() - otWatcher := noop.NewKVOpWatch() - storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - processorManager := processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", 1) + storeWatcher, _ := store.BuildNoOpWatermarkStoreWatcher() + processorManager := processor.NewProcessorManager(ctx, storeWatcher, 1) var ( - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, 1) - testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), "test-bucket", 5, 1) - testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), "test-bucket", 5, 1) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, 1) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, 1) + testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), 5, 1) pod0Timeline = []wmb.WMB{ {Watermark: 11, Offset: 9, Partition: 0}, {Watermark: 12, Offset: 20, Partition: 0}, @@ -174,15 +168,13 @@ func TestBuffer_updateWatermarkWithMultiplePartition(t *testing.T) { // We don't really need watcher 
because we manually call the `Put` function and the `addProcessor` function // so use no op watcher for testing - hbWatcher := noop.NewKVOpWatch() - otWatcher := noop.NewKVOpWatch() - storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) + storeWatcher, _ := store.BuildNoOpWatermarkStoreWatcher() partitionCount := int32(3) - processorManager := processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", partitionCount) + processorManager := processor.NewProcessorManager(ctx, storeWatcher, partitionCount) var ( - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, partitionCount) - testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), "test-bucket", 5, partitionCount) - testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), "test-bucket", 5, partitionCount) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, partitionCount) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, partitionCount) + testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), 5, partitionCount) pod0Timeline = []wmb.WMB{ {Watermark: 11, Offset: 9, Partition: 0}, {Watermark: 12, Offset: 20, Partition: 1}, @@ -337,11 +329,9 @@ func Test_edgeFetcher_ComputeHeadWatermark(t *testing.T) { var ( partitionCount = int32(2) ctx = context.Background() - hbWatcher = noop.NewKVOpWatch() - otWatcher = noop.NewKVOpWatch() - storeWatcher = store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - processorManager1 = processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", partitionCount) - processorManager2 = processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", partitionCount) + storeWatcher, _ = store.BuildNoOpWatermarkStoreWatcher() + processorManager1 = processor.NewProcessorManager(ctx, storeWatcher, partitionCount) + processorManager2 = processor.NewProcessorManager(ctx, storeWatcher, partitionCount) ) computeHeadWMTest1(ctx, processorManager1) @@ -377,9 +367,9 @@ func Test_edgeFetcher_ComputeHeadWatermark(t *testing.T) { func computeHeadWMTest1(ctx context.Context, processorManager1 *processor.ProcessorManager) { var ( partitionCount = int32(2) - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, partitionCount) - testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), "test-bucket", 5, partitionCount) - testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), "test-bucket", 5, partitionCount) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, partitionCount) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, partitionCount) + testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), 5, partitionCount) pod0Timeline = []wmb.WMB{ { Idle: true, @@ -441,9 +431,9 @@ func computeHeadWMTest1(ctx context.Context, processorManager1 *processor.Proces func computeHeadWMTest2(ctx context.Context, processorManager2 *processor.ProcessorManager) { var ( partitionCount = int32(2) - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, partitionCount) - testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), "test-bucket", 5, partitionCount) - testPod2 = 
processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), "test-bucket", 5, partitionCount) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, partitionCount) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, partitionCount) + testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), 5, partitionCount) pod0Timeline = []wmb.WMB{ { Idle: false, @@ -506,13 +496,11 @@ func Test_edgeFetcher_updateHeadIdleWMB(t *testing.T) { var ( partitionCount = int32(3) ctx = context.Background() - hbWatcher = noop.NewKVOpWatch() - otWatcher = noop.NewKVOpWatch() - storeWatcher = store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - processorManager1 = processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", partitionCount) - processorManager2 = processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", partitionCount) - processorManager3 = processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", partitionCount) - processorManager4 = processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", partitionCount) + storeWatcher, _ = store.BuildNoOpWatermarkStoreWatcher() + processorManager1 = processor.NewProcessorManager(ctx, storeWatcher, partitionCount) + processorManager2 = processor.NewProcessorManager(ctx, storeWatcher, partitionCount) + processorManager3 = processor.NewProcessorManager(ctx, storeWatcher, partitionCount) + processorManager4 = processor.NewProcessorManager(ctx, storeWatcher, partitionCount) ) updateHeadIdleWMBTest1(ctx, processorManager1) @@ -570,9 +558,9 @@ func Test_edgeFetcher_updateHeadIdleWMB(t *testing.T) { func updateHeadIdleWMBTest1(ctx context.Context, processorManager1 *processor.ProcessorManager) { var ( partitionCount = int32(3) - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, partitionCount) - testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), "test-bucket", 5, partitionCount) - testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), "test-bucket", 5, partitionCount) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, partitionCount) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, partitionCount) + testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), 5, partitionCount) pod0Timeline = []wmb.WMB{ { Idle: true, @@ -652,9 +640,9 @@ func updateHeadIdleWMBTest1(ctx context.Context, processorManager1 *processor.Pr func updateHeadIdleWMBTest2(ctx context.Context, processorManager2 *processor.ProcessorManager) { var ( partitionCount = int32(3) - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, partitionCount) - testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), "test-bucket", 5, partitionCount) - testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), "test-bucket", 5, partitionCount) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, partitionCount) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, partitionCount) + testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), 5, partitionCount) pod0Timeline = []wmb.WMB{ { Idle: false, @@ -734,9 +722,9 
@@ func updateHeadIdleWMBTest2(ctx context.Context, processorManager2 *processor.Pr func updateHeadIdleWMBTest3(ctx context.Context, processorManager3 *processor.ProcessorManager) { var ( partitionCount = int32(3) - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, partitionCount) - testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), "test-bucket", 5, partitionCount) - testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), "test-bucket", 5, partitionCount) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, partitionCount) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, partitionCount) + testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), 5, partitionCount) pod0Timeline = []wmb.WMB{ { Idle: false, @@ -816,9 +804,9 @@ func updateHeadIdleWMBTest3(ctx context.Context, processorManager3 *processor.Pr func updateHeadIdleWMBTest4(ctx context.Context, processorManager4 *processor.ProcessorManager) { var ( partitionCount = int32(3) - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, partitionCount) - testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), "test-bucket", 5, partitionCount) - testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), "test-bucket", 5, partitionCount) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, partitionCount) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, partitionCount) + testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), 5, partitionCount) ) processorManager4.AddProcessor("testPod0", testPod0) processorManager4.AddProcessor("testPod1", testPod1) @@ -840,7 +828,6 @@ func otValueToBytes(offset int64, watermark int64, idle bool, partitionIdx int32 func TestFetcherWithSameOTBucket_InMem(t *testing.T) { var ( err error - pipelineName = "testFetch" keyspace = "fetcherTest" hbBucketName = keyspace + "_PROCESSORS" otBucketName = keyspace + "_OT" @@ -851,21 +838,18 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - hbStore, hbWatcherCh, err := inmem.NewKVInMemKVStore(ctx, pipelineName, hbBucketName) + hbStore, hbWatcherCh, err := inmem.NewKVInMemKVStore(ctx, hbBucketName) assert.NoError(t, err) defer hbStore.Close() - otStore, otWatcherCh, err := inmem.NewKVInMemKVStore(ctx, pipelineName, otBucketName) + otStore, otWatcherCh, err := inmem.NewKVInMemKVStore(ctx, otBucketName) assert.NoError(t, err) defer otStore.Close() epoch += 60000 - hbWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_PROCESSORS", hbWatcherCh) - assert.NoError(t, err) - otWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_OT", otWatcherCh) + storeWatcher, err := store.BuildInmemWatermarkStoreWatcher(ctx, keyspace, hbWatcherCh, otWatcherCh) assert.NoError(t, err) - storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - var processorManager = processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", 1) + var processorManager = processor.NewProcessorManager(ctx, storeWatcher, 1) var fetcher = NewEdgeFetcher(ctx, processorManager, 1) var heartBeatManagerMap = 
make(map[string]*heartBeatManager) @@ -1088,22 +1072,19 @@ func TestFetcherWithSameOTBucketWithSinglePartition(t *testing.T) { defaultJetStreamClient := natstest.JetStreamClient(t, s) // create hbStore - hbStore, err := jetstream.NewKVJetStreamKVStore(ctx, "testFetch", keyspace+"_PROCESSORS", defaultJetStreamClient) + hbStore, err := jetstream.NewKVJetStreamKVStore(ctx, keyspace+"_PROCESSORS", defaultJetStreamClient) assert.NoError(t, err) defer hbStore.Close() // create otStore - otStore, err := jetstream.NewKVJetStreamKVStore(ctx, "testFetch", keyspace+"_OT", defaultJetStreamClient) + otStore, err := jetstream.NewKVJetStreamKVStore(ctx, keyspace+"_OT", defaultJetStreamClient) assert.NoError(t, err) defer otStore.Close() // create watchers for heartbeat and offset timeline - hbWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_PROCESSORS", defaultJetStreamClient) + storeWatcher, err := store.BuildJetStreamWatermarkStoreWatcher(ctx, keyspace, defaultJetStreamClient) assert.NoError(t, err) - otWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_OT", defaultJetStreamClient) - assert.NoError(t, err) - storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - processorManager := processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", 1) + processorManager := processor.NewProcessorManager(ctx, storeWatcher, 1) fetcher := NewEdgeFetcher(ctx, processorManager, 1) var heartBeatManagerMap = make(map[string]*heartBeatManager) @@ -1379,22 +1360,19 @@ func TestFetcherWithSameOTBucketWithMultiplePartition(t *testing.T) { defaultJetStreamClient := natstest.JetStreamClient(t, s) // create hbStore - hbStore, err := jetstream.NewKVJetStreamKVStore(ctx, "testFetch", keyspace+"_PROCESSORS", defaultJetStreamClient) + hbStore, err := jetstream.NewKVJetStreamKVStore(ctx, keyspace+"_PROCESSORS", defaultJetStreamClient) assert.NoError(t, err) defer hbStore.Close() // create otStore - otStore, err := jetstream.NewKVJetStreamKVStore(ctx, "testFetch", keyspace+"_OT", defaultJetStreamClient) + otStore, err := jetstream.NewKVJetStreamKVStore(ctx, keyspace+"_OT", defaultJetStreamClient) assert.NoError(t, err) defer otStore.Close() // create watchers for heartbeat and offset timeline - hbWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_PROCESSORS", defaultJetStreamClient) - assert.NoError(t, err) - otWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_OT", defaultJetStreamClient) + storeWatcher, err := store.BuildJetStreamWatermarkStoreWatcher(ctx, keyspace, defaultJetStreamClient) assert.NoError(t, err) - storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - processorManager := processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", 3) + processorManager := processor.NewProcessorManager(ctx, storeWatcher, 3) fetcher := NewEdgeFetcher(ctx, processorManager, 3) var heartBeatManagerMap = make(map[string]*heartBeatManager) diff --git a/pkg/watermark/fetch/source_fetcher.go b/pkg/watermark/fetch/source_fetcher.go index 1c2db01dd..d7f3e3f0d 100644 --- a/pkg/watermark/fetch/source_fetcher.go +++ b/pkg/watermark/fetch/source_fetcher.go @@ -41,7 +41,7 @@ type sourceFetcher struct { // NewSourceFetcher returns a new source fetcher, processorManager has the details about the processors responsible for writing to the // buckets of the source buffer. 
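For illustration, a minimal sketch of how the in-memory builders used in the tests above compose end to end; the bucket name "fetcherTest", the panic-style error handling, and the imports of the context, time, fetch, processor, and store packages are all assumptions.

// Sketch only: the in-mem store builder hands back the HB and OT entry channels,
// which feed the matching watcher builder; the manager no longer takes a bucket.
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

wmStore, hbCh, otCh, err := store.BuildInmemWatermarkStore(ctx, "fetcherTest") // backs fetcherTest_PROCESSORS and fetcherTest_OT
if err != nil {
	panic(err) // the real tests use assert.NoError instead
}
defer wmStore.Close()

storeWatcher, err := store.BuildInmemWatermarkStoreWatcher(ctx, "fetcherTest", hbCh, otCh)
if err != nil {
	panic(err)
}

processorManager := processor.NewProcessorManager(ctx, storeWatcher, 1) // 1 from-buffer partition
fetcher := fetch.NewEdgeFetcher(ctx, processorManager, 1)
_ = fetcher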
func NewSourceFetcher(ctx context.Context, manager *processor.ProcessorManager) Fetcher { - log := logging.FromContext(ctx).With("sourceBufferName", manager.GetBucket()) + log := logging.FromContext(ctx) log.Info("Creating a new source watermark fetcher") return &sourceFetcher{ processorManager: manager, diff --git a/pkg/watermark/generic/jetstream/generic.go b/pkg/watermark/generic/jetstream/generic.go index 8b657defd..e48fce454 100644 --- a/pkg/watermark/generic/jetstream/generic.go +++ b/pkg/watermark/generic/jetstream/generic.go @@ -22,15 +22,12 @@ import ( "context" "fmt" - "github.com/numaproj/numaflow/pkg/shared/kvs/jetstream" - "github.com/numaproj/numaflow/pkg/shared/kvs/noop" - "github.com/numaproj/numaflow/pkg/watermark/processor" - "github.com/numaproj/numaflow/pkg/watermark/store" - "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaflow/pkg/isbsvc" jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats" + "github.com/numaproj/numaflow/pkg/shared/logging" + "github.com/numaproj/numaflow/pkg/watermark/processor" "github.com/numaproj/numaflow/pkg/watermark/publish" + "github.com/numaproj/numaflow/pkg/watermark/store" ) // BuildProcessorManagers creates a map of ProcessorManagers for all the incoming edges of the given Vertex. @@ -60,23 +57,15 @@ func BuildProcessorManagers(ctx context.Context, vertexInstance *v1alpha1.Vertex // buildProcessorManagerForBucket creates a processor manager for the given bucket. func buildProcessorManagerForBucket(ctx context.Context, vertexInstance *v1alpha1.VertexInstance, fromBucket string, client *jsclient.NATSClient) (*processor.ProcessorManager, error) { - pipelineName := vertexInstance.Vertex.Spec.PipelineName - hbKVName := isbsvc.JetStreamProcessorKVName(fromBucket) - hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, hbKVName, client) - if err != nil { - return nil, fmt.Errorf("failed at new HB KVJetStreamKVWatch, hbKVName: %s, %w", hbKVName, err) - } - - otKVName := isbsvc.JetStreamOTKVName(fromBucket) - otWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, otKVName, client) + // create a store watcher that watches the heartbeat and ot store. + storeWatcher, err := store.BuildJetStreamWatermarkStoreWatcher(ctx, fromBucket, client) if err != nil { - return nil, fmt.Errorf("failed at new OT KVJetStreamKVWatch, otKVName: %s, %w", otKVName, err) + return nil, fmt.Errorf("failed at new JetStream watermark store watcher, %w", err) } - // create a store watcher that watches the heartbeat and ot store. - storeWatcher := store.BuildWatermarkStoreWatcher(hbWatch, otWatch) + log := logging.FromContext(ctx).With("bucket", fromBucket) // create processor manager with the store watcher which will keep track of all the active processors and updates the offset timelines accordingly. 
- processManager := processor.NewProcessorManager(ctx, storeWatcher, fromBucket, int32(len(vertexInstance.Vertex.OwnedBuffers())), + processManager := processor.NewProcessorManager(logging.WithLogger(ctx, log), storeWatcher, int32(len(vertexInstance.Vertex.OwnedBuffers())), processor.WithVertexReplica(vertexInstance.Replica), processor.WithIsReduce(vertexInstance.Vertex.IsReduceUDF()), processor.WithIsSource(vertexInstance.Vertex.IsASource())) return processManager, nil @@ -86,46 +75,26 @@ func buildProcessorManagerForBucket(ctx context.Context, vertexInstance *v1alpha func BuildToVertexWatermarkStores(ctx context.Context, vertexInstance *v1alpha1.VertexInstance, client *jsclient.NATSClient) (map[string]store.WatermarkStore, error) { var wmStores = make(map[string]store.WatermarkStore) vertex := vertexInstance.Vertex - pipelineName := vertex.Spec.PipelineName if vertex.IsASink() { toBucket := vertex.GetToBuckets()[0] - - // build heartBeat store - hbKVName := isbsvc.JetStreamProcessorKVName(toBucket) - hbStore, err := jetstream.NewKVJetStreamKVStore(ctx, pipelineName, hbKVName, client) + // build watermark store + wmStore, err := store.BuildJetStreamWatermarkStore(ctx, toBucket, client) if err != nil { - return nil, fmt.Errorf("failed at new HB KVJetStreamKVStore, HeartbeatKV: %s, %w", hbKVName, err) + return nil, fmt.Errorf("failed at new JetStream watermark store, %w", err) } - - // build offsetTimeline store - otStoreKVName := isbsvc.JetStreamOTKVName(toBucket) - otStore, err := jetstream.NewKVJetStreamKVStore(ctx, pipelineName, otStoreKVName, client) - if err != nil { - return nil, fmt.Errorf("failed at new OT KVJetStreamKVStore, OffsetTimelineKV: %s, %w", otStoreKVName, err) - } - // build watermark store using the hb and ot store - wmStores[vertex.Spec.Name] = store.BuildWatermarkStore(hbStore, otStore) + wmStores[vertex.Spec.Name] = wmStore } else { for _, e := range vertex.Spec.ToEdges { toBucket := v1alpha1.GenerateEdgeBucketName(vertex.Namespace, vertex.Spec.PipelineName, e.From, e.To) - - // build heartBeat store - hbKVName := isbsvc.JetStreamProcessorKVName(toBucket) - hbStore, err := jetstream.NewKVJetStreamKVStore(ctx, vertex.Spec.PipelineName, hbKVName, client) + // build watermark store + wmStore, err := store.BuildJetStreamWatermarkStore(ctx, toBucket, client) if err != nil { - return nil, fmt.Errorf("failed at new HB KVJetStreamKVStore, HeartbeatKV: %s, %w", hbKVName, err) - } - - // build offsetTimeline store - otStoreKVName := isbsvc.JetStreamOTKVName(toBucket) - otStore, err := jetstream.NewKVJetStreamKVStore(ctx, pipelineName, otStoreKVName, client) - if err != nil { - return nil, fmt.Errorf("failed at new OT Publish JetStreamKVStore, otKVName: %s, %w", otStoreKVName, err) + return nil, fmt.Errorf("failed at new JetStream watermark store, %w", err) } // build watermark store using the hb and ot store - wmStores[e.To] = store.BuildWatermarkStore(hbStore, otStore) + wmStores[e.To] = wmStore } } @@ -161,25 +130,15 @@ func BuildSourcePublisherStores(ctx context.Context, vertexInstance *v1alpha1.Ve return nil, fmt.Errorf("not a source vertex") } if vertexInstance.Vertex.Spec.Watermark.Disabled { - return store.BuildWatermarkStore(noop.NewKVNoOpStore(), noop.NewKVNoOpStore()), nil + return store.BuildNoOpWatermarkStore() } - pipelineName := vertexInstance.Vertex.Spec.PipelineName bucketName := vertexInstance.Vertex.GetFromBuckets()[0] - // heartbeat - hbKVName := isbsvc.JetStreamProcessorKVName(bucketName) - hbKVStore, err := jetstream.NewKVJetStreamKVStore(ctx, 
pipelineName, hbKVName, client) + wmStore, err := store.BuildJetStreamWatermarkStore(ctx, bucketName, client) if err != nil { - return nil, fmt.Errorf("failed at new HB KVJetStreamKVStore for source, hbKVName: %s, %w", hbKVName, err) + return nil, fmt.Errorf("failed at new JetStream watermark store, %w", err) } - // OT - otKVName := isbsvc.JetStreamOTKVName(bucketName) - otKVStore, err := jetstream.NewKVJetStreamKVStore(ctx, pipelineName, otKVName, client) - if err != nil { - return nil, fmt.Errorf("failed at new OT KVJetStreamKVStore for source, otKVName: %s, %w", otKVName, err) - } - sourcePublishStores := store.BuildWatermarkStore(hbKVStore, otKVStore) - return sourcePublishStores, nil + return wmStore, nil } func BuildToVertexPublisherStores(ctx context.Context, vertexInstance *v1alpha1.VertexInstance, client *jsclient.NATSClient) (map[string]store.WatermarkStore, error) { @@ -187,18 +146,12 @@ func BuildToVertexPublisherStores(ctx context.Context, vertexInstance *v1alpha1. var publisherStores = make(map[string]store.WatermarkStore) for _, e := range vertexInstance.Vertex.Spec.ToEdges { toBucket := v1alpha1.GenerateEdgeBucketName(vertexInstance.Vertex.Namespace, pipelineName, e.From, e.To) - hbPublisherKVName := isbsvc.JetStreamProcessorKVName(toBucket) - hbStore, err := jetstream.NewKVJetStreamKVStore(ctx, pipelineName, hbPublisherKVName, client) + wmStore, err := store.BuildJetStreamWatermarkStore(ctx, toBucket, client) if err != nil { - return nil, fmt.Errorf("failed at new HB Publish JetStreamKVStore, HeartbeatPublisherKV: %s, %w", hbPublisherKVName, err) + return nil, fmt.Errorf("failed at new JetStream watermark store, %w", err) } - otKVName := isbsvc.JetStreamOTKVName(toBucket) - otStore, err := jetstream.NewKVJetStreamKVStore(ctx, pipelineName, otKVName, client) - if err != nil { - return nil, fmt.Errorf("failed at new OT Publish JetStreamKVStore, otKVName: %s, %w", otKVName, err) - } - publisherStores[e.To] = store.BuildWatermarkStore(hbStore, otStore) + publisherStores[e.To] = wmStore } return publisherStores, nil } diff --git a/pkg/watermark/processor/processor_manager.go b/pkg/watermark/processor/processor_manager.go index b2e4c3376..110a414de 100644 --- a/pkg/watermark/processor/processor_manager.go +++ b/pkg/watermark/processor/processor_manager.go @@ -33,7 +33,6 @@ import ( "go.uber.org/zap" "github.com/numaproj/numaflow/pkg/shared/kvs" - "github.com/numaproj/numaflow/pkg/shared/logging" "github.com/numaproj/numaflow/pkg/watermark/store" "github.com/numaproj/numaflow/pkg/watermark/wmb" @@ -50,8 +49,6 @@ type ProcessorManager struct { // processors has reference to the actual processing unit (ProcessorEntitier) which includes offset timeline which is // used for tracking watermark. 
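For illustration, a hedged sketch of wiring a publisher against the consolidated JetStream store builder shown above; ctx, bucketName, client, the enclosing error-returning function, and the trailing 1 (copied from the publisher tests later in this patch) are assumptions.

// Sketch only: one builder call replaces the separate HB and OT KV stores,
// and the combined WatermarkStore is passed straight to the publisher.
wmStore, err := store.BuildJetStreamWatermarkStore(ctx, bucketName, client)
if err != nil {
	return fmt.Errorf("failed at new JetStream watermark store, %w", err)
}
entity := processor.NewProcessorEntity("example-pod") // illustrative processor name
p := publish.NewPublish(ctx, entity, wmStore, 1,
	publish.WithAutoRefreshHeartbeatDisabled(), publish.WithPodHeartbeatRate(1))
_ = p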
processors map[string]*ProcessorToFetch - // name of the bucket, used for logging - bucket string // fromBufferPartitionCount is the number of partitions in the fromBuffer fromBufferPartitionCount int32 lock sync.RWMutex @@ -64,7 +61,7 @@ type ProcessorManager struct { } // NewProcessorManager returns a new ProcessorManager instance -func NewProcessorManager(ctx context.Context, watermarkStoreWatcher store.WatermarkStoreWatcher, bucket string, fromBufferPartitionCount int32, inputOpts ...ProcessorManagerOption) *ProcessorManager { +func NewProcessorManager(ctx context.Context, watermarkStoreWatcher store.WatermarkStoreWatcher, fromBufferPartitionCount int32, inputOpts ...ProcessorManagerOption) *ProcessorManager { opts := &processorManagerOptions{ podHeartbeatRate: 5, refreshingProcessorsRate: 5, @@ -81,9 +78,8 @@ func NewProcessorManager(ctx context.Context, watermarkStoreWatcher store.Waterm otWatcher: watermarkStoreWatcher.OffsetTimelineWatcher(), heartbeat: NewProcessorHeartbeat(), processors: make(map[string]*ProcessorToFetch), - bucket: bucket, fromBufferPartitionCount: fromBufferPartitionCount, - log: logging.FromContext(ctx).With("bucket", bucket), + log: logging.FromContext(ctx), doneCh: make(chan struct{}), waitCh: make(chan struct{}), opts: opts, @@ -233,7 +229,7 @@ func (v *ProcessorManager) startHeartBeatWatcher() { var entity = NewProcessorEntity(value.Key()) // if the processor is a reduce or source processor, then we only need one fromProcessor // because the reduce or source will read from only one partition. - fromProcessor := NewProcessorToFetch(v.ctx, entity, v.bucket, 10, v.fromBufferPartitionCount) + fromProcessor := NewProcessorToFetch(v.ctx, entity, 10, v.fromBufferPartitionCount) v.AddProcessor(value.Key(), fromProcessor) v.log.Infow("Successfully added a new fromProcessor", zap.String("fromProcessor", value.Key())) } else { // else just make a note that this processor is still active @@ -329,10 +325,6 @@ func (v *ProcessorManager) startTimeLineWatcher() { } } -func (v *ProcessorManager) GetBucket() string { - return v.bucket -} - // Close stops the watchers, and waits for the goroutines to exit. 
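For illustration, a hedged sketch of the slimmed-down constructor: with the bucket parameter and GetBucket removed, callers pass only the watcher, the partition count, and any options; ctx and the option values below are placeholders.

// Sketch only: a no-op watcher is enough when nothing needs to be watched.
storeWatcher, _ := store.BuildNoOpWatermarkStoreWatcher()
pm := processor.NewProcessorManager(ctx, storeWatcher, 3, // 3 from-buffer partitions
	processor.WithVertexReplica(0),
	processor.WithIsReduce(false),
	processor.WithIsSource(false),
)
defer pm.Close() // stops the HB/OT watchers and waits for the goroutines to exit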
func (v *ProcessorManager) Close() { // send a signal to the goroutines to exit diff --git a/pkg/watermark/processor/processor_manager_test.go b/pkg/watermark/processor/processor_manager_test.go index 18c1c30bd..3323624ff 100644 --- a/pkg/watermark/processor/processor_manager_test.go +++ b/pkg/watermark/processor/processor_manager_test.go @@ -48,26 +48,22 @@ func TestMain(m *testing.M) { func TestProcessorManager(t *testing.T) { var ( err error - pipelineName = "testFetch" keyspace = "fetcherTest" hbBucketName = keyspace + "_PROCESSORS" otBucketName = keyspace + "_OT" ) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - hbStore, hbWatcherCh, err := inmem.NewKVInMemKVStore(ctx, pipelineName, hbBucketName) + hbStore, hbWatcherCh, err := inmem.NewKVInMemKVStore(ctx, hbBucketName) assert.NoError(t, err) - otStore, otWatcherCh, err := inmem.NewKVInMemKVStore(ctx, pipelineName, otBucketName) + otStore, otWatcherCh, err := inmem.NewKVInMemKVStore(ctx, otBucketName) assert.NoError(t, err) defer hbStore.Close() defer otStore.Close() defer cancel() - hbWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_PROCESSORS", hbWatcherCh) + storeWatcher, err := store.BuildInmemWatermarkStoreWatcher(ctx, keyspace, hbWatcherCh, otWatcherCh) assert.NoError(t, err) - otWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_OT", otWatcherCh) - assert.NoError(t, err) - storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - var processorManager = NewProcessorManager(ctx, storeWatcher, "my-bucket", 1) + var processorManager = NewProcessorManager(ctx, storeWatcher, 1) // start p1 heartbeat for 3 loops then delete p1 go func() { var err error @@ -135,7 +131,6 @@ func TestProcessorManager(t *testing.T) { func TestProcessorManagerWatchForMapWithOnePartition(t *testing.T) { var ( err error - pipelineName = "testFetch" keyspace = "fetcherTest" hbBucketName = keyspace + "_PROCESSORS" otBucketName = keyspace + "_OT" @@ -143,21 +138,18 @@ func TestProcessorManagerWatchForMapWithOnePartition(t *testing.T) { testOffset int64 = 100 ) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - hbStore, hbWatcherCh, err := inmem.NewKVInMemKVStore(ctx, pipelineName, hbBucketName) + hbStore, hbWatcherCh, err := inmem.NewKVInMemKVStore(ctx, hbBucketName) assert.NoError(t, err) - otStore, otWatcherCh, err := inmem.NewKVInMemKVStore(ctx, pipelineName, otBucketName) + otStore, otWatcherCh, err := inmem.NewKVInMemKVStore(ctx, otBucketName) assert.NoError(t, err) defer cancel() defer hbStore.Close() defer otStore.Close() - hbWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_PROCESSORS", hbWatcherCh) - assert.NoError(t, err) - otWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_OT", otWatcherCh) + storeWatcher, err := store.BuildInmemWatermarkStoreWatcher(ctx, keyspace, hbWatcherCh, otWatcherCh) assert.NoError(t, err) - storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - var processorManager = NewProcessorManager(ctx, storeWatcher, "", 1) + var processorManager = NewProcessorManager(ctx, storeWatcher, 1) // start p1 heartbeat for 3 loops go func(ctx context.Context) { for { @@ -235,7 +227,6 @@ func TestProcessorManagerWatchForMapWithOnePartition(t *testing.T) { func TestProcessorManagerWatchForReduce(t *testing.T) { var ( err error - pipelineName = "testFetch" keyspace = "fetcherTest" hbBucketName = keyspace + "_PROCESSORS" otBucketName = keyspace + "_OT" @@ -243,20 +234,17 @@ func 
TestProcessorManagerWatchForReduce(t *testing.T) { testOffset int64 = 100 ) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - hbStore, hbWatcherCh, err := inmem.NewKVInMemKVStore(ctx, pipelineName, hbBucketName) + hbStore, hbWatcherCh, err := inmem.NewKVInMemKVStore(ctx, hbBucketName) assert.NoError(t, err) - otStore, otWatcherCh, err := inmem.NewKVInMemKVStore(ctx, pipelineName, otBucketName) + otStore, otWatcherCh, err := inmem.NewKVInMemKVStore(ctx, otBucketName) assert.NoError(t, err) defer hbStore.Close() defer otStore.Close() defer cancel() - hbWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_PROCESSORS", hbWatcherCh) + storeWatcher, err := store.BuildInmemWatermarkStoreWatcher(ctx, keyspace, hbWatcherCh, otWatcherCh) assert.NoError(t, err) - otWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_OT", otWatcherCh) - assert.NoError(t, err) - storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - var processorManager = NewProcessorManager(ctx, storeWatcher, "my-bucket", 1, WithIsReduce(true), WithVertexReplica(2)) + var processorManager = NewProcessorManager(ctx, storeWatcher, 1, WithIsReduce(true), WithVertexReplica(2)) // start p1 heartbeat for 3 loops go func(ctx context.Context) { for { @@ -347,7 +335,6 @@ func TestProcessorManagerWatchForReduce(t *testing.T) { func TestProcessorManagerWatchForMapWithMultiplePartition(t *testing.T) { var ( err error - pipelineName = "testFetch" keyspace = "fetcherTest" hbBucketName = keyspace + "_PROCESSORS" otBucketName = keyspace + "_OT" @@ -357,19 +344,16 @@ func TestProcessorManagerWatchForMapWithMultiplePartition(t *testing.T) { ) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - hbStore, hbWatcherCh, err := inmem.NewKVInMemKVStore(ctx, pipelineName, hbBucketName) + hbStore, hbWatcherCh, err := inmem.NewKVInMemKVStore(ctx, hbBucketName) assert.NoError(t, err) defer hbStore.Close() - otStore, otWatcherCh, err := inmem.NewKVInMemKVStore(ctx, pipelineName, otBucketName) + otStore, otWatcherCh, err := inmem.NewKVInMemKVStore(ctx, otBucketName) assert.NoError(t, err) defer otStore.Close() - hbWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_PROCESSORS", hbWatcherCh) - assert.NoError(t, err) - otWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_OT", otWatcherCh) + storeWatcher, err := store.BuildInmemWatermarkStoreWatcher(ctx, keyspace, hbWatcherCh, otWatcherCh) assert.NoError(t, err) - storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - var processorManager = NewProcessorManager(ctx, storeWatcher, "my-bucket", 3) + var processorManager = NewProcessorManager(ctx, storeWatcher, 3) // start p1 heartbeat for 3 loops go func(ctx context.Context) { var err error diff --git a/pkg/watermark/processor/processor_to_fetch.go b/pkg/watermark/processor/processor_to_fetch.go index 9e0e80830..1ab1cec19 100644 --- a/pkg/watermark/processor/processor_to_fetch.go +++ b/pkg/watermark/processor/processor_to_fetch.go @@ -79,11 +79,11 @@ func (p *ProcessorToFetch) String() string { } // NewProcessorToFetch creates ProcessorToFetch. 
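For illustration, a hedged sketch of the matching ProcessorToFetch change, which also drops the bucket argument; ctx, the pod name, the counts, and the pre-existing manager pm are assumptions.

// Sketch only: capacity of 5 offset-timeline entries, read from 1 partition.
p := processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("test-pod"), 5, 1)
pm.AddProcessor("test-pod", p) // pm is an existing *processor.ProcessorManager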
-func NewProcessorToFetch(ctx context.Context, processor ProcessorEntitier, bucket string, capacity int, fromBufferPartitionCount int32) *ProcessorToFetch { +func NewProcessorToFetch(ctx context.Context, processor ProcessorEntitier, capacity int, fromBufferPartitionCount int32) *ProcessorToFetch { var offsetTimelines []*timeline.OffsetTimeline for i := int32(0); i < fromBufferPartitionCount; i++ { - t := timeline.NewOffsetTimeline(ctx, capacity, bucket) + t := timeline.NewOffsetTimeline(ctx, capacity) offsetTimelines = append(offsetTimelines, t) } p := &ProcessorToFetch{ diff --git a/pkg/watermark/processor/processor_to_fetch_test.go b/pkg/watermark/processor/processor_to_fetch_test.go index dd121e5ca..859be7e88 100644 --- a/pkg/watermark/processor/processor_to_fetch_test.go +++ b/pkg/watermark/processor/processor_to_fetch_test.go @@ -25,7 +25,7 @@ import ( func TestFromProcessor_setStatus(t *testing.T) { var ctx = context.Background() - p := NewProcessorToFetch(ctx, NewProcessorEntity("test-pod"), "test-bucket", 5, 1) + p := NewProcessorToFetch(ctx, NewProcessorEntity("test-pod"), 5, 1) p.setStatus(_inactive) assert.Equal(t, _inactive, p.status) } diff --git a/pkg/watermark/publish/publisher_inmem_test.go b/pkg/watermark/publish/publisher_inmem_test.go index cced60fed..86c26138b 100644 --- a/pkg/watermark/publish/publisher_inmem_test.go +++ b/pkg/watermark/publish/publisher_inmem_test.go @@ -26,7 +26,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/numaproj/numaflow/pkg/isb" - "github.com/numaproj/numaflow/pkg/shared/kvs/inmem" "github.com/numaproj/numaflow/pkg/watermark/processor" "github.com/numaproj/numaflow/pkg/watermark/store" "github.com/numaproj/numaflow/pkg/watermark/wmb" @@ -34,20 +33,11 @@ import ( func TestPublisherWithSharedOTBuckets_InMem(t *testing.T) { var ctx = context.Background() - - var publisherHBKeyspace = "publisherTest_PROCESSORS" - - // this test uses separate OT buckets, so it is an OT bucket per processor - var publisherOTKeyspace = "publisherTest_OT_publisherTestPod1" - - heartbeatKV, _, err := inmem.NewKVInMemKVStore(ctx, "testPublisher", publisherHBKeyspace) + wmstore, _, _, err := store.BuildInmemWatermarkStore(ctx, "test") assert.NoError(t, err) - otKV, _, err := inmem.NewKVInMemKVStore(ctx, "testPublisher", publisherOTKeyspace) - assert.NoError(t, err) - publishEntity := processor.NewProcessorEntity("publisherTestPod1") - p := NewPublish(ctx, publishEntity, store.BuildWatermarkStore(heartbeatKV, otKV), 1, WithAutoRefreshHeartbeatDisabled(), WithPodHeartbeatRate(1)).(*publish) + p := NewPublish(ctx, publishEntity, wmstore, 1, WithAutoRefreshHeartbeatDisabled(), WithPodHeartbeatRate(1)).(*publish) var epoch int64 = 1651161600000 var location, _ = time.LoadLocation("UTC") diff --git a/pkg/watermark/publish/publisher_test.go b/pkg/watermark/publish/publisher_test.go index 8d8b71aab..92854efb6 100644 --- a/pkg/watermark/publish/publisher_test.go +++ b/pkg/watermark/publish/publisher_test.go @@ -27,7 +27,6 @@ import ( "github.com/numaproj/numaflow/pkg/isb" natstest "github.com/numaproj/numaflow/pkg/shared/clients/nats/test" - "github.com/numaproj/numaflow/pkg/shared/kvs/jetstream" "github.com/numaproj/numaflow/pkg/watermark/processor" "github.com/numaproj/numaflow/pkg/watermark/store" "github.com/numaproj/numaflow/pkg/watermark/wmb" @@ -67,12 +66,10 @@ func TestPublisherWithSharedOTBucket(t *testing.T) { publishEntity := processor.NewProcessorEntity("publisherTestPod1") - heartbeatKV, err := jetstream.NewKVJetStreamKVStore(ctx, "testPublisher", 
keyspace+"_PROCESSORS", defaultJetStreamClient) - assert.NoError(t, err) - otKV, err := jetstream.NewKVJetStreamKVStore(ctx, "testPublisher", keyspace+"_OT", defaultJetStreamClient) + wmstore, err := store.BuildJetStreamWatermarkStore(ctx, keyspace, defaultJetStreamClient) assert.NoError(t, err) - p := NewPublish(ctx, publishEntity, store.BuildWatermarkStore(heartbeatKV, otKV), 1, WithAutoRefreshHeartbeatDisabled(), WithPodHeartbeatRate(1)).(*publish) + p := NewPublish(ctx, publishEntity, wmstore, 1, WithAutoRefreshHeartbeatDisabled(), WithPodHeartbeatRate(1)).(*publish) var epoch int64 = 1651161600000 var location, _ = time.LoadLocation("UTC") diff --git a/pkg/watermark/store/watermark_store.go b/pkg/watermark/store/watermark_store.go index 1051171e0..74a634ae1 100644 --- a/pkg/watermark/store/watermark_store.go +++ b/pkg/watermark/store/watermark_store.go @@ -16,7 +16,16 @@ limitations under the License. package store -import "github.com/numaproj/numaflow/pkg/shared/kvs" +import ( + "context" + "fmt" + + jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats" + "github.com/numaproj/numaflow/pkg/shared/kvs" + "github.com/numaproj/numaflow/pkg/shared/kvs/inmem" + "github.com/numaproj/numaflow/pkg/shared/kvs/jetstream" + noopkv "github.com/numaproj/numaflow/pkg/shared/kvs/noop" +) // watermarkStore wraps a pair of heartbeatStore and offsetTimelineStore, // it implements interface WatermarkStore. @@ -27,14 +36,6 @@ type watermarkStore struct { var _ WatermarkStore = (*watermarkStore)(nil) -// BuildWatermarkStore returns a WatermarkStore instance -func BuildWatermarkStore(hbStore, otStore kvs.KVStorer) WatermarkStore { - return &watermarkStore{ - heartbeatStore: hbStore, - offsetTimelineStore: otStore, - } -} - func (ws *watermarkStore) HeartbeatStore() kvs.KVStorer { return ws.heartbeatStore } @@ -48,3 +49,51 @@ func (ws *watermarkStore) Close() error { ws.offsetTimelineStore.Close() return nil } + +// BuildNoOpWatermarkStore returns a NoOp WatermarkStore instance +func BuildNoOpWatermarkStore() (WatermarkStore, error) { + return &watermarkStore{ + heartbeatStore: noopkv.NewKVNoOpStore(), + offsetTimelineStore: noopkv.NewKVNoOpStore(), + }, nil +} + +// BuildInmemWatermarkStore returns an in-mem WatermarkStore instance, and the HB, OT entry channels +func BuildInmemWatermarkStore(ctx context.Context, bucket string) (WatermarkStore, chan kvs.KVEntry, chan kvs.KVEntry, error) { + hbKV, hbKVEntry, _ := inmem.NewKVInMemKVStore(ctx, bucket+"_PROCESSORS") + otKV, otKVEntry, _ := inmem.NewKVInMemKVStore(ctx, bucket+"_OT") + return &watermarkStore{ + heartbeatStore: hbKV, + offsetTimelineStore: otKV, + }, hbKVEntry, otKVEntry, nil +} + +// BuildJetStreamWatermarkStore returns a JetStream WatermarkStore instance +func BuildJetStreamWatermarkStore(ctx context.Context, bucket string, client *jsclient.NATSClient) (WatermarkStore, error) { + // build heartBeat store + hbKVName := JetStreamProcessorKVName(bucket) + hbStore, err := jetstream.NewKVJetStreamKVStore(ctx, hbKVName, client) + if err != nil { + return nil, fmt.Errorf("failed at new JetStream HB KV store %q, %w", hbKVName, err) + } + + // build offsetTimeline store + otStoreKVName := JetStreamOTKVName(bucket) + otStore, err := jetstream.NewKVJetStreamKVStore(ctx, otStoreKVName, client) + if err != nil { + hbStore.Close() + return nil, fmt.Errorf("failed at new JetStream OT KV store %q, %w", otStoreKVName, err) + } + return &watermarkStore{ + heartbeatStore: hbStore, + offsetTimelineStore: otStore, + }, nil +} + +func 
JetStreamProcessorKVName(bucketName string) string { + return fmt.Sprintf("%s_PROCESSORS", bucketName) +} + +func JetStreamOTKVName(bucketName string) string { + return fmt.Sprintf("%s_OT", bucketName) +} diff --git a/pkg/watermark/store/watermark_store_watcher.go b/pkg/watermark/store/watermark_store_watcher.go index dea2bdcf1..57ef2516c 100644 --- a/pkg/watermark/store/watermark_store_watcher.go +++ b/pkg/watermark/store/watermark_store_watcher.go @@ -16,7 +16,16 @@ limitations under the License. package store -import "github.com/numaproj/numaflow/pkg/shared/kvs" +import ( + "context" + "fmt" + + jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats" + "github.com/numaproj/numaflow/pkg/shared/kvs" + "github.com/numaproj/numaflow/pkg/shared/kvs/inmem" + "github.com/numaproj/numaflow/pkg/shared/kvs/jetstream" + noopkv "github.com/numaproj/numaflow/pkg/shared/kvs/noop" +) // watermarkStoreWatcher defines a pair of heartbeatStoreWatcher and offsetTimelineStoreWatcher, // it implements interface WatermarkStoreWatcher @@ -34,10 +43,40 @@ func (w *watermarkStoreWatcher) OffsetTimelineWatcher() kvs.KVWatcher { return w.offsetTimelineStoreWatcher } -// BuildWatermarkStoreWatcher returns a WatermarkStoreWatcher instance -func BuildWatermarkStoreWatcher(hbStoreWatcher, otStoreWatcher kvs.KVWatcher) WatermarkStoreWatcher { +// BuildNoOpWatermarkStoreWatcher returns a NoOp WatermarkStoreWatcher instance +func BuildNoOpWatermarkStoreWatcher() (WatermarkStoreWatcher, error) { return &watermarkStoreWatcher{ - heartbeatStoreWatcher: hbStoreWatcher, - offsetTimelineStoreWatcher: otStoreWatcher, + heartbeatStoreWatcher: noopkv.NewKVOpWatch(), + offsetTimelineStoreWatcher: noopkv.NewKVOpWatch(), + }, nil +} + +// BuildInmemWatermarkStoreWatcher returns an in-mem WatermarkStoreWatcher instance +func BuildInmemWatermarkStoreWatcher(ctx context.Context, bucket string, hbKVEntryCh, otKVEntryCh <-chan kvs.KVEntry) (WatermarkStoreWatcher, error) { + hbWatcher, _ := inmem.NewInMemWatch(ctx, bucket+"_PROCESSORS", hbKVEntryCh) + otWatcher, _ := inmem.NewInMemWatch(ctx, bucket+"_OT", otKVEntryCh) + return &watermarkStoreWatcher{ + heartbeatStoreWatcher: hbWatcher, + offsetTimelineStoreWatcher: otWatcher, + }, nil +} + +// BuildJetStreamWatermarkStoreWatcher returns a JetStream WatermarkStoreWatcher instance +func BuildJetStreamWatermarkStoreWatcher(ctx context.Context, bucket string, client *jsclient.NATSClient) (WatermarkStoreWatcher, error) { + hbKVName := JetStreamProcessorKVName(bucket) + hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, hbKVName, client) + if err != nil { + return nil, fmt.Errorf("failed at new JetStream HB KV watch for %q, %w", hbKVName, err) + } + + otKVName := JetStreamOTKVName(bucket) + otWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, otKVName, client) + if err != nil { + hbWatch.Close() + return nil, fmt.Errorf("failed at new JetStream OT KV watch for %q, %w", otKVName, err) } + return &watermarkStoreWatcher{ + heartbeatStoreWatcher: hbWatch, + offsetTimelineStoreWatcher: otWatch, + }, nil } diff --git a/pkg/watermark/timeline/offset_timeline.go b/pkg/watermark/timeline/offset_timeline.go index d20991781..049603c06 100644 --- a/pkg/watermark/timeline/offset_timeline.go +++ b/pkg/watermark/timeline/offset_timeline.go @@ -42,13 +42,13 @@ type OffsetTimeline struct { } // NewOffsetTimeline returns OffsetTimeline. 
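For illustration, a hedged sketch of the bucket-to-KV-name helpers that now live in the store package alongside the JetStream watcher builder; "my-bucket", ctx, client, and the enclosing error-returning function are assumptions.

// Sketch only: KV bucket names are derived inside the store package.
hbKVName := store.JetStreamProcessorKVName("my-bucket") // "my-bucket_PROCESSORS"
otKVName := store.JetStreamOTKVName("my-bucket")        // "my-bucket_OT"
_, _ = hbKVName, otKVName

storeWatcher, err := store.BuildJetStreamWatermarkStoreWatcher(ctx, "my-bucket", client)
if err != nil {
	return fmt.Errorf("failed at new JetStream watermark store watcher, %w", err)
}
_ = storeWatcher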
-func NewOffsetTimeline(ctx context.Context, c int, bucket string) *OffsetTimeline { +func NewOffsetTimeline(ctx context.Context, c int) *OffsetTimeline { // Initialize a new empty watermarks DLL with nil values of the size capacity. // This is to avoid length check: when a new element is added, the tail element will be deleted. offsetTimeline := OffsetTimeline{ ctx: ctx, capacity: c, - log: logging.FromContext(ctx).With("bucket", bucket), + log: logging.FromContext(ctx), } for i := 0; i < c; i++ { diff --git a/pkg/watermark/timeline/offset_timeline_test.go b/pkg/watermark/timeline/offset_timeline_test.go index 9263fd350..13d8b93ee 100644 --- a/pkg/watermark/timeline/offset_timeline_test.go +++ b/pkg/watermark/timeline/offset_timeline_test.go @@ -29,8 +29,8 @@ import ( func TestTimeline_GetEventTime(t *testing.T) { var ( ctx = context.Background() - emptyTimeline = NewOffsetTimeline(ctx, 5, "myBucket") - testTimeline = NewOffsetTimeline(ctx, 10, "myBucket") + emptyTimeline = NewOffsetTimeline(ctx, 5) + testTimeline = NewOffsetTimeline(ctx, 10) testwatermarks = []wmb.WMB{ {Watermark: 10, Offset: 9}, {Watermark: 12, Offset: 10}, @@ -127,7 +127,7 @@ func TestTimeline_GetEventTime(t *testing.T) { func TestOffsetTimeline_GetOffset(t *testing.T) { var ( ctx = context.Background() - testTimeline = NewOffsetTimeline(ctx, 10, "myBucket") + testTimeline = NewOffsetTimeline(ctx, 10) testwatermarks = []wmb.WMB{ {Watermark: 10, Offset: 9}, {Watermark: 12, Offset: 20}, @@ -223,7 +223,7 @@ func TestOffsetTimeline_GetOffset(t *testing.T) { func TestOffsetTimeline_PutIdle(t *testing.T) { var ( ctx = context.Background() - testTimeline = NewOffsetTimeline(ctx, 10, "myBucket") + testTimeline = NewOffsetTimeline(ctx, 10) setUps = []wmb.WMB{ {Idle: false, Watermark: 10, Offset: 9}, {Idle: false, Watermark: 12, Offset: 20},