/
taskrun.go
393 lines (343 loc) · 13.6 KB
/
taskrun.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
package command
import (
"bufio"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"os/exec"
"strings"
"sync"
"time"
"k8s.io/client-go/dynamic"
"golang.org/x/oauth2"
"github.com/kyma-project/control-plane/components/kubeconfig-service/pkg/client"
"github.com/kyma-project/kyma-environment-broker/common/gardener"
"github.com/kyma-project/kyma-environment-broker/common/orchestration"
"github.com/kyma-project/kyma-environment-broker/common/orchestration/strategies"
"github.com/kyma-project/kyma-environment-broker/common/runtime"
"github.com/kyma-project/control-plane/tools/cli/pkg/credential"
"github.com/kyma-project/control-plane/tools/cli/pkg/logger"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// TaskRunCommand represents an execution of the kcp taskrun command.
// It holds the values bound to the command line flags together with the state
// derived from them by Validate and Run.
type TaskRunCommand struct {
	// cobraCmd is the cobra command created by NewTaskRunCmd; its context is
	// used for the task command and the HTTP clients.
	cobraCmd *cobra.Command
	// log is the logger created in Run.
	log logger.Logger
	// cred is the credential manager created in Run.
	cred credential.Manager
	// targetInputs and targetExcludeInputs receive the raw target options
	// registered through SetRuntimeTargetOpts.
	targetInputs []string
	targetExcludeInputs []string
	// parallelism is the value of the --parallelism flag and is passed to the
	// orchestration strategy as the number of workers.
	parallelism int
	// targets is filled by ValidateTransformRuntimeTargetOpts in Validate.
	targets orchestration.TargetSpec
	// kubeconfigDir is the value of the --kubeconfig-dir flag, or the temporary
	// directory created by Validate when the flag is not given.
	kubeconfigDir string
	// kubeconfingDirTemp is set by Validate when kubeconfigDir is a temporary
	// directory created by this command.
	kubeconfingDirTemp bool
	// keepKubeconfigs is the value of the --keep-kubeconfig flag.
	keepKubeconfigs bool
	// noKubeconfig is the value of the --no-kubeconfig flag.
	noKubeconfig bool
	// noPrefixOutput is the value of the --no-prefix-output flag.
	noPrefixOutput bool
	// taskCommand is the command object constructed by Validate; Execute runs a
	// copy of it for each runtime.
	taskCommand *exec.Cmd
	// shell is the "shell" setting read from viper in Validate.
	shell string
}
// RuntimeLister implements the interface used by the resolver to obtain runtime info from KEB.
type RuntimeLister struct {
	// client is the runtime client the lister queries.
	client runtime.Client
}
// RuntimeTask is the runtime operation executed by RuntimeTaskMakager via strategy.
type RuntimeTask struct {
	// operation identifies the runtime the task command is executed for.
	operation orchestration.RuntimeOperation
	// result is the error recorded by Execute; it stays nil when the task
	// succeeded or has not been executed.
	result error
}
// RuntimeTaskMakager implements the Executor interface needed by the strategy to execute the runtime task operations.
type RuntimeTaskMakager struct {
	// cmd is the taskrun command whose settings drive the execution.
	cmd *TaskRunCommand
	// tasks maps an operation ID to its task.
	tasks map[string]*RuntimeTask
	// kubeconfigClient is used to download runtime kubeconfigs.
	kubeconfigClient client.Client
}
// TaskRunError represents failure in task execution for one or more runtimes.
type TaskRunError struct {
	// failed is the number of executions that ended with an error.
	failed int
	// total is the number of all executions.
	total int
}

// Error reports the failure as "<failed>/<total> execution(s) failed".
func (e *TaskRunError) Error() string {
	failed, total := e.failed, e.total
	return fmt.Sprintf("%d/%d execution(s) failed", failed, total)
}
// NewTaskRunCmd builds the cobra.Command for "kcp taskrun". The returned
// command is backed by a fresh TaskRunCommand: Validate is wired as PreRunE,
// Run as RunE, and all taskrun flags are registered on it. The "shell" flag is
// additionally bound to the viper key "shell".
func NewTaskRunCmd() *cobra.Command {
	const (
		usage   = "taskrun --target {TARGET SPEC} ... [--target-exclude {TARGET SPEC} ...] -- COMMAND [ARGS ...]"
		summary = "Runs generic tasks on one or more Kyma Runtimes."
		details = `Runs a command, which can be a script or a program with arbitrary arguments, on targets of Kyma Runtimes.
The specified command is executed locally. It is executed in separate subprocesses for each Runtime in parallel, where the number of parallel executions is controlled by the --parallelism option.
For each subprocess, the following Runtime-specific data are passed as environment variables:
- KUBECONFIG : Path to the kubeconfig file for the specific Runtime, unless --no-kubeconfig option is passed
- GLOBALACCOUNT_ID : Global account ID of the Runtime
- SUBACCOUNT_ID : Subaccount ID of the Runtime
- RUNTIME_NAME : Shoot cluster name
- RUNTIME_ID : Runtime ID of the Runtime
- INSTANCE_ID : Instance ID of the Runtime
If all subprocesses finish successfully with the zero status code, the exit status is zero (0). If one or more subprocesses exit with a non-zero status, the command will also exit with a non-zero status.`
		examples = ` kcp taskrun --target all -- kubectl patch deployment valid-deployment -p '{"metadata":{"labels":{"my-label": "my-value"}}}'
Execute a kubectl patch operation for all Runtimes.
kcp taskrun --target account=CA4836781TID000000000123456789 /usr/local/bin/awesome-script.sh
Run a maintenance script for all Runtimes of a given global account.
kcp taskrun --target all -- helm upgrade -i -n kyma-system my-kyma-addon --values overrides.yaml
Deploy a Helm chart on all Runtimes.
kcp taskrun -t all -s "/bin/bash -i -c" -- kc get ns
Run an alias command (kc for kubectl) defined in user's .bashrc invocation script`
	)

	taskRun := &TaskRunCommand{}
	cobraCmd := &cobra.Command{
		Use:     usage,
		Aliases: []string{"task", "t"},
		Short:   summary,
		Long:    details,
		Example: examples,
		Args:    cobra.MinimumNArgs(1),
		PreRunE: func(_ *cobra.Command, args []string) error { return taskRun.Validate(args) },
		RunE:    func(_ *cobra.Command, args []string) error { return taskRun.Run(args) },
	}
	taskRun.cobraCmd = cobraCmd

	// Target selection flags shared with other commands.
	SetRuntimeTargetOpts(cobraCmd, &taskRun.targetInputs, &taskRun.targetExcludeInputs)

	// Flags specific to taskrun.
	flags := cobraCmd.Flags()
	flags.IntVarP(&taskRun.parallelism, "parallelism", "p", 4, "Number of parallel commands to execute.")
	flags.StringVarP(&taskRun.kubeconfigDir, "kubeconfig-dir", "k", "", "Directory to download Runtime kubeconfig files to. By default, it is a random-generated directory in the OS-specific default temporary directory (e.g. /tmp in Linux).")
	flags.BoolVar(&taskRun.keepKubeconfigs, "keep-kubeconfig", false, "Option that allows you to keep downloaded kubeconfig files after execution for caching purposes.")
	flags.BoolVar(&taskRun.noKubeconfig, "no-kubeconfig", false, "Option that turns off the downloading and exposure of the kubeconfig file for each Runtime.")
	flags.BoolVar(&taskRun.noPrefixOutput, "no-prefix-output", false, "Option that omits the prefixing of each output line with the Runtime name. By default, all output lines are prepended for better traceability.")
	flags.StringP("shell", "s", "", "Invoke the task command using the given shell and it's options. Useful when the task command uses alias(es) defined in the shell's invocation scripts. Can also be set in the KCP configuration file or with the KCP_SHELL environment variable.")
	viper.BindPFlag("shell", flags.Lookup("shell"))

	return cobraCmd
}
// Run executes the taskrun command: it resolves the targeted runtimes into
// operations, executes the task command for each of them through the parallel
// orchestration strategy with cmd.parallelism workers, and waits for the
// execution to finish.
//
// The args parameter is not used; the task command is prepared by Validate.
//
// Run returns an error when the targets cannot be resolved or the strategy
// cannot be started, and otherwise the exit status computed by the task
// manager (a *TaskRunError when at least one execution failed, nil otherwise).
func (cmd *TaskRunCommand) Run(args []string) error {
	cmd.log = logger.New()
	cmd.cred = CLICredentialManager(cmd.log)
	defer func() {
		// A cleanup failure must not change the exit status of the task run,
		// but it should not go unnoticed either.
		if err := cmd.cleanupTempKubeConfigDir(); err != nil {
			cmd.log.WithField("dir", cmd.kubeconfigDir).Errorf("Error: while cleaning up temporary kubeconfig directory: %s\n", err.Error())
		}
	}()

	operations, err := cmd.resolveOperations()
	if err != nil {
		return err
	}

	mgr := NewRuntimeTaskMakager(cmd, operations)
	strategy := strategies.NewParallelOrchestrationStrategy(mgr, cmd.log, 0)
	execID, err := strategy.Execute(operations, orchestration.StrategySpec{
		Type:     orchestration.ParallelStrategy,
		Schedule: string(orchestration.Immediate),
		Parallel: orchestration.ParallelStrategySpec{Workers: cmd.parallelism},
	})
	if err != nil {
		return errors.Wrap(err, "while executing task")
	}
	strategy.Wait(execID)

	return mgr.exitStatus()
}
// Validate checks the input parameters of the taskrun command and prepares the
// state needed by Run:
//   - cmd.targets is filled from the target options,
//   - cmd.kubeconfigDir is verified to be a directory, or set to a newly created
//     temporary directory when it is not given and kubeconfigs are needed,
//   - cmd.shell is read from the "shell" setting,
//   - cmd.taskCommand is constructed from args and the optional shell wrapper.
//
// It returns an error when a required global option is missing, the target
// options are invalid, the kubeconfig directory is unusable, or the executable
// to run cannot be found.
func (cmd *TaskRunCommand) Validate(args []string) error {
	// Validate kubeconfig-api-url global option
	if !cmd.noKubeconfig && GlobalOpts.KubeconfigAPIURL() == "" {
		return fmt.Errorf("missing required %s option", GlobalOpts.kubeconfigAPIURL)
	}
	// Validate gardener-kubeconfig global option
	if GlobalOpts.GardenerKubeconfig() == "" || GlobalOpts.GardenerNamespace() == "" {
		return fmt.Errorf("missing required %s/%s options", GlobalOpts.gardenerKubeconfig, GlobalOpts.gardenerNamespace)
	}

	// Validate target options
	if err := ValidateTransformRuntimeTargetOpts(cmd.targetInputs, cmd.targetExcludeInputs, &cmd.targets); err != nil {
		return err
	}

	// Validate kubeconfig directory
	if cmd.kubeconfigDir != "" {
		fi, err := os.Stat(cmd.kubeconfigDir)
		if err != nil {
			return err
		}
		if !fi.IsDir() {
			return fmt.Errorf("%s: not a directory", cmd.kubeconfigDir)
		}
	} else if !cmd.noKubeconfig {
		dir, err := ioutil.TempDir("", "kubeconfig-")
		if err != nil {
			return errors.Wrap(err, "while creating temporary kubeconfig directory")
		}
		cmd.kubeconfigDir = dir
		cmd.kubeconfingDirTemp = true
	}

	// Validate task command and shell wrapper
	// Construct task command object
	cmd.shell = viper.GetString("shell")
	taskCommand, err := cmd.newTaskCommand(args)
	if err != nil {
		// The temporary directory is otherwise only removed by the cleanup
		// deferred in Run, which is not reached when validation fails
		// (Validate is wired as PreRunE). Remove it here so it does not leak.
		if cmd.kubeconfingDirTemp {
			_ = os.RemoveAll(cmd.kubeconfigDir) // best effort, the validation error is what matters
			cmd.kubeconfigDir = ""
			cmd.kubeconfingDirTemp = false
		}
		return err
	}
	cmd.taskCommand = taskCommand

	return nil
}

// newTaskCommand builds the command object for the task given by args. When a
// shell wrapper is configured in cmd.shell, the shell is invoked with its
// options followed by the whole task command joined into a single argument;
// otherwise args[0] is executed directly with args[1:] as its arguments.
// The executable is looked up in advance so a missing program is reported
// during validation.
func (cmd *TaskRunCommand) newTaskCommand(args []string) (*exec.Cmd, error) {
	if len(args) == 0 {
		return nil, fmt.Errorf("missing task command")
	}

	name, cmdArgs := args[0], args[1:]
	// strings.Fields tolerates repeated or surrounding whitespace, which would
	// otherwise turn into empty arguments passed to the shell. A blank shell
	// setting is treated like an unset one.
	if shell := strings.Fields(cmd.shell); len(shell) > 0 {
		name = shell[0]
		cmdArgs = append(shell[1:], strings.Join(args, " "))
	}

	if _, err := exec.LookPath(name); err != nil {
		return nil, err
	}
	return exec.CommandContext(cmd.cobraCmd.Context(), name, cmdArgs...), nil
}
// resolveOperations resolves cmd.targets into the list of runtime operations
// to execute. Runtimes are resolved with a Gardener runtime resolver built from
// the Gardener kubeconfig and namespace global options and a RuntimeLister
// backed by the KEB API. Every resolved runtime becomes one operation with a
// randomly generated 16-letter ID.
func (cmd *TaskRunCommand) resolveOperations() ([]orchestration.RuntimeOperation, error) {
	clusterCfg, err := gardener.NewGardenerClusterConfig(GlobalOpts.GardenerKubeconfig())
	if err != nil {
		return nil, errors.Wrap(err, "while getting Gardener kubeconfig")
	}
	gardenerClient, err := dynamic.NewForConfig(clusterCfg)
	if err != nil {
		return nil, errors.Wrap(err, "while getting Gardener client")
	}

	authClient := oauth2.NewClient(cmd.cobraCmd.Context(), cmd.cred)
	kebLister := NewRuntimeLister(runtime.NewClient(GlobalOpts.KEBAPIURL(), authClient))
	runtimeResolver := orchestration.NewGardenerRuntimeResolver(gardenerClient, GlobalOpts.GardenerNamespace(), kebLister, cmd.log)

	resolved, err := runtimeResolver.Resolve(cmd.targets)
	if err != nil {
		return nil, errors.Wrap(err, "while resolving targets")
	}
	cmd.log.Infof("Number of resolved runtimes: %d\n", len(resolved))

	// One operation per runtime, in the order returned by the resolver.
	ops := make([]orchestration.RuntimeOperation, len(resolved))
	for i := range resolved {
		ops[i] = orchestration.RuntimeOperation{
			Runtime: resolved[i],
			ID:      randomString(16),
		}
	}
	return ops, nil
}
// cleanupTempKubeConfigDir removes the kubeconfig directory together with its
// contents, but only when the directory is a temporary one created by this
// command (cmd.kubeconfingDirTemp) and the user did not ask to keep the
// downloaded kubeconfig files with --keep-kubeconfig. A directory supplied via
// --kubeconfig-dir is never touched. It returns the error of the removal, or
// nil when there is nothing to remove.
func (cmd *TaskRunCommand) cleanupTempKubeConfigDir() error {
	if !cmd.kubeconfingDirTemp || cmd.keepKubeconfigs {
		return nil
	}
	return os.RemoveAll(cmd.kubeconfigDir)
}
// NewRuntimeLister returns a RuntimeLister that queries runtimes through the
// given runtime.Client.
func NewRuntimeLister(client runtime.Client) *RuntimeLister {
	lister := new(RuntimeLister)
	lister.client = client
	return lister
}
// ListAllRuntimes queries the runtime client with empty list parameters and
// returns the Data of the response. A failed query is returned wrapped with
// the "while querying runtimes" context.
func (rl RuntimeLister) ListAllRuntimes() ([]runtime.RuntimeDTO, error) {
	page, err := rl.client.ListRuntimes(runtime.ListParameters{})
	if err == nil {
		return page.Data, nil
	}
	return nil, errors.Wrap(err, "while querying runtimes")
}
// NewRuntimeTaskMakager creates the task manager for the given taskrun command.
// Every operation is registered as a RuntimeTask under its operation ID, and a
// kubeconfig client is created from the command's context, the kubeconfig API
// URL global option and the command's credentials.
func NewRuntimeTaskMakager(cmd *TaskRunCommand, operations []orchestration.RuntimeOperation) *RuntimeTaskMakager {
	tasks := make(map[string]*RuntimeTask, len(operations))
	kubeconfigClient := client.NewClient(cmd.cobraCmd.Context(), GlobalOpts.KubeconfigAPIURL(), cmd.cred)

	for i := range operations {
		tasks[operations[i].ID] = &RuntimeTask{operation: operations[i]}
	}

	return &RuntimeTaskMakager{
		cmd:              cmd,
		tasks:            tasks,
		kubeconfigClient: kubeconfigClient,
	}
}
// Execute runs the task command for the runtime identified by operationID and
// records the outcome in the result of the corresponding task.
//
// The command is a copy of the command object prepared in cmd.taskCommand,
// extended with the runtime-specific environment variables GLOBALACCOUNT_ID,
// SUBACCOUNT_ID, RUNTIME_ID, RUNTIME_NAME, INSTANCE_ID and, when a kubeconfig
// path is available, KUBECONFIG. Its stdout and stderr are echoed line by line
// to the stdout and stderr of this process, each line prefixed with the shoot
// name unless prefixing is turned off.
//
// The returned duration is always zero. The returned error is non-nil when the
// operation ID is unknown, the kubeconfig cannot be obtained, the output pipes
// cannot be created, or the command fails to start or exits with an error.
func (mgr *RuntimeTaskMakager) Execute(operationID string) (time.Duration, error) {
	task := mgr.tasks[operationID]
	if task == nil {
		// Guard against a nil dereference for IDs this manager was not created with.
		return 0, fmt.Errorf("unknown operation ID %q", operationID)
	}
	log := mgr.cmd.log.WithField("shoot", task.operation.ShootName)

	kubeconfigPath, err := mgr.getKubeconfig(task)
	if err != nil {
		log.Errorf("Error: while getting kubeconfig: %s\n", err.Error())
		task.result = err
		return 0, err
	}

	// Work on a private copy so parallel executions do not share command state.
	command := *mgr.cmd.taskCommand

	// Prepare environment variables
	command.Env = os.Environ()
	command.Env = append(command.Env,
		fmt.Sprintf("GLOBALACCOUNT_ID=%s", task.operation.GlobalAccountID),
		fmt.Sprintf("SUBACCOUNT_ID=%s", task.operation.SubAccountID),
		fmt.Sprintf("RUNTIME_ID=%s", task.operation.RuntimeID),
		fmt.Sprintf("RUNTIME_NAME=%s", task.operation.ShootName),
		fmt.Sprintf("INSTANCE_ID=%s", task.operation.InstanceID),
	)
	if kubeconfigPath != "" {
		command.Env = append(command.Env, fmt.Sprintf("KUBECONFIG=%s", kubeconfigPath))
	}

	// Prepare stdout and stderr of the command
	stdout, err := command.StdoutPipe()
	if err != nil {
		log.Errorf("Error: while creating stdout: %s\n", err.Error())
		task.result = err
		return 0, err
	}
	stderr, err := command.StderrPipe()
	if err != nil {
		log.Errorf("Error: while creating stderr: %s\n", err.Error())
		task.result = err
		return 0, err
	}

	// Start execution of the command. Without a running process there is
	// nothing to echo or wait for, so report the start failure directly.
	if err := command.Start(); err != nil {
		log.Errorf("Error: command started with error: %s\n", err.Error())
		task.result = err
		return 0, err
	}

	// Prepare echoer stdout / stderr writers
	echoerWg := sync.WaitGroup{}
	echoer := func(src io.Reader, dst io.Writer) {
		defer echoerWg.Done()
		scanner := bufio.NewScanner(src)
		for scanner.Scan() {
			// Emit prefix and line with a single write so that the output of
			// runtimes processed in parallel is not interleaved mid-line.
			if mgr.cmd.noPrefixOutput {
				fmt.Fprintln(dst, scanner.Text())
			} else {
				fmt.Fprintf(dst, "%s %s\n", task.operation.ShootName, scanner.Text())
			}
		}
		if err := scanner.Err(); err != nil {
			log.Errorf("Error: while reading from child process: %s\n", err)
			// The scanner gives up on e.g. an over-long line. Keep draining the
			// pipe so the child cannot block on a full pipe, which would make
			// the Wait below hang.
			_, _ = io.Copy(ioutil.Discard, src)
		}
	}
	echoerWg.Add(2)
	go echoer(stdout, os.Stdout)
	go echoer(stderr, os.Stderr)

	// Wait for the command subprocess to finish. All reads from the pipes have
	// to complete before Wait is called.
	echoerWg.Wait()
	err = command.Wait()
	if err != nil {
		log.Errorf("Error: command exited with error: %s\n", err.Error())
	}
	task.result = err

	return 0, err
}
// Reschedule is a no-op for task runs: the operation ID and the maintenance
// window are ignored and nil is always returned.
func (mgr *RuntimeTaskMakager) Reschedule(operationID string, maintenanceWindowBegin, maintenanceWindowEnd time.Time) error {
	return nil
}
// getKubeconfig returns the path of the kubeconfig file for the runtime of the
// given task. The file is expected at <kubeconfigDir>/<shoot name>.yaml; when
// no file exists there yet, the kubeconfig is fetched with the kubeconfig
// client and written to that path with 0600 permissions. An existing file is
// reused without being downloaded again.
//
// It returns an empty path and a nil error when kubeconfig handling is turned
// off (noKubeconfig). On error the returned path is empty.
func (mgr *RuntimeTaskMakager) getKubeconfig(task *RuntimeTask) (string, error) {
	if mgr.cmd.noKubeconfig {
		return "", nil
	}

	path := fmt.Sprintf("%s/%s.yaml", mgr.cmd.kubeconfigDir, task.operation.ShootName)
	_, err := os.Stat(path)
	if err == nil {
		// A file is already present at the path; reuse it.
		return path, nil
	}
	if !os.IsNotExist(err) {
		// Any other failure (e.g. permission denied) means the file cannot be
		// relied on; do not hand the path to the task as if it were usable.
		return "", errors.Wrapf(err, "while checking kubeconfig file %s", path)
	}

	kubeconfig, err := mgr.kubeconfigClient.GetKubeConfig(task.operation.GlobalAccountID, task.operation.RuntimeID)
	if err != nil {
		return "", err
	}
	if err := ioutil.WriteFile(path, []byte(kubeconfig), 0600); err != nil {
		// Best effort: do not leave a partially written file behind, since a
		// later call would treat it as an already downloaded kubeconfig.
		_ = os.Remove(path)
		return "", err
	}

	return path, nil
}
// exitStatus summarizes the recorded task results. It returns nil when no task
// has an error recorded, and otherwise a *TaskRunError carrying the number of
// failed tasks and the total number of tasks.
func (mgr *RuntimeTaskMakager) exitStatus() error {
	failed := 0
	for _, task := range mgr.tasks {
		if task.result != nil {
			failed++
		}
	}
	if failed == 0 {
		return nil
	}
	return &TaskRunError{failed: failed, total: len(mgr.tasks)}
}
// randomString returns a string of n lowercase ASCII letters, each picked with
// rand.Intn from the package-level math/rand source.
func randomString(n int) string {
	const alphabet = "abcdefghijklmnopqrstuvwxyz"
	out := make([]byte, n)
	for i := 0; i < n; i++ {
		out[i] = alphabet[rand.Intn(len(alphabet))]
	}
	return string(out)
}