Skip to content

Commit

Permalink
fixed docstrings, created utils
Browse files Browse the repository at this point in the history
  • Loading branch information
abaiken committed Feb 20, 2018
1 parent c14d356 commit 726eec4
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 118 deletions.
61 changes: 11 additions & 50 deletions qpc/scan/add.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@
# along with this software; if not, see
# https://www.gnu.org/licenses/gpl-3.0.txt.
#
"""ScanAddCommand is used to create a host scan."""
"""ScanAddCommand is used to create a scan."""

from __future__ import print_function
import sys
from requests import codes
from qpc.request import POST, GET, request
from qpc.request import POST
from qpc.clicommand import CliCommand
import qpc.source as source
import qpc.scan as scan
from qpc.scan.utils import _get_source_ids, _get_optional_products
from qpc.translation import _
import qpc.messages as messages

Expand All @@ -26,7 +26,7 @@
class ScanAddCommand(CliCommand):
"""Defines the add command.
This command is for creating host scans with a source to gather system
This command is for creating scans with a source to gather system
facts.
"""

Expand Down Expand Up @@ -59,60 +59,21 @@ def __init__(self, subparsers):
required=False)
self.source_ids = []

def _get_source_ids(self, source_names):
not_found = False
source_ids = []
for source_name in set(source_names):
# check for existence of source
response = request(parser=self.parser, method=GET,
path=source.SOURCE_URI,
params={'name': source_name},
payload=None)
if response.status_code == codes.ok: # pylint: disable=no-member
json_data = response.json()
count = json_data.get('count', 0)
results = json_data.get('results', [])
if count == 1:
source_entry = results[0]
source_ids.append(source_entry['id'])
else:
print(_(messages.SOURCE_DOES_NOT_EXIST % source_name))
not_found = True
else:
print(_(messages.SOURCE_DOES_NOT_EXIST % source_name))
not_found = True
return not_found, source_ids

def _get_optional_products(self):
"""Construct a dictionary based on the disable-optional-products args.
:returns: a dictionary representing the collection status of optional
products
"""
optional_product_status = {}

if self.args.disable_optional_products:
for product in self.args.disable_optional_products:
optional_product_status[product] = False
else:
return None

return optional_product_status

def _validate_args(self):
CliCommand._validate_args(self)
source_ids = []
if self.args.sources:
# check for existence of sources
not_found, source_ids = self._get_source_ids(self.args.sources)
not_found, source_ids = _get_source_ids(self.parser,
self.args.sources)
if not_found is True:
sys.exit(1)
self.source_ids = source_ids

def _build_data(self):
"""Construct the dictionary credential given our arguments.
"""Construct the payload for a scan given our arguments.
:returns: a dictionary representing the credential being added
:returns: a dictionary representing the scan being added
"""
self.req_payload = {
'name': self.args.name,
Expand All @@ -121,12 +82,12 @@ def _build_data(self):
'options': {
'max_concurrency': self.args.max_concurrency}
}
disable_optional_products = self._get_optional_products()
disable_optional_products \
= _get_optional_products(self.args.disable_optional_products)
if disable_optional_products is not None:
self.req_payload['options']['disable_optional_products']\
= disable_optional_products

def _handle_response_success(self):
json_data = self.response.json()
# Change id to name whenever name is added to scan model
print(_(messages.SCAN_ADDED % json_data['name']))
print(_(messages.SCAN_ADDED % json_data.get('name')))
89 changes: 25 additions & 64 deletions qpc/scan/edit.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@
# along with this software; if not, see
# https://www.gnu.org/licenses/gpl-3.0.txt.
#
"""ScanEditCommand is used to edit existing host scans."""
"""ScanEditCommand is used to edit existing scans."""

from __future__ import print_function
import sys
from requests import codes
from qpc.request import PATCH, GET, request
from qpc.clicommand import CliCommand
import qpc.source as source
import qpc.scan as scan
from qpc.scan.utils import (_get_source_ids,
build_scan_payload,
_get_optional_products)
from qpc.translation import _
import qpc.messages as messages

Expand All @@ -26,7 +28,7 @@
class ScanEditCommand(CliCommand):
"""Defines the edit command.
This command is for editing existing host scans.
This command is for editing existing scans.
"""

SUBCOMMAND = scan.SUBCOMMAND
Expand Down Expand Up @@ -58,46 +60,6 @@ def __init__(self, subparsers):
required=False)
self.source_ids = []

def _get_source_ids(self, source_names):
not_found = False
source_ids = []
for source_name in set(source_names):
# check for existence of source
response = request(parser=self.parser, method=GET,
path=source.SOURCE_URI,
params={'name': source_name},
payload=None)
if response.status_code == codes.ok: # pylint: disable=no-member
json_data = response.json()
count = json_data.get('count', 0)
results = json_data.get('results', [])
if count == 1:
source_entry = results[0]
source_ids.append(source_entry['id'])
else:
print(_(messages.SOURCE_DOES_NOT_EXIST % source_name))
not_found = True
else:
print(_(messages.SOURCE_DOES_NOT_EXIST % source_name))
not_found = True
return not_found, source_ids

def _get_optional_products(self):
"""Construct a dictionary based on the disable-optional-products args.
:returns: a dictionary representing the collection status of optional
products
"""
optional_product_status = {}

if self.args.disable_optional_products:
for product in self.args.disable_optional_products:
optional_product_status[product] = False
else:
return None

return optional_product_status

def _validate_args(self):
"""Validate the edit arguments."""
CliCommand._validate_args(self)
Expand All @@ -109,6 +71,7 @@ def _validate_args(self):
sys.exit(1)

# check for existence of scan
exists = False
response = request(parser=self.parser, method=GET,
path=scan.SCAN_URI,
params={'name': self.args.name},
Expand All @@ -117,10 +80,14 @@ def _validate_args(self):
json_data = response.json()
count = json_data.get('count', 0)
results = json_data.get('results', [])
if count == 1:
scan_entry = results[0]
self.req_path = self.req_path + str(scan_entry['id']) + '/'
else:
if count >= 1:
for result in results:
if result['name'] == self.args.name:
scan_entry = result
self.req_path \
= self.req_path + str(scan_entry['id']) + '/'
exists = True
if not exists or count == 0:
print(_(messages.SCAN_DOES_NOT_EXIST % self.args.name))
sys.exit(1)
else:
Expand All @@ -131,30 +98,24 @@ def _validate_args(self):
source_ids = []
if self.args.sources:
# check for existence of sources
not_found, source_ids = self._get_source_ids(self.args.sources)
not_found, source_ids = _get_source_ids(self.parser,
self.args.sources)
if not_found is True:
sys.exit(1)
self.source_ids = source_ids

def _build_data(self):
"""Construct the dictionary credential given our arguments.
"""Construct the payload for a scan edit given our arguments.
:returns: a dictionary representing the credential being added
:returns: a dictionary representing the scan changes
"""
# self.req_payload = build_source_payload(self.args, add_none=False)
self.req_payload = {
'name': self.args.name,
'sources': self.source_ids,
'scan_type': scan.SCAN_TYPE_INSPECT,
'options': {
'max_concurrency': self.args.max_concurrency}
}
disable_optional_products = self._get_optional_products()
if disable_optional_products is not None:
self.req_payload['options']['disable_optional_products']\
= disable_optional_products
disable_optional_products \
= _get_optional_products(self.args.disable_optional_products)
self.req_payload \
= build_scan_payload(self.args,
self.source_ids,
disable_optional_products)

