Skip to content

Commit

Permalink
Refactoring to add API classes, added much documentation, simplified
Browse files Browse the repository at this point in the history
shell commands, and added samples (run_rsa_etcd_xmlrpc.py,
run_rsa_xmlrpc.py) and samples.rsa.helloimpl and
samples.rsa.helloconsumer.
  • Loading branch information
scottslewis committed Jun 1, 2018
1 parent a34f506 commit 583f227
Show file tree
Hide file tree
Showing 16 changed files with 1,250 additions and 526 deletions.
908 changes: 656 additions & 252 deletions pelix/rsa/__init__.py

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pelix/rsa/edef.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
# Documentation strings format
__docformat__ = "restructuredtext en"
# ------------------------------------------------------------------------------
import pelix.rsa as rsa
from pelix.rsa.endpointdescription import EndpointDescription
from pelix import rsa

# Standard library
import xml.etree.ElementTree as ElementTree
Expand Down Expand Up @@ -455,6 +455,7 @@ def _make_xml(self, endpoints):

return root


def to_string(self, endpoints):
"""
Converts the given endpoint description beans into a string
Expand Down
1 change: 1 addition & 0 deletions pelix/rsa/endpointdescription.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,3 +430,4 @@ def matches(self, ldap_filter):
"""
return get_ldap_filter(ldap_filter) \
.matches(self._properties)

1 change: 0 additions & 1 deletion pelix/rsa/providers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
# Constants needed for all discovery and distribution providers
# Note that the API is in pelix.rsa.providers package
14 changes: 8 additions & 6 deletions pelix/rsa/providers/discovery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
# ------------------------------------------------------------------------------
# Standard logging
import logging
from pelix.rsa import get_string_plus_property, get_string_plus_property_value,\
ECF_ENDPOINT_CONTAINERID_NAMESPACE
_logger = logging.getLogger(__name__)
# ------------------------------------------------------------------------------
# Module version
Expand All @@ -38,12 +36,13 @@
# Documentation strings format
__docformat__ = "restructuredtext en"
# ------------------------------------------------------------------------------# Standard library
from pelix.rsa import get_string_plus_property, get_string_plus_property_value,\
ECF_ENDPOINT_CONTAINERID_NAMESPACE
from pelix.rsa.endpointdescription import EndpointDescription
from threading import RLock
from pelix.ipopo.decorators import Provides, ComponentFactory, Instantiate,\
BindField, UnbindField, Requires
#------------------------------------------------------------------------------# Standard library

# Endpoint advertiser service specification. EndpointAdvertiser services
# are used to advertise exported remote services. See EndpointAdvertiser
# class below.
Expand Down Expand Up @@ -139,6 +138,9 @@ def _advertise(self,endpoint_description):
def _unadvertise(self,advertised):
raise Exception('Endpoint._unadvertise must be overridden by subclasses')

