Skip to content

Commit

Permalink
Add initial CLI scan tests
Browse files Browse the repository at this point in the history
Close #125
  • Loading branch information
elyezer committed Feb 14, 2018
1 parent 84abbde commit 2b4925c
Show file tree
Hide file tree
Showing 5 changed files with 354 additions and 0 deletions.
259 changes: 259 additions & 0 deletions camayoc/tests/qcs/cli/test_scans.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
# coding=utf-8
"""Tests for ``qpc scan`` commands.
:caseautomation: automated
:casecomponent: cli
:caseimportance: high
:caselevel: integration
:requirement: Sonar
:testtype: functional
:upstream: yes
"""
import operator
import re
import time
from pprint import pformat

import pexpect
import pytest

from .conftest import qpc_server_config
from .utils import (
cred_add,
source_add,
scan_cancel,
scan_pause,
scan_restart,
scan_show,
scan_start,
)
from camayoc.config import get_config
from camayoc.constants import BECOME_PASSWORD_INPUT, CONNECTION_PASSWORD_INPUT
from camayoc.exceptions import (
ConfigFileNotFoundError,
FailedScanException,
WaitTimeError,
)


name_getter = operator.itemgetter('name')
"""Fetche the value of the ``name`` item."""


def config_credentials():
"""Return all credentials available on configuration file."""
try:
return get_config().get('credentials', [])
except ConfigFileNotFoundError:
return []


def config_sources():
"""Return all sources available on configuration file."""
try:
return get_config().get('qcs', {}).get('sources', [])
except ConfigFileNotFoundError:
return []


@pytest.fixture(params=config_credentials(), ids=name_getter)
def credentials(request):
"""Return each credential available on the config file."""
return request.param


@pytest.fixture(params=config_sources(), ids=name_getter)
def source(request):
"""Return each source available on the config file."""
return request.param


@pytest.fixture(autouse=True, scope='module')
def setup_credentials():
"""Create all credentials on the server."""
qpc_server_config()

qpc_cred_clear = pexpect.spawn(
'qpc cred clear --all'
)
assert qpc_cred_clear.expect(pexpect.EOF) == 0
qpc_cred_clear.close()

credentials = get_config().get('credentials', [])
for credential in credentials:
inputs = []
if 'password' in credential:
inputs.append((CONNECTION_PASSWORD_INPUT, credential['password']))
credential['password'] = None
if 'become-password' in credential:
inputs.append(
(BECOME_PASSWORD_INPUT, credential['become-password']))
credential['become-password'] = None
cred_add(credential, inputs)


@pytest.fixture(autouse=True, scope='module')
def setup_sources():
"""Create all sources on the server."""
qpc_server_config()

qpc_cred_clear = pexpect.spawn(
'qpc source clear --all'
)
assert qpc_cred_clear.expect(pexpect.EOF) == 0
qpc_cred_clear.close()

sources = get_config().get('qcs', {}).get('sources', [])
for source in sources:
source['cred'] = source.pop('credentials')
options = source.pop('options', {})
for k, v in options.items():
source[k.replace('_', '-')] = v
source_add(source)


def wait_for_scan(scan_id, status='completed', timeout=900):
"""Wait for a scan to reach some ``status`` up to ``timeout`` seconds.
:param scan_id: Scan ID to wait for.
:param status: Scan status which will wait for. Default is completed.
:param timeout: wait up to this amount of seconds. Default is 900.
"""
while timeout > 0:
result = scan_show({'id': scan_id})
if status != 'failed' and result['status'] == 'failed':
raise FailedScanException(
'The scan with ID "{}" has failed unexpectedly.\n\n'
'The information about the scan is:\n{}\n'
.format(scan_id, pformat(result))
)
if result['status'] == status:
return
time.sleep(5)
timeout -= 5
raise WaitTimeError(
'Timeout waiting for scan with ID "{}" to achieve the "{}" status.\n\n'
'The information about the scan is:\n{}\n'
.format(scan_id, status, pformat(result))
)


def test_scan(isolated_filesystem, qpc_server_config, source):
"""Scan a single source type.
:id: 49ae6fef-ea41-4b91-b310-6054678bfbb4
:description: Perform a scan on a single source type.
:steps: Run ``qpc scan start --sources <source>``
:expectedresults: The scan must completed without any error and a report
should be available.
"""
result = scan_start({
'source': source['name'],
})
match = re.match(r'Scan "(\d+)" started.', result)
assert match is not None
scan_id = match.group(1)
wait_for_scan(scan_id)
result = scan_show({
'id': scan_id,
'result': None,
})
assert 'connection_results' in result
assert 'inspection_results' in result


def test_scan_with_multiple_sources(isolated_filesystem, qpc_server_config):
"""Scan multiple source types.
:id: 58fde39c-52d8-42ee-af4c-1d75a6dc80b0
:description: Perform a scan on multiple source types.
:steps: Run ``qpc scan start --sources <source1> <source2> ...``
:expectedresults: The scan must completed without any error and a report
should be available.
"""
result = scan_start({
'source': ' '.join([source['name'] for source in config_sources()]),
})
match = re.match(r'Scan "(\d+)" started.', result)
assert match is not None
scan_id = match.group(1)
wait_for_scan(scan_id, timeout=1200)
result = scan_show({
'id': scan_id,
'result': None,
})
assert 'connection_results' in result
assert 'inspection_results' in result


def test_scan_with_disabled_products(isolated_filesystem, qpc_server_config):
"""Perform a scan and disable an optional product.
:id: b1cd9901-44eb-4e71-846c-34e1b19751d0
:description: Perform a scan and disable an optional product.
:steps: Run ``qpc scan start --sources <source> --disable-optional-products
<optional-product>``
:expectedresults: The scan must completed without any error and a report
should be available. The disabled products should not have results in
the report.
:caseautomation: notautomated
"""


def test_scan_restart(isolated_filesystem, qpc_server_config):
"""Perform a scan and ensure it can be paused and restarted.
:id: 7eb79aa8-fe3d-4fcd-9f1a-5e2d4df2f3b6
:description: Start a scan, then pause it and finally restart it.
:steps:
1) Run ``qpc scan start --sources <source>`` and store its ID.
2) Stop the scan by running ``qpc scan stop --id <id>``
3) Restart the scan by running ``qpc scan restart --id <id>``
:expectedresults: The scan must completed without any error and a report
should be available.
"""
result = scan_start({
'source': config_sources()[0]['name'],
})
match = re.match(r'Scan "(\d+)" started.', result)
assert match is not None
scan_id = match.group(1)
wait_for_scan(scan_id, status='running')
scan_pause({'id': scan_id})
wait_for_scan(scan_id, status='paused')
scan_restart({'id': scan_id})
wait_for_scan(scan_id)
result = scan_show({
'id': scan_id,
'result': None,
})
assert 'connection_results' in result
assert 'inspection_results' in result


def test_scan_cancel(isolated_filesystem, qpc_server_config):
"""Perform a scan and ensure it can be canceled.
:id: b5c11b82-e86e-478b-b885-89a577f81b13
:description: Start a scan, then cancel it and finally check it can't be
restarted.
:steps:
1) Run ``qpc scan start --sources <source>`` and store its ID.
2) Cancel the scan by running ``qpc scan cancel --id <id>``
3) Try to restart the scan by running ``qpc scan restart --id <id>``
:expectedresults: The scan must be canceled and can't not be restarted.
"""
result = scan_start({
'source': config_sources()[0]['name'],
})
match = re.match(r'Scan "(\d+)" started.', result)
assert match is not None
scan_id = match.group(1)
wait_for_scan(scan_id, status='running')
scan_cancel({'id': scan_id})
wait_for_scan(scan_id, status='canceled')
result = scan_restart({'id': scan_id}, exitstatus=1)
assert result.startswith(
'Error: Scan cannot be restarted. The scan must be paused for it to '
'be restarted.'
)
85 changes: 85 additions & 0 deletions camayoc/tests/qcs/cli/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,39 @@
# coding=utf-8
"""Utility functions for Quipucords cli tests."""
import functools
import io
import json

import pexpect


