Permalink
c9eb953 Jan 21, 2017
7037 lines (5559 sloc) 246 KB
<
# Copyright (c) 2011 X.commerce, a business unit of eBay Inc.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of SQLAlchemy backend."""
import collections
import copy
import datetime
import functools
import inspect
import sys
from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import update_match
from oslo_db.sqlalchemy import utils as sqlalchemyutils
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import importutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
import six
from six.moves import range
import sqlalchemy as sa
from sqlalchemy import and_
from sqlalchemy.exc import NoSuchTableError
from sqlalchemy.ext.compiler import compiles
from sqlalchemy import MetaData
from sqlalchemy import or_
from sqlalchemy.orm import aliased
from sqlalchemy.orm import contains_eager
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import joinedload_all
from sqlalchemy.orm import noload
from sqlalchemy.orm import undefer
from sqlalchemy.schema import Table
from sqlalchemy import sql
from sqlalchemy.sql.expression import asc
from sqlalchemy.sql.expression import desc
from sqlalchemy.sql.expression import UpdateBase
from sqlalchemy.sql import false
from sqlalchemy.sql import func
from sqlalchemy.sql import null
from sqlalchemy.sql import true
from nova import block_device
from nova.compute import task_states
from nova.compute import vm_states
import nova.conf
import nova.context
from nova.db.sqlalchemy import models
from nova import exception
from nova.i18n import _, _LI, _LE, _LW
from nova import quota
from nova import safe_utils
profiler_sqlalchemy = importutils.try_import('osprofiler.sqlalchemy')
CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)
main_context_manager = enginefacade.transaction_context()
api_context_manager = enginefacade.transaction_context()
def _get_db_conf(conf_group, connection=None):
kw = dict(
connection=connection or conf_group.connection,
slave_connection=conf_group.slave_connection,
sqlite_fk=False,
__autocommit=True,
expire_on_commit=False,
mysql_sql_mode=conf_group.mysql_sql_mode,
idle_timeout=conf_group.idle_timeout,
connection_debug=conf_group.connection_debug,
max_pool_size=conf_group.max_pool_size,
max_overflow=conf_group.max_overflow,
pool_timeout=conf_group.pool_timeout,
sqlite_synchronous=conf_group.sqlite_synchronous,
connection_trace=conf_group.connection_trace,
max_retries=conf_group.max_retries,
retry_interval=conf_group.retry_interval)
return kw
def _context_manager_from_context(context):
if context:
try:
return context.db_connection
except AttributeError:
pass
def configure(conf):
main_context_manager.configure(**_get_db_conf(conf.database))
api_context_manager.configure(**_get_db_conf(conf.api_database))
if profiler_sqlalchemy and CONF.profiler.enabled \
and CONF.profiler.trace_sqlalchemy:
main_context_manager.append_on_engine_create(
lambda eng: profiler_sqlalchemy.add_tracing(sa, eng, "db"))
api_context_manager.append_on_engine_create(
lambda eng: profiler_sqlalchemy.add_tracing(sa, eng, "db"))
def create_context_manager(connection=None):
"""Create a database context manager object.
: param connection: The database connection string
"""
ctxt_mgr = enginefacade.transaction_context()
ctxt_mgr.configure(**_get_db_conf(CONF.database, connection=connection))
return ctxt_mgr
def get_context_manager(context):
"""Get a database context manager object.
:param context: The request context that can contain a context manager
"""
return _context_manager_from_context(context) or main_context_manager
def get_engine(use_slave=False, context=None):
"""Get a database engine object.
:param use_slave: Whether to use the slave connection
:param context: The request context that can contain a context manager
"""
ctxt_mgr = get_context_manager(context)
return ctxt_mgr.get_legacy_facade().get_engine(use_slave=use_slave)
def get_api_engine():
return api_context_manager.get_legacy_facade().get_engine()
_SHADOW_TABLE_PREFIX = 'shadow_'
_DEFAULT_QUOTA_NAME = 'default'
PER_PROJECT_QUOTAS = ['fixed_ips', 'floating_ips', 'networks']
def get_backend():
"""The backend is this module itself."""
return sys.modules[__name__]
def require_context(f):
"""Decorator to require *any* user or admin context.
This does no authorization for user or project access matching, see
:py:func:`nova.context.authorize_project_context` and
:py:func:`nova.context.authorize_user_context`.
The first argument to the wrapped function must be the context.
"""
@functools.wraps(f)
def wrapper(*args, **kwargs):
nova.context.require_context(args[0])
return f(*args, **kwargs)
return wrapper
def require_instance_exists_using_uuid(f):
"""Decorator to require the specified instance to exist.
Requires the wrapped function to use context and instance_uuid as
their first two arguments.
"""
@functools.wraps(f)
def wrapper(context, instance_uuid, *args, **kwargs):
instance_get_by_uuid(context, instance_uuid)
return f(context, instance_uuid, *args, **kwargs)
return wrapper
def require_aggregate_exists(f):
"""Decorator to require the specified aggregate to exist.
Requires the wrapped function to use context and aggregate_id as
their first two arguments.
"""
@functools.wraps(f)
def wrapper(context, aggregate_id, *args, **kwargs):
aggregate_get(context, aggregate_id)
return f(context, aggregate_id, *args, **kwargs)
return wrapper
def select_db_reader_mode(f):
"""Decorator to select synchronous or asynchronous reader mode.
The kwarg argument 'use_slave' defines reader mode. Asynchronous reader
will be used if 'use_slave' is True and synchronous reader otherwise.
If 'use_slave' is not specified default value 'False' will be used.
Wrapped function must have a context in the arguments.
"""
@functools.wraps(f)
def wrapper(*args, **kwargs):
wrapped_func = safe_utils.get_wrapped_function(f)
keyed_args = inspect.getcallargs(wrapped_func, *args, **kwargs)
context = keyed_args['context']
use_slave = keyed_args.get('use_slave', False)
if use_slave:
reader_mode = get_context_manager(context).async
else:
reader_mode = get_context_manager(context).reader
with reader_mode.using(context):
return f(*args, **kwargs)
return wrapper
def pick_context_manager_writer(f):
"""Decorator to use a writer db context manager.
The db context manager will be picked from the RequestContext.
Wrapped function must have a RequestContext in the arguments.
"""
@functools.wraps(f)
def wrapped(context, *args, **kwargs):
ctxt_mgr = get_context_manager(context)
with ctxt_mgr.writer.using(context):
return f(context, *args, **kwargs)
return wrapped
def pick_context_manager_reader(f):
"""Decorator to use a reader db context manager.
The db context manager will be picked from the RequestContext.
Wrapped function must have a RequestContext in the arguments.
"""
@functools.wraps(f)
def wrapped(context, *args, **kwargs):
ctxt_mgr = get_context_manager(context)
with ctxt_mgr.reader.using(context):
return f(context, *args, **kwargs)
return wrapped
def pick_context_manager_reader_allow_async(f):
"""Decorator to use a reader.allow_async db context manager.
The db context manager will be picked from the RequestContext.
Wrapped function must have a RequestContext in the arguments.
"""
@functools.wraps(f)
def wrapped(context, *args, **kwargs):
ctxt_mgr = get_context_manager(context)
with ctxt_mgr.reader.allow_async.using(context):
return f(context, *args, **kwargs)
return wrapped
def model_query(context, model,
args=None,
read_deleted=None,
project_only=False):
"""Query helper that accounts for context's `read_deleted` field.
:param context: NovaContext of the query.
:param model: Model to query. Must be a subclass of ModelBase.
:param args: Arguments to query. If None - model is used.
:param read_deleted: If not None, overrides context's read_deleted field.
Permitted values are 'no', which does not return
deleted values; 'only', which only returns deleted
values; and 'yes', which does not filter deleted
values.
:param project_only: If set and context is user-type, then restrict
query to match the context's project_id. If set to
'allow_none', restriction includes project_id = None.
"""
if read_deleted is None:
read_deleted = context.read_deleted
query_kwargs = {}
if 'no' == read_deleted:
query_kwargs['deleted'] = False
elif 'only' == read_deleted:
query_kwargs['deleted'] = True
elif 'yes' == read_deleted:
pass
else:
raise ValueError(_("Unrecognized read_deleted value '%s'")
% read_deleted)
query = sqlalchemyutils.model_query(
model, context.session, args, **query_kwargs)
# We can't use oslo.db model_query's project_id here, as it doesn't allow
# us to return both our projects and unowned projects.
if nova.context.is_user_context(context) and project_only:
if project_only == 'allow_none':
query = query.\
filter(or_(model.project_id == context.project_id,
model.project_id == null()))
else:
query = query.filter_by(project_id=context.project_id)
return query
def convert_objects_related_datetimes(values, *datetime_keys):
if not datetime_keys:
datetime_keys = ('created_at', 'deleted_at', 'updated_at')
for key in datetime_keys:
if key in values and values[key]:
if isinstance(values[key], six.string_types):
try:
values[key] = timeutils.parse_strtime(values[key])
except ValueError:
# Try alternate parsing since parse_strtime will fail
# with say converting '2015-05-28T19:59:38+00:00'
values[key] = timeutils.parse_isotime(values[key])
# NOTE(danms): Strip UTC timezones from datetimes, since they're
# stored that way in the database
values[key] = values[key].replace(tzinfo=None)
return values
def _sync_instances(context, project_id, user_id):
return dict(zip(('instances', 'cores', 'ram'),
_instance_data_get_for_user(context, project_id, user_id)))
def _sync_floating_ips(context, project_id, user_id):
return dict(floating_ips=_floating_ip_count_by_project(
context, project_id))
def _sync_fixed_ips(context, project_id, user_id):
return dict(fixed_ips=_fixed_ip_count_by_project(context, project_id))
def _sync_security_groups(context, project_id, user_id):
return dict(security_groups=_security_group_count_by_project_and_user(
context, project_id, user_id))
def _sync_server_groups(context, project_id, user_id):
return dict(server_groups=_instance_group_count_by_project_and_user(
context, project_id, user_id))
QUOTA_SYNC_FUNCTIONS = {
'_sync_instances': _sync_instances,
'_sync_floating_ips': _sync_floating_ips,
'_sync_fixed_ips': _sync_fixed_ips,
'_sync_security_groups': _sync_security_groups,
'_sync_server_groups': _sync_server_groups,
}
###################
def constraint(**conditions):
return Constraint(conditions)
def equal_any(*values):
return EqualityCondition(values)
def not_equal(*values):
return InequalityCondition(values)
class Constraint(object):
def __init__(self, conditions):
self.conditions = conditions
def apply(self, model, query):
for key, condition in self.conditions.items():
for clause in condition.clauses(getattr(model, key)):
query = query.filter(clause)
return query
class EqualityCondition(object):
def __init__(self, values):
self.values = values
def clauses(self, field):
# method signature requires us to return an iterable even if for OR
# operator this will actually be a single clause
return [or_(*[field == value for value in self.values])]
class InequalityCondition(object):
def __init__(self, values):
self.values = values
def clauses(self, field):
return [field != value for value in self.values]
class DeleteFromSelect(UpdateBase):
def __init__(self, table, select, column):
self.table = table
self.select = select
self.column = column
# NOTE(guochbo): some versions of MySQL doesn't yet support subquery with
# 'LIMIT & IN/ALL/ANY/SOME' We need work around this with nesting select .
@compiles(DeleteFromSelect)
def visit_delete_from_select(element, compiler, **kw):
return "DELETE FROM %s WHERE %s in (SELECT T1.%s FROM (%s) as T1)" % (
compiler.process(element.table, asfrom=True),
compiler.process(element.column),
element.column.name,
compiler.process(element.select))
###################
@pick_context_manager_writer
def service_destroy(context, service_id):
service = service_get(context, service_id)
model_query(context, models.Service).\
filter_by(id=service_id).\
soft_delete(synchronize_session=False)
# TODO(sbauza): Remove the service_id filter in a later release
# once we are sure that all compute nodes report the host field
model_query(context, models.ComputeNode).\
filter(or_(models.ComputeNode.service_id == service_id,
models.ComputeNode.host == service['host'])).\
soft_delete(synchronize_session=False)
@pick_context_manager_reader
def service_get(context, service_id):
query = model_query(context, models.Service).filter_by(id=service_id)
result = query.first()
if not result:
raise exception.ServiceNotFound(service_id=service_id)
return result
@pick_context_manager_reader_allow_async
def service_get_minimum_version(context, binaries):
min_versions = context.session.query(
models.Service.binary,
func.min(models.Service.version)).\
filter(models.Service.binary.in_(binaries)).\
filter(models.Service.forced_down == false()).\
group_by(models.Service.binary)
return dict(min_versions)
@pick_context_manager_reader
def service_get_all(context, disabled=None):
query = model_query(context, models.Service)
if disabled is not None:
query = query.filter_by(disabled=disabled)
return query.all()
@pick_context_manager_reader
def service_get_all_by_topic(context, topic):
return model_query(context, models.Service, read_deleted="no").\
filter_by(disabled=False).\
filter_by(topic=topic).\
all()
@pick_context_manager_reader
def service_get_by_host_and_topic(context, host, topic):
return model_query(context, models.Service, read_deleted="no").\
filter_by(disabled=False).\
filter_by(host=host).\
filter_by(topic=topic).\
first()
@pick_context_manager_reader
def service_get_all_by_binary(context, binary, include_disabled=False):
query = model_query(context, models.Service, read_deleted="no").\
filter_by(binary=binary)
if not include_disabled:
query = query.filter_by(disabled=False)
return query.all()
@pick_context_manager_reader
def service_get_all_computes_by_hv_type(context, hv_type,
include_disabled=False):
query = model_query(context, models.Service, read_deleted="no").\
filter_by(binary='nova-compute')
if not include_disabled:
query = query.filter_by(disabled=False)
query = query.join(models.ComputeNode,
models.Service.host == models.ComputeNode.host).\
filter(models.ComputeNode.hypervisor_type == hv_type).\
distinct('host')
return query.all()
@pick_context_manager_reader
def service_get_by_host_and_binary(context, host, binary):
result = model_query(context, models.Service, read_deleted="no").\
filter_by(host=host).\
filter_by(binary=binary).\
first()
if not result:
raise exception.HostBinaryNotFound(host=host, binary=binary)
return result
@pick_context_manager_reader
def service_get_all_by_host(context, host):
return model_query(context, models.Service, read_deleted="no").\
filter_by(host=host).\
all()
@pick_context_manager_reader_allow_async
def service_get_by_compute_host(context, host):
result = model_query(context, models.Service, read_deleted="no").\
filter_by(host=host).\
filter_by(binary='nova-compute').\
first()
if not result:
raise exception.ComputeHostNotFound(host=host)
return result
@pick_context_manager_writer
def service_create(context, values):
service_ref = models.Service()
service_ref.update(values)
if not CONF.enable_new_services:
msg = _("New service disabled due to config option.")
service_ref.disabled = True
service_ref.disabled_reason = msg
try:
service_ref.save(context.session)
except db_exc.DBDuplicateEntry as e:
if 'binary' in e.columns:
raise exception.ServiceBinaryExists(host=values.get('host'),
binary=values.get('binary'))
raise exception.ServiceTopicExists(host=values.get('host'),
topic=values.get('topic'))
return service_ref
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def service_update(context, service_id, values):
service_ref = service_get(context, service_id)
# Only servicegroup.drivers.db.DbDriver._report_state() updates
# 'report_count', so if that value changes then store the timestamp
# as the last time we got a state report.
if 'report_count' in values:
if values['report_count'] > service_ref.report_count:
service_ref.last_seen_up = timeutils.utcnow()
service_ref.update(values)
return service_ref
###################
def _compute_node_select(context, filters=None, limit=None, marker=None):
if filters is None:
filters = {}
cn_tbl = sa.alias(models.ComputeNode.__table__, name='cn')
select = sa.select([cn_tbl])
if context.read_deleted == "no":
select = select.where(cn_tbl.c.deleted == 0)
if "compute_id" in filters:
select = select.where(cn_tbl.c.id == filters["compute_id"])
if "service_id" in filters:
select = select.where(cn_tbl.c.service_id == filters["service_id"])
if "host" in filters:
select = select.where(cn_tbl.c.host == filters["host"])
if "hypervisor_hostname" in filters:
hyp_hostname = filters["hypervisor_hostname"]
select = select.where(cn_tbl.c.hypervisor_hostname == hyp_hostname)
if marker is not None:
try:
compute_node_get(context, marker)
except exception.ComputeHostNotFound:
raise exception.MarkerNotFound(marker=marker)
select = select.where(cn_tbl.c.id > marker)
if limit is not None:
select = select.limit(limit)
# Explicitly order by id, so we're not dependent on the native sort
# order of the underlying DB.
select = select.order_by(asc("id"))
return select
def _compute_node_fetchall(context, filters=None, limit=None, marker=None):
select = _compute_node_select(context, filters, limit=limit, marker=marker)
engine = get_engine(context=context)
conn = engine.connect()
results = conn.execute(select).fetchall()
# Callers expect dict-like objects, not SQLAlchemy RowProxy objects...
results = [dict(r) for r in results]
conn.close()
return results
@pick_context_manager_reader
def compute_node_get(context, compute_id):
results = _compute_node_fetchall(context, {"compute_id": compute_id})
if not results:
raise exception.ComputeHostNotFound(host=compute_id)
return results[0]
@pick_context_manager_reader
def compute_node_get_model(context, compute_id):
# TODO(edleafe): remove once the compute node resource provider migration
# is complete, and this distinction is no longer necessary.
result = model_query(context, models.ComputeNode).\
filter_by(id=compute_id).\
first()
if not result:
raise exception.ComputeHostNotFound(host=compute_id)
return result
@pick_context_manager_reader
def compute_nodes_get_by_service_id(context, service_id):
results = _compute_node_fetchall(context, {"service_id": service_id})
if not results:
raise exception.ServiceNotFound(service_id=service_id)
return results
@pick_context_manager_reader
def compute_node_get_by_host_and_nodename(context, host, nodename):
results = _compute_node_fetchall(context,
{"host": host, "hypervisor_hostname": nodename})
if not results:
raise exception.ComputeHostNotFound(host=host)
return results[0]
@pick_context_manager_reader_allow_async
def compute_node_get_all_by_host(context, host):
results = _compute_node_fetchall(context, {"host": host})
if not results:
raise exception.ComputeHostNotFound(host=host)
return results
@pick_context_manager_reader
def compute_node_get_all(context):
return _compute_node_fetchall(context)
@pick_context_manager_reader
def compute_node_get_all_by_pagination(context, limit=None, marker=None):
return _compute_node_fetchall(context, limit=limit, marker=marker)
@pick_context_manager_reader
def compute_node_search_by_hypervisor(context, hypervisor_match):
field = models.ComputeNode.hypervisor_hostname
return model_query(context, models.ComputeNode).\
filter(field.like('%%%s%%' % hypervisor_match)).\
all()
@pick_context_manager_writer
def compute_node_create(context, values):
"""Creates a new ComputeNode and populates the capacity fields
with the most recent data.
"""
convert_objects_related_datetimes(values)
compute_node_ref = models.ComputeNode()
compute_node_ref.update(values)
compute_node_ref.save(context.session)
return compute_node_ref
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def compute_node_update(context, compute_id, values):
"""Updates the ComputeNode record with the most recent data."""
compute_ref = compute_node_get_model(context, compute_id)
# Always update this, even if there's going to be no other
# changes in data. This ensures that we invalidate the
# scheduler cache of compute node data in case of races.
values['updated_at'] = timeutils.utcnow()
convert_objects_related_datetimes(values)
compute_ref.update(values)
return compute_ref
@pick_context_manager_writer
def compute_node_delete(context, compute_id):
"""Delete a ComputeNode record."""
result = model_query(context, models.ComputeNode).\
filter_by(id=compute_id).\
soft_delete(synchronize_session=False)
if not result:
raise exception.ComputeHostNotFound(host=compute_id)
@pick_context_manager_reader
def compute_node_statistics(context):
"""Compute statistics over all compute nodes."""
engine = get_engine(context=context)
services_tbl = models.Service.__table__
inner_sel = sa.alias(_compute_node_select(context), name='inner_sel')
# TODO(sbauza): Remove the service_id filter in a later release
# once we are sure that all compute nodes report the host field
j = sa.join(
inner_sel, services_tbl,
sql.and_(
sql.or_(
inner_sel.c.host == services_tbl.c.host,
inner_sel.c.service_id == services_tbl.c.id
),
services_tbl.c.disabled == false(),
services_tbl.c.binary == 'nova-compute'
)
)
# NOTE(jaypipes): This COALESCE() stuff is temporary while the data
# migration to the new resource providers inventories and allocations
# tables is completed.
agg_cols = [
func.count().label('count'),
sql.func.sum(
inner_sel.c.vcpus
).label('vcpus'),
sql.func.sum(
inner_sel.c.memory_mb
).label('memory_mb'),
sql.func.sum(
inner_sel.c.local_gb
).label('local_gb'),
sql.func.sum(
inner_sel.c.vcpus_used
).label('vcpus_used'),
sql.func.sum(
inner_sel.c.memory_mb_used
).label('memory_mb_used'),
sql.func.sum(
inner_sel.c.local_gb_used
).label('local_gb_used'),
sql.func.sum(
inner_sel.c.free_ram_mb
).label('free_ram_mb'),
sql.func.sum(
inner_sel.c.free_disk_gb
).label('free_disk_gb'),
sql.func.sum(
inner_sel.c.current_workload
).label('current_workload'),
sql.func.sum(
inner_sel.c.running_vms
).label('running_vms'),
sql.func.sum(
inner_sel.c.disk_available_least
).label('disk_available_least'),
]
select = sql.select(agg_cols).select_from(j)
conn = engine.connect()
results = conn.execute(select).fetchone()
# Build a dict of the info--making no assumptions about result
fields = ('count', 'vcpus', 'memory_mb', 'local_gb', 'vcpus_used',
'memory_mb_used', 'local_gb_used', 'free_ram_mb', 'free_disk_gb',
'current_workload', 'running_vms', 'disk_available_least')
results = {field: int(results[idx] or 0)
for idx, field in enumerate(fields)}
conn.close()
return results
###################
@pick_context_manager_writer
def certificate_create(context, values):
certificate_ref = models.Certificate()
for (key, value) in values.items():
certificate_ref[key] = value
certificate_ref.save(context.session)
return certificate_ref
@pick_context_manager_reader
def certificate_get_all_by_project(context, project_id):
return model_query(context, models.Certificate, read_deleted="no").\
filter_by(project_id=project_id).\
all()
@pick_context_manager_reader
def certificate_get_all_by_user(context, user_id):
return model_query(context, models.Certificate, read_deleted="no").\
filter_by(user_id=user_id).\
all()
@pick_context_manager_reader
def certificate_get_all_by_user_and_project(context, user_id, project_id):
return model_query(context, models.Certificate, read_deleted="no").\
filter_by(user_id=user_id).\
filter_by(project_id=project_id).\
all()
###################
@require_context
@pick_context_manager_reader
def floating_ip_get(context, id):
try:
result = model_query(context, models.FloatingIp, project_only=True).\
filter_by(id=id).\
options(joinedload_all('fixed_ip.instance')).\
first()
if not result:
raise exception.FloatingIpNotFound(id=id)
except db_exc.DBError:
LOG.warning(_LW("Invalid floating IP ID %s in request"), id)
raise exception.InvalidID(id=id)
return result
@require_context
@pick_context_manager_reader
def floating_ip_get_pools(context):
pools = []
for result in model_query(context, models.FloatingIp,
(models.FloatingIp.pool,)).distinct():
pools.append({'name': result[0]})
return pools
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True,
retry_on_request=True)
@pick_context_manager_writer
def floating_ip_allocate_address(context, project_id, pool,
auto_assigned=False):
nova.context.authorize_project_context(context, project_id)
floating_ip_ref = model_query(context, models.FloatingIp,
read_deleted="no").\
filter_by(fixed_ip_id=None).\
filter_by(project_id=None).\
filter_by(pool=pool).\
first()
if not floating_ip_ref:
raise exception.NoMoreFloatingIps()
params = {'project_id': project_id, 'auto_assigned': auto_assigned}
rows_update = model_query(context, models.FloatingIp, read_deleted="no").\
filter_by(id=floating_ip_ref['id']).\
filter_by(fixed_ip_id=None).\
filter_by(project_id=None).\
filter_by(pool=pool).\
update(params, synchronize_session='evaluate')
if not rows_update:
LOG.debug('The row was updated in a concurrent transaction, '
'we will fetch another one')
raise db_exc.RetryRequest(exception.FloatingIpAllocateFailed())
return floating_ip_ref['address']
@require_context
@pick_context_manager_writer
def floating_ip_bulk_create(context, ips, want_result=True):
try:
tab = models.FloatingIp().__table__
context.session.execute(tab.insert(), ips)
except db_exc.DBDuplicateEntry as e:
raise exception.FloatingIpExists(address=e.value)
if want_result:
return model_query(context, models.FloatingIp).filter(
models.FloatingIp.address.in_(
[ip['address'] for ip in ips])).all()
def _ip_range_splitter(ips, block_size=256):
"""Yields blocks of IPs no more than block_size elements long."""
out = []
count = 0
for ip in ips:
out.append(ip['address'])
count += 1
if count > block_size - 1:
yield out
out = []
count = 0
if out:
yield out
@require_context
@pick_context_manager_writer
def floating_ip_bulk_destroy(context, ips):
project_id_to_quota_count = collections.defaultdict(int)
for ip_block in _ip_range_splitter(ips):
# Find any floating IPs that were not auto_assigned and
# thus need quota released.
query = model_query(context, models.FloatingIp).\
filter(models.FloatingIp.address.in_(ip_block)).\
filter_by(auto_assigned=False)
for row in query.all():
# The count is negative since we release quota by
# reserving negative quota.
project_id_to_quota_count[row['project_id']] -= 1
# Delete the floating IPs.
model_query(context, models.FloatingIp).\
filter(models.FloatingIp.address.in_(ip_block)).\
soft_delete(synchronize_session='fetch')
# Delete the quotas, if needed.
# Quota update happens in a separate transaction, so previous must have
# been committed first.
for project_id, count in project_id_to_quota_count.items():
try:
reservations = quota.QUOTAS.reserve(context,
project_id=project_id,
floating_ips=count)
quota.QUOTAS.commit(context, reservations, project_id=project_id)
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Failed to update usages bulk "
"deallocating floating IP"))
@require_context
@pick_context_manager_writer
def floating_ip_create(context, values):
floating_ip_ref = models.FloatingIp()
floating_ip_ref.update(values)
try:
floating_ip_ref.save(context.session)
except db_exc.DBDuplicateEntry:
raise exception.FloatingIpExists(address=values['address'])
return floating_ip_ref
def _floating_ip_count_by_project(context, project_id):
nova.context.authorize_project_context(context, project_id)
# TODO(tr3buchet): why leave auto_assigned floating IPs out?
return model_query(context, models.FloatingIp, read_deleted="no").\
filter_by(project_id=project_id).\
filter_by(auto_assigned=False).\
count()
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def floating_ip_fixed_ip_associate(context, floating_address,
fixed_address, host):
fixed_ip_ref = model_query(context, models.FixedIp).\
filter_by(address=fixed_address).\
options(joinedload('network')).\
first()
if not fixed_ip_ref:
raise exception.FixedIpNotFoundForAddress(address=fixed_address)
rows = model_query(context, models.FloatingIp).\
filter_by(address=floating_address).\
filter(models.FloatingIp.project_id ==
context.project_id).\
filter(or_(models.FloatingIp.fixed_ip_id ==
fixed_ip_ref['id'],
models.FloatingIp.fixed_ip_id.is_(None))).\
update({'fixed_ip_id': fixed_ip_ref['id'], 'host': host})
if not rows:
raise exception.FloatingIpAssociateFailed(address=floating_address)
return fixed_ip_ref
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def floating_ip_deallocate(context, address):
return model_query(context, models.FloatingIp).\
filter_by(address=address).\
filter(and_(models.FloatingIp.project_id != null()),
models.FloatingIp.fixed_ip_id == null()).\
update({'project_id': None,
'host': None,
'auto_assigned': False},
synchronize_session=False)
@require_context
@pick_context_manager_writer
def floating_ip_destroy(context, address):
model_query(context, models.FloatingIp).\
filter_by(address=address).\
delete()
@require_context
@pick_context_manager_writer
def floating_ip_disassociate(context, address):
floating_ip_ref = model_query(context,
models.FloatingIp).\
filter_by(address=address).\
first()
if not floating_ip_ref:
raise exception.FloatingIpNotFoundForAddress(address=address)
fixed_ip_ref = model_query(context, models.FixedIp).\
filter_by(id=floating_ip_ref['fixed_ip_id']).\
options(joinedload('network')).\
first()
floating_ip_ref.fixed_ip_id = None
floating_ip_ref.host = None
return fixed_ip_ref
def _floating_ip_get_all(context):
return model_query(context, models.FloatingIp, read_deleted="no")
@pick_context_manager_reader
def floating_ip_get_all(context):
floating_ip_refs = _floating_ip_get_all(context).\
options(joinedload('fixed_ip')).\
all()
if not floating_ip_refs:
raise exception.NoFloatingIpsDefined()
return floating_ip_refs
@pick_context_manager_reader
def floating_ip_get_all_by_host(context, host):
floating_ip_refs = _floating_ip_get_all(context).\
filter_by(host=host).\
options(joinedload('fixed_ip')).\
all()
if not floating_ip_refs:
raise exception.FloatingIpNotFoundForHost(host=host)
return floating_ip_refs
@require_context
@pick_context_manager_reader
def floating_ip_get_all_by_project(context, project_id):
nova.context.authorize_project_context(context, project_id)
# TODO(tr3buchet): why do we not want auto_assigned floating IPs here?
return _floating_ip_get_all(context).\
filter_by(project_id=project_id).\
filter_by(auto_assigned=False).\
options(joinedload_all('fixed_ip.instance')).\
all()
@require_context
@pick_context_manager_reader
def floating_ip_get_by_address(context, address):
return _floating_ip_get_by_address(context, address)
def _floating_ip_get_by_address(context, address):
# if address string is empty explicitly set it to None
if not address:
address = None
try:
result = model_query(context, models.FloatingIp).\
filter_by(address=address).\
options(joinedload_all('fixed_ip.instance')).\
first()
if not result:
raise exception.FloatingIpNotFoundForAddress(address=address)
except db_exc.DBError:
msg = _("Invalid floating IP %s in request") % address
LOG.warning(msg)
raise exception.InvalidIpAddressError(msg)
# If the floating IP has a project ID set, check to make sure
# the non-admin user has access.
if result.project_id and nova.context.is_user_context(context):
nova.context.authorize_project_context(context, result.project_id)
return result
@require_context
@pick_context_manager_reader
def floating_ip_get_by_fixed_address(context, fixed_address):
return model_query(context, models.FloatingIp).\
outerjoin(models.FixedIp,
models.FixedIp.id ==
models.FloatingIp.fixed_ip_id).\
filter(models.FixedIp.address == fixed_address).\
all()
@require_context
@pick_context_manager_reader
def floating_ip_get_by_fixed_ip_id(context, fixed_ip_id):
return model_query(context, models.FloatingIp).\
filter_by(fixed_ip_id=fixed_ip_id).\
all()
@require_context
@pick_context_manager_writer
def floating_ip_update(context, address, values):
float_ip_ref = _floating_ip_get_by_address(context, address)
float_ip_ref.update(values)
try:
float_ip_ref.save(context.session)
except db_exc.DBDuplicateEntry:
raise exception.FloatingIpExists(address=values['address'])
return float_ip_ref
###################
@require_context
@pick_context_manager_reader
def dnsdomain_get(context, fqdomain):
return model_query(context, models.DNSDomain, read_deleted="no").\
filter_by(domain=fqdomain).\
with_lockmode('update').\
first()
def _dnsdomain_get_or_create(context, fqdomain):
domain_ref = dnsdomain_get(context, fqdomain)
if not domain_ref:
dns_ref = models.DNSDomain()
dns_ref.update({'domain': fqdomain,
'availability_zone': None,
'project_id': None})
return dns_ref
return domain_ref
@pick_context_manager_writer
def dnsdomain_register_for_zone(context, fqdomain, zone):
domain_ref = _dnsdomain_get_or_create(context, fqdomain)
domain_ref.scope = 'private'
domain_ref.availability_zone = zone
context.session.add(domain_ref)
@pick_context_manager_writer
def dnsdomain_register_for_project(context, fqdomain, project):
domain_ref = _dnsdomain_get_or_create(context, fqdomain)
domain_ref.scope = 'public'
domain_ref.project_id = project
context.session.add(domain_ref)
@pick_context_manager_writer
def dnsdomain_unregister(context, fqdomain):
model_query(context, models.DNSDomain).\
filter_by(domain=fqdomain).\
delete()
@pick_context_manager_reader
def dnsdomain_get_all(context):
return model_query(context, models.DNSDomain, read_deleted="no").all()
###################
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True,
retry_on_request=True)
@pick_context_manager_writer
def fixed_ip_associate(context, address, instance_uuid, network_id=None,
reserved=False, virtual_interface_id=None):
"""Keyword arguments:
reserved -- should be a boolean value(True or False), exact value will be
used to filter on the fixed IP address
"""
if not uuidutils.is_uuid_like(instance_uuid):
raise exception.InvalidUUID(uuid=instance_uuid)
network_or_none = or_(models.FixedIp.network_id == network_id,
models.FixedIp.network_id == null())
fixed_ip_ref = model_query(context, models.FixedIp, read_deleted="no").\
filter(network_or_none).\
filter_by(reserved=reserved).\
filter_by(address=address).\
first()
if fixed_ip_ref is None:
raise exception.FixedIpNotFoundForNetwork(address=address,
network_uuid=network_id)
if fixed_ip_ref.instance_uuid:
raise exception.FixedIpAlreadyInUse(address=address,
instance_uuid=instance_uuid)
params = {'instance_uuid': instance_uuid,
'allocated': virtual_interface_id is not None}
if not fixed_ip_ref.network_id:
params['network_id'] = network_id
if virtual_interface_id:
params['virtual_interface_id'] = virtual_interface_id
rows_updated = model_query(context, models.FixedIp, read_deleted="no").\
filter_by(id=fixed_ip_ref.id).\
filter(network_or_none).\
filter_by(reserved=reserved).\
filter_by(address=address).\
update(params, synchronize_session='evaluate')
if not rows_updated:
LOG.debug('The row was updated in a concurrent transaction, '
'we will fetch another row')
raise db_exc.RetryRequest(
exception.FixedIpAssociateFailed(net=network_id))
return fixed_ip_ref
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True,
retry_on_request=True)
@pick_context_manager_writer
def fixed_ip_associate_pool(context, network_id, instance_uuid=None,
host=None, virtual_interface_id=None):
"""allocate a fixed ip out of a fixed ip network pool.
This allocates an unallocated fixed ip out of a specified
network. We sort by updated_at to hand out the oldest address in
the list.
"""
if instance_uuid and not uuidutils.is_uuid_like(instance_uuid):
raise exception.InvalidUUID(uuid=instance_uuid)
network_or_none = or_(models.FixedIp.network_id == network_id,
models.FixedIp.network_id == null())
fixed_ip_ref = model_query(context, models.FixedIp, read_deleted="no").\
filter(network_or_none).\
filter_by(reserved=False).\
filter_by(instance_uuid=None).\
filter_by(host=None).\
filter_by(leased=False).\
order_by(asc(models.FixedIp.updated_at)).\
first()
if not fixed_ip_ref:
raise exception.NoMoreFixedIps(net=network_id)
params = {'allocated': virtual_interface_id is not None}
if fixed_ip_ref['network_id'] is None:
params['network_id'] = network_id
if instance_uuid:
params['instance_uuid'] = instance_uuid
if host:
params['host'] = host
if virtual_interface_id:
params['virtual_interface_id'] = virtual_interface_id
rows_updated = model_query(context, models.FixedIp, read_deleted="no").\
filter_by(id=fixed_ip_ref['id']).\
filter_by(network_id=fixed_ip_ref['network_id']).\
filter_by(reserved=False).\
filter_by(instance_uuid=None).\
filter_by(host=None).\
filter_by(leased=False).\
filter_by(address=fixed_ip_ref['address']).\
update(params, synchronize_session='evaluate')
if not rows_updated:
LOG.debug('The row was updated in a concurrent transaction, '
'we will fetch another row')
raise db_exc.RetryRequest(
exception.FixedIpAssociateFailed(net=network_id))
return fixed_ip_ref
@require_context
@pick_context_manager_writer
def fixed_ip_create(context, values):
fixed_ip_ref = models.FixedIp()
fixed_ip_ref.update(values)
try:
fixed_ip_ref.save(context.session)
except db_exc.DBDuplicateEntry:
raise exception.FixedIpExists(address=values['address'])
return fixed_ip_ref
@require_context
@pick_context_manager_writer
def fixed_ip_bulk_create(context, ips):
try:
tab = models.FixedIp.__table__
context.session.execute(tab.insert(), ips)
except db_exc.DBDuplicateEntry as e:
raise exception.FixedIpExists(address=e.value)
@require_context
@pick_context_manager_writer
def fixed_ip_disassociate(context, address):
_fixed_ip_get_by_address(context, address).update(
{'instance_uuid': None,
'virtual_interface_id': None})
@pick_context_manager_writer
def fixed_ip_disassociate_all_by_timeout(context, host, time):
# NOTE(vish): only update fixed ips that "belong" to this
# host; i.e. the network host or the instance
# host matches. Two queries necessary because
# join with update doesn't work.
host_filter = or_(and_(models.Instance.host == host,
models.Network.multi_host == true()),
models.Network.host == host)
result = model_query(context, models.FixedIp, (models.FixedIp.id,),
read_deleted="no").\
filter(models.FixedIp.allocated == false()).\
filter(models.FixedIp.updated_at < time).\
join((models.Network,
models.Network.id == models.FixedIp.network_id)).\
join((models.Instance,
models.Instance.uuid == models.FixedIp.instance_uuid)).\
filter(host_filter).\
all()
fixed_ip_ids = [fip[0] for fip in result]
if not fixed_ip_ids:
return 0
result = model_query(context, models.FixedIp).\
filter(models.FixedIp.id.in_(fixed_ip_ids)).\
update({'instance_uuid': None,
'leased': False,
'updated_at': timeutils.utcnow()},
synchronize_session='fetch')
return result
@require_context
@pick_context_manager_reader
def fixed_ip_get(context, id, get_network=False):
query = model_query(context, models.FixedIp).filter_by(id=id)
if get_network:
query = query.options(joinedload('network'))
result = query.first()
if not result:
raise exception.FixedIpNotFound(id=id)
# FIXME(sirp): shouldn't we just use project_only here to restrict the
# results?
if (nova.context.is_user_context(context) and
result['instance_uuid'] is not None):
instance = instance_get_by_uuid(context.elevated(read_deleted='yes'),
result['instance_uuid'])
nova.context.authorize_project_context(context, instance.project_id)
return result
@pick_context_manager_reader
def fixed_ip_get_all(context):
result = model_query(context, models.FixedIp, read_deleted="yes").all()
if not result:
raise exception.NoFixedIpsDefined()
return result
@require_context
@pick_context_manager_reader
def fixed_ip_get_by_address(context, address, columns_to_join=None):
return _fixed_ip_get_by_address(context, address,
columns_to_join=columns_to_join)
def _fixed_ip_get_by_address(context, address, columns_to_join=None):
if columns_to_join is None:
columns_to_join = []
try:
result = model_query(context, models.FixedIp)
for column in columns_to_join:
result = result.options(joinedload_all(column))
result = result.filter_by(address=address).first()
if not result:
raise exception.FixedIpNotFoundForAddress(address=address)
except db_exc.DBError:
msg = _("Invalid fixed IP Address %s in request") % address
LOG.warning(msg)
raise exception.FixedIpInvalid(msg)
# NOTE(sirp): shouldn't we just use project_only here to restrict the
# results?
if (nova.context.is_user_context(context) and
result['instance_uuid'] is not None):
instance = _instance_get_by_uuid(
context.elevated(read_deleted='yes'),
result['instance_uuid'])
nova.context.authorize_project_context(context,
instance.project_id)
return result
@require_context
@pick_context_manager_reader
def fixed_ip_get_by_floating_address(context, floating_address):
return model_query(context, models.FixedIp).\
join(models.FloatingIp,
models.FloatingIp.fixed_ip_id ==
models.FixedIp.id).\
filter(models.FloatingIp.address == floating_address).\
first()
# NOTE(tr3buchet) please don't invent an exception here, None is fine
@require_context
@pick_context_manager_reader
def fixed_ip_get_by_instance(context, instance_uuid):
if not uuidutils.is_uuid_like(instance_uuid):
raise exception.InvalidUUID(uuid=instance_uuid)
vif_and = and_(models.VirtualInterface.id ==
models.FixedIp.virtual_interface_id,
models.VirtualInterface.deleted == 0)
result = model_query(context, models.FixedIp, read_deleted="no").\
filter_by(instance_uuid=instance_uuid).\
outerjoin(models.VirtualInterface, vif_and).\
options(contains_eager("virtual_interface")).\
options(joinedload('network')).\
options(joinedload('floating_ips')).\
order_by(asc(models.VirtualInterface.created_at),
asc(models.VirtualInterface.id)).\
all()
if not result:
raise exception.FixedIpNotFoundForInstance(instance_uuid=instance_uuid)
return result
@pick_context_manager_reader
def fixed_ip_get_by_host(context, host):
instance_uuids = _instance_get_all_uuids_by_host(context, host)
if not instance_uuids:
return []
return model_query(context, models.FixedIp).\
filter(models.FixedIp.instance_uuid.in_(instance_uuids)).\
all()
@require_context
@pick_context_manager_reader
def fixed_ip_get_by_network_host(context, network_id, host):
result = model_query(context, models.FixedIp, read_deleted="no").\
filter_by(network_id=network_id).\
filter_by(host=host).\
first()
if not result:
raise exception.FixedIpNotFoundForNetworkHost(network_id=network_id,
host=host)
return result
@require_context
@pick_context_manager_reader
def fixed_ips_by_virtual_interface(context, vif_id):
result = model_query(context, models.FixedIp, read_deleted="no").\
filter_by(virtual_interface_id=vif_id).\
options(joinedload('network')).\
options(joinedload('floating_ips')).\
all()
return result
@require_context
@pick_context_manager_writer
def fixed_ip_update(context, address, values):
_fixed_ip_get_by_address(context, address).update(values)
def _fixed_ip_count_by_project(context, project_id):
nova.context.authorize_project_context(context, project_id)
return model_query(context, models.FixedIp, (models.FixedIp.id,),
read_deleted="no").\
join((models.Instance,
models.Instance.uuid == models.FixedIp.instance_uuid)).\
filter(models.Instance.project_id == project_id).\
count()
###################
@require_context
@pick_context_manager_writer
def virtual_interface_create(context, values):
"""Create a new virtual interface record in the database.
:param values: = dict containing column values
"""
try:
vif_ref = models.VirtualInterface()
vif_ref.update(values)
vif_ref.save(context.session)
except db_exc.DBError:
LOG.exception(_LE("VIF creation failed with a database error."))
raise exception.VirtualInterfaceCreateException()
return vif_ref
def _virtual_interface_query(context):
return model_query(context, models.VirtualInterface, read_deleted="no")
@require_context
@pick_context_manager_writer
def virtual_interface_update(context, address, values):
vif_ref = virtual_interface_get_by_address(context, address)
vif_ref.update(values)
vif_ref.save(context.session)
return vif_ref
@require_context
@pick_context_manager_reader
def virtual_interface_get(context, vif_id):
"""Gets a virtual interface from the table.
:param vif_id: = id of the virtual interface
"""
vif_ref = _virtual_interface_query(context).\
filter_by(id=vif_id).\
first()
return vif_ref
@require_context
@pick_context_manager_reader
def virtual_interface_get_by_address(context, address):
"""Gets a virtual interface from the table.
:param address: = the address of the interface you're looking to get
"""
try:
vif_ref = _virtual_interface_query(context).\
filter_by(address=address).\
first()
except db_exc.DBError:
msg = _("Invalid virtual interface address %s in request") % address
LOG.warning(msg)
raise exception.InvalidIpAddressError(msg)
return vif_ref
@require_context
@pick_context_manager_reader
def virtual_interface_get_by_uuid(context, vif_uuid):
"""Gets a virtual interface from the table.
:param vif_uuid: the uuid of the interface you're looking to get
"""
vif_ref = _virtual_interface_query(context).\
filter_by(uuid=vif_uuid).\
first()
return vif_ref
@require_context
@require_instance_exists_using_uuid
@pick_context_manager_reader_allow_async
def virtual_interface_get_by_instance(context, instance_uuid):
"""Gets all virtual interfaces for instance.
:param instance_uuid: = uuid of the instance to retrieve vifs for
"""
vif_refs = _virtual_interface_query(context).\
filter_by(instance_uuid=instance_uuid).\
order_by(asc("created_at"), asc("id")).\
all()
return vif_refs
@require_context
@pick_context_manager_reader
def virtual_interface_get_by_instance_and_network(context, instance_uuid,
network_id):
"""Gets virtual interface for instance that's associated with network."""
vif_ref = _virtual_interface_query(context).\
filter_by(instance_uuid=instance_uuid).\
filter_by(network_id=network_id).\
first()
return vif_ref
@require_context
@pick_context_manager_writer
def virtual_interface_delete_by_instance(context, instance_uuid):
"""Delete virtual interface records that are associated
with the instance given by instance_id.
:param instance_uuid: = uuid of instance
"""
_virtual_interface_query(context).\
filter_by(instance_uuid=instance_uuid).\
soft_delete()
@require_context
@pick_context_manager_writer
def virtual_interface_delete(context, id):
"""Delete virtual interface records.
:param id: id of the interface
"""
_virtual_interface_query(context).\
filter_by(id=id).\
soft_delete()
@require_context
@pick_context_manager_reader
def virtual_interface_get_all(context):
"""Get all vifs."""
vif_refs = _virtual_interface_query(context).all()
return vif_refs
###################
def _metadata_refs(metadata_dict, meta_class):
metadata_refs = []
if metadata_dict:
for k, v in metadata_dict.items():
metadata_ref = meta_class()
metadata_ref['key'] = k
metadata_ref['value'] = v
metadata_refs.append(metadata_ref)
return metadata_refs
def _validate_unique_server_name(context, name):
if not CONF.osapi_compute_unique_server_name_scope:
return
lowername = name.lower()
base_query = model_query(context, models.Instance, read_deleted='no').\
filter(func.lower(models.Instance.hostname) == lowername)
if CONF.osapi_compute_unique_server_name_scope == 'project':
instance_with_same_name = base_query.\
filter_by(project_id=context.project_id).\
count()
elif CONF.osapi_compute_unique_server_name_scope == 'global':
instance_with_same_name = base_query.count()
else:
return
if instance_with_same_name > 0:
raise exception.InstanceExists(name=lowername)
def _handle_objects_related_type_conversions(values):
"""Make sure that certain things in values (which may have come from
an objects.instance.Instance object) are in suitable form for the
database.
"""
# NOTE(danms): Make sure IP addresses are passed as strings to
# the database engine
for key in ('access_ip_v4', 'access_ip_v6'):
if key in values and values[key] is not None:
values[key] = str(values[key])
datetime_keys = ('created_at', 'deleted_at', 'updated_at',
'launched_at', 'terminated_at')
convert_objects_related_datetimes(values, *datetime_keys)
def _check_instance_exists_in_project(context, instance_uuid):
if not model_query(context, models.Instance, read_deleted="no",
project_only=True).filter_by(
uuid=instance_uuid).first():
raise exception.InstanceNotFound(instance_id=instance_uuid)
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def instance_create(context, values):
"""Create a new Instance record in the database.
context - request context object
values - dict containing column values.
"""
security_group_ensure_default(context)
values = values.copy()
values['metadata'] = _metadata_refs(
values.get('metadata'), models.InstanceMetadata)
values['system_metadata'] = _metadata_refs(
values.get('system_metadata'), models.InstanceSystemMetadata)
_handle_objects_related_type_conversions(values)
instance_ref = models.Instance()
if not values.get('uuid'):
values['uuid'] = uuidutils.generate_uuid()
instance_ref['info_cache'] = models.InstanceInfoCache()
info_cache = values.pop('info_cache', None)
if info_cache is not None:
instance_ref['info_cache'].update(info_cache)
security_groups = values.pop('security_groups', [])
instance_ref['extra'] = models.InstanceExtra()
instance_ref['extra'].update(
{'numa_topology': None,
'pci_requests': None,
'vcpu_model': None,
})
instance_ref['extra'].update(values.pop('extra', {}))
instance_ref.update(values)
def _get_sec_group_models(security_groups):
models = []
default_group = _security_group_ensure_default(context)
if 'default' in security_groups:
models.append(default_group)
# Generate a new list, so we don't modify the original
security_groups = [x for x in security_groups if x != 'default']
if security_groups:
models.extend(_security_group_get_by_names(
context, security_groups))
return models
if 'hostname' in values:
_validate_unique_server_name(context, values['hostname'])
instance_ref.security_groups = _get_sec_group_models(security_groups)
context.session.add(instance_ref)
# create the instance uuid to ec2_id mapping entry for instance
ec2_instance_create(context, instance_ref['uuid'])
return instance_ref
def _instance_data_get_for_user(context, project_id, user_id):
result = model_query(context, models.Instance, (
func.count(models.Instance.id),
func.sum(models.Instance.vcpus),
func.sum(models.Instance.memory_mb))).\
filter_by(project_id=project_id)
if user_id:
result = result.filter_by(user_id=user_id).first()
else:
result = result.first()
# NOTE(vish): convert None to 0
return (result[0] or 0, result[1] or 0, result[2] or 0)
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def instance_destroy(context, instance_uuid, constraint=None):
if uuidutils.is_uuid_like(instance_uuid):
instance_ref = _instance_get_by_uuid(context, instance_uuid)
else:
raise exception.InvalidUUID(instance_uuid)
query = model_query(context, models.Instance).\
filter_by(uuid=instance_uuid)
if constraint is not None:
query = constraint.apply(models.Instance, query)
count = query.soft_delete()
if count == 0:
raise exception.ConstraintNotMet()
model_query(context, models.SecurityGroupInstanceAssociation).\
filter_by(instance_uuid=instance_uuid).\
soft_delete()
model_query(context, models.InstanceInfoCache).\
filter_by(instance_uuid=instance_uuid).\
soft_delete()
model_query(context, models.InstanceMetadata).\
filter_by(instance_uuid=instance_uuid).\
soft_delete()
model_query(context, models.InstanceFault).\
filter_by(instance_uuid=instance_uuid).\
soft_delete()
model_query(context, models.InstanceExtra).\
filter_by(instance_uuid=instance_uuid).\
soft_delete()
model_query(context, models.InstanceSystemMetadata).\
filter_by(instance_uuid=instance_uuid).\
soft_delete()
model_query(context, models.InstanceGroupMember).\
filter_by(instance_id=instance_uuid).\
soft_delete()
model_query(context, models.BlockDeviceMapping).\
filter_by(instance_uuid=instance_uuid).\
soft_delete()
model_query(context, models.Migration).\
filter_by(instance_uuid=instance_uuid).\
soft_delete()
# NOTE(snikitin): We can't use model_query here, because there is no
# column 'deleted' in 'tags' table.
context.session.query(models.Tag).filter_by(
resource_id=instance_uuid).delete()
context.session.query(models.ConsoleAuthToken).filter_by(
instance_uuid=instance_uuid).delete()
return instance_ref
@require_context
@pick_context_manager_reader_allow_async
def instance_get_by_uuid(context, uuid, columns_to_join=None):
return _instance_get_by_uuid(context, uuid,
columns_to_join=columns_to_join)
def _instance_get_by_uuid(context, uuid, columns_to_join=None):
result = _build_instance_get(context, columns_to_join=columns_to_join).\
filter_by(uuid=uuid).\
first()
if not result:
raise exception.InstanceNotFound(instance_id=uuid)
return result
@require_context
@pick_context_manager_reader
def instance_get(context, instance_id, columns_to_join=None):
try:
result = _build_instance_get(context, columns_to_join=columns_to_join
).filter_by(id=instance_id).first()
if not result:
raise exception.InstanceNotFound(instance_id=instance_id)
return result
except db_exc.DBError:
# NOTE(sdague): catch all in case the db engine chokes on the
# id because it's too long of an int to store.
LOG.warning(_LW("Invalid instance id %s in request"), instance_id)
raise exception.InvalidID(id=instance_id)
def _build_instance_get(context, columns_to_join=None):
query = model_query(context, models.Instance, project_only=True).\
options(joinedload_all('security_groups.rules')).\
options(joinedload('info_cache'))
if columns_to_join is None:
columns_to_join = ['metadata', 'system_metadata']
for column in columns_to_join:
if column in ['info_cache', 'security_groups']:
# Already always joined above
continue
if 'extra.' in column:
query = query.options(undefer(column))
else:
query = query.options(joinedload(column))
# NOTE(alaski) Stop lazy loading of columns not needed.
for col in ['metadata', 'system_metadata']:
if col not in columns_to_join:
query = query.options(noload(col))
return query
def _instances_fill_metadata(context, instances, manual_joins=None):
"""Selectively fill instances with manually-joined metadata. Note that
instance will be converted to a dict.
:param context: security context
:param instances: list of instances to fill
:param manual_joins: list of tables to manually join (can be any
combination of 'metadata' and 'system_metadata' or
None to take the default of both)
"""
uuids = [inst['uuid'] for inst in instances]
if manual_joins is None:
manual_joins = ['metadata', 'system_metadata']
meta = collections.defaultdict(list)
if 'metadata' in manual_joins:
for row in _instance_metadata_get_multi(context, uuids):
meta[row['instance_uuid']].append(row)
sys_meta = collections.defaultdict(list)
if 'system_metadata' in manual_joins:
for row in _instance_system_metadata_get_multi(context, uuids):
sys_meta[row['instance_uuid']].append(row)
pcidevs = collections.defaultdict(list)
if 'pci_devices' in manual_joins:
for row in _instance_pcidevs_get_multi(context, uuids):
pcidevs[row['instance_uuid']].append(row)
filled_instances = []
for inst in instances:
inst = dict(inst)
inst['system_metadata'] = sys_meta[inst['uuid']]
inst['metadata'] = meta[inst['uuid']]
if 'pci_devices' in manual_joins:
inst['pci_devices'] = pcidevs[inst['uuid']]
filled_instances.append(inst)
return filled_instances
def _manual_join_columns(columns_to_join):
"""Separate manually joined columns from columns_to_join
If columns_to_join contains 'metadata', 'system_metadata', or
'pci_devices' those columns are removed from columns_to_join and added
to a manual_joins list to be used with the _instances_fill_metadata method.
The columns_to_join formal parameter is copied and not modified, the return
tuple has the modified columns_to_join list to be used with joinedload in
a model query.
:param:columns_to_join: List of columns to join in a model query.
:return: tuple of (manual_joins, columns_to_join)
"""
manual_joins = []
columns_to_join_new = copy.copy(columns_to_join)
for column in ('metadata', 'system_metadata', 'pci_devices'):
if column in columns_to_join_new:
columns_to_join_new.remove(column)
manual_joins.append(column)
return manual_joins, columns_to_join_new
@require_context
@pick_context_manager_reader
def instance_get_all(context, columns_to_join=None):
if columns_to_join is None:
columns_to_join_new = ['info_cache', 'security_groups']
manual_joins = ['metadata', 'system_metadata']
else:
manual_joins, columns_to_join_new = (
_manual_join_columns(columns_to_join))
query = model_query(context, models.Instance)
for column in columns_to_join_new:
query = query.options(joinedload(column))
if not context.is_admin:
# If we're not admin context, add appropriate filter..
if context.project_id:
query = query.filter_by(project_id=context.project_id)
else:
query = query.filter_by(user_id=context.user_id)
instances = query.all()
return _instances_fill_metadata(context, instances, manual_joins)
@require_context
@pick_context_manager_reader_allow_async
def instance_get_all_by_filters(context, filters, sort_key, sort_dir,
limit=None, marker=None, columns_to_join=None):
"""Return instances matching all filters sorted by the primary key.
See instance_get_all_by_filters_sort for more information.
"""
# Invoke the API with the multiple sort keys and directions using the
# single sort key/direction
return instance_get_all_by_filters_sort(context, filters, limit=limit,
marker=marker,
columns_to_join=columns_to_join,
sort_keys=[sort_key],
sort_dirs=[sort_dir])
@require_context
@pick_context_manager_reader_allow_async
def instance_get_all_by_filters_sort(context, filters, limit=None, marker=None,
columns_to_join=None, sort_keys=None,
sort_dirs=None):
"""Return instances that match all filters sorted by the given keys.
Deleted instances will be returned by default, unless there's a filter that
says otherwise.
Depending on the name of a filter, matching for that filter is
performed using either exact matching or as regular expression
matching. Exact matching is applied for the following filters::
| ['project_id', 'user_id', 'image_ref',
| 'vm_state', 'instance_type_id', 'uuid',
| 'metadata', 'host', 'system_metadata']
A third type of filter (also using exact matching), filters
based on instance metadata tags when supplied under a special
key named 'filter'::
| filters = {
| 'filter': [
| {'name': 'tag-key', 'value': '<metakey>'},
| {'name': 'tag-value', 'value': '<metaval>'},
| {'name': 'tag:<metakey>', 'value': '<metaval>'}
| ]
| }
Special keys are used to tweek the query further::
| 'changes-since' - only return instances updated after
| 'deleted' - only return (or exclude) deleted instances
| 'soft_deleted' - modify behavior of 'deleted' to either
| include or exclude instances whose
| vm_state is SOFT_DELETED.
A fourth type of filter (also using exact matching), filters
based on instance tags (not metadata tags). There are two types
of these tags:
`tags` -- One or more strings that will be used to filter results
in an AND expression: T1 AND T2
`tags-any` -- One or more strings that will be used to filter results in
an OR expression: T1 OR T2
`not-tags` -- One or more strings that will be used to filter results in
an NOT AND expression: NOT (T1 AND T2)
`not-tags-any` -- One or more strings that will be used to filter results
in an NOT OR expression: NOT (T1 OR T2)
Tags should be represented as list::
| filters = {
| 'tags': [some-tag, some-another-tag],
| 'tags-any: [some-any-tag, some-another-any-tag],
| 'not-tags: [some-not-tag, some-another-not-tag],
| 'not-tags-any: [some-not-any-tag, some-another-not-any-tag]
| }
"""
# NOTE(mriedem): If the limit is 0 there is no point in even going
# to the database since nothing is going to be returned anyway.
if limit == 0:
return []
sort_keys, sort_dirs = process_sort_params(sort_keys,
sort_dirs,
default_dir='desc')
if columns_to_join is None:
columns_to_join_new = ['info_cache', 'security_groups']
manual_joins = ['metadata', 'system_metadata']
else:
manual_joins, columns_to_join_new = (
_manual_join_columns(columns_to_join))
query_prefix = context.session.query(models.Instance)
for column in columns_to_join_new:
if 'extra.' in column:
query_prefix = query_prefix.options(undefer(column))
else:
query_prefix = query_prefix.options(joinedload(column))
# Note: order_by is done in the sqlalchemy.utils.py paginate_query(),
# no need to do it here as well
# Make a copy of the filters dictionary to use going forward, as we'll
# be modifying it and we shouldn't affect the caller's use of it.
filters = filters.copy()
if 'changes-since' in filters:
changes_since = timeutils.normalize_time(filters['changes-since'])
query_prefix = query_prefix.\
filter(models.Instance.updated_at >= changes_since)
if 'deleted' in filters:
# Instances can be soft or hard deleted and the query needs to
# include or exclude both
deleted = filters.pop('deleted')
if deleted:
if filters.pop('soft_deleted', True):
delete = or_(
models.Instance.deleted == models.Instance.id,
models.Instance.vm_state == vm_states.SOFT_DELETED
)
query_prefix = query_prefix.\
filter(delete)
else:
query_prefix = query_prefix.\
filter(models.Instance.deleted == models.Instance.id)
else:
query_prefix = query_prefix.\
filter_by(deleted=0)
if not filters.pop('soft_deleted', False):
# It would be better to have vm_state not be nullable
# but until then we test it explicitly as a workaround.
not_soft_deleted = or_(
models.Instance.vm_state != vm_states.SOFT_DELETED,
models.Instance.vm_state == null()
)
query_prefix = query_prefix.filter(not_soft_deleted)
if 'cleaned' in filters:
cleaned = 1 if filters.pop('cleaned') else 0
query_prefix = query_prefix.filter(models.Instance.cleaned == cleaned)
if 'tags' in filters:
tags = filters.pop('tags')
# We build a JOIN ladder expression for each tag, JOIN'ing
# the first tag to the instances table, and each subsequent
# tag to the last JOIN'd tags table
first_tag = tags.pop(0)
query_prefix = query_prefix.join(models.Instance.tags)
query_prefix = query_prefix.filter(models.Tag.tag == first_tag)
for tag in tags:
tag_alias = aliased(models.Tag)
query_prefix = query_prefix.join(tag_alias,
models.Instance.tags)
query_prefix = query_prefix.filter(tag_alias.tag == tag)
if 'tags-any' in filters:
tags = filters.pop('tags-any')
tag_alias = aliased(models.Tag)
query_prefix = query_prefix.join(tag_alias, models.Instance.tags)
query_prefix = query_prefix.filter(tag_alias.tag.in_(tags))
if 'not-tags' in filters:
tags = filters.pop('not-tags')
first_tag = tags.pop(0)
subq = query_prefix.session.query(models.Tag.resource_id)
subq = subq.join(models.Instance.tags)
subq = subq.filter(models.Tag.tag == first_tag)
for tag in tags:
tag_alias = aliased(models.Tag)
subq = subq.join(tag_alias, models.Instance.tags)
subq = subq.filter(tag_alias.tag == tag)
query_prefix = query_prefix.filter(~models.Instance.uuid.in_(subq))
if 'not-tags-any' in filters:
tags = filters.pop('not-tags-any')
query_prefix = query_prefix.filter(~models.Instance.tags.any(
models.Tag.tag.in_(tags)))
if not context.is_admin:
# If we're not admin context, add appropriate filter..
if context.project_id:
filters['project_id'] = context.project_id
else:
filters['user_id'] = context.user_id
# Filters for exact matches that we can do along with the SQL query...
# For other filters that don't match this, we will do regexp matching
exact_match_filter_names = ['project_id', 'user_id', 'image_ref',
'vm_state', 'instance_type_id', 'uuid',
'metadata', 'host', 'task_state',
'system_metadata']
# Filter the query
query_prefix = _exact_instance_filter(query_prefix,
filters, exact_match_filter_names)
if query_prefix is None:
return []
query_prefix = _regex_instance_filter(query_prefix, filters)
query_prefix = _tag_instance_filter(context, query_prefix, filters)
# paginate query
if marker is not None:
try:
marker = _instance_get_by_uuid(
context.elevated(read_deleted='yes'), marker)
except exception.InstanceNotFound:
raise exception.MarkerNotFound(marker=marker)
try:
query_prefix = sqlalchemyutils.paginate_query(query_prefix,
models.Instance, limit,
sort_keys,
marker=marker,
sort_dirs=sort_dirs)
except db_exc.InvalidSortKey:
raise exception.InvalidSortKey()
return _instances_fill_metadata(context, query_prefix.all(), manual_joins)
def _tag_instance_filter(context, query, filters):
"""Applies tag filtering to an Instance query.
Returns the updated query. This method alters filters to remove
keys that are tags. This filters on resources by tags - this
method assumes that the caller will take care of access control
:param context: request context object
:param query: query to apply filters to
:param filters: dictionary of filters
"""
if filters.get('filter') is None:
return query
model = models.Instance
model_metadata = models.InstanceMetadata
model_uuid = model_metadata.instance_uuid
or_query = None
def _to_list(val):
if isinstance(val, dict):
val = val.values()
if not isinstance(val, (tuple, list, set)):
val = (val,)
return val
for filter_block in filters['filter']:
if not isinstance(filter_block, dict):
continue
filter_name = filter_block.get('name')
if filter_name is None:
continue
tag_name = filter_name[4:]
tag_val = _to_list(filter_block.get('value'))
if filter_name.startswith('tag-'):
if tag_name not in ['key', 'value']:
msg = _("Invalid field name: %s") % tag_name
raise exception.InvalidParameterValue(err=msg)
subq = getattr(model_metadata, tag_name).in_(tag_val)
or_query = subq if or_query is None else or_(or_query, subq)
elif filter_name.startswith('tag:'):
subq = model_query(context, model_metadata, (model_uuid,)).\
filter_by(key=tag_name).\
filter(model_metadata.value.in_(tag_val))
query = query.filter(model.uuid.in_(subq))
if or_query is not None:
subq = model_query(context, model_metadata, (model_uuid,)).\
filter(or_query)
query = query.filter(model.uuid.in_(subq))
return query
def _db_connection_type(db_connection):
"""Returns a lowercase symbol for the db type.
This is useful when we need to change what we are doing per DB
(like handling regexes). In a CellsV2 world it probably needs to
do something better than use the database configuration string.
"""
db_string = db_connection.split(':')[0].split('+')[0]
return db_string.lower()
def _safe_regex_mysql(raw_string):
"""Make regex safe to mysql.
Certain items like '|' are interpreted raw by mysql REGEX. If you
search for a single | then you trigger an error because it's
expecting content on either side.
For consistency sake we escape all '|'. This does mean we wouldn't
support something like foo|bar to match completely different
things, however, one can argue putting such complicated regex into
name search probably means you are doing this wrong.
"""
return raw_string.replace('|', '\\|')
def _get_regexp_ops(connection):
"""Return safety filter and db opts for regex."""
regexp_op_map = {
'postgresql': '~',
'mysql': 'REGEXP',
'sqlite': 'REGEXP'
}
regex_safe_filters = {
'mysql': _safe_regex_mysql
}
db_type = _db_connection_type(connection)
return (regex_safe_filters.get(db_type, lambda x: x),
regexp_op_map.get(db_type, 'LIKE'))
def _regex_instance_filter(query, filters):
"""Applies regular expression filtering to an Instance query.
Returns the updated query.
:param query: query to apply filters to
:param filters: dictionary of filters with regex values
"""
model = models.Instance
safe_regex_filter, db_regexp_op = _get_regexp_ops(CONF.database.connection)
for filter_name in filters:
try:
column_attr = getattr(model, filter_name)
except AttributeError:
continue
if 'property' == type(column_attr).__name__:
continue
filter_val = filters[filter_name]
# Sometimes the REGEX filter value is not a string
if not isinstance(filter_val, six.string_types):
filter_val = str(filter_val)
if db_regexp_op == 'LIKE':
query = query.filter(column_attr.op(db_regexp_op)(
u'%' + filter_val + u'%'))
else:
filter_val = safe_regex_filter(filter_val)
query = query.filter(column_attr.op(db_regexp_op)(
filter_val))
return query
def _exact_instance_filter(query, filters, legal_keys):
"""Applies exact match filtering to an Instance query.
Returns the updated query. Modifies filters argument to remove
filters consumed.
:param query: query to apply filters to
:param filters: dictionary of filters; values that are lists,
tuples, sets, or frozensets cause an 'IN' test to
be performed, while exact matching ('==' operator)
is used for other values
:param legal_keys: list of keys to apply exact filtering to
"""
filter_dict = {}
model = models.Instance
# Walk through all the keys
for key in legal_keys:
# Skip ones we're not filtering on
if key not in filters:
continue
# OK, filtering on this key; what value do we search for?
value = filters.pop(key)
if key in ('metadata', 'system_metadata'):
column_attr = getattr(model, key)
if isinstance(value, list):
for item in value:
for k, v in item.items():
query = query.filter(column_attr.any(key=k))
query = query.filter(column_attr.any(value=v))
else:
for k, v in value.items():
query = query.filter(column_attr.any(key=k))
query = query.filter(column_attr.any(value=v))
elif isinstance(value, (list, tuple, set, frozenset)):
if not value:
return None # empty IN-predicate; short circuit
# Looking for values in a list; apply to query directly
column_attr = getattr(model, key)
query = query.filter(column_attr.in_(value))
else:
# OK, simple exact match; save for later
filter_dict[key] = value
# Apply simple exact matches
if filter_dict:
query = query.filter(*[getattr(models.Instance, k) == v
for k, v in filter_dict.items()])
return query
def process_sort_params(sort_keys, sort_dirs,
default_keys=['created_at', 'id'],
default_dir='asc'):
"""Process the sort parameters to include default keys.
Creates a list of sort keys and a list of sort directions. Adds the default
keys to the end of the list if they are not already included.
When adding the default keys to the sort keys list, the associated
direction is:
1) The first element in the 'sort_dirs' list (if specified), else
2) 'default_dir' value (Note that 'asc' is the default value since this is
the default in sqlalchemy.utils.paginate_query)
:param sort_keys: List of sort keys to include in the processed list
:param sort_dirs: List of sort directions to include in the processed list
:param default_keys: List of sort keys that need to be included in the
processed list, they are added at the end of the list
if not already specified.
:param default_dir: Sort direction associated with each of the default
keys that are not supplied, used when they are added
to the processed list
:returns: list of sort keys, list of sort directions
:raise exception.InvalidInput: If more sort directions than sort keys
are specified or if an invalid sort
direction is specified
"""
# Determine direction to use for when adding default keys
if sort_dirs and len(sort_dirs) != 0:
default_dir_value = sort_dirs[0]
else:
default_dir_value = default_dir
# Create list of keys (do not modify the input list)
if sort_keys:
result_keys = list(sort_keys)
else:
result_keys = []
# If a list of directions is not provided, use the default sort direction
# for all provided keys
if sort_dirs:
result_dirs = []
# Verify sort direction
for sort_dir in sort_dirs:
if sort_dir not in ('asc', 'desc'):
msg = _("Unknown sort direction, must be 'desc' or 'asc'")
raise exception.InvalidInput(reason=msg)
result_dirs.append(sort_dir)
else:
result_dirs = [default_dir_value for _sort_key in result_keys]
# Ensure that the key and direction length match
while len(result_dirs) < len(result_keys):
result_dirs.append(default_dir_value)
# Unless more direction are specified, which is an error
if len(result_dirs) > len(result_keys):
msg = _("Sort direction size exceeds sort key size")
raise exception.InvalidInput(reason=msg)
# Ensure defaults are included
for key in default_keys:
if key not in result_keys:
result_keys.append(key)
result_dirs.append(default_dir_value)
return result_keys, result_dirs
@require_context
@pick_context_manager_reader_allow_async
def instance_get_active_by_window_joined(context, begin, end=None,
project_id=None, host=None,
columns_to_join=None, limit=None,
marker=None):
"""Return instances and joins that were active during window."""
query = context.session.query(models.Instance)
if columns_to_join is None:
columns_to_join_new = ['info_cache', 'security_groups']
manual_joins = ['metadata', 'system_metadata']
else:
manual_joins, columns_to_join_new = (
_manual_join_columns(columns_to_join))
for column in columns_to_join_new:
if 'extra.' in column:
query = query.options(undefer(column))
else:
query = query.options(joinedload(column))
query = query.filter(or_(models.Instance.terminated_at == null(),
models.Instance.terminated_at > begin))
if end:
query = query.filter(models.Instance.launched_at < end)
if project_id:
query = query.filter_by(project_id=project_id)
if host:
query = query.filter_by(host=host)
if marker is not None:
try:
marker = _instance_get_by_uuid(
context.elevated(read_deleted='yes'), marker)
except exception.InstanceNotFound:
raise exception.MarkerNotFound(marker=marker)
query = sqlalchemyutils.paginate_query(
query, models.Instance, limit, ['project_id', 'uuid'], marker=marker)
return _instances_fill_metadata(context, query.all(), manual_joins)
def _instance_get_all_query(context, project_only=False, joins=None):
if joins is None:
joins = ['info_cache', 'security_groups']
query = model_query(context,
models.Instance,
project_only=project_only)
for column in joins:
if 'extra.' in column:
query = query.options(undefer(column))
else:
query = query.options(joinedload(column))
return query
@pick_context_manager_reader_allow_async
def instance_get_all_by_host(context, host, columns_to_join=None):
return _instances_fill_metadata(context,
_instance_get_all_query(context).filter_by(host=host).all(),
manual_joins=columns_to_join)
def _instance_get_all_uuids_by_host(context, host):
"""Return a list of the instance uuids on a given host.
Returns a list of UUIDs, not Instance model objects.
"""
uuids = []
for tuple in model_query(context, models.Instance, (models.Instance.uuid,),
read_deleted="no").\
filter_by(host=host).\
all():
uuids.append(tuple[0])
return uuids
@pick_context_manager_reader
def instance_get_all_by_host_and_node(context, host, node,
columns_to_join=None):
if columns_to_join is None:
manual_joins = []
else:
candidates = ['system_metadata', 'metadata']
manual_joins = [x for x in columns_to_join if x in candidates]
columns_to_join = list(set(columns_to_join) - set(candidates))
return _instances_fill_metadata(context,
_instance_get_all_query(
context,
joins=columns_to_join).filter_by(host=host).
filter_by(node=node).all(), manual_joins=manual_joins)
@pick_context_manager_reader
def instance_get_all_by_host_and_not_type(context, host, type_id=None):
return _instances_fill_metadata(context,
_instance_get_all_query(context).filter_by(host=host).
filter(models.Instance.instance_type_id != type_id).all())
@pick_context_manager_reader
def instance_get_all_by_grantee_security_groups(context, group_ids):
if not group_ids:
return []
return _instances_fill_metadata(context,
_instance_get_all_query(context).
join(models.Instance.security_groups).
filter(models.SecurityGroup.rules.any(
models.SecurityGroupIngressRule.group_id.in_(group_ids))).
all())
@require_context
@pick_context_manager_reader
def instance_floating_address_get_all(context, instance_uuid):
if not uuidutils.is_uuid_like(instance_uuid):
raise exception.InvalidUUID(uuid=instance_uuid)
floating_ips = model_query(context,
models.FloatingIp,
(models.FloatingIp.address,)).\
join(models.FloatingIp.fixed_ip).\
filter_by(instance_uuid=instance_uuid)
return [floating_ip.address for floating_ip in floating_ips]
# NOTE(hanlind): This method can be removed as conductor RPC API moves to v2.0.
@pick_context_manager_reader
def instance_get_all_hung_in_rebooting(context, reboot_window):
reboot_window = (timeutils.utcnow() -
datetime.timedelta(seconds=reboot_window))
# NOTE(danms): this is only used in the _poll_rebooting_instances()
# call in compute/manager, so we can avoid the metadata lookups
# explicitly
return _instances_fill_metadata(context,
model_query(context, models.Instance).
filter(models.Instance.updated_at <= reboot_window).
filter_by(task_state=task_states.REBOOTING).all(),
manual_joins=[])
def _retry_instance_update():
"""Wrap with oslo_db_api.wrap_db_retry, and also retry on
UnknownInstanceUpdateConflict.
"""
exception_checker = \
lambda exc: isinstance(exc, (exception.UnknownInstanceUpdateConflict,))
return oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True,
exception_checker=exception_checker)
@require_context
@_retry_instance_update()
@pick_context_manager_writer
def instance_update(context, instance_uuid, values, expected=None):
return _instance_update(context, instance_uuid, values, expected)
@require_context
@_retry_instance_update()
@pick_context_manager_writer
def instance_update_and_get_original(context, instance_uuid, values,
columns_to_join=None, expected=None):
"""Set the given properties on an instance and update it. Return
a shallow copy of the original instance reference, as well as the
updated one.
:param context: = request context object
:param instance_uuid: = instance uuid
:param values: = dict containing column values
If "expected_task_state" exists in values, the update can only happen
when the task state before update matches expected_task_state. Otherwise
a UnexpectedTaskStateError is thrown.
:returns: a tuple of the form (old_instance_ref, new_instance_ref)
Raises NotFound if instance does not exist.
"""
instance_ref = _instance_get_by_uuid(context, instance_uuid,
columns_to_join=columns_to_join)
return (copy.copy(instance_ref), _instance_update(
context, instance_uuid, values, expected, original=instance_ref))
# NOTE(danms): This updates the instance's metadata list in-place and in
# the database to avoid stale data and refresh issues. It assumes the
# delete=True behavior of instance_metadata_update(...)
def _instance_metadata_update_in_place(context, instance, metadata_type, model,
metadata):
metadata = dict(metadata)
to_delete = []
for keyvalue in instance[metadata_type]:
key = keyvalue['key']
if key in metadata:
keyvalue['value'] = metadata.pop(key)
elif key not in metadata:
to_delete.append(keyvalue)
# NOTE: we have to hard_delete here otherwise we will get more than one
# system_metadata record when we read deleted for an instance;
# regular metadata doesn't have the same problem because we don't
# allow reading deleted regular metadata anywhere.
if metadata_type == 'system_metadata':
for condemned in to_delete:
context.session.delete(condemned)
instance[metadata_type].remove(condemned)
else:
for condemned in to_delete:
condemned.soft_delete(context.session)
for key, value in metadata.items():
newitem = model()
newitem.update({'key': key, 'value': value,
'instance_uuid': instance['uuid']})
context.session.add(newitem)
instance[metadata_type].append(newitem)
def _instance_update(context, instance_uuid, values, expected, original=None):
if not uuidutils.is_uuid_like(instance_uuid):
raise exception.InvalidUUID(instance_uuid)
if expected is None:
expected = {}
else:
# Coerce all single values to singleton lists
expected = {k: [None] if v is None else sqlalchemyutils.to_list(v)
for (k, v) in expected.items()}
# Extract 'expected_' values from values dict, as these aren't actually
# updates
for field in ('task_state', 'vm_state'):
expected_field = 'expected_%s' % field
if expected_field in values:
value = values.pop(expected_field, None)
# Coerce all single values to singleton lists
if value is None:
expected[field] = [None]
else:
expected[field] = sqlalchemyutils.to_list(value)
# Values which need to be updated separately
metadata = values.pop('metadata', None)
system_metadata = values.pop('system_metadata', None)
_handle_objects_related_type_conversions(values)
# Hostname is potentially unique, but this is enforced in code rather
# than the DB. The query below races, but the number of users of
# osapi_compute_unique_server_name_scope is small, and a robust fix
# will be complex. This is intentionally left as is for the moment.
if 'hostname' in values:
_validate_unique_server_name(context, values['hostname'])
compare = models.Instance(uuid=instance_uuid, **expected)
try:
instance_ref = model_query(context, models.Instance,
project_only=True).\
update_on_match(compare, 'uuid', values)
except update_match.NoRowsMatched:
# Update failed. Try to find why and raise a specific error.
# We should get here only because our expected values were not current
# when update_on_match executed. Having failed, we now have a hint that
# the values are out of date and should check them.
# This code is made more complex because we are using repeatable reads.
# If we have previously read the original instance in the current
# transaction, reading it again will return the same data, even though
# the above update failed because it has changed: it is not possible to
# determine what has changed in this transaction. In this case we raise
# UnknownInstanceUpdateConflict, which will cause the operation to be
# retried in a new transaction.
# Because of the above, if we have previously read the instance in the
# current transaction it will have been passed as 'original', and there
# is no point refreshing it. If we have not previously read the
# instance, we can fetch it here and we will get fresh data.
if original is None:
original = _instance_get_by_uuid(context, instance_uuid)
conflicts_expected = {}
conflicts_actual = {}
for (field, expected_values) in expected.items():
actual = original[field]
if actual not in expected_values:
conflicts_expected[field] = expected_values
conflicts_actual[field] = actual
# Exception properties
exc_props = {
'instance_uuid': instance_uuid,
'expected': conflicts_expected,
'actual': conflicts_actual
}
# There was a conflict, but something (probably the MySQL read view,
# but possibly an exceptionally unlikely second race) is preventing us
# from seeing what it is. When we go round again we'll get a fresh
# transaction and a fresh read view.
if len(conflicts_actual) == 0:
raise exception.UnknownInstanceUpdateConflict(**exc_props)
# Task state gets special handling for convenience. We raise the
# specific error UnexpectedDeletingTaskStateError or
# UnexpectedTaskStateError as appropriate
if 'task_state' in conflicts_actual:
conflict_task_state = conflicts_actual['task_state']
if conflict_task_state == task_states.DELETING:
exc = exception.UnexpectedDeletingTaskStateError
else:
exc = exception.UnexpectedTaskStateError
# Everything else is an InstanceUpdateConflict
else:
exc = exception.InstanceUpdateConflict
raise exc(**exc_props)
if metadata is not None:
_instance_metadata_update_in_place(context, instance_ref,
'metadata',
models.InstanceMetadata,
metadata)
if system_metadata is not None:
_instance_metadata_update_in_place(context, instance_ref,
'system_metadata',
models.InstanceSystemMetadata,
system_metadata)
return instance_ref
@pick_context_manager_writer
def instance_add_security_group(context, instance_uuid, security_group_id):
"""Associate the given security group with the given instance."""
sec_group_ref = models.SecurityGroupInstanceAssociation()
sec_group_ref.update({'instance_uuid': instance_uuid,
'security_group_id': security_group_id})
sec_group_ref.save(context.session)
@require_context
@pick_context_manager_writer
def instance_remove_security_group(context, instance_uuid, security_group_id):
"""Disassociate the given security group from the given instance."""
model_query(context, models.SecurityGroupInstanceAssociation).\
filter_by(instance_uuid=instance_uuid).\
filter_by(security_group_id=security_group_id).\
soft_delete()
###################
@require_context
@pick_context_manager_reader
def instance_info_cache_get(context, instance_uuid):
"""Gets an instance info cache from the table.
:param instance_uuid: = uuid of the info cache's instance
"""
return model_query(context, models.InstanceInfoCache).\
filter_by(instance_uuid=instance_uuid).\
first()
@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def instance_info_cache_update(context, instance_uuid, values):
"""Update an instance info cache record in the table.
:param instance_uuid: = uuid of info cache's instance
:param values: = dict containing column values to update
"""
convert_objects_related_datetimes(values)
info_cache = model_query(context, models.InstanceInfoCache).\
filter_by(instance_uuid=instance_uuid).\
first()
needs_create = False
if info_cache and info_cache['deleted']:
raise exception.InstanceInfoCacheNotFound(
instance_uuid=instance_uuid)
elif not info_cache:
# NOTE(tr3buchet): just in case someone blows away an instance's
# cache entry, re-create it.
values['instance_uuid'] = instance_uuid
info_cache = models.InstanceInfoCache(**values)
needs_create = True
try:
with get_context_manager(context).writer.savepoint.using(context):
if needs_create:
info_cache.save(context.session)
else:
info_cache.update(values)
except db_exc.DBDuplicateEntry:
# NOTE(sirp): Possible race if two greenthreads attempt to
# recreate the instance cache entry at the same time. First one
# wins.
pass
return info_cache
@require_context
@pick_context_manager_writer
def instance_info_cache_delete(context, instance_uuid):
"""Deletes an existing instance_info_cache record
:param instance_uuid: = uuid of the instance tied to the cache record
"""
model_query(context, models.InstanceInfoCache).\
filter_by(instance_uuid=instance_uuid).\
soft_delete()
###################
def _instance_extra_create(context, values):
inst_extra_ref = models.InstanceExtra()
inst_extra_ref.update(values)
inst_extra_ref.save(context.session)
return inst_extra_ref
@pick_context_manager_writer
def instance_extra_update_by_uuid(context, instance_uuid, values):
rows_updated = model_query(context, models.InstanceExtra).\
filter_by(instance_uuid=instance_uuid).\
update(values)
if not rows_updated:
LOG.debug("Created instance_extra for %s", instance_uuid)
create_values = copy.copy(values)
create_values["instance_uuid"] = instance_uuid
_instance_extra_create(context, create_values)
rows_updated = 1
return rows_updated
@pick_context_manager_reader
def instance_extra_get_by_instance_uuid(context, instance_uuid,
columns=None):
query = model_query(context, models.InstanceExtra).\
filter_by(instance_uuid=instance_uuid)
if columns is None:
columns = ['numa_topology', 'pci_requests', 'flavor', 'vcpu_model',
'migration_context']
for column in columns:
query = query.options(undefer(column))
instance_extra = query.first()
return instance_extra
###################
@require_context
@pick_context_manager_writer
def key_pair_create(context, values):
try:
key_pair_ref = models.KeyPair()
key_pair_ref.update(values)
key_pair_ref.save(context.session)
return key_pair_ref
except db_exc.DBDuplicateEntry:
raise exception.KeyPairExists(key_name=values['name'])
@require_context
@pick_context_manager_writer
def key_pair_destroy(context, user_id, name):
result = model_query(context, models.KeyPair).\
filter_by(user_id=user_id).\
filter_by(name=name).\
soft_delete()
if not result:
raise exception.KeypairNotFound(user_id=user_id, name=name)
@require_context
@pick_context_manager_reader
def key_pair_get(context, user_id, name):
result = model_query(context, models.KeyPair).\
filter_by(user_id=user_id).\
filter_by(name=name).\
first()
if not result:
raise exception.KeypairNotFound(user_id=user_id, name=name)
return result
@require_context
@pick_context_manager_reader
def key_pair_get_all_by_user(context, user_id, limit=None, marker=None):
marker_row = None
if marker is not None:
marker_row = model_query(context, models.KeyPair, read_deleted="no").\
filter_by(name=marker).filter_by(user_id=user_id).first()
if not marker_row:
raise exception.MarkerNotFound(marker=marker)
query = model_query(context, models.KeyPair, read_deleted="no").\
filter_by(user_id=user_id)
query = sqlalchemyutils.paginate_query(
query, models.KeyPair, limit, ['name'], marker=marker_row)
return query.all()
@require_context
@pick_context_manager_reader
def key_pair_count_by_user(context, user_id):
return model_query(context, models.KeyPair, read_deleted="no").\
filter_by(user_id=user_id).\
count()
###################
@pick_context_manager_writer
def network_associate(context, project_id, network_id=None, force=False):
"""Associate a project with a network.
called by project_get_networks under certain conditions
and network manager add_network_to_project()
only associate if the project doesn't already have a network
or if force is True
force solves race condition where a fresh project has multiple instance
builds simultaneously picked up by multiple network hosts which attempt
to associate the project with multiple networks
force should only be used as a direct consequence of user request
all automated requests should not use force
"""
def network_query(project_filter, id=None):
filter_kwargs = {'project_id': project_filter}
if id is not None:
filter_kwargs['id'] = id
return model_query(context, models.Network, read_deleted="no").\
filter_by(**filter_kwargs).\
with_lockmode('update').\
first()
if not force:
# find out if project has a network
network_ref = network_query(project_id)
if force or not network_ref:
# in force mode or project doesn't have a network so associate
# with a new network
# get new network
network_ref = network_query(None, network_id)
if not network_ref:
raise exception.NoMoreNetworks()
# associate with network
# NOTE(vish): if with_lockmode isn't supported, as in sqlite,
# then this has concurrency issues
network_ref['project_id'] = project_id
context.session.add(network_ref)
return network_ref
def _network_ips_query(context, network_id):
return model_query(context, models.FixedIp, read_deleted="no").\
filter_by(network_id=network_id)
@pick_context_manager_reader
def network_count_reserved_ips(context, network_id):
return _network_ips_query(context, network_id).\
filter_by(reserved=True).\
count()
@pick_context_manager_writer
def network_create_safe(context, values):
network_ref = models.Network()
network_ref['uuid'] = uuidutils.generate_uuid()
network_ref.update(values)
try:
network_ref.save(context.session)
return network_ref
except db_exc.DBDuplicateEntry:
raise exception.DuplicateVlan(vlan=values['vlan'])
@pick_context_manager_writer
def network_delete_safe(context, network_id):
result = model_query(context, models.FixedIp, read_deleted="no").\
filter_by(network_id=network_id).\
filter_by(allocated=True).\
count()
if result != 0:
raise exception.NetworkInUse(network_id=network_id)
network_ref = _network_get(context, network_id=network_id)
model_query(context, models.FixedIp, read_deleted="no").\
filter_by(network_id=network_id).\
soft_delete()
context.session.delete(network_ref)
@pick_context_manager_writer
def network_disassociate(context, network_id, disassociate_host,
disassociate_project):
net_update = {}
if disassociate_project:
net_update['project_id'] = None
if disassociate_host:
net_update['host'] = None
network_update(context, network_id, net_update)
def _network_get(context, network_id, project_only='allow_none'):
result = model_query(context, models.Network, project_only=project_only).\
filter_by(id=network_id).\
first()
if not result:
raise exception.NetworkNotFound(network_id=network_id)
return result
@require_context
@pick_context_manager_reader
def network_get(context, network_id, project_only='allow_none'):
return _network_get(context, network_id, project_only=project_only)
@require_context
@pick_context_manager_reader
def network_get_all(context, project_only):
result = model_query(context, models.Network, read_deleted="no",
project_only=project_only).all()
if not result:
raise exception.NoNetworksFound()
return result
@require_context
@pick_context_manager_reader
def network_get_all_by_uuids(context, network_uuids, project_only):
result = model_query(context, models.Network, read_deleted="no",
project_only=project_only).\
filter(models.Network.uuid.in_(network_uuids)).\
all()
if not result:
raise exception.NoNetworksFound()
# check if the result contains all the networks
# we are looking for
for network_uuid in network_uuids:
for network in result:
if network['uuid'] == network_uuid:
break
else:
if project_only:
raise exception.NetworkNotFoundForProject(
network_uuid=network_uuid, project_id=context.project_id)
raise exception.NetworkNotFound(network_id=network_uuid)
return result
def _get_associated_fixed_ips_query(context, network_id, host=None):
# NOTE(vish): The ugly joins here are to solve a performance issue and
# should be removed once we can add and remove leases
# without regenerating the whole list
vif_and = and_(models.VirtualInterface.id ==
models.FixedIp.virtual_interface_id,
models.VirtualInterface.deleted == 0)
inst_and = and_(models.Instance.uuid == models.FixedIp.instance_uuid,
models.Instance.deleted == 0)
# NOTE(vish): This subquery left joins the minimum interface id for each
# instance. If the join succeeds (i.e. the 11th column is not
# null), then the fixed ip is on the first interface.
subq = context.session.query(
func.min(models.VirtualInterface.id).label("id"),
models.VirtualInterface.instance_uuid).\
group_by(models.VirtualInterface.instance_uuid).subquery()
subq_and = and_(subq.c.id == models.FixedIp.virtual_interface_id,
subq.c.instance_uuid == models.VirtualInterface.instance_uuid)
query = context.session.query(
models.FixedIp.address,
models.FixedIp.instance_uuid,
models.FixedIp.network_id,
models.FixedIp.virtual_interface_id,
models.VirtualInterface.address,
models.Instance.hostname,
models.Instance.updated_at,
models.Instance.created_at,
models.FixedIp.allocated,
models.FixedIp.leased,
subq.c.id).\
filter(models.FixedIp.deleted == 0).\
filter(models.FixedIp.network_id == network_id).\
join((models.VirtualInterface, vif_and)).\
join((models.Instance, inst_and)).\
outerjoin((subq, subq_and)).\
filter(models.FixedIp.instance_uuid != null()).\
filter(models.FixedIp.virtual_interface_id != null())
if host:
query = query.filter(models.Instance.host == host)
return query
@pick_context_manager_reader
def network_get_associated_fixed_ips(context, network_id, host=None):
# FIXME(sirp): since this returns fixed_ips, this would be better named
# fixed_ip_get_all_by_network.
query = _get_associated_fixed_ips_query(context, network_id, host)
result = query.all()
data = []
for datum in result:
cleaned = {}
cleaned['address'] = datum[0]
cleaned['instance_uuid'] = datum[1]
cleaned['network_id'] = datum[2]
cleaned['vif_id'] = datum[3]
cleaned['vif_address'] = datum[4]
cleaned['instance_hostname'] = datum[5]
cleaned['instance_updated'] = datum[6]
cleaned['instance_created'] = datum[7]
cleaned['allocated'] = datum[8]
cleaned['leased'] = datum[9]
# NOTE(vish): default_route is True if this fixed ip is on the first
# interface its instance.
cleaned['default_route'] = datum[10] is not None
data.append(cleaned)
return data
@pick_context_manager_reader
def network_in_use_on_host(context, network_id, host):
query = _get_associated_fixed_ips_query(context, network_id, host)
return query.count() > 0
def _network_get_query(context):
return model_query(context, models.Network, read_deleted="no")
@pick_context_manager_reader
def network_get_by_uuid(context, uuid):
result = _network_get_query(context).filter_by(uuid=uuid).first()
if not result:
raise exception.NetworkNotFoundForUUID(uuid=uuid)
return result
@pick_context_manager_reader
def network_get_by_cidr(context, cidr):
result = _network_get_query(context).\
filter(or_(models.Network.cidr == cidr,
models.Network.cidr_v6 == cidr)).\
first()
if not result:
raise exception.NetworkNotFoundForCidr(cidr=cidr)
return result
@pick_context_manager_reader
def network_get_all_by_host(context, host):
fixed_host_filter = or_(models.FixedIp.host == host,
and_(models.FixedIp.instance_uuid != null(),
models.Instance.host == host))
fixed_ip_query = model_query(context, models.FixedIp,
(models.FixedIp.network_id,)).\
outerjoin((models.Instance,
models.Instance.uuid ==
models.FixedIp.instance_uuid)).\
filter(fixed_host_filter)
# NOTE(vish): return networks that have host set
# or that have a fixed ip with host set
# or that have an instance with host set
host_filter = or_(models.Network.host == host,
models.Network.id.in_(fixed_ip_query.subquery()))
return _network_get_query(context).filter(host_filter).all()
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True,
retry_on_request=True)
@pick_context_manager_writer
def network_set_host(context, network_id, host_id):
network_ref = _network_get_query(context).\
filter_by(id=network_id).\
first()
if not network_ref:
raise exception.NetworkNotFound(network_id=network_id)
if network_ref.host:
return None
rows_updated = _network_get_query(context).\
filter_by(id=network_id).\
filter_by(host=None).\
update({'host': host_id})
if not rows_updated:
LOG.debug('The row was updated in a concurrent transaction, '
'we will fetch another row')
raise db_exc.RetryRequest(
exception.NetworkSetHostFailed(network_id=network_id))
@require_context
@pick_context_manager_writer
def network_update(context, network_id, values):
network_ref = _network_get(context, network_id)
network_ref.update(values)
try:
network_ref.save(context.session)
except db_exc.DBDuplicateEntry:
raise exception.DuplicateVlan(vlan=values['vlan'])
return network_ref
###################
@require_context
@pick_context_manager_reader
def quota_get(context, project_id, resource, user_id=None):
model = models.ProjectUserQuota if user_id else models.Quota
query = model_query(context, model).\
filter_by(project_id=project_id).\
filter_by(resource=resource)
if user_id:
query = query.filter_by(user_id=user_id)
result = query.first()
if not result:
if user_id:
raise exception.ProjectUserQuotaNotFound(project_id=project_id,
user_id=user_id)
else:
raise exception.ProjectQuotaNotFound(project_id=project_id)
return result
@require_context
@pick_context_manager_reader
def quota_get_all_by_project_and_user(context, project_id, user_id):
user_quotas = model_query(context, models.ProjectUserQuota,
(models.ProjectUserQuota.resource,
models.ProjectUserQuota.hard_limit)).\
filter_by(project_id=project_id).\
filter_by(user_id=user_id).\
all()
result = {'project_id': project_id, 'user_id': user_id}