Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: watermark consumer fix #273

Merged
merged 5 commits into from
Oct 26, 2022
Merged

Conversation

jy4096
Copy link
Contributor

@jy4096 jy4096 commented Oct 26, 2022

fix: #251
All cases use simple pipeline and JetStream isbsvc

Test on main branch when disabling daemon service

initial state: in(1) cat(1) out(1)
image
scale down: in(0) cat(0) out(0)
image
scale up: in(1) cat(1) out(1)
image
scale up: in(2) cat(2) out(2)
image

Based on above observation, I assume the regular pods in the vertex are working as expected.

Test on this branch with the fix

Explanation: When a processor exits, we keep the offsetTimeline until all the offset in the offsetTimeline are outdated, and then delete the processor from the processor manager. This operation is executed in the "GetWatermark" function. However, in the UX, we only use GetHeadWatermark function, so the exited processor never gets deleted in the processor manager of the daemon service. When we scale up again and add back the processor, because we only check on processor is nil, the new coming back processor is not added back properly to the processor manager. Hence, the watcher is lost for the coming back processor, and the watermark in the UI stops updating.

Actual test:
initial state: in(1) cat(1) out(1)

{"level":"info","ts":1666782273.4887934,"logger":"numaflow","caller":"fetch/processor_manager.go:172","msg":"v.AddProcessor successfully added a new fromProcessor","fromProcessor":"simple-pipeline-in-0"}
{"level":"info","ts":1666782274.5278976,"logger":"numaflow","caller":"fetch/processor_manager.go:172","msg":"v.AddProcessor successfully added a new fromProcessor","fromProcessor":"simple-pipeline-cat-0"}
{"level":"info","ts":1666782274.5298617,"logger":"numaflow","caller":"fetch/processor_manager.go:172","msg":"v.AddProcessor successfully added a new fromProcessor","fromProcessor":"simple-pipeline-out-0"}

image