def _handle_response_success(self):
json_data = self.response.json()
# Change id to name whenever name is added to scan model
print(_(messages.SCAN_UPDATED % json_data['id']))
print(_(messages.SCAN_UPDATED % json_data.get('name')))
4 changes: 0 additions & 4 deletions qpc/scan/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@ def _validate_args(self):
json_data = response.json()
count = json_data.get('count', 0)
results = json_data.get('results', [])
# if count == 1:
# scan_entry = results[0]
# self.req_path = self.req_path + str(scan_entry['id']) + '/'
# found = True
if count >= 1:
for result in results:
if result['name'] == self.args.name:
Expand Down
97 changes: 97 additions & 0 deletions qpc/scan/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#!/usr/bin/env python
#
# Copyright (c) 2018 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 3 (GPLv3). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv3
# along with this software; if not, see
# https://www.gnu.org/licenses/gpl-3.0.txt.
#
"""Utilities for the scan module."""

from __future__ import print_function
from requests import codes
from qpc.request import GET, request
import qpc.source as source
import qpc.scan as scan
from qpc.translation import _
import qpc.messages as messages


def _get_source_ids(parser, source_names):
not_found = False
source_ids = []
for source_name in set(source_names):
# check for existence of source
response = request(parser=parser, method=GET,
path=source.SOURCE_URI,
params={'name': source_name},
payload=None)
if response.status_code == codes.ok: # pylint: disable=no-member
json_data = response.json()
count = json_data.get('count', 0)
results = json_data.get('results', [])
if count == 1:
source_entry = results[0]
source_ids.append(source_entry['id'])
else:
print(_(messages.SOURCE_DOES_NOT_EXIST % source_name))
not_found = True
else:
print(_(messages.SOURCE_DOES_NOT_EXIST % source_name))
not_found = True
return not_found, source_ids


def _get_optional_products(disable_optional_products):
"""Construct a dictionary based on the disable-optional-products args.
:returns: a dictionary representing the collection status of optional
products
"""
optional_product_status = {}

if disable_optional_products:
for product in disable_optional_products:
optional_product_status[product] = False
else:
return None

return optional_product_status


# pylint: disable=R0912
def build_scan_payload(args, sources, disable_optional_products):
"""Construct payload from command line arguments.
:param args: the command line arguments
:param add_none: add None for a key if True vs. not in dictionary
:returns: the dictionary for the request payload
"""
req_payload = {'name': args.name}
options = None

if hasattr(args, 'sources') and args.sources:
req_payload['sources'] = sources

if hasattr(args, 'max_concurrency') and args.max_concurrency:
if options is None:
options = {'max_concurrency': args.max_concurrency}
else:
options['max_concurrency'] = args.max_concurrency
if hasattr(args, 'disable_optional_products') \
and args.disable_optional_products:
req_payload['disable_optional_products'] \
= args.disable_optional_products
if options is None:
options = \
{'disable_optional_products': args.disable_optional_products}
else:
options['disable_optional_products'] = disable_optional_products
if options is not None:
req_payload['options'] = options
req_payload['scan_type'] = scan.SCAN_TYPE_INSPECT

return req_payload

0 comments on commit 726eec4

Please sign in to comment.