Skip to content

Commit

Permalink
reduce the execution time of operating serving jobs (#489)
Browse files Browse the repository at this point in the history
  • Loading branch information
happy2048 committed Mar 5, 2021
1 parent c183c9b commit ad6688d
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 21 deletions.
4 changes: 3 additions & 1 deletion pkg/commands/serving/serving_custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/kubeflow/arena/pkg/apis/arenaclient"
"github.com/kubeflow/arena/pkg/apis/config"
"github.com/kubeflow/arena/pkg/apis/serving"
"github.com/kubeflow/arena/pkg/apis/types"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -34,7 +35,8 @@ func NewSubmitCustomServingJobCommand() *cobra.Command {
if err != nil {
return fmt.Errorf("failed to create arena client: %v\n", err)
}
job, err := builder.Namespace(viper.GetString("namespace")).Command(args).Build()

job, err := builder.Namespace(config.GetArenaConfiger().GetNamespace()).Command(args).Build()
if err != nil {
return fmt.Errorf("failed to validate command args: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/commands/serving/serving_kubeflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/kubeflow/arena/pkg/apis/arenaclient"
"github.com/kubeflow/arena/pkg/apis/config"
"github.com/kubeflow/arena/pkg/apis/serving"
"github.com/kubeflow/arena/pkg/apis/types"
"github.com/spf13/cobra"
Expand All @@ -30,7 +31,7 @@ func NewSubmitKFServingJobCommand() *cobra.Command {
if err != nil {
return fmt.Errorf("failed to create arena client: %v\n", err)
}
job, err := builder.Namespace(viper.GetString("namespace")).Command(args).Build()
job, err := builder.Namespace(config.GetArenaConfiger().GetNamespace()).Command(args).Build()
if err != nil {
return fmt.Errorf("failed to validate command args: %v", err)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/commands/serving/serving_seldon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package serving

import (
"fmt"

"github.com/kubeflow/arena/pkg/apis/arenaclient"
"github.com/kubeflow/arena/pkg/apis/config"
"github.com/kubeflow/arena/pkg/apis/serving"
"github.com/kubeflow/arena/pkg/apis/types"
"github.com/spf13/cobra"
Expand All @@ -29,7 +31,7 @@ func NewSubmitSeldonServingJobCommand() *cobra.Command {
if err != nil {
return fmt.Errorf("failed to create arena client: %v\n", err)
}
job, err := builder.Namespace(viper.GetString("namespace")).Command(args).Build()
job, err := builder.Namespace(config.GetArenaConfiger().GetNamespace()).Command(args).Build()
if err != nil {
return fmt.Errorf("failed to validate command args: %v", err)
}
Expand All @@ -38,4 +40,4 @@ func NewSubmitSeldonServingJobCommand() *cobra.Command {
}
builder.AddCommandFlags(command)
return command
}
}
3 changes: 2 additions & 1 deletion pkg/commands/serving/serving_tensorflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/kubeflow/arena/pkg/apis/arenaclient"
"github.com/kubeflow/arena/pkg/apis/config"
"github.com/kubeflow/arena/pkg/apis/serving"
"github.com/kubeflow/arena/pkg/apis/types"
"github.com/spf13/cobra"
Expand All @@ -30,7 +31,7 @@ func NewSubmitTFServingJobCommand() *cobra.Command {
if err != nil {
return fmt.Errorf("failed to create arena client: %v\n", err)
}
job, err := builder.Namespace(viper.GetString("namespace")).Command(args).Build()
job, err := builder.Namespace(config.GetArenaConfiger().GetNamespace()).Command(args).Build()
if err != nil {
return fmt.Errorf("failed to validate command args: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/commands/serving/serving_tensorrt.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/kubeflow/arena/pkg/apis/arenaclient"
"github.com/kubeflow/arena/pkg/apis/config"
"github.com/kubeflow/arena/pkg/apis/serving"
"github.com/kubeflow/arena/pkg/apis/types"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -34,7 +35,7 @@ func NewSubmitTRTServingJobCommand() *cobra.Command {
if err != nil {
return fmt.Errorf("failed to create arena client: %v\n", err)
}
job, err := builder.Namespace(viper.GetString("namespace")).Command(args).Build()
job, err := builder.Namespace(config.GetArenaConfiger().GetNamespace()).Command(args).Build()
if err != nil {
return fmt.Errorf("failed to validate command args: %v", err)
}
Expand Down
34 changes: 27 additions & 7 deletions pkg/serving/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"strings"
"sync"
"text/tabwriter"

"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -54,13 +55,32 @@ func SearchServingJob(namespace, name, version string, servingType types.Serving
return servingJobs[0], nil
}
jobs := []ServingJob{}
for _, p := range processers {
servingJobs, err := p.GetServingJobs(namespace, name, version)
if err != nil {
log.Debugf("processer %v does not support the serving job %v", p.Type(), name)
continue
}
jobs = append(jobs, servingJobs...)
var wg sync.WaitGroup
locker := new(sync.RWMutex)
noPrivileges := false
for _, pr := range processers {
wg.Add(1)
p := pr
go func() {
defer wg.Done()
servingJobs, err := p.GetServingJobs(namespace, name, version)
if err != nil {
if strings.Contains(err.Error(), "forbidden: User") {
log.Debugf("the user has no privileges to get the serving job %v,reason: %v", p.Type(), err)
noPrivileges = true
return
}
log.Debugf("processer %v does not support the serving job %v", p.Type(), name)
return
}
locker.Lock()
jobs = append(jobs, servingJobs...)
locker.Unlock()
}()
}
wg.Wait()
if noPrivileges {
return nil, fmt.Errorf("the user has no privileges to get the serving job in namespace %v", namespace)
}
if len(jobs) == 0 {
return nil, fmt.Errorf(errNotFoundServingJobMessage, name, name)
Expand Down
36 changes: 30 additions & 6 deletions pkg/serving/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"strings"
"sync"
"text/tabwriter"

"github.com/kubeflow/arena/pkg/apis/types"
Expand All @@ -30,13 +31,36 @@ func ListServingJobs(namespace string, allNamespace bool, servingType types.Serv
return processer.ListServingJobs(namespace, allNamespace)
}
servingJobs := []ServingJob{}
for _, p := range processers {
jobs, err := p.ListServingJobs(namespace, allNamespace)
if err != nil {
log.Debugf("failed to get serving jobs whose type are %v", p.Type())
continue
var wg sync.WaitGroup
locker := new(sync.RWMutex)
noPrivileges := false
for _, pr := range processers {
wg.Add(1)
p := pr
go func() {
defer wg.Done()
jobs, err := p.ListServingJobs(namespace, allNamespace)
if err != nil {
if strings.Contains(err.Error(), "forbidden: User") {
log.Debugf("the user has no privileges to get the serving job %v,reason: %v", p.Type(), err)
noPrivileges = true
return
}
log.Debugf("failed to get serving jobs whose type are %v", p.Type())
return
}
locker.Lock()
servingJobs = append(servingJobs, jobs...)
locker.Unlock()
}()
}
wg.Wait()
if noPrivileges {
item := fmt.Sprintf("namespace %v", namespace)
if allNamespace {
item = fmt.Sprintf("all namespaces")
}
servingJobs = append(servingJobs, jobs...)
return nil, fmt.Errorf("the user has no privileges to list the serving jobs in %v", item)
}
return servingJobs, nil
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/serving/serving.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var once sync.Once

func GetAllProcesser() map[types.ServingJobType]Processer {
once.Do(func() {
locker := new(sync.RWMutex)
processers = map[types.ServingJobType]Processer{}
processerInits := []func() Processer{
NewCustomServingProcesser,
Expand All @@ -50,10 +51,19 @@ func GetAllProcesser() map[types.ServingJobType]Processer {
NewTensorrtServingProcesser,
NewSeldonServingProcesser,
}
var wg sync.WaitGroup
for _, initFunc := range processerInits {
p := initFunc()
processers[p.Type()] = p
wg.Add(1)
f := initFunc
go func() {
defer wg.Done()
p := f()
locker.Lock()
processers[p.Type()] = p
locker.Unlock()
}()
}
wg.Wait()
})
return processers
}
Expand Down

0 comments on commit ad6688d

Please sign in to comment.