def cli_command(command, options=None, exitstatus=0):
"""Run a cli ``command`` with the ``options``.
:param command: the base command to be run
:param options: dictionary mapping the command options. Each item will be
mapped to ``--key value``, if the item's value is ``None`` then a flag
option will be created ``--key``.
:param exitstatus: expected exit status. If for some reason the command
exit status is different then an AssertionError will be raised with the
command output as the exception messege.
"""
if options is None:
options = {}
for key, value in options.items():
if value is None:
command += ' --{}'.format(key)
else:
command += ' --{} {}'.format(key, value)
child = pexpect.spawn(command)
child.logfile = io.BytesIO()
assert child.expect(pexpect.EOF) == 0
child.close()
output = child.logfile.getvalue().decode('utf-8')
assert child.exitstatus == exitstatus, output
return output


def cred_add(options, inputs=None, exitstatus=0):
"""Add a new credential entry.
Expand Down Expand Up @@ -61,6 +92,42 @@ def cred_show(options, output, exitstatus=0):
assert qpc_cred_show.exitstatus == exitstatus


def source_add(options, inputs=None, exitstatus=0):
"""Add a new source entry.
:param options: A dictionary mapping the option names and their values.
Pass ``None`` for flag options.
:param inputs: A list of tuples mapping the input prompts and the value to
be filled. For example::
inputs=[('prompt1:', 'input1'), ('prompt2:', 'input2')]
:param exitstatus: Expected exit status code.
"""
if 'cred' in options:
options['cred'] = ' '.join(options['cred'])
if 'hosts' in options:
options['hosts'] = ' '.join(options['hosts'])
if 'type' not in options:
options['type'] = 'network'
command = 'qpc source add'
for key, value in options.items():
if value is None:
command += ' --{}'.format(key)
else:
command += ' --{}={}'.format(key, value)
qpc_source_add = pexpect.spawn(command)
if inputs is None:
inputs = []
for prompt, value in inputs:
assert qpc_source_add.expect(prompt) == 0
qpc_source_add.sendline(value)
assert qpc_source_add.expect(
'Source "{}" was added'.format(options['name'])) == 0
assert qpc_source_add.expect(pexpect.EOF) == 0
qpc_source_add.close()
assert qpc_source_add.exitstatus == exitstatus


def source_show(options, output, exitstatus=0):
"""Show a source entry.
Expand All @@ -81,3 +148,21 @@ def source_show(options, output, exitstatus=0):
assert qpc_source_show.expect(pexpect.EOF) == 0
qpc_source_show.close()
assert qpc_source_show.exitstatus == exitstatus


scan_cancel = functools.partial(cli_command, 'qpc scan cancel')
"""Run ``qpc scan cancel`` command with ``options`` returning its output."""

scan_pause = functools.partial(cli_command, 'qpc scan pause')
"""Run ``qpc scan pause`` command with ``options`` returning its output."""

scan_restart = functools.partial(cli_command, 'qpc scan restart')
"""Run ``qpc scan restart`` command with ``options`` returning its output."""

scan_start = functools.partial(cli_command, 'qpc scan start')
"""Run ``qpc scan start`` command with ``options`` returning its output."""


def scan_show(options=None, exitstatus=0):
"""Run ``qpc scan show`` command with ``options`` returning its output."""
return json.loads(cli_command('qpc scan show', options, exitstatus))
2 changes: 2 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ reference for developers, not a gospel.
api/camayoc.tests.qcs.api.v1.sources.test_manager_sources.rst
api/camayoc.tests.qcs.api.v1.sources.test_sources_common.rst
api/camayoc.tests.qcs.cli.test_credentials.rst
api/camayoc.tests.qcs.cli.test_scans.rst
api/camayoc.tests.qcs.cli.test_sources.rst
api/camayoc.tests.qcs.cli.utils.rst
api/camayoc.tests.qcs.cli.conftest.rst
api/camayoc.tests.qcs.cli.utils.rst
1 change: 1 addition & 0 deletions docs/api/camayoc.tests.qcs.cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Submodules

camayoc.tests.qcs.cli.conftest
camayoc.tests.qcs.cli.test_credentials
camayoc.tests.qcs.cli.test_scans
camayoc.tests.qcs.cli.test_sources
camayoc.tests.qcs.cli.utils

7 changes: 7 additions & 0 deletions docs/api/camayoc.tests.qcs.cli.test_scans.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
camayoc\.tests\.qcs\.cli\.test\_scans module
============================================

.. automodule:: camayoc.tests.qcs.cli.test_scans
:members:
:undoc-members:
:show-inheritance:

0 comments on commit 2b4925c

Please sign in to comment.