diff --git a/pkg/reduce/reduce_test.go b/pkg/reduce/reduce_test.go
index 5471c4aec..9014ab41d 100644
--- a/pkg/reduce/reduce_test.go
+++ b/pkg/reduce/reduce_test.go
@@ -219,12 +219,12 @@ func TestDataForward_StartWithInMemoryWMStore(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
 	defer cancel()
 
-	//create from buffers for tests
+	// create from buffers for tests
 	fromBuffer1 := simplebuffer.NewInMemoryBuffer("from1", fromBufferSize)
 	fromBuffer2 := simplebuffer.NewInMemoryBuffer("from2", fromBufferSize)
 	fromBuffer3 := simplebuffer.NewInMemoryBuffer("from3", fromBufferSize)
 
-	//create to buffers for tests
+	// create to buffers for tests
 	buffer1 := simplebuffer.NewInMemoryBuffer("to", toBufferSize)
 	buffer2 := simplebuffer.NewInMemoryBuffer("to", toBufferSize)
 	buffer3 := simplebuffer.NewInMemoryBuffer("to", toBufferSize)
@@ -263,12 +263,12 @@ func TestDataForward_StartWithInMemoryWMStore(t *testing.T) {
 	go writeMessages(ctx, 100, "test-2", fromBuffer2, p2["from2"], time.Minute*1)
 	go writeMessages(ctx, 1000, "test-3", fromBuffer3, p3["from3"], time.Minute*10)
 
-	//create window for tests
+	// create window for tests
 	window1 := fixed.NewFixed(2 * time.Second)
 	window2 := fixed.NewFixed(2 * time.Minute)
 	window3 := fixed.NewFixed(20 * time.Minute)
 
-	//create forwarder for tests
+	// create forwarder for tests
 	var reduceDataForwarder1, reduceDataForwarder2, reduceDataForwarder3 *DataForward
 	reduceDataForwarder1, err = NewDataForward(ctx, CounterReduceTest{}, fromBuffer1, toBuffer1, pbqManager1, CounterReduceTest{}, f1, p1, window1, WithReadBatchSize(10))
 	assert.NoError(t, err)
@@ -385,7 +385,7 @@ func fetcherAndPublisher(ctx context.Context, toBuffers map[string]isb.BufferWri
 	hbWatcher, _ := inmem.NewInMemWatch(ctx, pipelineName, keyspace+"_PROCESSORS", hbWatcherCh)
 	otWatcher, _ := inmem.NewInMemWatch(ctx, pipelineName, keyspace+"_OT", otWatcherCh)
 
-	var pm = fetch.NewProcessorManager(ctx, wmstore.BuildWatermarkStoreWatcher(hbWatcher, otWatcher), fetch.WithPodHeartbeatRate(1), fetch.WithRefreshingProcessorsRate(1), fetch.WithSeparateOTBuckets(false))
+	var pm = fetch.NewProcessorManager(ctx, wmstore.BuildWatermarkStoreWatcher(hbWatcher, otWatcher), fetch.WithPodHeartbeatRate(1), fetch.WithRefreshingProcessorsRate(1))
 	var f = fetch.NewEdgeFetcher(ctx, fromBuffer.GetName(), pm)
 	return f, publishers
 }
diff --git a/pkg/watermark/fetch/options.go b/pkg/watermark/fetch/options.go
index d13e8a2e6..0dd54ebec 100644
--- a/pkg/watermark/fetch/options.go
+++ b/pkg/watermark/fetch/options.go
@@ -5,7 +5,6 @@ type processorManagerOptions struct {
 	podHeartbeatRate int64
 	// refreshingProcessorsRate uses second as time unit
 	refreshingProcessorsRate int64
-	separateOTBucket         bool
 }
 
 // ProcessorManagerOption set options for FromVertex.
@@ -24,10 +23,3 @@ func WithRefreshingProcessorsRate(rate int64) ProcessorManagerOption {
 		opts.refreshingProcessorsRate = rate
 	}
 }
-
-// WithSeparateOTBuckets creates a different bucket for maintaining each processor offset-timeline.
-func WithSeparateOTBuckets(separate bool) ProcessorManagerOption {
-	return func(opts *processorManagerOptions) {
-		opts.separateOTBucket = separate
-	}
-}
diff --git a/pkg/watermark/fetch/processor_manager.go b/pkg/watermark/fetch/processor_manager.go
index 77fda004b..5584a692a 100644
--- a/pkg/watermark/fetch/processor_manager.go
+++ b/pkg/watermark/fetch/processor_manager.go
@@ -38,7 +38,6 @@ func NewProcessorManager(ctx context.Context, watermarkStoreWatcher store.Waterm
 	opts := &processorManagerOptions{
 		podHeartbeatRate:         5,
 		refreshingProcessorsRate: 5,
-		separateOTBucket:         false,
 	}
 	for _, opt := range inputOpts {
 		opt(opts)
@@ -166,7 +165,7 @@ func (v *ProcessorManager) startHeatBeatWatcher() {
 				// A fromProcessor needs to be added to v.processors
 				// The fromProcessor may have been deleted
 				// TODO: make capacity configurable
-				var entity = processor.NewProcessorEntity(value.Key(), processor.WithSeparateOTBuckets(v.opts.separateOTBucket))
+				var entity = processor.NewProcessorEntity(value.Key())
 				var fromProcessor = NewProcessorToFetch(v.ctx, entity, 10, v.otWatcher)
 				v.addProcessor(value.Key(), fromProcessor)
 				v.log.Infow("v.AddProcessor successfully added a new fromProcessor", zap.String("fromProcessor", value.Key()))
diff --git a/pkg/watermark/fetch/processor_manager_inmem_test.go b/pkg/watermark/fetch/processor_manager_inmem_test.go
index b87334b6e..383e959b1 100644
--- a/pkg/watermark/fetch/processor_manager_inmem_test.go
+++ b/pkg/watermark/fetch/processor_manager_inmem_test.go
@@ -52,7 +52,7 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
 	assert.NoError(t, err)
 	otWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_OT", otWatcherCh)
 	assert.NoError(t, err)
-	var testVertex = NewProcessorManager(ctx, store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher), WithPodHeartbeatRate(1), WithRefreshingProcessorsRate(1), WithSeparateOTBuckets(false))
+	var testVertex = NewProcessorManager(ctx, store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher), WithPodHeartbeatRate(1), WithRefreshingProcessorsRate(1))
 	var testBuffer = NewEdgeFetcher(ctx, "testBuffer", testVertex).(*edgeFetcher)
 
 	// start p1 heartbeat for 3 loops
diff --git a/pkg/watermark/fetch/processor_manager_test.go b/pkg/watermark/fetch/processor_manager_test.go
index af8b07d59..92b1d2fea 100644
--- a/pkg/watermark/fetch/processor_manager_test.go
+++ b/pkg/watermark/fetch/processor_manager_test.go
@@ -97,7 +97,7 @@ func TestFetcherWithSameOTBucket(t *testing.T) {
 	assert.NoError(t, err)
 	otWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_OT", defaultJetStreamClient)
 	assert.NoError(t, err)
-	var testVertex = NewProcessorManager(ctx, store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher), WithPodHeartbeatRate(1), WithRefreshingProcessorsRate(1), WithSeparateOTBuckets(false))
+	var testVertex = NewProcessorManager(ctx, store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher), WithPodHeartbeatRate(1), WithRefreshingProcessorsRate(1))
 	var testBuffer = NewEdgeFetcher(ctx, "testBuffer", testVertex).(*edgeFetcher)
 
 	wg.Add(1)
@@ -271,227 +271,3 @@ func TestFetcherWithSameOTBucket(t *testing.T) {
 	wg.Wait()
 	cancel()
 }
-
-func TestFetcherWithSeparateOTBucket(t *testing.T) {
-	// FIXME: Check for seperate buckets implementation for single watcher or multiple watcher
-	// Maybe we should not support seperate OT because controller does not support it
-	t.Skip()
-	// uncomment to debug
-	// os.Setenv("NUMAFLOW_DEBUG", "true")
-
-	var ctx = context.Background()
-
-	// Connect to NATS
-	nc, err := jsclient.NewDefaultJetStreamClient(nats.DefaultURL).Connect(context.TODO())
-	assert.Nil(t, err)
-
-	// Create JetStream Context
-	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
-	assert.Nil(t, err)
-
-	// create heartbeat bucket
-	var keyspace = "fetcherTest"
-
-	heartbeatBucket, err := js.CreateKeyValue(&nats.KeyValueConfig{
-		Bucket:       keyspace + "_PROCESSORS",
-		Description:  fmt.Sprintf("[%s] heartbeat bucket", keyspace),
-		MaxValueSize: 0,
-		History:      0,
-		TTL:          0,
-		MaxBytes:     0,
-		Storage:      nats.MemoryStorage,
-		Replicas:     0,
-		Placement:    nil,
-	})
-	defer func() { _ = js.DeleteKeyValue(keyspace + "_PROCESSORS") }()
-	assert.Nil(t, err)
-
-	var epoch int64 = 1651161600000
-	var testOffset int64 = 100
-	p1OT, _ := js.CreateKeyValue(&nats.KeyValueConfig{
-		Bucket:       keyspace + "_OT_" + "p1",
-		Description:  "",
-		MaxValueSize: 0,
-		History:      0,
-		TTL:          0,
-		MaxBytes:     0,
-		Storage:      nats.MemoryStorage,
-		Replicas:     0,
-		Placement:    nil,
-	})
-	defer func() { _ = js.DeleteKeyValue(keyspace + "_OT_" + "p1") }()
-	b := make([]byte, 8)
-	binary.LittleEndian.PutUint64(b, uint64(testOffset))
-	_, err = p1OT.Put(fmt.Sprintf("%d", epoch), b)
-	assert.NoError(t, err)
-
-	epoch += 60000
-	binary.LittleEndian.PutUint64(b, uint64(testOffset+5))
-	p2OT, _ := js.CreateKeyValue(&nats.KeyValueConfig{
-		Bucket:       keyspace + "_OT_" + "p2",
-		Description:  "",
-		MaxValueSize: 0,
-		History:      0,
-		TTL:          0,
-		MaxBytes:     0,
-		Storage:      nats.MemoryStorage,
-		Replicas:     0,
-		Placement:    nil,
-	})
-	defer func() { _ = js.DeleteKeyValue(keyspace + "_OT_" + "p2") }()
-	_, err = p2OT.Put(fmt.Sprintf("%d", epoch), b)
-	assert.NoError(t, err)
-
-	defaultJetStreamClient := jsclient.NewDefaultJetStreamClient(nats.DefaultURL)
-
-	hbWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_PROCESSORS", defaultJetStreamClient)
-	assert.NoError(t, err)
-	otWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_OT", defaultJetStreamClient)
-	assert.NoError(t, err)
-	var testVertex = NewProcessorManager(ctx, store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher), WithPodHeartbeatRate(1), WithRefreshingProcessorsRate(1), WithSeparateOTBuckets(true))
-	var testBuffer = NewEdgeFetcher(ctx, "testBuffer", testVertex).(*edgeFetcher)
-
-	// var location, _ = time.LoadLocation("UTC")
-	go func() {
-		var err error
-		for i := 0; i < 3; i++ {
-			_, err = heartbeatBucket.Put("p1", []byte(fmt.Sprintf("%d", time.Now().Unix())))
-			assert.NoError(t, err)
-			time.Sleep(time.Duration(testVertex.opts.podHeartbeatRate) * time.Second)
-		}
-		err = heartbeatBucket.Delete("p1")
-		assert.NoError(t, err)
-	}()
-
-	go func() {
-		for i := 0; i < 100; i++ {
-			_, err := heartbeatBucket.Put("p2", []byte(fmt.Sprintf("%d", time.Now().Unix())))
-			assert.NoError(t, err)
-			time.Sleep(time.Duration(testVertex.opts.podHeartbeatRate) * time.Second)
-		}
-	}()
-
-	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
-	defer cancel()
-
-	allProcessors := testBuffer.processorManager.GetAllProcessors()
-	for len(allProcessors) != 2 {
-		select {
-		case <-ctx.Done():
-			t.Fatalf("expected 2 processors, got %d: %s", len(allProcessors), ctx.Err())
-		default:
-			time.Sleep(1 * time.Millisecond)
-			allProcessors = testBuffer.processorManager.GetAllProcessors()
-		}
-	}
-
-	assert.True(t, allProcessors["p1"].IsActive())
-	assert.True(t, allProcessors["p2"].IsActive())
-
-	// "p1" is deleted after 5 loops
-	for !allProcessors["p1"].IsDeleted() {
-		select {
-		case <-ctx.Done():
-			t.Fatalf("expected p1 to be deleted: %s", ctx.Err())
-		default:
-			time.Sleep(1 * time.Millisecond)
-			allProcessors = testBuffer.processorManager.GetAllProcessors()
-		}
-	}
-
-	allProcessors = testBuffer.processorManager.GetAllProcessors()
-	assert.Equal(t, 2, len(allProcessors))
-	assert.True(t, allProcessors["p1"].IsDeleted())
-	assert.True(t, allProcessors["p2"].IsActive())
-	_ = testBuffer.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset, 10) }))
-	allProcessors = testBuffer.processorManager.GetAllProcessors()
-	assert.Equal(t, 2, len(allProcessors))
-	assert.True(t, allProcessors["p1"].IsDeleted())
-	assert.True(t, allProcessors["p2"].IsActive())
-	// "p1" should be deleted after this GetWatermark offset=101
-	// because "p1" offsetTimeline's head offset=100, which is < inputOffset 103
-	_ = testBuffer.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset+3, 10) }))
-	allProcessors = testBuffer.processorManager.GetAllProcessors()
-	assert.Equal(t, 1, len(allProcessors))
-	assert.True(t, allProcessors["p2"].IsActive())
-
-	time.Sleep(time.Second)
-
-	// resume after one second
-	go func() {
-		var err error
-		for i := 0; i < 5; i++ {
-			_, err = heartbeatBucket.Put("p1", []byte(fmt.Sprintf("%d", time.Now().Unix())))
-			assert.NoError(t, err)
-			time.Sleep(time.Duration(testVertex.opts.podHeartbeatRate) * time.Second)
-		}
-	}()
-
-	// wait until p1 becomes active
-	allProcessors = testBuffer.processorManager.GetAllProcessors()
-	for len(allProcessors) != 2 {
-		select {
-		case <-ctx.Done():
-			t.Fatalf("expected 2 processors, got %d: %s", len(allProcessors), ctx.Err())
-		default:
-			time.Sleep(1 * time.Millisecond)
-			allProcessors = testBuffer.processorManager.GetAllProcessors()
-		}
-	}
-
-	assert.True(t, allProcessors["p1"].IsActive())
-	assert.True(t, allProcessors["p2"].IsActive())
-
-	// "p1" has been deleted from vertex.Processors
-	// so "p1" will be considered as a new processors and a new offsetTimeline watcher for "p1" will be created
-	_ = testBuffer.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset+1, 10) }))
-	newP1 := testBuffer.processorManager.GetProcessor("p1")
-	assert.NotNil(t, newP1)
-	assert.True(t, newP1.IsActive())
-	assert.NotNil(t, newP1.offsetTimeline)
-	// because the bucket hasn't been cleaned up, the new watcher will read all the history data to create this new offsetTimeline
-	assert.Equal(t, int64(100), newP1.offsetTimeline.GetHeadOffset())
-
-	// publish a new watermark 101
-	binary.LittleEndian.PutUint64(b, uint64(testOffset+1))
-	_, err = p1OT.Put(fmt.Sprintf("%d", epoch), b)
-	assert.NoError(t, err)
-
-	// "p1" becomes inactive after 5 loops
-	for !allProcessors["p1"].IsInactive() {
-		select {
-		case <-ctx.Done():
-			t.Fatalf("expected p1 to be inactive: %s", ctx.Err())
-		default:
-			time.Sleep(1 * time.Millisecond)
-			allProcessors = testBuffer.processorManager.GetAllProcessors()
-		}
-	}
-
-	time.Sleep(time.Second)
-
-	// resume after one second
-	go func() {
-		var err error
-		for i := 0; i < 5; i++ {
-			_, err = heartbeatBucket.Put("p1", []byte(fmt.Sprintf("%d", time.Now().Unix())))
-			assert.NoError(t, err)
-			time.Sleep(time.Duration(testVertex.opts.podHeartbeatRate) * time.Second)
-		}
-	}()
-
-	allProcessors = testBuffer.processorManager.GetAllProcessors()
-	for len(allProcessors) != 2 {
-		select {
-		case <-ctx.Done():
-			t.Fatalf("expected 2 processors, got %d: %s", len(allProcessors), ctx.Err())
-		default:
-			time.Sleep(1 * time.Millisecond)
-			allProcessors = testBuffer.processorManager.GetAllProcessors()
-		}
-	}
-
-	// added 101 in the previous steps for newP1, so the head should be 101 after resume
-	assert.Equal(t, int64(101), newP1.offsetTimeline.GetHeadOffset())
-
-}
diff --git a/pkg/watermark/fetch/processor_to_fetch.go b/pkg/watermark/fetch/processor_to_fetch.go
index b7843950f..4bbdfd18c 100644
--- a/pkg/watermark/fetch/processor_to_fetch.go
+++ b/pkg/watermark/fetch/processor_to_fetch.go
@@ -123,7 +123,7 @@ func (p *ProcessorToFetch) startTimeLineWatcher() {
 				p.log.Errorw("Unable to convert value.PartitionID() to int64", zap.String("received", value.Key()), zap.Error(err))
 				continue
 			}
