Skip to content

Commit

Permalink
Fix error handling for ansible errors and resume of paused scans (#673)
Browse files (browse the repository at this point in the history)
  • Loading branch information
kholdaway committed Feb 8, 2018
1 parent 9805871 commit 8434cb1
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 56 deletions.
10 changes: 6 additions & 4 deletions quipucords/api/scanjob/model.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -336,7 +336,7 @@ def fail(self, message):

self.status = target_status
self.status_message = message
logger.error(self.status_message)
self.log_message(self.status_message, log_level=logging.ERROR)
self.save()
self._log_stats('FAILURE STATS.')
self.log_current_status(show_status_message=True,
Expand All @@ -351,12 +351,14 @@ def validate_status_change(self, target_status, valid_current_status):
:returns bool indicating if it was successful:
"""
if target_status == self.status:
logger.debug('ScanJob status is already %s', target_status)
self.log_message('ScanJob status is already %s' %
target_status, log_level=logging.DEBUG)
return False

if self.status not in valid_current_status:
logger.error('Cannot change job state to %s when it is %s',
target_status, self.status)
self.log_message('Cannot change job state to %s when it is %s' %
(target_status, self.status),
log_level=logging.ERROR)
return True
return False

Expand Down
2 changes: 1 addition & 1 deletion quipucords/api/scantasks/model.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -169,7 +169,7 @@ def update_stats(self,

if stats_changed:
self.save()
self._log_stats(description)
self._log_stats(description)

@transaction.atomic
def increment_stats(self, name,
Expand Down
63 changes: 38 additions & 25 deletions quipucords/scanner/network/inspect.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -15,7 +15,8 @@
from ansible.executor.task_queue_manager import TaskQueueManager
from api.credential.serializer import CredentialSerializer
from api.models import (ScanTask, ConnectionResult,
SystemConnectionResult)
SystemConnectionResult,
SystemInspectionResult)
from scanner.task import ScanTaskRunner
from scanner.network.inspect_callback import InspectResultCallback
from scanner.network.utils import (_construct_error_msg,
Expand Down Expand Up @@ -102,16 +103,24 @@ def run(self):

try:
# Execute scan
connected, _, completed = self.obtain_discovery_data()
num_completed = len(completed)
num_remaining = len(connected)
num_total = num_remaining + num_completed
connected, failed, completed = self._obtain_discovery_data()
processed_hosts = failed + completed
num_total = len(connected) + len(processed_hosts)

if num_total == 0:
msg = 'Inventory provided no reachable hosts.'
raise ScannerException(msg)

self.inspect_scan(connected)
self.scan_task.update_stats('INITIAL NETWORK INSPECT STATS',
sys_count=len(connected),
sys_scanned=len(completed),
sys_failed=len(failed))

# remove completed hosts
remaining = [
unprocessed for unprocessed in connected
if unprocessed[0] not in processed_hosts]
scan_message, scan_result = self._inspect_scan(remaining)

temp_facts = self.get_facts()
fact_size = len(temp_facts)
Expand Down Expand Up @@ -139,12 +148,12 @@ def run(self):
if self.scan_task.systems_failed > 0:
return '%d systems could not be scanned.' % \
self.scan_task.systems_failed, ScanTask.FAILED
return None, ScanTask.COMPLETED
return scan_message, scan_result

# pylint: disable=too-many-locals,W0102
def inspect_scan(self, connected, roles=DEFAULT_ROLES,
base_ssh_executable=None,
ssh_timeout=None):
def _inspect_scan(self, connected, roles=DEFAULT_ROLES,
base_ssh_executable=None,
ssh_timeout=None):
"""Execute the host scan with the initialized source.
:param connected: list of (host, credential) pairs to inspect
Expand All @@ -167,11 +176,6 @@ def inspect_scan(self, connected, roles=DEFAULT_ROLES,
extra_vars = self.scan_job.get_extra_vars()
forks = self.scan_job.options.max_concurrency

# Save counts
self.scan_task.update_stats('INITIAL NETWORK INSPECT STATS',
sys_count=len(connected),
sys_scanned=0, sys_failed=0)

ssh_executable = os.path.abspath(
os.path.join(os.path.dirname(__file__),
'../../../bin/timeout_ssh'))
Expand All @@ -181,7 +185,7 @@ def inspect_scan(self, connected, roles=DEFAULT_ROLES,
ssh_args = ['--executable=' + base_ssh_executable,
'--timeout=' + ssh_timeout]

group_names, inventory = construct_scan_inventory(
group_names, inventory = _construct_scan_inventory(
connected, connection_port, forks,
ssh_executable=ssh_executable,
ssh_args=ssh_args)
Expand All @@ -190,6 +194,8 @@ def inspect_scan(self, connected, roles=DEFAULT_ROLES,
error_msg = ''
log_message = 'START PROCESSING GROUPS of size %d' % forks
self.scan_task.log_message(log_message)
scan_result = ScanTask.COMPLETED
scan_message = 'success'
for idx, group_name in enumerate(group_names):
log_message = 'START PROCESSING GROUP %d of %d' % (
(idx + 1), len(group_names))
Expand All @@ -207,19 +213,25 @@ def inspect_scan(self, connected, roles=DEFAULT_ROLES,

if result != TaskQueueManager.RUN_OK:
new_error_msg = _construct_error_msg(result)
logger.error(new_error_msg)
error_msg += '{}\n'.format(new_error_msg)
scan_message = new_error_msg
scan_result = ScanTask.FAILED
callback.finalize_failed_hosts()
if result != TaskQueueManager.RUN_UNREACHABLE_HOSTS and \
result != TaskQueueManager.RUN_FAILED_HOSTS:
error_msg += '{}\n'.format(new_error_msg)

if error_msg != '':
raise AnsibleError(error_msg)

# Clear this cache since new results are available
self.facts = None
return scan_message, scan_result

def obtain_discovery_data(self):
def _obtain_discovery_data(self):
"""Obtain discover scan data. Either via new scan or paused scan.
:returns: List of connected, failed, and completed.
:returns: List of connected, inspection failed, and
inspection completed.
"""
connected = []
failed = []
Expand All @@ -231,18 +243,19 @@ def obtain_discovery_data(self):
host_cred = result.credential
serializer = CredentialSerializer(host_cred)
connected.append((result.name, serializer.data))
elif result.status == SystemConnectionResult.FAILED:
failed.append(result.name)

for inspect in self.inspect_results.results.all():
for result in inspect.systems.all():
completed.append(result.name)
if result.status == SystemInspectionResult.SUCCESS:
completed.append(result.name)
else:
failed.append(result.name)
return connected, failed, completed


# pylint: disable=too-many-locals
def construct_scan_inventory(hosts, connection_port, concurrency_count,
ssh_executable=None, ssh_args=None):
def _construct_scan_inventory(hosts, connection_port, concurrency_count,
ssh_executable=None, ssh_args=None):
"""Create a dictionary inventory for Ansible to execute with.
:param hosts: The collection of hosts/credential tuples
Expand Down
38 changes: 27 additions & 11 deletions quipucords/scanner/network/inspect_callback.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -123,16 +123,17 @@ def handle_result(self, result):
self.scan_task.log_message(log_message)
for key, value in results_to_store:
if key == HOST_DONE:
self._finalize_host(host)
self._finalize_host(host, SystemInspectionResult.SUCCESS)
else:
processed_value = process.process(
self.scan_task, host_facts, key, value, host)
host_facts[key] = processed_value

if host in self._ansible_facts:
self._ansible_facts[host].update(host_facts)
else:
self._ansible_facts[host] = host_facts
if bool(host_facts):
if host in self._ansible_facts:
self._ansible_facts[host].update(host_facts)
else:
self._ansible_facts[host] = host_facts

def _get_inspect_result(self):
# Have to save inspect_result (if creating it) because
Expand All @@ -152,19 +153,34 @@ def _get_inspect_result(self):
# results needs to be atomic so that the host won't be marked
# as complete unless we actually save its results.
@transaction.atomic
def _finalize_host(self, host):
results = self._ansible_facts.get(host, {})
def finalize_failed_hosts(self):
"""Finalize failed host."""
host_list = list(self._ansible_facts.keys())
for host in host_list:
self._finalize_host(host, SystemInspectionResult.FAILED)

logger.debug('host scan complete for %s with facts %s',
host, results)
# Called after all fact collection is complete for host. Writing
# results needs to be atomic so that the host won't be marked
# as complete unless we actually save its results.
@transaction.atomic
def _finalize_host(self, host, host_status):
results = self._ansible_facts.pop(host, {})
self.scan_task.log_message('host scan complete for %s. '
'Status: %s. Facts %s' %
(host, host_status, results),
log_level=logging.DEBUG)

# Update scan counts
if self.scan_task is not None:
self.scan_task.increment_stats(host, increment_sys_scanned=True)
if host_status == SystemInspectionResult.SUCCESS:
self.scan_task.increment_stats(
host, increment_sys_scanned=True)
else:
self.scan_task.increment_stats(host, increment_sys_failed=True)

inspect_result = self._get_inspect_result()
sys_result = SystemInspectionResult(
name=host, status=SystemInspectionResult.SUCCESS)
name=host, status=host_status)
sys_result.save()

inspect_result.systems.add(sys_result)
Expand Down
30 changes: 15 additions & 15 deletions quipucords/scanner/network/tests_network_inspect.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -29,7 +29,7 @@
SystemConnectionResult,
InspectionResults)
from api.serializers import CredentialSerializer, SourceSerializer
from scanner.network.inspect import (construct_scan_inventory)
from scanner.network.inspect import (_construct_scan_inventory)
from scanner.network import InspectTaskRunner
from scanner.network.inspect_callback import InspectResultCallback, \
normalize_result, ANSIBLE_FACTS
Expand Down Expand Up @@ -126,10 +126,10 @@ def test_internal_ansible_fact(self):
[])


class HostScannerTest(TestCase):
"""Tests against the HostScanner class and functions."""
class NetworkInspectScannerTest(TestCase):
"""Tests network inspect scan task class."""

# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-instance-attributes, protected-access
def setUp(self):
"""Create test case setup."""
self.cred = Credential(
Expand Down Expand Up @@ -211,9 +211,9 @@ def test_scan_inventory(self):
serializer = SourceSerializer(self.source)
source = serializer.data
connection_port = source['port']
inventory_dict = construct_scan_inventory(self.host_list,
connection_port,
50)
inventory_dict = _construct_scan_inventory(self.host_list,
connection_port,
50)
expected = {
'all': {
'children': {
Expand All @@ -240,7 +240,7 @@ def test_scan_inventory_grouping(self):
connection_port = source['port']
hc_serializer = CredentialSerializer(self.cred)
cred = hc_serializer.data
inventory_dict = construct_scan_inventory(
inventory_dict = _construct_scan_inventory(
[
('1.2.3.1', cred),
('1.2.3.2', cred),
Expand Down Expand Up @@ -302,10 +302,10 @@ def test_inspect_scan_failure(self, mock_run):
# Init for unit test as run is not called
scanner.connect_scan_task = self.connect_scan_task
with self.assertRaises(AnsibleError):
scanner.inspect_scan(self.host_list)
scanner._inspect_scan(self.host_list)
mock_run.assert_called()

@patch('scanner.network.inspect.InspectTaskRunner.inspect_scan',
@patch('scanner.network.inspect.InspectTaskRunner._inspect_scan',
side_effect=mock_scan_error)
def test_inspect_scan_error(self, mock_scan):
"""Test scan flow with mocked manager and failure."""
Expand Down Expand Up @@ -347,10 +347,10 @@ def test_ssh_crash(self):
path = os.path.abspath(
os.path.join(os.path.dirname(__file__),
'../../../test_util/crash.py'))
with self.assertRaises(AnsibleError):
scanner.inspect_scan(
self.host_list,
base_ssh_executable=path)
_, result = scanner._inspect_scan(
self.host_list,
base_ssh_executable=path)
self.assertEqual(result, ScanTask.FAILED)

def test_ssh_hang(self):
"""Simulate an ssh hang."""
Expand All @@ -359,7 +359,7 @@ def test_ssh_hang(self):
path = os.path.abspath(
os.path.join(os.path.dirname(__file__),
'../../../test_util/hang.py'))
scanner.inspect_scan(
scanner._inspect_scan(
self.host_list,
roles=['redhat_release'],
base_ssh_executable=path,
Expand Down

0 comments on commit 8434cb1

Please sign in to comment.