Skip to content

Commit

Permalink
Consolidated store configuration for services
Browse files Browse the repository at this point in the history
  • Loading branch information
labisso committed Dec 8, 2012
1 parent 1050797 commit 732574a
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 88 deletions.
23 changes: 3 additions & 20 deletions epu/dashiproc/dtrs.py
Expand Up @@ -5,9 +5,9 @@
from dashi.exceptions import WriteConflictError as DashiWriteConflictError

from epu.dtrs.core import DTRSCore
from epu.dtrs.store import DTRSStore, DTRSZooKeeperStore
from epu.dtrs.store import get_dtrs_store
from epu.exceptions import DeployableTypeLookupError, DeployableTypeValidationError, NotFoundError, WriteConflictError
from epu.util import get_class, get_config_paths
from epu.util import get_config_paths
import epu.dashiproc

log = logging.getLogger(__name__)
Expand All @@ -28,28 +28,11 @@ def __init__(self, *args, **kwargs):
self.CFG, self.amqp_uri)

store = kwargs.get('store')
self.store = store or self._get_dtrs_store()
self.store = store or get_dtrs_store(self.CFG)
self.store.initialize()

self.core = DTRSCore(self.store)

def _get_dtrs_store(self):

server_config = self.CFG.get("server")
if server_config is None:
raise Exception("missing server configuration")

zookeeper = server_config.get("zookeeper")
if zookeeper and zookeeper.get("enabled", True):
log.info("Using ZooKeeper DTRS store")
store = DTRSZooKeeperStore(zookeeper['hosts'],
zookeeper['path'], username=zookeeper.get('username'),
password=zookeeper.get('password'), timeout=zookeeper.get('timeout'))
else:
log.info("Using in-memory DTRS store")
store = DTRSStore()
return store

def start(self):

log.info("starting DTRS instance %s" % self)
Expand Down
24 changes: 4 additions & 20 deletions epu/dashiproc/epumanagement.py
Expand Up @@ -6,7 +6,7 @@
from epu.epumanagement import EPUManagement
from epu.epumanagement.conf import EPUM_INITIALCONF_SERVICE_NAME, \
EPUM_DEFAULT_SERVICE_NAME, EPUM_INITIALCONF_PROC_NAME
from epu.epumanagement.store import LocalEPUMStore, ZooKeeperEPUMStore
from epu.epumanagement.store import get_epum_store
from epu.dashiproc.provisioner import ProvisionerClient
from epu.dashiproc.dtrs import DTRSClient
from epu.util import get_config_paths
Expand All @@ -15,6 +15,7 @@

log = logging.getLogger(__name__)


