Skip to content

Commit

Permalink
Run integration tests in parallel when possible.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Wilkie committed Jul 29, 2015
1 parent 4da999b commit 669a0db
Show file tree
Hide file tree
Showing 12 changed files with 260 additions and 31 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -37,6 +37,7 @@ prog/sigproxy/sigproxy
prog/weavewait/weavewait
prog/netcheck/netcheck
testing/cover/cover
testing/runner/runner
tools/bin
tools/build
releases
Expand Down
8 changes: 5 additions & 3 deletions Makefile
Expand Up @@ -15,8 +15,9 @@ SIGPROXY_EXE=prog/sigproxy/sigproxy
WEAVEWAIT_EXE=prog/weavewait/weavewait
NETCHECK_EXE=prog/netcheck/netcheck
COVER_EXE=testing/cover/cover
RUNNER_EXE=testing/runner/runner

EXES=$(WEAVER_EXE) $(SIGPROXY_EXE) $(WEAVEPROXY_EXE) $(WEAVEWAIT_EXE) $(NETCHECK_EXE) $(COVER_EXE)
EXES=$(WEAVER_EXE) $(SIGPROXY_EXE) $(WEAVEPROXY_EXE) $(WEAVEWAIT_EXE) $(NETCHECK_EXE) $(COVER_EXE) $(RUNNER_EXE)

WEAVER_UPTODATE=.weaver.uptodate
WEAVEEXEC_UPTODATE=.weaveexec.uptodate
Expand All @@ -42,7 +43,7 @@ NETGO_CHECK=@strings $@ | grep cgo_stub\\\.go >/dev/null || { \
false; \
}

all: $(WEAVE_EXPORT) $(COVER_EXE)
all: $(WEAVE_EXPORT) $(COVER_EXE) $(RUNNER_EXE)

travis: $(EXES)

Expand Down Expand Up @@ -75,8 +76,9 @@ $(NETCHECK_EXE): prog/netcheck/netcheck.go
$(SIGPROXY_EXE): prog/sigproxy/main.go
$(WEAVEWAIT_EXE): prog/weavewait/main.go
$(COVER_EXE): testing/cover/cover.go
$(RUNNER_EXE): testing/runner/runner.go

$(WEAVEWAIT_EXE) $(SIGPROXY_EXE) $(COVER_EXE):
$(WEAVEWAIT_EXE) $(SIGPROXY_EXE) $(COVER_EXE) $(RUNNER_EXE):
go get ./$(@D)
go build -o $@ ./$(@D)

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
36 changes: 8 additions & 28 deletions test/run_all.sh
Expand Up @@ -10,37 +10,17 @@ if ! bash "$DIR/sanity_check.sh"; then
fi
whitely echo ...ok

# Modified version of _assert_cleanup from assert.sh that
# prints overall status
check_test_status() {
if [ $? -ne 0 ]; then
redly echo "---= !!!ABNORMAL TEST TERMINATION!!! =---"
elif [ $tests_suite_status -ne 0 ]; then
redly echo "---= !!!SUITE FAILURES - SEE ABOVE FOR DETAILS!!! =---"
exit $tests_suite_status
else
greenly echo "---= ALL SUITES PASSED =---"
fi
}
# Overwrite assert.sh _assert_cleanup trap with our own
trap check_test_status EXIT

TESTS="${@:-*_test.sh}"
TESTS="${@:-$(find . -name '*_test.sh')}"
RUNNER_ARGS=""

# If running on circle, use the scheduler to work out what tests to run
if [ -n "$CIRCLECI" -a -z "$NO_SCHEDULER" ]; then
TESTS=$(echo $TESTS | "$DIR/sched" sched integration-$CIRCLE_BUILD_NUM $CIRCLE_NODE_TOTAL $CIRCLE_NODE_INDEX)
RUNNER_ARGS="$RUNNER_ARGS -scheduler"
fi

echo Running $TESTS

for t in $TESTS; do
echo
greyly echo "---= Running $t =---"
. $t
# If running on circle or PARALLEL is not empty, run tests in parallel
if [ -n "$CIRCLECI" -o -z "$PARALLEL" ]; then
RUNNER_ARGS="$RUNNER_ARGS -parallel"
fi

# Report test runtime when running on circle, to help scheduler
if [ -n "$CIRCLECI" -a -z "$NO_SCHEDULER" ]; then
"$DIR/sched" time $t $(bc -l <<< "$tests_time/1000000000")
fi
done
../testing/runner/runner $RUNNER_ARGS $TESTS
246 changes: 246 additions & 0 deletions testing/runner/runner.go
@@ -0,0 +1,246 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/url"
"os"
"os/exec"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/docker/docker/pkg/mflag"
"github.com/mgutz/ansi"
)

const (
schedulerHost = "positive-cocoa-90213.appspot.com"
JSON = "application/json"
)

var (
start = ansi.ColorCode("white+ub")
fail = ansi.ColorCode("red+b")
succ = ansi.ColorCode("green+b")
reset = ansi.ColorCode("reset")

useScheduler = false
runParallel = false

consoleLock = sync.Mutex{}
)

type test struct {
name string
hosts int
}

type schedule struct {
Tests []string `json:"tests"`
}

type result struct {
errored bool
hosts []string
}

type tests []test

