# core

> a library that wraps jsonschema's `validate()` to validate a configuration dict (`.env` in particular)

In [None]:
#| default_exp core

In [None]:
#| export
from nbdev.showdoc import *
from fastcore.test import *
from unittest.mock import patch

import dotenv
import json
import os
import jsonschema
import logging
from jsonschema import validate, ValidationError
from typing import Union
from fs.base import FS
from fs.osfs import OSFS


logger = logging.getLogger(__name__)

In [None]:
#| export
def coerce_primitive_values(json_schema: dict, data: dict) -> dict:
    '''
    given a JSON schema dict, return a copy of `data` where the `str` values
    of properties declared with a primitive type (`integer`, `number`,
    `boolean`) in the schema are converted to the corresponding python type

    values that cannot be converted (e.g. `'abc'` for an `integer`, or
    `'maybe'` for a `boolean`) are left exactly as they were, so that the
    subsequent JSON schema validation can report them verbatim; values that
    are not `str` (e.g. schema defaults that are already typed) are never
    touched

    :param json_schema: expected (but not validated) to be a json schema
    :param data: the data (e.g. dotenv, os.environ) to extract from
    :return: coerced dict (a shallow copy; `data` itself is not modified);
        if `data` is not a dict it is returned unchanged
    '''
    if not isinstance(data, dict):
        return data
    truthy = ('true', 'yes', 'y', '1', 'on')
    falsy = ('false', 'no', 'n', '0', 'off')
    out = data.copy()
    # use the json schema to convert types on known properties
    for property_name, property_schema in json_schema.get('properties', {}).items():
        property_value = data.get(property_name)
        # only `str` values are coerced: anything else is already typed, and
        # e.g. int(3.7) would silently truncate instead of failing validation
        if not isinstance(property_value, str):
            continue
        property_type = property_schema.get('type')
        try:
            if property_type == 'integer':
                out[property_name] = int(property_value)
            elif property_type == 'number':
                out[property_name] = float(property_value)
            elif property_type == 'boolean':
                lowered = property_value.lower()
                if lowered in truthy:
                    out[property_name] = True
                elif lowered in falsy:
                    out[property_name] = False
                # anything else is kept as-is (not replaced with None)
        except ValueError:
            # leave any validation error descriptions to json schema
            continue
    return out

In [None]:
#| hide
# everything is correct: each declared primitive type is coerced from `str`
# (INTEGER is declared as `integer` so that the int() branch is exercised)
test_eq(coerce_primitive_values({
    'type': 'object',
    'properties': {
        'STRING': { 'type': 'string' },
        'NUMBER': { 'type': 'number' },
        'INTEGER': { 'type': 'integer' },
        'BOOLEAN': { 'type': 'boolean' },
    },
}, {
    'STRING': 'asdf',
    'NUMBER': '1232529.56',
    'INTEGER': '98758585858232',
    'BOOLEAN': 'TRUE',
}), {
    'STRING': 'asdf',
    'NUMBER': 1232529.56,
    'INTEGER': 98758585858232,
    'BOOLEAN': True,
})

In [None]:
#|hide
# undeclared types do not get converted:
# - NUMBER / INTEGER / BOOLEAN are not declared in the schema at all
# - SOMETHING_ELSE is declared, but without a `type`
# so every value is expected to come back unchanged
test_eq(coerce_primitive_values({
    'type': 'object',
    'properties': {
        'STRING': { 'type': 'string' },
        'SOMETHING_ELSE': {},
    },
}, {
    'NUMBER': '1232529.56',
    'INTEGER': '98758585858232',
    'BOOLEAN': 'TRUE',
    'SOMETHING_ELSE': {'a': 1, 'b': ['c', 3]},
}), {
    'NUMBER': '1232529.56',
    'INTEGER': '98758585858232',
    'BOOLEAN': 'TRUE',
    'SOMETHING_ELSE': {'a': 1, 'b': ['c', 3]},
})

