# Object mappings

## Imports

In [None]:
# |default_exp mappings

In [None]:
# | hide
%load_ext autoreload
%autoreload 2

In [None]:
# | hide
from stringdale.core import get_git_root, load_env, checkLogs
import pytest
import nest_asyncio


In [None]:

#| hide
load_env()
nest_asyncio.apply()

In [None]:
# | export
import os
import enum
from pathlib import Path
from copy import deepcopy
import itertools
import functools
from enum import Enum
from parse import parse
from typing import Any
from copy import deepcopy,copy


from collections import defaultdict,OrderedDict
from contextlib import ExitStack
from singleton_decorator import singleton

from pprint import pprint
from pydantic import BaseModel
from pydantic_core import SchemaValidator

from typing import Optional,Union,List,Dict,Any,Literal

import networkx as nx
from networkx.readwrite import json_graph
from fastcore.basics import patch
from tqdm.auto import tqdm
import logging


In [None]:
#| export
logger = logging.getLogger(__name__)

In [None]:
import asyncio
# ! pip install nest_asyncio


In [None]:
# | export
from stringdale.core import jinja_render,checkLogs,maybe_await,_duplicates

from textwrap import dedent
from parse import parse
import re
import itertools as it
import types
import uuid
import time
import json

## Accessing nested objects

In [None]:
#| export
def access_object(obj, attr_path, missing_ok=False):
    """Access an attribute or item of an object, using a path of attribute/item names.
    
    Args:
        obj: The object to access
        attr_path: A string or tuple/list of strings representing the path to the attribute/item
        missing_ok: If True, return None if the attribute/item is not found
    """
    if isinstance(attr_path, str):
        attr_path = (attr_path,)
    elif isinstance(attr_path, (tuple, list)):
        pass
    else:
        raise ValueError(f"Invalid attribute path {attr_path}, must be a string or a tuple/list of strings")

    inner_obj = obj
    for attribute_name in attr_path:
        if attribute_name == ".":
            continue  # Keep current object
        elif attribute_name == "**":
            if isinstance(inner_obj, dict):
                continue  # Keep current dict
            else:
                inner_obj = vars(inner_obj)
                continue
        
        # Try both attribute and item access
        attr_error = item_error = None
        try:
            inner_obj = getattr(inner_obj, attribute_name)
            continue  # If successful, continue to next attribute
        except (AttributeError, TypeError) as e:
            attr_error = e
        
        try:
            inner_obj = inner_obj[attribute_name]
            continue  # If successful, continue to next attribute
        except (KeyError, IndexError, TypeError) as e:
            item_error = e
        
        # If both attempts failed
        if missing_ok:
            return None
        else:
            raise ValueError(f"When navigating path\n{attr_path} on object\n{obj}:\n"
                           f"attribute '{attribute_name}' is not a valid attribute or item of subobject:\n{inner_obj}")
    
    return inner_obj

### Tests

In [None]:
# Test access_object function
test_dict = {
    'a': {
        'b': {
            'c': 1
        }
    },
    'list': [1, 2, 3],
    'mixed': {
        'list': [{'x': 1}, {'x': 2}]
    }
}

class TestObj:
    def __init__(self):
        self.x = 1
        self.y = {'z': 2}

test_obj = TestObj()

# Test dictionary access
assert access_object(test_dict, ('a', 'b', 'c')) == 1

# Test list access
assert access_object(test_dict, ['list', 1]) == 2

# Test mixed nested access
assert access_object(test_dict, ['mixed', 'list', 0, 'x']) == 1

# Test object attribute access
assert access_object(test_obj, 'x') == 1
assert access_object(test_obj, ('y','z')) == 2

# Test missing_ok parameter
assert access_object(test_dict, 'missing', missing_ok=True) is None
assert access_object(test_dict, 'a.missing', missing_ok=True) is None

