forked from aws/deep-learning-containers
-
Notifications
You must be signed in to change notification settings - Fork 0
/
testrunner.py
191 lines (154 loc) · 6.99 KB
/
testrunner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import os
import random
import sys
import logging
import re
from multiprocessing import Pool
import boto3
import pytest
from botocore.config import Config
from invoke import run
from invoke.context import Context
from test_utils import eks as eks_utils
from test_utils import get_dlc_images, is_pr_context, destroy_ssh_keypair, KEYS_TO_DESTROY_FILE
# Module-level logger: DEBUG level, echoed to stdout so the CI job log captures it.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
LOGGER.addHandler(logging.StreamHandler(sys.stdout))
def assign_sagemaker_instance_type(image):
    """Choose a SageMaker instance type from the framework and device in the image URI."""
    uses_gpu = "gpu" in image
    if "tensorflow" in image:
        return "ml.p3.8xlarge" if uses_gpu else "ml.c4.4xlarge"
    return "ml.p2.8xlarge" if uses_gpu else "ml.c4.8xlarge"


def generate_sagemaker_pytest_cmd(image):
    """
    Parses the image ECR url and returns appropriate pytest command

    :param image: ECR url of image
    :return: <tuple> pytest command to be run, path where it should be executed, image tag
    """
    reruns = 4
    region = os.getenv("AWS_REGION", "us-west-2")
    # Default the account id to the leading component of the ECR registry host.
    account_id = os.getenv("ACCOUNT_ID", image.split(".")[0])
    docker_base_name, tag = image.split("/")[1].split(":")
    instance_type = assign_sagemaker_instance_type(image)

    # NOTE: We are relying on the fact that repos are defined as
    # <context>-<framework>-<job_type> in our infrastructure.
    repo_parts = docker_base_name.split("-")
    framework, job_type = repo_parts[1], repo_parts[2]
    path = os.path.join("test", "sagemaker_tests", framework, job_type)

    integration_path = os.path.join("integration", "sagemaker")
    aws_id_arg = "--aws-id"
    docker_base_arg = "--docker-base-name"
    instance_type_arg = "--instance-type"

    # TensorFlow SageMaker suites use different CLI flags and directory layouts.
    if framework == "tensorflow":
        if job_type == "training":
            aws_id_arg = "--account-id"
            # NOTE: We rely on Framework Version being in "major.minor.patch" format
            tf_major = re.search(r"\d+(\.\d+){2}", tag).group().split(".")[0]
            path = os.path.join(os.path.dirname(path), f"{framework}{tf_major}_training")
        else:
            aws_id_arg = "--registry"
            docker_base_arg = "--repo"
            instance_type_arg = "--instance-types"
            integration_path = os.path.join(integration_path, "test_tfs.py")

    test_report = os.path.join(os.getcwd(), "test", f"{tag}.xml")
    pytest_cmd = (
        f"pytest --reruns {reruns} {integration_path} --region {region} {docker_base_arg} "
        f"{docker_base_name} --tag {tag} {aws_id_arg} {account_id} {instance_type_arg} {instance_type} "
        f"--junitxml {test_report}"
    )
    return pytest_cmd, path, tag
def run_sagemaker_pytest_cmd(image):
    """
    Run pytest in a virtual env for a particular image

    Expected to run via multiprocessing

    :param image: ECR url
    """
    cmd, test_dir, tag = generate_sagemaker_pytest_cmd(image)
    ctx = Context()
    with ctx.cd(test_dir):
        # Isolate each image's test dependencies in a virtualenv named after the tag.
        ctx.run(f"virtualenv {tag}")
        with ctx.prefix(f"source {tag}/bin/activate"):
            # Best-effort install (warn=True): pytest below surfaces any real failure.
            ctx.run("pip install -r requirements.txt", warn=True)
            ctx.run(cmd)
def run_sagemaker_tests(images):
    """
    Function to set up multiprocessing for SageMaker tests

    :param images: <list> List of all images to be used in SageMaker tests
    """
    # Nothing to do when no images were supplied.
    if not images:
        return
    # One worker per image so every image's suite runs concurrently.
    with Pool(len(images)) as worker_pool:
        worker_pool.map(run_sagemaker_pytest_cmd, images)
def pull_dlc_images(images):
    """
    Pulls DLC images to CodeBuild jobs before running PyTest commands

    :param images: <list> ECR image URIs to pull locally
    """
    for image_uri in images:
        # hide="out" suppresses docker's progress output to keep logs readable.
        run(f"docker pull {image_uri}", hide="out")
