Skip to content

Commit

Permalink
update to use the new ingestion signiture
Browse files Browse the repository at this point in the history
  • Loading branch information
MauriceManning committed Jul 3, 2012
1 parent 1671a96 commit 668b993
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 16 deletions.
20 changes: 10 additions & 10 deletions ion/services/sa/product/data_product_management_service.py
Expand Up @@ -158,7 +158,7 @@ def activate_data_product_persistence(self, data_product_id='', persist_data=Tru
# retrieve the data_process object
data_product_obj = self.data_product.read_one(data_product_id)

# get the Stream associated with this data set; if no stream then create one, if multiple streams then Throw
# get the Stream associated with this data product; if no stream then create one, if multiple streams then Throw
streams = self.data_product.find_stemming_stream(data_product_id)
if not streams:
raise BadRequest('Data Product %s must have one stream associated' % str(data_product_id))
Expand All @@ -173,7 +173,7 @@ def activate_data_product_persistence(self, data_product_id='', persist_data=Tru


self.exchange_point = 'science_data'
self.exchange_space = 'science_ingestion'
self.exchange_space = 'science_granule_ingestion'
ingest_queue = IngestionQueue(name=self.exchange_space, type='science_granule')
ingestion_configuration_id = self.clients.ingestion_management.create_ingestion_configuration(name='standard_ingest', exchange_point_id=self.exchange_point, queues=[ingest_queue])

Expand Down Expand Up @@ -246,18 +246,18 @@ def suspend_data_product_persistence(self, data_product_id=''):
if data_product_obj.dataset_configuration_id is None:
raise NotFound("Data Product %s dataset configuration does not exist" % data_product_id)

# get the Stream associated with this data product; if no stream then create one, if multiple streams then Throw
streams = self.data_product.find_stemming_stream(data_product_id)
if not streams:
raise BadRequest('Data Product %s must have one stream associated' % str(data_product_id))

#retrieve the dataset configuation object so that attrs can be changed
dataset_configuration_obj = self.clients.resource_registry.read(data_product_obj.dataset_configuration_id)
if dataset_configuration_obj is None:
raise NotFound("Dataset Configuration %s does not exist" % data_product_obj.dataset_configuration_id)
#todo: what if there are multiple streams?
stream_id = streams[0]
log.debug("activate_data_product_persistence: stream = %s" % str(stream_id))

# #Set the dataset config archive data/metadata attrs to false
# dataset_configuration_obj.configuration.archive_data = False
# dataset_configuration_obj.configuration.archive_metadata = False

# todo: dataset_configuration_obj contains the ingest config for now...
ret = self.clients.ingestion_management.unpersist_data_stream(dataset_configuration_obj)
ret = self.clients.ingestion_management.unpersist_data_stream(stream_id=stream_id, ingestion_configuration_id=data_product_obj.dataset_configuration_id)

log.debug("suspend_data_product_persistence: deactivate = %s" % str(ret))

Expand Down
Expand Up @@ -28,7 +28,7 @@ class FakeProcess(LocalContextMixin):



@attr('INT', group='sa')
@attr('INT', group='ingest')
#@unittest.skip('not working')
class TestDataProductManagementServiceIntegration(IonIntegrationTestCase):

Expand All @@ -49,6 +49,7 @@ def setUp(self):
self.ingestclient = IngestionManagementServiceClient(node=self.container.node)
self.process_dispatcher = ProcessDispatcherServiceClient()

@unittest.skip('OBE')
def test_get_last_update(self):
from ion.processes.data.last_update_cache import CACHE_DATASTORE_NAME

Expand Down Expand Up @@ -95,7 +96,7 @@ def test_createDataProduct(self):


# First launch the ingestors
self.exchange_space = 'science_ingestion'
self.exchange_space = 'science_granule_ingestion'
self.exchange_point = 'science_data'
config = DotDict()
config.process.datastore_name = 'datasets'
Expand Down Expand Up @@ -154,10 +155,10 @@ def test_createDataProduct(self):


# test suspend data product persistence
# try:
# client.suspend_data_product_persistence(dp_id2)
# except BadRequest as ex:
# self.fail("failed to suspend deactivate data product persistence : %s" %ex)
try:
client.suspend_data_product_persistence(dp_id2)
except BadRequest as ex:
self.fail("failed to suspend deactivate data product persistence : %s" %ex)

pid = self.container.spawn_process(name='dummy_process_for_test',
module='pyon.ion.process',
Expand Down

0 comments on commit 668b993

Please sign in to comment.