# Test error cases
with pytest.raises(ValueError, match="Invalid attribute path"):
    access_object(test_dict, 123)

with pytest.raises(ValueError, match="attribute 'missing' is not a valid attribute"):
    access_object(test_dict, 'missing', missing_ok=False)

with pytest.raises(ValueError, match="attribute 'a.missing' is not a valid attribute"):
    access_object(test_dict, 'a.missing', missing_ok=False)

In [None]:
# Test access_object function
test_dict = {
    'a': {
        'b': {
            'c': 1
        }
    },
    'list': [1, 2, 3],
    'mixed': {
        'list': [{'x': 1}, {'x': 2}]
    }
}

class TestObj:
    def __init__(self):
        self.x = 1
        self.y = {'z': 2}
        self.d = {'a': 3}

test_obj = TestObj()

# Test ** syntax for getting all attributes/items
assert access_object(test_dict, '**') == test_dict
assert access_object(test_obj, '**') == {'x': 1, 'y': {'z': 2}, 'd': {'a': 3}}

# Test . syntax for accessing current object
assert access_object(test_dict, '.') == test_dict
assert access_object(test_obj, '.') == test_obj

# Test combining . and ** 
assert access_object(test_obj, ('.', '**')) == {'x': 1, 'y': {'z': 2}, 'd': {'a': 3}}


In [None]:
from pydantic import BaseModel

In [None]:
class TestModel(BaseModel):
    x: int
    d: dict[str,int]

test_model = TestModel(x=1,d={'a':2})
assert access_object(test_model, '**') == {'x': 1, 'd': {'a': 2}}

nested_model = {
    'foo': 'bar',
    'mod':test_model
}
assert access_object(nested_model, ('mod','x')) == 1

## Set access

In [None]:
#| export
def set_access(obj, attr_path, value):
    """set an attribute or item of an object, using a path of attribute/item names.
    
    Args:
        obj: The object to set
        attr_path: A string or tuple/list of strings representing the path to the attribute/item
        value: The value to set
    """
    sub_obj = obj
    for attr in attr_path[:-1]:
        if attr not in sub_obj:
            sub_obj[attr] = {}
        sub_obj = sub_obj[attr]
    sub_obj[attr_path[-1]] = value

### Tests


In [None]:
obj = {'a':1,'b':2,'c':{'d':3,'e':4}}
set_access(obj,('c','f','g'),5)
assert obj == {'a': 1, 'b': 2, 'c': {'d': 3, 'e': 4, 'f': {'g': 5}}}




## mapping dicts utilities

In [None]:
#| export

def assert_keys_contiguous(list_of_keys):
    numbers = sorted(key for key in list_of_keys if isinstance(key,int))
    if len(numbers) > 0 and numbers[0] != 0:
        return False
    return all(numbers[i+1] - numbers[i] == 1 
                for i in range(len(numbers)-1))



In [None]:
input_ = {'x':1,
        'y':2,'z':3
        }.keys()
assert assert_keys_contiguous(input_)


input_ = {0:1,
        1:2,'z':3
        }.keys()
assert assert_keys_contiguous(input_)

input_ = {0:1,
        2:2,'z':3
        }.keys()
assert not assert_keys_contiguous(input_)

In [None]:
#| export
def object_to_args_kwargs(input):
    arg_keys = sorted([i for i in input.keys() if isinstance(i,int)])
    args = [input[i] for i in arg_keys]
    kwargs = {k:v for k,v in input.items() if not isinstance(k,int)}
    return args,kwargs


In [None]:
input_ = {0:1,
        2:2,'z':3
        }
args,kwargs = object_to_args_kwargs(input_)
assert args == [1,2]
assert kwargs == {'z':3}

## Mapping objects

