Skip to content

Commit

Permalink
Rename shards to pool
Browse files Browse the repository at this point in the history
Shard turned out to be a quite overloaded/confusing term to use here,
although correct. We decided to rename our shard feature to pool.

P.S: I swear I didn't use sed O.O

Change-Id: Ic54f29a4da7d7690c9c9210b74876b96f0ae0eac
  • Loading branch information
flaper87 committed Jun 18, 2014
1 parent e89b390 commit 15d55e7
Show file tree
Hide file tree
Showing 40 changed files with 629 additions and 631 deletions.
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""shards: JSON schema for marconi-queues shards resources."""
"""pools: JSON schema for marconi-queues pools resources."""

# NOTE(cpp-cabrera): options can be anything. These will be unique to
# each storage driver, so we don't perform any further validation at
Expand Down
21 changes: 11 additions & 10 deletions marconi/queues/bootstrap.py
Expand Up @@ -21,7 +21,7 @@
from marconi.openstack.common.cache import cache as oslo_cache
from marconi.openstack.common import log
from marconi.queues.storage import pipeline
from marconi.queues.storage import sharding
from marconi.queues.storage import pooling
from marconi.queues.storage import utils as storage_utils
from marconi.queues import transport # NOQA

Expand All @@ -38,13 +38,14 @@
CONF.register_cli_opts(_CLI_OPTIONS)

_GENERAL_OPTIONS = (
cfg.BoolOpt('sharding', default=False,
help=('Enable sharding across multiple storage backends. ',
'If sharding is enabled, the storage driver ',
cfg.BoolOpt('pooling', default=False,
help=('Enable pooling across multiple storage backends. ',
'If pooling is enabled, the storage driver ',
'configuration is used to determine where the ',
'catalogue/control plane data is kept.')),
'catalogue/control plane data is kept.'),
deprecated_opts=[cfg.DeprecatedOpt('pooling')]),
cfg.BoolOpt('admin_mode', default=False,
help='Activate endpoints to manage shard registry.'),
help='Activate endpoints to manage pool registry.'),
)

_DRIVER_OPTIONS = (
Expand Down Expand Up @@ -81,10 +82,10 @@ def __init__(self, conf):
def storage(self):
LOG.debug(u'Loading storage driver')

if self.conf.sharding:
LOG.debug(u'Storage sharding enabled')
storage_driver = sharding.DataDriver(self.conf, self.cache,
self.control)
if self.conf.pooling:
LOG.debug(u'Storage pooling enabled')
storage_driver = pooling.DataDriver(self.conf, self.cache,
self.control)
else:
storage_driver = storage_utils.load_storage_driver(
self.conf, self.cache)
Expand Down
4 changes: 2 additions & 2 deletions marconi/queues/storage/__init__.py
Expand Up @@ -23,10 +23,10 @@
Claim = base.Claim
Message = base.Message
Queue = base.Queue
ShardsBase = base.ShardsBase
PoolsBase = base.PoolsBase

DEFAULT_QUEUES_PER_PAGE = base.DEFAULT_QUEUES_PER_PAGE
DEFAULT_MESSAGES_PER_PAGE = base.DEFAULT_MESSAGES_PER_PAGE
DEFAULT_SHARDS_PER_PAGE = base.DEFAULT_SHARDS_PER_PAGE
DEFAULT_POOLS_PER_PAGE = base.DEFAULT_POOLS_PER_PAGE

DEFAULT_MESSAGES_PER_CLAIM = base.DEFAULT_MESSAGES_PER_CLAIM
82 changes: 41 additions & 41 deletions marconi/queues/storage/base.py
Expand Up @@ -20,7 +20,7 @@

DEFAULT_QUEUES_PER_PAGE = 10
DEFAULT_MESSAGES_PER_PAGE = 10
DEFAULT_SHARDS_PER_PAGE = 10
DEFAULT_POOLS_PER_PAGE = 10

DEFAULT_MESSAGES_PER_CLAIM = 10

Expand Down Expand Up @@ -48,7 +48,7 @@ class DataDriverBase(DriverBase):
core functionality of the system.
Connection information and driver-specific options are
loaded from the config file or the shard catalog.
loaded from the config file or the pool catalog.
:param conf: Configuration containing options for this driver.
:type conf: `oslo.config.ConfigOpts`
Expand Down Expand Up @@ -89,8 +89,8 @@ class ControlDriverBase(DriverBase):
modify aspects of the functionality of the system. This is ideal
for administrative purposes.
Allows access to the shard registry through a catalogue and a
shard controller.
Allows access to the pool registry through a catalogue and a
pool controller.
:param conf: Configuration containing options for this driver.
:type conf: `oslo.config.ConfigOpts`
Expand All @@ -105,8 +105,8 @@ def catalogue_controller(self):
raise NotImplementedError

@abc.abstractproperty
def shards_controller(self):
"""Returns storage's shard management controller."""
def pools_controller(self):
"""Returns storage's pool management controller."""
raise NotImplementedError


