forked from argoproj/argo-workflows
/
common.go
139 lines (128 loc) · 4 KB
/
common.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package common
import (
"bytes"
"compress/gzip"
"fmt"
"os"
"strings"
"syscall"
"time"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
)
const (
	// containerShimPrefix is the separator searched for inside a
	// ContainerStatus.ContainerID; GetContainerID returns whatever follows
	// its first occurrence.
	containerShimPrefix = "://"

	// KillGracePeriod is the number of seconds KillGracefully waits after
	// sending SIGTERM before escalating to SIGKILL (value matches k8s).
	KillGracePeriod = 10
)
// GetContainerID extracts the bare container ID from a ContainerStatus.
//
// It returns the part of container.ContainerID that follows the first
// occurrence of containerShimPrefix, or the empty string when the prefix is
// not present at all.
func GetContainerID(container *v1.ContainerStatus) string {
	rawID := container.ContainerID
	if sep := strings.Index(rawID, containerShimPrefix); sep >= 0 {
		return rawID[sep+len(containerShimPrefix):]
	}
	return ""
}
// KubernetesClientInterface is the set of container operations the helpers in
// this file need from a Kubernetes-backed client: looking up a container's
// status, signalling it, and archiving a path from it.
type KubernetesClientInterface interface {
// GetContainerStatus looks up containerID and returns the pod together with
// the matching container status. Callers in this file dereference both
// results whenever the returned error is nil.
GetContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error)
// KillContainer is called by TerminatePodWithContainerID with the pod and
// container status obtained from GetContainerStatus and the signal to send.
// NOTE(review): how the signal is delivered is implementation-defined and
// not visible from this file.
KillContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error
// CreateArchive returns the contents of sourcePath inside the container as
// an in-memory buffer; CopyArchive gzip-compresses that buffer to disk.
// NOTE(review): presumably the buffer holds a tar stream — confirm against
// the implementations.
CreateArchive(containerID, sourcePath string) (*bytes.Buffer, error)
}
// WaitForTermination polls the status of containerID once per second until the
// container reports a terminated state.
//
// A timeout of 0 disables the deadline and waits indefinitely. Otherwise an
// error of the form "timeout after <duration>" is returned once timeout has
// elapsed. Any error from c.GetContainerStatus aborts the wait immediately
// and is returned as-is.
func WaitForTermination(c KubernetesClientInterface, containerID string, timeout time.Duration) error {
	poll := time.NewTicker(time.Second * 1)
	defer poll.Stop()

	// A nil channel never becomes ready in a select, which is exactly the
	// "no deadline" behaviour wanted when timeout is 0.
	var deadline <-chan time.Time
	if timeout != 0 {
		timer := time.NewTimer(timeout)
		defer timer.Stop()
		deadline = timer.C
	}

	log.Infof("Starting to wait completion of containerID %s ...", containerID)
	for {
		select {
		case <-deadline:
			return fmt.Errorf("timeout after %s", timeout.String())
		case <-poll.C:
			_, status, err := c.GetContainerStatus(containerID)
			if err != nil {
				return err
			}
			if status.State.Terminated != nil {
				log.Infof("ContainerID %q is terminated: %v", containerID, status.String())
				return nil
			}
		}
	}
}
// TerminatePodWithContainerID asks c to deliver sig to the container
// identified by containerID.
//
// It returns nil without signalling when the container status already reports
// termination. It returns an error when the status lookup fails, when the pod
// has Spec.HostPID set, or when the pod's restart policy is anything other
// than "Never"; in those cases no signal is sent. Otherwise the result of
// c.KillContainer is returned.
func TerminatePodWithContainerID(c KubernetesClientInterface, containerID string, sig syscall.Signal) error {
	pod, status, err := c.GetContainerStatus(containerID)
	if err != nil {
		return err
	}

	switch {
	case status.State.Terminated != nil:
		log.Infof("Container %s is already terminated: %v", status.ContainerID, status.State.Terminated.String())
		return nil
	case pod.Spec.HostPID:
		return fmt.Errorf("cannot terminate a hostPID Pod %s", pod.Name)
	case pod.Spec.RestartPolicy != v1.RestartPolicyNever:
		return fmt.Errorf("cannot terminate pod with a %q restart policy", pod.Spec.RestartPolicy)
	}

	return c.KillContainer(pod, status, sig)
}
// KillGracefully stops the container identified by containerID, escalating
// from SIGTERM to SIGKILL.
//
// SIGTERM is sent first and the container is given KillGracePeriod seconds to
// terminate. If that wait fails for any reason, SIGKILL is sent and the
// container is given another KillGracePeriod seconds. An error from either
// signal attempt, or from the wait that follows SIGKILL, is returned.
func KillGracefully(c KubernetesClientInterface, containerID string) error {
	const gracePeriod = time.Second * KillGracePeriod

	log.Infof("SIGTERM containerID %q: %s", containerID, syscall.SIGTERM.String())
	if termErr := TerminatePodWithContainerID(c, containerID, syscall.SIGTERM); termErr != nil {
		return termErr
	}

	if waitErr := WaitForTermination(c, containerID, gracePeriod); waitErr != nil {
		// The container did not go away in time (or could not be observed):
		// fall back to SIGKILL and wait once more.
		log.Infof("SIGKILL containerID %q: %s", containerID, syscall.SIGKILL.String())
		if killErr := TerminatePodWithContainerID(c, containerID, syscall.SIGKILL); killErr != nil {
			return killErr
		}
		if killWaitErr := WaitForTermination(c, containerID, gracePeriod); killWaitErr != nil {
			return killWaitErr
		}
	}

	log.Infof("ContainerID %q successfully killed", containerID)
	return nil
}
// CopyArchive fetches sourcePath from the container identified by containerID
// via c.CreateArchive and writes the returned bytes, gzip-compressed, to
// destPath.
//
// destPath is created if it does not exist and truncated if it does
// (permission bits 0666, before umask). The destination file is closed on
// every return path, and an error from closing it after a successful write is
// reported to the caller, since it can mean the data never reached disk.
//
// It returns the first error encountered while creating the archive, opening
// the file, compressing the data, or closing the file.
func CopyArchive(c KubernetesClientInterface, containerID, sourcePath, destPath string) error {
	log.Infof("Archiving %s:%s to %s", containerID, sourcePath, destPath)
	archive, err := c.CreateArchive(containerID, sourcePath)
	if err != nil {
		return err
	}

	f, err := os.OpenFile(destPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0666)
	if err != nil {
		return err
	}

	zw := gzip.NewWriter(f)
	if _, err := zw.Write(archive.Bytes()); err != nil {
		// The write failure is the error worth reporting; closing the file
		// here is best-effort cleanup so the descriptor is not leaked.
		_ = f.Close()
		return err
	}
	// Close flushes any buffered data and writes the gzip footer, so no
	// separate Flush is needed. It does not close the underlying file.
	if err := zw.Close(); err != nil {
		_ = f.Close() // best-effort cleanup; the gzip error takes precedence
		return err
	}
	return f.Close()
}