In [None]:
#| export
def extract_declared_items(json_schema: dict, data: dict) -> dict:
    '''
    given a JSON schema dict, build a new dict from `data` such that
    - keys that are not declared under the schema's `properties` are dropped
    - keys that are declared in the schema and present in `data` are kept
      with their value untouched
    - keys that are declared in the schema with a `default`, BUT NOT present
      in `data`, are added using that `default` value (as-is, no coercion)

    :param json_schema: expected (but not validated) to be a json schema
    :param data: the data (e.g. dotenv, os.environ) to extract from
    :return: extracted dict
    '''
    declared = json_schema['properties']

    # first pass: keep what the schema knows about, in the order of `data`
    extracted = {}
    for name, value in data.items():
        if name in declared:
            extracted[name] = value

    # second pass: hydrate the missing keys that have a schema default
    for name, spec in declared.items():
        if name in extracted:
            continue
        if 'default' in spec:
            extracted[name] = spec['default']
    return extracted

In [None]:
#| hide
# undeclared keys (NUMBER, INTEGER, BOOLEAN) are dropped, the declared key
# present in the data (SOMETHING_ELSE) is kept, and the absent key with a
# schema default (HAS_DEFAULT) is filled in with the default as-is, even
# though that default does not match the declared `boolean` type
test_eq(extract_declared_items({
    'type': 'object',
    'properties': {
        'STRING': { 'type': 'string' },
        'SOMETHING_ELSE': {},
        'HAS_DEFAULT': { 'type': 'boolean', 'default': 'NO COERCION!' },
    },
}, {
    'NUMBER': '1232529.56',
    'INTEGER': '98758585858232',
    'BOOLEAN': 'TRUE',
    'SOMETHING_ELSE': {'a': 1, 'b': ['c', 3]},
}), {
    'SOMETHING_ELSE': {'a': 1, 'b': ['c', 3]},
    'HAS_DEFAULT': 'NO COERCION!',
})

In [None]:
#| export
class ConfigValidatorException(Exception):
    '''
    raised when a config fails to validate against its JSON schema

    the validation errors that caused the failure are kept on `.errors`
    '''

    def __init__(self, errors):
        # keep the individual errors around for callers that want details
        self.errors = errors
        super().__init__('config failed to validate against JSON schema')


