forked from vitessio/vitess
/
workflow.go
116 lines (96 loc) · 3.33 KB
/
workflow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package vtctld
import (
"flag"
"time"
log "github.com/golang/glog"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/flagutil"
"github.com/youtube/vitess/go/vt/schemamanager/schemaswap"
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vtctl"
"github.com/youtube/vitess/go/vt/workflow"
"github.com/youtube/vitess/go/vt/workflow/resharding"
"github.com/youtube/vitess/go/vt/workflow/topovalidator"
)
var (
workflowManagerInit = flag.Bool("workflow_manager_init", false, "Initialize the workflow manager in this vtctld instance.")
workflowManagerUseElection = flag.Bool("workflow_manager_use_election", false, "if specified, will use a topology server-based master election to ensure only one workflow manager is active at a time.")
workflowManagerDisable flagutil.StringListValue
)
func init() {
flag.Var(&workflowManagerDisable, "workflow_manager_disable", "comma separated list of workflow types to disable")
}
func initWorkflowManager(ts topo.Server) {
if *workflowManagerInit {
// Uncomment this line to register the UI test validator.
// topovalidator.RegisterUITestValidator()
// Register the Topo Validators, and the workflow.
topovalidator.RegisterKeyspaceValidator()
topovalidator.RegisterShardValidator()
topovalidator.Register()
// Register the Schema Swap workflow.
schemaswap.RegisterWorkflowFactory()
// Register the Horizontal Resharding workflow.
resharding.Register()
// Unregister the blacklisted workflows.
for _, name := range workflowManagerDisable {
workflow.Unregister(name)
}
// Create the WorkflowManager.
vtctl.WorkflowManager = workflow.NewManager(ts)
// Register the long polling and websocket handlers.
vtctl.WorkflowManager.HandleHTTPLongPolling(apiPrefix + "workflow")
vtctl.WorkflowManager.HandleHTTPWebSocket(apiPrefix + "workflow")
if *workflowManagerUseElection {
runWorkflowManagerElection(ts)
} else {
runWorkflowManagerAlone()
}
}
}
func runWorkflowManagerAlone() {
ctx, cancel := context.WithCancel(context.Background())
go vtctl.WorkflowManager.Run(ctx)
// Running cancel on OnTermSync will cancel the context of any
// running workflow inside vtctld. They may still checkpoint
// if they want to.
servenv.OnTermSync(cancel)
}
func runWorkflowManagerElection(ts topo.Server) {
var mp topo.MasterParticipation
// We use servenv.ListeningURL which is only populated during Run,
// so we have to start this with OnRun.
servenv.OnRun(func() {
var err error
mp, err = ts.NewMasterParticipation("vtctld", servenv.ListeningURL.Host)
if err != nil {
log.Errorf("Cannot start MasterParticipation, disabling workflow manager: %v", err)
return
}
// Set up a redirect host so when we are not the
// master, we can redirect traffic properly.
vtctl.WorkflowManager.SetRedirectFunc(func() (string, error) {
ctx := context.Background()
return mp.GetCurrentMasterID(ctx)
})
go func() {
for {
ctx, err := mp.WaitForMastership()
switch err {
case nil:
vtctl.WorkflowManager.Run(ctx)
case topo.ErrInterrupted:
return
default:
log.Errorf("Got error while waiting for master, will retry in 5s: %v", err)
time.Sleep(5 * time.Second)
}
}
}()
})
// When we get killed, clean up.
servenv.OnTermSync(func() {
mp.Stop()
})
}