In [1]:
import os
import json
import yaml
from loguru import logger
from rich import print as prt
from typing import Dict, Any, List, Optional

### Helper Functions

In [2]:
def fetch_all_objects(directory: str) -> List[Dict[str, Any]]:
    """Load all JSON files from a directory into a list of dicts."""
    objects = []

    for entry in os.scandir(directory):
        if entry.is_file() and entry.name.endswith(".json"):
            try:
                with open(entry.path, "r") as f:
                    objects.append(json.load(f))
            except json.JSONDecodeError as e:
                logger.error(f"Failed to decode {entry.name}: {e}")

    logger.info(f"Loaded {len(objects)} objects from {directory}")
    return objects

In [3]:
def load_classification_config(config_path: str) -> Dict[str, Any]:
    """
    Load classification configuration from a JSON file.
    """
    try:
        with open(config_path, "r") as file:
            config = yaml.safe_load(file)
            if "restrictions" not in config:
                raise ValueError("Missing 'restrictions' key in classification config")

            logger.info("Classification configuration loaded successfully.")
            return config["restrictions"]
    except FileNotFoundError:
        raise FileNotFoundError(f"Configuration file not found: {config_path}")
    except yaml.YAMLError as e:
        raise ValueError(f"Error parsing YAML file: {e}")

### Policy Handler class code
* The code below is a refactored version of what the policy handler looks like in the old code
* For the most part it keeps the same behavior.
* Main difference is that it's broken out into functions rather than being in a class

In [None]:
def is_classif_too_high(ism: Dict[str, Any], config: Dict[str, Any]) -> bool:
    """
    Determine if an ISM (Information Security Marking) is too highly classified
    based on the provided classification configuration.

    Args:
        ism (Dict[str, Any]): The ISM dictionary to check.
        config (Dict[str, Any]): The classification configuration dictionary.

    Returns:
        bool: True if the ISM is too highly classified or contains forbidden controls/terms, False otherwise.
    """
    # If the ISM is empty, return False
    if not ism:
        logger.warning("ISM is empty, cannot determine classification level.")
        return False

    # Check if the classification is 'TS'
    classif = ism.get("classification")
    if classif == "TS":
        logger.warning(f"Classification {classif}, is too high.")
        return True

    # Check if any forbidden SCI controls are present
    if set(ism.get("sciControls", [])) & set(config["forbidden_sci"]):
        logger.warning(
            f"ISM contains forbidden SCI controls: {ism.get('sciControls')}."
        )
        return True

    # Check if any forbidden dissemination controls are present
    if set(ism.get("disseminationControls", [])) & set(config["forbidden_controls"]):
        logger.warning(
            f"ISM contains forbidden dissemination controls: {ism.get('disseminationControls')}."
        )
        return True

    # Check if the banner contains any forbidden terms
    banner = ism.get("banner", "").upper()
    if any(term in banner for term in config["forbidden_terms"]):
        logger.warning(f"ISM banner contains forbidden terms: {banner}.")
        return True

    # If none of the conditions are met, the classification is acceptable
    return False

