-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
executor.go
183 lines (156 loc) · 6.3 KB
/
executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package executor
import (
"context"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/k3s-io/k3s/pkg/cli/cmds"
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
yaml2 "gopkg.in/yaml.v2"
"k8s.io/apiserver/pkg/authentication/authenticator"
"sigs.k8s.io/yaml"
)
var (
executor Executor
)
type Executor interface {
Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error
Kubelet(ctx context.Context, args []string) error
KubeProxy(ctx context.Context, args []string) error
APIServerHandlers(ctx context.Context) (authenticator.Request, http.Handler, error)
APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error
Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error
ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error
CurrentETCDOptions() (InitialOptions, error)
ETCD(ctx context.Context, args ETCDConfig, extraArgs []string) error
CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error
Containerd(ctx context.Context, node *daemonconfig.Node) error
Docker(ctx context.Context, node *daemonconfig.Node) error
}
type ETCDConfig struct {
InitialOptions `json:",inline"`
Name string `json:"name,omitempty"`
ListenClientURLs string `json:"listen-client-urls,omitempty"`
ListenClientHTTPURLs string `json:"listen-client-http-urls,omitempty"`
ListenMetricsURLs string `json:"listen-metrics-urls,omitempty"`
ListenPeerURLs string `json:"listen-peer-urls,omitempty"`
AdvertiseClientURLs string `json:"advertise-client-urls,omitempty"`
DataDir string `json:"data-dir,omitempty"`
SnapshotCount int `json:"snapshot-count,omitempty"`
ServerTrust ServerTrust `json:"client-transport-security"`
PeerTrust PeerTrust `json:"peer-transport-security"`
ForceNewCluster bool `json:"force-new-cluster,omitempty"`
HeartbeatInterval int `json:"heartbeat-interval"`
ElectionTimeout int `json:"election-timeout"`
Logger string `json:"logger"`
LogOutputs []string `json:"log-outputs"`
ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
}
type ServerTrust struct {
CertFile string `json:"cert-file"`
KeyFile string `json:"key-file"`
ClientCertAuth bool `json:"client-cert-auth"`
TrustedCAFile string `json:"trusted-ca-file"`
}
type PeerTrust struct {
CertFile string `json:"cert-file"`
KeyFile string `json:"key-file"`
ClientCertAuth bool `json:"client-cert-auth"`
TrustedCAFile string `json:"trusted-ca-file"`
}
type InitialOptions struct {
AdvertisePeerURL string `json:"initial-advertise-peer-urls,omitempty"`
Cluster string `json:"initial-cluster,omitempty"`
State string `json:"initial-cluster-state,omitempty"`
}
func (e ETCDConfig) ToConfigFile(extraArgs []string) (string, error) {
confFile := filepath.Join(e.DataDir, "config")
bytes, err := yaml.Marshal(&e)
if err != nil {
return "", err
}
if len(extraArgs) > 0 {
var s map[string]interface{}
if err := yaml2.Unmarshal(bytes, &s); err != nil {
return "", err
}
for _, v := range extraArgs {
extraArg := strings.SplitN(v, "=", 2)
// Depending on the argV, we have different types to handle.
// Source: https://github.com/etcd-io/etcd/blob/44b8ae145b505811775f5af915dd19198d556d55/server/config/config.go#L36-L190 and https://etcd.io/docs/v3.5/op-guide/configuration/#configuration-file
if len(extraArg) == 2 {
key := strings.TrimLeft(extraArg[0], "-")
lowerKey := strings.ToLower(key)
var stringArr []string
if i, err := strconv.Atoi(extraArg[1]); err == nil {
s[key] = i
} else if time, err := time.ParseDuration(extraArg[1]); err == nil && (strings.Contains(lowerKey, "time") || strings.Contains(lowerKey, "duration") || strings.Contains(lowerKey, "interval") || strings.Contains(lowerKey, "retention")) {
// auto-compaction-retention is either a time.Duration or int, depending on version. If it is an int, it will be caught above.
s[key] = time
} else if err := yaml.Unmarshal([]byte(extraArg[1]), &stringArr); err == nil {
s[key] = stringArr
} else {
switch strings.ToLower(extraArg[1]) {
case "true":
s[key] = true
case "false":
s[key] = false
default:
s[key] = extraArg[1]
}
}
}
}
bytes, err = yaml2.Marshal(&s)
if err != nil {
return "", err
}
}
if err := os.MkdirAll(e.DataDir, 0700); err != nil {
return "", err
}
return confFile, os.WriteFile(confFile, bytes, 0600)
}
func Set(driver Executor) {
executor = driver
}
func Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error {
return executor.Bootstrap(ctx, nodeConfig, cfg)
}
func Kubelet(ctx context.Context, args []string) error {
return executor.Kubelet(ctx, args)
}
func KubeProxy(ctx context.Context, args []string) error {
return executor.KubeProxy(ctx, args)
}
func APIServerHandlers(ctx context.Context) (authenticator.Request, http.Handler, error) {
return executor.APIServerHandlers(ctx)
}
func APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error {
return executor.APIServer(ctx, etcdReady, args)
}
func Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error {
return executor.Scheduler(ctx, apiReady, args)
}
func ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error {
return executor.ControllerManager(ctx, apiReady, args)
}
func CurrentETCDOptions() (InitialOptions, error) {
return executor.CurrentETCDOptions()
}
func ETCD(ctx context.Context, args ETCDConfig, extraArgs []string) error {
return executor.ETCD(ctx, args, extraArgs)
}
func CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error {
return executor.CloudControllerManager(ctx, ccmRBACReady, args)
}
func Containerd(ctx context.Context, config *daemonconfig.Node) error {
return executor.Containerd(ctx, config)
}
func Docker(ctx context.Context, config *daemonconfig.Node) error {
return executor.Docker(ctx, config)
}