Skip to content

Commit

Permalink
Avoid redundant fetch by creating map first
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshi-1224 committed Jul 9, 2019
1 parent d8538ae commit 9b36b0c
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 64 deletions.
8 changes: 4 additions & 4 deletions examples/kubernetes/inventory/classes/component/mysql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ parameters:
output_path: .
output_type: yaml
validate:
- kind: secret # temporarily replaced with 'deployment' during test
- type: kubernetes
output_paths:
- manifests/mysql_secret.yml
type: kubernetes
kind: secret # temporarily replaced with 'deployment' during test
version: 1.14.0 # optional, defaults to 1.14.0
- kind: service
- type: kubernetes
output_paths:
- manifests/mysql_service_jsonnet.yml
- manifests/mysql_service_simple.yml
type: kubernetes
kind: service
version: 1.14.0
mysql:
image: mysql:latest
Expand Down
102 changes: 71 additions & 31 deletions kapitan/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import sys
import multiprocessing
import tempfile
from collections import defaultdict

import jsonschema
import yaml
import time
Expand All @@ -39,7 +41,7 @@

from reclass.errors import NotFoundError, ReclassException

from kapitan.validator.kubernetes_validator import KubernetesManifestValidator
from kapitan.validator.kubernetes_validator import KubernetesManifestValidator, DEFAULT_KUBERNETES_VERSION

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -107,9 +109,9 @@ def compile_targets(inventory_path, search_paths, output_path, parallel, targets

# validate the compiled outputs
if kwargs.get('validate', False):
worker = partial(schema_validate_output, search_path=compile_path,
cache_dir=kwargs.get('schemas_path', './schemas'))
[p.get() for p in pool.imap_unordered(worker, target_objs) if p]
validate_map = create_validate_mapping(target_objs, compile_path)
worker = partial(schema_validate_kubernetes_output, cache_dir=kwargs.get('schemas_path', './schemas'))
[p.get() for p in pool.imap_unordered(worker, validate_map.items()) if p]

# Save inventory and folders cache
save_inv_cache(compile_path, targets)
Expand Down Expand Up @@ -389,12 +391,36 @@ def valid_target_obj(target_obj):
"type": "object",
"properties": {
"output_paths": {"type": "array"},
"type": {"type": "string"},
"type": {
"type": "string",
"enum": ["kubernetes"]
},
"kind": {"type": "string"},
"version": {"type": "string"},
},
"required": ["output_paths", "type"],
"minItems": 1,
"allOf": [
{
"if": {
"properties": {"type": {"const": "kubernetes"}}
},
"then": {
"properties": {
"type": {
},
"kind": {
},
"output_paths": {
},
"version": {
}
},
"additionalProperties": False,
"required": ["kind"],
}
},
]
}
},
"dependencies": {
Expand Down Expand Up @@ -476,12 +502,14 @@ def schema_validate_compiled(targets, compiled_path, inventory_path, schema_cach
os.makedirs(schema_cache_path)
logger.info("created schema-cache-path {}".format(schema_cache_path))

worker = partial(schema_validate_output, search_path=compiled_path, cache_dir=schema_cache_path)
worker = partial(schema_validate_kubernetes_output, cache_dir=schema_cache_path)
pool = multiprocessing.Pool(parallel)

try:
target_objs = load_target_inventory(inventory_path, targets)
[p.get() for p in pool.imap_unordered(worker, target_objs) if p]
validate_map = create_validate_mapping(target_objs, compiled_path)

[p.get() for p in pool.imap_unordered(worker, validate_map.items()) if p]
pool.close()

except ReclassException as e:
Expand All @@ -505,32 +533,44 @@ def schema_validate_compiled(targets, compiled_path, inventory_path, schema_cach
pool.join()


def schema_validate_output(target_obj, search_path, cache_dir):
def create_validate_mapping(target_objs, compiled_path):
"""
validates compiled output for target_obj, as specified in kapitan.validate
search_path is where compiled outputs are stored, usually ./compiled
cache_dir is a directory where schema files will be cached
creates mapping of (kind, version) tuple to output_paths across different targets
this is required to avoid redundant schema fetch when multiple targets use the same schema for validation
"""
os.makedirs(cache_dir, exist_ok=True)
target_name = target_obj['vars']['target']
kubernetes_validator = KubernetesManifestValidator(cache_dir)
try:
# if validate options are specified for this target
validate_options = target_obj['validate']
for v in validate_options:
validate_type = v['type']
output_paths = v.pop('output_paths')
validate_files_map = defaultdict(list)
for target_obj in target_objs:
target_name = target_obj['vars']['target']
if 'validate' not in target_obj:
logger.debug("target '{}' does not have 'validate' parameter. skipping".format(target_name))
continue

for validate_item in target_obj['validate']:
validate_type = validate_item['type']
if validate_type == 'kubernetes':
for output_path in output_paths:
file_path = os.path.join(search_path, target_name, output_path)
v['output_path'] = output_path
if not os.path.exists(file_path):
logger.info("Validation: {} does not exist. Skipping".format(file_path))
else:
kubernetes_validator.validate(file_path, target_name=target_name, **v)
kind_version_pair = (validate_item['kind'],
validate_item.get('version', DEFAULT_KUBERNETES_VERSION))
for output_path in validate_item['output_paths']:
full_output_path = os.path.join(compiled_path, target_name, output_path)
if not os.path.isfile(full_output_path):
logger.warning("{} does not exist for target '{}'. skipping".
format(full_output_path, target_name))
continue
validate_files_map[kind_version_pair].append(full_output_path)
else:
logger.error("type '{}' is not supported".format(validate_type))
logger.warning('type {} is not supported for validation. skipping'.format(validate_type))

return validate_files_map


def schema_validate_kubernetes_output(validate_data, cache_dir):
"""
validates given files according to kubernetes manifest schemas
schemas are cached from/to cache_dir
validate_data must be of structure ((kind, version), validate_files)
"""
(kind, version), validate_files = validate_data
KubernetesManifestValidator(cache_dir).validate(validate_files, kind=kind, version=version)



except KeyError:
logger.debug("target '{}' does not have validate parameter. skipping".format(target_name))
pass
41 changes: 18 additions & 23 deletions kapitan/validator/kubernetes_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,29 @@ class KubernetesManifestValidator(Validator):
def __init__(self, cache_dir, **kwargs):
super().__init__(cache_dir, **kwargs)

def validate(self, validate_instance, **kwargs):
def validate(self, validate_paths, **kwargs):
"""
validates validate_obj against json schema as specified by
'kind' and 'version' in kwargs. if validate_obj is of type str, it will be
used as the file path to the obj to be validated.
raises KubernetesManifestValidationError if validation fails, listing all the errors
inside validate_obj
validates manifests at validate_paths against json schema as specified by
'kind' and 'version' in kwargs.
raises KubernetesManifestValidationError encountering the first validation error
"""
output_path = kwargs.get('output_path', None)
if isinstance(validate_instance, str):
full_file_path = validate_instance
with open(full_file_path, 'r') as fp:
validate_instance = yaml.safe_load(fp.read())
target_name = kwargs.get('target_name', '')
kind = kwargs.get('kind')
version = kwargs.get('version', DEFAULT_KUBERNETES_VERSION)
schema = self._get_schema(kind, version)
v = jsonschema.Draft4Validator(schema)
errors = sorted(v.iter_errors(validate_instance), key=lambda e: e.path)
if errors:
error_message = 'invalid manifest for target "{}" at {}\n'.format(
kwargs.get('target_name'), output_path)

error_message += '\n'.join(['{} {}'.format(list(error.path), error.message) for error in errors])
raise KubernetesManifestValidationError(error_message)
else:
logger.info("Validation: manifest validation successful for target '{}'"
" for {}".format(target_name, output_path))
validator = jsonschema.Draft4Validator(schema)
for validate_path in validate_paths:
if not os.path.isfile(validate_path):
logger.warning('{} does not exist. skipping'.format(validate_path))
continue
with open(validate_path, 'r') as fp:
validate_instance = yaml.safe_load(fp.read())
errors = sorted(validator.iter_errors(validate_instance), key=lambda e: e.path)
if errors:
error_message = 'invalid manifest for {}\n'.format(validate_path)
error_message += '\n'.join(['{} {}'.format(list(error.path), error.message) for error in errors])
raise KubernetesManifestValidationError(error_message)
else:
logger.info("Validation: manifest validation successful for {}".format(validate_path))

@lru_cache(maxsize=256)
def _get_schema(self, kind, version):
Expand Down
12 changes: 6 additions & 6 deletions tests/test_kubernetes_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,19 @@ def test_validate(self):
port: 80
targetPort: 9376
"""

service_manifest = yaml.safe_load(service_manifest_string)
self.validator.validate(service_manifest, kind='service', version='1.14.0')
manifest_path = os.path.join(self.cache_dir, 'service_manifest.yaml')
with open(manifest_path, 'w') as fp:
fp.write(service_manifest_string)
self.validator.validate([manifest_path], kind='service', version='1.14.0')

with self.assertRaises(KubernetesManifestValidationError):
self.validator.validate(service_manifest, kind='deployment', version='1.14.0',
file_path='service/manifest', target_name='example')
self.validator.validate([manifest_path], kind='deployment', version='1.14.0')

def test_validate_command_pass(self):
sys.argv = ['kapitan', 'validate', '--schemas-path', self.cache_dir]
try:
main()
except KubernetesManifestValidationError:
except SystemExit:
self.fail("Kubernetes manifest validation error raised unexpectedly")

def test_validate_command_fail(self):
Expand Down

0 comments on commit 9b36b0c

Please sign in to comment.