Skip to content

Commit

Permalink
Add smarted scheduler to decide which tests to run on which shard.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwilkie authored and Tom Wilkie committed May 29, 2015
1 parent 3a0c2df commit 2c61e4c
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 11 deletions.
1 change: 1 addition & 0 deletions circle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies:
docker save weaveworks/weave-build >$WEAVE_BUILD;
fi
post:
- pip install requests
- curl https://sdk.cloud.google.com | bash
- bin/setup-circleci-secrets "$SECRET_PASSWORD"
- mkdir -p $(dirname $SRCDIR)
Expand Down
19 changes: 8 additions & 11 deletions test/run_all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,14 @@ check_test_status() {
# Overwrite assert.sh _assert_cleanup trap with our own
trap check_test_status EXIT

# If running on circle, only run the tests for my "shard".
# CIRCLE_NODE_TOTAL is the total number of shards, and
# CIRCLE_NODE_INDEX is my index.
i=0
TESTS=
for test in *_test.sh; do
if [ -z "$CIRCLECI" ] || [ $(($i % $CIRCLE_NODE_TOTAL)) -eq $CIRCLE_NODE_INDEX ]; then
TESTS="$TESTS $test"
fi
i=$(($i + 1))
done
TESTS=*_test.sh

# If running on circle, use the scheduler to work out what tests to run
if [ -n "$CIRCLECI" ]; then
TESTS=$(echo $TESTS | ./sched sched $CIRCLE_BUILD_NUM $CIRCLE_NODE_TOTAL $CIRCLE_NODE_INDEX)
fi

echo Running $TESTS

for t in $TESTS; do
echo
Expand Down
38 changes: 38 additions & 0 deletions test/sched
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/usr/bin/python
import sys, string, json
import requests

BASE_URL="http://positive-cocoa-90213.appspot.com"

def test_time(test_name, runtime):
r = requests.post(BASE_URL + "/record/%s/%f" % (test_name, runtime))
print r.text
assert r.status_code == 204

def test_sched(test_run, shard_count, shard_id):
tests = json.dumps({'tests': string.split(sys.stdin.read())})
r = requests.post(BASE_URL + "/schedule/%d/%d/%d" % (test_run, shard_count, shard_id), data=tests)
assert r.status_code == 200
result = r.json()
for test in sorter(result['tests']):
print test

def usage():
print "%s <cmd> <args..>" % sys.argv[0]
print " time <test name> <run time>"
print " sched <test run> <num shards> <shard id>"

def main():
if len(sys.argv) < 4:
usage()
sys.exit(1)

if sys.argv[1] == "time":
test_time(sys.argv[2], float(sys.argv[3]))
elif sys.argv[1] == "sched":
test_sched(int(sys.argv[2]), int(sys.argv[3]), int(sys.argv[4]))
else:
usage()

if __name__ == '__main__':
main()
14 changes: 14 additions & 0 deletions test/scheduler/app.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
application: positive-cocoa-90213
version: 1
runtime: python27
api_version: 1
threadsafe: true

handlers:
- url: .*
script: main.app

libraries:
- name: webapp2
version: latest

61 changes: 61 additions & 0 deletions test/scheduler/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import operator

import flask

from google.appengine.ext import ndb

app = flask.Flask('scheduler')
app.debug = True

class Test(ndb.Model):
total_run_time = ndb.FloatProperty(default=0.)
total_runs = ndb.IntegerProperty(default=0)

class Schedule(ndb.Model):
shards = ndb.JsonProperty()

@app.route('/record/<test_name>/<runtime>', methods=['POST'])
@ndb.transactional
def record(test_name, runtime):
test = Test.get_by_id(test_name)
if test is None:
test = Test(id=test_name)
test.total_run_time += float(runtime)
test.total_runs += 1
test.put()
return ('', 204)

@app.route('/schedule/<int:test_run>/<int:shard_count>/<int:shard>', methods=['POST'])
def schedule(test_run, shard_count, shard):
# read tests from body
test_names = flask.request.get_json(force=True)['tests']

# first see if we have a scedule already
schedule_id = "%d-%d" % (test_run, shard_count)
schedule = Schedule.get_by_id(schedule_id)
if schedule is not None:
return flask.json.jsonify(tests=schedule.shards[str(shard)])

# if not, do simple greedy algorithm
test_times = ndb.get_multi(ndb.Key(Test, test_name) for test_name in test_names)
def avg(test):
if test is not None:
return test.total_run_time / test.total_runs
return 1
test_times = [(test_name, avg(test)) for test_name, test in zip(test_names, test_times)]
test_times_dict = dict(test_times)
test_times.sort(key=operator.itemgetter(1))

shards = {i: [] for i in xrange(shard_count)}
while test_times:
test_name, time = test_times.pop()

# find shortest shard and put it in that
s, _ = min(((i, sum(test_times_dict[t] for t in shards[i]))
for i in xrange(shard_count)), key=operator.itemgetter(1))

shards[s].append(test_name)

# atomically insert or retrieve existing schedule
schedule = Schedule.get_or_insert(schedule_id, shards=shards)
return flask.json.jsonify(tests=schedule.shards[str(shard)])

0 comments on commit 2c61e4c

Please sign in to comment.