-
Notifications
You must be signed in to change notification settings - Fork 651
/
test_runner.py
151 lines (120 loc) · 4.3 KB
/
test_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""Test runner runs a TFJob test."""
import argparse
import logging
import os
import time
import uuid
import jinja2
import yaml
from kubernetes import client as k8s_client
from google.cloud import storage # pylint: disable=no-name-in-module
from py import test_util
from py import util
from py import tf_job_client
def run_test(args):
    """Run a single TFJob test and optionally write the result as junit XML.

    The file at ``args.spec`` is rendered as a Jinja2 template (``image_tag``
    is the only template variable passed in), parsed as YAML, given a unique
    job name and submitted with ``tf_job_client.create_tf_job``. The result of
    ``tf_job_client.wait_for_job`` is then inspected: any state other than
    "succeeded", a ``util.TimeoutError`` or any other exception is recorded as
    the failure of the test case rather than propagated.

    Args:
      args: Parsed command line arguments. Reads ``project``, ``cluster``,
        ``zone``, ``spec``, ``image_tag`` and ``junit_path``.

    Raises:
      ValueError: If ``args.image_tag`` is empty.
    """
    # Validate the arguments first so a bad invocation fails before kubectl
    # or any client is configured.
    if not args.image_tag:
        raise ValueError("--image_tag must be provided.")

    gcs_client = storage.Client(project=args.project)
    project = args.project
    cluster_name = args.cluster
    zone = args.zone
    util.configure_kubectl(project, zone, cluster_name)
    util.load_kube_config()

    api_client = k8s_client.ApiClient()

    t = test_util.TestCase()
    t.class_name = "tfjob_test"
    t.name = os.path.basename(args.spec)

    loader = jinja2.FileSystemLoader(os.path.dirname(args.spec))

    logging.info("Loading spec from %s with image_tag=%s", args.spec,
                 args.image_tag)
    spec_contents = jinja2.Environment(loader=loader).get_template(
        os.path.basename(args.spec)).render(image_tag=args.image_tag)

    # safe_load only builds plain YAML types; yaml.load() without an explicit
    # Loader can construct arbitrary objects and is rejected by recent PyYAML.
    spec = yaml.safe_load(spec_contents)

    # Make the job name unique.
    spec["metadata"]["name"] += "-" + uuid.uuid4().hex[0:4]

    # Seed these before the try block so the handlers below can always refer
    # to them, even when job creation itself raises.
    name = spec["metadata"]["name"]
    namespace = spec["metadata"].get("namespace", "<unknown>")
    start = time.time()
    try:
        api_response = tf_job_client.create_tf_job(api_client, spec)
        namespace = api_response["metadata"]["namespace"]
        name = api_response["metadata"]["name"]

        logging.info("Created job %s in namespaces %s", name, namespace)
        results = tf_job_client.wait_for_job(
            api_client, namespace, name,
            status_callback=tf_job_client.log_status)

        if results["status"]["state"].lower() != "succeeded":
            t.failure = "Job {0} in namespace {1} in state {2}".format(
                name, namespace, results["status"]["state"])

        # TODO(jlewi):
        #  Here are some validation checks to run:
        #  1. Check tensorboard is created if its part of the job spec.
        #  2. Check that all resources are garbage collected.
        # TODO(jlewi): Add an option to add chaos and randomly kill various
        # resources?
        # TODO(jlewi): Are there other generic validation checks we should
        # run.
    except util.TimeoutError:
        t.failure = "Timeout waiting for {0} in namespace {1} to finish.".format(
            name, namespace)
    except Exception as e:  # pylint: disable=broad-except
        # We want to catch all exceptions because we mark the test as failed.
        logging.exception("Test %s raised an exception.", t.name)
        # Exceptions have no `.message` on Python 3 and it may be empty on
        # Python 2; fall back so the failure text is never empty.
        t.failure = getattr(e, "message", None) or str(e) or repr(e)
    finally:
        t.time = time.time() - start
        if args.junit_path:
            test_util.create_junit_xml_file([t], args.junit_path, gcs_client)
def add_common_args(parser):
    """Register the common command line flags on ``parser``.

    Args:
      parser: An argparse parser (or sub-parser) that receives the flags
        ``--spec`` (required), ``--project``, ``--cluster``, ``--image_tag``,
        ``--zone`` and ``--junit_path``. All of them are string valued.
    """
    # One entry per flag, in the order the flags are registered.
    flag_table = (
        ("--spec", {
            "default": None,
            "required": True,
            "help": "Path to the YAML file specifying the test to run.",
        }),
        ("--project", {
            "default": None,
            "help": "The project to use.",
        }),
        ("--cluster", {
            "default": None,
            "help": "The name of the cluster.",
        }),
        ("--image_tag", {
            "default": None,
            "help": "The tag for the docker image to use.",
        }),
        ("--zone", {
            "default": "us-east1-d",
            "help": "The zone for the cluster.",
        }),
        ("--junit_path", {
            "default": "",
            "help": "Where to write the junit xml file with the results.",
        }),
    )

    for flag, options in flag_table:
        parser.add_argument(flag, type=str, **options)
def build_parser():
    """Build the command line parser for the test runner.

    Returns:
      An ``argparse.ArgumentParser`` with a single ``test`` sub-command. The
      sub-command takes the flags added by ``add_common_args`` and stores
      ``run_test`` on the parsed namespace as ``func``.
    """
    # Create the top-level parser.
    parser = argparse.ArgumentParser(description="Run a TFJob test.")

    # On Python 3 sub-commands are optional by default, which would produce a
    # namespace without `func`. Require one, and give it a dest so argparse
    # can name the missing argument in its error message.
    subparsers = parser.add_subparsers(dest="command")
    subparsers.required = True

    parser_test = subparsers.add_parser("test", help="Run a tfjob test.")
    add_common_args(parser_test)
    parser_test.set_defaults(func=run_test)

    return parser
def main():
    """Entry point: configure logging and gcloud auth, then run the command.

    When the ``GOOGLE_APPLICATION_CREDENTIALS`` environment variable is set,
    gcloud is told to activate that service account key file before the
    command line is parsed and the selected sub-command is invoked.
    """
    logging.getLogger().setLevel(logging.INFO)

    credentials = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
    if credentials:
        logging.info("GOOGLE_APPLICATION_CREDENTIALS is set; configuring gcloud "
                     "to use service account.")
        # Since a service account is set tell gcloud to use it.
        util.run(["gcloud", "auth", "activate-service-account",
                  "--key-file=" + credentials])

    parser = build_parser()

    # Parse the args and call whatever function was selected.
    args = parser.parse_args()
    handler = getattr(args, "func", None)
    if handler is None:
        # No sub-command was selected, so there is nothing to dispatch to;
        # report a usage error instead of failing with AttributeError.
        parser.error("a sub-command is required.")
    handler(args)
# Run the command line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()