From 618db56aff9339aef3a7310a555d7ac93dc76665 Mon Sep 17 00:00:00 2001 From: ben sims Date: Fri, 1 Nov 2019 14:45:01 +0000 Subject: [PATCH] CA-329845 - Remove usage of credentials file for CIF Signed-off-by: ben sims --- Makefile | 1 + drivers/ISOSR.py | 63 +++--------- drivers/SMBSR.py | 71 +++---------- drivers/cifutils.py | 84 +++++++++++++++ drivers/util.py | 10 +- mk/sm.spec.in | 3 + tests/test_ISOSR.py | 92 ++++++++++++++--- tests/test_SMBSR.py | 47 ++++++++- tests/test_cifutils.py | 225 +++++++++++++++++++++++++++++++++++++++++ tests/test_utils.py | 37 +++++++ tests/testlib.py | 2 +- 11 files changed, 511 insertions(+), 124 deletions(-) create mode 100755 drivers/cifutils.py create mode 100644 tests/test_cifutils.py create mode 100644 tests/test_utils.py diff --git a/Makefile b/Makefile index 50c06325c..d78f29fd6 100755 --- a/Makefile +++ b/Makefile @@ -31,6 +31,7 @@ SM_LIBS += scsiutil SM_LIBS += scsi_host_rescan SM_LIBS += vhdutil SM_LIBS += lvhdutil +SM_LIBS += cifutils SM_LIBS += xs_errors SM_LIBS += nfs SM_LIBS += devscan diff --git a/drivers/ISOSR.py b/drivers/ISOSR.py index 5bab7bd4a..d2ec80261 100755 --- a/drivers/ISOSR.py +++ b/drivers/ISOSR.py @@ -21,6 +21,7 @@ import nfs import os, re import xs_errors +import cifutils CAPABILITIES = ["VDI_CREATE", "VDI_DELETE", "VDI_ATTACH", "VDI_DETACH", "SR_SCAN", "SR_ATTACH", "SR_DETACH"] @@ -230,7 +231,6 @@ def load(self, sr_uuid): # Some info we need: self.sr_vditype = 'phy' - self.credentials = None def delete(self, sr_uuid): pass @@ -254,7 +254,6 @@ def attach(self, sr_uuid): mountcmd=[] location = util.to_plain_string(self.dconf['location']) - self.credentials = os.path.join("/tmp", util.gen_uuid()) # TODO: Have XC standardise iso type string protocol = 'nfs_iso' options = '' @@ -305,7 +304,6 @@ def attach(self, sr_uuid): # Check the validity of 'smbversion'. # Raise an exception for any invalid version. if self.smbversion not in [SMB_VERSION_1, SMB_VERSION_3]: - self._cleanupcredentials() raise xs_errors.XenError('ISOInvalidSMBversion') # Attempt mounting @@ -344,21 +342,18 @@ def attach(self, sr_uuid): mountcmd.extend(options) self.mountOverSMB(mountcmd) else: - self._cleanupcredentials() raise xs_errors.XenError( 'ISOMountFailure', opterr=inst.reason) else: util.SMlog('ISOSR mount over smb 1.0') self.mountOverSMB(mountcmd) except util.CommandException, inst: - self._cleanupcredentials() if not self.is_smbversion_specified: raise xs_errors.XenError( 'ISOMountFailure', opterr=smb3_fail_reason) else: raise xs_errors.XenError( 'ISOMountFailure', opterr=inst.reason) - self._cleanupcredentials() # Check the iso_path is accessible if not self._checkmount(): @@ -384,7 +379,9 @@ def getSMBVersion(self): def mountOverSMB(self, mountcmd): """This function raises util.CommandException""" - util.pread(mountcmd, True) + new_env, domain = cifutils.getCIFCredentials(self.dconf, self.session) + + util.pread(mountcmd, True, new_env) try: if not self.is_smbversion_specified: # Store the successful smb version in PBD config @@ -421,10 +418,16 @@ def appendCIFSMountOptions(self, mountcmd): """Append options to mount.cifs""" options = [] try: - options.append(self.getCIFSPasswordOptions()) options.append(self.getCacheOptions()) - options.append('guest') + + if not cifutils.containsCredentials(self.dconf): + options.append('guest') + options.append(self.getSMBVersion()) + + username, domain = cifutils.splitDomainAndUsername(self.dconf['username']) + if domain: + options.append('domain='+ domain) except: util.SMlog("Exception while attempting to append mount options") raise @@ -438,48 +441,6 @@ def getCacheOptions(self): """Pass cache options to mount.cifs""" return "cache=none" - def getCIFSPasswordOptions(self): - if self.dconf.has_key('username') \ - and (self.dconf.has_key('cifspassword') or self.dconf.has_key('cifspassword_secret')): - dom_username = self.dconf['username'].split('\\') - if len(dom_username) == 1: - domain = None - username = dom_username[0] - elif len(dom_username) == 2: - domain = dom_username[0] - username = dom_username[1] - else: - err_str = ("A maximum of 2 tokens are expected " - "(\). {} were given." - .format(len(dom_username))) - util.SMlog('CIFS ISO SR mount error: ' + err_str) - raise xs_errors.XenError('ISOMountFailure', opterr=err_str) - - if self.dconf.has_key('cifspassword_secret'): - password = util.get_secret(self.session, self.dconf['cifspassword_secret']) - else: - password = self.dconf['cifspassword'] - - domain = util.to_plain_string(domain) - username = util.to_plain_string(username) - password = util.to_plain_string(password) - - cred_str = 'username={}\npassword={}\n'.format(username, password) - - if domain: - cred_str += 'domain={}\n'.format(domain) - - # Open credentials file and truncate - f = open(self.credentials, 'w') - f.write(cred_str) - f.close() - credentials = "credentials=%s" % self.credentials - return credentials - - def _cleanupcredentials(self): - if self.credentials and os.path.exists(self.credentials): - os.unlink(self.credentials) - def detach(self, sr_uuid): """Std. detach""" # This handles legacy mode too, so no need to check diff --git a/drivers/SMBSR.py b/drivers/SMBSR.py index a9b0a83b9..52dd0c5ad 100755 --- a/drivers/SMBSR.py +++ b/drivers/SMBSR.py @@ -25,6 +25,7 @@ import vhdutil from lock import Lock import cleanup +import cifutils CAPABILITIES = ["SR_PROBE","SR_UPDATE", "SR_CACHING", "VDI_CREATE","VDI_DELETE","VDI_ATTACH","VDI_DETACH", @@ -80,7 +81,6 @@ def load(self, sr_uuid): self.sm_config = self.session.xenapi.SR.get_sm_config(self.sr_ref) else: self.sm_config = self.srcmd.params.get('sr_sm_config') or {} - self.credentials = None self.mountpoint = os.path.join(SR.MOUNT_BASE, 'SMB', self.__extract_server(), sr_uuid) self.linkpath = os.path.join(self.mountpoint, sr_uuid or "") @@ -95,7 +95,7 @@ def checkmount(self): util.ismount(self.mountpoint)) and \ util.pathexists(self.linkpath))) - def mount(self, mountpoint=None): + def makeMountPoint(self, mountpoint): """Mount the remote SMB export at 'mountpoint'""" if mountpoint == None: mountpoint = self.mountpoint @@ -108,27 +108,27 @@ def mount(self, mountpoint=None): except util.CommandException, inst: raise SMBException("Failed to make directory: code is %d" % inst.code) + return mountpoint + + def mount(self, mountpoint=None): + + mountpoint = self.makeMountPoint(mountpoint) - self.credentials = os.path.join("/tmp", util.gen_uuid()) + new_env, domain = cifutils.getCIFCredentials(self.dconf, self.session, smbsr=True) - options = self.getMountOptions() + options = self.getMountOptions(domain) if options: options = ",".join(str(x) for x in options if x) try: + util.ioretry(lambda: util.pread(["mount.cifs", self.remoteserver, - mountpoint, "-o", options]), + mountpoint, "-o", options], new_env), errlist=[errno.EPIPE, errno.EIO], maxretry=2, nofail=True) except util.CommandException, inst: raise SMBException("mount failed with return code %d" % inst.code) - finally: - try: - os.unlink(self.credentials) - except OSError: - util.SMlog("Error when trying to delete " - "credentials files /tmp/") # Sanity check to ensure that the user has at least RO access to the # mounted share. Windows sharing and security settings can be tricky. @@ -142,57 +142,18 @@ def mount(self, mountpoint=None): raise SMBException("Permission denied. " "Please check user privileges.") - def getMountOptions(self): + def getMountOptions(self, domain): """Creates option string based on parameters provided""" options = ['cache=loose', 'vers=3.0', 'actimeo=0' ] - if self.dconf.has_key('username') and \ - (self.dconf.has_key('password') or - self.dconf.has_key('password_secret')): - - dom_username = self.dconf['username'].split('\\') - - if len(dom_username) == 1: - domain = None - username = dom_username[0] - elif len(dom_username) == 2: - domain = dom_username[0] - username = dom_username[1] - else: - raise SMBException("A maximum of 2 tokens are expected " - "(\). {} were given." - .format(len(dom_username))) - - domain = util.to_plain_string(domain) - username = util.to_plain_string(username) - - if self.dconf.has_key('password_secret'): - password = util.get_secret( - self.session, - self.dconf['password_secret'] - ) - else: - password = self.dconf['password'] - - password = util.to_plain_string(password) - - cred_str = 'username={}\npassword={}\n'.format(username, password) - - if domain: - cred_str += 'domain={}\n'.format(domain) - - # Open credentials file and truncate - try: - with open(self.credentials, 'w') as f: - f.write(cred_str) - except IOError, e: - raise SMBException("Failed to create credentials file") + if domain: + options.append('domain='+domain) - options.append('credentials=%s' % self.credentials) - else: + if not cifutils.containsCredentials(self.dconf, smbsr=True): + # No login details provided. options.append('guest') return options diff --git a/drivers/cifutils.py b/drivers/cifutils.py new file mode 100755 index 000000000..cba80eb53 --- /dev/null +++ b/drivers/cifutils.py @@ -0,0 +1,84 @@ +#!/usr/bin/python +# +# Copyright (C) Citrix Systems Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published +# by the Free Software Foundation; version 2.1 only. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# ISOSR: remote iso storage repository + +import util +import xs_errors + + +class CIFSException(Exception): + def __init__(self, errstr): + self.errstr = errstr + +def getDconfPasswordKey(smbsr=False): + key_password = 'cifspassword' + key_secret = 'cifspassword_secret' + + if smbsr: + key_password = 'password' + key_secret = 'password_secret' + return key_password, key_secret + +def containsPassword(dconf, smbsr=False): + key_password, key_secret = getDconfPasswordKey(smbsr) + return ((key_password in dconf) or (key_secret in dconf)) + + +def containsCredentials(dconf, smbsr=False): + return ((('username' in dconf)) and (containsPassword(dconf, smbsr))) + + +def splitDomainAndUsername(uname): + + username = None + domain = None + dom_username = uname.split('\\') + + if len(dom_username) == 1: + domain = None + username = dom_username[0] + elif len(dom_username) == 2: + domain = dom_username[0] + username = dom_username[1] + else: + raise CIFSException("A maximum of 2 tokens are expected " + "(\). {} were given." + .format(len(dom_username))) + return username, domain + + +def getCIFCredentials(dconf, session, smbsr=False): + credentials = {} + domain = None + if (containsCredentials(dconf, smbsr)): + + username, domain = splitDomainAndUsername(dconf['username']) + + credentials["USER"] = util.to_plain_string(username) + + key_password, key_secret = getDconfPasswordKey(smbsr) + if key_secret in dconf: + password = util.get_secret(session, dconf[key_secret]) + else: + password = dconf[key_password] + + credentials["PASSWD"] = util.to_plain_string(password) + + domain = util.to_plain_string(domain) + + return credentials, domain diff --git a/drivers/util.py b/drivers/util.py index 28dbdab07..4fa8fe4af 100755 --- a/drivers/util.py +++ b/drivers/util.py @@ -130,9 +130,11 @@ def _getDateString(): return "%s-%s-%s:%s:%s:%s" % \ (t[0],t[1],t[2],t[3],t[4],t[5]) -def doexec(args, inputtext=None): +def doexec(args, inputtext=None, new_env={}): """Execute a subprocess, then return its return code, stdout and stderr""" - proc = subprocess.Popen(args,stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE,close_fds=True) + env=dict(os.environ) + env.update(new_env) + proc = subprocess.Popen(args,stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE,close_fds=True, env=env) (stdout,stderr) = proc.communicate(inputtext) # Workaround for a pylint bug, can be removed after upgrade to # python 3.x or maybe a newer version of pylint in the future @@ -152,7 +154,7 @@ def is_string(value): # each pair, the first component is passed to exec while the second is # written to the logs. def pread(cmdlist, close_stdin = False, scramble = None, expect_rc = 0, - quiet = False): + quiet = False, new_env = {}): cmdlist_for_exec = [] cmdlist_for_log = [] for item in cmdlist: @@ -171,7 +173,7 @@ def pread(cmdlist, close_stdin = False, scramble = None, expect_rc = 0, if not quiet: SMlog(cmdlist_for_log) - (rc,stdout,stderr) = doexec(cmdlist_for_exec) + (rc,stdout,stderr) = doexec(cmdlist_for_exec, new_env=new_env) if rc != expect_rc: SMlog("FAILED in util.pread: (rc %d) stdout: '%s', stderr: '%s'" % \ (rc, stdout, stderr)) diff --git a/mk/sm.spec.in b/mk/sm.spec.in index e4208a9ad..35698fbb0 100755 --- a/mk/sm.spec.in +++ b/mk/sm.spec.in @@ -262,6 +262,9 @@ tests/run_python_unittests.sh /opt/xensource/sm/mpathutil.py /opt/xensource/sm/mpathutil.pyc /opt/xensource/sm/mpathutil.pyo +/opt/xensource/sm/cifutils.py +/opt/xensource/sm/cifutils.pyc +/opt/xensource/sm/cifutils.pyo /opt/xensource/sm/mpp_mpathutil.py /opt/xensource/sm/mpp_mpathutil.pyc /opt/xensource/sm/mpp_mpathutil.pyo diff --git a/tests/test_ISOSR.py b/tests/test_ISOSR.py index 86e52931f..4de31bc99 100644 --- a/tests/test_ISOSR.py +++ b/tests/test_ISOSR.py @@ -2,11 +2,12 @@ import nfs import ISOSR import unittest -import xs_errors +import xs_errors import util import SR import errno -import testlib +import testlib +import pprint class FakeISOSR(ISOSR.ISOSR): @@ -19,6 +20,7 @@ def __init__(self, srcmd, none): self.dconf = srcmd.dconf self.srcmd = srcmd + class TestISOSR_overNFS(unittest.TestCase): def create_isosr(self, location='aServer:/aLocation', atype=None, @@ -117,7 +119,8 @@ class TestISOSR_overSMB(unittest.TestCase): def create_smbisosr(self, location='\\aServer\aLocation', atype=None, sr_uuid='asr_uuid', server='\\aServer', serverpath='/aServerpath', username='aUsername', - password='aPassword', vers=None, options=''): + password='aPassword', vers=None, options='', + dconf_update={}): srcmd = mock.Mock() srcmd.dconf = { 'location': location, @@ -135,6 +138,7 @@ def create_smbisosr(self, location='\\aServer\aLocation', atype=None, 'command': 'some_command', 'device_config': {} } + srcmd.dconf.update(dconf_update) isosr = FakeISOSR(srcmd, None) isosr.load(sr_uuid) return isosr @@ -156,6 +160,53 @@ def test_attach_with_smb_version_1(self, context, _checkmount, pread, smbsr = self.create_smbisosr(atype='cifs', vers='1.0') _checkmount.side_effect = [False, True] smbsr.attach(None) + pread.assert_called_with(['mount.cifs', '\\aServer\x07Location', + '/var/run/sr-mount/asr_uuid', '-o', + 'cache=none,guest,vers=1.0'], True, {}) + + @testlib.with_context + @mock.patch('util.makedirs') + @mock.patch('ISOSR.ISOSR._checkTargetStr') + @mock.patch('util.pread') + @mock.patch('ISOSR.ISOSR._checkmount') + def test_attach_with_smb_credentials(self, context, _checkmount, pread, + _checkTargetStr, makedirs): + """ + Positive case, over XC/XE CLI with version 1.0. + """ + context.setup_error_codes() + update = {'cifspassword': 'winter2019'} + smbsr = self.create_smbisosr(atype='cifs', vers='1.0', + dconf_update=update) + _checkmount.side_effect = [False, True] + smbsr.attach(None) + pread.assert_called_with(['mount.cifs', '\\aServer\x07Location', + '/var/run/sr-mount/asr_uuid', '-o', + 'cache=none,vers=1.0'], True, + {'PASSWD': 'winter2019', 'USER': 'aUsername'}) + + @testlib.with_context + @mock.patch('util.makedirs') + @mock.patch('ISOSR.ISOSR._checkTargetStr') + @mock.patch('util.pread') + @mock.patch('ISOSR.ISOSR._checkmount') + def test_attach_with_smb_credentials_domain(self, context, + _checkmount, pread, + _checkTargetStr, makedirs): + """ + Positive case, over XC/XE CLI with version 1.0. + """ + context.setup_error_codes() + update = {'cifspassword': 'winter2019'} + smbsr = self.create_smbisosr(atype='cifs', vers='1.0', + username='citrix\jsmith', + dconf_update=update) + _checkmount.side_effect = [False, True] + smbsr.attach(None) + pread.assert_called_with(['mount.cifs', '\\aServer\x07Location', + '/var/run/sr-mount/asr_uuid', '-o', + 'cache=none,vers=1.0,domain=citrix'], True, + {'PASSWD': 'winter2019', 'USER': 'jsmith'}) @testlib.with_context @mock.patch('util.makedirs') @@ -171,6 +222,9 @@ def test_attach_with_smb_version_3(self, context, _checkmount, pread, smbsr = self.create_smbisosr(atype='cifs', vers='3.0') _checkmount.side_effect = [False, True] smbsr.attach(None) + pread.assert_called_with(['mount.cifs', '\\aServer\x07Location', + '/var/run/sr-mount/asr_uuid', '-o', + 'cache=none,guest,vers=3.0'], True, {}) @testlib.with_context @mock.patch('util.makedirs') @@ -178,8 +232,10 @@ def test_attach_with_smb_version_3(self, context, _checkmount, pread, @mock.patch('util.pread') @mock.patch('ISOSR.ISOSR._checkmount') @mock.patch('ISOSR.ISOSR.updateSMBVersInPBDConfig') - def test_attach_with_smb_no_version(self, context, updateSMBVersInPBDConfig, - _checkmount, pread, _checkTargetStr, makedirs): + def test_attach_with_smb_no_version(self, context, + updateSMBVersInPBDConfig, + _checkmount, pread, + _checkTargetStr, makedirs): """ Positive case, over XC/XE CLI without version. """ @@ -187,13 +243,17 @@ def test_attach_with_smb_no_version(self, context, updateSMBVersInPBDConfig, smbsr = self.create_smbisosr(atype='cifs') _checkmount.side_effect = [False, True] smbsr.attach(None) + pread.assert_called_with(['mount.cifs', '\\aServer\x07Location', + '/var/run/sr-mount/asr_uuid', '-o', + 'cache=none,guest,vers=3.0'], True, {}) @testlib.with_context @mock.patch('util.gen_uuid') @mock.patch('util.makedirs') @mock.patch('ISOSR.ISOSR._checkTargetStr') @mock.patch('ISOSR.ISOSR._checkmount') - def test_attach_smb_via_xemount_version_1(self, context, _checkmount, + @mock.patch('util.pread') + def test_attach_smb_via_xemount_version_1(self, context, pread, _checkmount, _checkTargetStr, makedirs, gen_uuid): """ Positive case, over xe-sr-mount CLI with version 1.0. @@ -201,20 +261,24 @@ def test_attach_smb_via_xemount_version_1(self, context, _checkmount, context.setup_error_codes() smbsr = self.create_smbisosr(options='-o username=administrator,password=password,vers=1.0') smbsr.attach(None) + self.assertEqual(0, pread.call_count) @testlib.with_context @mock.patch('util.gen_uuid') @mock.patch('util.makedirs') @mock.patch('ISOSR.ISOSR._checkTargetStr') @mock.patch('ISOSR.ISOSR._checkmount') - def test_attach_smb_via_xemount_version_3(self, context, _checkmount, - _checkTargetStr, makedirs, gen_uuid): + @mock.patch('util.pread') + def test_attach_smb_via_xemount_version_3(self, context, pread, + _checkmount, _checkTargetStr, + makedirs, gen_uuid): """ Positive case, over xe-sr-mount CLI with version 3.0. """ context.setup_error_codes() smbsr = self.create_smbisosr(options='-o username=administrator,password=password,vers=3.0') smbsr.attach(None) + self.assertEqual(0, pread.call_count) @testlib.with_context @mock.patch('util.gen_uuid') @@ -222,7 +286,8 @@ def test_attach_smb_via_xemount_version_3(self, context, _checkmount, @mock.patch('ISOSR.ISOSR._checkTargetStr') @mock.patch('ISOSR.ISOSR._checkmount') @mock.patch('ISOSR.ISOSR.updateSMBVersInPBDConfig') - def test_attach_smb_via_xemount_no_version(self, context, + @mock.patch('util.pread') + def test_attach_smb_via_xemount_no_version(self, context, pread, updateSMBVersInPBDConfig, _checkmount, _checkTargetStr, makedirs, @@ -233,6 +298,7 @@ def test_attach_smb_via_xemount_no_version(self, context, context.setup_error_codes() smbsr = self.create_smbisosr(options='-o username=administrator,password=password') smbsr.attach(None) + self.assertEqual(0, pread.call_count) @testlib.with_context @mock.patch('util.gen_uuid') @@ -292,6 +358,9 @@ def test_attach_smb_version_fallback_with_smb_3_disabled(self, context, pread.side_effect = [util.CommandException(errno.EHOSTDOWN), " "] _checkmount.side_effect = [False, True] smbsr.attach(None) + pread.assert_called_with(['mount.cifs', '\\aServer\x07Location', + '/var/run/sr-mount/asr_uuid', '-o', + 'cache=none,guest,vers=1.0'], True, {}) @testlib.with_context @mock.patch('util.gen_uuid') @@ -337,7 +406,6 @@ def test_attach_smb_via_xemount_no_version_fallback(self, context, context.setup_error_codes() smbsr = self.create_smbisosr(options='-o username=administrator,password=password') pread.side_effect = [util.CommandException(errno.EHOSTDOWN), " "] - smbsr.attach(None) @testlib.with_context @mock.patch('util.makedirs') @@ -352,8 +420,8 @@ def test_attach_smb_version_fallback_error(self, context, _checkmount, """ context.setup_error_codes() smbsr = self.create_smbisosr(atype='cifs') - pread.side_effect = [util.CommandException(errno.EHOSTDOWN), \ + pread.side_effect = [util.CommandException(errno.EHOSTDOWN), util.CommandException(errno.EHOSTDOWN)] _checkmount.side_effect = [False, True] - with self.assertRaises(util.CommandException): + with self.assertRaises(Exception): smbsr.attach(None) diff --git a/tests/test_SMBSR.py b/tests/test_SMBSR.py index 2b4a84905..d7bdcf39c 100644 --- a/tests/test_SMBSR.py +++ b/tests/test_SMBSR.py @@ -30,7 +30,7 @@ def __init__(self, srcmd, none): class Test_SMBSR(unittest.TestCase): - def create_smbsr(self, sr_uuid='asr_uuid', server='\\aServer', serverpath = '/aServerpath', username = 'aUsername', password = 'aPassword'): + def create_smbsr(self, sr_uuid='asr_uuid', server='\\aServer', serverpath = '/aServerpath', username = 'aUsername', password = 'aPassword', dconf_update={}): srcmd = mock.Mock() srcmd.dconf = { 'server': server, @@ -42,6 +42,7 @@ def create_smbsr(self, sr_uuid='asr_uuid', server='\\aServer', serverpath = '/aS 'command': 'some_command', 'device_config': {} } + srcmd.dconf.update(dconf_update) smbsr = FakeSMBSR(srcmd, None) smbsr.load(sr_uuid) return smbsr @@ -70,6 +71,50 @@ def test_attach_if_mounted_then_attached(self, mock_lock, mock_checkmount): smbsr.attach('asr_uuid') self.assertTrue(smbsr.attached) + @mock.patch('SMBSR.SMBSR.checkmount', autospec=True) + @mock.patch('SMBSR.SMBSR.makeMountPoint', autospec=True) + @mock.patch('SMBSR.Lock', autospecd=True) + @mock.patch('util.pread') + @mock.patch('os.symlink', autospec=True) + @mock.patch('util.listdir', autospec=True) + def test_attach_vanilla(self, listdir, symlink, pread, mock_lock, makeMountPoint, mock_checkmount): + mock_checkmount.return_value=False + smbsr = self.create_smbsr() + makeMountPoint.return_value = "/var/mount" + smbsr.attach('asr_uuid') + self.assertTrue(smbsr.attached) + pread.assert_called_with(['mount.cifs', '\\aServer', "/var/mount", '-o', 'cache=loose,vers=3.0,actimeo=0'], + {'PASSWD': 'aPassword', 'USER': 'aUsername'}) + + @mock.patch('SMBSR.SMBSR.checkmount', autospec=True) + @mock.patch('SMBSR.SMBSR.makeMountPoint', autospec=True) + @mock.patch('SMBSR.Lock', autospecd=True) + @mock.patch('util.pread') + @mock.patch('os.symlink', autospec=True) + @mock.patch('util.listdir', autospec=True) + def test_attach_with_cifs_password(self, listdir, symlink, pread, mock_lock, makeMountPoint, mock_checkmount): + smbsr = self.create_smbsr(dconf_update={"password":"winter2019"}) + mock_checkmount.return_value=False + makeMountPoint.return_value = "/var/mount" + smbsr.attach('asr_uuid') + self.assertTrue(smbsr.attached) + pread.assert_called_with(['mount.cifs', '\\aServer', "/var/mount", '-o', 'cache=loose,vers=3.0,actimeo=0'], {'PASSWD': 'winter2019', 'USER': 'aUsername'}) + + @mock.patch('SMBSR.SMBSR.checkmount', autospec=True) + @mock.patch('SMBSR.SMBSR.makeMountPoint', autospec=True) + @mock.patch('SMBSR.Lock', autospecd=True) + @mock.patch('util.pread') + @mock.patch('os.symlink', autospec=True) + @mock.patch('util.listdir', autospec=True) + def test_attach_with_cifs_password_and_domain(self, listdir, symlink, pread, mock_lock, makeMountPoint, mock_checkmount): + smbsr = self.create_smbsr(username="citrix\jsmith", dconf_update={"password":"winter2019"}) + mock_checkmount.return_value=False + makeMountPoint.return_value = "/var/mount" + smbsr.attach('asr_uuid') + self.assertTrue(smbsr.attached) + # We mocked listdir as this calls pread and assert_called_with only records the last call. + pread.assert_called_with(['mount.cifs', '\\aServer', "/var/mount", '-o', 'cache=loose,vers=3.0,actimeo=0,domain=citrix'], {'PASSWD': 'winter2019', 'USER': 'jsmith'}) + #Detach @testlib.with_context @mock.patch('SMBSR.SMBSR.checkmount',return_value=True, autospec=True) diff --git a/tests/test_cifutils.py b/tests/test_cifutils.py new file mode 100644 index 000000000..fab4668e7 --- /dev/null +++ b/tests/test_cifutils.py @@ -0,0 +1,225 @@ +import unittest +import testlib +import mock + +import os +import cifutils + + +class TestCreate(unittest.TestCase): + + def test_empty_credentials_and_domain_for_bad_dconf(self): + junk_dconf = {"junk1": "123"} + junk_session = 123 + credentials, domain = cifutils.getCIFCredentials(junk_dconf, + junk_session) + self.assertEquals(credentials, {}) + self.assertEquals(domain, None) + + def test_empty_credentials_and_username(self): + junk_dconf = {"junk1": "123"} + junk_session = 123 + credentials, domain = cifutils.getCIFCredentials(junk_dconf, + junk_session, + 'jsmith') + self.assertEquals(credentials, {}) + self.assertEquals(domain, None) + + def test_password_and_username(self): + junk_dconf = {"cifspassword": "123", "username": "jsmith"} + junk_session = 123 + credentials, domain = cifutils.getCIFCredentials(junk_dconf, + junk_session) + expected_credentials = {"USER": "jsmith", "PASSWD": "123"} + self.assertEquals(credentials, expected_credentials) + self.assertEquals(domain, None) + + def test_password_and_username_smbsr(self): + junk_dconf = {"password": "123", "username": "jsmith"} + junk_session = 123 + credentials, domain = cifutils.getCIFCredentials(junk_dconf, + junk_session, + smbsr=True) + expected_credentials = {"USER": "jsmith", "PASSWD": "123"} + self.assertEquals(credentials, expected_credentials) + self.assertEquals(domain, None) + + def test_password_and_username_domain(self): + junk_dconf = {"cifspassword": "123", "username": "citrix\jsmith"} + junk_session = 123 + credentials, domain = cifutils.getCIFCredentials(junk_dconf, + junk_session) + expected_credentials = {"USER": "jsmith", "PASSWD": "123"} + self.assertEquals(credentials, expected_credentials) + self.assertEquals(domain, "citrix") + + def test_password_and_username_domain_smbsr(self): + junk_dconf = {"password": "123", "username": "citrix\jsmith"} + junk_session = 123 + credentials, domain = cifutils.getCIFCredentials(junk_dconf, + junk_session, + smbsr=True) + expected_credentials = {"USER": "jsmith", "PASSWD": "123"} + self.assertEquals(credentials, expected_credentials) + self.assertEquals(domain, "citrix") + + @mock.patch('util.get_secret', autospec=True) + def test_password_secret_and_username(self, get_secret): + junk_dconf = {"cifspassword_secret": "123", "username": "jsmith"} + junk_session = 123 + get_secret.return_value = 'winter2019' + credentials, domain = cifutils.getCIFCredentials(junk_dconf, + junk_session) + expected_credentials = {"USER": "jsmith", "PASSWD": "winter2019"} + get_secret.assert_called_with(junk_session, "123") + self.assertEquals(credentials, expected_credentials) + self.assertEquals(domain, None) + + @mock.patch('util.get_secret', autospec=True) + def test_password_secret_and_username_smbsr(self, get_secret): + junk_dconf = {"password_secret": "123", "username": "jsmith"} + junk_session = 123 + get_secret.return_value = 'winter2019' + credentials, domain = cifutils.getCIFCredentials(junk_dconf, + junk_session, + smbsr=True) + expected_credentials = {"USER": "jsmith", "PASSWD": "winter2019"} + get_secret.assert_called_with(junk_session, "123") + self.assertEquals(credentials, expected_credentials) + self.assertEquals(domain, None) + + @mock.patch('util.get_secret', autospec=True) + def test_password_secret_and_username_also_domain(self, get_secret): + junk_dconf = {"cifspassword_secret": "123", + "username": "citrix\jsmith"} + junk_session = 123 + get_secret.return_value = 'winter2019' + credentials, domain = cifutils.getCIFCredentials(junk_dconf, + junk_session) + expected_credentials = {"USER": "jsmith", "PASSWD": "winter2019"} + get_secret.assert_called_with(junk_session, "123") + self.assertEquals(credentials, expected_credentials) + self.assertEquals(domain, "citrix") + + @mock.patch('util.get_secret', autospec=True) + def test_password_secret_and_username_also_domain_smbsr(self, get_secret): + junk_dconf = {"password_secret": "123", + "username": "citrix\jsmith"} + junk_session = 123 + get_secret.return_value = 'winter2019' + credentials, domain = cifutils.getCIFCredentials(junk_dconf, + junk_session, + smbsr=True) + expected_credentials = {"USER": "jsmith", "PASSWD": "winter2019"} + get_secret.assert_called_with(junk_session, "123") + self.assertEquals(credentials, expected_credentials) + self.assertEquals(domain, "citrix") + + def test_username_bad_domain(self): + junk_dconf = {"cifspassword_secret": "123", + "username": "citrix\gjk\jsmith"} + junk_session = 123 + with self.assertRaises(cifutils.CIFSException) as cm: + cifutils.getCIFCredentials(junk_dconf, junk_session) + expected_message = ("A maximum of 2 tokens are expected " + "(\). 3 were given.") + the_exception = cm.exception + self.assertEqual(the_exception.errstr, expected_message) + + def test_username_bad_domain_smbsr(self): + junk_dconf = {"password_secret": "123", + "username": "citrix\gjk\jsmith"} + junk_session = 123 + with self.assertRaises(cifutils.CIFSException) as cm: + cifutils.getCIFCredentials(junk_dconf, junk_session, smbsr=True) + expected_message = ("A maximum of 2 tokens are expected " + "(\). 3 were given.") + the_exception = cm.exception + self.assertEqual(the_exception.errstr, expected_message) + + def test_got_credentials_empty_dconf(self): + junk_dconf = {} + self.assertEquals(cifutils.containsCredentials(junk_dconf), False) + + def test_got_credentials_empty_dconf_smbsr(self): + junk_dconf = {} + self.assertEquals(cifutils.containsCredentials(junk_dconf, smbsr=True), False) + + def test_got_credentials_username_only(self): + junk_dconf = {'username': 'jsmith'} + self.assertEquals(cifutils.containsCredentials(junk_dconf), False) + + def test_got_credentials_username_only_smbsr(self): + junk_dconf = {'username': 'jsmith'} + self.assertEquals(cifutils.containsCredentials(junk_dconf, smbsr=True), False) + + def test_got_credentials_password_only(self): + junk_dconf = {'cifspassword': 'password123'} + self.assertEquals(cifutils.containsCredentials(junk_dconf), False) + + def test_got_credentials_password_only_smbsr(self): + junk_dconf = {'password': 'password123'} + self.assertEquals(cifutils.containsCredentials(junk_dconf, smbsr=True), False) + + def test_got_credentials_secret_only(self): + junk_dconf = {'cifspassword_secret': 'secret123'} + self.assertEquals(cifutils.containsCredentials(junk_dconf), False) + + def test_got_credentials_secret_only_smbsr(self): + junk_dconf = {'password_secret': 'secret123'} + self.assertEquals(cifutils.containsCredentials(junk_dconf, smbsr=True), False) + + def test_got_credentials_password_and_secret(self): + junk_dconf = {'cifspassword': 'password123', + 'cifspassword_secret': 'secret123'} + self.assertEquals(cifutils.containsCredentials(junk_dconf), False) + + def test_got_credentials_password_and_secret_smbsr(self): + junk_dconf = {'password': 'password123', + 'password_secret': 'secret123'} + self.assertEquals(cifutils.containsCredentials(junk_dconf, smbsr=True), False) + + def test_got_credentials_user_and_password(self): + good_dconf = {'username': 'jsmith', 'cifspassword': 'password123'} + self.assertEquals(cifutils.containsCredentials(good_dconf), True) + + def test_got_credentials_user_and_password_smbsr(self): + good_dconf = {'username': 'jsmith', 'password': 'password123'} + self.assertEquals(cifutils.containsCredentials(good_dconf, smbsr=True), True) + + def test_got_credentials_user_and_secret(self): + good_dconf = {'username': 'jsmith', 'cifspassword_secret': 'secret123'} + self.assertEquals(cifutils.containsCredentials(good_dconf), True) + + def test_got_credentials_user_and_secret_smbsr(self): + good_dconf = {'username': 'jsmith', 'password_secret': 'secret123'} + self.assertEquals(cifutils.containsCredentials(good_dconf, smbsr=True), True) + + def test_got_credentials_everything(self): + good_dconf = {'username': 'jsmith', 'cifspassword': 'password123', + 'cifspassword_secret': 'secret123'} + self.assertEquals(cifutils.containsCredentials(good_dconf), True) + + def test_got_credentials_everything_but_smbsr(self): + good_dconf = {'username': 'jsmith', 'cifspassword': 'password123', + 'cifspassword_secret': 'secret123'} + self.assertEquals(cifutils.containsCredentials(good_dconf, smbsr=True), False) + + def test_got_credentials_everything_smbsr(self): + good_dconf = {'username': 'jsmith', 'password': 'password123', + 'password_secret': 'secret123'} + self.assertEquals(cifutils.containsCredentials(good_dconf, smbsr=True), True) + + def test_got_credentials_everything_and_padding(self): + good_dconf = {'sahara': 'camel', 'username': 'jsmith', + 'cifspassword': 'password123', + 'cifspassword_secret': 'secret123', + 'nile': 'crocodile'} + self.assertEquals(cifutils.containsCredentials(good_dconf), True) + + def test_got_credentials_everything_and_padding_smbsr(self): + good_dconf = {'sahara': 'camel', 'username': 'jsmith', + 'password': 'password123', + 'password_secret': 'secret123', + 'nile': 'crocodile'} + self.assertEquals(cifutils.containsCredentials(good_dconf, smbsr=True), True) diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 000000000..dd03ad4b5 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,37 @@ +import unittest +import testlib +import mock + +import os +import util +import subprocess + + +class fake_proc: + + def __init__(self): + self.returncode = 0 + + def communicate(self, inputtext): + return "hello", "hello" + + +class TestCreate(unittest.TestCase): + + @mock.patch('subprocess.Popen', autospec=True) + def test_env_concatenated(self, popen): + new_env = {"NewVar1": "yadayada", "NewVar2": "blah"} + popen.return_value = fake_proc() + with mock.patch.dict('os.environ', {'hello': 'world'}, clear=True): + self.assertEqual(os.environ.get('hello'), 'world') + util.pread(['mount.cifs', '\\aServer', + '/var/run/sr-mount/asr_uuid', + '-o', 'cache=loose,vers=3.0,actimeo=0,domain=citrix'], + new_env=new_env) + expected_cmd = ['mount.cifs', '\\aServer', + '/var/run/sr-mount/asr_uuid', '-o', + 'cache=loose,vers=3.0,actimeo=0,domain=citrix'] + popen.assert_called_with(expected_cmd, + close_fds=True, stdin=-1, stderr=-1, + env={'hello': 'world', 'NewVar2': 'blah', + 'NewVar1': 'yadayada'}, stdout=-1) diff --git a/tests/testlib.py b/tests/testlib.py index 393e8b841..9c8cdb8b7 100644 --- a/tests/testlib.py +++ b/tests/testlib.py @@ -173,7 +173,7 @@ def fake_modinfo(self, args, stdin_data): assert args[1] == '-d' return (0, args[2] + '-description', '') - def fake_popen(self, args, stdin, stdout, stderr, close_fds): + def fake_popen(self, args, stdin, stdout, stderr, close_fds, env={}): import subprocess assert stdin == subprocess.PIPE assert stdout == subprocess.PIPE