Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Add a new cloud-config plugin for setting the timezone
cloud-config supports a new plugin, called 'set-timezone', which
can be used to change the timezone on the underlying instance.
The patch adds a new method in the osutils abstraction, called `set_timezone`,
which should be implemented  by each separated OS. The abstraction calls
into cloudbaseinit.utils.windows.timezone, another layer of abstraction
over two API methods, SetTimeZoneInformation for Windows 2003 and older
and SetDynamicTimeZoneInformation, for newer versions of Windows,
which also handles Daylight Saving Time.
The plugin supports standard IANA timezone names, which are then translated
to the Windows-specific timezone names, using tzlocal library.

Change-Id: I18674e1ae078fc69f3fb938065ba01a4de5464a1
  • Loading branch information
PCManticore committed Mar 25, 2015
1 parent 0073c7b commit b10917f
Show file tree
Hide file tree
Showing 11 changed files with 717 additions and 42 deletions.
4 changes: 4 additions & 0 deletions cloudbaseinit/osutils/base.py
Expand Up @@ -110,3 +110,7 @@ def firewall_remove_rule(self, name, port, protocol, allow=True):
def get_maximum_password_length(self):
"""Obtain the maximum password length tailored for each OS."""
raise NotImplementedError()

def set_timezone(self, timezone):
"""Set the timezone for this instance."""
raise NotImplementedError()
34 changes: 17 additions & 17 deletions cloudbaseinit/osutils/windows.py
Expand Up @@ -23,6 +23,7 @@
import pywintypes
import six
from six.moves import winreg
from tzlocal import windows_tz
from win32com import client
import win32process
import win32security
Expand All @@ -33,6 +34,8 @@
from cloudbaseinit.osutils import base
from cloudbaseinit.utils import encoding
from cloudbaseinit.utils.windows import network
from cloudbaseinit.utils.windows import privilege
from cloudbaseinit.utils.windows import timezone


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -296,24 +299,14 @@ class WindowsUtils(base.BaseOSUtils):
_FW_SCOPE_ALL = 0
_FW_SCOPE_LOCAL_SUBNET = 1

def _enable_shutdown_privilege(self):
process = win32process.GetCurrentProcess()
token = win32security.OpenProcessToken(
process,
win32security.TOKEN_ADJUST_PRIVILEGES |
win32security.TOKEN_QUERY)
priv_luid = win32security.LookupPrivilegeValue(
None, win32security.SE_SHUTDOWN_NAME)
privilege = [(priv_luid, win32security.SE_PRIVILEGE_ENABLED)]
win32security.AdjustTokenPrivileges(token, False, privilege)

def reboot(self):
self._enable_shutdown_privilege()

ret_val = advapi32.InitiateSystemShutdownW(0, "Cloudbase-Init reboot",
0, True, True)
if not ret_val:
raise exception.WindowsCloudbaseInitException("Reboot failed: %r")
with privilege.acquire_privilege(win32security.SE_SHUTDOWN_NAME):
ret_val = advapi32.InitiateSystemShutdownW(
0, "Cloudbase-Init reboot",
0, True, True)
if not ret_val:
raise exception.WindowsCloudbaseInitException(
"Reboot failed: %r")