Expand Down Expand Up @@ -390,99 +390,99 @@ def delete(self, queue, claim_id, project=None):


@six.add_metaclass(abc.ABCMeta)
class ShardsBase(ControllerBase):
"""A controller for managing shards."""
class PoolsBase(ControllerBase):
"""A controller for managing pools."""

@abc.abstractmethod
def list(self, marker=None, limit=DEFAULT_SHARDS_PER_PAGE,
def list(self, marker=None, limit=DEFAULT_POOLS_PER_PAGE,
detailed=False):
"""Lists all registered shards.
"""Lists all registered pools.
:param marker: used to determine which shard to start with
:param marker: used to determine which pool to start with
:type marker: six.text_type
:param limit: (Default 10) Max number of results to return
:type limit: int
:param detailed: whether to include options
:type detailed: bool
:returns: A list of shards - name, weight, uri
:returns: A list of pools - name, weight, uri
:rtype: [{}]
"""
raise NotImplementedError

@abc.abstractmethod
def create(self, name, weight, uri, options=None):
"""Registers a shard entry.
"""Registers a pool entry.
:param name: The name of this shard
:param name: The name of this pool
:type name: six.text_type
:param weight: the likelihood that this shard will be used
:param weight: the likelihood that this pool will be used
:type weight: int
:param uri: A URI that can be used by a storage client
(e.g., pymongo) to access this shard.
(e.g., pymongo) to access this pool.
:type uri: six.text_type
:param options: Options used to configure this shard
:param options: Options used to configure this pool
:type options: dict
"""
raise NotImplementedError

@abc.abstractmethod
def get(self, name, detailed=False):
"""Returns a single shard entry.
"""Returns a single pool entry.
:param name: The name of this shard
:param name: The name of this pool
:type name: six.text_type
:param detailed: Should the options data be included?
:type detailed: bool
:returns: weight, uri, and options for this shard
:returns: weight, uri, and options for this pool
:rtype: {}
:raises: ShardDoesNotExist if not found
:raises: PoolDoesNotExist if not found
"""
raise NotImplementedError

@abc.abstractmethod
def exists(self, name):
"""Returns a single shard entry.
"""Returns a single pool entry.
:param name: The name of this shard
:param name: The name of this pool
:type name: six.text_type
:returns: True if the shard exists
:returns: True if the pool exists
:rtype: bool
"""
raise NotImplementedError

@abc.abstractmethod
def delete(self, name):
"""Removes a shard entry.
"""Removes a pool entry.
:param name: The name of this shard
:param name: The name of this pool
:type name: six.text_type
:rtype: None
"""
raise NotImplementedError

@abc.abstractmethod
def update(self, name, **kwargs):
"""Updates the weight, uris, and/or options of this shard
"""Updates the weight, uris, and/or options of this pool
:param name: Name of the shard
:param name: Name of the pool
:type name: text
:param kwargs: one of: `uri`, `weight`, `options`
:type kwargs: dict
:raises: ShardDoesNotExist
:raises: PoolDoesNotExist
"""
raise NotImplementedError

@abc.abstractmethod
def drop_all(self):
"""Deletes all shards from storage."""
"""Deletes all pools from storage."""
raise NotImplementedError


@six.add_metaclass(abc.ABCMeta)
class CatalogueBase(ControllerBase):
"""A controller for managing the catalogue. The catalogue is
responsible for maintaining a mapping between project.queue
entries to their shard.
entries to their pool.
"""

@abc.abstractmethod
Expand All @@ -493,21 +493,21 @@ def list(self, project):
:param project: The project to use when filtering through queue
entries.
:type project: six.text_type
:returns: [{'project': ..., 'queue': ..., 'shard': ...},]
:returns: [{'project': ..., 'queue': ..., 'pool': ...},]
:rtype: [dict]
"""
raise NotImplementedError

@abc.abstractmethod
def get(self, project, queue):
"""Returns the shard identifier for the queue registered under this
"""Returns the pool identifier for the queue registered under this
project.
:param project: Namespace to search for the given queue
:type project: six.text_type
:param queue: The name of the queue to search for
:type queue: six.text_type
:returns: {'shard': ...}
:returns: {'pool': ...}
:rtype: dict
:raises: QueueNotMapped
"""
Expand All @@ -526,15 +526,15 @@ def exists(self, project, queue):
"""

@abc.abstractmethod
def insert(self, project, queue, shard):
def insert(self, project, queue, pool):
"""Creates a new catalogue entry, or updates it if it already existed.
:param project: str - Namespace to insert the given queue into
:type project: six.text_type
:param queue: str - The name of the queue to insert
:type queue: six.text_type
:param shard: shard identifier to associate this queue with
:type shard: six.text_type
:param pool: pool identifier to associate this queue with
:type pool: six.text_type
"""
raise NotImplementedError

Expand All @@ -550,15 +550,15 @@ def delete(self, project, queue):
raise NotImplementedError

@abc.abstractmethod
def update(self, project, queue, shards=None):
"""Updates the shard identifier for this queue
def update(self, project, queue, pools=None):
"""Updates the pool identifier for this queue
:param project: Namespace to search
:type project: six.text_type
:param queue: The name of the queue
:type queue: six.text_type
:param shards: The name of the shard where this project/queue lives.
:type shards: six.text_type
:param pools: The name of the pool where this project/queue lives.
:type pools: six.text_type
:raises: QueueNotMapped
"""
raise NotImplementedError
Expand Down
16 changes: 8 additions & 8 deletions marconi/queues/storage/errors.py
Expand Up @@ -112,7 +112,7 @@ def __init__(self, cid, queue, project):

class QueueNotMapped(DoesNotExist):

msg_format = (u'No shard found for '
msg_format = (u'No pool found for '
u'queue {queue} for project {project}')

def __init__(self, queue, project):
Expand All @@ -127,17 +127,17 @@ def __init__(self, mid, cid):
super(MessageIsClaimedBy, self).__init__(cid=cid, mid=mid)


class ShardDoesNotExist(DoesNotExist):
class PoolDoesNotExist(DoesNotExist):

msg_format = u'Shard {shard} does not exist'
msg_format = u'Pool {pool} does not exist'

def __init__(self, shard):
super(ShardDoesNotExist, self).__init__(shard=shard)
def __init__(self, pool):
super(PoolDoesNotExist, self).__init__(pool=pool)


class NoShardFound(ExceptionBase):
class NoPoolFound(ExceptionBase):

msg_format = u'No shards registered'
msg_format = u'No pools registered'

def __init__(self):
super(NoShardFound, self).__init__()
super(NoPoolFound, self).__init__()
18 changes: 9 additions & 9 deletions marconi/queues/storage/mongodb/catalogue.py
Expand Up @@ -15,11 +15,11 @@

"""MongoDB storage controller for the queues catalogue.
Serves to construct an association between a project + queue -> shard
Serves to construct an association between a project + queue -> pool
{
'p_q': project_queue :: six.text_type,
's': shard_identifier :: six.text_type
's': pool_identifier :: six.text_type
}
"""

Expand Down Expand Up @@ -47,10 +47,10 @@ def __init__(self, *args, **kwargs):
self._col.ensure_index(CATALOGUE_INDEX, unique=True)

@utils.raises_conn_error
def _insert(self, project, queue, shard, upsert):
def _insert(self, project, queue, pool, upsert):
key = utils.scope_queue_name(queue, project)
return self._col.update({PRIMARY_KEY: key},
{'$set': {'s': shard}}, upsert=upsert)
{'$set': {'s': pool}}, upsert=upsert)

@utils.raises_conn_error
def list(self, project):
Expand All @@ -77,18 +77,18 @@ def exists(self, project, queue):
key = utils.scope_queue_name(queue, project)
return self._col.find_one({PRIMARY_KEY: key}) is not None

def insert(self, project, queue, shard):
def insert(self, project, queue, pool):
# NOTE(cpp-cabrera): _insert handles conn_error
self._insert(project, queue, shard, upsert=True)
self._insert(project, queue, pool, upsert=True)

@utils.raises_conn_error
def delete(self, project, queue):
self._col.remove({PRIMARY_KEY: utils.scope_queue_name(queue, project)},
w=0)

def update(self, project, queue, shard=None):
def update(self, project, queue, pool=None):
# NOTE(cpp-cabrera): _insert handles conn_error
res = self._insert(project, queue, shard, upsert=False)
res = self._insert(project, queue, pool, upsert=False)

if not res['updatedExisting']:
raise errors.QueueNotMapped(project, queue)
Expand All @@ -104,5 +104,5 @@ def _normalize(entry):
return {
'queue': queue,
'project': project,
'shard': entry['s']
'pool': entry['s']
}

0 comments on commit 15d55e7

Please sign in to comment.