class EPUManagementService(object):
"""EPU management service interface
Expand Down Expand Up @@ -43,7 +44,8 @@ def __init__(self):
self.service_name = self.CFG.epumanagement.get(EPUM_INITIALCONF_SERVICE_NAME, EPUM_DEFAULT_SERVICE_NAME)
self.proc_name = self.CFG.epumanagement.get(EPUM_INITIALCONF_PROC_NAME, None)

self.store = self._get_epum_store()
self.store = get_epum_store(self.CFG, service_name=self.service_name,
proc_name=self.proc_name)
self.store.initialize()

dtrs_client = DTRSClient(self.dashi)
Expand All @@ -55,24 +57,6 @@ def __init__(self):
if isinstance(prov_client, MockProvisionerClient):
prov_client._set_epum(self.epumanagement)

def _get_epum_store(self):
server_config = self.CFG.get("server")
if server_config is None:
raise Exception("server configuration not found")

zookeeper = server_config.get("zookeeper")
if zookeeper and zookeeper.get("enabled", True):
log.info("Using ZooKeeper EPUM store")

store = ZooKeeperEPUMStore(self.service_name, zookeeper['hosts'],
zookeeper['path'], username=zookeeper.get('username'),
password=zookeeper.get('password'),
timeout=zookeeper.get('timeout'), proc_name=self.proc_name)
else:
log.info("Using in-memory DTRS store")
store = LocalEPUMStore(self.service_name)
return store

def start(self):
self.dashi.handle(self.subscribe_domain)
self.dashi.handle(self.unsubscribe_domain)
Expand Down
20 changes: 3 additions & 17 deletions epu/dashiproc/processdispatcher.py
Expand Up @@ -3,7 +3,7 @@
from dashi import bootstrap

from epu.processdispatcher.core import ProcessDispatcherCore
from epu.processdispatcher.store import ProcessDispatcherStore, ProcessDispatcherZooKeeperStore
from epu.processdispatcher.store import get_processdispatcher_store
from epu.processdispatcher.engines import EngineRegistry
from epu.processdispatcher.matchmaker import PDMatchmaker
from epu.dashiproc.epumanagement import EPUManagementClient
Expand All @@ -18,8 +18,6 @@ class ProcessDispatcherService(object):
"""PD service interface
"""



def __init__(self, amqp_uri=None, topic="processdispatcher", registry=None,
store=None, epum_client=None, notifier=None, definition_id=None, domain_config=None):

Expand All @@ -35,7 +33,7 @@ def __init__(self, amqp_uri=None, topic="processdispatcher", registry=None,
default_engine = self.CFG.processdispatcher.get('default_engine')
if default_engine is None and len(engine_conf.keys()) == 1:
default_engine = engine_conf.keys()[0]
self.store = store or self._get_processdispatcher_store()
self.store = store or get_processdispatcher_store(self.CFG)
self.store.initialize()
self.registry = registry or EngineRegistry.from_config(engine_conf, default=default_engine)
self.eeagent_client = EEAgentClient(self.dashi)
Expand All @@ -50,7 +48,7 @@ def __init__(self, amqp_uri=None, topic="processdispatcher", registry=None,
elif not self.CFG.processdispatcher.get('static_resources'):
domain_definition_id = definition_id or self.CFG.processdispatcher.get('definition_id')
base_domain_config = domain_config or self.CFG.processdispatcher.get('domain_config')
epum_service_name = self.CFG.processdispatcher.get('epum_service_name',
epum_service_name = self.CFG.processdispatcher.get('epum_service_name',
'epu_management_service')
self.epum_client = EPUManagementClient(self.dashi, epum_service_name)

Expand Down Expand Up @@ -160,18 +158,6 @@ def heartbeat(self, sender, message):
def dump(self):
return self.core.dump()

def _get_processdispatcher_store(self):

zookeeper = self.CFG.get("zookeeper")
if zookeeper:
log.info("Using ZooKeeper ProcessDispatcher store")
store = ProcessDispatcherZooKeeperStore(zookeeper['hosts'],
zookeeper['processdispatcher_path'], zookeeper.get('timeout'))
else:
log.info("Using in-memory ProcessDispatcher store")
store = ProcessDispatcherStore()
return store


class SubscriberNotifier(object):
def __init__(self, dashi):
Expand Down
22 changes: 2 additions & 20 deletions epu/dashiproc/provisioner.py
Expand Up @@ -3,8 +3,7 @@
import dashi.bootstrap as bootstrap

from epu.dashiproc.dtrs import DTRSClient
from epu.provisioner.store import ProvisionerStore, ProvisionerZooKeeperStore,\
sanitize_record
from epu.provisioner.store import get_provisioner_store, sanitize_record
from epu.provisioner.core import ProvisionerCore, ProvisionerContextClient
from epu.provisioner.leader import ProvisionerLeader
from epu.states import InstanceState
Expand All @@ -31,7 +30,7 @@ def __init__(self, *args, **kwargs):

store = kwargs.get('store')
self.proc_name = self.CFG.provisioner.get('proc_name', "")
self.store = store or self._get_provisioner_store()
self.store = store or get_provisioner_store(self.CFG)
self.store.initialize()

notifier = kwargs.get('notifier')
Expand Down Expand Up @@ -198,23 +197,6 @@ def dump_state(self, nodes=None, force_subscribe=False):
else:
self.core.dump_state(nodes, force_subscribe=force_subscribe)

def _get_provisioner_store(self):
server_config = self.CFG.get("server")
if server_config is None:
raise Exception("missing server configuration")

zookeeper = server_config.get("zookeeper")
if zookeeper and zookeeper.get("enabled", True):
log.info("Using ZooKeeper Provisioner store")
store = ProvisionerZooKeeperStore(zookeeper['hosts'],
zookeeper['path'], username=zookeeper.get('username'),
password=zookeeper.get('password'), timeout=zookeeper.get('timeout'),
proc_name=self.proc_name)
else:
log.info("Using in-memory Provisioner store")
store = ProvisionerStore()
return store

def _get_context_client(self):
if not self.CFG.get('context'):
log.warning("No context configuration provided.")
Expand Down
24 changes: 22 additions & 2 deletions epu/dtrs/store.py
Expand Up @@ -5,14 +5,34 @@
from kazoo.exceptions import NodeExistsException, BadVersionException, \
NoNodeException

from epu.zkutil import get_kazoo_kwargs
from epu import zkutil
from epu.exceptions import WriteConflictError, NotFoundError, DeployableTypeValidationError

log = logging.getLogger(__name__)

VERSION_KEY = "__version"


def get_dtrs_store(config, use_gevent=False):
"""Instantiate DTRS store object for the given configuration
"""
if zkutil.is_zookeeper_enabled(config):
zookeeper = zkutil.get_zookeeper_config(config)

log.info("Using ZooKeeper DTRS store")
store = DTRSZooKeeperStore(zookeeper['hosts'], zookeeper['path'],
username=zookeeper.get('username'),
password=zookeeper.get('password'),
timeout=zookeeper.get('timeout'),
use_gevent=use_gevent)

else:
log.info("Using in-memory DTRS store")
store = DTRSStore()

return store


class DTRSStore(object):
"""In-memory version of DTRS storage"""

Expand Down Expand Up @@ -250,7 +270,7 @@ class DTRSZooKeeperStore(object):

def __init__(self, hosts, base_path, username=None, password=None, timeout=None, use_gevent=False):

kwargs = get_kazoo_kwargs(username=username, password=password,
kwargs = zkutil.get_kazoo_kwargs(username=username, password=password,
timeout=timeout, use_gevent=use_gevent)
self.kazoo = KazooClient(hosts + base_path, **kwargs)

Expand Down
25 changes: 22 additions & 3 deletions epu/epumanagement/store.py
Expand Up @@ -14,14 +14,33 @@
from epu.epumanagement.core import EngineState, SensorItemParser, InstanceParser, CoreInstance
from epu.states import InstanceState, InstanceHealthState
from epu.exceptions import NotFoundError, WriteConflictError
from epu.zkutil import get_kazoo_kwargs
from epu import zkutil
from epu.epumanagement.conf import *



log = logging.getLogger(__name__)


def get_epum_store(config, service_name, use_gevent=False, proc_name=None):
"""Instantiate EPUM store object for the given configuration
"""
if zkutil.is_zookeeper_enabled(config):
zookeeper = zkutil.get_zookeeper_config(config)

log.info("Using ZooKeeper EPUM store")

store = ZooKeeperEPUMStore(service_name, zookeeper['hosts'],
zookeeper['path'], username=zookeeper.get('username'),
password=zookeeper.get('password'),
timeout=zookeeper.get('timeout'), proc_name=proc_name)

else:
log.info("Using in-memory EPUM store")
store = LocalEPUMStore(service_name)

return store


#############################################################################
# STORAGE INTERFACES
#############################################################################
Expand Down Expand Up @@ -870,7 +889,7 @@ def __init__(self, service_name, hosts, base_path, username=None, password=None,

self.service_name = service_name

kwargs = get_kazoo_kwargs(username=username, password=password,
kwargs = zkutil.get_kazoo_kwargs(username=username, password=password,
timeout=timeout, use_gevent=use_gevent)
self.kazoo = KazooClient(hosts + base_path, **kwargs)

Expand Down
25 changes: 22 additions & 3 deletions epu/processdispatcher/store.py
Expand Up @@ -11,11 +11,28 @@

import epu.tevent as tevent
from epu.exceptions import NotFoundError, WriteConflictError
from epu.zkutil import get_kazoo_kwargs
from epu import zkutil

log = logging.getLogger(__name__)


def get_processdispatcher_store(config, use_gevent=False):
"""Instantiate PD store object for the given configuration
"""
if zkutil.is_zookeeper_enabled(config):
zookeeper = zkutil.get_zookeeper_config(config)

log.info("Using ZooKeeper ProcessDispatcher store")
store = ProcessDispatcherZooKeeperStore(zookeeper['hosts'],
zookeeper['processdispatcher_path'], zookeeper.get('timeout'))

else:
log.info("Using in-memory ProcessDispatcher store")
store = ProcessDispatcherStore()

return store


class ProcessDispatcherStore(object):
"""
This store is responsible for persistence of several types of records.
Expand Down Expand Up @@ -523,9 +540,11 @@ class ProcessDispatcherZooKeeperStore(object):
# an exclusive lock on leadership.
ELECTION_PATH = "/election"

