Skip to content

[FLINK-37701][flink-runtime] Fix AdaptiveScheduler ignoring checkpoint states sizes for local recovery adjustment. #26663

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

Izeren
Copy link
Contributor

@Izeren Izeren commented Jun 10, 2025

What is the purpose of the change

Address local recovery issues when Adaptive scheduler is enabled.

  1. Pass latest completed checkpoint in addition to execution graph to StateSizeEstimates (that is needed because execution graph goes through cancelling/cancelled state and checkpoint coordinator is nulled by the time we run calculations).
  2. Assign positive priority score to allocations that have overlapping key groups even when state size is zero (currently we would only give priority score if managedKeyedState is present, but local recovery semantics doesn't require state presence).

Context: When job can be recovered locally, we should keep slot allocation after restart to maintain

Verifying this change

LocalRecoveryITCase#testRecoverLocallyFromProcessCrashWithWorkingDirectory now passes when AdaptiveScheduler is enabled.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Copy link
Collaborator

flinkbot commented Jun 10, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@rkhachatryan rkhachatryan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix @Izeren !
I've left a couple of comments, PTAL.
Were you able to reproduce the failure reliably without fix - and success with the fix?

@lsyldliu
Copy link
Contributor

The ci is failed.

@dmvk dmvk self-requested a review June 11, 2025 07:56
@dmvk
Copy link
Member

dmvk commented Jun 11, 2025

@Izeren rebasing the PR to include 374fedb should fix the CI

@Izeren
Copy link
Contributor Author

Izeren commented Jun 11, 2025

@Izeren rebasing the PR to include 374fedb should fix the CI

Thank you, will do

@Izeren
Copy link
Contributor Author

Izeren commented Jun 11, 2025

Thanks for the fix @Izeren ! I've left a couple of comments, PTAL. Were you able to reproduce the failure reliably without fix - and success with the fix?

Yes, it fails in intellij more than it doesn't, but you need to provide VM options: -Dflink.tests.enable-adaptive-scheduler=true to ensure that adaptive scheduler is used. With the fix it is consistently successful.

@dmvk
Copy link
Member

dmvk commented Jun 11, 2025

  1. We should add test case to AdaptiveScheduler with custom implementation of SlotAssigner that acts as a regression test
  2. We should add test case to SlotAssigner implementation that verifies how SA behaves in non-rescaling scenarios, when we simply want to reuse previously known allocations

@Izeren
Copy link
Contributor Author

Izeren commented Jun 13, 2025

  • We should add test case to AdaptiveScheduler with custom implementation of SlotAssigner that acts as a regression test
  • We should add test case to SlotAssigner implementation that verifies how SA behaves in non-rescaling scenarios, when we simply want to reuse previously known allocations

@dmvk, I have added 2 tests:

  1. For AdaptiveScheduler to ensure that SlotAllocator receives data from the checkpoint and will use it for distribution. (I couldn't check the checkpoints themselves as it AllocationInformation calculated through static call, hence I verify that state made it into SlotAllocator after).
  2. For SlotAllocator to ensure that it preserves allocation according to state distribution and should retain allocation for a job restart.

@Izeren
Copy link
Contributor Author

Izeren commented Jun 16, 2025

I am still looking into few other test failures in AdaptiveScheduler

@Izeren Izeren reopened this Jun 16, 2025
@lsyldliu
Copy link
Contributor

@Izeren Hi, can we push this fix before code freeze time?

@Izeren
Copy link
Contributor Author

Izeren commented Jun 18, 2025

@dmvk, would you have time to have a second look today?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants