Skip to content

Commit

Permalink
Cleanup and Type hint to improve readability of ConfigurationManager …
Browse files Browse the repository at this point in the history
…class (config_manager.py) (#1195)

Authored-by: Sohaib Kaidali

* Reorganize ConfigurationManager class
- Added function comments to ALL functions within the class.
- Replaced all single letter variables with more appropriately named variables within the class.
- Added type hinting to ALL objects/variables within the class.
  • Loading branch information
kaidalisohaib committed Nov 14, 2022
1 parent ba7a30d commit 5c59781
Showing 1 changed file with 98 additions and 49 deletions.
147 changes: 98 additions & 49 deletions ivadomed/config_manager.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,61 @@
import json
import collections.abc
from typing import Dict, List, Any, KeysView, Union

from loguru import logger
from pathlib import Path
from ivadomed import utils as imed_utils
from ivadomed.keywords import ConfigKW, LoaderParamsKW, SplitDatasetKW, DataTestingKW
import copy


def update(d, u):
def update(source_dict: dict, destination_dict: dict) -> dict:
"""Update dictionary and nested dictionaries.
Args:
d (dict): Source dictionary that is updated by destination dictionary.
u (dict): Destination dictionary.
source_dict (dict): Source dictionary that is updated by destination dictionary.
destination_dict (dict): Destination dictionary.
Returns:
dict: updated dictionary
"""
for k, v in u.items():
if isinstance(v, collections.abc.Mapping):
d[k] = update(d.get(k, {}), v)
for key, value in destination_dict.items():
if isinstance(value, collections.abc.Mapping):
source_dict[key] = update(source_dict.get(key, {}), value)
else:
# If source dictionary has keys that the destination dict doesn't have, keep these keys
if k in d and isinstance(d[k], collections.abc.Mapping) and not isinstance(v, collections.abc.Mapping):
if key in source_dict and isinstance(source_dict[key], collections.abc.Mapping) and not isinstance(value,
collections.abc.Mapping):
pass
else:
d[k] = v
return d
source_dict[key] = value
return source_dict


def deep_dict_compare(source_dict, dest_dict, keyname=None):
def deep_dict_compare(source_dict: dict, destination_dict: dict, keyname: str = None):
"""Compare and display differences between dictionaries (and nested dictionaries).
Args:
source_dict (dict): Source dictionary.
dest_dict (dict): Destination dictionary.
destination_dict (dict): Destination dictionary.
keyname (str): Key name to indicate the path to nested parameter.
"""
for key in dest_dict:
for key in destination_dict:
if key not in source_dict:
key_str = key if keyname is None else keyname + key
logger.info(f' {key_str}: {dest_dict[key]}')
logger.info(f' {key_str}: {destination_dict[key]}')

else:
if isinstance(dest_dict[key], collections.abc.Mapping):
if isinstance(destination_dict[key], collections.abc.Mapping):
if isinstance(source_dict[key], collections.abc.Mapping):
deep_dict_compare(source_dict[key], dest_dict[key], key + ": ")
deep_dict_compare(source_dict[key], destination_dict[key], key + ": ")
# In case a new dictionary appears in updated file
else:
deep_dict_compare(source_dict, dest_dict[key], key + ": ")
deep_dict_compare(source_dict, destination_dict[key], key + ": ")


def load_json(config_path):
def load_json(config_path: str) -> dict:
"""Load json file content
Args:
Expand All @@ -67,7 +70,7 @@ def load_json(config_path):
return default_config


# To ensure retrocompatibility for parameter changes in configuration file
# To ensure retro-compatibility for parameter changes in configuration file
KEY_CHANGE_DICT = {'UNet3D': ConfigKW.MODIFIED_3D_UNET, 'bids_path': LoaderParamsKW.PATH_DATA,
'log_directory': ConfigKW.PATH_OUTPUT}
KEY_SPLIT_DATASET_CHANGE_LST = ['method', 'center_test']
Expand All @@ -84,68 +87,114 @@ class ConfigurationManager(object):
context_original (dict): Provided configuration file.
config_updated (dict): Updated configuration file.
"""
def __init__(self, path_context):
self.path_context = path_context
self.key_change_dict = KEY_CHANGE_DICT
self.key_split_dataset_change_lst = KEY_SPLIT_DATASET_CHANGE_LST

def __init__(self, path_context: str):
"""
Initialize the ConfigurationManager by validating the given path and loading the file.
Also load the default configuration file.
Args:
path_context (str): Path to configuration file.
"""
self.path_context: str = path_context
self.key_change_dict: Dict[str, str] = KEY_CHANGE_DICT
self.key_split_dataset_change_lst: List[str] = KEY_SPLIT_DATASET_CHANGE_LST
self._validate_path()
default_config_path = str(Path(imed_utils.__ivadomed_dir__, "ivadomed", "config", "config_default.json"))
self.config_default = load_json(default_config_path)
self.context_original = load_json(path_context)
self.config_updated = {}
default_config_path: str = str(Path(imed_utils.__ivadomed_dir__, "ivadomed", "config", "config_default.json"))
self.config_default: dict = load_json(default_config_path)
self.context_original: dict = load_json(path_context)
self.config_updated: dict = {}

@property
def config_updated(self):
def config_updated(self) -> dict:
"""
This function simply returns the attribute `_config_updated`.
Returns:
dict: `_config_updated` attribute
"""
return self._config_updated

@config_updated.setter
def config_updated(self, config_updated):
def config_updated(self, config_updated: dict):
"""
If config_updated is empty we copy the loaded configuration into it and apply some changes (changing keys name,
changing values, deleting key-value pair) to ensure retro-compatibility.
Sets the new config_updated to the attribute `_config_updated`.
Args:
config_updated (dict): The new configuration to set.
"""
if config_updated == {}:
context = copy.deepcopy(self.context_original)
context: dict = copy.deepcopy(self.context_original)
self.change_keys(context, list(context.keys()))
config_updated = update(self.config_default, context)
self.change_keys_values(config_updated[ConfigKW.SPLIT_DATASET], config_updated[ConfigKW.SPLIT_DATASET].keys())
config_updated: dict = update(self.config_default, context)
self.change_keys_values(config_updated[ConfigKW.SPLIT_DATASET],
config_updated[ConfigKW.SPLIT_DATASET].keys())

self._config_updated = config_updated
self._config_updated: dict = config_updated
if config_updated['debugging']:
self._display_differing_keys()

def get_config(self):
def get_config(self) -> dict:
"""Get updated configuration file with all parameters from the default config file.
Returns:
dict: Updated configuration dict.
"""
return self.config_updated

def change_keys(self, context, keys):
for k in keys:
def change_keys(self, context: Union[dict, collections.abc.Mapping], keys: List[str]):
"""
This function changes the name of the keys of the context dictionary, that are also in the `key_change_dict`
attribute, to the values that are associated with them in the `key_change_dict` attribute.
Args:
context (Union[dict, collections.abc.Mapping]): The dictionary to change.
keys (List[str]): The keys in context to consider.
"""
for key_to_change in keys:
# Verify if key is still in the dict
if k in context:
if k == "NumpyToTensor":
del context[k]
if key_to_change in context:
# If the key_to_change is "NumpyToTensor", remove it from the context.
if key_to_change == "NumpyToTensor":
del context[key_to_change]
continue
v = context[k]
value_to_change: Any = context[key_to_change]
# Verify if value is a dictionary
if isinstance(v, collections.abc.Mapping):
self.change_keys(v, list(v.keys()))
if isinstance(value_to_change, collections.abc.Mapping):
self.change_keys(value_to_change, list(value_to_change.keys()))
else:
# Change keys from the key_change_dict
for key in self.key_change_dict:
if key in context:
context[self.key_change_dict[key]] = context[key]
del context[key]

def change_keys_values(self, config_updated, keys):
for k in self.key_split_dataset_change_lst:
if k in keys:
value = config_updated[k]
if k == 'method' and value == "per_center":
def change_keys_values(self, config_updated: dict, keys: List[str]):
"""
This function sets DATA_TESTING->DATA_TYPE to "institution_id" if method value is per_center,
DATA_TESTING->DATA_VALUE to the value of center_test.
It is basically verifying some conditions and set values to the `config_updated`.
Args:
config_updated (dict): Configuration dictionary to update.
keys (List[str]): The keys to consider.
"""
for key_to_change in self.key_split_dataset_change_lst:
if key_to_change in keys:
value: Any = config_updated[key_to_change]
# If the method is per_center, the data_testing->data_type value becomes "institution_id".
if key_to_change == 'method' and value == "per_center":
config_updated[SplitDatasetKW.DATA_TESTING][DataTestingKW.DATA_TYPE] = "institution_id"
if k == 'center_test' and \
# If [the key is center_test], [data_testing->data_type == "institution_id"] and [the value is not None]
# data_testing->data_type value becomes value of config_updated
if key_to_change == 'center_test' and \
config_updated[SplitDatasetKW.DATA_TESTING][DataTestingKW.DATA_TYPE] == "institution_id" and \
value is not None:
value is not None:
config_updated[SplitDatasetKW.DATA_TESTING][DataTestingKW.DATA_VALUE] = value
del config_updated[k]
# Remove the value of the current key
del config_updated[key_to_change]

def _display_differing_keys(self):
"""Display differences between dictionaries.
Expand Down

0 comments on commit 5c59781

Please sign in to comment.