diff --git a/localstack/services/sns/publisher.py b/localstack/services/sns/publisher.py index f12e02c102b92..57ba8d86f3c95 100644 --- a/localstack/services/sns/publisher.py +++ b/localstack/services/sns/publisher.py @@ -1251,55 +1251,56 @@ def _evaluate_numeric_condition(conditions, value): return True @staticmethod - def flatten_policy(nested_dict: dict): + def flatten_policy(nested_dict: dict) -> list[dict]: """ Takes a dictionary as input and will output the dictionary on a single level. Input: `{"field1": {"field2": {"field3": "val1", "field4": "val2"}}}` Output: - `{ - "field1.field2.field3": "val1", - "field1.field2.field4": "val2" - }` + `[ + { + "field1.field2.field3": "val1", + "field1.field2.field4": "val2" + } + ]` + Input with $or will create multiple outputs: + `{"$or": [{"field1": "val1"}, {"field2": "val2"}], "field3": "val3"}` + Output: + `[ + {"field1": "val1", "field3": "val3"}, + {"field2": "val2", "field3": "val3"} + ]` :param nested_dict: a (nested) dictionary :return: a list of flattened dictionaries with no nested dict or list inside, flattened to a single level, one list item for every list item encountered """ - flattened = [] - def _traverse(_policy: dict, parent_key=None, current_rule=None): - if not current_rule: - current_rule = {} - - if (or_val := _policy.get("$or")) and isinstance(or_val, list) and len(or_val) > 1: - pol = {k: v for k, v in _policy.items() if k != "$or"} - for value in or_val: - p = {**value, **pol} - _traverse(p, parent_key=parent_key, current_rule={**current_rule}) - - else: - for key, values in _policy.items(): - flattened_parent_key = key if not parent_key else f"{parent_key}.{key}" - - if not isinstance(values, dict): - current_rule[flattened_parent_key] = values + def _traverse_policy(obj, array=None, parent_key=None) -> list: + if array is None: + array = [{}] + + for key, values in obj.items(): + if key == "$or" and isinstance(values, list) and len(values) > 1: + # $or will create multiple new branches in the array. + # Each current branch will traverse with each choice in $or + array = [ + i for value in values for i in _traverse_policy(value, array, parent_key) + ] + else: + # We update the parent key do that {"key1": {"key2": ""}} becomes "key1.key2" + _parent_key = f"{parent_key}.{key}" if parent_key else key + if isinstance(values, dict): + # If the current key has child dict -- key: "key1", child: {"key2": ["val1", val2"]} + # We only update the parent_key and traverse its children with the current branches + array = _traverse_policy(values, array, _parent_key) else: - _traverse( - values, - parent_key=flattened_parent_key, - current_rule=current_rule, - ) - - # this check is to validate we're properly at the end node, and that we can safely "commit" the rule before - # iterating out of the current branch - if current_rule and not ( - any(isinstance(v, dict) for v in _policy.values()) or "$or" in _policy - ): - flattened.append({**current_rule}) + # If the current key has no child, this means we found the values to match -- child: ["val1", val2"] + # we update the branches with the parent chain and the values -- {"key1.key2": ["val1, val2"]} + array = [{**item, _parent_key: values} for item in array] - _traverse(nested_dict) + return array - return flattened + return _traverse_policy(nested_dict) @staticmethod def flatten_payload(nested_dict: dict) -> list[dict]: