Skip to content
Permalink
Browse files Browse the repository at this point in the history
secure_mount: add umount function
The `mount` function is called several times in different places in the
code, but it takes care of not mounting the secure directory if is
already present.  The problem is that this is never umounted.

This patch add the `umount` function, and update the `mount` one to
track when a mount point needs to be umounted.

Also adjust the keylime_agent exit point to make sure that the `umount`
function is called at the end.

Signed-off-by: Alberto Planas <aplanas@suse.com>
  • Loading branch information
aplanas authored and mpeters committed Jan 27, 2022
1 parent 1a4f31a commit d37c406
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 16 deletions.
6 changes: 4 additions & 2 deletions keylime/keylime_agent.py
Expand Up @@ -746,9 +746,11 @@ def shutdown_handler(*_):
server.shutdown()
server.server_close()
serverthread.join()
logger.debug("...HTTP server stopped")
logger.debug("HTTP server stopped...")
revocation_process.join()
logger.debug("... revocation notifier stopped")
logger.debug("Revocation notifier stopped...")
secure_mount.umount()
logger.debug("Umounting directories...")
instance_tpm.flush_keys()
logger.debug("Flushed keys successfully")
sys.exit(0)
Expand Down
28 changes: 25 additions & 3 deletions keylime/secure_mount.py
Expand Up @@ -11,11 +11,15 @@

logger = keylime_logging.init_logging('secure_mount')

# Store the mounted directories done by Keylime, so we can unmount
# them in reverse order
_MOUNTED = []


def check_mounted(secdir):
"""Inspect mountinfo to detect if a directory is mounted."""
secdir_escaped = secdir.replace(" ", r"\040")
for line in open("/proc/self/mountinfo", "r"):
for line in open("/proc/self/mountinfo", "r", encoding="utf-8"):
# /proc/[pid]/mountinfo have 10+ elements separated with
# spaces (check proc (5) for a complete description)
#
Expand All @@ -29,6 +33,7 @@ def check_mounted(secdir):
msg = "Separator filed not found. " \
"Information line cannot be parsed"
logger.error(msg)
# pylint: disable=raise-missing-from
raise Exception(msg)

if len(elements) < 10 or len(elements) - separator < 4:
Expand Down Expand Up @@ -56,10 +61,10 @@ def check_mounted(secdir):


def mount():
secdir = config.WORK_DIR + "/secure"
secdir = os.path.join(config.WORK_DIR, "secure")

if not config.MOUNT_SECURE:
secdir = config.WORK_DIR + "/tmpfs-dev"
secdir = os.path.join(config.WORK_DIR, "tmpfs-dev")
if not os.path.isdir(secdir):
os.makedirs(secdir)
return secdir
Expand All @@ -74,5 +79,22 @@ def mount():
cmd = ('mount', '-t', 'tmpfs', '-o', 'size=%s,mode=0700' % size,
'tmpfs', secdir)
cmd_exec.run(cmd)
_MOUNTED.append(secdir)

return secdir


def umount():
"""Umount all the devices mounted by Keylime."""
while _MOUNTED:
directory = _MOUNTED.pop()
logger.info("Unmounting %s", directory)
if check_mounted(directory):
cmd = ("umount", directory)
ret = cmd_exec.run(cmd, raiseOnError=False)
if ret["code"] != 0:
logger.error("%s cannot be umounted. "
"A running process can be keeping it bussy: %s",
directory, str(ret["reterr"]))
else:
logger.warning("%s already unmounted by another process", directory)
149 changes: 138 additions & 11 deletions test/test_secure_mount.py
@@ -1,33 +1,38 @@
import tempfile
import unittest
from unittest.mock import patch

from keylime import secure_mount


class TestSecureMount(unittest.TestCase):
def setUp(self):
"""Remove global state from secure_mount module."""
# pylint: disable=protected-access
secure_mount._MOUNTED = []

@patch("keylime.secure_mount.logger")
@patch("keylime.secure_mount.open")
def test_check_mounted_not_found(self, open_mock, logger_mock):
def test_check_mounted_not_found(self, open_mock, _logger_mock):
"""Test when secdir is not mounted."""
open_mock.return_value = (
"23 106 0:21 / /proc rw,nosuid,nodev,noexec,relatime shared:26 - proc proc rw",
)
)
self.assertFalse(secure_mount.check_mounted("/secdir"))