class ConfigValidator(object):
    '''
    validates a configuration dict (e.g. a parsed `.env` file or
    `os.environ`) against a JSON schema

    `load_config` applies, in order:
    1. `extract_declared_items`: keep only the keys declared in the schema
       and fill in schema defaults for missing keys
    2. `coerce_primitive_values`: convert values to the primitive types
       declared in the schema
    3. validation of the result with `jsonschema.Draft4Validator`
    '''

    # filesystem used to open schema / dotenv files when the caller does not
    # pass a `storage_driver`
    DEFAULT_STORAGE_DRIVER: FS = OSFS('.')

    # name of the environment variable that may hold the path of the JSON
    # schema file to fall back to when no schema is given explicitly
    CONFIG_VALIDATOR_JSON_SCHEMA_ENVVAR_NAME = 'CONFIG_VALIDATOR_JSON_SCHEMA'

    @classmethod
    def load_json(cls, json_source: Union[str, dict]=None, storage_driver: FS = None) -> dict:
        '''
        convenience method to return a dict from either
        a file path or an already-loaded dict

        :param json_source: a str path to a JSON file, or an already-loaded dict
        :param storage_driver: filesystem used to open the path;
            defaults to `DEFAULT_STORAGE_DRIVER`
        :return: the loaded dict; `None` if `json_source` is neither a str nor a dict
        '''
        storage_driver = storage_driver or cls.DEFAULT_STORAGE_DRIVER
        if isinstance(json_source, str):
            with storage_driver.open(json_source) as ifile:
                return json.load(ifile)
        elif isinstance(json_source, dict):
            return json_source
        return None

    @classmethod
    def get_default_json_schema(cls, storage_driver: FS = None) -> dict:
        '''
        load the JSON schema file whose path is stored in the environment
        variable named by `CONFIG_VALIDATOR_JSON_SCHEMA_ENVVAR_NAME`

        :param storage_driver: filesystem used to open the path;
            defaults to `DEFAULT_STORAGE_DRIVER`
        :return: the schema dict; `None` if the environment variable is not set
        '''
        expected_json_schema_path = os.environ.get(
            cls.CONFIG_VALIDATOR_JSON_SCHEMA_ENVVAR_NAME)
        if expected_json_schema_path is None:
            return None
        return cls.load_json(expected_json_schema_path, storage_driver=storage_driver)

    def __init__(self, json_schema: Union[str, dict]=None, storage_driver: FS=None):
        '''
        :param json_schema: a str path to a json schema file, or a schema in dict form
        :param storage_driver: filesystem used to open schema files;
            defaults to `DEFAULT_STORAGE_DRIVER`
        :raises Exception: if no schema was given and none could be found

        if no value is provided, it will fall back to looking for
        an environment variable corresponding to the class variable
        `CONFIG_VALIDATOR_JSON_SCHEMA_ENVVAR_NAME`
        to find a JSON schema file
        '''
        self.storage_driver = storage_driver or self.__class__.DEFAULT_STORAGE_DRIVER
        if isinstance(json_schema, (str, dict)):
            self._json_schema = self.__class__.load_json(json_schema, storage_driver=self.storage_driver)
        elif (default_schema := self.__class__.get_default_json_schema(storage_driver=self.storage_driver)):
            self._json_schema = default_schema
        else:
            raise Exception('did not receive or find a JSON schema')

    def load_config(self, config: dict):
        '''
        extract, coerce and validate `config` against this validator's schema

        :param config: the raw config, e.g. a parsed dotenv or `dict(os.environ)`
        :return: the extracted and coerced config
        :raises ConfigValidatorException: if validation fails; every error is
            logged and the list of errors is available on the exception
        '''
        extracted_config = extract_declared_items(self._json_schema, config)
        coerced_config = coerce_primitive_values(self._json_schema, extracted_config)
        validator = jsonschema.Draft4Validator(self._json_schema)
        # collect every error rather than stopping at the first one
        errors = list(validator.iter_errors(coerced_config))
        if errors:
            for error in errors:
                logger.error(f'{error.json_path}:\t{error.message}')
            raise ConfigValidatorException(errors)
        return coerced_config

    @classmethod
    def load_validated_config(cls, json_schema: Union[str, dict], config: dict, **kwargs):
        '''
        one-shot helper: build a validator for `json_schema` and validate `config`

        :param json_schema: a str path to a json schema file, or a schema in dict form
        :param config: the raw config to validate
        :param kwargs: forwarded to the constructor (e.g. `storage_driver`)
        :return: the extracted and coerced config
        '''
        return cls(json_schema, **kwargs).load_config(config)

    @classmethod
    def load_validated_environment(cls, json_schema: Union[str, dict]=None, **kwargs):
        '''
        validate a snapshot of `os.environ` against `json_schema`

        :param json_schema: a str path to a json schema file, or a schema in dict form
        :param kwargs: forwarded to the constructor (e.g. `storage_driver`)
        :return: the extracted and coerced config
        '''
        return cls.load_validated_config(json_schema, dict(os.environ), **kwargs)

    @classmethod
    def load_dotenv(cls,
                    json_schema: Union[str, dict]=None,
                    dotenv_path: str=None,
                    storage_driver: FS=None,
                    override: bool=False,
                   ):
        '''
        load a dotenv file, let `os.environ` take precedence over it for the
        keys declared in the schema, and validate the result

        precedence per declared key: `os.environ`, then dotenv, then schema default

        :param json_schema: a str path to a json schema file, or a schema in
            dict form; falls back to the schema found through the environment
            variable named by `CONFIG_VALIDATOR_JSON_SCHEMA_ENVVAR_NAME`
        :param dotenv_path: path of the dotenv file; if not given,
            `dotenv.find_dotenv()` is used to look for one, and an empty
            config is used if none is found
        :param storage_driver: filesystem used to open the schema and dotenv
            files; defaults to `DEFAULT_STORAGE_DRIVER`
        :param override: set variables into os.environ where applicable; i.e.
        - if set in os.environ already and valid, leave alone
        - if not set in os.environ already, read from .env or schema default
        :return: the extracted and coerced config
        '''

        storage_driver = storage_driver or cls.DEFAULT_STORAGE_DRIVER
        if dotenv_path is None:
            # NOTE(review): the detected path is opened through
            # `storage_driver` below; confirm that is the intent when the
            # driver is not the local filesystem
            maybe_dotenv_path = dotenv.find_dotenv()  # '' if not exist
            if maybe_dotenv_path:
                logger.debug(f'using detected dotenv path; {maybe_dotenv_path}')
                dotenv_path = maybe_dotenv_path
        if dotenv_path:
            with storage_driver.open(dotenv_path) as ifile:
                config = dotenv.dotenv_values(stream=ifile)
        else:
            config = {}

        # resolve the schema once, INCLUDING the environment variable
        # fallback, so that the os.environ precedence below also applies when
        # no explicit schema was passed, and the schema is not read twice
        json_schema_dict = (
            cls.load_json(json_schema, storage_driver=storage_driver)
            or cls.get_default_json_schema(storage_driver=storage_driver)
        )
        for key in (json_schema_dict or {}).get('properties', {}):
            if key not in os.environ:
                continue
            if key in config and config[key] != os.environ[key]:
                logger.debug(f'os.environ key "{key}" overriding value present in {dotenv_path}')
            config[key] = os.environ[key]
        # a missing schema (None) is reported by the constructor
        validated_config = cls.load_validated_config(
            json_schema_dict, config, storage_driver=storage_driver)

        if override:
            for key, value in validated_config.items():
                if key in os.environ:
                    continue
                # os.environ only accepts str values
                os.environ[key] = str(value)

        return validated_config

