forked from ing-bank/flink-deployer
/
update_job.go
157 lines (136 loc) · 4.49 KB
/
update_job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package operations
import (
"errors"
"fmt"
"log"
"strings"
"time"
"github.com/cenkalti/backoff"
"github.com/ing-bank/flink-deployer/cmd/cli/flink"
)
// UpdateJob represents the configuration used for
// updating a job on the Flink cluster
type UpdateJob struct {
	JobNameBase           string   // prefix used to find the running job instance(s) to replace
	LocalFilename         string   // path to the job JAR on the local filesystem
	RemoteFilename        string   // path to the job JAR on the remote cluster
	APIToken              string   // token passed along to the Flink REST API
	EntryClass            string   // fully-qualified main class of the job JAR
	Parallelism           int      // parallelism with which to (re)deploy the job
	ProgramArgs           []string // arguments forwarded to the job's main method
	SavepointDir          string   // directory where the savepoint is created and looked up
	AllowNonRestoredState bool     // whether deploy may skip savepoint state that no longer maps to the job
	Deploy                bool     // when true, deploy fresh if no running instance is found instead of aborting
	MaxSavepointDuration  int      // max seconds to wait for savepoint creation before giving up
}
// filterRunningJobsByName returns the subset of the given jobs that are
// currently in the "RUNNING" state and whose name starts with jobNameBase.
func (o RealOperator) filterRunningJobsByName(jobs []flink.Job, jobNameBase string) (ret []flink.Job) {
	for i := range jobs {
		candidate := jobs[i]
		if candidate.Status != "RUNNING" {
			continue
		}
		if strings.HasPrefix(candidate.Name, jobNameBase) {
			ret = append(ret, candidate)
		}
	}
	return
}
// monitorSavepointCreation polls the Flink REST API for the status of a
// savepoint creation request until it completes, fails, or maxElapsedTime
// (in seconds) is exceeded. Polling uses exponential backoff.
//
// Returns nil when the savepoint completed successfully, or an error when
// the savepoint reported a failure cause, an unknown status was returned,
// or the operation did not complete within the allotted time.
func (o RealOperator) monitorSavepointCreation(jobID string, requestID string, maxElapsedTime int) error {
	op := func() error {
		log.Println("checking status of savepoint creation")

		res, err := o.FlinkRestAPI.MonitorSavepointCreation(jobID, requestID)
		if err != nil {
			log.Println(err)
			return err
		}

		switch res.Status.Id {
		case "COMPLETED":
			// COMPLETED can still carry a failure cause; surface it instead of
			// treating the savepoint as successful.
			if res.Operation != nil && res.Operation.FailureCause != nil {
				err = fmt.Errorf("savepoint completed with failure class %s: %s", res.Operation.FailureCause.Class, res.Operation.FailureCause.StackTrace)
				log.Println(err)
				return err
			}
			return nil
		case "IN_PROGRESS":
			// Returning an error makes backoff.Retry poll again.
			err = fmt.Errorf("savepoint creation for job \"%v\" is still pending", jobID)
			log.Println(err)
			return err
		default:
			err = fmt.Errorf("savepoint creation for job \"%v\" returned an unknown status \"%v\"", jobID, res.Status)
			log.Println(err)
			return err
		}
	}

	b := &backoff.ExponentialBackOff{
		InitialInterval:     backoff.DefaultInitialInterval,
		RandomizationFactor: backoff.DefaultRandomizationFactor,
		Multiplier:          backoff.DefaultMultiplier,
		MaxInterval:         backoff.DefaultMaxInterval,
		MaxElapsedTime:      time.Duration(maxElapsedTime) * time.Second,
		Clock:               backoff.SystemClock,
	}

	if err := backoff.Retry(op, b); err != nil {
		// Include the last underlying error so the actual failure cause is
		// not lost behind the timeout message.
		return fmt.Errorf("failed to create savepoint for job \"%v\" within %v seconds: %v", jobID, b.MaxElapsedTime.Seconds(), err)
	}
	return nil
}
// Update executes the actual update of a job on the Flink cluster.
//
// It looks for running jobs whose name starts with u.JobNameBase:
//   - exactly one match: a savepoint is created, monitored until completion,
//     the job is cancelled, and the new version is deployed from the latest
//     savepoint found in u.SavepointDir;
//   - no match: a fresh deploy is performed when u.Deploy is set, otherwise
//     the update is aborted;
//   - multiple matches: the update is aborted, since it is ambiguous which
//     instance to replace.
//
// Returns an error when required arguments are missing or any step of the
// savepoint/cancel/deploy sequence fails.
func (o RealOperator) Update(u UpdateJob) error {
	if len(u.JobNameBase) == 0 {
		return errors.New("unspecified argument 'JobNameBase'")
	}
	if len(u.SavepointDir) == 0 {
		return errors.New("unspecified argument 'SavepointDir'")
	}

	log.Printf("starting job update for base name '%v' and savepoint dir '%v'\n", u.JobNameBase, u.SavepointDir)

	jobs, err := o.FlinkRestAPI.RetrieveJobs()
	if err != nil {
		return fmt.Errorf("retrieving jobs failed: %v", err)
	}

	runningJobs := o.filterRunningJobsByName(jobs, u.JobNameBase)

	deploy := Deploy{
		LocalFilename:         u.LocalFilename,
		RemoteFilename:        u.RemoteFilename,
		APIToken:              u.APIToken,
		EntryClass:            u.EntryClass,
		Parallelism:           u.Parallelism,
		ProgramArgs:           u.ProgramArgs,
		AllowNonRestoredState: u.AllowNonRestoredState,
	}

	switch len(runningJobs) {
	case 0:
		if u.Deploy {
			log.Printf("no instance running for job name base \"%v\". Creating new", u.JobNameBase)
		} else {
			return fmt.Errorf("no instance running for job name base \"%v\". Aborting update", u.JobNameBase)
		}
	case 1:
		log.Printf("found exactly 1 running job with base name: \"%v\"", u.JobNameBase)
		job := runningJobs[0]

		log.Printf("creating savepoint for job \"%v\"", job.ID)
		savepointResponse, err := o.FlinkRestAPI.CreateSavepoint(job.ID, u.SavepointDir)
		if err != nil {
			return fmt.Errorf("failed to create savepoint for job %v due to error: %v", job.ID, err)
		}

		// Block until the savepoint is confirmed complete; cancelling the job
		// before the savepoint finishes would lose state.
		err = o.monitorSavepointCreation(job.ID, savepointResponse.RequestID, u.MaxSavepointDuration)
		if err != nil {
			return err
		}

		err = o.FlinkRestAPI.Terminate(job.ID, "cancel")
		if err != nil {
			return fmt.Errorf("job \"%v\" failed to cancel due to: %v", job.ID, err)
		}
	default:
		return fmt.Errorf("job name with base \"%v\" has %v instances running. Aborting update", u.JobNameBase, len(runningJobs))
	}

	latestSavepoint, err := o.retrieveLatestSavepoint(u.SavepointDir)
	if err != nil {
		return fmt.Errorf("retrieving the latest savepoint failed: %v", err)
	}
	if len(latestSavepoint) != 0 {
		deploy.SavepointPath = latestSavepoint
	}

	return o.Deploy(deploy)
}