Skip to content

Commit

Permalink
Core modifications for future zones service.
Browse files Browse the repository at this point in the history
Makes compute/network/volume API classes pluggable
Splits some code out in compute/api
Adds some race condition checking on deletes in compute/api
Make instance_delete support UUIDs
Add support to RPC to cast to specific servers
Migrations for future zones

Change-Id: Ibee126cd6d325e11770f27589e79dfd0e6104b99
  • Loading branch information
comstud committed Feb 16, 2012
1 parent f5e17bb commit c729ba8
Show file tree
Hide file tree
Showing 18 changed files with 429 additions and 58 deletions.
7 changes: 6 additions & 1 deletion nova/compute/__init__.py
Expand Up @@ -16,5 +16,10 @@
# License for the specific language governing permissions and limitations
# under the License.

from nova.compute.api import API
from nova.compute.api import AggregateAPI
# Importing full names to not pollute the namespace and cause possible
# collisions with use of 'from nova.compute import <foo>' elsewhere.
import nova.flags
import nova.utils

API = nova.utils.import_class(nova.flags.FLAGS.compute_api_class)
55 changes: 35 additions & 20 deletions nova/compute/api.py
Expand Up @@ -568,6 +568,20 @@ def _schedule_run_instance(self,
"requested_networks": requested_networks,
"filter_properties": filter_properties}})

def _check_create_policies(self, context, availability_zone,
requested_networks, block_device_mapping):
"""Check policies for create()."""
target = {'project_id': context.project_id,
'user_id': context.user_id,
'availability_zone': availability_zone}
check_policy(context, 'create', target)

if requested_networks:
check_policy(context, 'create:attach_network', target)

if block_device_mapping:
check_policy(context, 'create:attach_volume', target)

