-
Notifications
You must be signed in to change notification settings - Fork 176
/
list.go
123 lines (118 loc) · 3.32 KB
/
list.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package serving
import (
"encoding/json"
"fmt"
"os"
"strings"
"sync"
"text/tabwriter"
"github.com/kubeflow/arena/pkg/apis/types"
"github.com/kubeflow/arena/pkg/apis/utils"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
)
var (
errUnknownServingJobType = fmt.Errorf("unknown serving types,only support: %v", utils.GetServingJobTypes())
)
func ListServingJobs(namespace string, allNamespace bool, servingType types.ServingJobType) ([]ServingJob, error) {
if servingType == types.UnknownServingJob {
return nil, fmt.Errorf("Unknown serving job type,arena only supports: [%s]", utils.GetSupportServingJobTypesInfo())
}
processers := GetAllProcesser()
if servingType != types.AllServingJob {
processer, ok := processers[servingType]
if !ok {
return nil, fmt.Errorf("unknown processer %v,please define it", servingType)
}
return processer.ListServingJobs(namespace, allNamespace)
}
servingJobs := []ServingJob{}
var wg sync.WaitGroup
locker := new(sync.RWMutex)
noPrivileges := false
for _, pr := range processers {
wg.Add(1)
p := pr
go func() {
defer wg.Done()
jobs, err := p.ListServingJobs(namespace, allNamespace)
if err != nil {
if strings.Contains(err.Error(), "forbidden: User") {
log.Debugf("the user has no privileges to get the serving job %v,reason: %v", p.Type(), err)
noPrivileges = true
return
}
log.Debugf("failed to get serving jobs whose type are %v", p.Type())
return
}
locker.Lock()
servingJobs = append(servingJobs, jobs...)
locker.Unlock()
}()
}
wg.Wait()
if noPrivileges {
item := fmt.Sprintf("namespace %v", namespace)
if allNamespace {
item = fmt.Sprintf("all namespaces")
}
return nil, fmt.Errorf("the user has no privileges to list the serving jobs in %v", item)
}
return servingJobs, nil
}
func DisplayAllServingJobs(jobs []ServingJob, allNamespace bool, format types.FormatStyle) {
jobInfos := []types.ServingJobInfo{}
for _, job := range jobs {
jobInfos = append(jobInfos, job.Convert2JobInfo())
}
switch format {
case types.JsonFormat:
data, _ := json.MarshalIndent(jobInfos, "", " ")
fmt.Printf("%v", string(data))
return
case types.YamlFormat:
data, _ := yaml.Marshal(jobInfos)
fmt.Printf("%v", string(data))
return
}
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
header := []string{}
if allNamespace {
header = append(header, "NAMESPACE")
}
fields := []string{"NAME", "TYPE", "VERSION", "DESIRED", "AVAILABLE", "ADDRESS", "PORTS"}
header = append(header, fields...)
PrintLine(w, header...)
for _, jobInfo := range jobInfos {
line := []string{}
if allNamespace {
line = append(line, jobInfo.Namespace)
}
endpointAddress := jobInfo.IPAddress
ports := []string{}
for _, e := range jobInfo.Endpoints {
port := ""
if e.NodePort != 0 {
port = fmt.Sprintf("%v:%v->%v", strings.ToUpper(e.Name), e.NodePort, e.Port)
} else {
port = fmt.Sprintf("%v:%v", strings.ToUpper(e.Name), e.Port)
}
ports = append(ports, port)
}
if len(ports) == 0 {
ports = append(ports, "N/A")
}
items := []string{
jobInfo.Name,
fmt.Sprintf("%v", jobInfo.Type),
jobInfo.Version,
fmt.Sprintf("%v", jobInfo.Desired),
fmt.Sprintf("%v", jobInfo.Available),
endpointAddress,
strings.Join(ports, ","),
}
line = append(line, items...)
PrintLine(w, line...)
}
_ = w.Flush()
}