Skip to content

Commit

Permalink
Add tests/stubs for vcenter and satellite scans
Browse files Browse the repository at this point in the history
Adds test for vcenter and satellite.

NOTES:

    * Satellite scans skip for now because they are not implemented yet.

    * Facts collected by the vcenter scan cannot yet be accepted by the
      server's fact endpoint, so we do not test for this -- but this should
      be done eventually.

Closes #92
  • Loading branch information
kdelee committed Jan 5, 2018
1 parent b770e37 commit d21644b
Show file tree
Hide file tree
Showing 9 changed files with 421 additions and 76 deletions.
103 changes: 27 additions & 76 deletions camayoc/tests/qcs/api/v1/scans/test_network_scans.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@
import time

from camayoc import config
from camayoc.exceptions import ConfigFileNotFoundError, WaitTimeError
from camayoc.exceptions import ConfigFileNotFoundError
from camayoc.qcs_models import (
Credential,
Source,
Scan,
)
from camayoc.tests.qcs.api.v1.scans.utils import (
wait_until_state,
prep_broken_scan,
)


def sources():
def network_sources():
"""Gather sources from config file.
If no config file is found, or no sources defined,
Expand All @@ -38,7 +42,7 @@ def sources():
return srcs


def first_source():
def first_network_source():
"""Gather sources from config file.
If no source is found in the config file, or no config file is found, a
Expand All @@ -47,63 +51,10 @@ def first_source():
try:
src = [config.get_config()['qcs']['sources'][0]]
except (ConfigFileNotFoundError, KeyError):
src = [{'hosts': ['localhost'], 'name':'localhost', 'auths':'default'}]
src = [{'hosts': ['localhost'], 'name':'localhost', 'auths':'root'}]
return src


def wait_until_state(scan, timeout=120, state='completed'):
"""Wait until the scan has failed or reached desired state.
The default state is 'completed'.
This method should not be called on scan jobs that have not yet been
created, are paused, or are canceled.
The default timeout is set to 120 seconds, but can be overridden if it is
anticipated that a scan may take longer to complete.
"""
while (
not scan.status() or not scan.status() in [
'failed',
state]) and timeout > 0:
time.sleep(5)
timeout -= 5
if timeout <= 0:
raise WaitTimeError(
'You have called wait_until_state() on a scan and the scan\n'
'timed out while waiting to achieve the state="{}" When the\n'
'scan timed out, it had the state="{}".\n'
.format(state, scan.status())
)


def prep_broken_network_scan(cleanup):
"""Return a scan that can be created but will fail to complete.
Create and return a source with a non-existent host and dummy credential.
It is returned ready to be POSTed to the server via the create() instance
method.
"""
bad_cred = Credential(
username='broken',
password='broken',
cred_type='network'
)
bad_cred.create()
cleanup.append(bad_cred)
bad_src = Source(
source_type='network',
hosts=['1.0.0.0'],
credential_ids=[bad_cred._id],
)
bad_src.create()
cleanup.append(bad_src)
bad_scn = Scan(
source_ids=[bad_src._id],
)
return bad_scn


