Skip to content

Commit

Permalink
runBuiltinConfigServer (#326)
Browse files Browse the repository at this point in the history
  • Loading branch information
lgarithm committed Oct 8, 2020
1 parent 20c1a0f commit 9f34568
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 18 deletions.
19 changes: 3 additions & 16 deletions scripts/tests/run-tensorflow-resize-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,20 @@ cd ../..
ROOT=$(pwd)

# kungfu_run_flags prints the command-line flags that kungfu_run passes to
# kungfu-run, one flag (plus its value, if any) per echo. The config server
# URL and the builtin config server port are both derived from config_port.
kungfu_run_flags() {
local config_port=9100
echo -q
echo -w
echo -config-server http://127.0.0.1:$config_port/config
echo -builtin-config-port $config_port
echo -np 1
# NOTE(review): this repeats -config-server with the port hard-coded; it looks
# like the pre-change line left over from the diff view — confirm and drop it.
echo -config-server http://127.0.0.1:9100/config
}

# kungfu_run runs kungfu-run with the flags printed by kungfu_run_flags,
# followed by the caller's arguments (the program to launch and its options).
kungfu_run() {
    # $(kungfu_run_flags) is deliberately unquoted so that its output is split
    # into separate flag words; "$@" is quoted so that each caller argument is
    # forwarded exactly as given, without re-splitting or glob expansion.
    kungfu-run $(kungfu_run_flags) "$@"
}

# start_config_server launches ./bin/kungfu-config-server as a background job
# and returns immediately; it does not wait for the server to become ready.
start_config_server() {
./bin/kungfu-config-server &
}

# stop_config_server issues an HTTP request to /stop on 127.0.0.1:9100 using
# curl; presumably this asks the config server started by start_config_server
# to shut down — verify against the server's handlers.
stop_config_server() {
curl http://127.0.0.1:9100/stop
}

GOBIN=$PWD/bin \
go install -v ./srcs/go/cmd/kungfu-config-server

# Test CPU
start_config_server
kungfu_run python3 tests/python/integration/test_tensorflow_resize.py
stop_config_server

# Test NCCL
start_config_server
kungfu_run python3 tests/python/integration/test_tensorflow_resize.py --use-nccl
stop_config_server
34 changes: 34 additions & 0 deletions srcs/go/cmd/kungfu-run/app/builtin-config-server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package app

import (
"context"
"net"
"net/http"
"strconv"

"github.com/lsds/KungFu/srcs/go/kungfu/elastic/configserver"
"github.com/lsds/KungFu/srcs/go/log"
)

// runBuiltinConfigServer serves the builtin config server at /config on the
// given port (all interfaces) and blocks until the HTTP server stops.
//
// The cancel function of a fresh context is handed to configserver.New; when
// that context is cancelled the HTTP server is shut down gracefully, so that
// invoking cancel actually stops the server instead of being a no-op.
func runBuiltinConfigServer(port int) {
	const endpoint = `/config`
	addr := net.JoinHostPort("", strconv.Itoa(port))
	log.Infof("running builtin config server listening %s%s", addr, endpoint)
	ctx, cancel := context.WithCancel(context.TODO())
	// Also releases the shutdown goroutine below if ListenAndServe fails early
	// (e.g. the port is already in use).
	defer cancel()
	srv := &http.Server{
		Addr:    addr,
		Handler: logRequest(configserver.New(cancel, nil, endpoint)),
	}
	srv.SetKeepAlivesEnabled(false)
	go func() {
		<-ctx.Done()
		// ctx is already cancelled here, so a fresh context is used to let
		// in-flight requests (including the one that triggered the stop)
		// complete before the listener goes away.
		if err := srv.Shutdown(context.Background()); err != nil {
			log.Errorf("config server shutdown: %v", err)
		}
	}()
	// ListenAndServe returns http.ErrServerClosed after Shutdown; that is the
	// expected stop path and is not reported as an error.
	if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		log.Errorf("config server stopped: %v", err)
	}
}

// logRequest returns a handler that writes a debug log line with the method,
// URL path, remote address and user agent of each incoming request, and then
// delegates the request to h.
func logRequest(h http.Handler) http.Handler {
	logged := func(rw http.ResponseWriter, r *http.Request) {
		method, urlPath := r.Method, r.URL.Path
		remote, agent := r.RemoteAddr, r.UserAgent()
		log.Debugf("%s %s from %s, UA: %s", method, urlPath, remote, agent)
		h.ServeHTTP(rw, r)
	}
	return http.HandlerFunc(logged)
}
3 changes: 3 additions & 0 deletions srcs/go/cmd/kungfu-run/app/kungfu-run.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ func Main(args []string) {
log.Warnf("delay start for %s", f.DelayStart)
time.Sleep(f.DelayStart)
}
if f.BuiltinConfigPort > 0 {
go runBuiltinConfigServer(f.BuiltinConfigPort)
}
if logfile := f.Logfile; len(logfile) > 0 {
if len(f.LogDir) > 0 {
logfile = path.Join(f.LogDir, logfile)
Expand Down
6 changes: 4 additions & 2 deletions srcs/go/kungfu/runner/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ type FlagSet struct {
Prog string
Args []string

// testing flag
DelayStart time.Duration
// debug and testing flags
BuiltinConfigPort int
DelayStart time.Duration
}

func (f *FlagSet) Register(flag *flag.FlagSet) {
Expand Down Expand Up @@ -97,6 +98,7 @@ func (f *FlagSet) Register(flag *flag.FlagSet) {
flag.BoolVar(&f.Quiet, "q", false, "don't log debug info")

flag.DurationVar(&f.DelayStart, "delay", 0, "delay start for testing purpose")
flag.IntVar(&f.BuiltinConfigPort, "builtin-config-port", 0, "will run a builtin config server if not zero")
}

var errMissingProgramName = errors.New("missing program name")
Expand Down
3 changes: 3 additions & 0 deletions tests/python/integration/test_tensorflow_resize.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import tensorflow as tf
from kungfu.python import detached
from kungfu.tensorflow.ops import all_reduce, resize
from tensorflow.python.util import deprecation

deprecation._PRINT_DEPRECATION_WARNINGS = False


def parse_args():
Expand Down

0 comments on commit 9f34568

Please sign in to comment.