Skip to content

Commit

Permalink
Make SecurityGroupsRpcCallback a separate callback class
Browse files Browse the repository at this point in the history
RPC has a version of itself. In Neutron a plugin implements
several RPC interface, so a single RPC version doesn't work.
In Mixin callback class approach, RPC versioning depends on
each plugin implementation and it makes harder to maintain
RPC version appropriately. This patch series replaces mixin
RPC callback of server side with a separate class.

This commit handles server-side callback of security group
RPC interface.
* The server-side callback of Security group RPC is moved to
  api/rpc/handler and db/securitygroups_rpc_base now only
  contains a mixin class to add agent-based security group
  implementation with db operations.
* get_port_from_device method in server-side callback class
  is moved to a mixin class of plugin implementation
  (SecurityGroupServerRpcMixin) because it involves DB lookup
  and is tightly coupled with plugin implementation rather
  than RPC interface definition.

Most unit tests for SGServerRpcCallBackTestCase were skipped
in the base class before, but now they are no longer skipped.

The following items will be planned in later patches
to avoid drastic changes in a single patch.
* Merge security group RPC API and agent callback classes in
  agent/securitygroups_rpc into api/rpc/handlers/securitygroup_rpc
* Remove completely duplicated db access code in get_port_from_device
  and get_port_and_sgs

Partial-Bug: #1359416
Change-Id: Ia6535217d2e3b849a95667c1b53dd09675002892
  • Loading branch information
amotoki authored and s00218254 committed Aug 30, 2014
1 parent 9741786 commit 2781fce
Show file tree
Hide file tree
Showing 22 changed files with 338 additions and 276 deletions.
59 changes: 59 additions & 0 deletions neutron/api/rpc/handlers/securitygroups_rpc.py
@@ -0,0 +1,59 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from neutron.common import rpc as n_rpc
from neutron import manager


# TODO(amotoki): Move security group RPC API and agent callback
# from securitygroups_rpc.py.


class SecurityGroupServerRpcCallback(n_rpc.RpcCallback):
"""Callback for SecurityGroup agent RPC in plugin implementations.
Subclass which inherits this class must implement get_port_from_device().
"""

# API version history:
# 1.1 - Initial version

# NOTE: RPC_API_VERSION must not be overridden in subclasses
# to keep RPC API version consistent across plugins.
RPC_API_VERSION = '1.1'

@property
def plugin(self):
return manager.NeutronManager.get_plugin()

def security_group_rules_for_devices(self, context, **kwargs):
"""Callback method to return security group rules for each port.
also convert remote_group_id rule
to source_ip_prefix and dest_ip_prefix rule
:params devices: list of devices
:returns: port correspond to the devices with security group rules
"""
devices = kwargs.get('devices')

ports = {}
for device in devices:
port = self.plugin.get_port_from_device(device)
if not port:
continue
if port['device_owner'].startswith('network:'):
continue
ports[port['id']] = port
return self.plugin.security_group_rules_for_ports(context, ports)
50 changes: 22 additions & 28 deletions neutron/db/securitygroups_rpc_base.py
Expand Up @@ -36,6 +36,27 @@


class SecurityGroupServerRpcMixin(sg_db.SecurityGroupDbMixin):
"""Mixin class to add agent-based security group implementation."""

def get_port_from_device(self, device):
"""Get port dict from device name on an agent.
Subclass must provide this method.
:param device: device name which identifies a port on the agent side.
What is specified in "device" depends on a plugin agent implementation.
For example, it is a port ID in OVS agent and netdev name in Linux
Bridge agent.
:return: port dict returned by DB plugin get_port(). In addition,
it must contain the following fields in the port dict returned.
- device
- security_groups
- security_group_rules,
- security_group_source_groups
- fixed_ips
"""
raise NotImplementedError(_("%s must implement get_port_from_device.")
% self.__class__.__name__)

def create_security_group_rule(self, context, security_group_rule):
bulk_rule = {'security_group_rules': [security_group_rule]}
Expand Down Expand Up @@ -128,33 +149,6 @@ def notify_security_groups_member_updated(self, context, port):
self.notifier.security_groups_member_updated(
context, port.get(ext_sg.SECURITYGROUPS))


class SecurityGroupServerRpcCallbackMixin(object):
"""A mix-in that enable SecurityGroup agent support in plugin
implementations.
"""

def security_group_rules_for_devices(self, context, **kwargs):
"""Return security group rules for each port.
also convert remote_group_id rule
to source_ip_prefix and dest_ip_prefix rule
:params devices: list of devices
:returns: port correspond to the devices with security group rules
"""
devices = kwargs.get('devices')