#------------------------------------------------------------------------------# Standard library
# EndpointEvent implemenatation used to provide EndpointEventListener service
# instances with valid EndpointEvent by endpoint advertisers
class EndpointEvent(object):
'''
EndpointEvents are used by endpoint advertisers to call
Expand Down Expand Up @@ -174,14 +176,14 @@ def get_endpoint_description(self):
def __str__(self):
return 'EndpointEvent(type={0},ed={1})'.format(self.get_type(),self.get_endpoint_description())

#------------------------------------------------------------------------------# Standard library
# Endpoint listener service specification
# This service specification is exposed by instances that wish to be
# notified by discovery providers when an EndpointEvent has occurred.
# For example, TopologyManagers will typically expose themselves as
# a service endpoint listener so that discovery subscribers can
# notify all such services when an endpoint event has been received.
SERVICE_ENDPOINT_LISTENER = 'pelix.rsa.discovery.endpointeventlistener'

class EndpointEventListener(object):
'''
Subclasses should override the endpoint_changed method
Expand Down Expand Up @@ -212,10 +214,10 @@ def endpoint_changed(self,endpoint_event,matched_filter):
:param matched_filter the filter (as string) that matched
this endpoint event listener service instance.
'''
pass
raise Exception("{0}.endpoint_changed not implemented".format(self))

@Requires('_event_listeners',SERVICE_ENDPOINT_LISTENER,True,True)
class EndpointSubscriber():
class EndpointSubscriber(object):
'''
Utility superclass for EndpointSubscribers.
'''
Expand Down
44 changes: 12 additions & 32 deletions pelix/rsa/providers/discovery/etcd.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,16 @@
from pelix.ipopo.constants import ARG_BUNDLE_CONTEXT

import etcd

# ------------------------------------------------------------------------------#
# Etcd-based implementation of EndpointAdvertiser and EndpointSubscriber
# discovery APIs. See EndpointAdviser and EndpointSubscriber classes
@ComponentFactory('ecf.namespace.etcd-endpoint-discovery-factory')
@Provides(SERVICE_ENDPOINT_ADVERTISER)
@Property('_hostname','hostname','localhost')
@Property('_port','port',2379)
@Property('_top_path','top_path','/org.eclipse.ecf.provider.etcd.EtcdDiscoveryContainer')
@Property('_session_ttl','_session_ttl',30)
@Property('_watch_start_wait','watch_start_wait',5)
@Property('_hostname','etcd.hostname','localhost')
@Property('_port','etcd.port',2379)
@Property('_top_path','etcd.toppath','/org.eclipse.ecf.provider.etcd.EtcdDiscoveryContainer')
@Property('_session_ttl','etcd.sesssionttl',30)
@Property('_watch_start_wait','etc.watchstartwait',5)
@Instantiate('ecf.namespace.etcd-endpoint-discovery')
class EtcdEndpointDiscovery(EndpointAdvertiser,EndpointSubscriber):
'''
Expand All @@ -68,15 +70,6 @@ class EtcdEndpointDiscovery(EndpointAdvertiser,EndpointSubscriber):
Note that this depends upon the python-etcd client library.
'''
# Property names that are read upon ValidateComponent to set
# the etcd hostname,port,toppath,session ttl (time to live)
# See above for defaults
ETCD_HOSTNAME_PROP = 'etcd.hostname'
ETCD_PORT_PROP = 'etcd.port'
ETCD_TOP_PATH_PROP = 'etcd.toppath'
ETCD_SESSION_TTL_PROP = 'etcd.sesssionttl'
ETCD_WATCH_START_WAIT_PROP = 'etc.watchstartwait'

REMOVE_ACTIONS = ['delete','expire']
ADD_ACTIONS = ['set','create']

Expand All @@ -103,21 +96,6 @@ def __init__(self):

@ValidateComponent(ARG_BUNDLE_CONTEXT)
def _validate_component(self,bundle_context):
hostname = bundle_context.get_property(self.ETCD_HOSTNAME_PROP)
if hostname:
self._hostname = hostname
port = bundle_context.get_property(self.ETCD_PORT_PROP)
if port:
self._port = int(port)
top_path = bundle_context.get_property(self.ETCD_TOP_PATH_PROP)
if top_path:
self._top_path = top_path
session_ttl = bundle_context.get_property(self.ETCD_SESSION_TTL_PROP)
if session_ttl:
self._session_ttl = int(session_ttl)
watch_start_wait = bundle_context.get_property(self.ETCD_WATCH_START_WAIT_PROP)
if watch_start_wait:
self._watch_start_wait = int(watch_start_wait)
# now connect
self._connect()

Expand Down Expand Up @@ -260,7 +238,7 @@ def _watch_job(self):
action = result.action
if key.endswith(self._sessionid):
if action == 'delete':
print('watch_job: session dir deleted...exiting')
_logger.debug('watch_job: session dir deleted...exiting')
#we are done
return
else:
Expand Down Expand Up @@ -311,4 +289,6 @@ def _ttl_job(self):
except:
_logger.exception('Exception updating in ttl job')
waittime = self._get_start_wait()


# ------------------------------------------------------------------------------#

45 changes: 39 additions & 6 deletions pelix/rsa/providers/distribution/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from pelix.ipopo.constants import SERVICE_IPOPO, IPOPO_INSTANCE_NAME, ARG_BUNDLE_CONTEXT,\
ARG_PROPERTIES
from pelix.ipopo.decorators import Requires, ValidateComponent, Invalidate
from pelix.rsa import DISTRIBUTION_PROVIDER_CONTAINER_PROP, get_dot_properties, SERVICE_INTENTS,\
from pelix.rsa import get_dot_properties, SERVICE_INTENTS,\
merge_dicts, ECF_RSVC_ID, RemoteServiceError,copy_non_reserved,\
ECF_SERVICE_EXPORTED_ASYNC_INTERFACES,ENDPOINT_ID,SERVICE_ID,\
SERVICE_IMPORTED, SERVICE_IMPORTED_CONFIGS,REMOTE_CONFIGS_SUPPORTED,\
Expand All @@ -50,10 +50,17 @@
import pelix.rsa as rsa
from threading import RLock
from pelix.rsa.endpointdescription import EndpointDescription
# ------------------------------------------------------------------------------# Standard library

# Standard service property that is added to the set of properties provided
# to the call to ipop.instantiate(container_factory,container_id,properties
# The property value is guaranteed to refer to the self instance of the
# distribution provider that is creating/instantiating the container
DISTRIBUTION_PROVIDER_CONTAINER_PROP = "pelix.rsa.distributionprovider"
# ------------------------------------------------------------------------------#
# Abstract DistributionProvider superclass
@Requires('_rsa',SERVICE_REMOTE_SERVICE_ADMIN)
@Requires('_ipopo', SERVICE_IPOPO)
class DistributionProvider():
class DistributionProvider(object):
'''
Abstract super class for all distribution providers.
Does not expose and 'public' methods (all methods _)
Expand Down Expand Up @@ -208,6 +215,11 @@ def _handle_import_close(self,ed):
if import_reg:
import_reg.close()