In [None]:
#| export
def map_object(obj,mapping):
    """Map an object according to a mapping.
    
    Args:
        obj: The object to map
        mapping: A mapping of the form {key: accessor}
    Returns:
        A dict with the mapped values
    """
    mapped_dict = {}
    for key,access_path in mapping.items():
        if key == '_':
            continue
        elif access_path == '_' or '_' in access_path:
            raise ValueError(f"Invalid access path {access_path}, must not contain '_'")
        elif key == "**":
            mapped_dict.update(access_object(obj,access_path))
        else:
            mapped_dict[key] = access_object(obj,access_path)
    return mapped_dict

In [None]:
obj = {'a':1,'b':2,'c':{'d':3,'e':4}}
mapping = {'x':'a','y':'.','z':('c','d'),'**':'**'}
mapped_dict = map_object(obj,mapping)
assert mapped_dict == {'x': 1,
 'y': {'a': 1, 'b': 2, 'c': {'d': 3, 'e': 4}},
 'z': 3,
 'a': 1,
 'b': 2,
 'c': {'d': 3, 'e': 4}}

In [None]:
#| export
def append_dicts(dict_list: list[dict]) -> dict:
    """Combine multiple dictionaries with the same keys into a single dictionary where each value is a list.
    
    Args:
        dict_list: List of dictionaries with the same keys
        
    Returns:
        A dictionary where each key maps to a list of values from the input dictionaries
        
    Example:
        >>> dicts = [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]
        >>> append_dicts(dicts)
        {'a': [1, 3], 'b': [2, 4]}
    """
    if not dict_list:
        return {}
    
    result = defaultdict(list)
    for d in dict_list:
        for key, value in d.items():
            result[key].append(value)
    
    return dict(result)

In [None]:
#| export
def multi_map(obj_dict,mappings_dict,as_list_keys=None):
    """Map a dictionary of objects according to a dictionary of mappings.
    
    Args:
        obj_dict: A dictionary of objects to map {obj_name:obj}
        mappings_dict: A dictionary of mappings of the form {mapping_name:mapping}
    Returns:
        A dictionary that is the union of the mapped objects {obj_name:mapped_obj}
        If a key is present in multiple mappings, the value from the last mapping will be used.
        If an obj referenced in the mapping dict is not found in the obj_dict, it will be skipped.
    """
    if as_list_keys is None:
        as_list_keys = set()
    mapped_dict = {}
    for mapping_name,mapping in mappings_dict.items():
        if not mapping_name in obj_dict:
            continue
        if mapping_name in as_list_keys:
            sub_objects_list = [map_object(obj,mapping) for obj in obj_dict[mapping_name]]
            sub_obj = append_dicts(sub_objects_list)
        else:
            sub_obj = map_object(obj_dict[mapping_name],mapping)

        mapped_dict.update(sub_obj)
    return mapped_dict

In [None]:
objects = {'obj1':{'a':1,'b':2,'c':{'d':3,'e':4}}}
mappings = {'obj1':{'x':'a','y':'b','z':('c','d'),'**':'**'}}
assert multi_map(objects,mappings) == {'x': 1, 'y': 2, 'z': 3, 'a': 1, 'b': 2, 'c': {'d': 3, 'e': 4}}


objects = {'obj1':{'a':1,'b':2,'c':3},'obj2':{'a':4,'b':5,'c':6}}
mappings = {'obj1':{0:'a'},'obj2':{'y':'a','z':'b','**':'**'}}
assert multi_map(objects,mappings) == {0: 1, 'y': 4, 'z': 5, 'a': 4, 'b': 5, 'c': 6}

objects = {'obj1':{'a':1,'b':2,'c':3},'obj2':{'a':4,'b':5,'c':6}}
mappings = {'obj1':{'_':'a'},'obj2':{'y':'a'}}

out = multi_map(objects,mappings)
assert out == {'y': 4},out


In [None]:
objects = {'obj1':[{'a':1,'b':2,'c':3},{'a':10,'b':20,'c':30}],'obj2':{'a':4,'b':5,'c':6}}
mappings = {'obj1':{'c':'a','d':'b'},'obj2':{'y':'a'}}

