From f25e303e773d4a0e1f3d32815f839628e35278f0 Mon Sep 17 00:00:00 2001 From: Juanlu Yu <19543684+chromevoid@users.noreply.github.com> Date: Wed, 26 Oct 2022 05:55:59 -0700 Subject: [PATCH] fix: watermark consumer fix (#273) Signed-off-by: jyu6 --- pkg/daemon/server/daemon_server.go | 2 +- pkg/watermark/fetch/edge_fetcher.go | 6 +++++- pkg/watermark/fetch/processor_manager.go | 4 ++-- pkg/watermark/publish/publisher.go | 1 + pkg/watermark/store/jetstream/kv_watch.go | 2 +- 5 files changed, 10 insertions(+), 5 deletions(-) diff --git a/pkg/daemon/server/daemon_server.go b/pkg/daemon/server/daemon_server.go index 90124aec7..ae5c968ba 100644 --- a/pkg/daemon/server/daemon_server.go +++ b/pkg/daemon/server/daemon_server.go @@ -92,7 +92,7 @@ func (ds *daemonServer) Run(ctx context.Context) error { func (ds *daemonServer) newGRPCServer(isbSvcClient isbsvc.ISBService) (*grpc.Server, error) { // "Prometheus histograms are a great way to measure latency distributions of your RPCs. - // However, since it is bad practice to have metrics of high cardinality the latency monitoring metrics are disabled by default. + // However, since it is a bad practice to have metrics of high cardinality the latency monitoring metrics are disabled by default. // To enable them please call the following in your server initialization code:" grpc_prometheus.EnableHandlingTimeHistogram() diff --git a/pkg/watermark/fetch/edge_fetcher.go b/pkg/watermark/fetch/edge_fetcher.go index a042e0672..a607fe6fa 100644 --- a/pkg/watermark/fetch/edge_fetcher.go +++ b/pkg/watermark/fetch/edge_fetcher.go @@ -35,7 +35,11 @@ func NewEdgeFetcher(ctx context.Context, edgeName string, processorManager *Proc // GetHeadWatermark returns the watermark using the HeadOffset (the latest offset among all processors). This // can be used in showing the watermark progression for a vertex when not consuming the messages -// directly (eg. UX, tests,) +// directly (eg. UX, tests) +// NOTE +// - We don't use this function in the regular pods in the vertex. +// - UX only uses GetHeadWatermark, so the `p.IsDeleted()` check in the GetWatermark never happens. +// Meaning, in the UX (daemon service) we never delete any processor. func (e *edgeFetcher) GetHeadWatermark() processor.Watermark { var debugString strings.Builder var headOffset int64 = math.MinInt64 diff --git a/pkg/watermark/fetch/processor_manager.go b/pkg/watermark/fetch/processor_manager.go index b0c8d0605..77fda004b 100644 --- a/pkg/watermark/fetch/processor_manager.go +++ b/pkg/watermark/fetch/processor_manager.go @@ -58,7 +58,7 @@ func NewProcessorManager(ctx context.Context, watermarkStoreWatcher store.Waterm return v } -// addProcessor adds a new processor. +// addProcessor adds a new processor. If the given processor already exists, the value will be updated. func (v *ProcessorManager) addProcessor(processor string, p *ProcessorToFetch) { v.lock.Lock() defer v.lock.Unlock() @@ -161,7 +161,7 @@ func (v *ProcessorManager) startHeatBeatWatcher() { case store.KVPut: // do we have such a processor p := v.GetProcessor(value.Key()) - if p == nil { + if p == nil || p.IsDeleted() { // if p is nil, create a new processor // A fromProcessor needs to be added to v.processors // The fromProcessor may have been deleted diff --git a/pkg/watermark/publish/publisher.go b/pkg/watermark/publish/publisher.go index 446abb8e7..c0548ab61 100644 --- a/pkg/watermark/publish/publisher.go +++ b/pkg/watermark/publish/publisher.go @@ -83,6 +83,7 @@ func (p *publish) PublishWatermark(wm processor.Watermark, offset isb.Offset) { } // update p.headWatermark only if wm > p.headWatermark if wm.After(time.Time(p.headWatermark)) { + p.log.Debugw("New watermark updated for head water mark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String())) p.headWatermark = wm } else { p.log.Infow("New watermark is ignored because it's older than the current watermark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String())) diff --git a/pkg/watermark/store/jetstream/kv_watch.go b/pkg/watermark/store/jetstream/kv_watch.go index fc0e175e5..2b91889ad 100644 --- a/pkg/watermark/store/jetstream/kv_watch.go +++ b/pkg/watermark/store/jetstream/kv_watch.go @@ -24,7 +24,7 @@ type jetStreamWatch struct { var _ store.WatermarkKVWatcher = (*jetStreamWatch)(nil) -// NewKVJetStreamKVWatch returns KVJetStreamWatch specific to Jetstream which implements the WatermarkKVWatcher interface. +// NewKVJetStreamKVWatch returns KVJetStreamWatch specific to JetStream which implements the WatermarkKVWatcher interface. func NewKVJetStreamKVWatch(ctx context.Context, pipelineName string, kvBucketName string, client jsclient.JetStreamClient, opts ...JSKVWatcherOption) (store.WatermarkKVWatcher, error) { var err error conn, err := client.Connect(ctx)