def setup_eks_clusters(dlc_images):
    """
    Create a GPU EKS cluster for the single framework present in dlc_images.

    :param dlc_images: space-separated string of image URIs under test
    :return: name of the newly created EKS cluster
    :raises ValueError: if the images span zero or multiple frameworks
    """
    short_names = {"tensorflow": "tf", "pytorch": "pt", "mxnet": "mx"}
    frameworks_in_images = [fw for fw in short_names if fw in dlc_images]
    # EKS tests only support one framework per run.
    if len(frameworks_in_images) != 1:
        raise ValueError(
            f"All images in dlc_images must be of a single framework for EKS tests.\n"
            f"Instead seeing {frameworks_in_images} frameworks."
        )
    long_name = frameworks_in_images[0]
    # PR runs get a small cluster; pytorch needs an extra node in full runs.
    if is_pr_context():
        num_nodes = 2
    elif long_name == "pytorch":
        num_nodes = 4
    else:
        num_nodes = 3
    # Random suffix avoids name collisions between concurrent builds of the same commit.
    cluster_name = (
        f"dlc-{short_names[long_name]}-cluster-"
        f"{os.getenv('CODEBUILD_RESOLVED_SOURCE_VERSION')}-{random.randint(1, 10000)}"
    )
    eks_utils.create_eks_cluster(cluster_name, "gpu", num_nodes, "p3.16xlarge", "pytest.pem")
    eks_utils.eks_setup(long_name, cluster_name)
    return cluster_name
def main():
    """
    Entry point for the test runner.

    Dispatches on the TEST_TYPE environment variable ("sanity", "ecs", "ec2",
    "eks", "sagemaker", optionally prefixed with "benchmark-") and runs the
    matching pytest suite against the images reported by get_dlc_images().
    Exits with pytest's return code for the dlc_tests path.

    :raises NotImplementedError: for an unrecognized TEST_TYPE
    """
    # Define constants
    test_type = os.getenv("TEST_TYPE")
    dlc_images = get_dlc_images()
    LOGGER.info(f"Images tested: {dlc_images}")
    all_image_list = dlc_images.split(" ")
    # "example" images are excluded from the SageMaker run below.
    standard_images_list = [image_uri for image_uri in all_image_list if "example" not in image_uri]
    new_eks_cluster_name = None
    # "benchmark-<type>" selects the benchmark variant of <type> under benchmark/.
    benchmark_mode = "benchmark" in test_type
    specific_test_type = re.sub("benchmark-", "", test_type) if benchmark_mode else test_type
    test_path = os.path.join("benchmark", specific_test_type) if benchmark_mode else specific_test_type

    if specific_test_type in ("sanity", "ecs", "ec2", "eks"):
        report = os.path.join(os.getcwd(), "test", f"{test_type}.xml")
        # PyTest must be run in this directory to avoid conflicting w/ sagemaker_tests conftests
        os.chdir(os.path.join("test", "dlc_tests"))
        # Pull images for necessary tests
        if specific_test_type == "sanity":
            pull_dlc_images(all_image_list)
        if specific_test_type == "eks":
            new_eks_cluster_name = setup_eks_clusters(dlc_images)
        # Execute dlc_tests pytest command
        pytest_cmd = ["-s", "-rA", test_path, f"--junitxml={report}", "-n=auto"]
        try:
            sys.exit(pytest.main(pytest_cmd))
        finally:
            # sys.exit raises SystemExit, so the cleanup below always runs.
            if specific_test_type == "eks":
                eks_utils.delete_eks_cluster(new_eks_cluster_name)
            # Delete dangling EC2 KeyPairs
            if specific_test_type == "ec2" and os.path.exists(KEYS_TO_DESTROY_FILE):
                # FIX: build the EC2 client once, not once per key file in the loop.
                ec2_client = boto3.client("ec2", config=Config(retries={'max_attempts': 10}))
                with open(KEYS_TO_DESTROY_FILE) as key_destroy_file:
                    for key_file in key_destroy_file:
                        LOGGER.info(key_file)
                        if ".pem" in key_file:
                            _resp, keyname = destroy_ssh_keypair(ec2_client, key_file)
                            LOGGER.info(f"Deleted {keyname}")
    elif specific_test_type == "sagemaker":
        # NOTE(review): tensorflow-inference py2 images are filtered out here —
        # presumably unsupported by the SageMaker suite; confirm before relying on it.
        run_sagemaker_tests(
            [image for image in standard_images_list if not ("tensorflow-inference" in image and "py2" in image)]
        )
    else:
        raise NotImplementedError(f"{test_type} test is not supported. "
                                  f"Only support ec2, ecs, eks, sagemaker and sanity currently")


if __name__ == "__main__":
    main()