Skip to content

Commit

Permalink
fix: watermark consumer fix (#273)
Browse files Browse the repository at this point in the history
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
  • Loading branch information
jy4096 authored and whynowy committed Oct 27, 2022
1 parent d2a3d90 commit f25e303
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pkg/daemon/server/daemon_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (ds *daemonServer) Run(ctx context.Context) error {

func (ds *daemonServer) newGRPCServer(isbSvcClient isbsvc.ISBService) (*grpc.Server, error) {
// "Prometheus histograms are a great way to measure latency distributions of your RPCs.
// However, since it is bad practice to have metrics of high cardinality the latency monitoring metrics are disabled by default.
// However, since it is a bad practice to have metrics of high cardinality the latency monitoring metrics are disabled by default.
// To enable them please call the following in your server initialization code:"
grpc_prometheus.EnableHandlingTimeHistogram()

Expand Down
6 changes: 5 additions & 1 deletion pkg/watermark/fetch/edge_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ func NewEdgeFetcher(ctx context.Context, edgeName string, processorManager *Proc

// GetHeadWatermark returns the watermark using the HeadOffset (the latest offset among all processors). This
// can be used in showing the watermark progression for a vertex when not consuming the messages
// directly (eg. UX, tests,)
// directly (eg. UX, tests)
// NOTE
// - We don't use this function in the regular pods in the vertex.
// - UX only uses GetHeadWatermark, so the `p.IsDeleted()` check in the GetWatermark never happens.
// Meaning, in the UX (daemon service) we never delete any processor.
func (e *edgeFetcher) GetHeadWatermark() processor.Watermark {
var debugString strings.Builder
var headOffset int64 = math.MinInt64
Expand Down
4 changes: 2 additions & 2 deletions pkg/watermark/fetch/processor_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewProcessorManager(ctx context.Context, watermarkStoreWatcher store.Waterm
return v
}

// addProcessor adds a new processor.
// addProcessor adds a new processor. If the given processor already exists, the value will be updated.
func (v *ProcessorManager) addProcessor(processor string, p *ProcessorToFetch) {
v.lock.Lock()
defer v.lock.Unlock()
Expand Down Expand Up @@ -161,7 +161,7 @@ func (v *ProcessorManager) startHeatBeatWatcher() {
case store.KVPut:
// do we have such a processor
p := v.GetProcessor(value.Key())
if p == nil {
if p == nil || p.IsDeleted() {
// if p is nil, create a new processor
// A fromProcessor needs to be added to v.processors
// The fromProcessor may have been deleted
Expand Down
1 change: 1 addition & 0 deletions pkg/watermark/publish/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (p *publish) PublishWatermark(wm processor.Watermark, offset isb.Offset) {
}
// update p.headWatermark only if wm > p.headWatermark
if wm.After(time.Time(p.headWatermark)) {
p.log.Debugw("New watermark updated for head water mark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String()))
p.headWatermark = wm
} else {
p.log.Infow("New watermark is ignored because it's older than the current watermark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String()))
Expand Down
2 changes: 1 addition & 1 deletion pkg/watermark/store/jetstream/kv_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type jetStreamWatch struct {

var _ store.WatermarkKVWatcher = (*jetStreamWatch)(nil)

// NewKVJetStreamKVWatch returns KVJetStreamWatch specific to Jetstream which implements the WatermarkKVWatcher interface.
// NewKVJetStreamKVWatch returns KVJetStreamWatch specific to JetStream which implements the WatermarkKVWatcher interface.
func NewKVJetStreamKVWatch(ctx context.Context, pipelineName string, kvBucketName string, client jsclient.JetStreamClient, opts ...JSKVWatcherOption) (store.WatermarkKVWatcher, error) {
var err error
conn, err := client.Connect(ctx)
Expand Down

0 comments on commit f25e303

Please sign in to comment.