From b10917f9b029fb0d649e898b43dd0e4f2ea17a6a Mon Sep 17 00:00:00 2001 From: Claudiu Popa Date: Thu, 19 Mar 2015 17:16:58 +0200 Subject: [PATCH] Add a new cloud-config plugin for setting the timezone cloud-config supports a new plugin, called 'set-timezone', which can be used to change the timezone on the underlying instance. The patch adds a new method in the osutils abstraction, called `set_timezone`, which should be implemented by each separated OS. The abstraction calls into cloudbaseinit.utils.windows.timezone, another layer of abstraction over two API methods, SetTimeZoneInformation for Windows 2003 and older and SetDynamicTimeZoneInformation, for newer versions of Windows, which also handles Daylight Saving Time. The plugin supports standard IANA timezone names, which are then translated to the Windows-specific timezone names, using tzlocal library. Change-Id: I18674e1ae078fc69f3fb938065ba01a4de5464a1 --- cloudbaseinit/osutils/base.py | 4 + cloudbaseinit/osutils/windows.py | 34 +-- .../cloudconfigplugins/factory.py | 2 + .../cloudconfigplugins/set_timezone.py | 42 +++ cloudbaseinit/tests/osutils/test_windows.py | 62 ++-- .../cloudconfigplugins/test_set_timezone.py | 54 ++++ .../tests/utils/windows/test_privilege.py | 63 ++++ .../tests/utils/windows/test_timezone.py | 271 ++++++++++++++++++ cloudbaseinit/utils/windows/privilege.py | 36 +++ cloudbaseinit/utils/windows/timezone.py | 190 ++++++++++++ requirements-windows.txt | 1 + 11 files changed, 717 insertions(+), 42 deletions(-) create mode 100644 cloudbaseinit/plugins/common/userdataplugins/cloudconfigplugins/set_timezone.py create mode 100644 cloudbaseinit/tests/plugins/common/userdataplugins/cloudconfigplugins/test_set_timezone.py create mode 100644 cloudbaseinit/tests/utils/windows/test_privilege.py create mode 100644 cloudbaseinit/tests/utils/windows/test_timezone.py create mode 100644 cloudbaseinit/utils/windows/privilege.py create mode 100644 cloudbaseinit/utils/windows/timezone.py diff --git a/cloudbaseinit/osutils/base.py b/cloudbaseinit/osutils/base.py index 2b206ee..3eb711d 100644 --- a/cloudbaseinit/osutils/base.py +++ b/cloudbaseinit/osutils/base.py @@ -110,3 +110,7 @@ def firewall_remove_rule(self, name, port, protocol, allow=True): def get_maximum_password_length(self): """Obtain the maximum password length tailored for each OS.""" raise NotImplementedError() + + def set_timezone(self, timezone): + """Set the timezone for this instance.""" + raise NotImplementedError() diff --git a/cloudbaseinit/osutils/windows.py b/cloudbaseinit/osutils/windows.py index 8d7fce1..12b0ac9 100644 --- a/cloudbaseinit/osutils/windows.py +++ b/cloudbaseinit/osutils/windows.py @@ -23,6 +23,7 @@ import pywintypes import six from six.moves import winreg +from tzlocal import windows_tz from win32com import client import win32process import win32security @@ -33,6 +34,8 @@ from cloudbaseinit.osutils import base from cloudbaseinit.utils import encoding from cloudbaseinit.utils.windows import network +from cloudbaseinit.utils.windows import privilege +from cloudbaseinit.utils.windows import timezone LOG = logging.getLogger(__name__) @@ -296,24 +299,14 @@ class WindowsUtils(base.BaseOSUtils): _FW_SCOPE_ALL = 0 _FW_SCOPE_LOCAL_SUBNET = 1 - def _enable_shutdown_privilege(self): - process = win32process.GetCurrentProcess() - token = win32security.OpenProcessToken( - process, - win32security.TOKEN_ADJUST_PRIVILEGES | - win32security.TOKEN_QUERY) - priv_luid = win32security.LookupPrivilegeValue( - None, win32security.SE_SHUTDOWN_NAME) - privilege = [(priv_luid, win32security.SE_PRIVILEGE_ENABLED)] - win32security.AdjustTokenPrivileges(token, False, privilege) - def reboot(self): - self._enable_shutdown_privilege() - - ret_val = advapi32.InitiateSystemShutdownW(0, "Cloudbase-Init reboot", - 0, True, True) - if not ret_val: - raise exception.WindowsCloudbaseInitException("Reboot failed: %r") + with privilege.acquire_privilege(win32security.SE_SHUTDOWN_NAME): + ret_val = advapi32.InitiateSystemShutdownW( + 0, "Cloudbase-Init reboot", + 0, True, True) + if not ret_val: + raise exception.WindowsCloudbaseInitException( + "Reboot failed: %r") def _get_user_wmi_object(self, username): conn = wmi.WMI(moniker='//./root/cimv2') @@ -1047,3 +1040,10 @@ def execute_system32_process(self, args, shell=True, decode_output=False, def get_maximum_password_length(self): return 20 + + def set_timezone(self, timezone_name): + windows_name = windows_tz.tz_win.get(timezone_name) + if not windows_name: + raise exception.CloudbaseInitException( + "The given timezone name is unrecognised: %r" % timezone_name) + timezone.Timezone(windows_name).set(self) diff --git a/cloudbaseinit/plugins/common/userdataplugins/cloudconfigplugins/factory.py b/cloudbaseinit/plugins/common/userdataplugins/cloudconfigplugins/factory.py index 79c82b5..b5b3a7c 100644 --- a/cloudbaseinit/plugins/common/userdataplugins/cloudconfigplugins/factory.py +++ b/cloudbaseinit/plugins/common/userdataplugins/cloudconfigplugins/factory.py @@ -20,6 +20,8 @@ PLUGINS = { 'write_files': 'cloudbaseinit.plugins.common.userdataplugins.' 'cloudconfigplugins.write_files.WriteFilesPlugin', + 'set_timezone': 'cloudbaseinit.plugins.common.userdataplugins.' + 'cloudconfigplugins.set_timezone.SetTimezonePlugin', } diff --git a/cloudbaseinit/plugins/common/userdataplugins/cloudconfigplugins/set_timezone.py b/cloudbaseinit/plugins/common/userdataplugins/cloudconfigplugins/set_timezone.py new file mode 100644 index 0000000..4d7689b --- /dev/null +++ b/cloudbaseinit/plugins/common/userdataplugins/cloudconfigplugins/set_timezone.py @@ -0,0 +1,42 @@ +# Copyright 2015 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from cloudbaseinit.openstack.common import log as logging +from cloudbaseinit.osutils import factory +from cloudbaseinit.plugins.common.userdataplugins.cloudconfigplugins import ( + base +) + + +LOG = logging.getLogger(__name__) + + +class SetTimezonePlugin(base.BaseCloudConfigPlugin): + """Change the timezone for the underlying platform. + + This uses IANA timezone names (which are mapped to the Windows + time zone names, as seen in the following link: + https://technet.microsoft.com/en-us/library/cc749073%28v=ws.10%29.aspx). + + For instance, to change the timezone to 'America/Montevideo', use + this syntax:: + + set_timezone: America/Montevideo + + """ + + def process(self, data): + LOG.info("Changing timezone to %r", data) + osutils = factory.get_os_utils() + osutils.set_timezone(data) diff --git a/cloudbaseinit/tests/osutils/test_windows.py b/cloudbaseinit/tests/osutils/test_windows.py index add21ad..34581e6 100644 --- a/cloudbaseinit/tests/osutils/test_windows.py +++ b/cloudbaseinit/tests/osutils/test_windows.py @@ -51,6 +51,7 @@ def setUp(self): self._moves_mock = mock.MagicMock() self._xmlrpc_client_mock = mock.MagicMock() self._ctypes_mock = mock.MagicMock() + self._tzlocal_mock = mock.Mock() self._module_patcher = mock.patch.dict( 'sys.modules', @@ -61,7 +62,8 @@ def setUp(self): 'six.moves': self._moves_mock, 'six.moves.xmlrpc_client': self._xmlrpc_client_mock, 'ctypes': self._ctypes_mock, - 'pywintypes': self._pywintypes_mock}) + 'pywintypes': self._pywintypes_mock, + 'tzlocal': self._tzlocal_mock}) self._module_patcher.start() self.windows_utils = importlib.import_module( @@ -78,31 +80,10 @@ def setUp(self): def tearDown(self): self._module_patcher.stop() - def test_enable_shutdown_privilege(self): - fake_process = mock.MagicMock() - fake_token = True - LUID = 'fakeid' - self._win32process_mock.GetCurrentProcess.return_value = fake_process - self._win32security_mock.OpenProcessToken.return_value = fake_token - self._win32security_mock.LookupPrivilegeValue.return_value = LUID - - self._winutils._enable_shutdown_privilege() - - privilege = [(LUID, - self._win32security_mock.SE_PRIVILEGE_ENABLED)] - self._win32security_mock.AdjustTokenPrivileges.assert_called_with( - fake_token, - False, - privilege) - - self._win32security_mock.OpenProcessToken.assert_called_with( - fake_process, self._win32security_mock.TOKEN_ADJUST_PRIVILEGES | - self._win32security_mock.TOKEN_QUERY) - - @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' - '._enable_shutdown_privilege') - def _test_reboot(self, mock_enable_shutdown_privilege, ret_value, + @mock.patch('cloudbaseinit.osutils.windows.privilege') + def _test_reboot(self, mock_privilege_module, ret_value, expected_ret_value=None): + mock_privilege_module.acquire_privilege = mock.MagicMock() advapi32 = self._windll_mock.advapi32 advapi32.InitiateSystemShutdownW = mock.MagicMock( return_value=ret_value) @@ -118,6 +99,8 @@ def _test_reboot(self, mock_enable_shutdown_privilege, ret_value, 0, "Cloudbase-Init reboot", 0, True, True) + mock_privilege_module.acquire_privilege.assert_called_once_with( + self._win32security_mock.SE_SHUTDOWN_NAME) def test_reboot(self): self._test_reboot(ret_value=True) @@ -1477,3 +1460,32 @@ def test_execute_system32_process(self, mock_execute_process, def test_get_password_maximum_length(self): self.assertEqual(20, self._winutils.get_maximum_password_length()) + + @mock.patch('cloudbaseinit.osutils.windows.windows_tz') + def test_set_timezone_fails(self, mock_windows_tz): + mock_windows_tz.tz_win.get.return_value = None + + with self.assertRaises(exception.CloudbaseInitException) as cm: + self._winutils.set_timezone(mock.sentinel.timezone) + expected = ( + "The given timezone name is unrecognised: %r" + % mock.sentinel.timezone + ) + self.assertEqual(expected, str(cm.exception)) + mock_windows_tz.tz_win.get.assert_called_once_with( + mock.sentinel.timezone) + + @mock.patch('cloudbaseinit.osutils.windows.timezone') + @mock.patch('cloudbaseinit.osutils.windows.windows_tz') + def test_set_timezone(self, mock_windows_tz, mock_timezone): + mock_windows_tz.tz_win.get.return_value = ( + mock.sentinel.windows_timezone) + + self._winutils.set_timezone(mock.sentinel.timezone) + + mock_windows_tz.tz_win.get.assert_called_once_with( + mock.sentinel.timezone) + mock_timezone.Timezone.assert_called_once_with( + mock.sentinel.windows_timezone) + mock_timezone.Timezone.return_value.set.assert_called_once_with( + self._winutils) diff --git a/cloudbaseinit/tests/plugins/common/userdataplugins/cloudconfigplugins/test_set_timezone.py b/cloudbaseinit/tests/plugins/common/userdataplugins/cloudconfigplugins/test_set_timezone.py new file mode 100644 index 0000000..75dcbcf --- /dev/null +++ b/cloudbaseinit/tests/plugins/common/userdataplugins/cloudconfigplugins/test_set_timezone.py @@ -0,0 +1,54 @@ +# Copyright 2015 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import unittest + +try: + import unittest.mock as mock +except ImportError: + import mock + +from cloudbaseinit.plugins.common.userdataplugins import cloudconfig +from cloudbaseinit.plugins.common.userdataplugins.cloudconfigplugins import ( + set_timezone +) +from cloudbaseinit.tests import testutils + + +class TestSetTimezone(unittest.TestCase): + + @mock.patch('cloudbaseinit.plugins.common.userdataplugins.' + 'cloudconfigplugins.set_timezone.factory') + def test_process(self, mock_osutils_factory): + with testutils.LogSnatcher('cloudbaseinit.plugins.common.' + 'userdataplugins.cloudconfigplugins.' + 'set_timezone') as snatcher: + set_timezone.SetTimezonePlugin().process(mock.sentinel.timezone) + + expected_logging = [ + 'Changing timezone to %r' % mock.sentinel.timezone + ] + mock_osutils_factory.get_os_utils.assert_called_once_with() + mock_osutils = mock_osutils_factory.get_os_utils.return_value + mock_osutils.set_timezone.assert_called_once_with( + mock.sentinel.timezone) + self.assertEqual(expected_logging, snatcher.output) + + @mock.patch('cloudbaseinit.plugins.common.userdataplugins.' + 'cloudconfigplugins.set_timezone.SetTimezonePlugin.process') + def test_timezone_dispatch(self, mock_process_plugin): + plugin = cloudconfig.CloudConfigPlugin() + plugin.process_non_multipart("set_timezone: America Standard Time") + + mock_process_plugin.assert_called_once_with("America Standard Time") diff --git a/cloudbaseinit/tests/utils/windows/test_privilege.py b/cloudbaseinit/tests/utils/windows/test_privilege.py new file mode 100644 index 0000000..c34c594 --- /dev/null +++ b/cloudbaseinit/tests/utils/windows/test_privilege.py @@ -0,0 +1,63 @@ +# Copyright 2015 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import importlib +import unittest + +try: + import unittest.mock as mock +except ImportError: + import mock + + +class TestPrivilege(unittest.TestCase): + + def setUp(self): + self._win32process_mock = mock.MagicMock() + self._win32security_mock = mock.MagicMock() + self._module_patcher = mock.patch.dict( + 'sys.modules', + {'win32process': self._win32process_mock, + 'win32security': self._win32security_mock}) + + self._module_patcher.start() + self.privilege_module = importlib.import_module( + "cloudbaseinit.utils.windows.privilege") + + def tearDown(self): + self._module_patcher.stop() + + def test_privilege_context_manager(self): + fake_process = mock.MagicMock() + fake_token = True + LUID = 'fakeid' + self._win32process_mock.GetCurrentProcess.return_value = fake_process + self._win32security_mock.OpenProcessToken.return_value = fake_token + self._win32security_mock.LookupPrivilegeValue.return_value = LUID + privilege_enabled = [(LUID, + self._win32security_mock.SE_PRIVILEGE_ENABLED)] + privilege_removed = [(LUID, + self._win32security_mock.SE_PRIVILEGE_REMOVED)] + with self.privilege_module.acquire_privilege(mock.sentinel.privilege): + + self._win32security_mock.AdjustTokenPrivileges.assert_called_with( + fake_token, False, privilege_enabled) + + self._win32security_mock.OpenProcessToken.assert_called_with( + fake_process, + self._win32security_mock.TOKEN_ADJUST_PRIVILEGES | + self._win32security_mock.TOKEN_QUERY) + + self._win32security_mock.AdjustTokenPrivileges.assert_called_with( + fake_token, False, privilege_removed) diff --git a/cloudbaseinit/tests/utils/windows/test_timezone.py b/cloudbaseinit/tests/utils/windows/test_timezone.py new file mode 100644 index 0000000..8478c58 --- /dev/null +++ b/cloudbaseinit/tests/utils/windows/test_timezone.py @@ -0,0 +1,271 @@ +# Copyright 2015 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import importlib +import os +import struct +import unittest +try: + import unittest.mock as mock +except ImportError: + import mock + + +from cloudbaseinit import exception + + +class FakeWindowsError(Exception): + pass + + +class TestTimezone(unittest.TestCase): + + def setUp(self): + self._mock_moves = mock.MagicMock() + self._mock_winreg = mock.Mock() + self._mock_ctypes = mock.Mock() + self._mock_win32security = mock.Mock() + self._mock_win32process = mock.Mock() + self._mock_wintypes = mock.MagicMock() + self._mock_ctypes.wintypes = self._mock_wintypes + self._module_patcher = mock.patch.dict( + 'sys.modules', + {'ctypes': self._mock_ctypes, + 'six.moves': self._mock_moves, + 'win32process': self._mock_win32process, + 'win32security': self._mock_win32security}) + self._module_patcher.start() + self._mock_moves.winreg = self._mock_winreg + self._timezone_module = importlib.import_module( + 'cloudbaseinit.utils.windows.timezone') + self._timezone_module.WindowsError = FakeWindowsError + self._fixture_timezone_info = [ + 0, 'StandardName', list(range(8)), + 3, "DaylightName", list(reversed(range(8))), 6, + ] + + def tearDown(self): + self._module_patcher.stop() + + @mock.patch('cloudbaseinit.utils.windows.timezone.SYSTEMTIME') + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_get_timezone_info', new=mock.MagicMock()) + def test__create_system_time(self, mock_systemtime): + values = list(range(8)) + timezoneobj = self._timezone_module.Timezone(mock.sentinel.timezone) + result = timezoneobj._create_system_time(values) + mock_systemtime.assert_called_once_with() + self.assertEqual(tuple(range(8)), + (result.wYear, result.wMonth, result.wDayOfWeek, + result.wDay, result.wHour, result.wMinute, + result.wSecond, result.wMilliseconds)) + + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_create_system_time') + @mock.patch('cloudbaseinit.utils.windows.timezone.TIME_ZONE_INFORMATION') + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_get_timezone_info') + def test__get_timezone_struct(self, mock_get_timezone_info, + mock_time_zone_information, + mock_create_system_time): + mock_get_timezone_info.return_value = self._fixture_timezone_info + + timezoneobj = self._timezone_module.Timezone(mock.sentinel.timezone) + result = timezoneobj._get_timezone_struct() + + mock_time_zone_information.assert_called_once_with() + self.assertEqual(0, result.Bias) + self.assertEqual('StandardName', result.StandardName) + self.assertEqual(result.StandardDate, + mock_create_system_time.return_value) + self.assertEqual(result.DaylightDate, + mock_create_system_time.return_value) + self.assertEqual(3, result.StandardBias) + self.assertEqual("DaylightName", result.DaylightName) + self.assertEqual(6, result.DaylightBias) + + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_create_system_time') + @mock.patch('cloudbaseinit.utils.windows.timezone.' + 'DYNAMIC_TIME_ZONE_INFORMATION') + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_get_timezone_info') + def test__get_dynamic_timezone_struct(self, mock_get_timezone_info, + mock_dynamic_time_zone_information, + mock_create_system_time): + + mock_get_timezone_info.return_value = self._fixture_timezone_info + + timezoneobj = self._timezone_module.Timezone("timezone name") + result = timezoneobj._get_dynamic_timezone_struct() + + mock_dynamic_time_zone_information.assert_called_once_with() + self.assertEqual(0, result.Bias) + self.assertEqual('StandardName', result.StandardName) + self.assertEqual(3, result.StandardBias) + self.assertEqual("DaylightName", result.DaylightName) + self.assertEqual(6, result.DaylightBias) + self.assertFalse(result.DynamicDaylightTimeDisabled) + self.assertEqual("timezone name", result.TimeZoneKeyName) + self.assertEqual(result.StandardDate, + mock_create_system_time.return_value) + self.assertEqual(result.DaylightDate, + mock_create_system_time.return_value) + + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_unpack_timezone_info') + def test__get_timezone_info(self, mock_unpack_timezone_info): + mock_unpack_timezone_info.return_value = range(7) + registry_key = mock.MagicMock() + self._mock_winreg.OpenKey.return_value = registry_key + + self._timezone_module.Timezone("timezone test") + self._mock_winreg.OpenKey.assert_called_once_with( + self._mock_winreg.HKEY_LOCAL_MACHINE, + os.path.join(self._timezone_module.REG_TIME_ZONES, + "timezone test")) + mock_unpack_timezone_info.assert_called_once_with( + registry_key.__enter__.return_value) + + def test__get_time_zone_info_reraise_cloudbaseinit_exception(self): + error = FakeWindowsError() + error.errno = self._timezone_module.NOT_FOUND + self._mock_winreg.OpenKey.side_effect = error + + with self.assertRaises(exception.CloudbaseInitException) as cm: + self._timezone_module.Timezone("timezone test") + self.assertEqual("Timezone 'timezone test' not found", + str(cm.exception)) + + def test__get_time_zone_info_reraise_exception(self): + error = FakeWindowsError() + error.errno = 404 + self._mock_winreg.OpenKey.side_effect = error + + with self.assertRaises(FakeWindowsError) as cm: + self._timezone_module.Timezone("timezone test") + self.assertIsInstance(cm.exception, FakeWindowsError) + self.assertEqual(404, cm.exception.errno) + + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_query_tz_key') + def test__get_time_zone_info_real_data(self, mock_query_tz_key): + orig_unpack = struct.unpack + + def unpacker(format, blob): + if format == "l": + format = "i" + return orig_unpack(format, blob) + + mock_query_tz_key.return_value = ( + b'\xf0\x00\x00\x00\x00\x00\x00\x00\xc4\xff\xff\xff\x00\x00' + b'\x0b\x00\x00\x00\x01\x00\x02\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x03\x00\x00\x00\x02\x00\x02\x00\x00\x00\x00\x00' + b'\x00\x00', + "Atlantic Standard Time", + "Atlantic Daylight Time", + ) + registry_key = mock.MagicMock() + self._mock_winreg.OpenKey.return_value = registry_key + + with mock.patch('struct.unpack', side_effect=unpacker): + timezoneobj = self._timezone_module.Timezone("timezone test") + + mock_query_tz_key.assert_called_once_with(registry_key.__enter__()) + self.assertEqual(240, timezoneobj.bias) + self.assertEqual(-60, timezoneobj.daylight_bias) + self.assertEqual((0, 3, 0, 2, 2, 0, 0, 0), + timezoneobj.daylight_date) + self.assertEqual('Atlantic Daylight Time', timezoneobj.daylight_name) + self.assertEqual(0, timezoneobj.standard_bias) + self.assertEqual((0, 11, 0, 1, 2, 0, 0, 0), + timezoneobj.standard_date) + self.assertEqual('Atlantic Standard Time', timezoneobj.standard_name) + + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_get_timezone_info') + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_set_dynamic_time_zone_information') + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_set_time_zone_information') + def _test_set_time_zone_information( + self, mock__set_time_zone_information, + mock__set_dynamic_time_zone_information, + mock_get_timezone_info, windows_60=True): + mock_osutils = mock.Mock() + mock_osutils.check_os_version.return_value = windows_60 + mock_get_timezone_info.return_value = self._fixture_timezone_info + + timezoneobj = self._timezone_module.Timezone("fake") + timezoneobj.set(mock_osutils) + + if windows_60: + mock__set_dynamic_time_zone_information.assert_called_once_with() + else: + mock__set_time_zone_information.assert_called_once_with() + + def test_set_daylight_not_supported(self): + self._test_set_time_zone_information(windows_60=False) + + def test_set_daylight_supported(self): + self._test_set_time_zone_information(windows_60=True) + + @mock.patch('cloudbaseinit.utils.windows.privilege.acquire_privilege') + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_get_timezone_info') + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_get_timezone_struct') + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_get_dynamic_timezone_struct') + def _test__set_time_zone_information( + self, mock__get_dynamic_timezone_struct, + mock__get_timezone_struct, + mock_get_timezone_info, + mock_acquire_privilege, + windows_60=True, + privilege=None): + mock_get_timezone_info.return_value = self._fixture_timezone_info + + mock__get_timezone_struct.return_value = ( + mock.sentinel.timezone_struct, + ) + mock__get_dynamic_timezone_struct.return_value = ( + mock.sentinel.timezone_struct, + ) + + timezoneobj = self._timezone_module.Timezone("fake") + if windows_60: + timezoneobj._set_dynamic_time_zone_information() + mock__get_dynamic_timezone_struct.assert_called_once_with() + else: + timezoneobj._set_time_zone_information() + mock__get_timezone_struct.assert_called_once_with() + + mock_acquire_privilege.assert_called_once_with(privilege) + if windows_60: + self._mock_ctypes.windll.kernel32.SetDynamicTimeZoneInformation( + self._mock_ctypes.byref(mock.sentinel.timezone_struct)) + else: + self._mock_ctypes.windll.kernel32.SetTimeZoneInformation( + self._mock_ctypes.byref(mock.sentinel.timezone_struct)) + + def test__set_time_zone_information(self): + self._test__set_time_zone_information( + windows_60=False, + privilege=self._mock_win32security.SE_SYSTEMTIME_NAME) + + def test__set_dynamic_time_zone_information(self): + self._test__set_time_zone_information( + windows_60=True, + privilege=self._mock_win32security.SE_TIME_ZONE_NAME) diff --git a/cloudbaseinit/utils/windows/privilege.py b/cloudbaseinit/utils/windows/privilege.py new file mode 100644 index 0000000..8551a36 --- /dev/null +++ b/cloudbaseinit/utils/windows/privilege.py @@ -0,0 +1,36 @@ +# Copyright 2015 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib + +import win32process +import win32security + + +@contextlib.contextmanager +def acquire_privilege(privilege): + process = win32process.GetCurrentProcess() + token = win32security.OpenProcessToken( + process, + win32security.TOKEN_ADJUST_PRIVILEGES | + win32security.TOKEN_QUERY) + priv_luid = win32security.LookupPrivilegeValue(None, privilege) + privilege_enable = [(priv_luid, win32security.SE_PRIVILEGE_ENABLED)] + privilege_disable = [(priv_luid, win32security.SE_PRIVILEGE_REMOVED)] + win32security.AdjustTokenPrivileges(token, False, privilege_enable) + + try: + yield + finally: + win32security.AdjustTokenPrivileges(token, False, privilege_disable) diff --git a/cloudbaseinit/utils/windows/timezone.py b/cloudbaseinit/utils/windows/timezone.py new file mode 100644 index 0000000..14aa913 --- /dev/null +++ b/cloudbaseinit/utils/windows/timezone.py @@ -0,0 +1,190 @@ +# Copyright 2015 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import ctypes +from ctypes import wintypes +import os +import struct + +from six.moves import winreg +import win32security + +from cloudbaseinit import exception +from cloudbaseinit.utils.windows import privilege + + +REG_TIME_ZONES = "Software\\Microsoft\\Windows NT\\CurrentVersion\\Time Zones" +NOT_FOUND = 2 +kernel32 = ctypes.windll.kernel32 + + +class SYSTEMTIME(ctypes.Structure): + _fields_ = [ + ('wYear', wintypes.WORD), + ('wMonth', wintypes.WORD), + ('wDayOfWeek', wintypes.WORD), + ('wDay', wintypes.WORD), + ('wHour', wintypes.WORD), + ('wMinute', wintypes.WORD), + ('wMilliseconds', wintypes.WORD), + ] + + +class TIME_ZONE_INFORMATION(ctypes.Structure): + _fields_ = [ + ('Bias', wintypes.LONG), + ('StandardName', wintypes.WCHAR * 32), + ('StandardDate', SYSTEMTIME), + ('StandardBias', wintypes.LONG), + ('DaylightName', wintypes.WCHAR * 32), + ('DaylightDate', SYSTEMTIME), + ('DaylightBias', wintypes.LONG), + ] + + +class DYNAMIC_TIME_ZONE_INFORMATION(ctypes.Structure): + _fields_ = [ + ('Bias', wintypes.LONG), + ('StandardName', wintypes.WCHAR * 32), + ('StandardDate', SYSTEMTIME), + ('StandardBias', wintypes.LONG), + ('DaylightName', wintypes.WCHAR * 32), + ('DaylightDate', SYSTEMTIME), + ('DaylightBias', wintypes.LONG), + ('TimeZoneKeyName', wintypes.WCHAR * 128), + ('DynamicDaylightTimeDisabled', wintypes.BOOLEAN), + ] + + +class Timezone(object): + """Class which holds details about a particular timezone. + + It also can be used to change the current timezone, + by calling the :meth:`~set`. The supported time zone names + are the ones found here: + https://technet.microsoft.com/en-us/library/cc749073%28v=ws.10%29.aspx + """ + + def __init__(self, name): + self._name = name + self._timezone_info = self._get_timezone_info() + + # Public API. + self.bias = self._timezone_info[0] + self.standard_name = self._timezone_info[1] + self.standard_date = self._timezone_info[2] + self.standard_bias = self._timezone_info[3] + self.daylight_name = self._timezone_info[4] + self.daylight_date = self._timezone_info[5] + self.daylight_bias = self._timezone_info[6] + + @staticmethod + def _create_system_time(values): + mtime = SYSTEMTIME() + mtime.wYear = values[0] + mtime.wMonth = values[1] + mtime.wDayOfWeek = values[2] + mtime.wDay = values[3] + mtime.wHour = values[4] + mtime.wMinute = values[5] + mtime.wSecond = values[6] + mtime.wMilliseconds = values[7] + return mtime + + def _get_timezone_struct(self): + info = TIME_ZONE_INFORMATION() + info.Bias = self.bias + info.StandardName = self.standard_name + info.StandardDate = self._create_system_time(self.standard_date) + info.StandardBias = self.standard_bias + info.DaylightName = self.daylight_name + info.DaylightBias = self.daylight_bias + info.DaylightDate = self._create_system_time(self.daylight_date) + return info + + def _get_dynamic_timezone_struct(self): + info = DYNAMIC_TIME_ZONE_INFORMATION() + info.Bias = self.bias + info.StandardName = self.standard_name + info.StandardDate = self._create_system_time(self.standard_date) + info.StandardBias = self.standard_bias + info.DaylightName = self.daylight_name + info.DaylightBias = self.daylight_bias + info.DaylightDate = self._create_system_time(self.daylight_date) + # TODO(cpopa): should this flag be controllable? + info.DynamicDaylightTimeDisabled = False + info.TimeZoneKeyName = self._name + return info + + def _get_timezone_info(self): + keyname = os.path.join(REG_TIME_ZONES, self._name) + try: + with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, keyname) as key: + return self._unpack_timezone_info(key) + except WindowsError as exc: + if exc.errno == NOT_FOUND: + raise exception.CloudbaseInitException( + "Timezone %r not found" % self._name) + else: + raise + + @staticmethod + def _unpack_system_time(tzi, offset): + # Unpack the values of a TIME_ZONE_INFORMATION structure + # from the given blob, starting at the given offset. + return [struct.unpack("H", tzi[index: index + 2])[0] + for index in range(offset, offset + 16, 2)] + + @staticmethod + def _query_tz_key(key): + tzi = winreg.QueryValueEx(key, "TZI")[0] + daylight_name = winreg.QueryValueEx(key, "Dlt")[0] + standard_name = winreg.QueryValueEx(key, "Std")[0] + return tzi, standard_name, daylight_name + + def _unpack_timezone_info(self, key): + # Get information about the current timezone from the given + # registry key. + tzi, standard_name, daylight_name = self._query_tz_key(key) + bias, = struct.unpack("l", tzi[:4]) + standard_bias, = struct.unpack("l", tzi[4:8]) + daylight_bias, = struct.unpack("l", tzi[8:12]) + standard_date = self._unpack_system_time(tzi, 12) + daylight_date = self._unpack_system_time(tzi, 12 + 16) + + return (bias, standard_name, tuple(standard_date), + standard_bias, daylight_name, + tuple(daylight_date), daylight_bias) + + def _set_time_zone_information(self): + info = self._get_timezone_struct() + with privilege.acquire_privilege(win32security.SE_SYSTEMTIME_NAME): + kernel32.SetTimeZoneInformation(ctypes.byref(info)) + + def _set_dynamic_time_zone_information(self): + info = self._get_dynamic_timezone_struct() + with privilege.acquire_privilege(win32security.SE_TIME_ZONE_NAME): + kernel32.SetDynamicTimeZoneInformation(ctypes.byref(info)) + + def set(self, osutils): + """Change the underlying timezone with this one. + + This will use SetDynamicTimeZoneInformation on Windows Vista+ and + for Windows 2003 it will fallback to SetTimeZoneInformation, which + doesn't handle Daylight Saving Time. + """ + if osutils.check_os_version(6, 0): + self._set_dynamic_time_zone_information() + else: + self._set_time_zone_information() diff --git a/requirements-windows.txt b/requirements-windows.txt index 2b26c77..3dedf2b 100644 --- a/requirements-windows.txt +++ b/requirements-windows.txt @@ -1,3 +1,4 @@ pywin32 comtypes wmi +tzlocal \ No newline at end of file