-
Notifications
You must be signed in to change notification settings - Fork 17
/
test_data_product_management.py
67 lines (45 loc) · 1.8 KB
/
test_data_product_management.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#!/usr/bin/env python
"""
@file ion/services/sa/test/test_data_product_management.py
@test ion.services.sa.data_product_management
@author
"""
# Module-level logger, named after this module via __name__.
import ion.util.ionlog
log = ion.util.ionlog.getLogger(__name__)
from twisted.internet import defer
from ion.core.process.process import Process
from ion.services.sa.data_product_management.data_product_management import DataProductManagementServiceClient
from ion.test.iontest import IonTestCase
class DataProductManagementTest(IonTestCase):
    """
    Tests for the data product management service.

    Each test runs against a freshly started container in which the
    'dataprodmgmt' process has been spawned; the service is exercised
    through a DataProductManagementServiceClient.
    """

    @defer.inlineCallbacks
    def setUp(self):
        """
        Start the container, spawn the service process and create the client.

        After this fires, self.sup holds the value returned by
        _spawn_processes and self.dpmc is a client created with proc=sup.
        """
        yield self._start_container()

        # NOTE(review): 'class' names the client class rather than a service
        # class -- confirm this is what _spawn_processes expects.
        services = [
            {
                'name': 'dataprodmgmt',
                'module': 'ion.services.sa.data_product_management.data_product_management',
                'class': 'DataProductManagementServiceClient'
            }
        ]

        log.debug('DataProductManagementTest.setUp(): spawning processes')
        sup = yield self._spawn_processes(services)
        log.debug('DataProductManagementTest.setUp(): spawned processes')

        self.sup = sup
        self.dpmc = DataProductManagementServiceClient(proc=sup)
        # Not used by the tests in this class; presumably read by the
        # IonTestCase base class -- TODO confirm before removing.
        self._proc = Process()

    @defer.inlineCallbacks
    def tearDown(self):
        """
        Shut down the spawned processes, then stop the container.
        """
        yield self._shutdown_processes()
        yield self._stop_container()

    @defer.inlineCallbacks
    def test_define_data_product(self):
        """
        Call define_data_product with a sample owner, source and description
        and wait for the call to complete.

        The reply is not inspected; the test only fails if the call raises.
        """
        log.info("test_define_data_product Now testing: Create sample data product")

        yield self.dpmc.define_data_product(owner='Instrument1Owner',
                                            source='Instrument1',
                                            description='SeaBird')

        log.info("test_define_data_product Finished testing: Create sample data product")