Skip to content

Commit

Permalink
CP-32988 - Unit tests for cgroup functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
BenSimsCitrix committed Jul 13, 2020
1 parent 8bcb9fe commit b823bcb
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 20 deletions.
51 changes: 31 additions & 20 deletions drivers/blktap2.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,45 +66,56 @@
CGROUP_PATH = "/sys/fs/cgroup/"


def cgroup_set_param(param_path, param):
if util.pathexists(param_path):
with open(param_path, 'w') as f:
f.write(str(param))
else:
message = "No cgroup param found {}"
raise util.SMException(message.format(param_path))

def max_percentage_ram(percentage):
param_path = os.path.join(CGROUP_PATH, "memory",
"tapdisk",
"memory.limit_in_bytes")
if util.pathexists(param_path):
with open(param_path, 'w') as f:
total_mem = util.pread(["grep",
"MemTotal",
"/proc/meminfo"]).split()[1]
mem_we_want = (percentage/100)*float(total_mem)
f.write("{}k".format(str(int(mem_we_want))))
array_total_mem = util.pread(["grep",
"MemTotal",
"/proc/meminfo"]).split()[1]
total_mem = array_total_mem[1]
units = array_total_mem[2]

if units != "kB":
message = "Format of /proc/meminfo has changed"
raise util.SMException(message)

mem_we_want = (percentage/100)*float(total_mem)
param = "{}k".format(str(int(mem_we_want)))
cgroup_set_param(param_path, param)


cgroup_special_case = {"MaxPercentageRam": max_percentage_ram}


def cgroup_params(cgroup, path, data):
for key in data:
param_path = os.path.join(path, key)
if key in cgroup_special_case:
cgroup_special_case[key](data[key])
else:
if util.pathexists(param_path):
with open(param_path, 'w') as f:
f.write(data[key])
else:
message = "No cgroup param found {}"
raise util.SMException(message.format(param_path))
param_path = os.path.join(path, key)
cgroup_set_param(param_path, data[key])

def cgroup_create(data):
for key in data:
path = os.path.join(CGROUP_PATH, key, "tapdisk")
if not util.pathexists(path):
os.mkdir(path)
cgroup_params(key, path, data[key])

def cgroup_config():
if os.path.isfile(CONF_FILE):
if util.pathexists(CONF_FILE):
with open(CONF_FILE) as json_file:
data = json.load(json_file)
for key in data:
path = os.path.join(CGROUP_PATH, key, "tapdisk")
if not util.pathexists(path):
os.mkdir(path)
cgroup_params(key, path, data[key])
cgroup_create(data)
_cgroups = list(data.keys())
return _cgroups
return None
Expand Down
161 changes: 161 additions & 0 deletions tests/test_blktap2.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,3 +682,164 @@ def test_major(self):
results = blktap2.TapCtl.major()

self.assertEqual(254, results)


class Cgroups_move(unittest.TestCase):

@mock.patch('blktap2.util.pathexists', autospec=True)
@mock.patch("__builtin__.open", autospec=True)
def test_nothing_happens_when_cgroup_nonexistant(self, mock_open,
mock_exists):
mock_exists.return_value = False
pid = 123
_cgroups = ["blkio", "memory"]
blktap2.cgroup_move(pid, _cgroups)
self.assertEqual(mock_open.call_count, 0)

cgroup_procs = "tapdisk/cgroup.procs"

@mock.patch('blktap2.util.pathexists', autospec=True)
@mock.patch("__builtin__.open", autospec=True)
def test_opens_memory_cgroup(self, mock_open, mock_exists):
mock_exists.side_effect = iter([False, True])
pid = 123
_cgroups = ["blkio", "memory"]
path = os.path.join(blktap2.CGROUP_PATH, _cgroups[1],
self.cgroup_procs)
blktap2.cgroup_move(pid, _cgroups)
mock_open.assert_called_with(path, 'w')

@mock.patch('blktap2.util.pathexists', autospec=True)
@mock.patch("__builtin__.open", autospec=True)
def test_opens_blkio_cgroup(self, mock_open, mock_exists):
mock_exists.side_effect = iter([True, False])
pid = 123
_cgroups = ["blkio", "memory"]
path = os.path.join(blktap2.CGROUP_PATH, _cgroups[0],
self.cgroup_procs)
blktap2.cgroup_move(pid, _cgroups)
mock_open.assert_called_with(path, 'w')

@mock.patch('blktap2.util.pathexists', autospec=True)
@mock.patch("__builtin__.open", autospec=True)
def test_pid_moved_to_cgroup(self, mock_open, mock_exists):
mock_exists.side_effect = iter([True, True])
pid = 123
_cgroups = ["blkio", "memory"]
blktap2.cgroup_move(pid, _cgroups)
mock_open.return_value.__enter__().write.assert_called_with(str(pid))
self.assertEqual(mock_open.return_value.__enter__().write.call_count,
2)


class Cgroups_config(unittest.TestCase):