# ------------------------------------------------------------------------------#
# Specification for SERVICE_EXPORT_DISTRIBUTION_PROVIDER
SERVICE_EXPORT_DISTRIBUTION_PROVIDER = "pelix.rsa.exportdistributionprovider"
# Abstract implementation of SERVICE_EXPORT_DISTRIBUTION_PROVIDER extends
# DistributionProvider superclass
class ExportDistributionProvider(DistributionProvider):
'''
Export distribution provider.
Expand All @@ -233,7 +245,12 @@ def supports_export(self, exported_configs, service_intents, export_props):
The default implementation returns self._get_or_create_container.
'''
return self._get_or_create_container(exported_configs, service_intents, export_props)


# ------------------------------------------------------------------------------#
# Specification for SERVICE_IMPORT_DISTRIBUTION_PROVIDER
SERVICE_IMPORT_DISTRIBUTION_PROVIDER = "pelix.rsa.importdistributionprovider"
# Abstract implementation of SERVICE_EXPORT_DISTRIBUTION_PROVIDER
# extends DistributionProvider superclass
class ImportDistributionProvider(DistributionProvider):

def _prepare_container_id(self,container_props):
Expand All @@ -256,6 +273,8 @@ def supports_import(self, exported_configs, service_intents, endpoint_props):
'''
return self._get_or_create_container(exported_configs, service_intents, endpoint_props)

# ------------------------------------------------------------------------------#
# Abstract Container type supporting both ImportContainer and ExportContainer
class Container():

def __init__(self):
Expand Down Expand Up @@ -315,7 +334,13 @@ def _match_container_props(self,container_props):

def get_connected_id(self):
return None


# ------------------------------------------------------------------------------#
# Service specification for SERVICE_EXPORT_CONTAINER
SERVICE_EXPORT_CONTAINER = "pelix.rsa.exportcontainer"
# Abstract implementation of SERVICE_EXPORT_CONTAINER service specification
# extends Container class. New export distribution containers should use this
# class as a superclass to inherit required behavior.
class ExportContainer(Container):

def _get_service_intents(self):
Expand Down Expand Up @@ -379,7 +404,13 @@ def _dispatch_exported(self,get_remoteservice_id,method_name,params):

def get_connected_id(self):
return self.get_id()


# ------------------------------------------------------------------------------#
# Service specification for SERVICE_IMPORT_CONTAINER
SERVICE_IMPORT_CONTAINER = "pelix.rsa.importcontainer"
# Abstract implementation of SERVICE_IMPORT_CONTAINER service specification
# extends Container class. New import container classes should
# subclass this ImportContainer class to inherit necessary functionality.
class ImportContainer(Container):

def _get_imported_configs(self,exported_configs):
Expand Down Expand Up @@ -415,3 +446,5 @@ def import_service(self, ed):

def unimport_service(self,ed):
pass

# ------------------------------------------------------------------------------#
26 changes: 19 additions & 7 deletions pelix/rsa/providers/distribution/xmlrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
# ------------------------------------------------------------------------------
# Standard logging
import logging
from pelix.ipopo.constants import ARG_BUNDLE_CONTEXT, ARG_PROPERTIES
_logger = logging.getLogger(__name__)
# ------------------------------------------------------------------------------
# Module version
Expand All @@ -37,12 +36,13 @@
# Documentation strings format
__docformat__ = "restructuredtext en"
# ------------------------------------------------------------------------------
from pelix.ipopo.constants import ARG_BUNDLE_CONTEXT, ARG_PROPERTIES
# RSA constants
from pelix.rsa import SERVICE_EXPORT_DISTRIBUTION_PROVIDER, SERVICE_EXPORT_CONTAINER, SERVICE_IMPORT_CONTAINER,\
SERVICE_IMPORT_DISTRIBUTION_PROVIDER
# Providers API
from pelix.rsa.providers.distribution import ExportContainer,ImportContainer, ExportDistributionProvider,\
ImportDistributionProvider
ImportDistributionProvider, SERVICE_EXPORT_CONTAINER,\
SERVICE_EXPORT_DISTRIBUTION_PROVIDER, SERVICE_IMPORT_CONTAINER,\
SERVICE_IMPORT_DISTRIBUTION_PROVIDER
# Httpservice API
from pelix.http import HTTP_SERVICE
# to_str utility
Expand Down Expand Up @@ -93,6 +93,8 @@ def _dispatch(self, method, params):
# and call _dispatch_func/3
return self._dispatch_func(int(obj_method_list[0]),obj_method_list[1],params)

# ------------------------------------------------------------------------------
# Implementation of SERVICE_EXPORT_CONTAINER
@ComponentFactory(ECF_XMLRPC_SERVER_CONFIG)
@Provides(SERVICE_EXPORT_CONTAINER)
class XmlRpcExportContainer(ExportContainer):
Expand Down Expand Up @@ -121,7 +123,9 @@ def _invalidate_component(self, bundle_context):
ExportContainer._invalidate_component(self, bundle_context)
except:
pass


# ------------------------------------------------------------------------------
# Implementation of SERVICE_EXPORT_DISTRIBUTION_PROVIDER
@ComponentFactory("xmlrpc-export-distribution-provider-factory")
@Provides(SERVICE_EXPORT_DISTRIBUTION_PROVIDER)
@Property('_config_name', 'config_name', ECF_XMLRPC_SERVER_CONFIG)
Expand All @@ -130,6 +134,7 @@ def _invalidate_component(self, bundle_context):
@Property('_supported_intents', 'supported_intents', ECF_XMLRPC_SUPPORTED_INTENTS)
@Requires('_httpservice', HTTP_SERVICE)
@Property('_uri_path', 'uri_path', ECF_XMLRPC_DEFAULT_PATH)
@Property('_hostname', ECF_XMLRPC_SERVER_CONFIG+'.hostname',None)
@Instantiate("xmlrpc-export-distribution-provider")
class XmlRpcExportDistributionProvider(ExportDistributionProvider):
'''
Expand Down Expand Up @@ -159,15 +164,20 @@ def _prepare_container_id(self,container_props):
uri = 'http://'
if self._httpservice.is_https():
uri = 'https://'
hostname = container_props.get('host')
hostname = container_props.get('hostname',None)
if not hostname:
hostname = self._httpservice.get_hostname()
hostname = self._hostname
if not hostname:
self._httpservice.get_hostname()
port = container_props.get('port')
if not port:
port = str(self._httpservice.get_access()[1])
uri = uri + '{0}:{1}'.format(hostname,port)
return uri + self._uri_path


# ------------------------------------------------------------------------------
# Implementation of SERVICE_IMPORT_CONTAINER
@ComponentFactory(ECF_XMLRPC_CLIENT_CONFIG)
@Provides(SERVICE_IMPORT_CONTAINER)
class XmlRpcImportContainer(ImportContainer):
Expand Down Expand Up @@ -197,6 +207,8 @@ def __getattr__(self, name):
# create instance of XmlRpcProxy and pass in remoteservice id: ((ns,cid),get_remoteservice_id)
return XmlRpcProxy(endpoint_description.get_remoteservice_id())

# ------------------------------------------------------------------------------
# Implementation of SERVICE_IMPORT_DISTRIBUTION_PROVIDER
@ComponentFactory("xmlrpc-import-distribution-provider-factory")
@Provides(SERVICE_IMPORT_DISTRIBUTION_PROVIDER)
@Property('_config_name', 'config_name', ECF_XMLRPC_CLIENT_CONFIG)
Expand Down
Loading

0 comments on commit 583f227

Please sign in to comment.