out = multi_map(objects,mappings,as_list_keys={'obj1'})
assert out == {'c': [1, 10], 'd': [2, 20], 'y': 4},out

## Declerative map syntax

In [None]:
#| export
from lark import Lark, Transformer


_EDGE_GRAMMAR = """
    %import common.WS
    %ignore WS
    NAME: /[a-zA-Z0-9_<>]+/  
    
    accessor_step : NAME 
        | "**" -> kwargs_accessor
        | "." -> self_accessor
        | "_" -> underscore

    accessor: accessor_step ("." accessor_step)*

    accessor_assignment: accessor "=" accessor

    state: ("state"|"State") "/" NAME

    map_key: NAME
        | "**" -> kwargs_map_key

    single_map: map_key -> implicit_map
        | map_key "=" accessor -> keyword_map

    mapping_expr: single_map 
        | "(" single_map ("," single_map)* ")"

    edge: NAME "->" NAME ( mapping_expr )?
        | NAME "->" state ( mapping_expr )? -> write_edge
        | state "->" NAME ( mapping_expr )? -> read_edge
    

    implicit_edge: NAME ( mapping_expr )?
        | state ( mapping_expr )? -> implicit_state_edge

"""


In [None]:
#|export
class _EdgeTransformer(Transformer):
    def NAME(self, item):
        try:
            return int(item)
        except:
            return str(item)

    def accessor_step(self, items):
        return items[0]

    def map_key(self, items):
        return items[0]

    def kwargs_map_key(self, items):
        return '**'

    def self_accessor(self, items):
        return '.'

    def kwargs_accessor(self, items):
        return '**'

    def underscore(self, items):
        return '_'

    def accessor(self, items):
        return tuple(items)

    def accessor_assignment(self, items):
        return items[0],items[1]

    def implicit_map(self, items):
        name = items[0]
        return {name:('.',)}

    def keyword_map(self, items):
        name, accessor = items
        return {name:accessor}



    def mapping_expr(self, items):
        mapping_dict = {}
        for item in items:
            mapping_dict.update(item)
        return mapping_dict

    def state(self,items):
        return items[0] # return the key

    def _default_mapping(self,):
        return {0:('.',)}


    def edge(self,items):
        return {
                'edge_type':'edge',
                'source_node':items[0],
                'target_node':items[1],
                'mapping':items[2] if len(items) == 3 else self._default_mapping()
            }

    def read_edge(self,items):
        return {
            'edge_type':'read_edge',
            'source_node':items[0],
            'target_node':items[1],
            'mapping':items[2] if len(items) == 3 else self._default_mapping()
        }

    def write_edge(self,items):
        return {
            'edge_type':'write_edge',
            'source_node':items[0],
            'target_node':items[1],
            'mapping':items[2] if len(items) == 3 else self._default_mapping()
        }

    def implicit_edge(self,items):
        return {
            'edge_type':'implicit_edge',
            'node':items[0],
            'mapping':items[1] if len(items) == 2 else self._default_mapping()
        }
    
    def implicit_state_edge(self,items):
        return {
            'edge_type':'implicit_state_edge',
            'node':items[0],
            'mapping':items[1] if len(items) == 2 else self._default_mapping()
        }
    

In [None]:
#| export
from lark.visitors import VisitError

In [None]:
#| export
lark_edge_parser = Lark(_EDGE_GRAMMAR, parser='lalr',start=['edge','implicit_edge','mapping_expr','accessor_assignment'])

def parse_edge_descriptor(edge_str:str,start='edge'):
 
    try:
        tree = lark_edge_parser.parse(edge_str, start=start)
    except Exception as e:
        raise SyntaxError(f"Edge string '{edge_str}' in not formatted correctly\n"
            f"{e}"
        ) 

    try:
        transformed = _EdgeTransformer().transform(tree)
    except Exception as e:
        raise ValueError(f"Transformation error when parsing edge '{edge_str}'. Parse tree:\n{tree.pretty()}\n"
            f"{e}"
        ) 

    return transformed    



