Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Add GBP APIs to interact with Neutron RESTful Client
Browse files Browse the repository at this point in the history
1. Add GBP APIs to interact with Neutron RESTful client to:
    CRUD networks
    CRUD subnets
    CRUD ports
    CRUD security groups
    CRD security group rules
    CRUD routers
    add/remove router interfaces

2. Add UT for the above APIs

Change-Id: If7d73284ad9e7e09bd261e5ba0d82d67ec99ec49
Signed-off-by: Yi Yang <yyos1999@gmail.com>
Author: Yi Yang <yyos1999@gmail.com>
Co-Authored-By: Yapeng Wu <yapengwu@gmail.com>
  • Loading branch information
Yi Yang and yapengwu committed Mar 26, 2015
1 parent 5e9db63 commit 07ba164
Show file tree
Hide file tree
Showing 2 changed files with 355 additions and 0 deletions.
151 changes: 151 additions & 0 deletions gbpservice/network/neutronv2/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from gbpservice.network.neutronv2 import client


class API(object):
"""API for interacting with the neutron 2.x API."""

def _create_resource(self, context, resource, attrs):
action = 'create_' + resource
neutron = client.get_client(context)
obj_creator = getattr(neutron, action)
return obj_creator(attrs)[resource]

def _show_resource(self, context, resource, resource_id):
action = 'show_' + resource
neutron = client.get_client(context)
obj_method = getattr(neutron, action)
return obj_method(resource_id)[resource]

def _list_resources(self, context, resource, filters={}):
resources = resource + 's'
action = 'list_' + resources
neutron = client.get_client(context)
obj_lister = getattr(neutron, action)
return obj_lister(**filters)[resources]

def _update_resource(self, context, resource, resource_id, attrs):
action = 'update_' + resource
neutron = client.get_client(context)
obj_updater = getattr(neutron, action)
return obj_updater(resource_id, attrs)[resource]

def _delete_resource(self, context, resource, resource_id):
action = 'delete_' + resource
neutron = client.get_client(context)
obj_deleter = getattr(neutron, action)
obj_deleter(resource_id)

def create_network(self, context, network):
return self._create_resource(context, 'network', network)

def show_network(self, context, net_id):
return self._show_resource(context, 'network', net_id)

def list_networks(self, context, filters={}):
return self._list_resources(context, 'network', filters)

def update_network(self, context, net_id, network):
return self._update_resource(context, 'network', net_id, network)

def delete_network(self, context, net_id):
self._delete_resource(context, 'network', net_id)

def create_subnet(self, context, subnet):
return self._create_resource(context, 'subnet', subnet)

def show_subnet(self, context, subnet_id):
return self._show_resource(context, 'subnet', subnet_id)

def list_subnets(self, context, filters={}):
return self._list_resources(context, 'subnet', filters)

def update_subnet(self, context, subnet_id, subnet):
return self._update_resource(context, 'subnet', subnet_id, subnet)

def delete_subnet(self, context, subnet_id):
self._delete_resource(context, 'subnet', subnet_id)

def create_port(self, context, port):
return self._create_resource(context, 'port', port)

def show_port(self, context, port_id):
return self._show_resource(context, 'port', port_id)

def list_ports(self, context, filters={}):
return self._list_resources(context, 'port', filters)

def update_port(self, context, port_id, port):
return self._update_resource(context, 'port', port_id, port)

def delete_port(self, context, port_id):
self._delete_resource(context, 'port', port_id)

def create_security_group(self, context, sg):
return self._create_resource(context, 'security_group', sg)

def show_security_group(self, context, sg_id):
return self._show_resource(context, 'security_group', sg_id)

def list_security_groups(self, context, filters={}):
return self._list_resources(context, 'security_group', filters)

def update_security_group(self, context, sg_id, sg):
return self._update_resource(context, 'security_group', sg_id, sg)

def delete_security_group(self, context, sg_id):
self._delete_resource(context, 'security_group', sg_id)

def create_security_group_rule(self, context, rule):
return self._create_resource(context, 'security_group_rule', rule)

def show_security_group_rule(self, context, rule_id):
return self._show_resource(context, 'security_group_rule', rule_id)

def list_security_group_rules(self, context, filters={}):
return self._list_resources(context, 'security_group_rule', filters)

# REVISIT(yi): update_security_group_rule not supported in neutron yet
# def update_security_group_rule(self, context, rule_id, rule):
# return self._update_resource(context,
# 'security_group_rule',
# rule_id,
# rule)

def delete_security_group_rule(self, context, rule_id):
self._delete_resource(context, 'security_group_rule', rule_id)

def create_router(self, context, router):
return self._create_resource(context, 'router', router)

def show_router(self, context, router_id):
return self._show_resource(context, 'router', router_id)

def list_routers(self, context, filters={}):
return self._list_resources(context, 'router', filters)

def update_router(self, context, router_id, router):
return self._update_resource(context, 'router', router_id, router)

def delete_router(self, context, router_id):
self._delete_resource(context, 'router', router_id)

def add_router_interface(self, context, router_id, interface):
return client.get_client(context).add_interface_router(router_id,
interface)

def remove_router_interface(self, context, router_id, interface):
return client.get_client(context).remove_interface_router(router_id,
interface)
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mock
import unittest

from neutronclient.v2_0 import client as clientv20

from gbpservice.network.neutronv2 import api as nc_api
from gbpservice.network.neutronv2 import client as nc_client