-			// if skip is set to true, it means the key update we received is for a different processor (sharing of bucket)
+			// if skip is set to true, it means the key update we received is for a different processor
 			if skip {
 				continue
 			}
diff --git a/pkg/watermark/processor/entity.go b/pkg/watermark/processor/entity.go
index 37991b667..94460dc1a 100644
--- a/pkg/watermark/processor/entity.go
+++ b/pkg/watermark/processor/entity.go
@@ -34,27 +34,18 @@ func (w Watermark) Before(t time.Time) bool {
 }
 
 type entityOptions struct {
-	separateOTBucket bool
-	keySeparator     string
+	keySeparator string
 }
 
 // EntityOption set options for FromVertex.
 type EntityOption func(*entityOptions)
 
-// WithSeparateOTBuckets creates a different bucket for maintaining each processor offset-timeline.
-func WithSeparateOTBuckets(separate bool) EntityOption {
-	return func(opts *entityOptions) {
-		opts.separateOTBucket = separate
-	}
-}
-
 // ProcessorEntitier defines what can be a processor. The Processor is the smallest unit where the watermark will
 // monotonically increase.
 type ProcessorEntitier interface {
 	GetID() string
 	BuildOTWatcherKey(Watermark) string
 	ParseOTWatcherKey(string) (int64, bool, error)
-	IsOTBucketShared() bool
 }
 
 // ProcessorEntity implements ProcessorEntitier.