func (ts tests) Len() int { return len(ts) }
func (ts tests) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
func (ts tests) Less(i, j int) bool {
if ts[i].hosts != ts[j].hosts {
return ts[i].hosts < ts[j].hosts
}
return ts[i].name < ts[j].name
}

func (ts *tests) pick(availible int) (test, bool) {
// pick the first test that fits in the availible hosts
for i, test := range *ts {
if test.hosts <= availible {
*ts = append((*ts)[:i], (*ts)[i+1:]...)
return test, true
}
}

return test{}, false
}

func (t test) run(hosts []string) bool {
consoleLock.Lock()
fmt.Printf("%s>>> Running %s on %s%s\n", start, t.name, hosts, reset)
consoleLock.Unlock()

var out bytes.Buffer

cmd := exec.Command(t.name)
cmd.Env = os.Environ()
cmd.Stdout = &out
cmd.Stderr = &out

// replace HOSTS in env
for i, env := range cmd.Env {
if strings.HasPrefix(env, "HOSTS") {
cmd.Env[i] = fmt.Sprintf("HOSTS=%s", strings.Join(hosts, " "))
break
}
}

start := time.Now()
err := cmd.Run()
duration := float64(time.Now().Sub(start)) / float64(time.Second)

consoleLock.Lock()
if err != nil {
fmt.Printf("%s>>> Test %s finished after %0.1f secs with error: %v%s\n", fail, t.name, duration, err, reset)
} else {
fmt.Printf("%s>>> Test %s finished with success after %0.1f secs%s\n", succ, t.name, duration, reset)
}
fmt.Print(out.String())
fmt.Println()
consoleLock.Unlock()

if err != nil && useScheduler {
updateScheduler(t.name, duration)
}

return err != nil
}

func updateScheduler(test string, duration float64) {
req := &http.Request{
Method: "POST",
Host: schedulerHost,
URL: &url.URL{
Opaque: fmt.Sprintf("/record/%s/%0.2f", url.QueryEscape(test), duration),
Scheme: "http",
Host: schedulerHost,
},
Close: true,
}
if resp, err := http.DefaultClient.Do(req); err != nil {
fmt.Printf("Error updating scheduler: %v\n", err)
} else {
resp.Body.Close()
}
}

func getSchedule(tests []string) ([]string, error) {
var (
testRun = "integration-" + os.Getenv("CIRCLE_BUILD_NUM")
shardCount = os.Getenv("CIRCLE_NODE_TOTAL")
shardID = os.Getenv("CIRCLE_NODE_INDEX")
requestBody = &bytes.Buffer{}
)
if err := json.NewEncoder(requestBody).Encode(schedule{tests}); err != nil {
return []string{}, err
}
url := fmt.Sprintf("http://%s/schedule/%s/%s/%s", schedulerHost, testRun, shardCount, shardID)
resp, err := http.Post(url, JSON, requestBody)
if err != nil {
return []string{}, err
}
var sched schedule
if err := json.NewDecoder(resp.Body).Decode(&sched); err != nil {
return []string{}, err
}
return sched.Tests, nil
}

func getTests(testNames []string) (tests, error) {
var err error
if useScheduler {
testNames, err = getSchedule(testNames)
if err != nil {
return tests{}, err
}
}
tests := tests{}
for _, name := range testNames {
parts := strings.Split(strings.TrimSuffix(name, "_test.sh"), "_")
numHosts, err := strconv.Atoi(parts[len(parts)-1])
if err != nil {
numHosts = 1
}
tests = append(tests, test{name, numHosts})
fmt.Printf("Test %s needs %d hosts\n", name, numHosts)
}
sort.Sort(sort.Reverse(tests))
return tests, nil
}

func parallel(tests tests, hosts []string) bool {
resultsChan := make(chan result)
outstanding := 0
errored := false
for len(tests) > 0 || outstanding > 0 {
// While we have some free hosts, try and schedule
// a test on them
for len(hosts) > 0 {
test, ok := tests.pick(len(hosts))
if !ok {
break
}
testHosts := hosts[:test.hosts]
hosts = hosts[test.hosts:]

go func() {
errored := test.run(testHosts)
resultsChan <- result{errored, testHosts}
}()
outstanding++
}

// Otherwise, wait for the test to finish and return
// the hosts to the pool
result := <-resultsChan
hosts = append(hosts, result.hosts...)
outstanding--
errored = result.errored || errored
}
return errored
}

func sequential(tests tests, hosts []string) bool {
errored := false
for _, test := range tests {
errored = test.run(hosts) || errored
}
return errored
}

func main() {
mflag.BoolVar(&useScheduler, []string{"scheduler"}, false, "Use scheduler to distribute tests across shards")
mflag.BoolVar(&runParallel, []string{"parallel"}, false, "Run tests in parallel on hosts where possible")
mflag.Parse()

tests, err := getTests(mflag.Args())
if err != nil {
fmt.Printf("Error parsing tests: %v\n", err)
os.Exit(1)
}

hosts := strings.Fields(os.Getenv("HOSTS"))
maxHosts := len(hosts)
if maxHosts == 0 {
fmt.Print("No HOSTS specified.\n")
os.Exit(1)
}

var errored bool
if runParallel {
errored = parallel(tests, hosts)
} else {
errored = sequential(tests, hosts)
}

if errored {
os.Exit(1)
}
}

0 comments on commit 669a0db

Please sign in to comment.