/
testenv-client
executable file
·231 lines (199 loc) · 8.82 KB
/
testenv-client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
#!/usr/bin/python
#
# Runs a tripleo-ci test-client
#
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import argparse
import json
import logging
import sys
import subprocess
import os
import tempfile
import textwrap
import threading
import time
import uuid
import gear
# Configure logging once at import time: every record carries a timestamp,
# the logger name and the severity. The module logger starts at INFO and is
# switched to DEBUG by main() when --debug is passed.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT)
logger = logging.getLogger('testenv-client')
logger.setLevel(logging.INFO)
class TestCallback(object):
    """Gearman worker callback that runs a command when its job arrives.

    An instance is meant to be used as a thread target: calling it
    registers a gearman function called ``name``, waits for one job,
    writes the job arguments to a temporary data file and runs
    ``command`` with the ``TE_DATAFILE`` environment variable pointing at
    that file.

    :param servers: comma separated list of gearman brokers
    :param name: name of the gearman function to register
    :param command: argument list of the command to execute

    After the call, ``rv`` holds the command's exit status, 2 when the
    job reported an environment failure or the command could not be
    started, or None if no job was processed.
    """

    def __init__(self, servers, name, command):
        self.servers = servers
        self.name = name
        self.command = command
        self.created = time.time()
        # Created by __call__; defined here so the attribute always exists
        # even if another thread looks at it before the worker is set up.
        self.worker = None
        # Default the return value to None, this may end up being
        # used if the gearman worker goes down before the job finishes
        self.rv = None

    def __call__(self):
        """Register the callback function, wait for one job and run it."""
        self.worker = gear.Worker('testenv-client-%s' % self.name)
        add_servers(self.worker, self.servers)
        self.worker.waitForServer()
        self.worker.registerFunction(self.name)

        try:
            job = self.worker.getJob()
        except gear.InterruptedError:
            # Waiting was cancelled (see stopWaitingForJobs); rv stays None.
            return

        # NOTE(review): job.arguments is handled as text throughout;
        # confirm this holds for the gear version in use.
        logger.info('Received job : %s', job.arguments.strip())
        time_waiting = time.time() - self.created
        if time_waiting > 90:
            logger.warning('%.1f seconds waiting for a worker.', time_waiting)

        # These markers mean no usable environment was handed over
        # (presumably sent by the testenv worker), so report failure
        # without running the command.
        if ("Couldn't retrieve env" in job.arguments or
                "Failed creating OVB stack" in job.arguments):
            logger.error(job.arguments)
            self.rv = 2
            job.sendWorkComplete("")
            return

        logger.info('Running command "%s"', ' '.join(self.command))
        with tempfile.NamedTemporaryFile('w') as fp:
            fp.write(job.arguments)
            # Flush so the child process sees the complete data file.
            fp.flush()
            os.environ["TE_DATAFILE"] = fp.name
            try:
                self.rv = subprocess.call(self.command)
            except Exception:
                # e.g. the executable does not exist; treat as a failure
                # rather than letting the thread die with rv unset.
                logger.exception("Error calling command")
                self.rv = 2

        job.sendWorkComplete("")
class TestEnvClient(gear.Client):
    """A gear client whose callers can block until a job has ended.

    Each of the three result handlers first delegates to the
    ``gear.Client`` implementation and then sets a shared
    ``threading.Event``, which :meth:`wait` blocks on.
    """

    def __init__(self):
        super(TestEnvClient, self).__init__()
        # Set by any of the handlers below; never cleared.
        self.event = threading.Event()

    def handleWorkComplete(self, packet):
        """Process a work-complete packet, then signal waiters."""
        super(TestEnvClient, self).handleWorkComplete(packet)
        self.event.set()

    def handleWorkException(self, packet):
        """Process a work-exception packet, then signal waiters."""
        super(TestEnvClient, self).handleWorkException(packet)
        self.event.set()

    def handleWorkFail(self, packet):
        """Process a work-fail packet, then signal waiters."""
        super(TestEnvClient, self).handleWorkFail(packet)
        self.event.set()

    def wait(self, timeout=None):
        """Block until completion, exception or failure is reported.

        :param timeout: maximum number of seconds to wait, or None to
            wait indefinitely
        :type timeout: float
        :returns: True if a notification was received, False on timeout
        """
        finished = self.event
        finished.wait(timeout)
        return finished.is_set()
def add_servers(client, servers):
    """Register every broker of a comma separated list with ``client``.

    :param client: object providing ``addServer(host, port)``
    :param servers: comma separated ``host[:port]`` entries; the port
        defaults to 4730 when it is omitted. Whitespace around entries
        and empty entries (e.g. from a trailing comma) are ignored.
    :raises ValueError: if a port is present but is not an integer

    The port is split off at the last colon, so bare IPv6 literals are
    not handled.
    """
    for entry in servers.split(','):
        entry = entry.strip()
        if not entry:
            # Stray comma or blank entry: nothing to register.
            continue
        host, sep, port = entry.rpartition(':')
        if not sep:
            # No colon at all: the whole entry is the host name.
            host, port = entry, '4730'
        client.addServer(host, int(port))
