Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Commit

Permalink
rest api for networks vlans and sites
Browse files Browse the repository at this point in the history
  • Loading branch information
uberj committed Aug 7, 2013
1 parent 24cad51 commit 9ffacf2
Show file tree
Hide file tree
Showing 10 changed files with 306 additions and 33 deletions.
175 changes: 173 additions & 2 deletions invtool/core_dispatch.py
Expand Up @@ -3,11 +3,19 @@
except ImportError:
import json

from invtool.lib.registrar import registrar
from invtool.dispatch import ObjectDispatch
from invtool.tests.utils import call_to_json, test_method_to_params, EXEC

from invtool.lib.dns_options import detail_pk_argument
from invtool.lib.registrar import registrar
from invtool.lib.parser import build_detail_parser
from invtool.lib.dns_options import (
description_argument, comment_argument, update_pk_argument,
delete_pk_argument, detail_pk_argument
)
from invtool.lib.core_options import (
name_argument, number_argument, site_argument, vlan_argument,
network_str_argument
)


# TODO, make a core_dispatch.py with CoreDispatch base class
Expand All @@ -16,6 +24,11 @@ class CoreDispatch(ObjectDispatch):
object_url = "/en-US/core/api/v1_core/{1}/{2}/"
object_list_url = "/en-US/core/api/v1_core/{1}/"


class ROCoreDispatch(ObjectDispatch):
object_url = "/en-US/core/api/v1_core/{1}/{2}/"
object_list_url = "/en-US/core/api/v1_core/{1}/"

