
Elaborate savepoint and update features #107

Merged
merged 11 commits into spotify:master on Oct 15, 2021

Conversation

elanv commented Sep 16, 2021

Purpose of this PR

Currently, savepoint handling and its related routines are scattered across several places, which makes it difficult to enhance this operator. This PR organizes them so that savepoint-related routines can be improved and extended in the future. It also improves the update, cancel, and recovery features that depend on the savepoint routines.

resolve #84
fix #85
fix #95
fix #115

Changes

  • Make the job deploy phase explicit with new job states.
  • Organize savepoint routines.
  • Fix some savepoint-related issues.
  • Improve update stability.
  • Change the job stop process applied when updating and cancelling a job.
  • Elaborate the update/restart strategy.

Details

  • Organize and fix savepoint routine

    • Organize Savepoint handling and related routines in one place
    • Auto savepoint
      • Delete lastSavepointTriggerTime and lastSavepointTriggerID: they duplicate status.savepoint
      • Change the first trigger to be based on status.job.startTime and delete SavepointTriggerReasonScheduledInitial
    • Savepoint state
      • Add a routine to derive the savepoint state from the HTTP status code
      • Get rid of the operator's own savepoint timeout error
  • Change job stop behavior when updating and cancelling a job

    • Flink 1.9 introduced the stop API, which supports exactly-once semantics, but for compatibility with versions up to 1.8, "cancel with savepoint" is applied first. In the future, add a flinkVersion field and support "stop with savepoint" for 1.9 or higher.
    • Apply "cancel with savepoint" API
  • Improve update process

  • Elaborate update/restart strategy
    Limit the age of the job state from which a job can be restarted when auto-restarting from failure, updating a stopped job, or updating a running job with takeSavepointOnUpdate set to false

    • Add a field to limit the maximum savepoint age that a job may be restored from on restart (see the sketch after the list of fixes below).
      • MaxStateAgeToRestoreSeconds
  • Add new job deployment states

    • Deploying, DeployFailed, Restarting
      [image: job deployment state transition diagram]

- Fix handling of failed auto savepoints
- Fix validations and tests related to these changes
- Improve the update routine
- Change the behavior of handling unexpected jobs
- Add an update constraint: when takeSavepointOnUpdate is true, the latest savepoint age should be less than maxStateAgeToRestore
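For reference, a rough sketch of what the new spec field and job deployment states could look like, in the style of api/v1beta1/flinkcluster_types.go. Apart from MaxStateAgeToRestoreSeconds, JobStateDeploying, and the state names listed above, the field type, JSON tag, and constant names are assumptions, not the exact code in this PR.

```go
package v1beta1

type JobSpec struct {
	// ... existing fields ...

	// Maximum allowed age, in seconds, of the job state (savepoint) that a job
	// may be restored from on restart or update. Pointer type and JSON tag are
	// assumptions.
	MaxStateAgeToRestoreSeconds *int32 `json:"maxStateAgeToRestoreSeconds,omitempty"`
}

// New job deployment states introduced by this PR; the full set of existing
// job states is omitted here.
const (
	JobStateDeploying    = "Deploying"
	JobStateDeployFailed = "DeployFailed"
	JobStateRestarting   = "Restarting"
)
```
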
elanv commented Sep 16, 2021

Hello @regadas.
The work is almost done, but there are a lot of changes.
I am going to do a little more testing and fixing.

elanv commented Sep 17, 2021

Currently this PR does not work with the "blocking" job mode.
In this PR, the job is tracked by an ID obtained from the job submitter.

It would be nice if we could discuss issue #110 too.

hjwalt commented Sep 24, 2021

@elanv @regadas I have some questions about the savepoint status:

  1. Is it better to move the savepoint status into JobStatus? It could be useful for supporting multiple job submissions (if that makes sense to do).

  2. From my understanding, savepoint failures will also be recorded in the savepoint status, so how will restart work with a failed savepoint if savepoints are taken periodically? Since this PR also adds MaxStateAgeToRestoreSeconds, does it make sense to also add a LastSuccessfulSavepoint savepoint status and use that for restarting the job, if it still satisfies the age limit or the limit is nil? This is relevant for execution.checkpointing.tolerable-failed-checkpoints, since savepoint failures also count toward that configuration.

The operator can only automatically restart a failed job when there is a savepoint recorded in the job status, whether it was taken automatically or manually; otherwise, the job will stay in the failed state.
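For illustration only, a minimal sketch of the age check being discussed; the function name, parameters, and exact semantics are assumptions, not the operator's actual code.

```go
package controllers

import "time"

// canRestoreFromSavepoint sketches the restart-eligibility rule discussed above:
// a failed job can only be restarted automatically if a savepoint is recorded,
// and, when maxStateAgeToRestoreSeconds is set, only if that savepoint is young
// enough. Names and signature are assumptions.
func canRestoreFromSavepoint(savepointLocation string, savepointTime time.Time, maxAgeSeconds *int32, now time.Time) bool {
	if savepointLocation == "" {
		return false // no savepoint recorded: the job stays in the failed state
	}
	if maxAgeSeconds == nil {
		return true // no age limit configured
	}
	return now.Sub(savepointTime) <= time.Duration(*maxAgeSeconds)*time.Second
}
```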

elanv commented Sep 24, 2021

Hi @hjwalt.
Thanks for your review.

For the first one, I would also like a feature like managing multiple jobs on a single session cluster. Do you mean this feature too? I wrote an issue about it in the GCP operator repo. However, as I wrote in that issue, I think it is better to introduce a new CRD than to add this feature to the FlinkCluster CRD.
(note: GoogleCloudPlatform/flink-on-k8s-operator#303)

For the second, only the last successful savepoint is recorded in status.components.job, and the latest savepoint status is recorded in status.savepoint. So it works as you understand.
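Roughly, the status layout described here might be pictured like this; only the split between status.components.job (last successful savepoint) and status.savepoint (latest savepoint attempt) comes from this discussion, and the field names other than SavepointLocation are assumptions.

```go
package v1beta1

// Illustrative only; field names other than SavepointLocation are assumptions.
type JobStatus struct {
	// Location of the last *successful* savepoint; this is what restarts restore from.
	SavepointLocation string `json:"savepointLocation,omitempty"`
}

type SavepointStatus struct {
	// State of the latest savepoint attempt, whether it succeeded or failed.
	State string `json:"state,omitempty"`
}

type ComponentsStatus struct {
	Job *JobStatus `json:"job,omitempty"` // status.components.job
}

type FlinkClusterStatus struct {
	Components ComponentsStatus `json:"components"`
	Savepoint  *SavepointStatus `json:"savepoint,omitempty"` // status.savepoint
}
```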

hjwalt commented Sep 24, 2021

@elanv thank you for the explanation. Yes, it is as you described: managing multiple jobs on one session cluster. And yes, it's not relevant for this PR.

hjwalt commented Sep 25, 2021

@elanv some more thoughts after reading through this PR:

  1. The CRD changes don't look backward compatible to me, which will break existing clusters when upgrading. I think this is better done with a new CRD definition version, and, as you put it in another issue, time may be better invested in writing a better CRD and reconciler. Fixing the current problems while maintaining backward compatibility seems pretty difficult to me.

  2. If we are to refactor the CRD and reconciler, I think it would be good to look into using finite state machines (either self-implemented or via a library) with direct and observed transitions. That way, new states and transitions can be added in the future with minimal changes to existing states and transitions. We can start with your changes and work upward to a new CRD.

current state -> transition action (direct transition) -> intermediate state -> background work completed (observed transition) -> final state

The current confusion in the recovery mechanism affects me too. If you are open to coordinating the effort for a new CRD version, I can help implement some of the features.
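Purely as an illustration of the direct vs. observed transition pattern above; none of these types or functions exist in the operator.

```go
package main

import "fmt"

type State string

const (
	Running    State = "Running"
	Cancelling State = "Cancelling" // intermediate state, entered by a direct transition
	Cancelled  State = "Cancelled"  // final state, entered by an observed transition
)

// requestCancel is a direct transition: the controller acts and records the
// intermediate state immediately.
func requestCancel(s State) State {
	if s == Running {
		return Cancelling
	}
	return s
}

// observeCancelled is an observed transition: the controller only moves to the
// final state once the background work is seen to be complete.
func observeCancelled(s State, jobGone bool) State {
	if s == Cancelling && jobGone {
		return Cancelled
	}
	return s
}

func main() {
	s := requestCancel(Running)
	s = observeCancelled(s, true)
	fmt.Println(s) // Cancelled
}
```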

elanv commented Sep 25, 2021

1. The CRD changes don't look backward compatible to me, which will break existing clusters when upgrading. I think this is better done with a new CRD definition version, and, as you put it in another issue, time may be better invested in writing a better CRD and reconciler. Fixing the current problems while maintaining backward compatibility seems pretty difficult to me.

When the updated operator observes an existing FlinkCluster, it records the calculated state according to the changed CRD data structure and then reconciles based on it, so if you change the CRD carefully, you can switch to the new CRD smoothly. And since the code of the GCP operator has not been well maintained, there are many critical bugs covered in this PR. If there are no critical problems, I think it is better to stabilize even if there are small breaking changes. #85, #95, and #115 seem related to this PR.

Could you explain more about your problem?

2. If we are to refactor the CRD and reconciler, I think it would be good to look into using finite state machines (either self-implemented or via a library) with direct and observed transitions. That way, new states and transitions can be added in the future with minimal changes to existing states and transitions. We can start with your changes and work upward to a new CRD.

In my opinion, since a k8s controller operates on the mechanism "observe --> calculate desired state --> reconcile", it is not appropriate to apply a state machine. For example, depending on the status at the time of observation, a state may sometimes have to be skipped. As far as I know, k8s native controllers are not implemented as state machines for these reasons.

I attached a diagram indicating the state transitions of the job to this PR, but it is not intended to describe a state machine. Those are only states observed by the operator for the reconciliation stage. Therefore, states may be skipped, because they are recorded at the time of observation.
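As a contrast to the FSM idea, a minimal sketch of deriving the state from an observation; this is not the operator's actual code, and the state names here are illustrative. Any state that was not observed is simply never recorded.

```go
package controllers

// deriveJobState computes the state purely from what was observed at reconcile
// time; intermediate states can be skipped if they were never observed.
// Illustrative only.
func deriveJobState(submitterRunning, jobRunning, jobFailed bool) string {
	switch {
	case jobFailed:
		return "Failed"
	case jobRunning:
		return "Running"
	case submitterRunning:
		return "Deploying"
	default:
		return "Pending"
	}
}
```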

hjwalt commented Sep 27, 2021

Could you explain more about your problem?

Just putting it out there that I don't have a lot of experience testing k8s operators :). It just seems to me that old savepoint information on existing clusters will be lost when upgrading the operator. I didn't test it, so I could be wrong.

In my opinion, since a k8s controller operates on the mechanism "observe --> calculate desired state --> reconcile", it is not appropriate to apply a state machine. For example, depending on the status at the time of observation, a state may sometimes have to be skipped. As far as I know, k8s native controllers are not implemented as state machines for these reasons.

Yes, we can't attach it to the reconcile cycle; I'm referring more to using an FSM in the internal logic for job status. It's just a thought that an FSM is better for maintenance sanity. k8s works with fairly simple status transitions (like waiting, running, terminated, completed, error, crashloopbackoff, failed). The job status you proposed is much more complicated than that.

elanv commented Sep 27, 2021

Just putting it out there that I don't have a lot of experience testing k8s operators :). It just seems to me that old savepoint information on existing clusters will be lost when upgrading the operator. I didn't test it, so I could be wrong.

It seems good to leave the fields you pointed out in v1beta1.

Yes, we can't attach it to the reconcile cycle; I'm referring more to using an FSM in the internal logic for job status. It's just a thought that an FSM is better for maintenance sanity. k8s works with fairly simple status transitions (like waiting, running, terminated, completed, error, crashloopbackoff, failed). The job status you proposed is much more complicated than that.

Understood. The updater's job status routine is particularly complex, and it would be nice if it could be improved.

And to support the blocking mode, the job tracking routine needs to be reverted, but some time is needed to verify that it works well.

elanv commented Oct 9, 2021

@hjwalt The existing savepoint location does not seem to be lost. The savepoint location is recorded as SavepointLocation and remains the same. However, duplicates of some other savepoint-related information have been removed or changed. You can check the CRD changes by looking at the diff of api/v1beta1/flinkcluster_types.go.

elanv commented Oct 9, 2021

@regadas Almost finished. Could you review this PR? There are some changes in the functions for extracting logs.

regadas commented Oct 13, 2021

Awesome! Thanks @elanv I'll have a look at this later today!

regadas left a comment


This LGTM; most of the comments can actually be addressed in follow-up PRs. @elanv Are you adding more changes to this PR, or can I go ahead and merge?

@@ -376,12 +377,20 @@ type JobSpec struct {
// Allow non-restored state, default: false.
AllowNonRestoredState *bool `json:"allowNonRestoredState,omitempty"`

// Should take savepoint before upgrading the job, default: false.
TakeSavepointOnUpgrade *bool `json:"takeSavepointOnUpgrade,omitempty"`

Could we remove this in a future release? Keeping it backward compatible for now.

I guess we can break it now; I don't give guarantees with v1beta1.

Comment on lines +278 to +280
var recordedJob = recorded.Components.Job
var extractLog = recordedJob != nil && recordedJob.State == v1beta1.JobStateDeploying
err = observer.observeSubmitter(extractLog, &submitter)
🎨 We could move the extractLog logic inside observeSubmitter to make the method's intent clearer.
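For illustration, one way the suggested refactor might look; the signature, body, and stub types here are assumptions standing in for the operator's real types, not code from this PR.

```go
package controllers

// Stub types standing in for the operator's real ones; illustration only.
type JobStatus struct{ State string }

type Submitter struct{}

type ClusterStateObserver struct{}

const JobStateDeploying = "Deploying"

// observeSubmitter decides internally whether to extract the submitter log,
// based on the recorded job state, instead of the caller computing extractLog.
func (o *ClusterStateObserver) observeSubmitter(recordedJob *JobStatus, submitter *Submitter) error {
	extractLog := recordedJob != nil && recordedJob.State == JobStateDeploying
	if extractLog {
		// ... fetch and attach the submitter pod log here ...
	}
	// ... observe the submitter job/pod as before ...
	return nil
}
```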

var jobSpec = reconciler.observed.cluster.Spec.Job
var jobStatus = reconciler.observed.cluster.Status.Components.Job
var savepointStatus = reconciler.observed.cluster.Status.Savepoint
func (reconciler *ClusterReconciler) shouldTakeSavepoint() string {
It would be nice to return a proper reason type here.
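For example, a dedicated reason type might look something like this; the exact values are assumptions, loosely following the SavepointTriggerReason* naming seen elsewhere in this PR.

```go
package v1beta1

// SavepointTriggerReason is a sketch of a dedicated reason type that
// shouldTakeSavepoint could return instead of a bare string. The values are
// assumptions, not the actual constants in this PR.
type SavepointTriggerReason string

const (
	SavepointTriggerReasonScheduled   SavepointTriggerReason = "Scheduled"
	SavepointTriggerReasonUpdate      SavepointTriggerReason = "Update"
	SavepointTriggerReasonJobCancel   SavepointTriggerReason = "JobCancel"
	SavepointTriggerReasonUserRequest SavepointTriggerReason = "UserRequest"
)
```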

elanv commented Oct 15, 2021

This LGTM; most of the comments can actually be addressed in follow-up PRs. @elanv Are you adding more changes to this PR, or can I go ahead and merge?

@regadas Thanks! There are no more commits to add. I'll make a new PR for the comments if it's okay.

regadas merged commit 6019051 into spotify:master on Oct 15, 2021