scale down: in(0) cat(0) out(0)
{"level":"info","ts":1666782290.613133,"logger":"numaflow","caller":"fetch/processor_manager.go:191","msg":"Deleting","key":"simple-pipeline-out-0","simple-pipeline-out-0":"simple-pipeline-out-0 status:active, timeline: [1666782287:1666782289608293487] -> [1666782286:1666782288602169497] -> [1666782285:1666782287594846020] -> [1666782284:1666782286591291306] -> [1666782283:1666782285585891914] -> [1666782282:1666782284581331796] -> [1666782281:1666782283578595421] -> [1666782280:1666782282573982646] -> [1666782278:1666782280565193576] -> [1666782277:1666782279561633753]"}
{"level":"info","ts":1666782290.6132045,"logger":"numaflow","caller":"jetstream/kv_watch.go:113","msg":"stopping WatchAll","pipeline":"simple-pipeline","kvBucketName":"simple-pipeline-default-simple-pipeline-out_SINK_OT","watcher":"simple-pipeline-default-simple-pipeline-out_SINK_OT"}
{"level":"info","ts":1666782290.6182003,"logger":"numaflow","caller":"jetstream/kv_watch.go:119","msg":"WatchAll successfully stopped","pipeline":"simple-pipeline","kvBucketName":"simple-pipeline-default-simple-pipeline-out_SINK_OT","watcher":"simple-pipeline-default-simple-pipeline-out_SINK_OT"}
{"level":"info","ts":1666782290.9000976,"logger":"numaflow","caller":"jetstream/default_jetstream_client.go:58","msg":"Nats: connected to nats server"}
{"level":"info","ts":1666782290.900885,"logger":"numaflow","caller":"jetstream/in_cluster_jetstream_client.go:70","msg":"Starting Nats JetStream auto reconnection daemon..."}
{"level":"info","ts":1666782290.9045897,"logger":"numaflow","caller":"jetstream/in_cluster_jetstream_client.go:93","msg":"Exited Nats JetStream auto reconnection daemon..."}
{"level":"info","ts":1666782291.6131392,"logger":"numaflow","caller":"fetch/processor_manager.go:191","msg":"Deleting","key":"simple-pipeline-in-0","simple-pipeline-in-0":"simple-pipeline-in-0 status:active, timeline: [1666782290:110] -> [1666782289:105] -> [1666782288:100] -> [1666782287:95] -> [1666782286:90] -> [1666782285:85] -> [1666782284:80] -> [1666782283:75] -> [1666782282:70] -> [1666782281:65]"}
{"level":"info","ts":1666782291.6139848,"logger":"numaflow","caller":"jetstream/kv_watch.go:113","msg":"stopping WatchAll","pipeline":"simple-pipeline","kvBucketName":"simple-pipeline-default-simple-pipeline-in-cat_OT","watcher":"simple-pipeline-default-simple-pipeline-in-cat_OT"}
{"level":"info","ts":1666782291.6157627,"logger":"numaflow","caller":"jetstream/kv_watch.go:119","msg":"WatchAll successfully stopped","pipeline":"simple-pipeline","kvBucketName":"simple-pipeline-default-simple-pipeline-in-cat_OT","watcher":"simple-pipeline-default-simple-pipeline-in-cat_OT"}
{"level":"info","ts":1666782291.6178179,"logger":"numaflow","caller":"fetch/processor_manager.go:191","msg":"Deleting","key":"simple-pipeline-cat-0","simple-pipeline-cat-0":"simple-pipeline-cat-0 status:active, timeline: [1666782289:110] -> [1666782288:105] -> [1666782287:100] -> [1666782286:95] -> [1666782285:90] -> [1666782284:85] -> [1666782283:80] -> [1666782282:75] -> [1666782281:70] -> [1666782280:65]"}
{"level":"info","ts":1666782291.6179025,"logger":"numaflow","caller":"jetstream/kv_watch.go:113","msg":"stopping WatchAll","pipeline":"simple-pipeline","kvBucketName":"simple-pipeline-default-simple-pipeline-cat-out_OT","watcher":"simple-pipeline-default-simple-pipeline-cat-out_OT"}
{"level":"info","ts":1666782291.620023,"logger":"numaflow","caller":"jetstream/kv_watch.go:119","msg":"WatchAll successfully stopped","pipeline":"simple-pipeline","kvBucketName":"simple-pipeline-default-simple-pipeline-cat-out_OT","watcher":"simple-pipeline-default-simple-pipeline-cat-out_OT"}

image

scale up: in(1) cat(1) out(1)
{"level":"info","ts":1666782318.689903,"logger":"numaflow","caller":"fetch/processor_manager.go:172","msg":"v.AddProcessor successfully added a new fromProcessor","fromProcessor":"simple-pipeline-cat-0"}
{"level":"info","ts":1666782318.7181356,"logger":"numaflow","caller":"fetch/processor_manager.go:172","msg":"v.AddProcessor successfully added a new fromProcessor","fromProcessor":"simple-pipeline-out-0"}
{"level":"info","ts":1666782318.7189388,"logger":"numaflow","caller":"fetch/processor_manager.go:172","msg":"v.AddProcessor successfully added a new fromProcessor","fromProcessor":"simple-pipeline-in-0"}

image

NOTE

Failed to reproduce the KV_simple-pipeline-default-simple-pipeline-in_SOURCE_OT keeps zero issue locally.

jyu6 added 5 commits October 25, 2022 15:46
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
fix
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
@jy4096 jy4096 marked this pull request as ready for review October 26, 2022 11:33
@whynowy whynowy merged commit 7bb689b into numaproj:main Oct 26, 2022
@jy4096 jy4096 deleted the watermark-consumer-fix-251 branch October 26, 2022 16:12
whynowy pushed a commit that referenced this pull request Oct 27, 2022
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Watermarks stop propagating after the source vertex scaling down to 0 and back to 1
3 participants