-
Notifications
You must be signed in to change notification settings - Fork 175
/
utils.go
184 lines (169 loc) · 5.85 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package serving
import (
"fmt"
"strings"
"github.com/kubeflow/arena/pkg/types"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
// NewServingJobList returns the serving jobs found in namespace ns for the
// given serving name.
//
// Deployments and pods are both listed with the label selector
// "serviceName=<servingName>". Every matching deployment is turned into a
// Serving through NewServingJob, which also receives the complete pod list.
// An error is returned when either list call fails, and
// types.ErrNotFoundJobs is returned when no deployment matches.
func NewServingJobList(client *kubernetes.Clientset, servingName string, ns string) ([]Serving, error) {
	selector := fmt.Sprintf("serviceName=%s", servingName)

	deployList, err := client.AppsV1().Deployments(ns).List(metav1.ListOptions{LabelSelector: selector})
	if err != nil {
		return nil, fmt.Errorf("Failed due to %v", err)
	}

	podOptions := metav1.ListOptions{
		TypeMeta:      metav1.TypeMeta{Kind: "ListOptions", APIVersion: "v1"},
		LabelSelector: selector,
	}
	podList, err := client.CoreV1().Pods(ns).List(podOptions)
	if err != nil {
		return nil, fmt.Errorf("Failed to get pods by label serviceName=%s,reason=%s", servingName, err.Error())
	}

	// One job is produced per deployment, so an empty deployment list means
	// there is nothing to report.
	if len(deployList.Items) == 0 {
		return nil, types.ErrNotFoundJobs
	}

	servings := make([]Serving, 0, len(deployList.Items))
	for _, deployment := range deployList.Items {
		servings = append(servings, NewServingJob(client, deployment, podList.Items))
	}
	return servings, nil
}
// FilterJobs keeps only the jobs that match all three filters: namespace,
// version and serving type.
//
// Each filter is checked through the job's IsMatchedGivenCondition method,
// using the condition keys "NAMESPACE", "VERSION" and "TYPE" respectively.
// The returned slice is never nil; it is empty when no job matches.
func FilterJobs(namespace, version, servingTypeKey string, jobs []Serving) []Serving {
	matched := []Serving{}
	for _, candidate := range jobs {
		// All three conditions are evaluated for every job before deciding.
		namespaceOK := candidate.IsMatchedGivenCondition(namespace, "NAMESPACE")
		versionOK := candidate.IsMatchedGivenCondition(version, "VERSION")
		typeOK := candidate.IsMatchedGivenCondition(servingTypeKey, "TYPE")
		if !namespaceOK || !versionOK || !typeOK {
			continue
		}
		matched = append(matched, candidate)
	}
	return matched
}
// GetMultiJobsHelpInfo builds the help text shown when more than one job
// matches a query.
//
// The text consists of a header with the number of jobs, a tab-separated
// table (NAME, TYPE, VERSION) with one row per job, and a footer suggesting
// the "--type" and "--version" flags; the three parts are separated by blank
// lines.
func GetMultiJobsHelpInfo(jobs []Serving) string {
	const footer = "please use \"--type\" or \"--version\" to filter."

	var table strings.Builder
	table.WriteString("NAME\tTYPE\tVERSION")
	for _, job := range jobs {
		// Rows are newline-separated, with no trailing newline after the last.
		table.WriteByte('\n')
		fmt.Fprintf(&table, "%s\t%s\t%s", job.Name, string(job.ServeType), job.Version)
	}

	header := fmt.Sprintf("There is %d jobs have been found:", len(jobs))
	return fmt.Sprintf("%s\n\n%s\n\n%s\n", header, table.String(), footer)
}
// GetOnlyOneJob looks up the single serving job identified by servingName in
// namespace ns, narrowed by servingTypeKey and version.
//
// It lists the jobs with NewServingJobList and filters them with FilterJobs.
// The outcome depends on how many jobs remain:
//   - none: types.ErrNotFoundJobs;
//   - exactly one: that job, an empty help string and a nil error;
//   - several: types.ErrTooManyJobs together with the help text produced by
//     GetMultiJobsHelpInfo.
//
// An error from NewServingJobList is returned unchanged.
func GetOnlyOneJob(client *kubernetes.Clientset, ns, servingName, servingTypeKey, version string) (Serving, string, error) {
	candidates, err := NewServingJobList(client, servingName, ns)
	if err != nil {
		return Serving{}, "", err
	}

	matched := FilterJobs(ns, version, servingTypeKey, candidates)
	switch len(matched) {
	case 0:
		return Serving{}, "", types.ErrNotFoundJobs
	case 1:
		return matched[0], "", nil
	default:
		return Serving{}, GetMultiJobsHelpInfo(matched), types.ErrTooManyJobs
	}
}
// DefinePodPhaseStatus derives a display status for pod from its phase,
// reason and container states.
//
// It returns, in order:
//   - the status string;
//   - the number of containers declared in the pod spec;
//   - the restart count: summed over the init containers inspected so far
//     when an init container is blocking the pod, otherwise summed over the
//     regular containers;
//   - the number of regular containers that are ready and running.
func DefinePodPhaseStatus(pod v1.Pod) (string, int, int, int) {
	// Start from the pod phase; an explicit pod-level reason overrides it.
	status := string(pod.Status.Phase)
	if pod.Status.Reason != "" {
		status = pod.Status.Reason
	}

	total := len(pod.Spec.Containers)
	var restartCount, readyCount int

	// Find the first init container that has not terminated successfully.
	initBlocked := false
	for idx, cs := range pod.Status.InitContainerStatuses {
		restartCount += int(cs.RestartCount)
		term, wait := cs.State.Terminated, cs.State.Waiting
		if term != nil && term.ExitCode == 0 {
			// This init container finished successfully; check the next one.
			continue
		}

		initBlocked = true
		switch {
		case term != nil && term.Reason != "":
			status = "Init:" + term.Reason
		case term != nil && term.Signal != 0:
			status = fmt.Sprintf("Init:Signal:%d", term.Signal)
		case term != nil:
			status = fmt.Sprintf("Init:ExitCode:%d", term.ExitCode)
		case wait != nil && wait.Reason != "" && wait.Reason != "PodInitializing":
			status = "Init:" + wait.Reason
		default:
			status = fmt.Sprintf("Init:%d/%d", idx, len(pod.Spec.InitContainers))
		}
		// Only the first blocking init container determines the status.
		break
	}

	if !initBlocked {
		// Initialization is complete: count restarts of regular containers only.
		restartCount = 0
		anyRunning := false
		statuses := pod.Status.ContainerStatuses
		// Walk backwards so that the earliest container with a reason wins.
		for idx := len(statuses) - 1; idx >= 0; idx-- {
			cs := statuses[idx]
			restartCount += int(cs.RestartCount)
			term, wait := cs.State.Terminated, cs.State.Waiting
			switch {
			case wait != nil && wait.Reason != "":
				status = wait.Reason
			case term != nil && term.Reason != "":
				status = term.Reason
			case term != nil && term.Signal != 0:
				status = fmt.Sprintf("Signal:%d", term.Signal)
			case term != nil:
				status = fmt.Sprintf("ExitCode:%d", term.ExitCode)
			case cs.Ready && cs.State.Running != nil:
				anyRunning = true
				readyCount++
			}
		}
		// Report "Running" rather than "Completed" while at least one
		// container is still ready and running.
		if status == "Completed" && anyRunning {
			status = "Running"
		}
	}

	// A pod that is being deleted overrides everything computed above.
	if pod.DeletionTimestamp != nil {
		if pod.Status.Reason == "NodeLost" {
			status = "Unknown"
		} else {
			status = "Terminating"
		}
	}

	return status, total, restartCount, readyCount
}
// KeyMapServingType translates a serving type key into the corresponding
// types.ServingType.
//
// Several aliases are accepted for each type (for example "tf", "tf-serving"
// and "tensorflow-serving" all map to types.ServingTF). A key that is not
// recognised yields the empty ServingType.
func KeyMapServingType(servingKey string) types.ServingType {
	// The zero value is the empty ServingType, which is what unrecognised
	// keys map to.
	var servingType types.ServingType
	switch servingKey {
	case "tf", "tf-serving", "tensorflow-serving":
		servingType = types.ServingTF
	case "trt", "trt-serving", "tensorrt-serving":
		servingType = types.ServingTRT
	case "custom", "custom-serving":
		servingType = types.ServingCustom
	case "kfs", "kfserving", "kf-serving":
		servingType = types.KFServing
	}
	return servingType
}
// CheckServingTypeIsOk validates a serving type key.
//
// An empty key is accepted as valid. Any other key must be one of the aliases
// recognised by KeyMapServingType; otherwise an error naming the offending
// key is returned.
func CheckServingTypeIsOk(stype string) error {
	// KeyMapServingType returns the empty ServingType for keys it does not know.
	if stype != "" && KeyMapServingType(stype) == types.ServingType("") {
		return fmt.Errorf("unknown serving type: %s", stype)
	}
	return nil
}