
Merge branch 'master' into agent_state_computed_attr
swarbhanu committed Feb 18, 2013
2 parents 30b8633 + 687b25c commit 082a869
Showing 12 changed files with 71 additions and 70 deletions.
3 changes: 1 addition & 2 deletions ion/agents/data/test/test_external_dataset_agent.py
@@ -896,8 +896,7 @@ def _setup_resources(self):
                                                    spatial_domain=sdom)

         dproduct_id = dpms_cli.create_data_product(data_product=dprod,
-                                                   stream_definition_id=streamdef_id,
-                                                   parameter_dictionary=pdict_id)
+                                                   stream_definition_id=streamdef_id)

         dams_cli.assign_data_product(input_resource_id=ds_id, data_product_id=dproduct_id) # , create_stream=True)

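Note: all four external-dataset-agent tests make the same edit — the parameter_dictionary argument is dropped because create_data_product now takes the parameter dictionary from the stream definition itself (see the service change further down). A minimal sketch of the updated call, assuming a running ION container and the dprod/streamdef_id fixtures built earlier in _setup_resources() (the client import path is as generated elsewhere in coi-services):

    # Sketch only; assumes a running container and test fixtures
    from interface.services.sa.idata_product_management_service import DataProductManagementServiceClient

    dpms_cli = DataProductManagementServiceClient()

    # parameter_dictionary is no longer passed; the service reads it from
    # the stream definition referenced by stream_definition_id
    dproduct_id = dpms_cli.create_data_product(data_product=dprod,
                                               stream_definition_id=streamdef_id)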
3 changes: 1 addition & 2 deletions ion/agents/data/test/test_external_dataset_agent_netcdf.py
@@ -153,8 +153,7 @@ def _setup_resources(self):

         # Generate the data product and associate it to the ExternalDataset
         dproduct_id = dpms_cli.create_data_product(data_product=dprod,
-                                                   stream_definition_id=streamdef_id,
-                                                   parameter_dictionary=pdict_id)
+                                                   stream_definition_id=streamdef_id)

         dams_cli.assign_data_product(input_resource_id=ds_id, data_product_id=dproduct_id)

3 changes: 1 addition & 2 deletions ion/agents/data/test/test_external_dataset_agent_ruv.py
@@ -132,8 +132,7 @@ def _setup_resources(self):

         # Generate the data product and associate it to the ExternalDataset
         dproduct_id = dpms_cli.create_data_product(data_product=dprod,
-                                                   stream_definition_id=streamdef_id,
-                                                   parameter_dictionary=pdict_id)
+                                                   stream_definition_id=streamdef_id)

         dams_cli.assign_data_product(input_resource_id=ds_id, data_product_id=dproduct_id)

3 changes: 1 addition & 2 deletions ion/agents/data/test/test_external_dataset_agent_slocum.py
@@ -183,8 +183,7 @@ def _setup_resources(self):
                                                    spatial_domain=sdom)

         dproduct_id = dpms_cli.create_data_product(data_product=dprod,
-                                                   stream_definition_id=streamdef_id,
-                                                   parameter_dictionary=pdict_id)
+                                                   stream_definition_id=streamdef_id)

         dams_cli.assign_data_product(input_resource_id=ds_id, data_product_id=dproduct_id)

4 changes: 4 additions & 0 deletions ion/processes/bootstrap/plugins/bootstrap_core.py
@@ -30,6 +30,10 @@ def on_initial_bootstrap(self, process, config, **kwargs):
         sys_actor = ActorIdentity(name=actor_name, description="ION System Agent")
         process.container.resource_registry.create(sys_actor)

+        webauth_actor_name = get_safe(config, "system.web_authentication_actor", "web_authentication")
+        web_auth_actor = ActorIdentity(name=webauth_actor_name, description="Web Authentication Actor")
+        process.container.resource_registry.create(web_auth_actor)
+
         # Store all resource types

         # Store all event types
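Note: get_safe resolves a dotted key path against the nested config dict and falls back to the default when any segment is missing, so a deployment can override the actor name via system.web_authentication_actor. A small illustration (hedged: a standalone dict instead of the real bootstrap config; import path assumed from pyon):

    from pyon.util.containers import get_safe

    config = {"system": {"web_authentication_actor": "my_web_auth_actor"}}
    get_safe(config, "system.web_authentication_actor", "web_authentication")  # -> "my_web_auth_actor"
    get_safe(config, "system.no_such_key", "web_authentication")               # -> "web_authentication"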
27 changes: 15 additions & 12 deletions ion/processes/event/event_persister.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python

-"""Process that subscribes to ALL events and persists them efficiently into the events datastore"""
+"""Process that subscribes to ALL events and persists them efficiently in bulk into the events datastore"""

 from pyon.core import bootstrap
 from pyon.event.event import EventSubscriber
@@ -11,22 +11,17 @@
 from pyon.public import log


-"""
-TODO:
-  - How fast can this receive event messages?
-"""
-
-
 class EventPersister(StandaloneProcess):

     def on_init(self):
         # Time in between event persists
-        self.persist_interval = 1.0
+        self.persist_interval = float(self.CFG.get_safe("event_persist_interval", 1.0))

-        # Holds received events FIFO
+        # Holds received events FIFO in a synchronized queue
         self.event_queue = Queue()

-        # Temporarily holds list of events to persist while datastore operation not yet completed
+        # Temporarily holds the list of events to persist while the datastore operation is not yet complete.
+        # This is where events to persist will remain if the datastore operation fails occasionally.
         self.events_to_persist = None

         # bookkeeping for timeout greenlet
@@ -39,7 +34,7 @@ def on_init(self):
     def on_start(self):
         # Persister thread
         self._persist_greenlet = spawn(self._trigger_func, self.persist_interval)
-        log.debug('Publisher Greenlet started in "%s"', self.__class__.__name__)
+        log.debug('EventPersister timer greenlet started in "%s" (interval %s)', self.__class__.__name__, self.persist_interval)

         # Event subscription
         self.event_sub = EventSubscriber(pattern=EventSubscriber.ALL_EVENTS,
@@ -67,12 +62,20 @@ def _trigger_func(self, persist_interval):
         # Event.wait returns False on timeout (and True when set in on_quit), so we use this to both exit cleanly and do our timeout in a loop
         while not self._terminate_persist.wait(timeout=persist_interval):
             try:
+                if self.events_to_persist:
+                    # There was an error last time and we need to retry
+                    log.info("Retry persisting %s events" % len(self.events_to_persist))
+                    self._persist_events(self.events_to_persist)
+                    self.events_to_persist = None
+
                 self.events_to_persist = [self.event_queue.get() for x in xrange(self.event_queue.qsize())]

                 self._persist_events(self.events_to_persist)
+                self.events_to_persist = None
             except Exception as ex:
-                log.exception("Failed to persist received events")
+                # Note: Persisting events may fail occasionally during test runs (when the "events" datastore is force
+                # deleted and recreated). We'll log and keep retrying forever.
+                log.exception("Failed to persist %s received events. Will retry next cycle" % len(self.events_to_persist))
                 return False

     def _persist_events(self, event_list):
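Note: the reworked _trigger_func now retries a batch that failed to persist before draining newly queued events, and only clears events_to_persist after a successful write. A standalone sketch of one timer tick (Python 2, matching the xrange above; persist_cycle and its arguments are illustrative, not the real process API):

    from Queue import Queue  # synchronized FIFO, as used by EventPersister

    def persist_cycle(event_queue, persist, pending):
        """Run one tick; return the batch still pending (None after success)."""
        try:
            if pending:
                persist(pending)  # retry the batch left over from a failed tick
                pending = None
            # Drain everything queued since the last tick and persist in bulk
            pending = [event_queue.get() for _ in xrange(event_queue.qsize())]
            persist(pending)
            return None
        except Exception:
            return pending  # keep the batch for the next tick instead of dropping it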
10 changes: 5 additions & 5 deletions ion/services/coi/test/test_governance.py
@@ -554,7 +554,7 @@ def test_org_boundary(self):

         #First try to get a list of Users by hitting the RR anonymously - should be allowed.
         users,_ = self.rr_client.find_resources(restype=RT.ActorIdentity)
-        self.assertEqual(len(users),2) #Should include the ION System Actor and non-user actor from setup as well.
+        self.assertEqual(len(users),3) #Should include the ION System Actor, Web auth actor and non-user actor from setup as well.

         log.debug('Begin testing with policies')

@@ -571,7 +571,7 @@

         #First try to get a list of Users by hitting the RR anonymously - should be allowed.
         users,_ = self.rr_client.find_resources(restype=RT.ActorIdentity)
-        self.assertEqual(len(users),3) #Should include the ION System Actor and non-user actor from setup as well.
+        self.assertEqual(len(users),4) #Should include the ION System Actor and non-user actor from setup as well.

         #Now enroll the user as a member of the Second Org
         self.org_client.enroll_member(org2_id,actor_id, headers=self.system_actor_header)
@@ -596,7 +596,7 @@

         #Now try to hit the RR with a real user and it should now be allowed
         users,_ = self.rr_client.find_resources(restype=RT.ActorIdentity, headers=actor_header)
-        self.assertEqual(len(users),3) #Should include the ION System Actor and non-user actor from setup as well.
+        self.assertEqual(len(users),4) #Should include the ION System Actor and non-user actor from setup as well.

         #TODO - figure out how to write an XACML rule to be a member of the specific Org as well

@@ -665,7 +665,7 @@ def test_org_enroll_negotiation(self):
         self.assertIn( 'org_management(find_enrolled_users) has been denied',cm.exception.message)

         users = self.org_client.find_enrolled_users(self.ion_org._id, headers=self.system_actor_header)
-        self.assertEqual(len(users),3) # Will include the ION system actor and the non-user actor from setup
+        self.assertEqual(len(users),4) # Will include the ION system actor and the non-user actor from setup

         #Create a second Org
         with self.assertRaises(NotFound) as nf:
@@ -797,7 +797,7 @@ def test_org_role_negotiation(self):
         actor_header = self.container.governance_controller.get_actor_header(actor_id)

         users = self.org_client.find_enrolled_users(self.ion_org._id, headers=self.system_actor_header)
-        self.assertEqual(len(users),3) # Will include the ION system actor and the non-user actor from setup
+        self.assertEqual(len(users),4) # Will include the ION system actor and the non-user actor from setup

         ## test_org_roles and policies
@@ -395,8 +395,7 @@ def test_resource_state_save_restore(self):
                                                         spatial_domain = sdom)

         data_product_id2 = self.DP.create_data_product(data_product=dp_obj,
-                                                       stream_definition_id=raw_stream_def_id,
-                                                       parameter_dictionary=rpdict_id)
+                                                       stream_definition_id=raw_stream_def_id)
         log.debug( 'new dp_id = %s', str(data_product_id2))

         self.DAMS.assign_data_product(input_resource_id=instDevice_id, data_product_id=data_product_id2)
@@ -29,6 +29,7 @@
 from interface.objects import ProcessStateEnum
 from mock import patch
 from ion.agents.port.port_agent_process import PortAgentProcessType, PortAgentType
+from ion.services.cei.process_dispatcher_service import ProcessStateGate
 import gevent
 from sets import Set

@@ -372,24 +373,15 @@ def test_createDataProcessUsingSim(self):

         ctd_l0_all_data_process_id = self.dataprocessclient.create_data_process(ctd_L0_all_dprocdef_id, [ctd_parsed_data_product], self.output_products)
         data_process = self.rrclient.read(ctd_l0_all_data_process_id)
-        self.addCleanup(self.process_dispatcher.cancel_process, data_process.process_id)
+        process_id = data_process.process_id
+        self.addCleanup(self.process_dispatcher.cancel_process, process_id)

         #-------------------------------
         # Wait until the process launched in the create_data_process() method is actually running, before proceeding further in this test
         #-------------------------------

-        self.event_repo = self.container.event_repository
-
-        def data_process_running(event_repo, process_id):
-            event_tuples = event_repo.find_events(origin=process_id, event_type='ProcessLifecycleEvent', origin_type= 'DispatchedProcess')
-            recent_events = [tuple[2] for tuple in event_tuples]
-            for evt in recent_events:
-                log.debug("Got an event with event_state: %s. While ProcessStateEnum.RUNNING would be: %s", evt.state, ProcessStateEnum.RUNNING)
-                if evt.state == ProcessStateEnum.RUNNING:
-                    return True
-            return False
-
-        poll(data_process_running, self.event_repo, data_process.process_id)
+        gate = ProcessStateGate(self.process_dispatcher.read_process, process_id, ProcessStateEnum.RUNNING)
+        self.assertTrue(gate.await(30), "The data process (%s) did not spawn in 30 seconds" % process_id)

         #-------------------------------
         # Retrieve a list of all data process definitions in RR and validate that the DPD is listed
@@ -411,9 +403,6 @@ def data_process_running(event_repo, process_id):

         subs = self.rrclient.read(input_subscription_id)
         self.assertTrue(subs.activated)

-        process_obj = self.process_dispatcher.read_process(ctd_l0_all_data_process.process_id)
-        self.assertEquals(process_obj.process_state, ProcessStateEnum.RUNNING)
-
         # todo: This has not yet been completed by CEI, will probably surface through a DPMS call
         self.dataprocessclient.deactivate_data_process(ctd_l0_all_data_process_id)
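Note: the hand-rolled event polling is replaced by ProcessStateGate, which blocks until the dispatched process reaches the requested state or the timeout expires — the same check with less code and no event-repository bookkeeping. Reduced to its essentials, the pattern from the diff above is:

    # read_process is the process dispatcher's lookup call; await() returns
    # False if RUNNING is not reached within the 30-second timeout
    gate = ProcessStateGate(self.process_dispatcher.read_process, process_id, ProcessStateEnum.RUNNING)
    self.assertTrue(gate.await(30), "The data process (%s) did not spawn in 30 seconds" % process_id)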
59 changes: 35 additions & 24 deletions ion/services/sa/product/data_product_management_service.py
@@ -34,7 +34,7 @@ def override_clients(self, new_clients):
         self.data_product = DataProductImpl(self.clients)


-    def create_data_product(self, data_product=None, stream_definition_id='', parameter_dictionary=None, exchange_point=''):
+    def create_data_product(self, data_product=None, stream_definition_id='', exchange_point=''):
         """
         @param data_product IonObject which defines the general data product resource
         @param source_resource_id IonObject id which defines the source for the data
@@ -49,7 +49,7 @@ def create_data_product(self, data_product=None, stream_definition_id='', parameter_dictionary=None, exchange_point=''):
         # If the stream definition has a parameter dictionary, use that
         validate_is_not_none(stream_definition_id, 'A stream definition id must be passed to register a data product')
         stream_def_obj = self.clients.pubsub_management.read_stream_definition(stream_definition_id) # Validates and checks for param_dict
-        parameter_dictionary = stream_def_obj.parameter_dictionary or parameter_dictionary
+        parameter_dictionary = stream_def_obj.parameter_dictionary
         validate_is_not_none(parameter_dictionary , 'A parameter dictionary must be passed to register a data product')
         validate_is_not_none(data_product, 'A data product (ion object) must be passed to register a data product')
         exchange_point = exchange_point or 'science_data'
@@ -76,15 +76,6 @@ def create_data_product(self, data_product=None, stream_definition_id='', parameter_dictionary=None, exchange_point=''):
         self.data_product.link_stream(data_product_id, stream_id)


-        # create a dataset...
-        data_set_id = self.clients.dataset_management.create_dataset( name= 'data_set_%s' % stream_id,
-                                                                      stream_id=stream_id,
-                                                                      parameter_dict=parameter_dictionary,
-                                                                      temporal_domain=data_product.temporal_domain,
-                                                                      spatial_domain=data_product.spatial_domain)
-
-        # link dataset with data product. This creates the association in the resource registry
-        self.data_product.link_data_set(data_product_id=data_product_id, data_set_id=data_set_id)

         # Return the id of the new data product
         return data_product_id
@@ -206,14 +197,30 @@ def activate_data_product_persistence(self, data_product_id=''):

         validate_is_not_none(data_product_obj, "The data product id should correspond to a valid registered data product.")

-        #--------------------------------------------------------------------------------
-        # get the Stream associated with this data product; if no stream then create one, if multiple streams then Throw
-        #--------------------------------------------------------------------------------
-        streams = self.data_product.find_stemming_stream(data_product_id)
-        if not streams:
-            raise BadRequest('Data Product %s must have one stream associated' % str(data_product_id))
+        stream_ids, _ = self.clients.resource_registry.find_objects(subject=data_product_id, predicate=PRED.hasStream, id_only=True)
+        if not stream_ids:
+            raise BadRequest('Specified DataProduct has no streams associated with it')
+        stream_id = stream_ids[0]

-        stream_id = streams[0]._id
+        stream_defs, _ = self.clients.resource_registry.find_objects(subject=stream_id, predicate=PRED.hasStreamDefinition, id_only=True)
+        if not stream_defs:
+            raise BadRequest("Data Product stream is without a stream definition")
+        stream_def_id = stream_defs[0]
+
+        stream_def = self.clients.pubsub_management.read_stream_definition(stream_def_id) # additional read necessary to fill in the pdict
+
+        dataset_id = self.clients.dataset_management.create_dataset(name='data_set_%s' % stream_id,
+                                                                    stream_id=stream_id,
+                                                                    parameter_dict=stream_def.parameter_dictionary,
+                                                                    temporal_domain=data_product_obj.temporal_domain,
+                                                                    spatial_domain=data_product_obj.spatial_domain)
+
+        # link dataset with data product. This creates the association in the resource registry
+        self.data_product.link_data_set(data_product_id=data_product_id, data_set_id=dataset_id)

         log.debug("Activating data product persistence for stream_id: %s" % str(stream_id))
@@ -231,8 +238,6 @@ def activate_data_product_persistence(self, data_product_id=''):
         #--------------------------------------------------------------------------------

         # find datasets for the data product
-        dataset_id = self._get_dataset_id(data_product_id)
-        log.debug("Activating data product persistence for dataset_id: %s" % str(dataset_id))
         dataset_id = self.clients.ingestion_management.persist_data_stream(stream_id=stream_id,
                                                                            ingestion_configuration_id=ingestion_configuration_id,
                                                                            dataset_id=dataset_id)
@@ -491,6 +496,8 @@ def get_last_update(self, data_product_id=''):
     def _get_dataset_id(self, data_product_id=''):
         # find datasets for the data product
         dataset_ids, _ = self.clients.resource_registry.find_objects(data_product_id, PRED.hasDataset, RT.DataSet, id_only=True)
+        if not dataset_ids:
+            raise NotFound('No Dataset is associated with DataProduct %s' % data_product_id)
         return dataset_ids[0]

     def _get_stream_id(self, data_product_id=''):
@@ -688,11 +695,15 @@ def get_data_url(self, data_product_id=''):

         erddap_host = CFG.get_safe('server.erddap.host','localhost')
         errdap_port = CFG.get_safe('server.erddap.port','8080')
-        dataset_id = self._get_dataset_id(data_product_id)
-        ret.value = string.join( ["http://", erddap_host, ":", str(errdap_port),"/erddap/griddap/", str(dataset_id), "_0.html"],'')
+        try:
+            dataset_id = self._get_dataset_id(data_product_id)
+            ret.value = string.join( ["http://", erddap_host, ":", str(errdap_port),"/erddap/griddap/", str(dataset_id), "_0.html"],'')
+            ret.status = ComputedValueAvailability.PROVIDED
+            log.debug("get_data_url: data_url: %s", ret.value)
+        except NotFound:
+            ret.status = ComputedValueAvailability.NOTAVAILABLE
+            ret.reason = "Dataset for this Data Product could not be located"

-        ret.status = ComputedValueAvailability.PROVIDED
-        log.debug("get_data_url: data_url: %s", ret.value)
         return ret

     def get_provenance(self, data_product_id=''):
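Note: the net effect of the service changes above is a two-step lifecycle — create_data_product no longer creates a Dataset, and activate_data_product_persistence now builds and links it from the stream definition's parameter dictionary plus the product's temporal/spatial domains. A hedged sketch of the resulting client-side sequence (dp_obj, stream_def_id and dpms_cli assumed to exist already):

    data_product_id = dpms_cli.create_data_product(data_product=dp_obj,
                                                   stream_definition_id=stream_def_id)

    # Only now is the Dataset created and associated with the product
    dpms_cli.activate_data_product_persistence(data_product_id=data_product_id)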
@@ -84,8 +84,7 @@ def test_createDataProduct_and_DataProducer_success(self):

         # test call
         dp_id = self.data_product_management_service.create_data_product(data_product=dp_obj,
-                                                                         stream_definition_id='a stream def id',
-                                                                         parameter_dictionary=parameter_dictionary)
+                                                                         stream_definition_id='a stream def id')

2 changes: 1 addition & 1 deletion setup.py
@@ -12,7 +12,7 @@
 if sys.platform == 'darwin':
     os.environ['C_INCLUDE_PATH'] = '/usr/local/include'

-version = '2.0.8-dev'
+version = '2.0.9-dev'

 setup( name = 'coi-services',
        version = version,
