This repository has been archived by the owner on Aug 1, 2019. It is now read-only.
/
test_nodepool_integration.py
128 lines (106 loc) · 4.53 KB
/
test_nodepool_integration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
import time
from unittest import skip
import zuul.zk
import zuul.nodepool
from zuul import model
from tests.base import BaseTestCase
class TestNodepoolIntegration(BaseTestCase):
    # Exercises the zuul.nodepool.Nodepool interface class against a
    # *real* nodepool, with this test case standing in for the
    # scheduler.

    def setUp(self):
        super(TestNodepoolIntegration, self).setUp()

        # Plain attributes first; they are all in place before the
        # Nodepool object below is handed this test case.
        self.statsd = None
        self.hostname = socket.gethostname()
        self.provisioned_requests = []

        # The disconnect cleanup is registered before connecting.
        zk = zuul.zk.ZooKeeper()
        self.zk = zk
        self.addCleanup(zk.disconnect)
        zk.connect('localhost:2181')

        # zuul.nodepool expects a scheduler object; this class
        # provides the scheduler methods it needs (see
        # onNodesProvisioned), so 'self' is passed in that role.
        self.nodepool = zuul.nodepool.Nodepool(self)

    def _makeJob(self, *nodes):
        # Build the 'testjob' job used by every test here.  Each
        # positional argument is a (name, label) pair and becomes one
        # node in the job's nodeset, added in the order given.
        nodeset = model.NodeSet()
        for name, label in nodes:
            nodeset.addNode(model.Node([name], label))
        job = model.Job('testjob')
        job.nodeset = nodeset
        return job

    def waitForRequests(self):
        # Poll every 0.1 seconds until the Nodepool object reports no
        # outstanding requests.  There is no timeout in this method.
        while True:
            if not self.nodepool.requests:
                return
            time.sleep(0.1)

    def onNodesProvisioned(self, request):
        # Scheduler-side callback: the nodepool class calls this once
        # a request has been provisioned.  The request is recorded so
        # the tests can count the callbacks.
        self.provisioned_requests.append(request)

    def test_node_request(self):
        # A simple single-node request, followed through accept, use
        # and return.
        job = self._makeJob(('controller', 'fake-label'))
        request = self.nodepool.requestNodes(None, job, 0)
        self.waitForRequests()
        self.assertEqual(len(self.provisioned_requests), 1)
        self.assertEqual(request.state, model.STATE_FULFILLED)

        # Accepting the nodes should leave each one locked and ready.
        self.nodepool.acceptNodes(request, request.id)
        granted = request.nodeset
        for node in granted.getNodes():
            self.assertIsNotNone(node.lock)
            self.assertEqual(node.state, model.STATE_READY)

        # Marking the nodeset in use should flip every node's state.
        self.nodepool.useNodeSet(granted)
        for node in granted.getNodes():
            self.assertEqual(node.state, model.STATE_IN_USE)

        # Returning the nodeset should drop the locks and mark the
        # nodes as used.
        self.nodepool.returnNodeSet(granted)
        for node in granted.getNodes():
            self.assertIsNone(node.lock)
            self.assertEqual(node.state, model.STATE_USED)

    def test_invalid_node_request(self):
        # A request for an invalid node type must still call back
        # exactly once, with the request in the failed state.
        job = self._makeJob(('controller', 'invalid-label'))
        request = self.nodepool.requestNodes(None, job, 0)
        self.waitForRequests()
        self.assertEqual(len(self.provisioned_requests), 1)
        self.assertEqual(request.state, model.STATE_FAILED)

    @skip("Disabled until nodepool is ready")
    def test_node_request_disconnect(self):
        # Node requests should be re-submitted after the ZooKeeper
        # client is stopped and started again.
        # NOTE(review): self.fake_nodepool is not assigned anywhere in
        # this class; confirm where it comes from before enabling.
        job = self._makeJob(('controller', 'ubuntu-xenial'),
                            ('compute', 'ubuntu-xenial'))
        self.fake_nodepool.paused = True
        request = self.nodepool.requestNodes(None, job, 0)

        # Bounce the ZooKeeper client while the request is pending.
        self.zk.client.stop()
        self.zk.client.start()

        self.fake_nodepool.paused = False
        self.waitForRequests()
        self.assertEqual(len(self.provisioned_requests), 1)
        self.assertEqual(request.state, model.STATE_FULFILLED)

    @skip("Disabled until nodepool is ready")
    def test_node_request_canceled(self):
        # A request canceled while pending must never reach the
        # provisioned callback.
        # NOTE(review): self.fake_nodepool is not assigned anywhere in
        # this class; confirm where it comes from before enabling.
        job = self._makeJob(('controller', 'ubuntu-xenial'),
                            ('compute', 'ubuntu-xenial'))
        self.fake_nodepool.paused = True
        request = self.nodepool.requestNodes(None, job, 0)
        self.nodepool.cancelRequest(request)
        self.waitForRequests()
        self.assertEqual(len(self.provisioned_requests), 0)