diff --git a/hwinfo/tools/inspector.py b/hwinfo/tools/inspector.py index e2498dd..66e2c84 100644 --- a/hwinfo/tools/inspector.py +++ b/hwinfo/tools/inspector.py @@ -3,10 +3,13 @@ from argparse import ArgumentParser from prettytable import PrettyTable import paramiko +import subprocess +import os from hwinfo.pci import PCIDevice from hwinfo.pci.lspci import * -from hwinfo.host.dmidecode import * + +from hwinfo.host import dmidecode def remote_command(host, username, password, cmd): client = paramiko.SSHClient() @@ -18,12 +21,19 @@ def remote_command(host, username, password, cmd): output = stdout.readlines() error = stderr.readlines() if error: - print "stderr: %s" % error + raise Exception("stderr: %s" % error) client.close() return ''.join(output) def local_command(cmd): - return cmd + process = subprocess.Popen(cmd, stdout=subprocess.PIPE) + stdout, stderr = process.communicate() + if process.returncode == 0: + return str(stdout).strip() + else: + print "RC: %s" % process.returncode + print stdout + raise Exception("stderr: %s" % str(stderr)) class Host(object): @@ -38,18 +48,51 @@ def exec_command(self, cmd): else: return remote_command(self.host, self.username, self.password, cmd) + def get_lspci_data(self): + return self.exec_command(['lspci', '-nnmm']) + + def get_dmidecode_data(self): + return self.exec_command(['dmidecode']) + def get_pci_devices(self): - data = self.exec_command(['lspci', '-nnmm']) + data = self.get_lspci_data() parser = LspciNNMMParser(data) devices = parser.parse_items() return [PCIDevice(device) for device in devices] def get_info(self): - data = self.exec_command(['dmidecode']) - parser = DmidecodeParser(data) + data = self.get_dmidecode_data() + parser = dmidecode.DmidecodeParser(data) rec = parser.parse() return rec +def search_for_file(dirname, filename): + for root, _, files in os.walk(dirname): + if filename in files: + return os.path.join(root, filename) + raise Exception("Could not find '%s' in directory '%s'" % (filename, dirname)) + +def read_from_file(filename): + fh = open(filename, 'r') + data = fh.read() + fh.close() + return data + +class HostFromLogs(Host): + + def __init__(self, dirname): + self.dirname = dirname + + def _load_from_file(self, filename): + filename = search_for_file(self.dirname, filename) + return read_from_file(filename) + + def get_lspci_data(self): + return self._load_from_file('lspci-nnm.out') + + def get_dmidecode_data(self): + return self._load_from_file('dmidecode.out') + def pci_filter(devices, types): res = [] for device in devices: @@ -71,19 +114,6 @@ def pci_filter_for_gpu(devices): gpu_types = ['03'] return pci_filter(devices, gpu_types) -def print_lines(lines): - max_len = 0 - output = [] - for line in lines: - output.append(line) - if len(line) > max_len: - max_len = len(line) - print "" - print "-" * max_len - print '\n'.join(output) - print "-" * max_len - print "" - def rec_to_table(rec): table = PrettyTable(["Key", "Value"]) table.align['Key'] = 'l' @@ -115,14 +145,18 @@ def main(): parser = ArgumentParser(prog="hwinfo") filter_choices = ['bios', 'nic', 'storage', 'gpu'] - parser.add_argument("-f", "--filter", choices=filter_choices) - parser.add_argument("-m", "--machine", default='localhost') - parser.add_argument("-u", "--username") - parser.add_argument("-p", "--password") + parser.add_argument("-f", "--filter", choices=filter_choices, help="Query a specific class.") + parser.add_argument("-m", "--machine", default='localhost', help="Remote host address.") + parser.add_argument("-u", "--username", help="Username for remote host.") + parser.add_argument("-p", "--password", help="Password for remote host.") + parser.add_argument("-l", "--logs", help="Path to the directory with the logfiles.") args = parser.parse_args() - host = Host(args.machine, args.username, args.password) + if args.logs: + host = HostFromLogs(args.logs) + else: + host = Host(args.machine, args.username, args.password) options = [] diff --git a/hwinfo/tools/tests/test_inspector.py b/hwinfo/tools/tests/test_inspector.py new file mode 100644 index 0000000..3e3ec6f --- /dev/null +++ b/hwinfo/tools/tests/test_inspector.py @@ -0,0 +1,145 @@ +import unittest +import mock +from mock import patch +from StringIO import StringIO + +from hwinfo.tools import inspector + +class HostObjectTests(unittest.TestCase): + + @patch('hwinfo.tools.inspector.local_command') + def test_local_exec_command(self, local_command): + host = inspector.Host() + host.exec_command('ls') + inspector.local_command.assert_called_once_with('ls') + + @patch('hwinfo.tools.inspector.remote_command') + def test_remote_exec_command(self, remote_command): + host = inspector.Host('mymachine', 'root', 'pass') + host.exec_command('ls') + inspector.remote_command.assert_called_once_with('mymachine', 'root', 'pass', 'ls') + + @patch('hwinfo.tools.inspector.Host.exec_command') + def test_get_pci_devices(self, exec_command): + host = inspector.Host() + devs = host.get_pci_devices() + exec_command.assert_called_once_with(['lspci', '-nnmm']) + + @patch('hwinfo.host.dmidecode.DmidecodeParser') + @patch('hwinfo.tools.inspector.Host.exec_command') + def test_get_info(self, mock_exec_command, mock_dmidecode_parser_cls): + mock_exec_command.return_value = 'blah' + mparser = mock_dmidecode_parser_cls.return_value = mock.Mock() + mparser.parse.return_value = {'key':'value'} + host = inspector.Host() + rec = host.get_info() + self.assertEqual(rec, {'key':'value'}) + + +class RemoteCommandTests(unittest.TestCase): + + def setUp(self): + self.stdout = StringIO('') + self.stdin = StringIO('') + self.stderr = StringIO('') + + @patch('paramiko.SSHClient') + def test_ssh_connect(self, ssh_client_cls): + client = ssh_client_cls.return_value = mock.Mock() + client.exec_command.return_value = self.stdout, self.stdin, self.stderr + inspector.remote_command('test', 'user', 'pass', 'ls') + client.connect.assert_called_with('test', password='pass', username='user', timeout=10) + + @patch('paramiko.SSHClient') + def test_ssh_connect_error(self, ssh_client_cls): + client = ssh_client_cls.return_value = mock.Mock() + client.exec_command.return_value = self.stdout, self.stdin, StringIO("Error") + with self.assertRaises(Exception) as context: + inspector.remote_command('test', 'user', 'pass', 'ls') + self.assertEqual(context.exception.message, "stderr: ['Error']") + +class LocalCommandTests(unittest.TestCase): + + @patch('subprocess.Popen') + def test_local_call(self, mock_popen_cls): + mprocess =mock_popen_cls.return_value = mock.MagicMock() + mprocess.communicate.return_value = 'test', None + mprocess.returncode = 0 + stdout = inspector.local_command("echo 'test'") + self.assertEqual(stdout, 'test') + + @patch('subprocess.Popen') + def test_local_call_error(self, mock_popen_cls): + mprocess =mock_popen_cls.return_value = mock.MagicMock() + mprocess.communicate.return_value = 'test', 'my error' + mprocess.returncode = 1 + with self.assertRaises(Exception) as context: + stdout = inspector.local_command("echo 'test'") + self.assertEqual(context.exception.message, "stderr: my error") + + +class PCIFilterTests(unittest.TestCase): + + def setUp(self): + device_a = mock.MagicMock() + device_b = mock.MagicMock() + device_c = mock.MagicMock() + device_d = mock.MagicMock() + + device_a.get_pci_class.return_value = '0230' + device_b.get_pci_class.return_value = '0340' + device_c.get_pci_class.return_value = '0210' + device_d.get_pci_class.return_value = '0100' + + self.devices = [device_a, device_b, device_c, device_d] + + def test_pci_filter_match_all(self): + devs = inspector.pci_filter(self.devices, ['0']) + self.assertEqual(len(devs), len(self.devices)) + + def test_pci_filter_match_two(self): + devs = inspector.pci_filter(self.devices, ['02']) + for dev in devs: + print dev.get_pci_class() + self.assertEqual(len(devs), 2) + + def test_pci_filter_match_one(self): + devs = inspector.pci_filter(self.devices, ['023']) + self.assertEqual(len(devs), 1) + self.assertEqual(devs[0].get_pci_class(), '0230') + + def test_pci_filter_match_none(self): + devs = inspector.pci_filter(self.devices, ['0234']) + self.assertEqual(devs, []) + + def test_pci_filter_for_nics(self): + devs = inspector.pci_filter_for_nics(self.devices) + self.assertEqual(len(devs), 2) + + def test_pci_filter_for_storage(self): + devs = inspector.pci_filter_for_storage(self.devices) + self.assertEqual(len(devs), 1) + self.assertEqual(devs[0].get_pci_class(), '0100') + + def test_pci_filter_for_gpu(self): + devs = inspector.pci_filter_for_gpu(self.devices) + self.assertEqual(len(devs), 1) + self.assertEqual(devs[0].get_pci_class(), '0340') + + +class TabulateTests(unittest.TestCase): + + @patch('hwinfo.tools.inspector.PrettyTable') + def test_rec_to_table(self, mock_pt_cls): + mock_table = mock_pt_cls.return_value = mock.MagicMock() + rec = {'one': 1, 'two': 2, 'three': 3} + inspector.rec_to_table(rec) + self.assertEqual(mock_table.add_row.call_count, 3) + expected_calls = [ + mock.call(['one', 1]), + mock.call(['two', 2]), + mock.call(['three', 3]), + ] + mock_table.add_row.assert_has_calls(expected_calls, any_order=True) + +