Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 58 additions & 24 deletions hwinfo/tools/inspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
from argparse import ArgumentParser
from prettytable import PrettyTable
import paramiko
import subprocess
import os

from hwinfo.pci import PCIDevice
from hwinfo.pci.lspci import *
from hwinfo.host.dmidecode import *

from hwinfo.host import dmidecode

def remote_command(host, username, password, cmd):
client = paramiko.SSHClient()
Expand All @@ -18,12 +21,19 @@ def remote_command(host, username, password, cmd):
output = stdout.readlines()
error = stderr.readlines()
if error:
print "stderr: %s" % error
raise Exception("stderr: %s" % error)
client.close()
return ''.join(output)

def local_command(cmd):
return cmd
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
stdout, stderr = process.communicate()
if process.returncode == 0:
return str(stdout).strip()
else:
print "RC: %s" % process.returncode
print stdout
raise Exception("stderr: %s" % str(stderr))

class Host(object):

Expand All @@ -38,18 +48,51 @@ def exec_command(self, cmd):
else:
return remote_command(self.host, self.username, self.password, cmd)

def get_lspci_data(self):
return self.exec_command(['lspci', '-nnmm'])

def get_dmidecode_data(self):
return self.exec_command(['dmidecode'])

def get_pci_devices(self):
data = self.exec_command(['lspci', '-nnmm'])
data = self.get_lspci_data()
parser = LspciNNMMParser(data)
devices = parser.parse_items()
return [PCIDevice(device) for device in devices]

def get_info(self):
data = self.exec_command(['dmidecode'])
parser = DmidecodeParser(data)
data = self.get_dmidecode_data()
parser = dmidecode.DmidecodeParser(data)
rec = parser.parse()
return rec

def search_for_file(dirname, filename):
for root, _, files in os.walk(dirname):
if filename in files:
return os.path.join(root, filename)
raise Exception("Could not find '%s' in directory '%s'" % (filename, dirname))

def read_from_file(filename):
fh = open(filename, 'r')
data = fh.read()
fh.close()
return data

class HostFromLogs(Host):

def __init__(self, dirname):
self.dirname = dirname

def _load_from_file(self, filename):
filename = search_for_file(self.dirname, filename)
return read_from_file(filename)

def get_lspci_data(self):
return self._load_from_file('lspci-nnm.out')

def get_dmidecode_data(self):
return self._load_from_file('dmidecode.out')

def pci_filter(devices, types):
res = []
for device in devices:
Expand All @@ -71,19 +114,6 @@ def pci_filter_for_gpu(devices):
gpu_types = ['03']
return pci_filter(devices, gpu_types)

def print_lines(lines):
max_len = 0
output = []
for line in lines:
output.append(line)
if len(line) > max_len:
max_len = len(line)
print ""
print "-" * max_len
print '\n'.join(output)
print "-" * max_len
print ""

def rec_to_table(rec):
table = PrettyTable(["Key", "Value"])
table.align['Key'] = 'l'
Expand Down Expand Up @@ -115,14 +145,18 @@ def main():
parser = ArgumentParser(prog="hwinfo")

filter_choices = ['bios', 'nic', 'storage', 'gpu']
parser.add_argument("-f", "--filter", choices=filter_choices)
parser.add_argument("-m", "--machine", default='localhost')
parser.add_argument("-u", "--username")
parser.add_argument("-p", "--password")
parser.add_argument("-f", "--filter", choices=filter_choices, help="Query a specific class.")
parser.add_argument("-m", "--machine", default='localhost', help="Remote host address.")
parser.add_argument("-u", "--username", help="Username for remote host.")
parser.add_argument("-p", "--password", help="Password for remote host.")
parser.add_argument("-l", "--logs", help="Path to the directory with the logfiles.")

args = parser.parse_args()

host = Host(args.machine, args.username, args.password)
if args.logs:
host = HostFromLogs(args.logs)
else:
host = Host(args.machine, args.username, args.password)

options = []

Expand Down
145 changes: 145 additions & 0 deletions hwinfo/tools/tests/test_inspector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import unittest
import mock
from mock import patch
from StringIO import StringIO

from hwinfo.tools import inspector

class HostObjectTests(unittest.TestCase):

@patch('hwinfo.tools.inspector.local_command')
def test_local_exec_command(self, local_command):
host = inspector.Host()
host.exec_command('ls')
inspector.local_command.assert_called_once_with('ls')

@patch('hwinfo.tools.inspector.remote_command')
def test_remote_exec_command(self, remote_command):
host = inspector.Host('mymachine', 'root', 'pass')
host.exec_command('ls')
inspector.remote_command.assert_called_once_with('mymachine', 'root', 'pass', 'ls')

