Skip to content

Commit

Permalink
Added async and basic intents to xmlrpc exported services.
Browse files Browse the repository at this point in the history
  • Loading branch information
scottslewis committed Jun 1, 2018
1 parent 4b79ca9 commit ca04d6a
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 37 deletions.
14 changes: 13 additions & 1 deletion pelix/rsa/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,20 @@
SERVICE_RANKING = 'service.ranking'
SERVICE_COMPONENT_NAME = 'component.name'
SERVICE_COMPONENT_ID = 'component.id'
# R7 standardized intents
OSGI_BASIC_INTENT = 'osgi.basic'
OSGI_BASIC_TIMEOUT_INTENT = '{0}.{1}'.format(OSGI_BASIC_INTENT,'timeout')
OSGI_ASYNC_INTENT = 'osgi.async'
OSGI_CONFIDENTIAL_INTENT = 'osgi.confidential'
OSGI_PRIVATE_INTENT = 'osgi.private'
# List of them
RSA_PROP_NAMES = [ENDPOINT_ID,ENDPOINT_SERVICE_ID,ENDPOINT_FRAMEWORK_UUID,SERVICE_EXPORTED_INTERFACES,REMOTE_CONFIGS_SUPPORTED,REMOTE_INTENTS_SUPPORTED,SERVICE_EXPORTED_CONFIGS,SERVICE_EXPORTED_INTENTS,SERVICE_EXPORTED_INTENTS_EXTRA,SERVICE_IMPORTED,SERVICE_IMPORTED_CONFIGS,SERVICE_INTENTS,SERVICE_ID,OBJECT_CLASS,INSTANCE_NAME,SERVICE_RANKING,SERVICE_COMPONENT_ID,SERVICE_COMPONENT_NAME]
RSA_PROP_NAMES = [ENDPOINT_ID,ENDPOINT_SERVICE_ID,ENDPOINT_FRAMEWORK_UUID,
SERVICE_EXPORTED_INTERFACES,REMOTE_CONFIGS_SUPPORTED,
REMOTE_INTENTS_SUPPORTED,SERVICE_EXPORTED_CONFIGS,
SERVICE_EXPORTED_INTENTS,SERVICE_EXPORTED_INTENTS_EXTRA,
SERVICE_IMPORTED,SERVICE_IMPORTED_CONFIGS,SERVICE_INTENTS,
SERVICE_ID,OBJECT_CLASS,INSTANCE_NAME,SERVICE_RANKING,
SERVICE_COMPONENT_ID,SERVICE_COMPONENT_NAME]
# ECF constants
ECF_ENDPOINT_CONTAINERID_NAMESPACE = "ecf.endpoint.id.ns"
ECF_ENDPOINT_ID = "ecf.endpoint.id"
Expand Down
62 changes: 33 additions & 29 deletions pelix/rsa/providers/distribution/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,8 @@ class Container():
def __init__(self):
self._bundle_context = None
self._container_props = None
self._rs_instances = {}
self._rs_instances_lock = RLock()
self._exported_services = {}
self._exported_instances_lock = RLock()

def get_id(self):
return self._container_props.get(IPOPO_INSTANCE_NAME,None)
Expand All @@ -302,24 +302,31 @@ def _validate_component(self, bundle_context, container_props):

@Invalidate
def _invalidate_component(self, bundle_context):
with self._rs_instances_lock:
self._rs_instances.clear()
with self._exported_instances_lock:
self._exported_services.clear()

def _get_bundle_context(self):
return self._bundle_context

def _add_export(self, get_remoteservice_id, inst):
with self._rs_instances_lock:
self._rs_instances[get_remoteservice_id] = inst
def _add_export(self, ed_id, inst):
with self._exported_instances_lock:
self._exported_services[ed_id] = inst

def _remove_export(self, get_remoteservice_id):
with self._rs_instances_lock:
return self._rs_instances.pop(get_remoteservice_id,None)
def _remove_export(self, ed_id):
with self._exported_instances_lock:
return self._exported_services.pop(ed_id,None)

