Skip to content

Commit

Permalink
chg: dev: oscal module and test cleanup, update packaging
Browse files Browse the repository at this point in the history
* add sorted output option for ssg control set match
* eliminate unnecessary variable in yasort module
* update test data and move most output to verbose only
* update project files with natsort package dep

Signed-off-by: Steve Arnold <sarnold@vctlabs.com>
  • Loading branch information
sarnold committed Apr 18, 2024
1 parent 671658e commit 755318b
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 44 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ repos:
additional_dependencies:
- importlib_metadata
- importlib_resources
- natsort
- munch
- munch-stubs
- ruamel.yaml
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ exclude_lines = [
]

[tool.black]
line-length = 90
line-length = 88
skip-string-normalization = true
include = '\.py$'
exclude = '''
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ dpath
importlib-metadata; python_version < '3.8'
importlib-resources; python_version < '3.10'
munch
natsort
nested-lookup
pystache==0.6.5
PyYAML
Expand Down
19 changes: 12 additions & 7 deletions scripts/analyze_ssg_controls.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,16 @@

import os
import sys
import tempfile
import typing
from collections import Counter
from pathlib import Path

from diskcache import Deque
from nested_lookup import nested_lookup

from nested_lookup import nested_lookup
from ymltoxml.templates import xform_id
from ymltoxml.utils import (
FileTypeError,
get_cachedir,
get_filelist,
text_file_reader,
)
from ymltoxml.utils import FileTypeError, get_filelist, text_file_reader

id_count: typing.Counter[str] = Counter()
id_queue = Deque(get_cachedir(dir_name='id_queue'))
Expand All @@ -37,6 +33,15 @@
]


def get_cachedir(dir_name='yml_cache'):
"""
Get temp cachedir (create it if needed) and override the dir_name if
passed.
"""
cache_dir = tempfile.gettempdir()
return os.path.join(cache_dir, dir_name)


def set_unique(sequence):
"""
Remove duplicates and emulate a set with ordered elements.
Expand Down
5 changes: 4 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ setup_requires =

install_requires =
importlib-resources; python_version < '3.10'
diskcache
natsort
nested-lookup
xmltodict
munch
Expand Down Expand Up @@ -57,6 +57,9 @@ console_scripts =
# extra deps are included here mainly for local/venv installs using pip
# otherwise deps are handled via tox, ci config files or pkg managers
[options.extras_require]
demos =
diskcache

doc =
sphinx
sphinx_git
Expand Down
1 change: 1 addition & 0 deletions src/ymltoxml/data/oscal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ default_ssg_glob: 'nist_rhcos4.yml'
default_ssg_path: 'ext/content/controls'
default_lookup_key: 'controls'
default_csv_hdr: null
cvs_hdr_appends: []
input_format: null
output_format: 'json'
preserve_quotes: true
Expand Down
69 changes: 52 additions & 17 deletions src/ymltoxml/oscal.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from pathlib import Path

from munch import Munch
from natsort import os_sorted

from nested_lookup import nested_lookup

from .templates import xform_id
Expand Down Expand Up @@ -43,6 +45,8 @@ def load_input_data(filepath, prog_opts, use_ssg=False, debug=False):
if use_ssg:
prog_opts['default_content_path'] = prog_opts['default_ssg_path']
prog_opts['default_profile_glob'] = prog_opts['default_ssg_glob']
else:
print(f"Loading content from: {prog_opts['default_content_path']}")

ctl_files = get_filelist(
prog_opts['default_content_path'],
Expand All @@ -56,7 +60,8 @@ def load_input_data(filepath, prog_opts, use_ssg=False, debug=False):
print(f'Using control file(s): {file_tuples}')

for path in file_tuples:
print(f'Extracting IDs from {path[1]}')
if debug:
print(f'Extracting IDs from {path[1]}')

try:
indata = text_file_reader(Path(path[0]), prog_opts)
Expand All @@ -67,7 +72,7 @@ def load_input_data(filepath, prog_opts, use_ssg=False, debug=False):
path_ids = [
xform_id(x)
for x in nested_lookup('id', indata)
if x.islower() and '_' not in x
if x.islower() and '_' not in x and '-' in x
]
else:
path_ids = [x for x in nested_lookup('id', indata) if x.isupper()]
Expand All @@ -80,39 +85,60 @@ def load_input_data(filepath, prog_opts, use_ssg=False, debug=False):
if debug:
print(f'ID queue Front: {id_queue[0]}')
print(f'Control queue Front: {ctl_queue[0]}')
print(f"\nUser control Ids -> {len(in_ids)}")

return in_ids, id_queue, ctl_queue


def process_data(filepath, prog_opts, use_ssg=False, debug=False):
def process_data(filepath, prog_opts, uargs):
"""
Process inputs, print some output.
"""
input_ids, id_queue, ctls = load_input_data(
filepath, prog_opts, use_ssg=use_ssg, debug=debug
filepath, prog_opts, use_ssg=uargs.ssg, debug=uargs.verbose
)
print(f"\nInput control Ids -> {len(input_ids)}")
id_set_match(input_ids, id_queue, debug=debug)
if uargs.verbose:
print(f"\nInput control Ids -> {len(input_ids)}")
id_set_match(input_ids, id_queue, uargs=uargs)


def id_set_match(in_ids, id_q, debug=False):
def id_set_match(in_ids, id_q, uargs):
"""
Quick set match analysis of ID sets.
"""
in_set = SortedSet(in_ids)
q_size = len(id_q)

for _ in range(len(id_q)):
for _ in range(q_size):
pname, id_list = id_q.popleft()
print(f"\n{pname} control IDs -> {len(id_list)}")
if uargs.verbose:
print(f"\n{pname} control IDs -> {len(id_list)}")
id_set = SortedSet(id_list)

print(f"Input set is in {pname} set: {id_set > in_set}")
if uargs.verbose:
print(f"Input set is in {pname} set: {id_set > in_set}")
common_set = id_set & in_set
print(f"Num input controls in {pname} set -> {len(common_set)}")
if uargs.verbose:
print(f"Num input controls in {pname} set -> {len(common_set)}")
not_in_set = in_set - id_set
print(f"Num input controls not in {pname} set -> {len(not_in_set)}")
print(f"Input control IDs not in {pname} set: {list(not_in_set)}")
if uargs.verbose:
print(f"Num input controls not in {pname} set -> {len(not_in_set)}")
print(f"Input control IDs not in {pname} set: {list(not_in_set)}")

# this requires a single filename in the search glob resulting in a control
# ID queue size of 1 (as well as the sort-ids argument)
if q_size == 1 and uargs.sort:
sort_in = (
[xform_id(x) for x in common_set] if in_ids[0].isupper() else common_set
)
sort_out = (
[xform_id(x) for x in not_in_set] if in_ids[0].isupper() else not_in_set
)
print(f'\nInput IDs in {pname}:')
for ctl in os_sorted(sort_in):
print(ctl)
print(f'\nInput IDs not in {pname}:')
for ctl in os_sorted(sort_out):
print(ctl)


def self_test(ucfg):
Expand Down Expand Up @@ -171,21 +197,28 @@ def main(argv=None): # pragma: no cover
action='store_true',
dest="ssg",
)
parser.add_argument(
'-s',
'--sort-ids',
help='output report sorted IDs',
action='store_true',
dest="sort",
)
parser.add_argument(
'-v',
'--verbose',
action='store_true',
help='display more processing info',
)
parser.add_argument(
'-d',
'-D',
'--dump-config',
help='dump active configuration to stdout and exit',
action='store_true',
dest='dump',
)
parser.add_argument(
'-s',
'-S',
'--save-config',
action='store_true',
dest="save",
Expand Down Expand Up @@ -224,8 +257,10 @@ def main(argv=None): # pragma: no cover
print(f"Path to content: {cfg.default_content_path}")
print(f"Content file glob: {cfg.default_profile_glob}")
print(f"Input file: {infile}")
else:
print(f"Processing input file: {infile}")

process_data(infile, popts, args.ssg, args.verbose)
process_data(infile, popts, args)


if __name__ == "__main__":
Expand Down
7 changes: 4 additions & 3 deletions src/ymltoxml/yasort.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ def process_inputs(filepath, prog_opts, debug=False):
"""

fpath = Path(filepath)
outdir = Path(prog_opts['output_dirname'])
opath = outdir.joinpath(fpath.stem)
opath = Path(prog_opts['output_dirname']).joinpath(fpath.stem)

if not fpath.exists():
print(f'Input file {fpath} not found! Skipping...')
Expand Down Expand Up @@ -121,7 +120,9 @@ def main(argv=None): # pragma: no cover
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description='Sort YAML lists and write new files.',
)
parser.add_argument("--version", action="version", version=f"%(prog)s {__version__}")
parser.add_argument(
"--version", action="version", version=f"%(prog)s {__version__}"
)
parser.add_argument(
"-v",
"--verbose",
Expand Down
3 changes: 0 additions & 3 deletions tests/data/OE-expanded-profile-all-ids.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,10 @@ CM-5(1)
CM-5(3)
CM-5(6)
CM-6
CM-6 b
CM-6(a)
CM-6(b)
CM-6(c)
CM-6(d)
CM-6(iv)
CM-7
CM-7(1)
CM-7(2)
Expand All @@ -100,7 +98,6 @@ CM-7(a)
CM-7(b)
CM-8(3)
CM-8(3)(a)
CM6(a)
IA-11
IA-2
IA-2(1)
Expand Down
70 changes: 59 additions & 11 deletions tests/test_oscal.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
default_ssg_glob: 'nist_ocp4.yml'
default_ssg_path: 'ext/content/controls'
default_lookup_key: 'controls'
default_csv_hdr: null
cvs_hdr_appends: []
input_format: null
output_format: 'json'
preserve_quotes: true
Expand Down Expand Up @@ -67,39 +69,85 @@
- moderate
"""


def test_process_data(capfd, tmp_path):
args_obj = Munch.fromDict(
{
"sort": False,
"ssg": True,
"verbose": False,
}
)

testdata = [
(
False,
False,
False,
"Loading content",
),
(
False,
False,
True,
"Input control IDs",
),
]

testdata2 = [
(
False,
True,
True,
"test2.yaml",
),
(
True,
True,
True,
"Normalized input",
),
]


@pytest.mark.parametrize("a,b,c,expected", testdata)
def test_process_data(a, b, c, expected, capfd, tmp_path):
args_obj.sort = a
args_obj.ssg = b
args_obj.verbose = c
yaml = StrYAML()
infile = 'tests/data/OE-expanded-profile-ids.txt'
data_file = tmp_path / "test.yaml"
data_file.write_text(yaml_str, encoding="utf-8")

popts = yaml.load(defconfig_str)
popts['default_content_path'] = tmp_path
popts['default_profile_glob'] = 'test*.yaml'
popts['default_profile_glob'] = 'test.yaml'

process_data(infile, popts, False, False)
process_data(infile, popts, False, True)
process_data(infile, popts, args_obj)
process_data(infile, popts, args_obj)
out, err = capfd.readouterr()
print(out)
assert 'Input control IDs' in out
assert expected in out


def test_process_data_alt(capfd, tmp_path):
@pytest.mark.parametrize("a,b,c,expected", testdata2)
def test_process_data_alt(a, b, c, expected, capfd, tmp_path):
args_obj.sort = a
args_obj.ssg = b
args_obj.verbose = c
yaml = StrYAML()
infile = 'tests/data/OE-expanded-profile-ids.txt'
data_file = tmp_path / "test2.yaml"
data_file.write_text(yaml_str, encoding="utf-8")

popts = yaml.load(defconfig_str)
popts['default_ssg_path'] = tmp_path
popts['default_ssg_glob'] = 'test*.yaml'
popts['default_ssg_glob'] = 'test2.yaml'

process_data(infile, popts, True, False)
process_data(infile, popts, True, True)
process_data(infile, popts, args_obj)
process_data(infile, popts, args_obj)
out, err = capfd.readouterr()
print(out)
assert 'Input control IDs' in out
assert expected in out


def test_self_test(capfd):
Expand Down
Loading

0 comments on commit 755318b

Please sign in to comment.