ports = {}
for device in devices:
port = self.get_port_from_device(device)
if not port:
continue
if port['device_owner'].startswith('network:'):
continue
ports[port['id']] = port
return self._security_group_rules_for_ports(context, ports)

def _select_rules_for_ports(self, context, ports):
if not ports:
return []
Expand Down Expand Up @@ -354,7 +348,7 @@ def _apply_provider_rule(self, context, ports):
self._add_ingress_ra_rule(port, ips_ra)
self._add_ingress_dhcp_rule(port, ips_dhcp)

def _security_group_rules_for_ports(self, context, ports):
def security_group_rules_for_ports(self, context, ports):
rules_in_db = self._select_rules_for_ports(context, ports)
for (binding, rule_in_db) in rules_in_db:
port_id = binding['port_id']
Expand Down
12 changes: 5 additions & 7 deletions neutron/plugins/bigswitch/plugin.py
Expand Up @@ -56,6 +56,7 @@
from neutron.api import extensions as neutron_extensions
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.common import constants as const
from neutron.common import exceptions
from neutron.common import rpc as n_rpc
Expand All @@ -72,7 +73,7 @@
from neutron.db import l3_db
from neutron.db import models_v2
from neutron.db import securitygroups_db as sg_db
from neutron.db import securitygroups_rpc_base as sg_rpc_base
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import allowedaddresspairs as addr_pair
from neutron.extensions import external_net
from neutron.extensions import extra_dhcp_opt as edo_ext
Expand Down Expand Up @@ -113,10 +114,7 @@ def port_update(self, context, port):
topic=self.topic_port_update)


class RestProxyCallbacks(n_rpc.RpcCallback,
sg_rpc_base.SecurityGroupServerRpcCallbackMixin):

RPC_API_VERSION = '1.1'
class SecurityGroupServerRpcMixin(sg_db_rpc.SecurityGroupServerRpcMixin):

def get_port_from_device(self, device):
port_id = re.sub(r"^tap", "", device)
Expand Down Expand Up @@ -454,7 +452,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
addr_pair_db.AllowedAddressPairsMixin,
extradhcpopt_db.ExtraDhcpOptMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin,
sg_rpc_base.SecurityGroupServerRpcMixin):
SecurityGroupServerRpcMixin):