In [None]:
# Regular edge tests
out = parse_edge_descriptor('x ->y')
assert out == {'edge_type':'edge','source_node': 'x', 'target_node': 'y', 'mapping': {0: ('.',)}}, out

out = parse_edge_descriptor('node1->node2 (in=out)')
assert out == {'edge_type':'edge','source_node': 'node1', 'target_node': 'node2', 'mapping': {'in': ('out',)}} , out

out = parse_edge_descriptor('node1->node2 (first,second)')
assert out == {'edge_type':'edge','source_node': 'node1', 'target_node': 'node2', 'mapping': {'first': ('.',),'second': ('.',)}} , out

out = parse_edge_descriptor('node1->node2 (0=out)')
assert out == {'edge_type':'edge','source_node': 'node1', 'target_node': 'node2', 'mapping': {0: ('out',)}} , out


out = parse_edge_descriptor("x -> y  (c,**,a=b)")
assert out == {'edge_type':'edge','source_node': 'x',
 'target_node': 'y',
 'mapping': {'c': ('.',), '**': ('.',), 'a': ('b',)}}

out = parse_edge_descriptor("x -> y  (d=**,a=b,**=c.**)")
assert out == {'edge_type':'edge','source_node': 'x', 'target_node': 'y', 'mapping': {'d': ('**',), 'a': ('b',), '**': ('c', '**')}},out


In [None]:


with pytest.raises(SyntaxError, match="not formatted correctly"):
    parse_edge_descriptor('invalid->format->')

out = parse_edge_descriptor("x (a=b)",start='implicit_edge')
assert out == {'edge_type':'implicit_edge','node': 'x', 'mapping': {'a': ('b',)}}

out = parse_edge_descriptor("(a=b)",start='mapping_expr')
assert out == {'a': ('b',)}


out = parse_edge_descriptor("0=a.b.c",start='mapping_expr')
assert out ==  {0: ('a', 'b', 'c')},out

out = parse_edge_descriptor("**=a",start='mapping_expr')
assert out ==  {'**': ('a',)},out

In [None]:
# Test state edges
out = parse_edge_descriptor('x -> state/key')
assert out == {'edge_type': 'write_edge', 'source_node': 'x', 'target_node': 'key', 'mapping': {0: ('.',)}}, out

out = parse_edge_descriptor('state/key -> x')
assert out == {'edge_type': 'read_edge', 'source_node': 'key', 'target_node': 'x', 'mapping': {0: ('.',)}}, out

out = parse_edge_descriptor('x -> State/key (0=c.d,a=b)')
assert out == {'edge_type': 'write_edge', 'source_node': 'x', 'target_node': 'key', 'mapping': {0: ('c', 'd'), 'a': ('b',)}}, out

# Test implicit state edges
out = parse_edge_descriptor('state/key', start='implicit_edge')
assert out == {'edge_type': 'implicit_state_edge', 'node': 'key', 'mapping': {0: ('.',)}}, out

out = parse_edge_descriptor('State/key (0=c.d,a=b)', start='implicit_edge')
assert out == {'edge_type': 'implicit_state_edge', 'node': 'key', 'mapping': {0: ('c', 'd'), 'a': ('b',)}}, out

In [None]:
out = parse_edge_descriptor('State/key(_)', start='implicit_edge')
assert out == {'edge_type': 'implicit_state_edge', 'node': 'key', 'mapping': {'_': ('.',)}}, out



In [None]:
out = parse_edge_descriptor('a.b.cs=d.e.f',start='accessor_assignment')
assert out == (('a', 'b', 'cs'), ('d', 'e', 'f'))

## Export

In [None]:
# |hide
import nbdev

nbdev.nbdev_export()