def __init__(self, hosts, base_path, timeout=None, use_gevent=False):
def __init__(self, hosts, base_path, username=None, password=None,
timeout=None, use_gevent=False):

kwargs = get_kazoo_kwargs(timeout=timeout, use_gevent=use_gevent)
kwargs = zkutil.get_kazoo_kwargs(username=username, password=password,
timeout=timeout, use_gevent=use_gevent)
self.kazoo = KazooClient(hosts + base_path, **kwargs)
self.election = self.kazoo.Election(self.ELECTION_PATH)

Expand Down
24 changes: 22 additions & 2 deletions epu/provisioner/store.py
Expand Up @@ -19,13 +19,33 @@

import epu.tevent as tevent
from epu.exceptions import WriteConflictError, NotFoundError
from epu.zkutil import get_kazoo_kwargs
from epu import zkutil


log = logging.getLogger(__name__)

VERSION_KEY = "__version"


def get_provisioner_store(config, use_gevent=False, proc_name=None):
"""Instantiate Provisioner store object for the given configuration
"""
if zkutil.is_zookeeper_enabled(config):
zookeeper = zkutil.get_zookeeper_config(config)

log.info("Using ZooKeeper Provisioner store")
store = ProvisionerZooKeeperStore(zookeeper['hosts'],
zookeeper['path'], username=zookeeper.get('username'),
password=zookeeper.get('password'), timeout=zookeeper.get('timeout'),
proc_name=proc_name)

else:
log.info("Using in-memory Provisioner store")
store = ProvisionerStore()

return store


class ProvisionerStore(object):
"""In-memory version of Provisioner storage
"""
Expand Down Expand Up @@ -332,7 +352,7 @@ class ProvisionerZooKeeperStore(object):
def __init__(self, hosts, base_path, username=None, password=None,
timeout=None, use_gevent=False, proc_name=None):

kwargs = get_kazoo_kwargs(username=username, password=password,
kwargs = zkutil.get_kazoo_kwargs(username=username, password=password,
timeout=timeout, use_gevent=use_gevent)
self.kazoo = KazooClient(hosts + base_path, **kwargs)

Expand Down

0 comments on commit 732574a

Please sign in to comment.