Skip to content
This repository has been archived by the owner on Mar 6, 2024. It is now read-only.

Commit

Permalink
[VDP-1665, VCDA-642] Implemented enable, disable, detach VC (#260)
Browse files Browse the repository at this point in the history
- Implemented enable, disable, detach VC
  • Loading branch information
ckolovson committed Jul 5, 2018
1 parent ea2a2d5 commit 7bf4d2b
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 64 deletions.
1 change: 1 addition & 0 deletions pyvcloud/vcd/client.py
Expand Up @@ -160,6 +160,7 @@ class RelationType(Enum):
TASK_CANCEL = 'task:cancel'
UNDEPLOY = 'undeploy'
UNLINK_FROM_TEMPLATE = 'unlinkFromTemplate'
UNREGISTER = 'unregister'
UP = 'up'


Expand Down
43 changes: 43 additions & 0 deletions pyvcloud/vcd/platform.py
Expand Up @@ -23,6 +23,7 @@
from pyvcloud.vcd.client import RelationType
from pyvcloud.vcd.client import ResourceType
from pyvcloud.vcd.exceptions import EntityNotFoundException
from pyvcloud.vcd.exceptions import InvalidStateException
from pyvcloud.vcd.extension import Extension


Expand Down Expand Up @@ -330,6 +331,48 @@ def attach_vcenter(self,
REGISTER_VC_SERVER_PARAMS.value,
contents=register_vc_server_params)

def enable_disable_vcenter(self, vc_name, enable_flag):
"""Enable or disable a Virtual Center (VC) server.
:param str vc_name: name of VC server.
:param boolean enable_flag: True means enable, False means disable.
:return: an object containing EntityType.TASK XML data which represents
the asynchronous task that is enabling or disabling the VC.
rtype: lxml.objectify.ObjectifiedElement'
"""
vc = self.get_vcenter(vc_name)
if enable_flag:
vc.IsEnabled = E_VMEXT.IsEnabled('true')
else:
vc.IsEnabled = E_VMEXT.IsEnabled('false')
return self.client.\
put_linked_resource(resource=vc,
rel=RelationType.EDIT,
media_type=EntityType.VIRTUAL_CENTER.value,
contents=vc)

def detach_vcenter(self, vc_name):
"""Detach (unregister) a Virtual Center (VC) server.
:param str vc_name: name of VC server.
:return: an object containing XML data of the VC server
specifically, EntityType.VIRTUAL_CENTER
:rtype: lxml.objectify.ObjectifiedElement
"""
vc = self.get_vcenter(vc_name)
if vc.IsEnabled:
raise InvalidStateException('VC must be disabled before detach.')

return self.client.\
post_linked_resource(resource=vc,
rel=RelationType.UNREGISTER,
media_type=None,
contents=vc)

def register_nsxt_manager(self,
nsxt_manager_name,
nsxt_manager_url,
Expand Down
153 changes: 153 additions & 0 deletions system_tests/vc_tests.py
@@ -0,0 +1,153 @@
# VMware vCloud Director Python SDK
# Copyright (c) 2018 VMware, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

from pyvcloud.system_test_framework.base_test import BaseTestCase
from pyvcloud.system_test_framework.environment import Environment

from pyvcloud.vcd.client import NSMAP
from pyvcloud.vcd.exceptions import EntityNotFoundException
from pyvcloud.vcd.exceptions import InvalidStateException
from pyvcloud.vcd.platform import Platform


class TestVC(BaseTestCase):

def test_0000_setup(self):
# TODO(): need more pipeline work before this test can actually be run
TestVC._client = Environment.get_sys_admin_client()
TestVC._config = Environment.get_config()
TestVC._vcenter_host_name = self._config['vc']['vcenter_host_name']
TestVC._vcenter_invalid = 'xyz'
TestVC._vcServerName = self._config['vc']['vcServerName']
TestVC._vcServerHost = self._config['vc']['vcServerHost']
TestVC._vcAdminUser = self._config['vc']['vcAdminUser']
TestVC._vcAdminPwd = self._config['vc']['vcAdminPwd']
TestVC._NSXServerName = self._config['vc']['NSXServerName']
TestVC._NSXHost = self._config['vc']['NSXHost']
TestVC._NSXAdminUser = self._config['vc']['NSXAdminUser']
TestVC._NSXAdminPwd = self._config['vc']['NSXAdminPwd']
TestVC._isEnabled = 'false'

def test_0010_list_vc(self):
"""Platform.list_vcenters prints a list of virtual center servers."""
logger = Environment.get_default_logger()
platform = Platform(TestVC._client)
vcenters = platform.list_vcenters()
for vcenter in vcenters:
logger.debug('vCenter found: %s' % vcenter.get('name'))
self.assertTrue(len(vcenters) > 0)

def test_0020_get_vc(self):
"""Platform.get_vcenter finds a known vcenter."""
logger = Environment.get_default_logger()
platform = Platform(TestVC._client)
vcenter = platform.get_vcenter(TestVC._vcenter_host_name)
logger.debug('vCenter: name=%s, url=%s' %
(vcenter.get('name'), vcenter.Url.text))
self.assertIsNotNone(vcenter)

def test_0030_get_vc_negative(self):
"""Platform.get_vcenter does not find a non-existent vcenter."""
try:
platform = Platform(TestVC._client)
platform.get_vcenter(TestVC._vcenter_invalid)
self.fail('Should not be able to find VC that does not exist ' +
TestVC._vcenter_invalid)
except EntityNotFoundException as e:
return

def test_0040_attach_vc(self):
"""Platform.attach_vcenter attaches a vcenter."""
platform = Platform(TestVC._client)

vc = platform.attach_vcenter(
vc_server_name=TestVC._vcServerName,
vc_server_host=TestVC._vcServerHost,
vc_admin_user=TestVC._vcAdminUser,
vc_admin_pwd=TestVC._vcAdminPwd,
nsx_server_name=TestVC._NSXServerName,
nsx_host=TestVC._NSXHost,
nsx_admin_user=TestVC._NSXAdminUser,
nsx_admin_pwd=TestVC._NSXAdminPwd,
is_enabled=TestVC._isEnabled)
task = vc.VimServer['{' + NSMAP['vcloud'] + '}Tasks'].Task[0]
TestVC._client.get_task_monitor().wait_for_success(task=task)
self.assertEqual(TestVC._vcServerName, vc.VimServer.get('name'))

def test_0050_enable_vc(self):
"""Platform.enable_vcenter enables a vcenter.
Wait for async command to complete before checking result.
"""
platform = Platform(TestVC._client)

task = platform.enable_disable_vcenter(
vc_name=TestVC._vcServerName, enable_flag=True)
TestVC._client.get_task_monitor().wait_for_success(task=task)
vc = platform.get_vcenter(name=TestVC._vcServerName)
self.assertTrue(vc.IsEnabled)

def test_0060_detach_vc_while_still_enabled(self):
"""Platform.detach_vcenter while VC is enabled should fail.
Wait for async command to complete before checking result.
"""
platform = Platform(TestVC._client)
try:
task = platform.detach_vcenter(vc_name=TestVC._vcServerName)
TestVC._client.get_task_monitor().wait_for_success(task=task)
self.fail('Should not be able to detach VC that is enabled ' +
TestVC._vcServerName)
except InvalidStateException as e:
return

def test_0070_disable_vc(self):
"""Platform.disable_vcenter disables a vcenter.
Wait for async command to complete before checking result.
"""
platform = Platform(TestVC._client)

task = platform.enable_disable_vcenter(
vc_name=TestVC._vcServerName, enable_flag=False)
TestVC._client.get_task_monitor().wait_for_success(task=task)
vc = platform.get_vcenter(name=TestVC._vcServerName)
self.assertFalse(vc.IsEnabled)

def test_0080_detach_vc(self):
"""Platform.detach_vcenter unregisters (detaches) a vcenter.
Wait for async command to complete before checking result.
"""
platform = Platform(TestVC._client)

task = platform.detach_vcenter(vc_name=TestVC._vcServerName)
TestVC._client.get_task_monitor().wait_for_success(task=task)
try:
platform.get_vcenter(name=TestVC._vcServerName)
self.fail('Should not be able to find detached VC ' +
TestVC._vcServerName)
except EntityNotFoundException as e:
return

def test_9999_cleanup(self):
"""Release all resources held by this object for testing purposes."""
TestVC._client.logout()


if __name__ == '__main__':
unittest.main()
64 changes: 0 additions & 64 deletions tests/vcd_vc.py

This file was deleted.

0 comments on commit 7bf4d2b

Please sign in to comment.