ATTRIBUTES = 'fake_attributes'
CONTEXT = 'fake_context'
FILTERS = {'fake_key1': 'fake_value1', 'fake_key2': 'fake_value2'}
ID = 'fake_id'
INTERFACE = 'fake_interface'
VALUE = 'fake_value'


class NeutronV2ApiTestCase(unittest.TestCase):
""" Test case for Neutron V2 API testing.
Provide a set of mocked methods for the neutron v2 API testing.
"""

def setUp(self):
self.neutron_api = nc_api.API()
self.mock_client = mock.Mock(clientv20.Client)

def _test_create_resource(self, resource):
with mock.patch.object(nc_client, 'get_client') as mock_get_client:
action = 'create_' + resource
mock_get_client.return_value = self.mock_client
mock_client_action = getattr(self.mock_client, action)
mock_client_action.return_value = {resource: VALUE}

method_to_test = getattr(self.neutron_api, action)
method_to_test(CONTEXT, ATTRIBUTES)
mock_client_action.assert_called_once_with(ATTRIBUTES)

def _test_show_resource(self, resource):
with mock.patch.object(nc_client, 'get_client') as mock_get_client:
action = 'show_' + resource
mock_get_client.return_value = self.mock_client
mock_client_action = getattr(self.mock_client, action)
mock_client_action.return_value = {resource: VALUE}

method_to_test = getattr(self.neutron_api, action)
method_to_test(CONTEXT, ID)
mock_client_action.assert_called_once_with(ID)

def _test_list_resources(self, resource):
with mock.patch.object(nc_client, 'get_client') as mock_get_client:
resources = resource + 's'
action = 'list_' + resources
mock_get_client.return_value = self.mock_client
mock_client_action = getattr(self.mock_client, action)
mock_client_action.return_value = {resources: VALUE}

method_to_test = getattr(self.neutron_api, action)
method_to_test(CONTEXT, FILTERS)
mock_client_action.assert_called_once_with(**FILTERS)

def _test_update_resource(self, resource):
with mock.patch.object(nc_client, 'get_client') as mock_get_client:
action = 'update_' + resource
mock_get_client.return_value = self.mock_client
mock_client_action = getattr(self.mock_client, action)
mock_client_action.return_value = {resource: VALUE}

method_to_test = getattr(self.neutron_api, action)
method_to_test(CONTEXT, ID, ATTRIBUTES)
mock_client_action.assert_called_once_with(ID, ATTRIBUTES)

def _test_delete_resource(self, resource):
with mock.patch.object(nc_client, 'get_client') as mock_get_client:
action = 'delete_' + resource
mock_get_client.return_value = self.mock_client
mock_client_action = getattr(self.mock_client, action)
mock_client_action.return_value = {resource: VALUE}

method_to_test = getattr(self.neutron_api, action)
method_to_test(CONTEXT, ID)
mock_client_action.assert_called_once_with(ID)

def _test_router_interface(self, action):
with mock.patch.object(nc_client, 'get_client') as mock_get_client:
mock_get_client.return_value = self.mock_client
mock_client_action = getattr(self.mock_client,
action + '_interface_router')
mock_client_action.return_value = VALUE

method_to_test = getattr(self.neutron_api,
action + '_router_interface')
method_to_test(CONTEXT, ID, INTERFACE)
mock_client_action.assert_called_once_with(ID, INTERFACE)

def test_create_network(self):
self._test_create_resource('network')

def test_show_network(self):
self._test_show_resource('network')

def test_list_networks(self):
self._test_list_resources('network')

def test_update_network(self):
self._test_update_resource('network')

def test_delete_network(self):
self._test_delete_resource('network')

def test_create_subnet(self):
self._test_create_resource('subnet')

def test_show_subnet(self):
self._test_show_resource('subnet')

def test_list_subnets(self):
self._test_list_resources('subnet')

def test_update_subnet(self):
self._test_update_resource('subnet')

def test_delete_subnet(self):
self._test_delete_resource('subnet')

def test_create_port(self):
self._test_create_resource('port')

def test_show_port(self):
self._test_show_resource('port')

def test_list_ports(self):
self._test_list_resources('port')

def test_update_port(self):
self._test_update_resource('port')

def test_delete_port(self):
self._test_delete_resource('port')

def test_create_security_group(self):
self._test_create_resource('security_group')

def test_show_security_group(self):
self._test_show_resource('security_group')

def test_list_security_groups(self):
self._test_list_resources('security_group')

def test_update_security_group(self):
self._test_update_resource('security_group')

def test_delete_security_group(self):
self._test_delete_resource('security_group')

def test_create_security_group_rule(self):
self._test_create_resource('security_group_rule')

def test_show_security_group_rule(self):
self._test_show_resource('security_group_rule')

def test_list_security_group_rules(self):
self._test_list_resources('security_group_rule')

# REVISIT(yi): update_security_group_rule not supported in neutron yet
# def test_update_security_group_rule(self):
# self._test_update_resource('security_group_rule')

def test_delete_security_group_rule(self):
self._test_delete_resource('security_group_rule')

def test_create_router(self):
self._test_create_resource('router')

def test_show_router(self):
self._test_show_resource('router')

def test_list_routers(self):
self._test_list_resources('router')

def test_update_router(self):
self._test_update_resource('router')

def test_delete_router(self):
self._test_delete_resource('router')

def test_add_router_interface(self):
self._test_router_interface('add')

def test_remove_router_interface(self):
self._test_router_interface('remove')

0 comments on commit 07ba164

Please sign in to comment.