def _get_export(self, ed_id):
with self._exported_instances_lock:
return self._exported_services.get(ed_id,None)

def _find_export(self, func):
with self._exported_instances_lock:
for val in self._exported_services.values():
if func(val):
return val
return None

def _get_export(self, get_remoteservice_id):
with self._rs_instances_lock:
return self._rs_instances.get(get_remoteservice_id,None)

def _get_distribution_provider(self):
return self._container_props[DISTRIBUTION_PROVIDER_CONTAINER_PROP]

Expand All @@ -346,18 +353,15 @@ class ExportContainer(Container):
def _get_service_intents(self):
return self._container_props.get(SERVICE_INTENTS)

def _export_service(self,svc,ed_props):
self._add_export(ed_props.get(ECF_RSVC_ID), svc)
def _export_service(self,svc,ed):
self._add_export(ed.get_id(), (svc,ed))

def _update_service(self, ed):
# do nothing by default, subclasses may override
pass

def _unexport_service(self, ed):
return self._remove_export(ed.get_properties().get(ECF_RSVC_ID))

def _prepare_endpoint_extra_props(self, export_props):
return {}
return self._remove_export(ed.get_id())

def prepare_endpoint_props(self, intfs, svc_ref, export_props):
pkg_vers = rsa.get_package_versions(intfs, export_props)
Expand All @@ -369,31 +373,31 @@ def prepare_endpoint_props(self, intfs, svc_ref, export_props):
rsa.get_next_rsid(),
rsa.get_current_time_millis())
extra_props = rsa.get_extra_props(export_props)
exporter_props = self._prepare_endpoint_extra_props(export_props)
merged = rsa.merge_dicts(rsa_props, ecf_props, extra_props, exporter_props)
merged = rsa.merge_dicts(rsa_props, ecf_props, extra_props)
# remove service.bundleid
merged.pop(SERVICE_BUNDLE_ID,None)
# remove service.scope
merged.pop(SERVICE_SCOPE,None)
return merged

def export_service(self, svc_ref, export_props):
self._export_service(self._get_bundle_context().get_service(svc_ref), export_props.copy())
return EndpointDescription.fromprops(export_props)
ed = EndpointDescription.fromprops(export_props)
self._export_service(self._get_bundle_context().get_service(svc_ref), ed)
return ed

def update_service(self, ed):
return self._update_service(ed)

def unexport_service(self, ed):
return self._unexport_service(ed)

def _dispatch_exported(self,get_remoteservice_id,method_name,params):
# first lookup service instance
service = self._get_export(get_remoteservice_id)
def _dispatch_exported(self,rs_id,method_name,params):
# first lookup service instance by comparing the rs_id against the service's remote service id
service = self._find_export(lambda val: val[1].get_remoteservice_id()[1] == int(rs_id))
if not service:
raise RemoteServiceError('Unknown instance with get_remoteservice_id={0} for method call={1}'.format(str(get_remoteservice_id),method_name))
raise RemoteServiceError('Unknown service with rs_id={0} for method call={1}'.format(rs_id,method_name))
# Get the method
method_ref = getattr(service, method_name, None)
method_ref = getattr(service[0], method_name, None)
if method_ref is None:
raise RemoteServiceError("Unknown method {0}".format(method_name))
# Call it (let the errors be propagated)
Expand Down
26 changes: 20 additions & 6 deletions pelix/rsa/providers/distribution/xmlrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
# ------------------------------------------------------------------------------
# Standard logging
import logging
from pelix.rsa import prop_dot_suffix
from pelix.rsa import prop_dot_suffix, OSGI_BASIC_TIMEOUT_INTENT
from concurrent.futures.thread import ThreadPoolExecutor
_logger = logging.getLogger(__name__)
# ------------------------------------------------------------------------------
# Module version
Expand Down Expand Up @@ -57,31 +58,36 @@
# pylint: disable=F0401
from xmlrpc.server import SimpleXMLRPCDispatcher
import xmlrpc.client as xmlrpclib
from concurrent.futures import Executor
except ImportError:
# Python 2
# pylint: disable=F0401
from SimpleXMLRPCServer import SimpleXMLRPCDispatcher
import xmlrpclib
Executor = None
# ------------------------------------------------------------------------------
# XmlRpc Distribution Provider Constants. Note that to get interoperability with
# Java-based ECF RSA providers, these must match the Java-side constants.
ECF_XMLRPC_SERVER_CONFIG = 'ecf.xmlrpc.server'
ECF_XMLRPC_CLIENT_CONFIG = 'ecf.xmlrpc.client'
ECF_XMLRPC_SUPPORTED_CONFIGS = [ ECF_XMLRPC_SERVER_CONFIG ]
ECF_XMLRPC_NAMESPACE = 'ecf.namespace.xmlrpc'
ECF_XMLRPC_SUPPORTED_INTENTS = [ 'osgi.basic' ]
ECF_XMLRPC_SUPPORTED_INTENTS = [ 'osgi.basic', 'osgi.async' ]
ECF_XMLRPC_PATH_PROP = 'path'
ECF_XMLRPC_HOSTNAME_PROP = 'hostname'

