Skip to content

Commit 315be8b

Browse files
authored
fix(kube-run): handle SIGINT/SIGTERM properly (#6893)
Signed-off-by: Alexandr Zaytsev <alexandr.zaytsev@flant.com>
1 parent 4e3a057 commit 315be8b

File tree

6 files changed

+150
-27
lines changed

6 files changed

+150
-27
lines changed

cmd/werf/kube_run/kube_run.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -696,9 +696,6 @@ func copyFromPod(ctx context.Context, namespace, pod, container string, copyFrom
696696
}
697697

698698
if err := cmd.Run(); err != nil {
699-
if errors.Is(ctx.Err(), context.Canceled) {
700-
graceful.Terminate(ctx, err, werfExec.ExitCode(err))
701-
}
702699
logboek.Context(ctx).Warn().LogF("Error copying %q from pod %s/s: %s\n", copyFrom.Src, namespace, pod, err)
703700
}
704701
}
@@ -749,15 +746,33 @@ func stopContainer(ctx context.Context, namespace, pod, container string, extraA
749746
}
750747

751748
if err := cmd.Run(); err != nil {
752-
if errors.Is(ctx.Err(), context.Canceled) {
753-
graceful.Terminate(ctx, err, werfExec.ExitCode(err))
754-
}
755749
logboek.Context(ctx).Warn().LogF("Error stopping service container %s/%s/%s for copying files: %s\n", namespace, pod, container, err)
756750
}
757751
}
758752