def _get_user_wmi_object(self, username):
conn = wmi.WMI(moniker='//./root/cimv2')
Expand Down Expand Up @@ -1047,3 +1040,10 @@ def execute_system32_process(self, args, shell=True, decode_output=False,

def get_maximum_password_length(self):
return 20

def set_timezone(self, timezone_name):
windows_name = windows_tz.tz_win.get(timezone_name)
if not windows_name:
raise exception.CloudbaseInitException(
"The given timezone name is unrecognised: %r" % timezone_name)
timezone.Timezone(windows_name).set(self)
Expand Up @@ -20,6 +20,8 @@
PLUGINS = {
'write_files': 'cloudbaseinit.plugins.common.userdataplugins.'
'cloudconfigplugins.write_files.WriteFilesPlugin',
'set_timezone': 'cloudbaseinit.plugins.common.userdataplugins.'
'cloudconfigplugins.set_timezone.SetTimezonePlugin',
}


Expand Down
@@ -0,0 +1,42 @@
# Copyright 2015 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from cloudbaseinit.openstack.common import log as logging
from cloudbaseinit.osutils import factory
from cloudbaseinit.plugins.common.userdataplugins.cloudconfigplugins import (
base
)


LOG = logging.getLogger(__name__)


class SetTimezonePlugin(base.BaseCloudConfigPlugin):
"""Change the timezone for the underlying platform.
This uses IANA timezone names (which are mapped to the Windows
time zone names, as seen in the following link:
https://technet.microsoft.com/en-us/library/cc749073%28v=ws.10%29.aspx).
For instance, to change the timezone to 'America/Montevideo', use
this syntax::
set_timezone: America/Montevideo
"""

def process(self, data):
LOG.info("Changing timezone to %r", data)
osutils = factory.get_os_utils()
osutils.set_timezone(data)
62 changes: 37 additions & 25 deletions cloudbaseinit/tests/osutils/test_windows.py
Expand Up @@ -51,6 +51,7 @@ def setUp(self):
self._moves_mock = mock.MagicMock()
self._xmlrpc_client_mock = mock.MagicMock()
self._ctypes_mock = mock.MagicMock()
self._tzlocal_mock = mock.Mock()

self._module_patcher = mock.patch.dict(
'sys.modules',
Expand All @@ -61,7 +62,8 @@ def setUp(self):
'six.moves': self._moves_mock,
'six.moves.xmlrpc_client': self._xmlrpc_client_mock,
'ctypes': self._ctypes_mock,
'pywintypes': self._pywintypes_mock})
'pywintypes': self._pywintypes_mock,
'tzlocal': self._tzlocal_mock})