In [None]:
#| hide
# assume we're only dealing with primitive types for now
# shared fixture: the cells below validate configs against this schema
example_properties_schema = {
    'title': 'example-properties-schema',
    'description': 'example JSON Schema for demonstration and testing in documentation using nbdev',
    'type': 'object',
    # keys that must be present in a config for it to validate
    'required': [
        'string_value_with_enum',
        'MY_INTEGER_VALUE',
        'A_NUMERIC_VALUE',
    ],
    'properties': {
        # string
        'SOME_STRING_VALUE_funnyCaSe345': {
            'type': 'string',
        },
        'string_value_with_enum': {
            'description': 'this one is in the <required> list!',
            'type': 'string',
            'enum': [
                'it', 'can', 'only', 'be', 'one', 'of', 'these',
            ],
        },
        '_____A_STRING_VALUE____with_default__': {
            'description': 'values with a default get hydrated using the default if not present in input',
            'type': 'string',
            'default': 'underscores_and spaces',
        },
        
        # integer
        'MY_INTEGER_VALUE': {
            'type': 'integer',
            'description': 'not used for validation, but your benefit',
        },
        
        # number (with an inclusive allowed range)
        'A_NUMERIC_VALUE': {
            'type': 'number',
            'description': 'continuous and real and reasonable',
            'minimum': 22,
            'maximum': 33333.4,
        },
        
        # boolean (optional: not in the `required` list)
        'true_or_false__but_also_nothing': {
            'type': 'boolean',
        }
    },
}

In [None]:
#| hide
# an empty config is missing every key of the schema's `required` list
test_fail(ConfigValidator.load_validated_config, args=(example_properties_schema, {}))