@mock.patch('blktap2.util.pathexists', autospec=True)
@mock.patch("__builtin__.open", autospec=True)
def test_config_file_does_not_exist(self, mock_open, mock_exists):
mock_exists.return_value = False
blktap2.cgroup_config()
self.assertEqual(mock_open.call_count, 0)
mock_exists.assert_called_with(blktap2.CONF_FILE)

@mock.patch('blktap2.util.pathexists', autospec=True)
@mock.patch("__builtin__.open", autospec=True)
@mock.patch("blktap2.json.load", autospec=True)
@mock.patch("blktap2.cgroup_create", autospec=True)
def test_config_file_parsed_correctly(self, mock_create, mock_json,
mock_open, mock_exists):
mock_exists.return_value = True
json_return = {"blkio": {"weight": 10, "time": 123}, "memory": {}}
mock_json.return_value = json_return
ret = blktap2.cgroup_config()

mock_exists.assert_called_with(blktap2.CONF_FILE)
self.assertEqual(mock_open.call_count, 1)
mock_create.assert_called_with(json_return)
self.assertEqual(ret, list(json_return.keys()))


class Cgroups_create(unittest.TestCase):

@mock.patch('blktap2.util.pathexists', autospec=True)
@mock.patch('blktap2.os.mkdir', autospec=True)
@mock.patch('blktap2.cgroup_params', autospec=True)
def test_blkio_already_exists(self, mock_params, mock_mkdir, mock_exists):
data = {"blkio": {"weight": 10, "time": 123}, "memory": {}}
mock_exists.side_effect = iter([True, False])
blktap2.cgroup_create(data)
expected_path = os.path.join(blktap2.CGROUP_PATH, "memory", "tapdisk")
mock_exists.assert_called_with(expected_path)
self.assertEqual(mock_exists.call_count, 2)
mock_mkdir.assert_called_with(expected_path)
self.assertEqual(mock_mkdir.call_count, 1)
mock_params.assert_called_with("memory", expected_path, {})


class Cgroups_params(unittest.TestCase):

@mock.patch('blktap2.max_percentage_ram', autospec=True)
def test_max_percentage_ram_called(self, mock_per):
_path = os.path.join(blktap2.CGROUP_PATH, "memory", "tapdisk")
with mock.patch('blktap2.cgroup_special_case', {"MaxPercentageRam": mock_per}):
blktap2.cgroup_params("memory", _path, {"MaxPercentageRam": 25.0})
mock_per.assert_called_with(25.0)

@mock.patch('blktap2.cgroup_set_param', autospec=True)
def test_max_percentage_ram_called(self, mock_set_param):
_path = os.path.join(blktap2.CGROUP_PATH, "blkio", "tapdisk")
blktap2.cgroup_params("blkio", _path, {"weight": 10})
expected_path = os.path.join(_path, "weight")
mock_set_param.assert_called_with(expected_path, 10)


class Cgroups_set_param(unittest.TestCase):

@mock.patch('blktap2.util.pathexists', autospec=True)
def test_path_does_no_exist(self, mock_exists):
mock_exists.return_value = False
_path = os.path.join(blktap2.CGROUP_PATH, "memory", "tapdisk")
with self.assertRaises(blktap2.util.SMException) as ex:
blktap2.cgroup_set_param(_path, 25.0)

@mock.patch('blktap2.util.pathexists', autospec=True)
@mock.patch("__builtin__.open", autospec=True)
def test_param_written_correctly(self, mock_open, mock_exists):
mock_exists.return_value = True
_path = os.path.join(blktap2.CGROUP_PATH, "memory", "tapdisk")
blktap2.cgroup_set_param(_path, 25.0)
mock_open.assert_called_with(_path, 'w')
mock_open.return_value.__enter__().write.assert_called_with(str(25.0))


class Cgroups_max_percentage_ram(unittest.TestCase):

@mock.patch('blktap2.util.pathexists', autospec=True)
@mock.patch('blktap2.util.pread', autospec=True)
@mock.patch('blktap2.cgroup_set_param', autospec=True)
def test_correct_percentage_set(self, mock_set_param, mock_pread, mock_exists):
cmd_result = "MemTotal: 4007880 kB"
mock_pread.return_value = cmd_result
expected_path = os.path.join(blktap2.CGROUP_PATH,
"memory",
"tapdisk",
"memory.limit_in_bytes")
blktap2.max_percentage_ram(25.0)
mock_set_param.assert_called_with(expected_path, "1001970k")

@mock.patch('blktap2.util.pathexists', autospec=True)
@mock.patch('blktap2.util.pread', autospec=True)
@mock.patch('blktap2.cgroup_set_param', autospec=True)
def test_correct_percentage_set(self, mock_set_param, mock_pread, mock_exists):

# /proc/meminfo hardcoded to kB in the kernel
# but test this just in case it ever changes.

cmd_result = "MemTotal: 4007880 gB"
mock_pread.return_value = cmd_result
expected_path = os.path.join(blktap2.CGROUP_PATH,
"memory",
"tapdisk",
"memory.limit_in_bytes")
with self.assertRaises(blktap2.util.SMException) as ex:
blktap2.max_percentage_ram(25.0)

0 comments on commit b823bcb

Please sign in to comment.