@patch("keylime.secure_mount.logger")
@patch("keylime.secure_mount.open")
def test_check_mounted_found(self, open_mock, logger_mock):
def test_check_mounted_found(self, open_mock, _logger_mock):
"""Test when secdir is mounted."""
open_mock.return_value = (
"23 106 0:21 / /proc rw,nosuid,nodev,noexec,relatime shared:26 - proc proc rw",
"303 154 0:69 / /secdir rw,relatime shared:130 - tmpfs tmpfs rw,size=1024k,mode=700,inode64",
)
)
self.assertTrue(secure_mount.check_mounted("/secdir"))

@patch("keylime.secure_mount.logger")
@patch("keylime.secure_mount.open")
def test_check_mounted_found_zero_optional_fields(self, open_mock, logger_mock):
def test_check_mounted_found_zero_optional_fields(self, open_mock, _logger_mock):
"""Test when secdir is mounted when there are no optional fields."""
open_mock.return_value = (
"23 106 0:21 / /proc rw,nosuid,nodev,noexec,relatime shared:26 - proc proc rw",
Expand All @@ -37,7 +42,7 @@ def test_check_mounted_found_zero_optional_fields(self, open_mock, logger_mock):

@patch("keylime.secure_mount.logger")
@patch("keylime.secure_mount.open")
def test_check_mounted_found_extra_optional_fields(self, open_mock, logger_mock):
def test_check_mounted_found_extra_optional_fields(self, open_mock, _logger_mock):
"""Test when secdir is mounted when there are extra optional fields."""
open_mock.return_value = (
"23 106 0:21 / /proc rw,nosuid,nodev,noexec,relatime shared:26 - proc proc rw",
Expand All @@ -47,35 +52,157 @@ def test_check_mounted_found_extra_optional_fields(self, open_mock, logger_mock)

@patch("keylime.secure_mount.logger")
@patch("keylime.secure_mount.open")
def test_check_mounted_found_wrong_fs(self, open_mock, logger_mock):
def test_check_mounted_found_wrong_fs(self, open_mock, _logger_mock):
"""Test when secdir is mounted but under a wrong fs."""
open_mock.return_value = (
"23 106 0:21 / /proc rw,nosuid,nodev,noexec,relatime shared:26 - proc proc rw",
"303 154 0:69 / /secdir rw,relatime shared:130 - btrfs /dev/sda2 rw",
)
)
with self.assertRaises(Exception) as e:
secure_mount.check_mounted("/secdir")
self.assertTrue("wrong file system" in str(e.exception))

@patch("keylime.secure_mount.logger")
@patch("keylime.secure_mount.open")
def test_check_mounted_found_spaces(self, open_mock, logger_mock):
def test_check_mounted_found_spaces(self, open_mock, _logger_mock):
"""Test when secdir is mounted and contains spaces."""
open_mock.return_value = (
"23 106 0:21 / /proc rw,nosuid,nodev,noexec,relatime shared:26 - proc proc rw",
r"303 154 0:69 / /sec\040dir rw,relatime shared:130 - tmpfs tmpfs rw,size=1024k,mode=700,inode64",
)
)
self.assertTrue(secure_mount.check_mounted("/sec dir"))

@patch("keylime.secure_mount.logger")
@patch("keylime.secure_mount.open")
def test_check_mounted_wrong_format(self, open_mock, logger_mock):
def test_check_mounted_wrong_format(self, open_mock, _logger_mock):
"""Test when the mount info lines are wrong."""
open_mock.return_value = ("invalid line",)
with self.assertRaises(Exception) as e:
secure_mount.check_mounted("/secdir")
self.assertTrue("cannot be parsed" in str(e.exception))

@patch("keylime.secure_mount.logger")
@patch("keylime.secure_mount.config")
def test_check_mount_no_secure(self, config_mock, _logger_mock):
"""Test when mounted outside tmpfs."""
config_mock.MOUNT_SECURE = False

with tempfile.TemporaryDirectory() as tmpdirname:
config_mock.WORK_DIR = tmpdirname
self.assertEqual(secure_mount.mount(), f"{tmpdirname}/tmpfs-dev")
# pylint: disable=protected-access
self.assertEqual(secure_mount._MOUNTED, [])

@patch("keylime.secure_mount.check_mounted")
@patch("keylime.secure_mount.config")
def test_check_mount_secure_already_mounted(self, config_mock, check_mounted_mock):
"""Test when mounting in tmpfs but is already present."""
config_mock.MOUNT_SECURE = True
check_mounted_mock.return_value = True