$:	'string_value_with_enum' is a required property
$:	'MY_INTEGER_VALUE' is a required property
$:	'A_NUMERIC_VALUE' is a required property


In [None]:
#| hide
# the `str` inputs are coerced to the declared types, and the absent key
# that has a schema default is filled in
test_eq(ConfigValidator.load_validated_config(example_properties_schema, {
    'string_value_with_enum': 'these',
    'MY_INTEGER_VALUE': '1122334',
    'A_NUMERIC_VALUE': '24.89',
}), {
    'string_value_with_enum': 'these',
    'MY_INTEGER_VALUE': 1122334,
    'A_NUMERIC_VALUE': 24.89,
    '_____A_STRING_VALUE____with_default__': 'underscores_and spaces',
})

In [None]:
#| hide
# a negative integer and a number in scientific notation are both coerced
test_eq(ConfigValidator.load_validated_config(example_properties_schema, {
    'string_value_with_enum': 'these',
    'MY_INTEGER_VALUE': '-85',
    'A_NUMERIC_VALUE': '1.23e4',
}), {
    'string_value_with_enum': 'these',
    'MY_INTEGER_VALUE': -85,
    'A_NUMERIC_VALUE': 12300.0,
    '_____A_STRING_VALUE____with_default__': 'underscores_and spaces',
})

In [None]:
#| hide
# 'blah-blah' is not one of the values listed in the property's `enum`
test_fail(ConfigValidator.load_validated_config,
          'should fail because string_value_with_enum is outside enum',
          args=(example_properties_schema, {
    'string_value_with_enum': 'blah-blah',
    'MY_INTEGER_VALUE': '1122334',
    'A_NUMERIC_VALUE': '24.89',
}))

$.string_value_with_enum:	'blah-blah' is not one of ['it', 'can', 'only', 'be', 'one', 'of', 'these']


In [None]:
#| hide
# int() rejects '5555.999', so the value stays a str and fails the
# `integer` type check during validation
test_fail(ConfigValidator.load_validated_config,
          'should fail because MY_INTEGER_VALUE is not an integer',
          args=(example_properties_schema, {
    'string_value_with_enum': 'these',
    'MY_INTEGER_VALUE': '5555.999',
    'A_NUMERIC_VALUE': '24.89',
}))

$.MY_INTEGER_VALUE:	'5555.999' is not of type 'integer'


In [None]:
#| hide
# float() rejects 'WHAT???', so the value stays a str and fails the
# `number` type check during validation
test_fail(ConfigValidator.load_validated_config,
          'should fail because A_NUMERIC_VALUE is not numeric',
          args=(example_properties_schema, {
    'string_value_with_enum': 'these',
    'MY_INTEGER_VALUE': '1122334',
    'A_NUMERIC_VALUE': 'WHAT???',
}))

$.A_NUMERIC_VALUE:	'WHAT???' is not of type 'number'


In [None]:
#| hide
# '13' coerces fine (to 13.0) but is below the schema's `minimum` of 22
test_fail(ConfigValidator.load_validated_config,
          'should fail beacuse A_NUMERIC_VALUE is less than the allowed minimum',
          args=(example_properties_schema, {
    'string_value_with_enum': 'these',
    'MY_INTEGER_VALUE': '1122334',
    'A_NUMERIC_VALUE': '13',
}))

$.A_NUMERIC_VALUE:	13.0 is less than the minimum of 22


In [None]:
#| hide
from fs.memoryfs import MemoryFS

In [None]:
#| hide
# test ability to override the storage driver (memoryfs here)

# in-memory filesystem, reused by the cells below
memfs = MemoryFS()

memfs.makedirs('extra-long-directory-place', recreate=True)
with memfs.open('extra-long-directory-place/schema.json', 'w') as ofile:
    ofile.write(json.dumps(example_properties_schema))
    # point the fallback environment variable at the schema just written
    # (presumably `ofile.name` is its path within memfs; confirm);
    # the variable is not unset afterwards
    os.environ['CONFIG_VALIDATOR_JSON_SCHEMA'] = ofile.name

