Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: watermark consumer fix #273

Merged
merged 5 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/daemon/server/daemon_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (ds *daemonServer) Run(ctx context.Context) error {

func (ds *daemonServer) newGRPCServer(isbSvcClient isbsvc.ISBService) (*grpc.Server, error) {
// "Prometheus histograms are a great way to measure latency distributions of your RPCs.
// However, since it is bad practice to have metrics of high cardinality the latency monitoring metrics are disabled by default.
// However, since it is a bad practice to have metrics of high cardinality the latency monitoring metrics are disabled by default.
// To enable them please call the following in your server initialization code:"
grpc_prometheus.EnableHandlingTimeHistogram()

Expand Down
6 changes: 5 additions & 1 deletion pkg/watermark/fetch/edge_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ func NewEdgeFetcher(ctx context.Context, edgeName string, processorManager *Proc

// GetHeadWatermark returns the watermark using the HeadOffset (the latest offset among all processors). This
// can be used in showing the watermark progression for a vertex when not consuming the messages
// directly (eg. UX, tests,)
// directly (eg. UX, tests)
// NOTE
// - We don't use this function in the regular pods in the vertex.
// - UX only uses GetHeadWatermark, so the `p.IsDeleted()` check in the GetWatermark never happens.
// Meaning, in the UX (daemon service) we never delete any processor.
func (e *edgeFetcher) GetHeadWatermark() processor.Watermark {
var debugString strings.Builder
var headOffset int64 = math.MinInt64
Expand Down
4 changes: 2 additions & 2 deletions pkg/watermark/fetch/processor_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewProcessorManager(ctx context.Context, watermarkStoreWatcher store.Waterm
return v
}

// addProcessor adds a new processor.
// addProcessor adds a new processor. If the given processor already exists, the value will be updated.
func (v *ProcessorManager) addProcessor(processor string, p *ProcessorToFetch) {
v.lock.Lock()
defer v.lock.Unlock()
Expand Down Expand Up @@ -161,7 +161,7 @@ func (v *ProcessorManager) startHeatBeatWatcher() {
case store.KVPut:
// do we have such a processor
p := v.GetProcessor(value.Key())
if p == nil {
if p == nil || p.IsDeleted() {
vigith marked this conversation as resolved.
Show resolved Hide resolved
// if p is nil, create a new processor
// A fromProcessor needs to be added to v.processors
// The fromProcessor may have been deleted
Expand Down
1 change: 1 addition & 0 deletions pkg/watermark/publish/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (p *publish) PublishWatermark(wm processor.Watermark, offset isb.Offset) {
}
// update p.headWatermark only if wm > p.headWatermark
if wm.After(time.Time(p.headWatermark)) {
p.log.Debugw("New watermark updated for head water mark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String()))
p.headWatermark = wm
} else {
p.log.Infow("New watermark is ignored because it's older than the current watermark", zap.String("head", p.headWatermark.String()), zap.String("new", wm.String()))
Expand Down
2 changes: 1 addition & 1 deletion pkg/watermark/store/jetstream/kv_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type jetStreamWatch struct {

var _ store.WatermarkKVWatcher = (*jetStreamWatch)(nil)

// NewKVJetStreamKVWatch returns KVJetStreamWatch specific to Jetstream which implements the WatermarkKVWatcher interface.
// NewKVJetStreamKVWatch returns KVJetStreamWatch specific to JetStream which implements the WatermarkKVWatcher interface.
func NewKVJetStreamKVWatch(ctx context.Context, pipelineName string, kvBucketName string, client jsclient.JetStreamClient, opts ...JSKVWatcherOption) (store.WatermarkKVWatcher, error) {
var err error
conn, err := client.Connect(ctx)
Expand Down