@patch('hwinfo.tools.inspector.Host.exec_command')
def test_get_pci_devices(self, exec_command):
host = inspector.Host()
devs = host.get_pci_devices()
exec_command.assert_called_once_with(['lspci', '-nnmm'])

@patch('hwinfo.host.dmidecode.DmidecodeParser')
@patch('hwinfo.tools.inspector.Host.exec_command')
def test_get_info(self, mock_exec_command, mock_dmidecode_parser_cls):
mock_exec_command.return_value = 'blah'
mparser = mock_dmidecode_parser_cls.return_value = mock.Mock()
mparser.parse.return_value = {'key':'value'}
host = inspector.Host()
rec = host.get_info()
self.assertEqual(rec, {'key':'value'})


class RemoteCommandTests(unittest.TestCase):

def setUp(self):
self.stdout = StringIO('')
self.stdin = StringIO('')
self.stderr = StringIO('')

@patch('paramiko.SSHClient')
def test_ssh_connect(self, ssh_client_cls):
client = ssh_client_cls.return_value = mock.Mock()
client.exec_command.return_value = self.stdout, self.stdin, self.stderr
inspector.remote_command('test', 'user', 'pass', 'ls')
client.connect.assert_called_with('test', password='pass', username='user', timeout=10)

@patch('paramiko.SSHClient')
def test_ssh_connect_error(self, ssh_client_cls):
client = ssh_client_cls.return_value = mock.Mock()
client.exec_command.return_value = self.stdout, self.stdin, StringIO("Error")
with self.assertRaises(Exception) as context:
inspector.remote_command('test', 'user', 'pass', 'ls')
self.assertEqual(context.exception.message, "stderr: ['Error']")

class LocalCommandTests(unittest.TestCase):

@patch('subprocess.Popen')
def test_local_call(self, mock_popen_cls):
mprocess =mock_popen_cls.return_value = mock.MagicMock()
mprocess.communicate.return_value = 'test', None
mprocess.returncode = 0
stdout = inspector.local_command("echo 'test'")
self.assertEqual(stdout, 'test')

@patch('subprocess.Popen')
def test_local_call_error(self, mock_popen_cls):
mprocess =mock_popen_cls.return_value = mock.MagicMock()
mprocess.communicate.return_value = 'test', 'my error'
mprocess.returncode = 1
with self.assertRaises(Exception) as context:
stdout = inspector.local_command("echo 'test'")
self.assertEqual(context.exception.message, "stderr: my error")


class PCIFilterTests(unittest.TestCase):

def setUp(self):
device_a = mock.MagicMock()
device_b = mock.MagicMock()
device_c = mock.MagicMock()
device_d = mock.MagicMock()

device_a.get_pci_class.return_value = '0230'
device_b.get_pci_class.return_value = '0340'
device_c.get_pci_class.return_value = '0210'
device_d.get_pci_class.return_value = '0100'

self.devices = [device_a, device_b, device_c, device_d]

def test_pci_filter_match_all(self):
devs = inspector.pci_filter(self.devices, ['0'])
self.assertEqual(len(devs), len(self.devices))

def test_pci_filter_match_two(self):
devs = inspector.pci_filter(self.devices, ['02'])
for dev in devs:
print dev.get_pci_class()
self.assertEqual(len(devs), 2)

def test_pci_filter_match_one(self):
devs = inspector.pci_filter(self.devices, ['023'])
self.assertEqual(len(devs), 1)
self.assertEqual(devs[0].get_pci_class(), '0230')

def test_pci_filter_match_none(self):
devs = inspector.pci_filter(self.devices, ['0234'])
self.assertEqual(devs, [])

def test_pci_filter_for_nics(self):
devs = inspector.pci_filter_for_nics(self.devices)
self.assertEqual(len(devs), 2)

def test_pci_filter_for_storage(self):
devs = inspector.pci_filter_for_storage(self.devices)
self.assertEqual(len(devs), 1)
self.assertEqual(devs[0].get_pci_class(), '0100')

def test_pci_filter_for_gpu(self):
devs = inspector.pci_filter_for_gpu(self.devices)
self.assertEqual(len(devs), 1)
self.assertEqual(devs[0].get_pci_class(), '0340')


class TabulateTests(unittest.TestCase):

@patch('hwinfo.tools.inspector.PrettyTable')
def test_rec_to_table(self, mock_pt_cls):
mock_table = mock_pt_cls.return_value = mock.MagicMock()
rec = {'one': 1, 'two': 2, 'three': 3}
inspector.rec_to_table(rec)
self.assertEqual(mock_table.add_row.call_count, 3)
expected_calls = [
mock.call(['one', 1]),
mock.call(['two', 2]),
mock.call(['three', 3]),
]
mock_table.add_row.assert_has_calls(expected_calls, any_order=True)