def create(self, context, instance_type,
image_href, kernel_id=None, ramdisk_id=None,
min_count=None, max_count=None,
Expand All @@ -588,16 +602,9 @@ def create(self, context, instance_type,
could be 'None' or a list of instance dicts depending on if
we waited for information from the scheduler or not.
"""
target = {'project_id': context.project_id,
'user_id': context.user_id,
'availability_zone': availability_zone}
check_policy(context, 'create', target)

if requested_networks:
check_policy(context, 'create:attach_network', target)

if block_device_mapping:
check_policy(context, 'create:attach_volume', target)
self._check_create_policies(context, availability_zone,
requested_networks, block_device_mapping)

# We can create the DB entry for the instance here if we're
# only going to create 1 instance and we're in a single
Expand Down Expand Up @@ -848,20 +855,28 @@ def soft_delete(self, context, instance):
else:
LOG.warning(_("No host for instance %s, deleting immediately"),
instance["uuid"])
self.db.instance_destroy(context, instance['id'])
try:
self.db.instance_destroy(context, instance['id'])
except exception.InstanceNotFound:
# NOTE(comstud): Race condition. Instance already gone.
pass

def _delete(self, context, instance):
host = instance['host']
if host:
self.update(context,
instance,
task_state=task_states.DELETING,
progress=0)

self._cast_compute_message('terminate_instance', context,
instance)
else:
self.db.instance_destroy(context, instance['id'])
try:
if host:
self.update(context,
instance,
task_state=task_states.DELETING,
progress=0)

self._cast_compute_message('terminate_instance', context,
instance)
else:
self.db.instance_destroy(context, instance['id'])
except exception.InstanceNotFound:
# NOTE(comstud): Race condition. Instance already gone.
pass

# NOTE(jerdfelt): The API implies that only ACTIVE and ERROR are
# allowed but the EC2 API appears to allow from RESCUED and STOPPED
Expand Down
16 changes: 12 additions & 4 deletions nova/db/sqlalchemy/api.py
Expand Up @@ -1354,11 +1354,12 @@ def instance_create(context, values):
context - request context object
values - dict containing column values.
"""
values = values.copy()
values['metadata'] = _metadata_refs(values.get('metadata'),
models.InstanceMetadata)
instance_ref = models.Instance()
instance_ref['uuid'] = str(utils.gen_uuid())

if not values.get('uuid'):
values['uuid'] = str(utils.gen_uuid())
instance_ref.update(values)

session = get_session()
Expand Down Expand Up @@ -1388,7 +1389,13 @@ def instance_data_get_for_project(context, project_id):
def instance_destroy(context, instance_id):
session = get_session()
with session.begin():
instance_ref = instance_get(context, instance_id, session=session)
if utils.is_uuid_like(instance_id):
instance_ref = instance_get_by_uuid(context, instance_id,
session=session)
instance_id = instance_ref['id']
else:
instance_ref = instance_get(context, instance_id,
session=session)
session.query(models.Instance).\
filter_by(id=instance_id).\
update({'deleted': True,
Expand All @@ -1412,6 +1419,7 @@ def instance_destroy(context, instance_id):

instance_info_cache_delete(context, instance_ref['uuid'],
session=session)
return instance_ref


@require_context
Expand Down Expand Up @@ -3541,7 +3549,7 @@ def zone_get(context, zone_id):

@require_admin_context
def zone_get_all(context):
return model_query(context, models.Zone, read_deleted="yes").all()
return model_query(context, models.Zone, read_deleted="no").all()


####################
Expand Down
@@ -0,0 +1,44 @@
# Copyright 2012 OpenStack LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from sqlalchemy import *

meta = MetaData()

zones = Table('zones', meta,
Column('id', Integer(), primary_key=True, nullable=False),
)

is_parent = Column('is_parent', Boolean(), default=False)
rpc_host = Column('rpc_host', String(255))
rpc_port = Column('rpc_port', Integer())
rpc_virtual_host = Column('rpc_virtual_host', String(255))


def upgrade(migrate_engine):
meta.bind = migrate_engine

zones.create_column(is_parent)
zones.create_column(rpc_host)
zones.create_column(rpc_port)
zones.create_column(rpc_virtual_host)


def downgrade(migrate_engine):
meta.bind = migrate_engine

zones.drop_column(rpc_virtual_host)
zones.drop_column(rpc_port)
zones.drop_column(rpc_host)
zones.drop_column(is_parent)
35 changes: 35 additions & 0 deletions nova/db/sqlalchemy/migrate_repo/versions/078_sqlite_downgrade.sql
@@ -0,0 +1,35 @@
BEGIN TRANSACTION;

CREATE TEMPORARY TABLE zones_temp (
created_at DATETIME,
updated_at DATETIME,
deleted_at DATETIME,
deleted BOOLEAN,
id INTEGER NOT NULL,
name VARCHAR(255),
api_url VARVHAR(255),
username VARCHAR(255),
password VARCHAR(255),
weight_offset FLOAT,
weight_scale FLOAT,
PRIMARY KEY (id),
CHECK (deleted IN (0, 1))
);

INSERT INTO zones_temp
SELECT created_at,
updated_at,
deleted_at,
deleted,
id,
name,
api_url,
username,
password,
weight_offset,
weight_scale FROM zones;

DROP TABLE zones;

ALTER TABLE zones_temp RENAME TO zones;
COMMIT;
@@ -0,0 +1,31 @@
# Copyright 2012 OpenStack LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from sqlalchemy import *

meta = MetaData()


def upgrade(migrate_engine):
meta.bind = migrate_engine
instances = Table('instances', meta, autoload=True)
zone_name = Column('zone_name', String(255))
instances.create_column(zone_name)


def downgrade(migrate_engine):
meta.bind = migrate_engine
instances = Table('instances', meta, autoload=True)
zone_name = Column('zone_name', String(255))
instances.drop_column(zone_name)
7 changes: 7 additions & 0 deletions nova/db/sqlalchemy/models.py
Expand Up @@ -277,6 +277,9 @@ def name(self):
# EC2 disable_api_termination
disable_terminate = Column(Boolean(), default=False, nullable=False)

# Openstack zone name
zone_name = Column(String(255))


class InstanceInfoCache(BASE, NovaBase):
"""
Expand Down Expand Up @@ -876,6 +879,10 @@ class Zone(BASE, NovaBase):
password = Column(String(255))
weight_offset = Column(Float(), default=0.0)
weight_scale = Column(Float(), default=1.0)
is_parent = Column(Boolean())
rpc_host = Column(String(255))
rpc_port = Column(Integer())
rpc_virtual_host = Column(String(255))


class Aggregate(BASE, NovaBase):
Expand Down
11 changes: 10 additions & 1 deletion nova/flags.py
Expand Up @@ -450,7 +450,16 @@ def _get_my_ip():
help='Cache glance images locally'),
cfg.BoolOpt('use_cow_images',
default=True,
help='Whether to use cow images')
help='Whether to use cow images'),
cfg.StrOpt('compute_api_class',
default='nova.compute.api.API',
help='The compute API class to use'),
cfg.StrOpt('network_api_class',
default='nova.network.api.API',
help='The network API class to use'),
cfg.StrOpt('volume_api_class',
default='nova.volume.api.API',
help='The volume API class to use'),
]

FLAGS.register_opts(global_opts)
7 changes: 6 additions & 1 deletion nova/network/__init__.py
Expand Up @@ -16,4 +16,9 @@
# License for the specific language governing permissions and limitations
# under the License.

from nova.network.api import API
# Importing full names to not pollute the namespace and cause possible
# collisions with use of 'from nova.network import <foo>' elsewhere.
import nova.flags
import nova.utils

API = nova.utils.import_class(nova.flags.FLAGS.network_api_class)
31 changes: 31 additions & 0 deletions nova/rpc/__init__.py
Expand Up @@ -161,6 +161,37 @@ def cleanup():
return _get_impl().cleanup()


def cast_to_server(context, server_params, topic, msg):
"""Invoke a remote method that does not return anything.
:param context: Information that identifies the user that has made this
request.
:param server_params: Connection information
:param topic: The topic to send the notification to.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:returns: None
"""
return _get_impl().cast_to_server(context, server_params, topic, msg)


def fanout_cast_to_server(context, server_params, topic, msg):
"""Broadcast to a remote method invocation with no return.
:param context: Information that identifies the user that has made this
request.
:param server_params: Connection information
:param topic: The topic to send the notification to.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:returns: None
"""
return _get_impl().fanout_cast_to_server(context, server_params, topic,
msg)


_RPCIMPL = None


Expand Down
22 changes: 20 additions & 2 deletions nova/rpc/amqp.py
Expand Up @@ -73,14 +73,15 @@ class ConnectionContext(rpc_common.Connection):
the pool.
"""

def __init__(self, connection_pool, pooled=True):
def __init__(self, connection_pool, pooled=True, server_params=None):
"""Create a new connection, or get one from the pool"""
self.connection = None
self.connection_pool = connection_pool
if pooled:
self.connection = connection_pool.get()
else:
self.connection = connection_pool.connection_cls()
self.connection = connection_pool.connection_cls(
server_params=server_params)
self.pooled = pooled

def __enter__(self):
Expand Down Expand Up @@ -353,6 +354,23 @@ def fanout_cast(context, topic, msg, connection_pool):
conn.fanout_send(topic, msg)


def cast_to_server(context, server_params, topic, msg, connection_pool):
"""Sends a message on a topic to a specific server."""
pack_context(msg, context)
with ConnectionContext(connection_pool, pooled=False,
server_params=server_params) as conn:
conn.topic_send(topic, msg)


def fanout_cast_to_server(context, server_params, topic, msg,
connection_pool):
"""Sends a message on a fanout exchange to a specific server."""
pack_context(msg, context)
with ConnectionContext(connection_pool, pooled=False,
server_params=server_params) as conn:
conn.fanout_send(topic, msg)


def notify(context, topic, msg, connection_pool):
"""Sends a notification event on a topic."""
LOG.debug(_('Sending notification on %s...'), topic)
Expand Down

0 comments on commit c729ba8

Please sign in to comment.