@@ -66,8 +57,8 @@ type ProcessorEntity struct {
 
 var _ ProcessorEntitier = (*ProcessorEntity)(nil)
 
-// _defaultKeySeparator is the key separate when we have shared OT buckets.
-// NOTE: we can only use `_` as the separator, Jetstream will not let any other special character.
+// _defaultKeySeparator is the key separator used in the offset-timeline KV store.
+// NOTE: we can only use `_` as the separator, JetStream will not allow any other special character.
 //
 // Perhaps we can encode the key using base64, but it will have a performance hit.
 const _defaultKeySeparator = "_"
@@ -75,8 +66,7 @@ const _defaultKeySeparator = "_"
 // NewProcessorEntity returns a new `ProcessorEntity`.
 func NewProcessorEntity(name string, inputOpts ...EntityOption) *ProcessorEntity {
 	opts := &entityOptions{
-		separateOTBucket: false,
-		keySeparator:     _defaultKeySeparator,
+		keySeparator: _defaultKeySeparator,
 	}
 	for _, opt := range inputOpts {
 		opt(opts)
@@ -92,45 +82,27 @@ func (p *ProcessorEntity) GetID() string {
 	return p.name
 }
 
-// IsOTBucketShared returns true if the OT bucket is shared.
-func (p *ProcessorEntity) IsOTBucketShared() bool {
-	return p.opts.separateOTBucket
-}
-
 // BuildOTWatcherKey builds the offset-timeline key name
 func (p *ProcessorEntity) BuildOTWatcherKey(watermark Watermark) string {
-	if p.opts.separateOTBucket {
-		return fmt.Sprintf("%d", watermark.UnixMilli())
-	} else {
-		return fmt.Sprintf("%s%s%d", p.GetID(), p.opts.keySeparator, watermark.UnixMilli())
-	}
+	return fmt.Sprintf("%s%s%d", p.GetID(), p.opts.keySeparator, watermark.UnixMilli())
 }
 
 // ParseOTWatcherKey parses the key of the KeyValue OT watcher and returns the epoch, a boolean to indicate
 // whether the record can be skipped and error if any.
 // NOTE: _defaultKeySeparator has constraints, please make sure we will not end up with multiple values
 func (p *ProcessorEntity) ParseOTWatcherKey(key string) (epoch int64, skip bool, err error) {
-	var name string
-	var epochStr = key
-	// if not separate bucket, the key will have to be split
-	if !p.opts.separateOTBucket {
-		name, epochStr, err = p.splitKey(key)
-		if err != nil {
-			return 0, false, err
-		}
-		// skip if not this processor
-		skip = name != p.GetID()
+	name, epochStr, err := p.splitKey(key)
+	if err != nil {
+		return 0, false, err
 	}
+	// skip if not this processor
+	skip = name != p.GetID()
 	epoch, err = strconv.ParseInt(epochStr, 10, 64)
 	return epoch, skip, err
 }
 
 func (p *ProcessorEntity) splitKey(key string) (string, string, error) {
-	// if there are separate buckets, the name will be not present and the key is the epoch
-	if p.opts.separateOTBucket {
-		return "", key, nil
-	}
 	split := strings.Split(key, p.opts.keySeparator)
 	if len(split) != 2 {
 		return "", "", fmt.Errorf("key=%s when split using %s, did not have 2 outputs=%v", key, p.opts.keySeparator, split)
diff --git a/pkg/watermark/processor/entity_test.go b/pkg/watermark/processor/entity_test.go
index ddf4334bb..6796e490f 100644
--- a/pkg/watermark/processor/entity_test.go
+++ b/pkg/watermark/processor/entity_test.go
@@ -10,13 +10,6 @@ import (
 
 func TestEntity(t *testing.T) {
 	e := NewProcessorEntity("pod0")
-	assert.False(t, e.opts.separateOTBucket)
-	assert.Equal(t, "pod0", e.GetID())
-}
-
-func TestEntityDifferentBuckets(t *testing.T) {
-	e := NewProcessorEntity("pod0", WithSeparateOTBuckets(true))
-	assert.True(t, e.IsOTBucketShared())
 	assert.Equal(t, "pod0", e.GetID())
 }
 
@@ -42,25 +35,9 @@ func TestProcessorEntity_ParseOTWatcherKey(t *testing.T) {
 		wantSkip  bool
 		wantErr   assert.ErrorAssertionFunc
 	}{
-		{
-			name:      "good_with_split",
-			p:         NewProcessorEntity("test1", WithSeparateOTBuckets(true)),
-			arg:       "1234",
-			wantEpoch: 1234,
-			wantSkip:  false,
-			wantErr:   assert.NoError,
-		},
-		{
-			name:      "bad_with_split",
-			p:         NewProcessorEntity("test1", WithSeparateOTBuckets(true)),
-			arg:       _defaultKeySeparator + "1234",
-			wantEpoch: 0,
-			wantSkip:  false,
-			wantErr:   assert.Error,
-		},
 		{
 			name:      "bad_without_split_butSkip",
-			p:         NewProcessorEntity("test1", WithSeparateOTBuckets(false)),
+			p:         NewProcessorEntity("test1"),
 			arg:       _defaultKeySeparator + "1234", // name is missing
 			wantEpoch: 1234,
 			wantSkip:  true,
@@ -68,7 +45,7 @@ func TestProcessorEntity_ParseOTWatcherKey(t *testing.T) {
 		},
 		{
 			name:      "bad_without_split_missing_separator",
-			p:         NewProcessorEntity("test1", WithSeparateOTBuckets(false)),
+			p:         NewProcessorEntity("test1"),
 			arg:       "1234",
 			wantEpoch: 0,
 			wantSkip:  false,
@@ -76,7 +53,7 @@ func TestProcessorEntity_ParseOTWatcherKey(t *testing.T) {
 		},
 		{
 			name:      "good_without_split",
-			p:         NewProcessorEntity("test1", WithSeparateOTBuckets(false)),
+			p:         NewProcessorEntity("test1"),
 			arg:       "test1" + _defaultKeySeparator + "1234",
 			wantEpoch: 1234,
 			wantSkip:  false,
@@ -84,7 +61,7 @@ func TestProcessorEntity_ParseOTWatcherKey(t *testing.T) {
 		},
 		{
 			name:      "good_without_split_skip",
-			p:         NewProcessorEntity("test1", WithSeparateOTBuckets(false)),
+			p:         NewProcessorEntity("test1"),
 			arg:       "test-not-this" + _defaultKeySeparator + "1234",
 			wantEpoch: 1234,
 			wantSkip:  true,
@@ -113,17 +90,9 @@ func TestProcessorEntity_splitKey(t *testing.T) {
 		want1   string
 		wantErr assert.ErrorAssertionFunc
 	}{
-		{
-			name:    "good_separate_bucket",
-			p:       NewProcessorEntity("test1", WithSeparateOTBuckets(true)),
-			arg:     "1234",
-			want:    "",
-			want1:   "1234",
-			wantErr: assert.NoError,
-		},
 		{
 			name:    "bad_expected_name",
-			p:       NewProcessorEntity("test1", WithSeparateOTBuckets(false)),
+			p:       NewProcessorEntity("test1"),
 			arg:     "1234",
 			want:    "",
 			want1:   "",
@@ -131,7 +100,7 @@ func TestProcessorEntity_splitKey(t *testing.T) {
 		},
 		{
 			name:    "good_same_bucket",
-			p:       NewProcessorEntity("test1", WithSeparateOTBuckets(false)),
+			p:       NewProcessorEntity("test1"),
 			arg:     "p1" + _defaultKeySeparator + "1234",
 			want:    "p1",
 			want1:   "1234",
diff --git a/pkg/watermark/publish/publisher.go b/pkg/watermark/publish/publisher.go
index 778a08824..6e19af50a 100644
--- a/pkg/watermark/publish/publisher.go
+++ b/pkg/watermark/publish/publisher.go
@@ -176,9 +176,6 @@ func (p *publish) StopPublisher() {
 	//   - remove itself from heartbeat bucket
 	p.log.Infow("Stopping publisher", zap.String("bucket", p.heartbeatStore.GetStoreName()))
 
-	if !p.entity.IsOTBucketShared() {
-		p.log.Warnw("Non sharing of bucket is not supported by controller as of today", zap.String("bucket", p.heartbeatStore.GetStoreName()))
-	}
 
 	// clean up heartbeat bucket
 	err := p.heartbeatStore.DeleteKey(p.ctx, p.entity.GetID())
diff --git a/pkg/watermark/publish/publisher_inmem_test.go b/pkg/watermark/publish/publisher_inmem_test.go
index dee31c197..1da5bade1 100644
--- a/pkg/watermark/publish/publisher_inmem_test.go
+++ b/pkg/watermark/publish/publisher_inmem_test.go
@@ -57,41 +57,3 @@ func TestPublisherWithSharedOTBuckets_InMem(t *testing.T) {
 
 	assert.Equal(t, fmt.Errorf("key publisherTestPod1 not found"), err)
 }
-
-func TestPublisherWithSeparateOTBucket_InMem(t *testing.T) {
-	var ctx = context.Background()
-
-	var keyspace = "publisherTest"
-
-	publishEntity := processor.NewProcessorEntity("publisherTestPod1", processor.WithSeparateOTBuckets(true))
-	heartbeatKV, _, err := inmem.NewKVInMemKVStore(ctx, "testPublisher", keyspace+"_PROCESSORS")
-	assert.NoError(t, err)
-	otKV, _, err := inmem.NewKVInMemKVStore(ctx, "testPublisher", keyspace+"_OT")
-	assert.NoError(t, err)
-
-	p := NewPublish(ctx, publishEntity, store.BuildWatermarkStore(heartbeatKV, otKV), WithAutoRefreshHeartbeatDisabled(), WithPodHeartbeatRate(1)).(*publish)
-
-	var epoch int64 = 1651161600000
-	var location, _ = time.LoadLocation("UTC")
-	for i := 0; i < 3; i++ {
-		p.PublishWatermark(processor.Watermark(time.UnixMilli(epoch).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(i) }))
-		epoch += 60000
-		time.Sleep(time.Millisecond)
-	}
-	// publish a stale watermark (offset doesn't matter)
-	p.PublishWatermark(processor.Watermark(time.UnixMilli(epoch-120000).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(0) }))
-
-	keys := p.getAllOTKeysFromBucket()
-	assert.Equal(t, []string{"1651161600000", "1651161660000", "1651161720000"}, keys)
-
-	wm := p.loadLatestFromStore()
-	assert.Equal(t, processor.Watermark(time.UnixMilli(epoch-60000).In(location)).String(), wm.String())
-
-	head := p.GetLatestWatermark()
-	assert.Equal(t, processor.Watermark(time.UnixMilli(epoch-60000).In(location)).String(), head.String())
-
-	p.StopPublisher()
-
-	_, err = p.heartbeatStore.GetValue(ctx, publishEntity.GetID())
-	assert.ErrorContains(t, err, "key publisherTestPod1 not found")
-}
diff --git a/pkg/watermark/publish/publisher_test.go b/pkg/watermark/publish/publisher_test.go
index 6b54fab4e..27c759f0e 100644
--- a/pkg/watermark/publish/publisher_test.go
+++ b/pkg/watermark/publish/publisher_test.go
@@ -30,61 +30,6 @@ func createAndLaterDeleteBucket(js *jsclient.JetStreamContext, kvConfig *nats.Ke
 	}, nil
 }
 
-func TestPublisherWithSeparateOTBuckets(t *testing.T) {
-	var ctx = context.Background()
-
-	defaultJetStreamClient := jsclient.NewDefaultJetStreamClient(nats.DefaultURL)
-	conn, err := defaultJetStreamClient.Connect(ctx)
-	assert.NoError(t, err)
-	js, err := conn.JetStream()
-	assert.NoError(t, err)
-
-	var publisherHBKeyspace = "publisherTest_PROCESSORS"
-	deleteFn, err := createAndLaterDeleteBucket(js, &nats.KeyValueConfig{Bucket: publisherHBKeyspace})
-	assert.NoError(t, err)
-	defer deleteFn()
-
-	// this test uses separate OT buckets, so it is an OT bucket per processor
-	var publisherOTKeyspace = "publisherTest_OT_publisherTestPod1"
-	deleteFn, err = createAndLaterDeleteBucket(js, &nats.KeyValueConfig{Bucket: publisherOTKeyspace})
-	assert.NoError(t, err)
-	defer deleteFn()
-
-	heartbeatKV, err := jetstream.NewKVJetStreamKVStore(ctx, "testPublisher", publisherHBKeyspace, defaultJetStreamClient)
-	assert.NoError(t, err)
-	otKV, err := jetstream.NewKVJetStreamKVStore(ctx, "testPublisher", publisherOTKeyspace, defaultJetStreamClient)
-	assert.NoError(t, err)
-
-	publishEntity := processor.NewProcessorEntity("publisherTestPod1")
-
-	p := NewPublish(ctx, publishEntity, store.BuildWatermarkStore(heartbeatKV, otKV), WithAutoRefreshHeartbeatDisabled(), WithPodHeartbeatRate(1)).(*publish)
-
-	var epoch int64 = 1651161600000
-	var location, _ = time.LoadLocation("UTC")
-	for i := 0; i < 3; i++ {
-		p.PublishWatermark(processor.Watermark(time.UnixMilli(epoch).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(i) }))
-		epoch += 60000
-		time.Sleep(time.Millisecond)
-	}
-	// publish a stale watermark (offset doesn't matter)
-	p.PublishWatermark(processor.Watermark(time.UnixMilli(epoch-120000).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(0) }))
-
-	keys := p.getAllOTKeysFromBucket()
-	assert.Equal(t, []string{"publisherTestPod1_1651161600000", "publisherTestPod1_1651161660000", "publisherTestPod1_1651161720000"}, keys)
-
-	wm := p.loadLatestFromStore()
-	assert.Equal(t, processor.Watermark(time.UnixMilli(epoch-60000).In(location)).String(), wm.String())
-
-	head := p.GetLatestWatermark()
-	assert.Equal(t, processor.Watermark(time.UnixMilli(epoch-60000).In(location)).String(), head.String())
-
-	p.StopPublisher()
-
-	_, err = p.heartbeatStore.GetValue(ctx, publishEntity.GetID())
-	assert.Equal(t, nats.ErrConnectionClosed, err)
-
-}
-
 func TestPublisherWithSharedOTBucket(t *testing.T) {
 	var ctx = context.Background()
@@ -102,7 +47,7 @@ func TestPublisherWithSharedOTBucket(t *testing.T) {
 	deleteFn, err = createAndLaterDeleteBucket(js, &nats.KeyValueConfig{Bucket: keyspace + "_OT"})
 	defer deleteFn()
 
-	publishEntity := processor.NewProcessorEntity("publisherTestPod1", processor.WithSeparateOTBuckets(true))
+	publishEntity := processor.NewProcessorEntity("publisherTestPod1")
 	heartbeatKV, err := jetstream.NewKVJetStreamKVStore(ctx, "testPublisher", keyspace+"_PROCESSORS", defaultJetStreamClient)
 	assert.NoError(t, err)
@@ -122,7 +67,7 @@ func TestPublisherWithSharedOTBucket(t *testing.T) {
 	p.PublishWatermark(processor.Watermark(time.UnixMilli(epoch-120000).In(location)), isb.SimpleStringOffset(func() string { return strconv.Itoa(0) }))
 
 	keys := p.getAllOTKeysFromBucket()
-	assert.Equal(t, []string{"1651161600000", "1651161660000", "1651161720000"}, keys)
+	assert.Equal(t, []string{"publisherTestPod1_1651161600000", "publisherTestPod1_1651161660000", "publisherTestPod1_1651161720000"}, keys)
 
 	wm := p.loadLatestFromStore()
 	assert.Equal(t, processor.Watermark(time.UnixMilli(epoch-60000).In(location)).String(), wm.String())