self._module_patcher.start()
self.windows_utils = importlib.import_module(
Expand All @@ -78,31 +80,10 @@ def setUp(self):
def tearDown(self):
self._module_patcher.stop()

def test_enable_shutdown_privilege(self):
fake_process = mock.MagicMock()
fake_token = True
LUID = 'fakeid'
self._win32process_mock.GetCurrentProcess.return_value = fake_process
self._win32security_mock.OpenProcessToken.return_value = fake_token
self._win32security_mock.LookupPrivilegeValue.return_value = LUID

self._winutils._enable_shutdown_privilege()

privilege = [(LUID,
self._win32security_mock.SE_PRIVILEGE_ENABLED)]
self._win32security_mock.AdjustTokenPrivileges.assert_called_with(
fake_token,
False,
privilege)

self._win32security_mock.OpenProcessToken.assert_called_with(
fake_process, self._win32security_mock.TOKEN_ADJUST_PRIVILEGES |
self._win32security_mock.TOKEN_QUERY)

@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
'._enable_shutdown_privilege')
def _test_reboot(self, mock_enable_shutdown_privilege, ret_value,
@mock.patch('cloudbaseinit.osutils.windows.privilege')
def _test_reboot(self, mock_privilege_module, ret_value,
expected_ret_value=None):
mock_privilege_module.acquire_privilege = mock.MagicMock()
advapi32 = self._windll_mock.advapi32
advapi32.InitiateSystemShutdownW = mock.MagicMock(
return_value=ret_value)
Expand All @@ -118,6 +99,8 @@ def _test_reboot(self, mock_enable_shutdown_privilege, ret_value,
0,
"Cloudbase-Init reboot",
0, True, True)
mock_privilege_module.acquire_privilege.assert_called_once_with(
self._win32security_mock.SE_SHUTDOWN_NAME)

def test_reboot(self):
self._test_reboot(ret_value=True)
Expand Down Expand Up @@ -1477,3 +1460,32 @@ def test_execute_system32_process(self, mock_execute_process,

def test_get_password_maximum_length(self):
self.assertEqual(20, self._winutils.get_maximum_password_length())

@mock.patch('cloudbaseinit.osutils.windows.windows_tz')
def test_set_timezone_fails(self, mock_windows_tz):
mock_windows_tz.tz_win.get.return_value = None

with self.assertRaises(exception.CloudbaseInitException) as cm:
self._winutils.set_timezone(mock.sentinel.timezone)
expected = (
"The given timezone name is unrecognised: %r"
% mock.sentinel.timezone
)
self.assertEqual(expected, str(cm.exception))
mock_windows_tz.tz_win.get.assert_called_once_with(
mock.sentinel.timezone)

@mock.patch('cloudbaseinit.osutils.windows.timezone')
@mock.patch('cloudbaseinit.osutils.windows.windows_tz')
def test_set_timezone(self, mock_windows_tz, mock_timezone):
mock_windows_tz.tz_win.get.return_value = (
mock.sentinel.windows_timezone)

self._winutils.set_timezone(mock.sentinel.timezone)

mock_windows_tz.tz_win.get.assert_called_once_with(
mock.sentinel.timezone)
mock_timezone.Timezone.assert_called_once_with(
mock.sentinel.windows_timezone)
mock_timezone.Timezone.return_value.set.assert_called_once_with(
self._winutils)
@@ -0,0 +1,54 @@
# Copyright 2015 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import unittest

try:
import unittest.mock as mock
except ImportError:
import mock

from cloudbaseinit.plugins.common.userdataplugins import cloudconfig
from cloudbaseinit.plugins.common.userdataplugins.cloudconfigplugins import (
set_timezone
)
from cloudbaseinit.tests import testutils


class TestSetTimezone(unittest.TestCase):

@mock.patch('cloudbaseinit.plugins.common.userdataplugins.'
'cloudconfigplugins.set_timezone.factory')
def test_process(self, mock_osutils_factory):
with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
'userdataplugins.cloudconfigplugins.'
'set_timezone') as snatcher:
set_timezone.SetTimezonePlugin().process(mock.sentinel.timezone)

expected_logging = [
'Changing timezone to %r' % mock.sentinel.timezone
]
mock_osutils_factory.get_os_utils.assert_called_once_with()
mock_osutils = mock_osutils_factory.get_os_utils.return_value
mock_osutils.set_timezone.assert_called_once_with(
mock.sentinel.timezone)
self.assertEqual(expected_logging, snatcher.output)

@mock.patch('cloudbaseinit.plugins.common.userdataplugins.'
'cloudconfigplugins.set_timezone.SetTimezonePlugin.process')
def test_timezone_dispatch(self, mock_process_plugin):
plugin = cloudconfig.CloudConfigPlugin()
plugin.process_non_multipart("set_timezone: America Standard Time")

mock_process_plugin.assert_called_once_with("America Standard Time")
63 changes: 63 additions & 0 deletions cloudbaseinit/tests/utils/windows/test_privilege.py
@@ -0,0 +1,63 @@
# Copyright 2015 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import importlib
import unittest

try:
import unittest.mock as mock
except ImportError:
import mock


class TestPrivilege(unittest.TestCase):

def setUp(self):
self._win32process_mock = mock.MagicMock()
self._win32security_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict(
'sys.modules',
{'win32process': self._win32process_mock,
'win32security': self._win32security_mock})

self._module_patcher.start()
self.privilege_module = importlib.import_module(
"cloudbaseinit.utils.windows.privilege")

def tearDown(self):
self._module_patcher.stop()

def test_privilege_context_manager(self):
fake_process = mock.MagicMock()
fake_token = True
LUID = 'fakeid'
self._win32process_mock.GetCurrentProcess.return_value = fake_process
self._win32security_mock.OpenProcessToken.return_value = fake_token
self._win32security_mock.LookupPrivilegeValue.return_value = LUID
privilege_enabled = [(LUID,
self._win32security_mock.SE_PRIVILEGE_ENABLED)]
privilege_removed = [(LUID,
self._win32security_mock.SE_PRIVILEGE_REMOVED)]
with self.privilege_module.acquire_privilege(mock.sentinel.privilege):

self._win32security_mock.AdjustTokenPrivileges.assert_called_with(
fake_token, False, privilege_enabled)

self._win32security_mock.OpenProcessToken.assert_called_with(
fake_process,
self._win32security_mock.TOKEN_ADJUST_PRIVILEGES |
self._win32security_mock.TOKEN_QUERY)

self._win32security_mock.AdjustTokenPrivileges.assert_called_with(
fake_token, False, privilege_removed)

0 comments on commit b10917f

Please sign in to comment.