Skip to content

Commit

Permalink
Updated flatten policy recursive function
Browse files Browse the repository at this point in the history
  • Loading branch information
cloutierMat committed Apr 20, 2024
1 parent 26005fb commit 9d574ca
Showing 1 changed file with 37 additions and 36 deletions.
73 changes: 37 additions & 36 deletions localstack/services/sns/publisher.py
Expand Up @@ -1251,55 +1251,56 @@ def _evaluate_numeric_condition(conditions, value):
return True

@staticmethod
def flatten_policy(nested_dict: dict):
def flatten_policy(nested_dict: dict) -> list[dict]:
"""
Takes a dictionary as input and will output the dictionary on a single level.
Input:
`{"field1": {"field2": {"field3": "val1", "field4": "val2"}}}`
Output:
`{
"field1.field2.field3": "val1",
"field1.field2.field4": "val2"
}`
`[
{
"field1.field2.field3": "val1",
"field1.field2.field4": "val2"
}
]`
Input with $or will create multiple outputs:
`{"$or": [{"field1": "val1"}, {"field2": "val2"}], "field3": "val3"}`
Output:
`[
{"field1": "val1", "field3": "val3"},
{"field2": "val2", "field3": "val3"}
]`
:param nested_dict: a (nested) dictionary
:return: a list of flattened dictionaries with no nested dict or list inside, flattened to a
single level, one list item for every list item encountered
"""
flattened = []

def _traverse(_policy: dict, parent_key=None, current_rule=None):
if not current_rule:
current_rule = {}

if (or_val := _policy.get("$or")) and isinstance(or_val, list) and len(or_val) > 1:
pol = {k: v for k, v in _policy.items() if k != "$or"}
for value in or_val:
p = {**value, **pol}
_traverse(p, parent_key=parent_key, current_rule={**current_rule})

else:
for key, values in _policy.items():
flattened_parent_key = key if not parent_key else f"{parent_key}.{key}"

if not isinstance(values, dict):
current_rule[flattened_parent_key] = values
def _traverse_policy(obj, array=None, parent_key=None) -> list:
if array is None:
array = [{}]

for key, values in obj.items():
if key == "$or" and isinstance(values, list) and len(values) > 1:
# $or will create multiple new branches in the array.
# Each current branch will traverse with each choice in $or
array = [
i for value in values for i in _traverse_policy(value, array, parent_key)
]
else:
# We update the parent key do that {"key1": {"key2": ""}} becomes "key1.key2"
_parent_key = f"{parent_key}.{key}" if parent_key else key
if isinstance(values, dict):
# If the current key has child dict -- key: "key1", child: {"key2": ["val1", val2"]}
# We only update the parent_key and traverse its children with the current branches
array = _traverse_policy(values, array, _parent_key)
else:
_traverse(
values,
parent_key=flattened_parent_key,
current_rule=current_rule,
)

# this check is to validate we're properly at the end node, and that we can safely "commit" the rule before
# iterating out of the current branch
if current_rule and not (
any(isinstance(v, dict) for v in _policy.values()) or "$or" in _policy
):
flattened.append({**current_rule})
# If the current key has no child, this means we found the values to match -- child: ["val1", val2"]
# we update the branches with the parent chain and the values -- {"key1.key2": ["val1, val2"]}
array = [{**item, _parent_key: values} for item in array]

_traverse(nested_dict)
return array

return flattened
return _traverse_policy(nested_dict)

@staticmethod
def flatten_payload(nested_dict: dict) -> list[dict]:
Expand Down

0 comments on commit 9d574ca

Please sign in to comment.