# no explicit schema: the constructor falls back to the environment variable
# and opens the file through the given storage driver
validator = ConfigValidator(storage_driver=memfs)
validated_config = validator.load_config({
    'string_value_with_enum': 'these',
    'MY_INTEGER_VALUE': '-85',
    'A_NUMERIC_VALUE': '1.23e4',
})

test_eq(validated_config, {
    'string_value_with_enum': 'these',
    'MY_INTEGER_VALUE': -85,
    'A_NUMERIC_VALUE': 12300.0,
    '_____A_STRING_VALUE____with_default__': 'underscores_and spaces',
})

In [None]:
#| hide
# test loading dotenv from an arbitrary file

memfs.makedirs('special-bespoke-location', recreate=True)
with memfs.open('special-bespoke-location/my-own.env', 'w') as ofile:
    ofile.write('\n'.join([
        'string_value_with_enum=only',
        'MY_INTEGER_VALUE=9989998',
        'A_NUMERIC_VALUE=1167.89',
    ]))

# `load_dotenv` is a classmethod and no `json_schema` is passed, so the
# schema comes from the CONFIG_VALIDATOR_JSON_SCHEMA environment variable
# set in an earlier cell, not from the `validator` instance
validated_dotenv = validator.load_dotenv(
    dotenv_path='special-bespoke-location/my-own.env',
    storage_driver=memfs,
)

test_eq(validated_dotenv, {
    'string_value_with_enum': 'only',
    'MY_INTEGER_VALUE': 9989998,
    'A_NUMERIC_VALUE': 1167.89,
    '_____A_STRING_VALUE____with_default__': 'underscores_and spaces',
})

In [None]:
#| hide
# test using custom json schema

with memfs.open('foo.schema.json', 'w') as ofile:
    ofile.write(json.dumps({
        'type': 'object',
        'properties': {
            'A_NUMERIC_VALUE': { 'type': 'number' },
        }
    }))
# the same dotenv file as before, but only the key declared in the custom
# schema is expected to survive
validated_dotenv = ConfigValidator.load_dotenv(
    json_schema='foo.schema.json',
    dotenv_path='special-bespoke-location/my-own.env',
    storage_driver=memfs,
)
test_eq(validated_dotenv, {
    'A_NUMERIC_VALUE': 1167.89,
})

# no storage driver is passed here, so the default driver is used to open
# the path, which is expected not to exist
test_fail(validator.load_dotenv, kwargs={'dotenv_path': 'non-existent-location-own.env'})

In [None]:
#| hide
# test data load precedence

with memfs.open('bar.schema.json', 'w') as ofile:
    ofile.write(json.dumps({
        'type': 'object',
        'properties': {
            'A_VALUE_TO_OVERRIDE': { 'type': 'string', 'default': 'change me' },
        }
    }))
    
memfs.makedirs('precedence-test', recreate=True)
with memfs.open('precedence-test/.env', 'w') as ofile:
    ofile.write('\n'.join([
        'A_VALUE_TO_OVERRIDE=in dotenv',
    ]))

# 1. not in os.environ: the dotenv value wins over the schema default
os.environ.pop('A_VALUE_TO_OVERRIDE', None)
validated_dotenv = ConfigValidator.load_dotenv(
    json_schema='bar.schema.json',
    dotenv_path='precedence-test/.env',
    storage_driver=memfs,
)
test_eq(validated_dotenv, {
    'A_VALUE_TO_OVERRIDE': 'in dotenv',
})

# 2. in os.environ, no dotenv path given: the os.environ value is used
# (this relies on dotenv auto-detection not finding a file that memfs
# cannot open)
os.environ['A_VALUE_TO_OVERRIDE'] = 'overrode from environ'
test_eq(ConfigValidator.load_dotenv(
    json_schema='bar.schema.json',
    storage_driver=memfs,
), {
    'A_VALUE_TO_OVERRIDE': 'overrode from environ',
})