with tempfile.TemporaryDirectory() as tmpdirname:
config_mock.WORK_DIR = tmpdirname
self.assertEqual(secure_mount.mount(), f"{tmpdirname}/secure")
# pylint: disable=protected-access
self.assertEqual(secure_mount._MOUNTED, [])

@patch("keylime.secure_mount.logger")
@patch("keylime.secure_mount.check_mounted")
@patch("keylime.secure_mount.config")
@patch("keylime.secure_mount.os.path.exists")
@patch("keylime.secure_mount.cmd_exec")
def test_check_mount_secure_already_created(
self, _cmd_exec_mock, exists_mock, config_mock, check_mounted_mock, _logger_mock
):
"""Test when mounting in tmpfs but the mount point is present."""
exists_mock.return_value = True
config_mock.MOUNT_SECURE = True
check_mounted_mock.return_value = False

with tempfile.TemporaryDirectory() as tmpdirname:
config_mock.WORK_DIR = tmpdirname
self.assertEqual(secure_mount.mount(), f"{tmpdirname}/secure")
# pylint: disable=protected-access
self.assertEqual(secure_mount._MOUNTED, [f"{tmpdirname}/secure"])

@patch("keylime.secure_mount.logger")
@patch("keylime.secure_mount.check_mounted")
@patch("keylime.secure_mount.config")
@patch("keylime.secure_mount.os.path.exists")
@patch("keylime.secure_mount.os.makedirs")
@patch("keylime.secure_mount.cmd_exec")
def test_check_mount(
self,
_cmd_exec_mock,
_makedirs_mock,
exists_mock,
config_mock,
check_mounted_mock,
_logger_mock,
):
"""Test when mounting in tmpfs but the mount point is not present."""
exists_mock.return_value = False
config_mock.MOUNT_SECURE = True
check_mounted_mock.return_value = False

with tempfile.TemporaryDirectory() as tmpdirname:
config_mock.WORK_DIR = tmpdirname
self.assertEqual(secure_mount.mount(), f"{tmpdirname}/secure")
# pylint: disable=protected-access
self.assertEqual(secure_mount._MOUNTED, [f"{tmpdirname}/secure"])

def test_check_umount_empty(self):
"""Test umount when there are nothing to clean."""
secure_mount.umount()
# pylint: disable=protected-access
self.assertEqual(secure_mount._MOUNTED, [])

@patch("keylime.secure_mount.logger")
@patch("keylime.secure_mount.check_mounted")
@patch("keylime.secure_mount.cmd_exec")
def test_check_umount_mount(self, cmd_exec_mock, check_mounted_mock, logger_mock):
"""Test umount for a single mount."""
# pylint: disable=protected-access
secure_mount._MOUNTED = ["/secdir"]
check_mounted_mock.return_value = True
cmd_exec_mock.run.return_value = {"code": 0}
secure_mount.umount()
self.assertEqual(secure_mount._MOUNTED, [])
cmd_exec_mock.run.assert_called_once()
logger_mock.error.assert_not_called()
logger_mock.warning.assert_not_called()

@patch("keylime.secure_mount.logger")
@patch("keylime.secure_mount.check_mounted")
@patch("keylime.secure_mount.cmd_exec")
def test_check_umount_mount_busy(
self, cmd_exec_mock, check_mounted_mock, logger_mock
):
"""Test umount for a single mount that fail."""
# pylint: disable=protected-access
secure_mount._MOUNTED = ["/secdir"]
check_mounted_mock.return_value = True
cmd_exec_mock.run.return_value = {"code": 1, "reterr": "Device busy"}
secure_mount.umount()
self.assertEqual(secure_mount._MOUNTED, [])
cmd_exec_mock.run.assert_called_once()
logger_mock.error.assert_called_once()
logger_mock.warning.assert_not_called()

@patch("keylime.secure_mount.logger")
@patch("keylime.secure_mount.check_mounted")
@patch("keylime.secure_mount.cmd_exec")
def test_check_umount_umount(self, cmd_exec_mock, check_mounted_mock, logger_mock):
"""Test umount for a single umount and no creation."""
# pylint: disable=protected-access
secure_mount._MOUNTED = ["/secdir"]
check_mounted_mock.return_value = False
secure_mount.umount()
self.assertEqual(secure_mount._MOUNTED, [])
cmd_exec_mock.run.assert_not_called()
logger_mock.error.assert_not_called()
logger_mock.warning.assert_called_once()


if __name__ == "__main__":
unittest.main()

0 comments on commit d37c406

Please sign in to comment.