In [None]:
def is_more_restrictive(
    ism1: Dict[str, Any], ism2: Dict[str, Any], config: Dict[str, Any]
) -> bool:
    """
    Compare two ISM (Information Security Marking) dictionaries and determine if ism1 is more restrictive than ism2.

    The function evaluates restrictiveness based on the following criteria (in order):
    - Presence of FGI controls in sciControls.
    - Presence of NOFORN in disseminationControls.
    - REL controls and the groups they apply to (special_groups and number of releasable entities).
    - Classification level using the provided hierarchy in config.

    Args:
        ism1 (Dict[str, Any]): The first ISM dictionary to compare.
        ism2 (Dict[str, Any]): The second ISM dictionary to compare.
        config (Dict[str, Any]): The classification configuration dictionary.

    Returns:
        bool: True if ism1 is more restrictive than ism2, False otherwise.
    """
    # If either ISM is missing, niether is more restrictive
    if not ism1 or not ism2:
        return False

    # Check if ism1 has FGI controls while ism2 does not
    ism1_fgi = any(c.startswith("FGI") for c in ism1.get("sciControls", []))
    ism2_fgi = any(c.startswith("FGI") for c in ism2.get("sciControls", []))

    if ism1_fgi != ism2_fgi:
        return ism1_fgi  # FGI controls are more restrictive

    # Check if ism1 has NOFORN while ism2 does not
    if "NOFORN" in ism1.get("disseminationControls", []) and "NOFORN" not in ism2.get(
        "disseminationControls", []
    ):
        return True
    if "NOFORN" in ism2.get("disseminationControls", []) and "NOFORN" not in ism1.get(
        "disseminationControls", []
    ):
        return False

    # Compare REL controls and the groups they apply to
    if "REL" in ism1.get("disseminationControls", []) and "REL" in ism2.get(
        "disseminationControls", []
    ):
        ism1_release = set(ism1.get("releasableTo", []))
        ism2_release = set(ism2.get("releasableTo", []))
        ism1_groups = ism1_release & set(config["special_groups"])
        ism2_groups = ism2_release & set(config["special_groups"])

        # If the groups differ, the one with no groups is less restrictive
        if ism1_groups != ism2_groups:
            return not ism1_groups and ism2_groups

        # Otherwise, compare the number of releasable entities
        return len(ism1_release) < len(ism2_release)

    # Compare classifications using predefined hierarchy
    classif1 = ism1.get("classification", "U")
    classif2 = ism2.get("classification", "U")

    return config["classifications"].get(classif1, 0) > config["classifications"].get(
        classif2, 0
    )