# 3. in both os.environ and dotenv: os.environ takes precedence
test_eq(ConfigValidator.load_dotenv(
    json_schema='bar.schema.json',
    dotenv_path='precedence-test/.env',
    storage_driver=memfs,
), {
    'A_VALUE_TO_OVERRIDE': 'overrode from environ',
})

# 4. removed from os.environ again: back to the dotenv value
os.environ.pop('A_VALUE_TO_OVERRIDE', None)
test_eq(ConfigValidator.load_dotenv(
    json_schema='bar.schema.json',
    dotenv_path='precedence-test/.env',
    storage_driver=memfs,
), {
    'A_VALUE_TO_OVERRIDE': 'in dotenv',
})

In [None]:
#| hide
# test propagating values into os.environ depending on flag

def test_propagate_values_into_os_environ():
    '''
    check that `load_dotenv(override=True)` writes validated values into
    os.environ only for keys that are not already set there, and that
    nothing is written when `override` is left at its default
    '''
    
    with memfs.open('flag.schema.json', 'w') as ofile:
        ofile.write(json.dumps({
            'type': 'object',
            'properties': {
                'UNINVITED_GUEST': { 'type': 'string', 'default': 'from schema' },
                'convert_me': { 'type': 'integer', 'default': 1 },
            }
        }))
    
    # empty dotenv: values can only come from the schema defaults
    with memfs.open('.flag-env1', 'w') as ofile:
        ofile.write('')
    
    with memfs.open('.flag-env2', 'w') as ofile:
        ofile.write('\n'.join([
            'UNINVITED_GUEST=from dotenv',
        ]))
    
    with memfs.open('.flag-env3', 'w') as ofile:
        ofile.write('\n'.join([
            'UNINVITED_GUEST=I should be ignored!',
        ]))

    mock_env = {"already_here": "no touch me"}

    # os.environ is patched for the duration of the block
    with patch.dict('os.environ', mock_env):
        
        # don't update os.environ
        validated_config1 = ConfigValidator.load_dotenv(
            json_schema='flag.schema.json',
            dotenv_path='.flag-env1',
            storage_driver=memfs,
        )
        test_eq(os.environ.get("already_here"), "no touch me")
        test_eq(os.environ.get("UNINVITED_GUEST"), None)
        test_eq(validated_config1.get("UNINVITED_GUEST"), "from schema")
        
        # update os.environ, loading from schema
        validated_config2 = ConfigValidator.load_dotenv(
            json_schema='flag.schema.json',
            dotenv_path='.flag-env1',
            storage_driver=memfs,
            override=True
        )
        test_eq(validated_config1, validated_config2)
        test_eq(os.environ.get("UNINVITED_GUEST"), "from schema")
        # the returned config keeps the int, os.environ gets its str form
        test_eq(validated_config1.get("convert_me"), 1)
        test_eq(os.environ.get("convert_me"), "1")
        
        # unset it again so that the dotenv value is not shadowed
        os.environ.pop('UNINVITED_GUEST')
        # update os.environ, loading from dotenv
        validated_config3 = ConfigValidator.load_dotenv(
            json_schema='flag.schema.json',
            dotenv_path='.flag-env2',
            storage_driver=memfs,
            override=True
        )
        test_eq(os.environ.get("UNINVITED_GUEST"), "from dotenv")
        
        # os.environ is set; takes precedence
        validated_config3 = ConfigValidator.load_dotenv(
            json_schema='flag.schema.json',
            dotenv_path='.flag-env3',
            storage_driver=memfs,
            override=True
        )
        test_eq(os.environ.get("UNINVITED_GUEST"), "from dotenv")
    
test_propagate_values_into_os_environ()

In [None]:
#| hide
# run nbdev's export step for this notebook
import nbdev; nbdev.nbdev_export()