_supported_extension_aliases = ["external-net", "router", "binding",
"router_rules", "extra_dhcp_opt", "quotas",
Expand Down Expand Up @@ -509,7 +507,7 @@ def _setup_rpc(self):
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
self._dhcp_agent_notifier
)
self.endpoints = [RestProxyCallbacks(),
self.endpoints = [securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
agents_db.AgentExtRpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints,
Expand Down
64 changes: 34 additions & 30 deletions neutron/plugins/brocade/NeutronPlugin.py
Expand Up @@ -30,6 +30,7 @@
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.common import constants as q_const
from neutron.common import rpc as n_rpc
from neutron.common import topics
Expand Down Expand Up @@ -57,6 +58,7 @@
PLUGIN_VERSION = 0.88
AGENT_OWNER_PREFIX = "network:"
NOS_DRIVER = 'neutron.plugins.brocade.nos.nosdriver.NOSdriver'
TAP_PREFIX_LEN = 3

SWITCH_OPTS = [cfg.StrOpt('address', default='',
help=_('The address of the host to SSH to')),
Expand All @@ -77,41 +79,14 @@
cfg.CONF.register_opts(PHYSICAL_INTERFACE_OPTS, "PHYSICAL_INTERFACE")


class BridgeRpcCallbacks(n_rpc.RpcCallback,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
class BridgeRpcCallbacks(n_rpc.RpcCallback):
"""Agent callback."""

RPC_API_VERSION = '1.2'
# Device names start with "tap"
# history
# 1.1 Support Security Group RPC
# 1.2 Support get_devices_details_list
TAP_PREFIX_LEN = 3

@classmethod
def get_port_from_device(cls, device):
"""Get port from the brocade specific db."""

# TODO(shh) context is not being passed as
# an argument to this function;
#
# need to be fixed in:
# file: neutron/db/securtygroups_rpc_base.py
# function: securitygroup_rules_for_devices()
# which needs to pass context to us

# Doing what other plugins are doing
session = db.get_session()
port = brocade_db.get_port_from_device(
session, device[cls.TAP_PREFIX_LEN:])

# TODO(shiv): need to extend the db model to include device owners
# make it appears that the device owner is of type network
if port:
port['device'] = device
port['device_owner'] = AGENT_OWNER_PREFIX
port['binding:vif_type'] = 'bridge'
return port

def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details."""
Expand All @@ -120,7 +95,7 @@ def get_device_details(self, rpc_context, **kwargs):
device = kwargs.get('device')
LOG.debug(_("Device %(device)s details requested from %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
port = brocade_db.get_port(rpc_context, device[self.TAP_PREFIX_LEN:])
port = brocade_db.get_port(rpc_context, device[TAP_PREFIX_LEN:])
if port:
entry = {'device': device,
'vlan_id': port.vlan_id,
Expand Down Expand Up @@ -163,6 +138,34 @@ def update_device_down(self, rpc_context, **kwargs):
return entry


class SecurityGroupServerRpcMixin(sg_db_rpc.SecurityGroupServerRpcMixin):

@classmethod
def get_port_from_device(cls, device):
"""Get port from the brocade specific db."""

# TODO(shh) context is not being passed as
# an argument to this function;
#
# need to be fixed in:
# file: neutron/db/securtygroups_rpc_base.py
# function: securitygroup_rules_for_devices()
# which needs to pass context to us

# Doing what other plugins are doing
session = db.get_session()
port = brocade_db.get_port_from_device(
session, device[TAP_PREFIX_LEN:])

# TODO(shiv): need to extend the db model to include device owners
# make it appears that the device owner is of type network
if port:
port['device'] = device
port['device_owner'] = AGENT_OWNER_PREFIX
port['binding:vif_type'] = 'bridge'
return port


class AgentNotifierApi(n_rpc.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin):
"""Agent side of the linux bridge rpc API.
Expand Down Expand Up @@ -205,7 +208,7 @@ def port_update(self, context, port, physical_network, vlan_id):
class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
external_net_db.External_net_db_mixin,
extraroute_db.ExtraRoute_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin,
SecurityGroupServerRpcMixin,
l3_agentschedulers_db.L3AgentSchedulerDbMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin,
portbindings_base.PortBindingBaseMixin):
Expand Down Expand Up @@ -262,6 +265,7 @@ def _setup_rpc(self):
is_admin=False)
self.conn = n_rpc.create_connection(new=True)
self.endpoints = [BridgeRpcCallbacks(),
securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
l3_rpc.L3RpcCallback(),
agents_db.AgentExtRpcCallback()]
Expand Down
32 changes: 17 additions & 15 deletions neutron/plugins/linuxbridge/lb_neutron_plugin.py
Expand Up @@ -22,6 +22,7 @@
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.api.v2 import attributes
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
Expand Down Expand Up @@ -52,32 +53,25 @@

LOG = logging.getLogger(__name__)

# Device names start with "tap"
TAP_PREFIX_LEN = 3

class LinuxBridgeRpcCallbacks(n_rpc.RpcCallback,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin
):

class LinuxBridgeRpcCallbacks(n_rpc.RpcCallback):

# history
# 1.1 Support Security Group RPC
# 1.2 Support get_devices_details_list
RPC_API_VERSION = '1.2'
# Device names start with "tap"
TAP_PREFIX_LEN = 3

@classmethod
def get_port_from_device(cls, device):
port = db.get_port_from_device(device[cls.TAP_PREFIX_LEN:])
if port:
port['device'] = device
return port

def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details."""
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
LOG.debug(_("Device %(device)s details requested from %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
port = self.get_port_from_device(device)
plugin = manager.NeutronManager.get_plugin()
port = plugin.get_port_from_device(device)
if port:
binding = db.get_network_binding(db_api.get_session(),
port['network_id'])
Expand Down Expand Up @@ -117,10 +111,10 @@ def update_device_down(self, rpc_context, **kwargs):
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
host = kwargs.get('host')
port = self.get_port_from_device(device)
LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
plugin = manager.NeutronManager.get_plugin()
port = plugin.get_port_from_device(device)
if port:
entry = {'device': device,
'exists': True}
Expand All @@ -143,10 +137,10 @@ def update_device_up(self, rpc_context, **kwargs):
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
host = kwargs.get('host')
port = self.get_port_from_device(device)
LOG.debug(_("Device %(device)s up on %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
plugin = manager.NeutronManager.get_plugin()
port = plugin.get_port_from_device(device)
if port:
if (host and
not plugin.get_port_host(rpc_context, port['id']) == host):
Expand Down Expand Up @@ -283,6 +277,7 @@ def _setup_rpc(self):
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = n_rpc.create_connection(new=True)
self.endpoints = [LinuxBridgeRpcCallbacks(),
securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
l3_rpc.L3RpcCallback(),
agents_db.AgentExtRpcCallback()]
Expand Down Expand Up @@ -542,3 +537,10 @@ def _notify_port_updated(self, context, port):
self.notifier.port_update(context, port,
binding.physical_network,
binding.vlan_id)

@classmethod
def get_port_from_device(cls, device):
port = db.get_port_from_device(device[TAP_PREFIX_LEN:])
if port:
port['device'] = device
return port

0 comments on commit 2781fce

Please sign in to comment.