def prep_network_scan(source, cleanup, client):
"""Given a source from config file, prep the scan.
Expand All @@ -112,7 +63,7 @@ def prep_network_scan(source, cleanup, client):
"""
cfg = config.get_config()
auth_ids = []
for auth in cfg['qcs'].get('auths'):
for auth in cfg.get('auths'):
if auth['name'] in source['auths']:
cred = Credential(
cred_type='network',
Expand All @@ -136,10 +87,11 @@ def prep_network_scan(source, cleanup, client):
return scan


@pytest.mark.parametrize('source', sources(), ids=[
@pytest.mark.create
@pytest.mark.parametrize('source', network_sources(), ids=[
'{}-{}'.format(
s['name'],
s['auths']) for s in sources()]
s['auths']) for s in network_sources()]
)
def test_create(shared_client, cleanup, source, isolated_filesystem):
"""Run a scan on a system and confirm it completes.
Expand All @@ -156,15 +108,14 @@ def test_create(shared_client, cleanup, source, isolated_filesystem):
"""
scan = prep_network_scan(source, cleanup, shared_client)
scan.create()
wait_until_state(scan)
assert scan.status() == 'completed'
assert isinstance(scan.read().json().get('fact_collection_id'), int)
wait_until_state(scan, state='completed')
assert scan.read().json().get('fact_collection_id') > 0


@pytest.mark.parametrize('source', first_source(), ids=[
@pytest.mark.parametrize('source', first_network_source(), ids=[
'{}-{}'.format(
first_source()[0]['name'],
first_source()[0]['auths'])]
first_network_source()[0]['name'],
first_network_source()[0]['auths'])]
)
def test_pause_cancel(shared_client, cleanup, source, isolated_filesystem):
"""Run a scan on a system and confirm we can pause and cancel it.
Expand All @@ -189,10 +140,10 @@ def test_pause_cancel(shared_client, cleanup, source, isolated_filesystem):
wait_until_state(scan, state='canceled')


@pytest.mark.parametrize('source', first_source(), ids=[
@pytest.mark.parametrize('source', first_network_source(), ids=[
'{}-{}'.format(
first_source()[0]['name'],
first_source()[0]['auths'])]
first_network_source()[0]['name'],
first_network_source()[0]['auths'])]
)
def test_restart(shared_client, cleanup, source, isolated_filesystem):
"""Run a scan on a system and confirm we can pause and restart it.
Expand Down Expand Up @@ -220,10 +171,10 @@ def test_restart(shared_client, cleanup, source, isolated_filesystem):
assert scan.read().json().get('fact_collection_id') > 0


@pytest.mark.parametrize('source', first_source(), ids=[
@pytest.mark.parametrize('source', first_network_source(), ids=[
'{}-{}'.format(
first_source()[0]['name'],
first_source()[0]['auths'])]
first_network_source()[0]['name'],
first_network_source()[0]['auths'])]
)
def test_queue_mix_valid_invalid(
shared_client,
Expand All @@ -247,7 +198,7 @@ def test_queue_mix_valid_invalid(
for k in range(6):
time.sleep(2)
if not k % 2:
bad_scn = prep_broken_network_scan(cleanup)
bad_scn = prep_broken_scan('network', cleanup)
bad_scn.create()
bad_scans.append(bad_scn)
else:
Expand All @@ -263,10 +214,10 @@ def test_queue_mix_valid_invalid(
wait_until_state(scan, state='failed')


@pytest.mark.parametrize('source', first_source(), ids=[
@pytest.mark.parametrize('source', first_network_source(), ids=[
'{}-{}'.format(
first_source()[0]['name'],
first_source()[0]['auths'])]
first_network_source()[0]['name'],
first_network_source()[0]['auths'])]
)
def test_queue_mix_paused_canceled(
shared_client,
Expand Down
167 changes: 167 additions & 0 deletions camayoc/tests/qcs/api/v1/scans/test_satellite_scans.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# coding=utf-8
"""Tests for ``Scan`` API endpoint for quipucords server.
:caseautomation: automated
:casecomponent: api
:caseimportance: high
:caselevel: integration
:requirement: Sonar
:testtype: functional
:upstream: yes
"""

import pytest

from camayoc import config
from camayoc.exceptions import ConfigFileNotFoundError
from camayoc.qcs_models import (
Credential,
Source,
Scan
)
from camayoc.tests.qcs.api.v1.scans.utils import wait_until_state


TIMEOUT = 540


def sat_source():
"""Gather Satellite source from config file.
If no config file is found, or no sources defined,
then an empty list will be returned.
If a test is parametrized on the sources in the config file, it will skip
if no source is found.
"""
try:
src = [config.get_config()['satellite']]
except (ConfigFileNotFoundError, KeyError):
src = []
return src


def prep_broken_sat_scan(cleanup):
"""Return a scan that can be created but will fail to complete.
Create and return a source with a non-existent host and dummy credential.
It is returned ready to be POSTed to the server via the create() instance
method.
"""
bad_cred = Credential(
username='broken',
password='broken',
cred_type='satellite'
)
bad_cred.create()
cleanup.append(bad_cred)
bad_src = Source(
source_type='satellite',
hosts=['1.0.0.0'],
credential_ids=[bad_cred._id],
)
bad_src.create()
cleanup.append(bad_src)
bad_scn = Scan(
source_ids=[bad_src._id],
)
return bad_scn


def prep_sat_scan(source, cleanup, client):
"""Given a source from config file, prep the scan.
Takes care of creating the Credential and Source objects on the server and
staging them for cleanup after the tests complete.
"""
cred_ids = []
cred = Credential(
cred_type='sat',
client=client,
password=source['password'],
username=source['username'],
)
cred.create()
cleanup.append(cred)
cred_ids.append(cred._id)

satsrc = Source(
source_type='sat',
client=client,
hosts=[source['hostname']],
credential_ids=cred_ids,
)
satsrc.create()
cleanup.append(satsrc)
scan = Scan(source_ids=[satsrc._id])
return scan


@pytest.mark.skip
@pytest.mark.create
@pytest.mark.parametrize('source', sat_source())
def test_create(shared_client, cleanup, source):
"""Run a scan on a system and confirm it completes.
:id: 52835270-dba9-4a68-bd36-225e0ef4679b
:description: Create a satellite scan and assert that it completes
:steps:
1) Create satellite credential
2) Create satellite source
3) Create a satellite scan
4) Assert that the scan completes
:expectedresults: A scan is run and reaches completion
"""
scan = prep_sat_scan(source, cleanup, shared_client)
scan.create()
wait_until_state(scan, state='completed', timeout=TIMEOUT)


@pytest.mark.skip
@pytest.mark.parametrize('source', sat_source())
def test_pause_cancel(shared_client, cleanup, source):
"""Run a scan on a system and confirm we can pause and cancel it.
:id: 7d61014a-7791-4f35-8020-6081c7e31227
:description: Assert that scans can be paused and then canceled.
:steps:
1) Create satellite credential
2) Create satellite source
3) Create a satellite scan
4) Assert that the scan can be paused
5) Assert that the scan can be canceled
:expectedresults: A satellite scan can be paused and canceled
"""
scan = prep_sat_scan(source, cleanup, shared_client)
scan.create()
wait_until_state(scan, state='running', timeout=TIMEOUT)
scan.pause()
wait_until_state(scan, state='paused', timeout=TIMEOUT)
scan.cancel()
wait_until_state(scan, state='canceled', timeout=TIMEOUT)


@pytest.mark.skip
@pytest.mark.parametrize('source', sat_source())
def test_restart(shared_client, cleanup, source):
"""Run a scan on a system and confirm we can pause and restart it.
:id: 6b20ec6e-83c5-4fcc-9f0b-21eacc8f732e
:description: Assert that scans can be paused and then restarted.
:steps:
1) Create satellite credential
2) Create satellite source
3) Create a satellite scan
4) Assert that the scan can be paused
5) Assert that the scan can be restarted
6) Assert that he scan completes
:expectedresults: A restarted scan completes
"""
scan = prep_sat_scan(source, cleanup, shared_client)
scan.create()
wait_until_state(scan, state='running', timeout=TIMEOUT)
scan.pause()
wait_until_state(scan, state='paused', timeout=TIMEOUT)
scan.restart()
wait_until_state(scan, state='running', timeout=TIMEOUT)
wait_until_state(scan, state='completed', timeout=TIMEOUT)
Loading

0 comments on commit d21644b

Please sign in to comment.