Permalink
Browse files

Simplified governance in the agent framework

  • Loading branch information...
1 parent 6a9e419 commit 664925f497c58f0fb8e35bf52245739d336e4fb4 @shenrie shenrie committed Oct 12, 2012
Showing with 69 additions and 56 deletions.
  1. +1 −1 extern/ion-definitions
  2. +6 −11 pyon/agent/agent.py
  3. +6 −14 pyon/agent/simple_agent.py
  4. +29 −16 pyon/container/procs.py
  5. +27 −14 pyon/core/governance/governance_controller.py
View
@@ -177,23 +177,18 @@ def _on_quit(self):
# Governance interfaces and helpers
##############################################################
- def _is_governance_enabled(self):
- return self.container.governance_controller.enabled and self.CFG.get_safe("system.load_policy", False)
def _get_resource_commitments(self, user_id):
- if not self._is_governance_enabled():
+ if not self.container.governance_controller.enabled:
return None
- log.debug("Finding commitments for user_id: " + user_id)
-
- rr_client = ResourceRegistryServiceProcessClient(node=self.container.node, process=self)
- commitments,_ = rr_client.find_objects(self.resource_id, PRED.hasCommitment, RT.Commitment)
- for com in commitments:
- if com.consumer == user_id and com.lcstate != LCS.RETIRED:
- return com
+ try:
+ return self.container.governance_controller.get_resource_commitment(user_id, self.resource_id)
+ except Exception, e:
+ log.error(e.message)
+ return None
- return None
def negotiate(self, resource_id="", sap_in=None):
"""
@@ -57,24 +57,16 @@ def _on_quit(self):
# Governance interfaces and helpers
##############################################################
- def _is_governance_enabled(self):
- return self.container.governance_controller.enabled and self.CFG.get_safe("system.load_policy", False)
-
def _get_resource_commitments(self, user_id):
- if not self._is_governance_enabled():
+ if not self.container.governance_controller.enabled:
return None
- log.debug("Finding commitments for user_id: " + user_id)
-
- rr_client = ResourceRegistryServiceProcessClient(node=self.container.node, process=self)
- commitments,_ = rr_client.find_objects(self.resource_id, PRED.hasCommitment, RT.Commitment)
- for com in commitments:
- if com.consumer == user_id and com.lcstate != LCS.RETIRED:
- return com
-
- return None
-
+ try:
+ return self.container.governance_controller.get_resource_commitment(user_id, self.resource_id)
+ except Exception, e:
+ log.error(e.message)
+ return None
def negotiate(self, resource_id="", sap_in=None):
@@ -178,6 +178,9 @@ def spawn_process(self, name=None, module=None, cls=None, config=None, process_i
# terminate process also triggers TERMINATING/TERMINATED
self._call_proc_state_changed(process_instance, ProcessStateEnum.EXITED)
+ else:
+ #Shouldn't be any policies for immediate processes
+ self.update_container_policies(process_instance)
return process_instance.id
@@ -190,6 +193,31 @@ def spawn_process(self, name=None, module=None, cls=None, config=None, process_i
raise
+ #This must be called after registering the process
+ def update_container_policies(self, process_instance):
+
+ if not self.container.governance_controller:
+ return
+
+ if process_instance._proc_type == "service":
+
+ # look to load any existing policies for this service
+ self.container.governance_controller.safe_update_service_access_policy(process_instance._proc_listen_name)
+
+ if process_instance._proc_type == "agent":
+
+ # look to load any existing policies for this agent service
+ if process_instance.resource_type is None:
+ self.container.governance_controller.safe_update_service_access_policy(process_instance.name)
+ else:
+ self.container.governance_controller.safe_update_service_access_policy(process_instance.resource_type)
+
+ if process_instance.resource_id:
+ # look to load any existing policies for this resource
+ self.container.governance_controller.safe_update_resource_access_policy(process_instance.resource_id)
+
+
+
def list_local_processes(self, process_type=''):
"""
Returns a list of the running ION processes in the container or filtered by the process_type
@@ -363,10 +391,6 @@ def _spawn_service_process(self, process_id, name, module, cls, config):
proc.start_listeners()
- # look to load any existing policies for this service
- if self.container.governance_controller:
- self.container.governance_controller.safe_update_service_access_policy(process_instance._proc_listen_name)
-
return process_instance
# -----------------------------------------------------------------
@@ -476,18 +500,7 @@ def agent_cleanup(x):
proc.start_listeners()
- # look to load any existing policies for this agent service
- if self.container.governance_controller:
- if process_instance.resource_type is None:
- self.container.governance_controller.safe_update_service_access_policy(process_instance.name)
- else:
- self.container.governance_controller.safe_update_service_access_policy(process_instance.resource_type)
-
- if process_instance.resource_id:
- # look to load any existing policies for this resource
- if self.container.governance_controller:
- self.container.governance_controller.safe_update_resource_access_policy(process_instance.resource_id)
- else:
+ if not process_instance.resource_id:
log.warn("Agent process id=%s does not define resource_id!!" % process_instance.id)
return process_instance
@@ -8,8 +8,7 @@
from pyon.core.bootstrap import CFG
from pyon.core.governance.governance_dispatcher import GovernanceDispatcher
from pyon.util.log import log
-from pyon.core.governance.policy.policy_decision import COMMON_SERVICE_POLICY_RULES
-from pyon.ion.resource import RT, PRED
+from pyon.ion.resource import RT, PRED, LCS
from pyon.core.governance.policy.policy_decision import PolicyDecisionPointManager
from pyon.event.event import EventSubscriber
from interface.services.coi.ipolicy_management_service import PolicyManagementServiceProcessClient
@@ -149,7 +148,6 @@ def process_message(self,invocation,interceptor_list, method):
#Helper methods
-
#Iterate the Org(s) that the user belongs to and create a header that lists only the role names per Org assigned
#to the user; i.e. {'ION': ['Member', 'Operator'], 'Org2': ['Member']}
def get_role_message_headers(self, org_roles):
@@ -194,6 +192,18 @@ def get_system_actor_header(self, system_actor=None):
return actor_header
+ #Returns the list of commitments for the specified user and resource
+ def get_resource_commitment(self, user_id, resource_id):
+
+ log.debug("Finding commitments for user_id: %s and resource_id: %s" % (user_id, resource_id))
+
+ commitments,_ = self.rr_client.find_objects(resource_id, PRED.hasCommitment, RT.Commitment)
+ for com in commitments:
+ if com.consumer == user_id and com.lcstate != LCS.RETIRED:
+ return com
+
+ return None
+
#Manage all of the policies in the container
def resource_policy_event_callback(self, *args, **kwargs):
@@ -273,19 +283,22 @@ def update_service_access_policy(self, service_name, service_op=''):
log.error("The service %s is not found or there was an error applying access policy: %s" % ( service_name, e.message))
#Next update any precondition policies
- if service_op:
- try:
- proc = self.container.proc_manager.get_a_local_process(service_name)
- if proc is not None:
- preconditions = self.policy_client.get_active_process_operation_preconditions(service_name, service_op, self._container_org_name)
+ try:
+ proc = self.container.proc_manager.get_a_local_process(service_name)
+ if proc is not None:
+ if service_op: #handles the delete policy case
self.unregister_all_process_operation_precondition(proc,service_op)
- if preconditions:
- for pre in preconditions:
- self.register_process_operation_precondition(proc,service_op, pre )
+ op_preconditions = self.policy_client.get_active_process_operation_preconditions(service_name, service_op, self._container_org_name)
+ if op_preconditions:
+ for op in op_preconditions:
+ self.unregister_all_process_operation_precondition(proc,op.op)
+ for pre in op.preconditions:
+ self.register_process_operation_precondition(proc, op.op, pre )
+
+ except Exception, e:
+ #If the resource does not exist, just ignore it - but log a warning.
+ log.error("The process %s is not found for op %s or there was an error applying access policy: %s" % ( service_name, service_op, e.message))
- except Exception, e:
- #If the resource does not exist, just ignore it - but log a warning.
- log.error("The process %s is not found for op %s or there was an error applying access policy: %s" % ( service_name, service_op, e.message))
#TODO - Might need to change this once the HA Agent is available
def _is_policy_management_service_available(self):

0 comments on commit 664925f

Please sign in to comment.