From 1a4f31a6368d651222683c9debe7d6832db6f607 Mon Sep 17 00:00:00 2001 From: Alberto Planas Date: Fri, 14 Jan 2022 14:27:26 +0100 Subject: [PATCH] secure_mount: use /proc/self/mountinfo The function `check_mounted()` is parsing the output of the `mount` command. This command can have an unknown number of spaces, that can break the parsing logic (and produces security issues) This commit take the mount information data from `/proc/self/mountinfo`, that is more friendly parsed as it will escape the spaces. Signed-off-by: Alberto Planas --- keylime/secure_mount.py | 56 +++++++++++++++++---------- test/test_secure_mount.py | 81 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 20 deletions(-) create mode 100644 test/test_secure_mount.py diff --git a/keylime/secure_mount.py b/keylime/secure_mount.py index 8bf1a31df..b9ebf5a21 100644 --- a/keylime/secure_mount.py +++ b/keylime/secure_mount.py @@ -13,29 +13,45 @@ def check_mounted(secdir): - whatsmounted = cmd_exec.run("mount")['retout'] - whatsmounted_converted = config.convert(whatsmounted) - for line in whatsmounted_converted: - tokens = line.split() - tmpfs = False - if len(tokens) < 3: - continue - if tokens[0] == 'tmpfs': - tmpfs = True - if tokens[2] == secdir: - if not tmpfs: - logger.error("secure storage location %s already mounted " - "on wrong file system type: %s. Unmount to" - "continue.", secdir, tokens[0]) - raise Exception( - f"secure storage location {secdir} already mounted on " - f"wrong file system type: {tokens[0]}. Unmount to " - f"continue.") + """Inspect mountinfo to detect if a directory is mounted.""" + secdir_escaped = secdir.replace(" ", r"\040") + for line in open("/proc/self/mountinfo", "r"): + # /proc/[pid]/mountinfo have 10+ elements separated with + # spaces (check proc (5) for a complete description) + # + # At position 7 there are some optional fields, so we need + # first to determine the separator mark, and validate the + # final total number of fields. + elements = line.split() + try: + separator = elements.index("-") + except ValueError: + msg = "Separator filed not found. " \ + "Information line cannot be parsed" + logger.error(msg) + raise Exception(msg) + + if len(elements) < 10 or len(elements) - separator < 4: + msg = "Mount information line cannot be parsed" + logger.error(msg) + raise Exception(msg) + + mount_point = elements[4] + filesystem_type = elements[separator + 1] + if mount_point == secdir_escaped: + if filesystem_type != "tmpfs": + msg = f"Secure storage location {secdir} already mounted " \ + f"on wrong file system type: {filesystem_type}. " \ + "Unmount to continue." + logger.error(msg) + raise Exception(msg) logger.debug( - "secure storage location %s already mounted on tmpfs" % secdir) + "Secure storage location %s already mounted on tmpfs", secdir + ) return True - logger.debug("secure storage location %s not mounted " % secdir) + + logger.debug("Secure storage location %s not mounted", secdir) return False diff --git a/test/test_secure_mount.py b/test/test_secure_mount.py new file mode 100644 index 000000000..f3b9e70c8 --- /dev/null +++ b/test/test_secure_mount.py @@ -0,0 +1,81 @@ +import unittest +from unittest.mock import patch + +from keylime import secure_mount + + +class TestSecureMount(unittest.TestCase): + + @patch("keylime.secure_mount.logger") + @patch("keylime.secure_mount.open") + def test_check_mounted_not_found(self, open_mock, logger_mock): + """Test when secdir is not mounted.""" + open_mock.return_value = ( + "23 106 0:21 / /proc rw,nosuid,nodev,noexec,relatime shared:26 - proc proc rw", + ) + self.assertFalse(secure_mount.check_mounted("/secdir")) + + @patch("keylime.secure_mount.logger") + @patch("keylime.secure_mount.open") + def test_check_mounted_found(self, open_mock, logger_mock): + """Test when secdir is mounted.""" + open_mock.return_value = ( + "23 106 0:21 / /proc rw,nosuid,nodev,noexec,relatime shared:26 - proc proc rw", + "303 154 0:69 / /secdir rw,relatime shared:130 - tmpfs tmpfs rw,size=1024k,mode=700,inode64", + ) + self.assertTrue(secure_mount.check_mounted("/secdir")) + + @patch("keylime.secure_mount.logger") + @patch("keylime.secure_mount.open") + def test_check_mounted_found_zero_optional_fields(self, open_mock, logger_mock): + """Test when secdir is mounted when there are no optional fields.""" + open_mock.return_value = ( + "23 106 0:21 / /proc rw,nosuid,nodev,noexec,relatime shared:26 - proc proc rw", + "303 154 0:69 / /secdir rw,relatime - tmpfs tmpfs rw,size=1024k,mode=700,inode64", + ) + self.assertTrue(secure_mount.check_mounted("/secdir")) + + @patch("keylime.secure_mount.logger") + @patch("keylime.secure_mount.open") + def test_check_mounted_found_extra_optional_fields(self, open_mock, logger_mock): + """Test when secdir is mounted when there are extra optional fields.""" + open_mock.return_value = ( + "23 106 0:21 / /proc rw,nosuid,nodev,noexec,relatime shared:26 - proc proc rw", + "303 154 0:69 / /secdir rw,relatime shared:130 extra:1 - tmpfs tmpfs rw,size=1024k,mode=700,inode64", + ) + self.assertTrue(secure_mount.check_mounted("/secdir")) + + @patch("keylime.secure_mount.logger") + @patch("keylime.secure_mount.open") + def test_check_mounted_found_wrong_fs(self, open_mock, logger_mock): + """Test when secdir is mounted but under a wrong fs.""" + open_mock.return_value = ( + "23 106 0:21 / /proc rw,nosuid,nodev,noexec,relatime shared:26 - proc proc rw", + "303 154 0:69 / /secdir rw,relatime shared:130 - btrfs /dev/sda2 rw", + ) + with self.assertRaises(Exception) as e: + secure_mount.check_mounted("/secdir") + self.assertTrue("wrong file system" in str(e.exception)) + + @patch("keylime.secure_mount.logger") + @patch("keylime.secure_mount.open") + def test_check_mounted_found_spaces(self, open_mock, logger_mock): + """Test when secdir is mounted and contains spaces.""" + open_mock.return_value = ( + "23 106 0:21 / /proc rw,nosuid,nodev,noexec,relatime shared:26 - proc proc rw", + r"303 154 0:69 / /sec\040dir rw,relatime shared:130 - tmpfs tmpfs rw,size=1024k,mode=700,inode64", + ) + self.assertTrue(secure_mount.check_mounted("/sec dir")) + + @patch("keylime.secure_mount.logger") + @patch("keylime.secure_mount.open") + def test_check_mounted_wrong_format(self, open_mock, logger_mock): + """Test when the mount info lines are wrong.""" + open_mock.return_value = ("invalid line",) + with self.assertRaises(Exception) as e: + secure_mount.check_mounted("/secdir") + self.assertTrue("cannot be parsed" in str(e.exception)) + + +if __name__ == "__main__": + unittest.main()