diff --git a/cloudbaseinit/osutils/base.py b/cloudbaseinit/osutils/base.py index 2b206ee..3eb711d 100644 --- a/cloudbaseinit/osutils/base.py +++ b/cloudbaseinit/osutils/base.py @@ -110,3 +110,7 @@ def firewall_remove_rule(self, name, port, protocol, allow=True): def get_maximum_password_length(self): """Obtain the maximum password length tailored for each OS.""" raise NotImplementedError() + + def set_timezone(self, timezone): + """Set the timezone for this instance.""" + raise NotImplementedError() diff --git a/cloudbaseinit/osutils/windows.py b/cloudbaseinit/osutils/windows.py index 8d7fce1..12b0ac9 100644 --- a/cloudbaseinit/osutils/windows.py +++ b/cloudbaseinit/osutils/windows.py @@ -23,6 +23,7 @@ import pywintypes import six from six.moves import winreg +from tzlocal import windows_tz from win32com import client import win32process import win32security @@ -33,6 +34,8 @@ from cloudbaseinit.osutils import base from cloudbaseinit.utils import encoding from cloudbaseinit.utils.windows import network +from cloudbaseinit.utils.windows import privilege +from cloudbaseinit.utils.windows import timezone LOG = logging.getLogger(__name__) @@ -296,24 +299,14 @@ class WindowsUtils(base.BaseOSUtils): _FW_SCOPE_ALL = 0 _FW_SCOPE_LOCAL_SUBNET = 1 - def _enable_shutdown_privilege(self): - process = win32process.GetCurrentProcess() - token = win32security.OpenProcessToken( - process, - win32security.TOKEN_ADJUST_PRIVILEGES | - win32security.TOKEN_QUERY) - priv_luid = win32security.LookupPrivilegeValue( - None, win32security.SE_SHUTDOWN_NAME) - privilege = [(priv_luid, win32security.SE_PRIVILEGE_ENABLED)] - win32security.AdjustTokenPrivileges(token, False, privilege) - def reboot(self): - self._enable_shutdown_privilege() - - ret_val = advapi32.InitiateSystemShutdownW(0, "Cloudbase-Init reboot", - 0, True, True) - if not ret_val: - raise exception.WindowsCloudbaseInitException("Reboot failed: %r") + with privilege.acquire_privilege(win32security.SE_SHUTDOWN_NAME): + ret_val = advapi32.InitiateSystemShutdownW( + 0, "Cloudbase-Init reboot", + 0, True, True) + if not ret_val: + raise exception.WindowsCloudbaseInitException( + "Reboot failed: %r") def _get_user_wmi_object(self, username): conn = wmi.WMI(moniker='//./root/cimv2') @@ -1047,3 +1040,10 @@ def execute_system32_process(self, args, shell=True, decode_output=False, def get_maximum_password_length(self): return 20 + + def set_timezone(self, timezone_name): + windows_name = windows_tz.tz_win.get(timezone_name) + if not windows_name: + raise exception.CloudbaseInitException( + "The given timezone name is unrecognised: %r" % timezone_name) + timezone.Timezone(windows_name).set(self) diff --git a/cloudbaseinit/plugins/common/userdataplugins/cloudconfigplugins/factory.py b/cloudbaseinit/plugins/common/userdataplugins/cloudconfigplugins/factory.py index 79c82b5..b5b3a7c 100644 --- a/cloudbaseinit/plugins/common/userdataplugins/cloudconfigplugins/factory.py +++ b/cloudbaseinit/plugins/common/userdataplugins/cloudconfigplugins/factory.py @@ -20,6 +20,8 @@ PLUGINS = { 'write_files': 'cloudbaseinit.plugins.common.userdataplugins.' 'cloudconfigplugins.write_files.WriteFilesPlugin', + 'set_timezone': 'cloudbaseinit.plugins.common.userdataplugins.' + 'cloudconfigplugins.set_timezone.SetTimezonePlugin', } diff --git a/cloudbaseinit/plugins/common/userdataplugins/cloudconfigplugins/set_timezone.py b/cloudbaseinit/plugins/common/userdataplugins/cloudconfigplugins/set_timezone.py new file mode 100644 index 0000000..4d7689b --- /dev/null +++ b/cloudbaseinit/plugins/common/userdataplugins/cloudconfigplugins/set_timezone.py @@ -0,0 +1,42 @@ +# Copyright 2015 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from cloudbaseinit.openstack.common import log as logging +from cloudbaseinit.osutils import factory +from cloudbaseinit.plugins.common.userdataplugins.cloudconfigplugins import ( + base +) + + +LOG = logging.getLogger(__name__) + + +class SetTimezonePlugin(base.BaseCloudConfigPlugin): + """Change the timezone for the underlying platform. + + This uses IANA timezone names (which are mapped to the Windows + time zone names, as seen in the following link: + https://technet.microsoft.com/en-us/library/cc749073%28v=ws.10%29.aspx). + + For instance, to change the timezone to 'America/Montevideo', use + this syntax:: + + set_timezone: America/Montevideo + + """ + + def process(self, data): + LOG.info("Changing timezone to %r", data) + osutils = factory.get_os_utils() + osutils.set_timezone(data) diff --git a/cloudbaseinit/tests/osutils/test_windows.py b/cloudbaseinit/tests/osutils/test_windows.py index add21ad..34581e6 100644 --- a/cloudbaseinit/tests/osutils/test_windows.py +++ b/cloudbaseinit/tests/osutils/test_windows.py @@ -51,6 +51,7 @@ def setUp(self): self._moves_mock = mock.MagicMock() self._xmlrpc_client_mock = mock.MagicMock() self._ctypes_mock = mock.MagicMock() + self._tzlocal_mock = mock.Mock() self._module_patcher = mock.patch.dict( 'sys.modules', @@ -61,7 +62,8 @@ def setUp(self): 'six.moves': self._moves_mock, 'six.moves.xmlrpc_client': self._xmlrpc_client_mock, 'ctypes': self._ctypes_mock, - 'pywintypes': self._pywintypes_mock}) + 'pywintypes': self._pywintypes_mock, + 'tzlocal': self._tzlocal_mock}) self._module_patcher.start() self.windows_utils = importlib.import_module( @@ -78,31 +80,10 @@ def setUp(self): def tearDown(self): self._module_patcher.stop() - def test_enable_shutdown_privilege(self): - fake_process = mock.MagicMock() - fake_token = True - LUID = 'fakeid' - self._win32process_mock.GetCurrentProcess.return_value = fake_process - self._win32security_mock.OpenProcessToken.return_value = fake_token - self._win32security_mock.LookupPrivilegeValue.return_value = LUID - - self._winutils._enable_shutdown_privilege() - - privilege = [(LUID, - self._win32security_mock.SE_PRIVILEGE_ENABLED)] - self._win32security_mock.AdjustTokenPrivileges.assert_called_with( - fake_token, - False, - privilege) - - self._win32security_mock.OpenProcessToken.assert_called_with( - fake_process, self._win32security_mock.TOKEN_ADJUST_PRIVILEGES | - self._win32security_mock.TOKEN_QUERY) - - @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' - '._enable_shutdown_privilege') - def _test_reboot(self, mock_enable_shutdown_privilege, ret_value, + @mock.patch('cloudbaseinit.osutils.windows.privilege') + def _test_reboot(self, mock_privilege_module, ret_value, expected_ret_value=None): + mock_privilege_module.acquire_privilege = mock.MagicMock() advapi32 = self._windll_mock.advapi32 advapi32.InitiateSystemShutdownW = mock.MagicMock( return_value=ret_value) @@ -118,6 +99,8 @@ def _test_reboot(self, mock_enable_shutdown_privilege, ret_value, 0, "Cloudbase-Init reboot", 0, True, True) + mock_privilege_module.acquire_privilege.assert_called_once_with( + self._win32security_mock.SE_SHUTDOWN_NAME) def test_reboot(self): self._test_reboot(ret_value=True) @@ -1477,3 +1460,32 @@ def test_execute_system32_process(self, mock_execute_process, def test_get_password_maximum_length(self): self.assertEqual(20, self._winutils.get_maximum_password_length()) + + @mock.patch('cloudbaseinit.osutils.windows.windows_tz') + def test_set_timezone_fails(self, mock_windows_tz): + mock_windows_tz.tz_win.get.return_value = None + + with self.assertRaises(exception.CloudbaseInitException) as cm: + self._winutils.set_timezone(mock.sentinel.timezone) + expected = ( + "The given timezone name is unrecognised: %r" + % mock.sentinel.timezone + ) + self.assertEqual(expected, str(cm.exception)) + mock_windows_tz.tz_win.get.assert_called_once_with( + mock.sentinel.timezone) + + @mock.patch('cloudbaseinit.osutils.windows.timezone') + @mock.patch('cloudbaseinit.osutils.windows.windows_tz') + def test_set_timezone(self, mock_windows_tz, mock_timezone): + mock_windows_tz.tz_win.get.return_value = ( + mock.sentinel.windows_timezone) + + self._winutils.set_timezone(mock.sentinel.timezone) + + mock_windows_tz.tz_win.get.assert_called_once_with( + mock.sentinel.timezone) + mock_timezone.Timezone.assert_called_once_with( + mock.sentinel.windows_timezone) + mock_timezone.Timezone.return_value.set.assert_called_once_with( + self._winutils) diff --git a/cloudbaseinit/tests/plugins/common/userdataplugins/cloudconfigplugins/test_set_timezone.py b/cloudbaseinit/tests/plugins/common/userdataplugins/cloudconfigplugins/test_set_timezone.py new file mode 100644 index 0000000..75dcbcf --- /dev/null +++ b/cloudbaseinit/tests/plugins/common/userdataplugins/cloudconfigplugins/test_set_timezone.py @@ -0,0 +1,54 @@ +# Copyright 2015 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import unittest + +try: + import unittest.mock as mock +except ImportError: + import mock + +from cloudbaseinit.plugins.common.userdataplugins import cloudconfig +from cloudbaseinit.plugins.common.userdataplugins.cloudconfigplugins import ( + set_timezone +) +from cloudbaseinit.tests import testutils + + +class TestSetTimezone(unittest.TestCase): + + @mock.patch('cloudbaseinit.plugins.common.userdataplugins.' + 'cloudconfigplugins.set_timezone.factory') + def test_process(self, mock_osutils_factory): + with testutils.LogSnatcher('cloudbaseinit.plugins.common.' + 'userdataplugins.cloudconfigplugins.' + 'set_timezone') as snatcher: + set_timezone.SetTimezonePlugin().process(mock.sentinel.timezone) + + expected_logging = [ + 'Changing timezone to %r' % mock.sentinel.timezone + ] + mock_osutils_factory.get_os_utils.assert_called_once_with() + mock_osutils = mock_osutils_factory.get_os_utils.return_value + mock_osutils.set_timezone.assert_called_once_with( + mock.sentinel.timezone) + self.assertEqual(expected_logging, snatcher.output) + + @mock.patch('cloudbaseinit.plugins.common.userdataplugins.' + 'cloudconfigplugins.set_timezone.SetTimezonePlugin.process') + def test_timezone_dispatch(self, mock_process_plugin): + plugin = cloudconfig.CloudConfigPlugin() + plugin.process_non_multipart("set_timezone: America Standard Time") + + mock_process_plugin.assert_called_once_with("America Standard Time") diff --git a/cloudbaseinit/tests/utils/windows/test_privilege.py b/cloudbaseinit/tests/utils/windows/test_privilege.py new file mode 100644 index 0000000..c34c594 --- /dev/null +++ b/cloudbaseinit/tests/utils/windows/test_privilege.py @@ -0,0 +1,63 @@ +# Copyright 2015 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import importlib +import unittest + +try: + import unittest.mock as mock +except ImportError: + import mock + + +class TestPrivilege(unittest.TestCase): + + def setUp(self): + self._win32process_mock = mock.MagicMock() + self._win32security_mock = mock.MagicMock() + self._module_patcher = mock.patch.dict( + 'sys.modules', + {'win32process': self._win32process_mock, + 'win32security': self._win32security_mock}) + + self._module_patcher.start() + self.privilege_module = importlib.import_module( + "cloudbaseinit.utils.windows.privilege") + + def tearDown(self): + self._module_patcher.stop() + + def test_privilege_context_manager(self): + fake_process = mock.MagicMock() + fake_token = True + LUID = 'fakeid' + self._win32process_mock.GetCurrentProcess.return_value = fake_process + self._win32security_mock.OpenProcessToken.return_value = fake_token + self._win32security_mock.LookupPrivilegeValue.return_value = LUID + privilege_enabled = [(LUID, + self._win32security_mock.SE_PRIVILEGE_ENABLED)] + privilege_removed = [(LUID, + self._win32security_mock.SE_PRIVILEGE_REMOVED)] + with self.privilege_module.acquire_privilege(mock.sentinel.privilege): + + self._win32security_mock.AdjustTokenPrivileges.assert_called_with( + fake_token, False, privilege_enabled) + + self._win32security_mock.OpenProcessToken.assert_called_with( + fake_process, + self._win32security_mock.TOKEN_ADJUST_PRIVILEGES | + self._win32security_mock.TOKEN_QUERY) + + self._win32security_mock.AdjustTokenPrivileges.assert_called_with( + fake_token, False, privilege_removed) diff --git a/cloudbaseinit/tests/utils/windows/test_timezone.py b/cloudbaseinit/tests/utils/windows/test_timezone.py new file mode 100644 index 0000000..8478c58 --- /dev/null +++ b/cloudbaseinit/tests/utils/windows/test_timezone.py @@ -0,0 +1,271 @@ +# Copyright 2015 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import importlib +import os +import struct +import unittest +try: + import unittest.mock as mock +except ImportError: + import mock + + +from cloudbaseinit import exception + + +class FakeWindowsError(Exception): + pass + + +class TestTimezone(unittest.TestCase): + + def setUp(self): + self._mock_moves = mock.MagicMock() + self._mock_winreg = mock.Mock() + self._mock_ctypes = mock.Mock() + self._mock_win32security = mock.Mock() + self._mock_win32process = mock.Mock() + self._mock_wintypes = mock.MagicMock() + self._mock_ctypes.wintypes = self._mock_wintypes + self._module_patcher = mock.patch.dict( + 'sys.modules', + {'ctypes': self._mock_ctypes, + 'six.moves': self._mock_moves, + 'win32process': self._mock_win32process, + 'win32security': self._mock_win32security}) + self._module_patcher.start() + self._mock_moves.winreg = self._mock_winreg + self._timezone_module = importlib.import_module( + 'cloudbaseinit.utils.windows.timezone') + self._timezone_module.WindowsError = FakeWindowsError + self._fixture_timezone_info = [ + 0, 'StandardName', list(range(8)), + 3, "DaylightName", list(reversed(range(8))), 6, + ] + + def tearDown(self): + self._module_patcher.stop() + + @mock.patch('cloudbaseinit.utils.windows.timezone.SYSTEMTIME') + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_get_timezone_info', new=mock.MagicMock()) + def test__create_system_time(self, mock_systemtime): + values = list(range(8)) + timezoneobj = self._timezone_module.Timezone(mock.sentinel.timezone) + result = timezoneobj._create_system_time(values) + mock_systemtime.assert_called_once_with() + self.assertEqual(tuple(range(8)), + (result.wYear, result.wMonth, result.wDayOfWeek, + result.wDay, result.wHour, result.wMinute, + result.wSecond, result.wMilliseconds)) + + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_create_system_time') + @mock.patch('cloudbaseinit.utils.windows.timezone.TIME_ZONE_INFORMATION') + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_get_timezone_info') + def test__get_timezone_struct(self, mock_get_timezone_info, + mock_time_zone_information, + mock_create_system_time): + mock_get_timezone_info.return_value = self._fixture_timezone_info + + timezoneobj = self._timezone_module.Timezone(mock.sentinel.timezone) + result = timezoneobj._get_timezone_struct() + + mock_time_zone_information.assert_called_once_with() + self.assertEqual(0, result.Bias) + self.assertEqual('StandardName', result.StandardName) + self.assertEqual(result.StandardDate, + mock_create_system_time.return_value) + self.assertEqual(result.DaylightDate, + mock_create_system_time.return_value) + self.assertEqual(3, result.StandardBias) + self.assertEqual("DaylightName", result.DaylightName) + self.assertEqual(6, result.DaylightBias) + + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_create_system_time') + @mock.patch('cloudbaseinit.utils.windows.timezone.' + 'DYNAMIC_TIME_ZONE_INFORMATION') + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_get_timezone_info') + def test__get_dynamic_timezone_struct(self, mock_get_timezone_info, + mock_dynamic_time_zone_information, + mock_create_system_time): + + mock_get_timezone_info.return_value = self._fixture_timezone_info + + timezoneobj = self._timezone_module.Timezone("timezone name") + result = timezoneobj._get_dynamic_timezone_struct() + + mock_dynamic_time_zone_information.assert_called_once_with() + self.assertEqual(0, result.Bias) + self.assertEqual('StandardName', result.StandardName) + self.assertEqual(3, result.StandardBias) + self.assertEqual("DaylightName", result.DaylightName) + self.assertEqual(6, result.DaylightBias) + self.assertFalse(result.DynamicDaylightTimeDisabled) + self.assertEqual("timezone name", result.TimeZoneKeyName) + self.assertEqual(result.StandardDate, + mock_create_system_time.return_value) + self.assertEqual(result.DaylightDate, + mock_create_system_time.return_value) + + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_unpack_timezone_info') + def test__get_timezone_info(self, mock_unpack_timezone_info): + mock_unpack_timezone_info.return_value = range(7) + registry_key = mock.MagicMock() + self._mock_winreg.OpenKey.return_value = registry_key + + self._timezone_module.Timezone("timezone test") + self._mock_winreg.OpenKey.assert_called_once_with( + self._mock_winreg.HKEY_LOCAL_MACHINE, + os.path.join(self._timezone_module.REG_TIME_ZONES, + "timezone test")) + mock_unpack_timezone_info.assert_called_once_with( + registry_key.__enter__.return_value) + + def test__get_time_zone_info_reraise_cloudbaseinit_exception(self): + error = FakeWindowsError() + error.errno = self._timezone_module.NOT_FOUND + self._mock_winreg.OpenKey.side_effect = error + + with self.assertRaises(exception.CloudbaseInitException) as cm: + self._timezone_module.Timezone("timezone test") + self.assertEqual("Timezone 'timezone test' not found", + str(cm.exception)) + + def test__get_time_zone_info_reraise_exception(self): + error = FakeWindowsError() + error.errno = 404 + self._mock_winreg.OpenKey.side_effect = error + + with self.assertRaises(FakeWindowsError) as cm: + self._timezone_module.Timezone("timezone test") + self.assertIsInstance(cm.exception, FakeWindowsError) + self.assertEqual(404, cm.exception.errno) + + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_query_tz_key') + def test__get_time_zone_info_real_data(self, mock_query_tz_key): + orig_unpack = struct.unpack + + def unpacker(format, blob): + if format == "l": + format = "i" + return orig_unpack(format, blob) + + mock_query_tz_key.return_value = ( + b'\xf0\x00\x00\x00\x00\x00\x00\x00\xc4\xff\xff\xff\x00\x00' + b'\x0b\x00\x00\x00\x01\x00\x02\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x03\x00\x00\x00\x02\x00\x02\x00\x00\x00\x00\x00' + b'\x00\x00', + "Atlantic Standard Time", + "Atlantic Daylight Time", + ) + registry_key = mock.MagicMock() + self._mock_winreg.OpenKey.return_value = registry_key + + with mock.patch('struct.unpack', side_effect=unpacker): + timezoneobj = self._timezone_module.Timezone("timezone test") + + mock_query_tz_key.assert_called_once_with(registry_key.__enter__()) + self.assertEqual(240, timezoneobj.bias) + self.assertEqual(-60, timezoneobj.daylight_bias) + self.assertEqual((0, 3, 0, 2, 2, 0, 0, 0), + timezoneobj.daylight_date) + self.assertEqual('Atlantic Daylight Time', timezoneobj.daylight_name) + self.assertEqual(0, timezoneobj.standard_bias) + self.assertEqual((0, 11, 0, 1, 2, 0, 0, 0), + timezoneobj.standard_date) + self.assertEqual('Atlantic Standard Time', timezoneobj.standard_name) + + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_get_timezone_info') + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_set_dynamic_time_zone_information') + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_set_time_zone_information') + def _test_set_time_zone_information( + self, mock__set_time_zone_information, + mock__set_dynamic_time_zone_information, + mock_get_timezone_info, windows_60=True): + mock_osutils = mock.Mock() + mock_osutils.check_os_version.return_value = windows_60 + mock_get_timezone_info.return_value = self._fixture_timezone_info + + timezoneobj = self._timezone_module.Timezone("fake") + timezoneobj.set(mock_osutils) + + if windows_60: + mock__set_dynamic_time_zone_information.assert_called_once_with() + else: + mock__set_time_zone_information.assert_called_once_with() + + def test_set_daylight_not_supported(self): + self._test_set_time_zone_information(windows_60=False) + + def test_set_daylight_supported(self): + self._test_set_time_zone_information(windows_60=True) + + @mock.patch('cloudbaseinit.utils.windows.privilege.acquire_privilege') + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_get_timezone_info') + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_get_timezone_struct') + @mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.' + '_get_dynamic_timezone_struct') + def _test__set_time_zone_information( + self, mock__get_dynamic_timezone_struct, + mock__get_timezone_struct, + mock_get_timezone_info, + mock_acquire_privilege, + windows_60=True, + privilege=None): + mock_get_timezone_info.return_value = self._fixture_timezone_info + + mock__get_timezone_struct.return_value = ( + mock.sentinel.timezone_struct, + ) + mock__get_dynamic_timezone_struct.return_value = ( + mock.sentinel.timezone_struct, + ) + + timezoneobj = self._timezone_module.Timezone("fake") + if windows_60: + timezoneobj._set_dynamic_time_zone_information() + mock__get_dynamic_timezone_struct.assert_called_once_with() + else: + timezoneobj._set_time_zone_information() + mock__get_timezone_struct.assert_called_once_with() + + mock_acquire_privilege.assert_called_once_with(privilege) + if windows_60: + self._mock_ctypes.windll.kernel32.SetDynamicTimeZoneInformation( + self._mock_ctypes.byref(mock.sentinel.timezone_struct)) + else: + self._mock_ctypes.windll.kernel32.SetTimeZoneInformation( + self._mock_ctypes.byref(mock.sentinel.timezone_struct)) + + def test__set_time_zone_information(self): + self._test__set_time_zone_information( + windows_60=False, + privilege=self._mock_win32security.SE_SYSTEMTIME_NAME) + + def test__set_dynamic_time_zone_information(self): + self._test__set_time_zone_information( + windows_60=True, + privilege=self._mock_win32security.SE_TIME_ZONE_NAME) diff --git a/cloudbaseinit/utils/windows/privilege.py b/cloudbaseinit/utils/windows/privilege.py new file mode 100644 index 0000000..8551a36 --- /dev/null +++ b/cloudbaseinit/utils/windows/privilege.py @@ -0,0 +1,36 @@ +# Copyright 2015 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib + +import win32process +import win32security + + +@contextlib.contextmanager +def acquire_privilege(privilege): + process = win32process.GetCurrentProcess() + token = win32security.OpenProcessToken( + process, + win32security.TOKEN_ADJUST_PRIVILEGES | + win32security.TOKEN_QUERY) + priv_luid = win32security.LookupPrivilegeValue(None, privilege) + privilege_enable = [(priv_luid, win32security.SE_PRIVILEGE_ENABLED)] + privilege_disable = [(priv_luid, win32security.SE_PRIVILEGE_REMOVED)] + win32security.AdjustTokenPrivileges(token, False, privilege_enable) + + try: + yield + finally: + win32security.AdjustTokenPrivileges(token, False, privilege_disable) diff --git a/cloudbaseinit/utils/windows/timezone.py b/cloudbaseinit/utils/windows/timezone.py new file mode 100644 index 0000000..14aa913 --- /dev/null +++ b/cloudbaseinit/utils/windows/timezone.py @@ -0,0 +1,190 @@ +# Copyright 2015 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import ctypes +from ctypes import wintypes +import os +import struct + +from six.moves import winreg +import win32security + +from cloudbaseinit import exception +from cloudbaseinit.utils.windows import privilege + + +REG_TIME_ZONES = "Software\\Microsoft\\Windows NT\\CurrentVersion\\Time Zones" +NOT_FOUND = 2 +kernel32 = ctypes.windll.kernel32 + + +class SYSTEMTIME(ctypes.Structure): + _fields_ = [ + ('wYear', wintypes.WORD), + ('wMonth', wintypes.WORD), + ('wDayOfWeek', wintypes.WORD), + ('wDay', wintypes.WORD), + ('wHour', wintypes.WORD), + ('wMinute', wintypes.WORD), + ('wMilliseconds', wintypes.WORD), + ] + + +class TIME_ZONE_INFORMATION(ctypes.Structure): + _fields_ = [ + ('Bias', wintypes.LONG), + ('StandardName', wintypes.WCHAR * 32), + ('StandardDate', SYSTEMTIME), + ('StandardBias', wintypes.LONG), + ('DaylightName', wintypes.WCHAR * 32), + ('DaylightDate', SYSTEMTIME), + ('DaylightBias', wintypes.LONG), + ] + + +class DYNAMIC_TIME_ZONE_INFORMATION(ctypes.Structure): + _fields_ = [ + ('Bias', wintypes.LONG), + ('StandardName', wintypes.WCHAR * 32), + ('StandardDate', SYSTEMTIME), + ('StandardBias', wintypes.LONG), + ('DaylightName', wintypes.WCHAR * 32), + ('DaylightDate', SYSTEMTIME), + ('DaylightBias', wintypes.LONG), + ('TimeZoneKeyName', wintypes.WCHAR * 128), + ('DynamicDaylightTimeDisabled', wintypes.BOOLEAN), + ] + + +class Timezone(object): + """Class which holds details about a particular timezone. + + It also can be used to change the current timezone, + by calling the :meth:`~set`. The supported time zone names + are the ones found here: + https://technet.microsoft.com/en-us/library/cc749073%28v=ws.10%29.aspx + """ + + def __init__(self, name): + self._name = name + self._timezone_info = self._get_timezone_info() + + # Public API. + self.bias = self._timezone_info[0] + self.standard_name = self._timezone_info[1] + self.standard_date = self._timezone_info[2] + self.standard_bias = self._timezone_info[3] + self.daylight_name = self._timezone_info[4] + self.daylight_date = self._timezone_info[5] + self.daylight_bias = self._timezone_info[6] + + @staticmethod + def _create_system_time(values): + mtime = SYSTEMTIME() + mtime.wYear = values[0] + mtime.wMonth = values[1] + mtime.wDayOfWeek = values[2] + mtime.wDay = values[3] + mtime.wHour = values[4] + mtime.wMinute = values[5] + mtime.wSecond = values[6] + mtime.wMilliseconds = values[7] + return mtime + + def _get_timezone_struct(self): + info = TIME_ZONE_INFORMATION() + info.Bias = self.bias + info.StandardName = self.standard_name + info.StandardDate = self._create_system_time(self.standard_date) + info.StandardBias = self.standard_bias + info.DaylightName = self.daylight_name + info.DaylightBias = self.daylight_bias + info.DaylightDate = self._create_system_time(self.daylight_date) + return info + + def _get_dynamic_timezone_struct(self): + info = DYNAMIC_TIME_ZONE_INFORMATION() + info.Bias = self.bias + info.StandardName = self.standard_name + info.StandardDate = self._create_system_time(self.standard_date) + info.StandardBias = self.standard_bias + info.DaylightName = self.daylight_name + info.DaylightBias = self.daylight_bias + info.DaylightDate = self._create_system_time(self.daylight_date) + # TODO(cpopa): should this flag be controllable? + info.DynamicDaylightTimeDisabled = False + info.TimeZoneKeyName = self._name + return info + + def _get_timezone_info(self): + keyname = os.path.join(REG_TIME_ZONES, self._name) + try: + with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, keyname) as key: + return self._unpack_timezone_info(key) + except WindowsError as exc: + if exc.errno == NOT_FOUND: + raise exception.CloudbaseInitException( + "Timezone %r not found" % self._name) + else: + raise + + @staticmethod + def _unpack_system_time(tzi, offset): + # Unpack the values of a TIME_ZONE_INFORMATION structure + # from the given blob, starting at the given offset. + return [struct.unpack("H", tzi[index: index + 2])[0] + for index in range(offset, offset + 16, 2)] + + @staticmethod + def _query_tz_key(key): + tzi = winreg.QueryValueEx(key, "TZI")[0] + daylight_name = winreg.QueryValueEx(key, "Dlt")[0] + standard_name = winreg.QueryValueEx(key, "Std")[0] + return tzi, standard_name, daylight_name + + def _unpack_timezone_info(self, key): + # Get information about the current timezone from the given + # registry key. + tzi, standard_name, daylight_name = self._query_tz_key(key) + bias, = struct.unpack("l", tzi[:4]) + standard_bias, = struct.unpack("l", tzi[4:8]) + daylight_bias, = struct.unpack("l", tzi[8:12]) + standard_date = self._unpack_system_time(tzi, 12) + daylight_date = self._unpack_system_time(tzi, 12 + 16) + + return (bias, standard_name, tuple(standard_date), + standard_bias, daylight_name, + tuple(daylight_date), daylight_bias) + + def _set_time_zone_information(self): + info = self._get_timezone_struct() + with privilege.acquire_privilege(win32security.SE_SYSTEMTIME_NAME): + kernel32.SetTimeZoneInformation(ctypes.byref(info)) + + def _set_dynamic_time_zone_information(self): + info = self._get_dynamic_timezone_struct() + with privilege.acquire_privilege(win32security.SE_TIME_ZONE_NAME): + kernel32.SetDynamicTimeZoneInformation(ctypes.byref(info)) + + def set(self, osutils): + """Change the underlying timezone with this one. + + This will use SetDynamicTimeZoneInformation on Windows Vista+ and + for Windows 2003 it will fallback to SetTimeZoneInformation, which + doesn't handle Daylight Saving Time. + """ + if osutils.check_os_version(6, 0): + self._set_dynamic_time_zone_information() + else: + self._set_time_zone_information() diff --git a/requirements-windows.txt b/requirements-windows.txt index 2b26c77..3dedf2b 100644 --- a/requirements-windows.txt +++ b/requirements-windows.txt @@ -1,3 +1,4 @@ pywin32 comtypes wmi +tzlocal \ No newline at end of file