def build_parser(self, base_parser):
record_base_parser = base_parser.add_parser(
self.dtype,
Expand All @@ -33,9 +46,122 @@ class DispatchNetwork(CoreDispatch):
resource_name = 'network'
dtype = 'NET'
dgroup = 'core'
ip_type = None

def test_setup(self):
def setUp(self):
def create_site():
command = [EXEC, DispatchSite.dtype, 'create']
for add_arg, extract_arg, test_f in DispatchSite.create_args:
command.append(test_method_to_params(test_f()))

site_ret, site_errors, site_rc = call_to_json(
' '.join(command)
)

if site_errors:
self.fail(site_errors)

self.assertEqual(0, site_rc)

# Make sure the SREG was created correctly
self.assertTrue('http_status' in site_ret)
self.assertEqual(site_ret['http_status'], 201)
self.assertTrue('pk' in site_ret)
return site_ret['pk']

def create_vlan():
command = [EXEC, DispatchVlan.dtype, 'create']
for add_arg, extract_arg, test_f in DispatchVlan.create_args:
command.append(test_method_to_params(test_f()))

vlan_ret, vlan_errors, vlan_rc = call_to_json(
' '.join(command)
)

if vlan_errors:
self.fail(vlan_errors)

self.assertEqual(0, vlan_rc)

# Make sure the SREG was created correctly
self.assertTrue('http_status' in vlan_ret)
self.assertEqual(vlan_ret['http_status'], 201)
self.assertTrue('pk' in vlan_ret)
return vlan_ret['pk']

self.site_pk = create_site()
self.vlan_pk = create_site()

def modify_command(command_str):
command_str = command_str.replace(
'{{ site }}', str(self.site_pk)
)
command_str = command_str.replace(
'{{ vlan }}', str(self.vlan_pk)
)
return command_str

self.modify_command = modify_command
return setUp

def test_teardown(self):
def tearDown(self):
# Delete the object
def delete_vlan():
delete_command = "{0} {1} delete --pk {2}".format(
EXEC, 'VLAN', self.vlan_pk)
ret, errors, rc = call_to_json(delete_command)
if errors:
self.fail(errors)
self.assertEqual(0, rc)
self.assertTrue('http_status' in ret)
self.assertEqual(ret['http_status'], 204)

def delete_site():
delete_command = "{0} {1} delete --pk {2}".format(
EXEC, 'SITE', self.vlan_pk)
ret, errors, rc = call_to_json(delete_command)
if errors:
self.fail(errors)
self.assertEqual(0, rc)
self.assertTrue('http_status' in ret)
self.assertEqual(ret['http_status'], 204)
delete_vlan()
delete_site()
return tearDown

update_args = [
site_argument('site'),
vlan_argument('vlan'),
network_str_argument('network-str'),
comment_argument('comment'),
description_argument('description'),
update_pk_argument('pk', dtype)
]

create_args = [
site_argument('site'),
vlan_argument('vlan'),
network_str_argument('network-str'),
comment_argument('comment'),
description_argument('description'),
]

delete_args = [
delete_pk_argument('pk', dtype)
]

detail_args = [detail_pk_argument('pk', dtype)]

def get_create_data(self, nas):
data = super(DispatchNetwork, self).get_create_data(nas)
return set_ip_type('network_str', data)

def get_update_data(self, nas):
data = super(DispatchNetwork, self).get_update_data(nas)
return set_ip_type('network_str', data)


registrar.register(DispatchNetwork())

Expand All @@ -45,6 +171,23 @@ class DispatchSite(CoreDispatch):
dtype = 'SITE'
dgroup = 'core'

update_args = [
name_argument('full_name'),
comment_argument('comment'),
description_argument('description'),
update_pk_argument('pk', dtype)
]

create_args = [
name_argument('full_name', required=True),
comment_argument('comment'),
description_argument('description')
]

delete_args = [
delete_pk_argument('pk', dtype)
]

detail_args = [detail_pk_argument('pk', dtype)]


Expand All @@ -56,7 +199,35 @@ class DispatchVlan(CoreDispatch):
dtype = 'VLAN'
dgroup = 'core'

update_args = [
name_argument('name'),
number_argument('number'),
comment_argument('comment'),
description_argument('description'),
update_pk_argument('pk', dtype)
]

create_args = [
name_argument('name', required=True),
number_argument('number', required=True),
comment_argument('comment'),
description_argument('description')
]

delete_args = [
delete_pk_argument('pk', dtype)
]

detail_args = [detail_pk_argument('pk', dtype)]


registrar.register(DispatchVlan())


def set_ip_type(nas_name, data):
if nas_name in data:
if data[nas_name].find(':') < 0:
data['ip_type'] = '4'
else:
data['ip_type'] = '6'
return data
1 change: 0 additions & 1 deletion invtool/kv/kv_sreg_dispatch.py
Expand Up @@ -3,7 +3,6 @@
from invtool.tests.utils import call_to_json, test_method_to_params, EXEC

from invtool.kv.kv_dispatch import DispatchKV
from invtool.lib.dns_options import detail_pk_argument
from invtool.lib.kv_options import (
key_argument, value_argument, update_pk_argument,
delete_pk_argument, detail_pk_argument, kvlist_pk_argument,
Expand Down
98 changes: 98 additions & 0 deletions invtool/lib/core_options.py
@@ -0,0 +1,98 @@
from invtool.lib.options import build_extractor, write_num_argument
from invtool.tests.test_data import TEST_NAME, TEST_PORT, TEST_NETWORK


def name_argument(field_name, required=False):
def add_name_arg(parser, required=required, **kwargs):
parser.add_argument(
'--{0}'.format(field_name), type=str,
dest=field_name, help="A name.",
required=required
)

def test_data():
return field_name, TEST_NAME() # Reuse this test data

return add_name_arg, build_extractor(field_name, field_name), test_data


def number_argument(field_name, required=False):
def add_number_argument(parser, required=required):
write_num_argument(
parser, field_name, field_name, "A number number.",
required=required
)

def test_data():
return field_name, TEST_PORT

return (
add_number_argument,
build_extractor(field_name, field_name),
test_data
)


def network_str_argument(field_name, required=False):
def add_network_argument(parser, required=required):
parser.add_argument(
'--{0}'.format(field_name), type=str, dest='network_str',
required=required, help="Complete CIDR Notation"
)

def test_data():
return 'network-str', TEST_NETWORK

def build_extractor(field_name, nas_name):
def extractor(nas):
if not getattr(nas, nas_name):
return {}
data = {field_name: getattr(nas, nas_name)}
return data
return extractor

return (
add_network_argument,
build_extractor('network_str', 'network_str'),
test_data
)


def vlan_argument(field_name):
def add_argument(parser, required=False):
parser.add_argument(
'--vlan-pk', type=str, dest='vlan',
required=required, help="The pk of a vlan"
)

def extract_arg(nas):
data = {}
if nas.vlan:
data = {field_name: nas.vlan}
return data

def test_data():
# vlan will need to be evaluated in at test time
return 'vlan-pk', '{{ vlan }}'

return add_argument, extract_arg, test_data


def site_argument(field_name):
def add_argument(parser, required=False):
parser.add_argument(
'--site-pk', type=str, dest='site',
required=required, help="The pk of a site"
)

def extract_arg(nas):
data = {}
if nas.site:
data = {field_name: nas.site}
return data

def test_data():
# site need to be evaluated in at test time
return 'site-pk', '{{ site }}'

return add_argument, extract_arg, test_data
10 changes: 1 addition & 9 deletions invtool/lib/dns_options.py
@@ -1,4 +1,4 @@
from invtool.lib.options import build_extractor
from invtool.lib.options import build_extractor, write_num_argument
from invtool.tests.test_data import (
TEST_DOMAIN, TEST_FQDN, TEST_IPv4, TEST_IPv6, TEST_DESCRIPTION,
TEST_TTL, TEST_PORT, TEST_WEIGHT, TEST_PRIORITY, TEST_TEXT, TEST_INAME
Expand Down Expand Up @@ -196,14 +196,6 @@ def test_data():
return add_text_argument, build_extractor(field_name, 'text'), test_data


def write_num_argument(parser, name, dest, help_text, required=False):
parser.add_argument(
'--{0}'.format(name), default=None, type=int,
dest=dest, help=help_text, required=required
)
return parser


def ttl_argument(field_name):
def add_ttl_argument(parser, **kwargs):
write_num_argument(
Expand Down
8 changes: 8 additions & 0 deletions invtool/lib/options.py
Expand Up @@ -9,3 +9,11 @@ def extractor(nas):

def extract_pk(nas, field_name):
return {field_name: getattr(nas, field_name)}


def write_num_argument(parser, name, dest, help_text, required=False):
parser.add_argument(
'--{0}'.format(name), default=None, type=int,
dest=dest, help=help_text, required=required
)
return parser
9 changes: 4 additions & 5 deletions invtool/lib/parser.py
@@ -1,31 +1,30 @@
def build_create_parser(dispatch, action_parser, help=''):
if not help:
help="Create a(n) {0} record".format(dispatch.dtype)
help = "Create a(n) {0} record".format(dispatch.dtype)
create_parser = action_parser.add_parser('create', help=help)
for add_arg, extract_arg, test_method in dispatch.create_args:
add_arg(create_parser)


def build_update_parser(dispatch, action_parser, help=''):
if not help:
help="Update a(n) {0}".format(dispatch.dtype)
help = "Update a(n) {0}".format(dispatch.dtype)
update_parser = action_parser.add_parser('update', help=help)
for add_arg, extract_arg, test_method in dispatch.update_args:
add_arg(update_parser, required=False)


def build_delete_parser(dispatch, action_parser, help=''):
if not help:
help="Delete a(n) {0}".format(dispatch.dtype)
help = "Delete a(n) {0}".format(dispatch.dtype)
delete_parser = action_parser.add_parser('delete', help=help)
for add_arg, extract_arg, test_method in dispatch.delete_args:
add_arg(delete_parser)


def build_detail_parser(dispatch, action_parser, help=''):
if not help:
help="Detail a(n) {0}".format(dispatch.dtype)
help = "Detail a(n) {0}".format(dispatch.dtype)
detail_parser = action_parser.add_parser('detail', help=help)
for add_arg, extract_arg, test_method in dispatch.detail_args:
add_arg(detail_parser)

0 comments on commit 9ffacf2

Please sign in to comment.