-
Notifications
You must be signed in to change notification settings - Fork 46
/
test_exchange_management_service.py
341 lines (247 loc) · 14.7 KB
/
test_exchange_management_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
#!/usr/bin/env python
from pyon.util.log import log
__author__ = 'Stephen P. Henrie, Dave Foster <dfoster@asascience.com>'
__license__ = 'Apache 2.0'
from pyon.core import exception
from pyon.ion import exchange
from interface.objects import ExchangeSpace, ExchangePoint, ExchangeName
from mock import Mock, patch, sentinel
from pyon.util.unit_test import PyonTestCase
from pyon.util.int_test import IonIntegrationTestCase
from nose.plugins.attrib import attr
from pyon.net.transport import BaseTransport
from pyon.util.containers import DotDict
import unittest
from interface.services.coi.iresource_registry_service import ResourceRegistryServiceClient
from interface.services.coi.iexchange_management_service import ExchangeManagementServiceClient
from pyon.core.exception import BadRequest, Conflict, Inconsistent, NotFound
from pyon.public import PRED, RT, CFG
from ion.services.coi.exchange_management_service import ExchangeManagementService
import os
@attr('UNIT', group='coi')
class TestExchangeManagementService(PyonTestCase):
def setUp(self):
self.container = Mock()
mock_clients = self._create_service_mock('exchange_management')
self.exchange_management_service = ExchangeManagementService()
self.exchange_management_service.clients = mock_clients
self.exchange_management_service.container = self.container
# Rename to save some typing
self.mock_create = self.container.resource_registry.create
self.mock_read = self.container.resource_registry.read
self.mock_update = self.container.resource_registry.update
self.mock_delete = self.container.resource_registry.delete
self.mock_create_association = self.container.resource_registry.create_association
self.mock_delete_association = self.container.resource_registry.delete_association
self.mock_find_objects = self.container.resource_registry.find_objects
self.mock_find_resources = self.container.resource_registry.find_resources
self.mock_find_subjects = self.container.resource_registry.find_subjects
# Exchange Space
self.exchange_space = Mock()
self.exchange_space.name = "Foo"
# fixup for direct RR access
self.container.resource_registry.create.return_value = (sentinel.id, sentinel.rev)
self.container.resource_registry.find_subjects.return_value = (sentinel.id, [])
def test_create_exchange_space(self):
self.mock_create.return_value = ['111', 1]
#TODO - Need to mock an org to pass in an valid Org_id
exchange_space_id = self.exchange_management_service.create_exchange_space(self.exchange_space, "1233")
assert exchange_space_id == '111'
self.mock_create.assert_called_once_with(self.exchange_space)
def test_xs_create_bad_params(self):
self.assertRaises(exception.BadRequest, self.exchange_management_service.create_exchange_space)
self.assertRaises(exception.BadRequest, self.exchange_management_service.create_exchange_space, exchange_space=sentinel.ex_space)
self.assertRaises(exception.BadRequest, self.exchange_management_service.create_exchange_space, org_id=sentinel.org_id)
def test_read_and_update_exchange_space(self):
self.mock_read.return_value = self.exchange_space
exchange_space = self.exchange_management_service.read_exchange_space('111')
assert exchange_space is self.mock_read.return_value
self.mock_read.assert_called_once_with('111')
exchange_space.name = 'Bar'
self.mock_update.return_value = ['111', 2]
self.exchange_management_service.update_exchange_space(exchange_space)
self.mock_update.assert_called_once_with(exchange_space)
def test_delete_exchange_space(self):
self.mock_read.return_value = self.exchange_space
self.mock_find_subjects.return_value = (None, [])
self.mock_find_objects.return_value = (None, [])
self.exchange_management_service.delete_exchange_space('111')
self.mock_read.assert_called_once_with('111')
self.mock_delete.assert_called_once_with('111')
def test_read_exchange_space_not_found(self):
self.mock_read.return_value = None
# TEST: Execute the service operation call
with self.assertRaises(NotFound) as cm:
self.exchange_management_service.read_exchange_space('bad')
ex = cm.exception
self.assertEqual(ex.message, 'Exchange Space bad does not exist')
self.mock_read.assert_called_once_with('bad')
def test_delete_exchange_space_not_found(self):
self.mock_read.return_value = None
# TEST: Execute the service operation call
with self.assertRaises(NotFound) as cm:
self.exchange_management_service.delete_exchange_space('bad')
ex = cm.exception
self.assertEqual(ex.message, 'Exchange Space bad does not exist')
self.mock_read.assert_called_once_with('bad')
@attr('INT', group='coi')
@patch.dict('pyon.ion.exchange.CFG', container=DotDict(CFG.container, exchange=DotDict(auto_register=True)))
class TestExchangeManagementServiceInt(IonIntegrationTestCase):
def setUp(self):
self._start_container()
self.container.start_rel_from_url('res/deploy/r2coi.yml')
self.ems = ExchangeManagementServiceClient()
self.rr = ResourceRegistryServiceClient()
orglist, _ = self.rr.find_resources(RT.Org)
if not len(orglist) == 1:
raise StandardError("Unexpected number of orgs found")
self.org_id = orglist[0]._id
# we test actual exchange interaction in pyon, so it's fine to mock the broker interaction here
self.container.ex_manager.create_xs = Mock()
self.container.ex_manager.delete_xs = Mock()
self.container.ex_manager.create_xp = Mock()
self.container.ex_manager.delete_xp = Mock()
@attr('LOCOINT')
@unittest.skipIf(os.getenv('CEI_LAUNCH_TEST', False),'Test reaches into container, doesn\'t work with CEI')
def test_xs_create_delete(self):
exchange_space = ExchangeSpace(name="bobo")
esid = self.ems.create_exchange_space(exchange_space, self.org_id)
# should be able to pull from RR an exchange space
es2 = self.rr.read(esid)
self.assertEquals(exchange_space.name, es2.name)
# should have an exchange declared on the broker
self.container.ex_manager.create_xs.assert_called_once_with('bobo', use_ems=False)
# should have an assoc to an org
orglist, _ = self.rr.find_subjects(RT.Org, PRED.hasExchangeSpace, esid, id_only=True)
self.assertEquals(len(orglist), 1)
self.assertEquals(orglist[0], self.org_id)
self.ems.delete_exchange_space(esid)
# should no longer have that id in the RR
self.assertRaises(NotFound, self.rr.read, esid)
# should no longer have an assoc to an org
orglist2, _ = self.rr.find_subjects(RT.Org, PRED.hasExchangeSpace, esid, id_only=True)
self.assertEquals(len(orglist2), 0)
# should no longer have that exchange declared
self.assertEquals(self.container.ex_manager.delete_xs.call_count, 1)
self.assertIn('bobo', self.container.ex_manager.delete_xs.call_args[0][0].exchange) # prefixed with sysname
def test_xs_delete_without_create(self):
self.assertRaises(NotFound, self.ems.delete_exchange_space, '123')
@attr('LOCOINT')
@unittest.skipIf(os.getenv('CEI_LAUNCH_TEST', False),'Test reaches into container, doesn\'t work with CEI')
def test_xp_create_delete(self):
# xp needs an xs first
exchange_space = ExchangeSpace(name="doink")
esid = self.ems.create_exchange_space(exchange_space, self.org_id)
exchange_point = ExchangePoint(name="hammer")
epid = self.ems.create_exchange_point(exchange_point, esid)
# should be in RR
ep2 = self.rr.read(epid)
self.assertEquals(exchange_point.name, ep2.name)
# should be associated to the XS as well
xslist, _ = self.rr.find_subjects(RT.ExchangeSpace, PRED.hasExchangePoint, epid, id_only=True)
self.assertEquals(len(xslist), 1)
self.assertEquals(xslist[0], esid)
# should exist on broker (both xp and xs)
self.container.ex_manager.create_xs.assert_called_once_with('doink', use_ems=False)
self.assertIn('hammer', self.container.ex_manager.create_xp.call_args[0])
self.ems.delete_exchange_point(epid)
self.ems.delete_exchange_space(esid)
# should no longer be in RR
self.assertRaises(NotFound, self.rr.read, epid)
# should no longer be associated
xslist2, _ = self.rr.find_subjects(RT.ExchangeSpace, PRED.hasExchangePoint, epid, id_only=True)
self.assertEquals(len(xslist2), 0)
# should no longer exist on broker (both xp and xs)
@unittest.skipIf(os.getenv('CEI_LAUNCH_TEST', False),'Test reaches into container, doesn\'t work with CEI')
def test_xp_create_then_delete_xs(self):
# xp needs an xs first
exchange_space = ExchangeSpace(name="doink")
esid = self.ems.create_exchange_space(exchange_space, self.org_id)
exchange_point = ExchangePoint(name="hammer")
epid = self.ems.create_exchange_point(exchange_point, esid)
# delete xs
self.ems.delete_exchange_space(esid)
# should no longer have an association
xslist2, _ = self.rr.find_subjects(RT.ExchangeSpace, PRED.hasExchangePoint, epid, id_only=True)
self.assertEquals(len(xslist2), 0)
# TEST ONLY: have to clean up the xp or we leave junk on the broker
# we have to do it manually because the xs is gone
#self.ems.delete_exchange_point(epid)
# @TODO: reaching into ex manager for transport is clunky
xs = exchange.ExchangeSpace(self.container.ex_manager, self.container.ex_manager._priviledged_transport, exchange_space.name)
xp = exchange.ExchangePoint(self.container.ex_manager, self.container.ex_manager._priviledged_transport, exchange_point.name, xs, 'ttree')
self.container.ex_manager.delete_xp(xp, use_ems=False)
def test_xs_create_update(self):
raise unittest.SkipTest("Test not implemented yet")
def test_xn_declare_and_undeclare(self):
# xn needs an xs first
exchange_space = ExchangeSpace(name="bozo")
esid = self.ems.create_exchange_space(exchange_space, self.org_id)
exchange_name = ExchangeName(name='shoes', xn_type="XN_PROCESS")
enid = self.ems.declare_exchange_name(exchange_name, esid)
# should be in RR
en2 = self.rr.read(enid)
self.assertEquals(exchange_name.name, en2.name)
# should have an assoc from XN to XS
xnlist, _ = self.rr.find_subjects(RT.ExchangeSpace, PRED.hasExchangeName, enid, id_only=True)
self.assertEquals(len(xnlist), 1)
self.assertEquals(xnlist[0], esid)
# container API got called, will have declared a queue
self.ems.undeclare_exchange_name(enid) # canonical name = xn id in current impl
def test_xn_declare_no_xs(self):
exchange_name = ExchangeName(name="shoez", xn_type='XN_PROCESS')
self.assertRaises(NotFound, self.ems.declare_exchange_name, exchange_name, '11')
def test_xn_undeclare_without_declare(self):
self.assertRaises(NotFound, self.ems.undeclare_exchange_name, 'some_non_id')
@unittest.skipIf(os.getenv('CEI_LAUNCH_TEST', False),'Test reaches into container, doesn\'t work with CEI')
def test_xn_declare_then_delete_xs(self):
# xn needs an xs first
exchange_space = ExchangeSpace(name="bozo")
esid = self.ems.create_exchange_space(exchange_space, self.org_id)
exchange_name = ExchangeName(name='shnoz', xn_type="XN_PROCESS")
enid = self.ems.declare_exchange_name(exchange_name, esid)
# delete the XS
self.ems.delete_exchange_space(esid)
# no longer should have assoc from XS to XN
xnlist, _ = self.rr.find_subjects(RT.ExchangeSpace, PRED.hasExchangeName, enid, id_only=True)
self.assertEquals(len(xnlist), 0)
# cleanup: delete the XN (assoc already removed, so we reach into the implementation here)
# @TODO: reaching into ex manager for transport is clunky
self.rr.delete(enid)
xs = exchange.ExchangeSpace(self.container.ex_manager, self.container.ex_manager._priviledged_transport, exchange_space.name)
xn = exchange.ExchangeName(self.container.ex_manager, self.container.ex_manager._priviledged_transport, exchange_name.name, xs)
self.container.ex_manager.delete_xn(xn, use_ems=False)
@attr('INT', group='coi')
class TestContainerExchangeToEms(IonIntegrationTestCase):
# these tests should auto contact the EMS to do the work
def setUp(self):
self._start_container()
self.container.start_rel_from_url('res/deploy/r2coi.yml')
self.ems = ExchangeManagementServiceClient()
self.rr = ResourceRegistryServiceClient()
# we want the ex manager to do its thing, but without actual calls to broker
# just mock out the transport
self.container.ex_manager._priviledged_transport = Mock(BaseTransport)
@attr('LOCOINT')
@unittest.skipIf(os.getenv('CEI_LAUNCH_TEST', False),'Test reaches into container, doesn\'t work with CEI')
def test_create_xs_talks_to_ems(self):
self.patch_cfg('pyon.ion.exchange.CFG', container=DotDict(CFG.container, exchange=DotDict(auto_register=True)))
xs = self.container.ex_manager.create_xs('house')
self.addCleanup(xs.delete)
# should have called EMS and set RR items
res, _ = self.rr.find_resources(RT.ExchangeSpace, name='house')
self.assertEquals(res[0].name, 'house')
# should have tried to call broker as well
self.assertEquals(self.container.ex_manager._priviledged_transport.declare_exchange_impl.call_count, 1)
self.assertIn('house', self.container.ex_manager._priviledged_transport.declare_exchange_impl.call_args[0][0])
@patch.dict('pyon.ion.exchange.CFG', container=DotDict(CFG.container, exchange=DotDict(auto_register=False)))
@unittest.skipIf(os.getenv('CEI_LAUNCH_TEST', False),'Test reaches into container, doesn\'t work with CEI')
def test_create_xs_with_no_flag_only_uses_ex_manager(self):
self.patch_cfg('pyon.ion.exchange.CFG', container=DotDict(CFG.container, exchange=DotDict(auto_register=False)))
xs = self.container.ex_manager.create_xs('house')
self.addCleanup(xs.delete)
e1,e2 = self.rr.find_resources(RT.ExchangeSpace, name='house')
self.assertEquals(e1, [])
self.assertEquals(e2, [])
self.assertEquals(self.container.ex_manager._priviledged_transport.declare_exchange_impl.call_count, 1)
self.assertIn('house', self.container.ex_manager._priviledged_transport.declare_exchange_impl.call_args[0][0])