# ------------------------------------------------------------------------------
class ServerDispatcher(SimpleXMLRPCDispatcher):
'''ServerDispatcher (subclass of SimpleXMLRPCDispatcher)
uses ECF remote service id to identify the service
for method invocation requests. See do_POST and _dispatch
for the actual method invocation.
'''
def __init__(self,dispatch_func):
def __init__(self,dispatch_func,timeout=None,executor=None):
super(ServerDispatcher, self).__init__(allow_none=True)
self._dispatch_func = dispatch_func
self._timeout = timeout
self._executor = executor

def do_POST(self, request, response):
data = to_str(request.read_data())
Expand All @@ -93,7 +99,10 @@ def _dispatch(self, method, params):
if not len(obj_method_list) == 2:
raise Exception('_dispatch: invalid method='+method+'. Must be of form <objectid>.<methodname>')
# and call _dispatch_func/3
return self._dispatch_func(int(obj_method_list[0]),obj_method_list[1],params)
if self._executor:
return self._executor.submit(self._dispatch_func, obj_method_list[0], obj_method_list[1], params).result(self._timeout)
else:
return self._dispatch_func(obj_method_list[0],obj_method_list[1],params)

# ------------------------------------------------------------------------------
# Implementation of SERVICE_EXPORT_CONTAINER
Expand All @@ -111,8 +120,13 @@ class XmlRpcExportContainer(ExportContainer):
def _validate_component(self, bundle_context, container_props):
ExportContainer._validate_component(self, bundle_context, container_props)
dp = self._get_distribution_provider()
# register the _XmlRpcServlet instance with the desired uri_path
dp._httpservice.register_servlet(dp._uri_path,ServerDispatcher(self._dispatch_exported))
timeout = container_props.get(OSGI_BASIC_TIMEOUT_INTENT,None)/1000
if Executor:
executor = ThreadPoolExecutor(max_workers=5)
else:
executor = None
# register the ServerDispatcher instance our uri_path
dp._httpservice.register_servlet(dp._uri_path,ServerDispatcher(self._dispatch_exported,timeout,executor))

@Invalidate
def _invalidate_component(self, bundle_context):
Expand Down
2 changes: 1 addition & 1 deletion samples/rsa/helloimpl.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from pelix.ipopo.decorators import ComponentFactory, Provides, Instantiate
@ComponentFactory("helloimpl-provider-factory")
@Provides("org.eclipse.ecf.examples.hello.IHello")
@Instantiate("helloimpl-provider-auto")
@Instantiate("helloimpl-provider-auto", { "service.intents": ['osgi.basic', 'osgi.async'], 'osgi.basic.timeout':30000})
class HelloImpl(object):

def sayHello(self, name='Not given', message = 'nothing'):
Expand Down

0 comments on commit ca04d6a

Please sign in to comment.