From dad4e0e5451cdbe613f08eadbc3b790d1afe5539 Mon Sep 17 00:00:00 2001 From: wisererik Date: Tue, 28 Apr 2020 17:18:52 +0800 Subject: [PATCH 1/2] Add api of registering storage --- dolphin/api/schemas/storages.py | 38 +++ dolphin/api/v1/storages.py | 86 ++---- dolphin/api/validation/parameter_types.py | 13 + dolphin/common/sqlalchemyutils.py | 177 ++++++++++++ dolphin/db/api.py | 5 + dolphin/db/sqlalchemy/api.py | 320 +++++++++++++++++++--- dolphin/db/sqlalchemy/models.py | 9 +- dolphin/drivers/driver.py | 2 +- dolphin/drivers/fake_storage/__init__.py | 17 +- dolphin/drivers/manager.py | 55 +++- dolphin/exception.py | 4 + setup.py | 3 + 12 files changed, 615 insertions(+), 114 deletions(-) create mode 100644 dolphin/api/schemas/storages.py create mode 100644 dolphin/common/sqlalchemyutils.py diff --git a/dolphin/api/schemas/storages.py b/dolphin/api/schemas/storages.py new file mode 100644 index 000000000..96507aa75 --- /dev/null +++ b/dolphin/api/schemas/storages.py @@ -0,0 +1,38 @@ +# Copyright 2020 The SODA Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dolphin.api.validation import parameter_types + + +create = { + 'type': 'object', + 'properties': { + 'host': parameter_types.hostname_or_ip_address, + 'port': parameter_types.tcp_udp_port, + 'username': {'type': 'string', 'minLength': 1, 'maxLength': 255}, + 'password': {'type': 'string'}, + 'vendor': {'type': 'string', 'minLength': 1, 'maxLength': 255}, + 'model': {'type': 'string', 'minLength': 1, 'maxLength': 255}, + 'extra_attributes': { + 'type': 'object', + 'patternProperties': { + '^[a-zA-Z0-9-_:. ]{1,255}$': { + 'type': 'string', 'maxLength': 255 + } + } + } + }, + 'required': ['host', 'port', 'username', 'password', 'vendor', 'model'], + 'additionalProperties': False +} diff --git a/dolphin/api/v1/storages.py b/dolphin/api/v1/storages.py index 0a9e86e2f..22ed17cf7 100644 --- a/dolphin/api/v1/storages.py +++ b/dolphin/api/v1/storages.py @@ -19,16 +19,17 @@ from oslo_log import log -from dolphin import db, context -from dolphin.api.views import storages as storage_view from dolphin.api.common import wsgi +from dolphin.api.schemas import storages as schema_storages +from dolphin.api import validation +from dolphin.api.views import storages as storage_view +from dolphin import context +from dolphin import db from dolphin.drivers import manager as drivermanager -from dolphin.db.sqlalchemy import api as db from dolphin import exception -from dolphin import utils from dolphin.i18n import _ -from dolphin import context from dolphin.task_manager import rpcapi as task_rpcapi +from dolphin import utils LOG = log.getLogger(__name__) @@ -51,6 +52,7 @@ def validate_parameters(data, required_parameters, class StorageController(wsgi.Controller): def __init__(self): + super().__init__() self.task_rpcapi = task_rpcapi.TaskAPI() def index(self, req): @@ -85,72 +87,22 @@ def index(self, req): def show(self, req, id): return dict(name="Storage 2") + @validation.schema(schema_storages.create) def create(self, req, body): - """ - This function for registering the new storage device - :param req: - :param body: "It contains the all input parameters" - :return: - """ - # Check if body is valid - if not self.is_valid_body(body, 'storages'): - msg = _("Storage entity not found in request body") - raise exc.HTTPUnprocessableEntity(explanation=msg) - - storage = body['storages'] - - # validate the body has all required parameters - required_parameters = ('hostip', 'vendor', 'model', 'username', - 'password') - validate_parameters(storage, required_parameters) - - # validate the hostip - if not utils.is_valid_ip_address(storage['hostip'], ip_version='4'): - msg = _("Invalid hostip: {0}. Please provide a " - "valid hostip".format(storage['hostip'])) - LOG.error(msg) - raise exception.InvalidHost(msg) - - # get dolphin.context. Later may be validated context parameters - context = req.environ.get('dolphin.context') - + """Register a new storage device.""" + # ctxt = req.environ['dolphin.context'] + ctxt = context.get_admin_context() driver = drivermanager.DriverManager() try: - device_info = driver.register_storage(context, storage) - status = '' - if device_info.get('status') == 'available': - status = device_info.get('status') - except AttributeError as e: - LOG.error(e) - raise exception.DolphinException(e) - except Exception as e: - msg = _('Failed to register device in driver :{0}'.format(e)) + storage = driver.register_storage(ctxt, body) + except exception.DolphinException as e: LOG.error(e) - raise exception.DolphinException(msg) - - if status == 'available': - try: - storage['storage_id'] = device_info.get('id') - - db.access_info_create(context, storage) - - db.storage_create(context, device_info) - except AttributeError as e: - LOG.error(e) - raise exception.DolphinException(e) - except Exception as e: - msg = _('Failed to create device entry in DB: {0}' - .format(e)) - LOG.exception(msg) - raise exception.DolphinException(msg) - - else: - msg = _('Device registration failed with status: {0}' - .format(status)) - LOG.error(msg) - raise exception.DolphinException(msg) - - return device_info + raise e + # except Exception as e: + # msg = _('Failed to register device in driver :{0}'.format(e)) + # LOG.error(e) + # raise exception.DolphinException(msg) + return storage_view.build_storage(storage) def update(self, req, id, body): return dict(name="Storage 4") diff --git a/dolphin/api/validation/parameter_types.py b/dolphin/api/validation/parameter_types.py index ba057c948..d2d7f20f8 100644 --- a/dolphin/api/validation/parameter_types.py +++ b/dolphin/api/validation/parameter_types.py @@ -159,3 +159,16 @@ def valid_char(char): 'type': ['string', 'null'], 'minLength': 0, 'maxLength': 255, 'pattern': valid_description_regex, } + +hostname_or_ip_address = { + # NOTE: Allow to specify hostname, ipv4 and ipv6. + 'type': 'string', 'minLength': 0, 'maxLength': 255, + 'pattern': '^[a-zA-Z0-9-_.:]*$' +} + +tcp_udp_port = { + 'type': ['integer', 'string'], + 'pattern': '^[0-9]*$', + 'minimum': 0, 'maximum': 65535 +} + diff --git a/dolphin/common/sqlalchemyutils.py b/dolphin/common/sqlalchemyutils.py new file mode 100644 index 000000000..c0415336f --- /dev/null +++ b/dolphin/common/sqlalchemyutils.py @@ -0,0 +1,177 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# Copyright 2010-2011 OpenStack Foundation +# Copyright 2012 Justin Santa Barbara +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Implementation of paginate query.""" +import datetime + +from oslo_log import log as logging +from six.moves import range +import sqlalchemy +import sqlalchemy.sql as sa_sql +from sqlalchemy.sql import type_api + +from dolphin.db import api +from dolphin import exception +from dolphin.i18n import _ + + +LOG = logging.getLogger(__name__) + +_TYPE_SCHEMA = { + 'datetime': datetime.datetime(1900, 1, 1), + 'big_integer': 0, + 'integer': 0, + 'string': '' +} + + +def _get_default_column_value(model, column_name): + """Return the default value of the columns from DB table. + + In postgreDB case, if no right default values are being set, an + psycopg2.DataError will be thrown. + """ + attr = getattr(model, column_name) + # Return the default value directly if the model contains. Otherwise return + # a default value which is not None. + if attr.default and isinstance(attr.default, type_api.TypeEngine): + return attr.default.arg + + attr_type = attr.type + return _TYPE_SCHEMA[attr_type.__visit_name__] + + +# TODO(wangxiyuan): Use oslo_db.sqlalchemy.utils.paginate_query once it is +# stable and afforded by the minimum version in requirement.txt. +# copied from glance/db/sqlalchemy/api.py +def paginate_query(query, model, limit, sort_keys, marker=None, + sort_dir=None, sort_dirs=None, offset=None): + """Returns a query with sorting / pagination criteria added. + + Pagination works by requiring a unique sort_key, specified by sort_keys. + (If sort_keys is not unique, then we risk looping through values.) + We use the last row in the previous page as the 'marker' for pagination. + So we must return values that follow the passed marker in the order. + With a single-valued sort_key, this would be easy: sort_key > X. + With a compound-values sort_key, (k1, k2, k3) we must do this to repeat + the lexicographical ordering: + (k1 > X1) or (k1 == X1 && k2 > X2) or (k1 == X1 && k2 == X2 && k3 > X3) + + We also have to cope with different sort_directions. + + Typically, the id of the last row is used as the client-facing pagination + marker, then the actual marker object must be fetched from the db and + passed in to us as marker. + + :param query: the query object to which we should add paging/sorting + :param model: the ORM model class + :param limit: maximum number of items to return + :param sort_keys: array of attributes by which results should be sorted + :param marker: the last item of the previous page; we returns the next + results after this value. + :param sort_dir: direction in which results should be sorted (asc, desc) + :param sort_dirs: per-column array of sort_dirs, corresponding to sort_keys + :param offset: the number of items to skip from the marker or from the + first element. + + :rtype: sqlalchemy.orm.query.Query + :return: The query with sorting/pagination added. + """ + + if 'id' not in sort_keys: + # TODO(justinsb): If this ever gives a false-positive, check + # the actual primary key, rather than assuming its id + LOG.warning('Id not in sort_keys; is sort_keys unique?') + + if sort_dir and sort_dirs: + raise AssertionError('Both sort_dir and sort_dirs specified.') + + # Default the sort direction to ascending + if sort_dirs is None and sort_dir is None: + sort_dir = 'asc' + + # Ensure a per-column sort direction + if sort_dirs is None: + sort_dirs = [sort_dir for _sort_key in sort_keys] + + if len(sort_dirs) != len(sort_keys): + raise AssertionError( + 'sort_dirs length is not equal to sort_keys length.') + + # Add sorting + for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs): + sort_dir_func = { + 'asc': sqlalchemy.asc, + 'desc': sqlalchemy.desc, + }[current_sort_dir] + + try: + sort_key_attr = getattr(model, current_sort_key) + except AttributeError: + raise exception.InvalidInput(reason='Invalid sort key') + if not api.is_orm_value(sort_key_attr): + raise exception.InvalidInput(reason='Invalid sort key') + query = query.order_by(sort_dir_func(sort_key_attr)) + + # Add pagination + if marker is not None: + marker_values = [] + for sort_key in sort_keys: + v = getattr(marker, sort_key) + if v is None: + v = _get_default_column_value(model, sort_key) + marker_values.append(v) + + # Build up an array of sort criteria as in the docstring + criteria_list = [] + for i in range(0, len(sort_keys)): + crit_attrs = [] + for j in range(0, i): + model_attr = getattr(model, sort_keys[j]) + default = _get_default_column_value(model, sort_keys[j]) + attr = sa_sql.expression.case([(model_attr.isnot(None), + model_attr), ], + else_=default) + crit_attrs.append((attr == marker_values[j])) + + model_attr = getattr(model, sort_keys[i]) + default = _get_default_column_value(model, sort_keys[i]) + attr = sa_sql.expression.case([(model_attr.isnot(None), + model_attr), ], + else_=default) + if sort_dirs[i] == 'desc': + crit_attrs.append((attr < marker_values[i])) + elif sort_dirs[i] == 'asc': + crit_attrs.append((attr > marker_values[i])) + else: + raise ValueError(_("Unknown sort direction, " + "must be 'desc' or 'asc'")) + + criteria = sqlalchemy.sql.and_(*crit_attrs) + criteria_list.append(criteria) + + f = sqlalchemy.sql.or_(*criteria_list) + query = query.filter(f) + + if limit is not None: + query = query.limit(limit) + + if offset: + query = query.offset(offset) + + return query diff --git a/dolphin/db/api.py b/dolphin/db/api.py index 9bd1f6a67..d0f84109a 100755 --- a/dolphin/db/api.py +++ b/dolphin/db/api.py @@ -262,3 +262,8 @@ def access_info_get_all(context, marker=None, limit=None, sort_keys=None, """ return IMPL.access_info_get_all(context, marker, limit, sort_keys, sort_dirs, filters, offset) + + +def is_orm_value(obj): + """Check if object is an ORM field.""" + return IMPL.is_orm_value(obj) \ No newline at end of file diff --git a/dolphin/db/sqlalchemy/api.py b/dolphin/db/sqlalchemy/api.py index 8cefe5278..4dad4c1a1 100755 --- a/dolphin/db/sqlalchemy/api.py +++ b/dolphin/db/sqlalchemy/api.py @@ -19,18 +19,25 @@ """Implementation of SQLAlchemy backend.""" from functools import wraps +import six import sys + +import sqlalchemy +from sqlalchemy import create_engine, update + from oslo_config import cfg from oslo_db import options as db_options +from oslo_db.sqlalchemy import utils as db_utils from oslo_db.sqlalchemy import session from oslo_log import log from oslo_utils import uuidutils -from sqlalchemy import create_engine, update -from dolphin import exception +from dolphin.common import sqlalchemyutils from dolphin.db.sqlalchemy import models from dolphin.db.sqlalchemy.models import Storage, AccessInfo -from dolphin.exception import InvalidInput +from dolphin import exception +from dolphin.i18n import _ + CONF = cfg.CONF LOG = log.getLogger(__name__) @@ -85,15 +92,84 @@ def register_db(): model.metadata.create_all(engine) +def _process_model_like_filter(model, query, filters): + """Applies regex expression filtering to a query. + + :param model: model to apply filters to + :param query: query to apply filters to + :param filters: dictionary of filters with regex values + :returns: the updated query. + """ + if query is None: + return query + + for key in sorted(filters): + column_attr = getattr(model, key) + if 'property' == type(column_attr).__name__: + continue + value = filters[key] + if not (isinstance(value, (six.string_types, int))): + continue + query = query.filter( + column_attr.op('LIKE')(u'%%%s%%' % value)) + return query + + +def apply_like_filters(model): + def decorator_filters(process_exact_filters): + def _decorator(query, filters): + exact_filters = filters.copy() + regex_filters = {} + for key, value in filters.items(): + # NOTE(tommylikehu): For inexact match, the filter keys + # are in the format of 'key~=value' + if key.endswith('~'): + exact_filters.pop(key) + regex_filters[key.rstrip('~')] = value + query = process_exact_filters(query, exact_filters) + return _process_model_like_filter(model, query, regex_filters) + return _decorator + return decorator_filters + + +def is_valid_model_filters(model, filters, exclude_list=None): + """Return True if filter values exist on the model + + :param model: a Dolphin model + :param filters: dictionary of filters + """ + for key in filters.keys(): + if exclude_list and key in exclude_list: + continue + if key == 'metadata': + if not isinstance(filters[key], dict): + LOG.debug("Metadata filter value is not valid dictionary") + return False + continue + try: + key = key.rstrip('~') + getattr(model, key) + except AttributeError: + LOG.debug("'%s' filter key is not valid.", key) + return False + return True + + def access_info_create(context, values): """Create a storage access information.""" - register_ref = models.AccessInfo() - this_session = get_session() - this_session.begin() - register_ref.update(values) - this_session.add(register_ref) - this_session.commit() - return register_ref + if not values.get('storage_id'): + values['storage_id'] = uuidutils.generate_uuid() + + access_info_ref = models.AccessInfo() + access_info_ref.update(values) + + session = get_session() + with session.begin(): + session.add(access_info_ref) + + return _access_info_get(context, + access_info_ref['storage_id'], + session=session) def access_info_update(context, access_info_id, values): @@ -103,36 +179,60 @@ def access_info_update(context, access_info_id, values): def access_info_get(context, storage_id): """Get a storage access information.""" - this_session = get_session() - this_session.begin() - access_info = this_session.query(AccessInfo) \ - .filter(AccessInfo.storage_id == storage_id) \ - .first() - if not access_info: + return _access_info_get(context, storage_id) + + +def _access_info_get(context, storage_id, session=None): + result = (_access_info_get_query(context, session=session) + .filter_by(storage_id=storage_id) + .first()) + + if not result: raise exception.AccessInfoNotFound(storage_id=storage_id) - return access_info + + return result + + +def _access_info_get_query(context, session=None): + return model_query(context, models.AccessInfo, session=session) def access_info_get_all(context, marker=None, limit=None, sort_keys=None, sort_dirs=None, filters=None, offset=None): """Retrieves all storage access information.""" - this_session = get_session() - this_session.begin() - if filters.get('hostname', False): - access_info = this_session.query(AccessInfo.hostname).all() - else: - access_info = this_session.query(AccessInfo).all() - return access_info + session = get_session() + with session.begin(): + query = _generate_paginate_query(context, session, marker, limit, + sort_keys, sort_dirs, filters, offset, + paginate_type=models.AccessInfo) + if query is None: + return [] + return query.all() + + +@apply_like_filters(model=models.AccessInfo) +def _process_access_info_filters(query, filters): + """Common filter processing for AccessInfo queries.""" + if filters: + if not is_valid_model_filters(models.AccessInfo, filters): + return + query = query.filter_by(**filters) + + return query def storage_create(context, values): """Add a storage device from the values dictionary.""" + if not values.get('id'): + values['id'] = uuidutils.generate_uuid() + storage_ref = models.Storage() - this_session = get_session() - this_session.begin() storage_ref.update(values) - this_session.add(storage_ref) - this_session.commit() + + session = get_session() + with session.begin(): + session.add(storage_ref) + return storage_ref @@ -143,12 +243,22 @@ def storage_update(context, storage_id, values): def storage_get(context, storage_id): """Retrieve a storage device.""" - this_session = get_session() - this_session.begin() - storage_by_id = this_session.query(Storage) \ - .filter(Storage.id == storage_id) \ - .first() - return storage_by_id + return _storage_get_get(context, storage_id) + + +def _storage_get_get(context, storage_id, session=None): + result = (_storage_get_get_query(context, session=session) + .filter_by(id=storage_id) + .first()) + + if not result: + raise exception.StorageNotFound(id=storage_id) + + return result + + +def _storage_get_get_query(context, session=None): + return model_query(context, models.Storage, session=session) def storage_get_all(context, marker=None, limit=None, sort_keys=None, @@ -232,3 +342,145 @@ def disk_get_all(context, marker=None, limit=None, sort_keys=None, sort_dirs=None, filters=None, offset=None): """Retrieves all disks.""" return NotImplemented + + +def is_orm_value(obj): + """Check if object is an ORM field or expression.""" + return isinstance(obj, (sqlalchemy.orm.attributes.InstrumentedAttribute, + sqlalchemy.sql.expression.ColumnElement)) + + +def model_query(context, model, *args, **kwargs): + """Query helper for model query. + + :param context: context to query under + :param model: model to query. Must be a subclass of ModelBase. + :param session: if present, the session to use + """ + session = kwargs.pop('session') or get_session() + return db_utils.model_query( + model=model, session=session, args=args, **kwargs) + + +PAGINATION_HELPERS = { + models.AccessInfo: (_access_info_get_query, _process_access_info_filters, _access_info_get), +} + + +def process_sort_params(sort_keys, sort_dirs, default_keys=None, + default_dir='asc'): + """Process the sort parameters to include default keys. + + Creates a list of sort keys and a list of sort directions. Adds the default + keys to the end of the list if they are not already included. + + When adding the default keys to the sort keys list, the associated + direction is: + 1) The first element in the 'sort_dirs' list (if specified), else + 2) 'default_dir' value (Note that 'asc' is the default value since this is + the default in sqlalchemy.utils.paginate_query) + + :param sort_keys: List of sort keys to include in the processed list + :param sort_dirs: List of sort directions to include in the processed list + :param default_keys: List of sort keys that need to be included in the + processed list, they are added at the end of the list + if not already specified. + :param default_dir: Sort direction associated with each of the default + keys that are not supplied, used when they are added + to the processed list + :returns: list of sort keys, list of sort directions + :raise exception.InvalidInput: If more sort directions than sort keys + are specified or if an invalid sort + direction is specified + """ + if default_keys is None: + default_keys = ['created_at', 'id'] + + # Determine direction to use for when adding default keys + if sort_dirs and len(sort_dirs): + default_dir_value = sort_dirs[0] + else: + default_dir_value = default_dir + + # Create list of keys (do not modify the input list) + if sort_keys: + result_keys = list(sort_keys) + else: + result_keys = [] + + # If a list of directions is not provided, use the default sort direction + # for all provided keys. + if sort_dirs: + result_dirs = [] + # Verify sort direction + for sort_dir in sort_dirs: + if sort_dir not in ('asc', 'desc'): + msg = _("Unknown sort direction, must be 'desc' or 'asc'.") + raise exception.InvalidInput(reason=msg) + result_dirs.append(sort_dir) + else: + result_dirs = [default_dir_value for _sort_key in result_keys] + + # Ensure that the key and direction length match + while len(result_dirs) < len(result_keys): + result_dirs.append(default_dir_value) + # Unless more direction are specified, which is an error + if len(result_dirs) > len(result_keys): + msg = _("Sort direction array size exceeds sort key array size.") + raise exception.InvalidInput(reason=msg) + + # Ensure defaults are included + for key in default_keys: + if key not in result_keys: + result_keys.append(key) + result_dirs.append(default_dir_value) + + return result_keys, result_dirs + + +def _generate_paginate_query(context, session, marker, limit, sort_keys, + sort_dirs, filters, offset=None, + paginate_type=models.Volume): + """Generate the query to include the filters and the paginate options. + + Returns a query with sorting / pagination criteria added or None + if the given filters will not yield any results. + + :param context: context to query under + :param session: the session to use + :param marker: the last item of the previous page; we returns the next + results after this value. + :param limit: maximum number of items to return + :param sort_keys: list of attributes by which results should be sorted, + paired with corresponding item in sort_dirs + :param sort_dirs: list of directions in which results should be sorted, + paired with corresponding item in sort_keys + :param filters: dictionary of filters; values that are in lists, tuples, + or sets cause an 'IN' operation, while exact matching + is used for other values, see _process_volume_filters + function for more information + :param offset: number of items to skip + :param paginate_type: type of pagination to generate + :returns: updated query or None + """ + get_query, process_filters, get = PAGINATION_HELPERS[paginate_type] + + sort_keys, sort_dirs = process_sort_params(sort_keys, + sort_dirs, + default_dir='desc') + query = get_query(context, session=session) + + if filters: + query = process_filters(query, filters) + if query is None: + return None + + marker_object = None + if marker is not None: + marker_object = get(context, marker, session) + + return sqlalchemyutils.paginate_query(query, paginate_type, limit, + sort_keys, + marker=marker_object, + sort_dirs=sort_dirs, + offset=offset) diff --git a/dolphin/db/sqlalchemy/models.py b/dolphin/db/sqlalchemy/models.py index a64618300..58608d12e 100755 --- a/dolphin/db/sqlalchemy/models.py +++ b/dolphin/db/sqlalchemy/models.py @@ -47,10 +47,11 @@ def to_dict(self): class AccessInfo(BASE, DolphinBase): """Represent access info required for storage accessing.""" __tablename__ = "access_info" - storage_id = Column(String(128), primary_key=True) - hostname = Column(String(128)) - username = Column(String(128)) - password = Column(String(128)) + storage_id = Column(String(36), primary_key=True) + host = Column(String(255)) + port = Column(String(255)) + username = Column(String(255)) + password = Column(String(255)) extra_attributes = Column(JsonEncodedDict) diff --git a/dolphin/drivers/driver.py b/dolphin/drivers/driver.py index 0fac06077..2db829bb2 100644 --- a/dolphin/drivers/driver.py +++ b/dolphin/drivers/driver.py @@ -19,7 +19,7 @@ @six.add_metaclass(abc.ABCMeta) class StorageDriver(object): - def __init__(self, storage_id): + def __init__(self, storage_id=None): self.storage_id = storage_id @staticmethod diff --git a/dolphin/drivers/fake_storage/__init__.py b/dolphin/drivers/fake_storage/__init__.py index 96a50258f..a9f6e401d 100644 --- a/dolphin/drivers/fake_storage/__init__.py +++ b/dolphin/drivers/fake_storage/__init__.py @@ -20,7 +20,7 @@ class FakeStorageDriver(driver.StorageDriver): it also plays a role as faker to fake data for being tested by clients. """ - def __init__(self, storage_id): + def __init__(self, storage_id=None): super().__init__(storage_id) @staticmethod @@ -28,7 +28,20 @@ def get_storage_registry(): pass def register_storage(self, context, register_info): - pass + # Do something here + return { + 'name': 'fake_driver', + 'description': 'it is a fake driver.', + 'vendor': register_info['vendor'], + 'model': register_info['model'], + 'status': 'normal', + 'serial_number': '2102453JPN12KA000011', + 'firmware_version': '1.0.0', + 'location': 'HK', + 'total_capacity': 1024 * 1024, + 'used_capacity': 3126, + 'free_capacity': 1045449, + } def get_storage(self, context): pass diff --git a/dolphin/drivers/manager.py b/dolphin/drivers/manager.py index 6e3f25cf4..9182152fc 100644 --- a/dolphin/drivers/manager.py +++ b/dolphin/drivers/manager.py @@ -12,18 +12,25 @@ # See the License for the specific language governing permissions and # limitations under the License. +import copy +import six +import stevedore + from oslo_log import log +from oslo_utils import uuidutils from dolphin import utils +from dolphin import coordination +from dolphin import db +# from dolphin import cryptor +from dolphin import exception +from dolphin.i18n import _ LOG = log.getLogger(__name__) -DRIVER_MAPPING = { - "fake_storage": "dolphin.drivers.fake_storage.FakeStorageDriver" -} - class DriverManager(metaclass=utils.Singleton): + NAMESPACE = 'dolphin.storage.drivers' def __init__(self): # The driver_factory will keep the driver instance for @@ -36,9 +43,45 @@ def get_storage_registry(): """Show register parameters which the driver needs.""" pass + @coordination.synchronized('driver-{register_info[vendor]}-' + '{register_info[model]}') def register_storage(self, context, register_info): - """Discovery a storage system with register parameters.""" - pass + """Discovery a storage system with access information.""" + # Check same access info from DB + access_info = copy.deepcopy(register_info) + vendor, model = access_info.pop('vendor'), access_info.pop('model') + db_access_info = db.access_info_get_all(context, sort_keys=['host'], + filters=access_info) + if db_access_info: + msg = _("Storage device has been registered.") + raise exception.Conflict(msg) + + # Load and initialize a driver + # todo: add exception handler + driver = stevedore.driver.DriverManager( + namespace=self.NAMESPACE, + name='%s %s' % (vendor, model), + invoke_on_load=True + ).driver + + storage = driver.register_storage(context, + register_info) + if storage: + storage_id = six.text_type(uuidutils.generate_uuid()) + access_info['storage_id'] = storage_id + # todo + # access_info['password'] = cryptor.encode( + # access_info['password']) + db.access_info_create(context, access_info) + + storage['id'] = storage_id + storage = db.storage_create(context, storage) + + driver.storage_id = storage_id + self.driver_factory[storage_id] = driver + + LOG.info("Storage was registered successfully.") + return storage def remove_storage(self, context, storage_id): """Clear driver instance from driver factory.""" diff --git a/dolphin/exception.py b/dolphin/exception.py index 8e82d3c15..ed89318b0 100644 --- a/dolphin/exception.py +++ b/dolphin/exception.py @@ -243,6 +243,10 @@ class AccessInfoNotFound(NotFound): message = _("Storage access info %(storage_id)s could not be found.") +class StorageNotFound(NotFound): + message = _("Storage %(id)s could not be found.") + + class ServiceIsDown(Invalid): message = _("Service %(service)s is down.") diff --git a/setup.py b/setup.py index a63130d60..c4e9fdf36 100644 --- a/setup.py +++ b/setup.py @@ -29,5 +29,8 @@ 'example1 = dolphin.exporter.example_exporter:Example1Exporter', 'example2 = dolphin.exporter.example_exporter:Example2Exporter' ], + 'dolphin.storage.drivers': [ + 'fake_storage fake_driver = dolphin.drivers.fake_storage:FakeStorageDriver' + ] }, ) From 7cbac5e83b687c5480e211cffa8e4a273b98adfd Mon Sep 17 00:00:00 2001 From: wisererik Date: Tue, 5 May 2020 17:57:51 +0800 Subject: [PATCH 2/2] Address some comments and add exception handle --- dolphin/api/schemas/storages.py | 2 +- dolphin/api/v1/storages.py | 56 ++++++++++++++++---- dolphin/common/sqlalchemyutils.py | 5 -- dolphin/db/api.py | 2 +- dolphin/db/sqlalchemy/api.py | 17 +++--- dolphin/db/sqlalchemy/models.py | 66 ++++++++++++------------ dolphin/drivers/driver.py | 2 +- dolphin/drivers/fake_storage/__init__.py | 6 +-- dolphin/drivers/manager.py | 56 +++++++------------- dolphin/exception.py | 8 +++ 10 files changed, 119 insertions(+), 101 deletions(-) diff --git a/dolphin/api/schemas/storages.py b/dolphin/api/schemas/storages.py index 96507aa75..49e3841b9 100644 --- a/dolphin/api/schemas/storages.py +++ b/dolphin/api/schemas/storages.py @@ -21,7 +21,7 @@ 'host': parameter_types.hostname_or_ip_address, 'port': parameter_types.tcp_udp_port, 'username': {'type': 'string', 'minLength': 1, 'maxLength': 255}, - 'password': {'type': 'string'}, + 'password': {'type': 'string', 'minLength': 1, 'maxLength': 255}, 'vendor': {'type': 'string', 'minLength': 1, 'maxLength': 255}, 'model': {'type': 'string', 'minLength': 1, 'maxLength': 255}, 'extra_attributes': { diff --git a/dolphin/api/v1/storages.py b/dolphin/api/v1/storages.py index 22ed17cf7..a82ca1fbf 100644 --- a/dolphin/api/v1/storages.py +++ b/dolphin/api/v1/storages.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import copy import six from six.moves import http_client import webob @@ -24,6 +25,8 @@ from dolphin.api import validation from dolphin.api.views import storages as storage_view from dolphin import context +from dolphin import coordination +from dolphin import cryptor from dolphin import db from dolphin.drivers import manager as drivermanager from dolphin import exception @@ -54,6 +57,7 @@ class StorageController(wsgi.Controller): def __init__(self): super().__init__() self.task_rpcapi = task_rpcapi.TaskAPI() + self.driver_manager = drivermanager.DriverManager() def index(self, req): @@ -87,21 +91,36 @@ def index(self, req): def show(self, req, id): return dict(name="Storage 2") + @wsgi.response(201) @validation.schema(schema_storages.create) + @coordination.synchronized('storage-create-{body[host]}-{body[port]}') def create(self, req, body): """Register a new storage device.""" - # ctxt = req.environ['dolphin.context'] - ctxt = context.get_admin_context() - driver = drivermanager.DriverManager() + ctxt = req.environ['dolphin.context'] + access_info_dict = body + + if self._is_registered(ctxt, access_info_dict): + msg = _("Storage has been registered.") + raise exc.HTTPBadRequest(explanation=msg) + try: - storage = driver.register_storage(ctxt, body) - except exception.DolphinException as e: - LOG.error(e) - raise e - # except Exception as e: - # msg = _('Failed to register device in driver :{0}'.format(e)) - # LOG.error(e) - # raise exception.DolphinException(msg) + storage = self.driver_manager.register_storage(ctxt, access_info_dict) + storage = db.storage_create(context, storage) + + # Need to encode the password before saving. + access_info_dict['storage_id'] = storage['id'] + access_info_dict['password'] = cryptor.encode(access_info_dict['password']) + db.access_info_create(context, access_info_dict) + except (exception.InvalidCredential, + exception.StorageDriverNotFound, + exception.AccessInfoNotFound, + exception.StorageNotFound) as e: + raise exc.HTTPBadRequest(explanation=e.message) + except Exception as e: + msg = _('Failed to register storage: {0}'.format(e)) + LOG.error(msg) + raise exc.HTTPBadRequest(explanation=msg) + return storage_view.build_storage(storage) def update(self, req, id, body): @@ -137,6 +156,21 @@ def sync(self, req, id): return dict(name="Sync storage 1") + def _is_registered(self, context, access_info): + access_info_dict = copy.deepcopy(access_info) + + # Remove unrelated query fields + access_info_dict.pop('username', None) + access_info_dict.pop('password', None) + access_info_dict.pop('vendor', None) + access_info_dict.pop('model', None) + + # Check if storage is registered + if db.access_info_get_all(context, + filters=access_info_dict): + return True + return False + def create_resource(): return wsgi.Resource(StorageController()) diff --git a/dolphin/common/sqlalchemyutils.py b/dolphin/common/sqlalchemyutils.py index c0415336f..d8f5e23ce 100644 --- a/dolphin/common/sqlalchemyutils.py +++ b/dolphin/common/sqlalchemyutils.py @@ -93,11 +93,6 @@ def paginate_query(query, model, limit, sort_keys, marker=None, :return: The query with sorting/pagination added. """ - if 'id' not in sort_keys: - # TODO(justinsb): If this ever gives a false-positive, check - # the actual primary key, rather than assuming its id - LOG.warning('Id not in sort_keys; is sort_keys unique?') - if sort_dir and sort_dirs: raise AssertionError('Both sort_dir and sort_dirs specified.') diff --git a/dolphin/db/api.py b/dolphin/db/api.py index d0f84109a..8798f7f51 100755 --- a/dolphin/db/api.py +++ b/dolphin/db/api.py @@ -266,4 +266,4 @@ def access_info_get_all(context, marker=None, limit=None, sort_keys=None, def is_orm_value(obj): """Check if object is an ORM field.""" - return IMPL.is_orm_value(obj) \ No newline at end of file + return IMPL.is_orm_value(obj) diff --git a/dolphin/db/sqlalchemy/api.py b/dolphin/db/sqlalchemy/api.py index 4dad4c1a1..3ac0fa33c 100755 --- a/dolphin/db/sqlalchemy/api.py +++ b/dolphin/db/sqlalchemy/api.py @@ -18,12 +18,11 @@ """Implementation of SQLAlchemy backend.""" -from functools import wraps import six import sys import sqlalchemy -from sqlalchemy import create_engine, update +from sqlalchemy import create_engine from oslo_config import cfg from oslo_db import options as db_options @@ -233,7 +232,9 @@ def storage_create(context, values): with session.begin(): session.add(storage_ref) - return storage_ref + return _storage_get(context, + storage_ref['id'], + session=session) def storage_update(context, storage_id, values): @@ -243,11 +244,11 @@ def storage_update(context, storage_id, values): def storage_get(context, storage_id): """Retrieve a storage device.""" - return _storage_get_get(context, storage_id) + return _storage_get(context, storage_id) -def _storage_get_get(context, storage_id, session=None): - result = (_storage_get_get_query(context, session=session) +def _storage_get(context, storage_id, session=None): + result = (_storage_get_query(context, session=session) .filter_by(id=storage_id) .first()) @@ -257,7 +258,7 @@ def _storage_get_get(context, storage_id, session=None): return result -def _storage_get_get_query(context, session=None): +def _storage_get_query(context, session=None): return model_query(context, models.Storage, session=session) @@ -394,7 +395,7 @@ def process_sort_params(sort_keys, sort_dirs, default_keys=None, direction is specified """ if default_keys is None: - default_keys = ['created_at', 'id'] + default_keys = ['created_at'] # Determine direction to use for when adding default keys if sort_dirs and len(sort_dirs): diff --git a/dolphin/db/sqlalchemy/models.py b/dolphin/db/sqlalchemy/models.py index 58608d12e..d80477e58 100755 --- a/dolphin/db/sqlalchemy/models.py +++ b/dolphin/db/sqlalchemy/models.py @@ -59,15 +59,15 @@ class Storage(BASE, DolphinBase): """Represents a storage object.""" __tablename__ = 'storages' - id = Column(String(128), primary_key=True) - name = Column(String(128)) - vendor = Column(String(128)) - description = Column(String(256)) - model = Column(String(128)) - status = Column(String(128)) - serial_number = Column(String(128)) - firmware_version = Column(String(128)) - location = Column(String(128)) + id = Column(String(36), primary_key=True) + name = Column(String(255)) + vendor = Column(String(255)) + description = Column(String(255)) + model = Column(String(255)) + status = Column(String(255)) + serial_number = Column(String(255)) + firmware_version = Column(String(255)) + location = Column(String(255)) total_capacity = Column(Numeric) used_capacity = Column(Numeric) free_capacity = Column(Numeric) @@ -76,15 +76,15 @@ class Storage(BASE, DolphinBase): class Volume(BASE, DolphinBase): """Represents a volume object.""" __tablename__ = 'volumes' - id = Column(String(128), primary_key=True) - name = Column(String(128)) - storage_id = Column(String(128)) - original_pool_id = Column(String(128)) - description = Column(String(128)) - status = Column(String(128)) - original_id = Column(String(128)) - wwn = Column(String(128)) - provisioning_policy = Column(String(128)) + id = Column(String(36), primary_key=True) + name = Column(String(255)) + storage_id = Column(String(36)) + original_pool_id = Column(String(255)) + description = Column(String(255)) + status = Column(String(255)) + original_id = Column(String(255)) + wwn = Column(String(255)) + provisioning_policy = Column(String(255)) total_capacity = Column(Numeric) used_capacity = Column(Numeric) free_capacity = Column(Numeric) @@ -95,13 +95,13 @@ class Volume(BASE, DolphinBase): class Pool(BASE, DolphinBase): """Represents a pool object.""" __tablename__ = 'pools' - id = Column(String(128), primary_key=True) - name = Column(String(128)) - storage_id = Column(String(128)) - original_id = Column(String(128)) - description = Column(String(256)) - status = Column(String(128)) - storage_type = Column(String(128)) + id = Column(String(36), primary_key=True) + name = Column(String(255)) + storage_id = Column(String(36)) + original_id = Column(String(255)) + description = Column(String(255)) + status = Column(String(255)) + storage_type = Column(String(255)) total_capacity = Column(Numeric) used_capacity = Column(Numeric) free_capacity = Column(Numeric) @@ -110,12 +110,12 @@ class Pool(BASE, DolphinBase): class Disk(BASE, DolphinBase): """Represents a disk object.""" __tablename__ = 'disks' - id = Column(String(128), primary_key=True) - name = Column(String(128)) - status = Column(String(128)) - vendor = Column(String(128)) - original_id = Column(String(128)) - serial_number = Column(String(128)) - model = Column(String(128)) - media_type = Column(String(128)) + id = Column(String(36), primary_key=True) + name = Column(String(255)) + status = Column(String(255)) + vendor = Column(String(255)) + original_id = Column(String(255)) + serial_number = Column(String(255)) + model = Column(String(255)) + media_type = Column(String(255)) capacity = Column(Numeric) diff --git a/dolphin/drivers/driver.py b/dolphin/drivers/driver.py index 2db829bb2..6e7391893 100644 --- a/dolphin/drivers/driver.py +++ b/dolphin/drivers/driver.py @@ -28,7 +28,7 @@ def get_storage_registry(): pass @abc.abstractmethod - def register_storage(self, context, register_info): + def register_storage(self, context, access_info): """Discovery a storage system with register parameters.""" pass diff --git a/dolphin/drivers/fake_storage/__init__.py b/dolphin/drivers/fake_storage/__init__.py index a9f6e401d..12493b5e2 100644 --- a/dolphin/drivers/fake_storage/__init__.py +++ b/dolphin/drivers/fake_storage/__init__.py @@ -27,13 +27,13 @@ def __init__(self, storage_id=None): def get_storage_registry(): pass - def register_storage(self, context, register_info): + def register_storage(self, context, access_info): # Do something here return { 'name': 'fake_driver', 'description': 'it is a fake driver.', - 'vendor': register_info['vendor'], - 'model': register_info['model'], + 'vendor': access_info['vendor'], + 'model': access_info['model'], 'status': 'normal', 'serial_number': '2102453JPN12KA000011', 'firmware_version': '1.0.0', diff --git a/dolphin/drivers/manager.py b/dolphin/drivers/manager.py index 9182152fc..14200a4dd 100644 --- a/dolphin/drivers/manager.py +++ b/dolphin/drivers/manager.py @@ -12,19 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -import copy import six import stevedore from oslo_log import log from oslo_utils import uuidutils -from dolphin import utils -from dolphin import coordination -from dolphin import db -# from dolphin import cryptor from dolphin import exception from dolphin.i18n import _ +from dolphin import utils LOG = log.getLogger(__name__) @@ -43,44 +39,28 @@ def get_storage_registry(): """Show register parameters which the driver needs.""" pass - @coordination.synchronized('driver-{register_info[vendor]}-' - '{register_info[model]}') - def register_storage(self, context, register_info): + def register_storage(self, context, access_info): """Discovery a storage system with access information.""" - # Check same access info from DB - access_info = copy.deepcopy(register_info) - vendor, model = access_info.pop('vendor'), access_info.pop('model') - db_access_info = db.access_info_get_all(context, sort_keys=['host'], - filters=access_info) - if db_access_info: - msg = _("Storage device has been registered.") - raise exception.Conflict(msg) - - # Load and initialize a driver - # todo: add exception handler - driver = stevedore.driver.DriverManager( - namespace=self.NAMESPACE, - name='%s %s' % (vendor, model), - invoke_on_load=True - ).driver + try: + driver = stevedore.driver.DriverManager( + namespace=self.NAMESPACE, + name='%s %s' % (access_info['vendor'], + access_info['model']), + invoke_on_load=True + ).driver + except Exception as e: + msg = (_("Storage driver '%s %s' could not be found.") % (access_info['vendor'], + access_info['model'])) + LOG.error(msg) + raise exception.StorageDriverNotFound(message=msg) storage = driver.register_storage(context, - register_info) + access_info) if storage: - storage_id = six.text_type(uuidutils.generate_uuid()) - access_info['storage_id'] = storage_id - # todo - # access_info['password'] = cryptor.encode( - # access_info['password']) - db.access_info_create(context, access_info) - - storage['id'] = storage_id - storage = db.storage_create(context, storage) - - driver.storage_id = storage_id - self.driver_factory[storage_id] = driver + storage['id'] = six.text_type(uuidutils.generate_uuid()) + self.driver_factory[storage['id']] = driver - LOG.info("Storage was registered successfully.") + LOG.info("Storage was found successfully.") return storage def remove_storage(self, context, storage_id): diff --git a/dolphin/exception.py b/dolphin/exception.py index ed89318b0..bc5ea1bae 100644 --- a/dolphin/exception.py +++ b/dolphin/exception.py @@ -147,6 +147,10 @@ class Invalid(DolphinException): code = 400 +class InvalidCredential(Invalid): + message = _("The credentials are invalid.") + + class InvalidRequest(Invalid): message = _("The request is invalid.") @@ -247,6 +251,10 @@ class StorageNotFound(NotFound): message = _("Storage %(id)s could not be found.") +class StorageDriverNotFound(NotFound): + message = _("Storage driver could not be found.") + + class ServiceIsDown(Invalid): message = _("Service %(service)s is down.")