In [None]:
def find_most_restrictive_valid_ism(
    obj: Dict[str, Any], config: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
    """
    Traverse a nested object to find the most restrictive valid ISM (Information Security Marking).

    The function searches through all dictionaries and lists within the provided object,
    identifies ISMs that are not too highly classified (using is_classif_too_high),
    and returns the most restrictive valid ISM according to the is_more_restrictive function.

    Args:
        obj (Dict[str, Any]): The object to search for ISMs.
        config (Dict[str, Any]): The classification configuration dictionary.

    Returns:
        Optional[Dict[str, Any]]: The most restrictive valid ISM found, or None if no valid ISM exists.
    """
    most_restrictive = None
    stack = [obj]  # Use a stack to traverse the object hierarchy

    while stack:
        item = stack.pop()

        if isinstance(item, dict):
            # Check if the current item has an ISM and if it's valid
            if "ism" in item:
                ism = item.get("ism")
                if ism and not is_classif_too_high(ism, config):
                    # Early exit if 'TS' found
                    if ism.get("classification") == "TS":
                        return ism.copy()

                    if most_restrictive is None or is_more_restrictive(
                        ism, most_restrictive, config
                    ):
                        most_restrictive = ism.copy()

            # Add all dictionary values to the stack
            stack.extend(item.values())
        elif isinstance(item, list):
            # Add all list items to the stack
            stack.extend(item)

    # Print the most restrictive ISM found
    # logger.info(f"Most restrictive valid ISM found: {most_restrictive}")
    return most_restrictive

In [None]:
def apply_restrictions(
    standard_object: Dict[str, Any], config: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
    """
    Recursively process a standard object to remove or redact data that is too highly classified,
    according to the provided classification configuration.

    The function finds the most restrictive valid ISM (Information Security Marking) within the object,
    then traverses all nested dictionaries and lists, replacing any data with a classification that is
    considered too high with a placeholder. The processed object will include an 'overallClassification'
    field set to the most restrictive valid ISM found.

    Args:
        standard_object (Dict[str, Any]): The object to process and apply restrictions to.
        config (Dict[str, Any]): The classification configuration dictionary.

    Returns:
        Optional[Dict[str, Any]]: The processed object with restricted data redacted, or None if no valid ISM is found.
    """
    # Find the most restrictive valid ISM in the object
    most_restrictive_ism = find_most_restrictive_valid_ism(standard_object, config)

    if not most_restrictive_ism:
        # If no valid ISM is found, return None
        logger.warning("No valid ISM found for object")
        return None

    def process_item(item: Any) -> Any:
        """Recursively process items in the object, removing data with high classifications."""
        if isinstance(item, dict):
            # If the item has an ISM, check if it is too high
            if "ism" in item and is_classif_too_high(item["ism"], config):
                logger.debug(f"Removing item due to high classification: {item}")
                return {"value": None, "ism": None}

            # Process all key-value pairs in the dictionary
            return {k: process_item(v) for k, v in item.items()}
        elif isinstance(item, list):
            # Process all items in the list
            return [process_item(x) for x in item if x is not None]

        return item  # Return the item as is if it's neither a dict nor a list

    # Process the object and add the overall classification
    processed_object = process_item(standard_object)
    processed_object["overallClassification"] = most_restrictive_ism
    return processed_object

### Simplified Policy Handler Code
* This version of the functions are more refined.

In [None]:
def is_classif_too_high(ism: Dict[str, Any], config: Dict[str, Any]) -> bool:
    """Return True if ISM is too highly classified or contains forbidden controls/terms."""
    if not ism:
        logger.warning("ISM is empty, cannot determine classification level.")
        return False

    if (
        ism.get("classification") == "TS"
        or set(ism.get("sciControls", [])) & set(config["forbidden_sci"])
        or set(ism.get("disseminationControls", [])) & set(config["forbidden_controls"])
        or any(
            term in ism.get("banner", "").upper() for term in config["forbidden_terms"]
        )
    ):
        # logger.warning(f"ISM too high or contains forbidden values: {ism}")
        return True

    return False

In [5]:
def is_more_restrictive(
    ism1: Dict[str, Any], ism2: Dict[str, Any], config: Dict[str, Any]
) -> bool:
    """Return True if ism1 is more restrictive than ism2."""
    if not ism1 or not ism2:
        return False

    # FGI controls
    ism1_fgi = any(c.startswith("FGI") for c in ism1.get("sciControls", []))
    ism2_fgi = any(c.startswith("FGI") for c in ism2.get("sciControls", []))
    if ism1_fgi != ism2_fgi:
        return ism1_fgi

    # NOFORN
    ism1_noforn = "NOFORN" in ism1.get("disseminationControls", [])
    ism2_noforn = "NOFORN" in ism2.get("disseminationControls", [])
    if ism1_noforn != ism2_noforn:
        return ism1_noforn

    # REL controls
    rel1 = "REL" in ism1.get("disseminationControls", [])
    rel2 = "REL" in ism2.get("disseminationControls", [])
    if rel1 and rel2:
        ism1_release = set(ism1.get("releasableTo", []))
        ism2_release = set(ism2.get("releasableTo", []))
        ism1_groups = ism1_release & set(config["special_groups"])
        ism2_groups = ism2_release & set(config["special_groups"])
        # More restrictive if ism1 has fewer groups or fewer releasable entities
        if ism1_groups != ism2_groups:
            return len(ism1_groups) < len(ism2_groups)
        return len(ism1_release) < len(ism2_release)

    # Classification hierarchy
    classif1 = ism1.get("classification", "U")
    classif2 = ism2.get("classification", "U")
    return config["classifications"].get(classif1, 0) > config["classifications"].get(
        classif2, 0
    )

In [6]:
def find_most_restrictive_valid_ism(
    obj: Dict[str, Any], config: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
    """
    Traverse a nested object to find the most restrictive valid ISM (Information Security Marking).

    The function searches through all dictionaries and lists within the provided object,
    identifies ISMs that are not too highly classified (using is_classif_too_high),
    and returns the most restrictive valid ISM according to the is_more_restrictive function.

    Args:
        obj (Dict[str, Any]): The object to search for ISMs.
        config (Dict[str, Any]): The classification configuration dictionary.

    Returns:
        Optional[Dict[str, Any]]: The most restrictive valid ISM found, or None if no valid ISM exists.
    """
    most_restrictive = None
    stack = [obj]  # Use a stack to traverse the object hierarchy

    while stack:
        item = stack.pop()

        if isinstance(item, dict):
            # Check if the current item has an ISM and if it's valid
            if "ism" in item:
                ism = item.get("ism")
                if ism and not is_classif_too_high(ism, config):
                    # Early exit if 'TS' found
                    if ism.get("classification") == "TS":
                        return ism.copy()
                    if most_restrictive is None or is_more_restrictive(
                        ism, most_restrictive, config
                    ):
                        most_restrictive = ism.copy()

            # Add all dictionary values to the stack
            stack.extend(item.values())
        elif isinstance(item, list):
            # Add all list items to the stack
            stack.extend(item)

    # Print the most restrictive ISM found
    # logger.info(f"Most restrictive valid ISM found: {most_restrictive}")
    return most_restrictive

In [7]:
def apply_restrictions(
    standard_object: Dict[str, Any], config: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
    """
    Recursively process a standard object to remove or redact data that is too highly classified,
    according to the provided classification configuration.

    The function finds the most restrictive valid ISM (Information Security Marking) within the object,
    then traverses all nested dictionaries and lists, replacing any data with a classification that is
    considered too high with a placeholder. The processed object will include an 'overallClassification'
    field set to the most restrictive valid ISM found.

    Args:
        standard_object (Dict[str, Any]): The object to process and apply restrictions to.
        config (Dict[str, Any]): The classification configuration dictionary.

    Returns:
        Optional[Dict[str, Any]]: The processed object with restricted data redacted, or None if no valid ISM is found.
    """
    # Find the most restrictive valid ISM in the object
    most_restrictive_ism = find_most_restrictive_valid_ism(standard_object, config)

    if not most_restrictive_ism:
        # If no valid ISM is found, return None
        logger.warning("No valid ISM found for object")
        return None

    def process_item(item: Any) -> Any:
        if isinstance(item, dict):
            # If the item has an ISM, check if it is too high
            if "ism" in item and is_classif_too_high(item["ism"], config):
                logger.debug(f"Removing item due to high classification: {item}")
                return None

            # Process all key-value pairs in the dictionary
            return {k: process_item(v) for k, v in item.items()}
        if isinstance(item, list):
            # Process all items in the list
            return [process_item(x) for x in item]

        return item  # Return the item as is if it's neither a dict nor a list

    # Process the object and add the overall classification
    processed_object = process_item(standard_object)
    if isinstance(processed_object, dict):
        processed_object["overallClassification"] = most_restrictive_ism

    return processed_object

### Apply the functions on the standard_objects

In [8]:
# Load classification configuration
config_path = "../config/classifications_config.yaml"
classification_config = load_classification_config(config_path)

[32m2025-09-09 14:49:03.755[0m | [1mINFO    [0m | [36m__main__[0m:[36mload_classification_config[0m:[36m11[0m - [1mClassification configuration loaded successfully.[0m


In [9]:
data_path = "../data/2_processed/output"
standard_objects = fetch_all_objects(data_path)

[32m2025-09-09 14:49:04.602[0m | [1mINFO    [0m | [36m__main__[0m:[36mfetch_all_objects[0m:[36m13[0m - [1mLoaded 13 objects from ../data/2_processed/output[0m


In [None]:
# Test 'is_classif_too_high'
ism = standard_objects[4]["operationalStatus"]["ism"]
result = is_classif_too_high(ism, classification_config)

In [None]:
# Test 'is_more_restrictive' function
ism1 = standard_objects[4]["operationalStatus"]["ism"]
ism2 = standard_objects[7]["location"]["ism"]
result = is_more_restrictive(ism1, ism2, classification_config)

In [None]:
# Test 'find_most_restrictive_valid_ism' function
most_restrictive_ism = find_most_restrictive_valid_ism(
    standard_objects[4], classification_config
)

In [10]:
# Test 'apply_restrictions' function
processed_object = apply_restrictions(standard_objects[4], classification_config)

[32m2025-09-09 14:49:12.952[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mprocess_item[0m:[36m32[0m - [34m[1mRemoving item due to high classification: {'value': 'OPR', 'ism': {'banner': 'TOP SECRET//REL TO USA', 'classification': 'TS', 'ownerProducer': ['USA'], 'releaseableTo': ['USA'], 'disseminationControls': ['REL']}}[0m


In [12]:
prt(processed_object)