def main(args=sys.argv[1:]):
    """Lock a test environment over gearman and run a command inside it.

    Starts a TestCallback worker thread, submits a ``lockenv`` job
    describing the requested environment and waits for that job to end.

    :param args: command line arguments (without the program name)
    :returns: the exit status recorded by the callback, or 2 if the
        command never completed and this process could not be killed
    :raises KeyError: if the ``TOCI_JOBTYPE`` environment variable is
        not set
    """
    parser = argparse.ArgumentParser(
        description=(textwrap.dedent("""
            Starts up a gearman worker and then calls the job "lockenv" over
            gearman, then waits for the worker to be called, once the worker
            is called it will place the provided data in a datafile (indicated
            by the TE_DATAFILE environment variable) and run the "command"
            provided, the exit code will be the exit code of the command that
            was run. Essentially this allows a command to be run while the
            worker is holding a test environment in a locked state e.g. to
            simply output the data provided one could run the command:
            $ echo 'cat $TE_DATAFILE' | %s -- bash
            """ % sys.argv[0])),
        formatter_class=argparse.RawTextHelpFormatter
    )

    parser.add_argument('command', nargs="+",
                        help='A command to run once the test env is locked')
    parser.add_argument('--geard', '-b', default='127.0.0.1:4730',
                        help='A comma separated list of gearman brokers to '
                        'connect to.')
    parser.add_argument('--jobnum', '-n', default=uuid.uuid4().hex,
                        help='A unique identifier identifing this job.')
    parser.add_argument('--timeout', '-t', default='10800',
                        help='Set a timeout, after which the command will '
                        'be killed.')
    parser.add_argument('--envsize', default="2",
                        help='Number of baremetal nodes to request')
    parser.add_argument('--compute-envsize', default='0',
                        help='Number of compute baremetal nodes to request. '
                        'When this is set to a value > 0, the primary '
                        'nodes will be tagged with the controller '
                        'profile and the extra nodes with compute. The '
                        'compute nodes will be a smaller flavor in order '
                        'to use less resources.')
    parser.add_argument('--ucinstance',
                        help='uuid for the undercloud instance (where an '
                        'interface on the provisioning net is attached')
    parser.add_argument('--create-undercloud', action='store_true',
                        help='deploy the undercloud node.')
    parser.add_argument('--ssh-key', default='',
                        help='ssh key for the ovb nodes to be deployed.')
    parser.add_argument('--net-iso',
                        default="multi-nic",
                        choices=['none', 'multi-nic', 'public-bond'],
                        help='"none" requests an environment without network '
                        'isolation, "multi-nic" requests one with a '
                        'basic multiple nic configuration, and '
                        '"public-bond" requests one like "multi-nic" '
                        'but with two public nics for use with bonded '
                        'nic-configs.')
    parser.add_argument('--extra-nodes', default='0',
                        help='Number of extra undercloud-like nodes to '
                        'request')
    parser.add_argument('--debug', '-d', action='store_true',
                        help='Set to debug mode.')
    opts = parser.parse_args(args)
    if opts.debug:
        logger.setLevel(logging.DEBUG)

    # Read the mandatory TOCI_JOBTYPE before any thread is started: a
    # missing variable then fails immediately instead of raising in the
    # main thread while the non-daemon worker thread keeps the process
    # alive waiting for a job.
    job_identifier = '%s: %s' % (os.environ.get('ZUUL_CHANGE', 'No change'),
                                 os.environ['TOCI_JOBTYPE'])

    callback_name = "callback_" + opts.jobnum
    cb = TestCallback(opts.geard, callback_name, opts.command)
    threading.Thread(target=cb).start()

    client = TestEnvClient()
    add_servers(client, opts.geard)
    client.waitForServer()

    job_params = {
        "callback_name": callback_name,
        "timeout": opts.timeout,
        "envsize": opts.envsize,
        "compute_envsize": opts.compute_envsize,
        "ucinstance": opts.ucinstance,
        "create_undercloud": "true" if opts.create_undercloud else "",
        "ssh_key": opts.ssh_key,
        "net_iso": opts.net_iso,
        "extra_nodes": opts.extra_nodes,
        "job_identifier": job_identifier,
    }
    job = gear.Job('lockenv', json.dumps(job_params))
    client.submitJob(job)

    # No timeout here as there will be a timeout on the jenkins jobs, which is
    # also passed to the testenv-worker, lets not second guess them.
    client.wait()
    if job.failure:
        # This signals an error with the gearman connection to the worker
        # we log it, but still return cb.rv the command may have succeeded
        logger.error("The gearman Job has failed")

    # The callback thread creates its worker itself, so it may not exist
    # yet if the job ended before the thread got that far.
    worker = getattr(cb, 'worker', None)
    if worker is not None:
        worker.stopWaitingForJobs()

    # If the testenv worker releases the environment before our command
    # completes we kill this process and all its children, to immediately
    # stop the running job
    if cb.rv is None:
        logger.error("The command hasn't completed but the testenv worker has "
                     "released the environment. Killing all processes.")
        subprocess.call(["sudo", "kill", "-9", "-%d" % os.getpgrp()])
        if cb.rv is None:
            # Only reached when the kill did not take this process down;
            # never report success for a command that did not finish.
            return 2
    logger.debug("Exiting with status : %d", cb.rv)
    return cb.rv
if __name__ == '__main__':
    # sys.exit instead of the exit() helper: the latter is injected by the
    # site module for interactive use and is missing when site is not loaded.
    sys.exit(main())