753+
func signalContainer(ctx context.Context, namespace, pod, container string, extraArgs []string) error {
754+
ctx = context.WithoutCancel(ctx)
755+
756+
logboek.Context(ctx).LogF("Signal container %q in pod ...\n", container)
757+
758+
args := []string{
759+
"exec", pod, "-q", "--pod-running-timeout", "5s", "-c", container,
760+
}
761+
762+
args = append(args, extraArgs...)
763+
args = append(args, "--", "pkill", "-P", "0")
764+
765+
cmd := werfExec.PrepareGracefulCancellation(util.ExecKubectlCmdContext(ctx, args...))
766+
767+
if err := cmd.Run(); err != nil {
768+
return fmt.Errorf("signal container error: %w", err)
769+
}
770+
771+
return nil
772+
}
773+
759774
func execCommandInPod(ctx context.Context, namespace, pod, container string, command, extraArgs []string) error {
760-
logboek.Context(ctx).LogF("Execing into pod ...\n")
775+
logboek.Context(ctx).LogF("Executing into pod ...\n")
761776

762777
args := []string{
763778
"exec", pod, "-q", "--pod-running-timeout", "5h", "-c", container,
@@ -777,7 +792,9 @@ func execCommandInPod(ctx context.Context, namespace, pod, container string, com
777792
args = append(args, command...)
778793

779794
cmd := werfExec.PrepareGracefulCancellation(util.ExecKubectlCmdContext(ctx, args...))
780-
cmd.Stdin = os.Stdin
795+
cmd.Cancel = func() error {
796+
return signalContainer(ctx, namespace, pod, container, extraArgs)
797+
}
781798

782799
if *commonCmdData.DryRun {
783800
fmt.Println(cmd.String())
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#!/bin/bash
2+
3+
set -eu
4+
5+
is_terminated=0
6+
7+
log () {
8+
echo "[$(date +"%T")] $1"
9+
}
10+
11+
signal_handler () {
12+
log "Signal handled"
13+
is_terminated=1
14+
}
15+
16+
trap signal_handler SIGINT SIGTERM EXIT
17+
18+
while [ ${is_terminated} -eq 0 ]; do
19+
log "Looping ..."
20+
sleep 1
21+
done
22+
23+
log "Script completed"

test/e2e/kube-run/_fixtures/simple/state0/werf.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,6 @@ configVersion: 1
44
---
55
image: main
66
from: ubuntu:22.04
7+
git:
8+
- add: /app.sh
9+
to: /opt/app.sh

test/e2e/kube-run/simple_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package e2e_kube_run_test
33
import (
44
"os"
55
"strings"
6+
"time"
67

78
. "github.com/onsi/ginkgo/v2"
89
. "github.com/onsi/gomega"
@@ -25,6 +26,12 @@ var _ = Describe("Simple kube-run", Label("e2e", "kube-run", "simple"), func() {
2526

2627
By("state0: execute kube-run")
2728
combinedOut := werfProject.KubeRun(ctx, kubeRunOpts)
29+
Expect(combinedOut).To(ContainSubstring("Creating namespace"))
30+
Expect(combinedOut).To(ContainSubstring("Running pod"))
31+
Expect(combinedOut).To(ContainSubstring("Executing into pod"))
32+
Expect(combinedOut).To(ContainSubstring("Stopping container"))
33+
Expect(combinedOut).To(ContainSubstring("Cleaning up pod"))
34+
2835
expectOutFn(combinedOut)
2936
},
3037
Entry(
@@ -77,6 +84,25 @@ var _ = Describe("Simple kube-run", Label("e2e", "kube-run", "simple"), func() {
7784
Expect(out).To(ContainSubstring("ID=ubuntu"))
7885
},
7986
),
87+
Entry(
88+
"should cancel long running process",
89+
&werf.KubeRunOptions{
90+
Command: []string{"/opt/app.sh"},
91+
Image: "main",
92+
CommonOptions: werf.CommonOptions{
93+
ShouldFail: true,
94+
ExtraArgs: []string{}, // be able to work without "-it" options
95+
CancelOnOutput: "Looping ...",
96+
CancelOnOutputTimeout: time.Minute,
97+
},
98+
},
99+
func(out string) {
100+
Expect(out).To(ContainSubstring("Signal container"))
101+
Expect(out).To(ContainSubstring("Signal handled")) // from script
102+
Expect(out).To(ContainSubstring("Script completed")) // from script
103+
},
104+
SpecTimeout(time.Minute*3),
105+
),
80106
)
81107
})
82108

test/pkg/utils/command.go

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
package utils
22

33
import (
4+
"bufio"
45
"bytes"
56
"context"
67
"encoding/base64"
78
"fmt"
9+
"io"
810
"os"
911
"os/exec"
1012
"strings"
13+
"time"
1114

1215
. "github.com/onsi/ginkgo/v2"
1316
. "github.com/onsi/gomega"
1417

15-
"github.com/werf/werf/v2/pkg/logging"
18+
"github.com/werf/werf/v2/pkg/util/option"
19+
werfExec "github.com/werf/werf/v2/pkg/werf/exec"
1620
)
1721

1822
func RunCommand(ctx context.Context, dir, command string, args ...string) ([]byte, error) {
@@ -33,10 +37,14 @@ type RunCommandOptions struct {
3337
ToStdin string
3438
ShouldSucceed bool
3539
NoStderr bool
40+
41+
CancelOnOutput string
42+
CancelOnOutputTimeout time.Duration
3643
}
3744

3845
func RunCommandWithOptions(ctx context.Context, dir, command string, args []string, options RunCommandOptions) ([]byte, error) {
39-
cmd := exec.CommandContext(logging.WithLogger(ctx), command, args...)
46+
cmd := exec.CommandContext(ctx, command, args...)
47+
cmd = werfExec.PrepareGracefulCancellation(cmd)
4048

4149
if dir != "" {
4250
cmd.Dir = dir
@@ -45,29 +53,65 @@ func RunCommandWithOptions(ctx context.Context, dir, command string, args []stri
4553
cmd.Env = append(os.Environ(), options.ExtraEnv...)
4654

4755
if options.ToStdin != "" {
48-
var b bytes.Buffer
49-
b.Write([]byte(options.ToStdin))
50-
cmd.Stdin = &b
56+
cmd.Stdin = bytes.NewReader([]byte(options.ToStdin))
5157
}
5258

53-
var res []byte
54-
var err error
59+
stdout, err := cmd.StdoutPipe()
60+
Expect(err).To(Succeed())
61+
62+
stderr, err := cmd.StderrPipe()
63+
Expect(err).To(Succeed())
64+
65+
var outputReader io.Reader
66+
5567
if options.NoStderr {
56-
res, err = cmd.Output()
68+
outputReader = stdout
5769
} else {
58-
res, err = cmd.CombinedOutput()
70+
outputReader = io.MultiReader(stdout, stderr)
5971
}
6072

61-
_, _ = GinkgoWriter.Write(res)
73+
res := &bytes.Buffer{}
74+
75+
Expect(cmd.Start()).To(Succeed())
76+
77+
if options.CancelOnOutput != "" {
78+
copyReader := io.TeeReader(outputReader, res)
79+
waitForOutput(copyReader, options.CancelOnOutput, options.CancelOnOutputTimeout)
80+
Expect(cmd.Cancel()).To(Succeed())
81+
}
82+
83+
_, err = io.Copy(res, outputReader)
84+
Expect(err).To(Succeed())
85+
86+
err = cmd.Wait()
87+
88+
_, _ = GinkgoWriter.Write(res.Bytes())
6289

6390
if options.ShouldSucceed {
6491
errorDesc := fmt.Sprintf("%[2]s %[3]s (dir: %[1]s)", dir, command, strings.Join(args, " "))
6592
Expect(err).ShouldNot(HaveOccurred(), errorDesc)
6693
}
6794

68-
return res, err
95+
return res.Bytes(), err
6996
}
7097

7198
func ShelloutPack(command string) string {
7299
return fmt.Sprintf("eval $(echo %s | base64 -d)", base64.StdEncoding.EncodeToString([]byte(command)))
73100
}
101+
102+
// waitForOutput waits for output and exits early or exits by timeout
103+
func waitForOutput(reader io.Reader, output string, timeout time.Duration) {
104+
scanner := bufio.NewScanner(reader)
105+
tmr := time.NewTimer(option.ValueOrDefault(timeout, time.Minute))
106+
107+
for scanner.Scan() {
108+
select {
109+
case <-tmr.C:
110+
return
111+
default:
112+
if strings.Contains(scanner.Text(), output) {
113+
return
114+
}
115+
}
116+
}
117+
}

test/pkg/werf/project.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"path/filepath"
1010
"strings"
1111
"sync"
12+
"time"
1213

1314
. "github.com/onsi/gomega"
1415

@@ -36,6 +37,9 @@ type Project struct {
3637
type CommonOptions struct {
3738
ShouldFail bool
3839
ExtraArgs []string
40+
41+
CancelOnOutput string
42+
CancelOnOutputTimeout time.Duration
3943
}
4044

4145
type BuildOptions struct {
@@ -87,6 +91,9 @@ type KubeCtlOptions struct {
8791
type runCommandOptions struct {
8892
ShouldFail bool
8993
Args []string
94+
95+
CancelOnOutput string
96+
CancelOnOutputTimeout time.Duration
9097
}
9198

9299
func (p *Project) Build(ctx context.Context, opts *BuildOptions) (combinedOut string) {
@@ -217,7 +224,12 @@ func (p *Project) KubeRun(ctx context.Context, opts *KubeRunOptions) string {
217224
args = append(args, opts.Command...)
218225
}
219226

220-
return p.runCommand(ctx, runCommandOptions{Args: args, ShouldFail: opts.ShouldFail})
227+
return p.runCommand(ctx, runCommandOptions{
228+
Args: args,
229+
ShouldFail: opts.ShouldFail,
230+
CancelOnOutput: opts.CancelOnOutput,
231+
CancelOnOutputTimeout: opts.CancelOnOutputTimeout,
232+
})
221233
}
222234

223235
func (p *Project) KubeCtl(ctx context.Context, opts *KubeCtlOptions) string {
@@ -287,13 +299,11 @@ func (p *Project) CreateRegistryPullSecretFromDockerConfig(ctx context.Context)
287299
}
288300

289301
func (p *Project) runCommand(ctx context.Context, opts runCommandOptions) string {
290-
outb, err := iutils.RunCommand(ctx, p.GitRepoPath, p.WerfBinPath, opts.Args...)
291-
if opts.ShouldFail {
292-
Expect(err).To(HaveOccurred())
293-
} else {
294-
Expect(err).NotTo(HaveOccurred())
295-
}
296-
302+
outb, _ := iutils.RunCommandWithOptions(ctx, p.GitRepoPath, p.WerfBinPath, opts.Args, iutils.RunCommandOptions{
303+
ShouldSucceed: !opts.ShouldFail,
304+
CancelOnOutput: opts.CancelOnOutput,
305+
CancelOnOutputTimeout: opts.CancelOnOutputTimeout,
306+
})
297307
return string(outb)
298308
}
299309

0 commit comments

Comments
 (0)