Skip to content

Commit

Permalink
Merge pull request #444 from lukecampbell/oob_retrieval
Browse files Browse the repository at this point in the history
Adds out of band retrieval and bugfix for replay.
  • Loading branch information
Christopher Mueller committed Jan 9, 2013
2 parents cef1320 + 22030de commit 0a837ca
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 6 deletions.
3 changes: 2 additions & 1 deletion ion/processes/data/replay/replay_process.py
Expand Up @@ -171,11 +171,12 @@ def execute_retrieve(self):
rdt = RecordDictionaryTool(param_dictionary=coverage.parameter_dictionary)
else:
rdt = self._coverage_to_granule(coverage,self.start_time, self.end_time, self.stride_time, self.parameters,tdoa=self.tdoa)
coverage.close(timeout=5)
except Exception as e:
import traceback
traceback.print_exc(e)
raise BadRequest('Problems reading from the coverage')
finally:
coverage.close(timeout=5)
return rdt.to_granule()


Expand Down
34 changes: 29 additions & 5 deletions ion/services/dm/inventory/data_retriever_service.py
Expand Up @@ -4,15 +4,19 @@
@description Data Retriever Service
'''

from interface.services.dm.idata_retriever_service import BaseDataRetrieverService
from interface.objects import Replay
from pyon.core.exception import BadRequest
from ion.core.function.transform_function import TransformFunction
from pyon.util.arg_check import validate_is_instance, validate_true
from ion.processes.data.replay.replay_process import ReplayProcess
from ion.services.dm.inventory.dataset_management_service import DatasetManagementService
from ion.services.dm.utility.granule import RecordDictionaryTool

from pyon.core.exception import BadRequest
from pyon.public import PRED, RT
from pyon.util.arg_check import validate_is_instance, validate_true
from pyon.util.containers import for_name
from pyon.util.log import log

from interface.objects import Replay
from interface.services.dm.idata_retriever_service import BaseDataRetrieverService

class DataRetrieverService(BaseDataRetrieverService):
REPLAY_PROCESS = 'replay_process'
Expand Down Expand Up @@ -97,11 +101,31 @@ def cancel_replay_agent(self, replay_id=''):

self.clients.resource_registry.delete(replay_id)

@classmethod
def retrieve_oob(cls, dataset_id='', query=None, delivery_format=None):
query = query or {}

try:
coverage = DatasetManagementService._get_coverage(dataset_id, mode='r')
if coverage.num_timesteps == 0:
log.info('Reading from an empty coverage')
rdt = RecordDictionaryTool(param_dictionary=coverage.parameter_dictionary)
else:
rdt = ReplayProcess._coverage_to_granule(coverage, query.get('star_time', None), query.get('end_time',None), query.get('stride_time',None), query.get('parameters',None), delivery_format, query.get('tdoa',None))
except Exception as e:
import traceback
traceback.print_exc(e)
raise BadRequest('Problems reading from the coverage')
finally:
coverage.close(timeout=5)
return rdt.to_granule()


def retrieve(self, dataset_id='', query=None, delivery_format=None, module='', cls='', kwargs=None):
'''
Retrieves a dataset.
@param dataset_id Dataset identifier
@param query Query parameters (start_time, end_time, stride_time, parameters)
@param query Query parameters (start_time, end_time, stride_time, parameters, tdoa)
@param delivery_format The stream definition identifier for the outgoing granule (stream_defintinition_id)
@param module Module to chain a transform into
@param cls Class of the transform
Expand Down
16 changes: 16 additions & 0 deletions ion/services/dm/test/test_dm_end_2_end.py
Expand Up @@ -15,6 +15,7 @@
from ion.processes.data.replay.replay_client import ReplayClient
from ion.services.dm.ingestion.test.ingestion_management_test import IngestionManagementIntTest
from ion.services.dm.inventory.dataset_management_service import DatasetManagementService
from ion.services.dm.inventory.data_retriever_service import DataRetrieverService
from ion.services.dm.utility.granule_utils import RecordDictionaryTool, CoverageCraft, time_series_domain

from coverage_model.parameter import ParameterContext
Expand Down Expand Up @@ -523,3 +524,18 @@ def test_repersist_data(self):

self.assertTrue(success)


def test_out_of_band_retrieve(self):
# Setup the environemnt
stream_id, route, stream_def_id, dataset_id = self.make_simple_dataset()
self.start_ingestion(stream_id, dataset_id)

# Fill the dataset
self.publish_fake_data(stream_id, route)
self.wait_until_we_have_enough_granules(dataset_id,40)

# Retrieve the data
granule = DataRetrieverService.retrieve_oob(dataset_id)
rdt = RecordDictionaryTool.load_from_granule(granule)
self.assertTrue((rdt['time'] == np.arange(40)).all())

0 comments on commit 0a837ca

Please sign in to comment.