# FROZEN!

This notebook is an initial version of the API functions, which were subsequently moved into dms3.py. Any updates made here will *not* propagate to dms3.py.



In [146]:
import pydantic 
from typing import Literal,Optional,Union
import json
import typing
import astropy.units as au
from rich.pretty import pprint

class ItemSchema(pydantic.BaseModel):
    '''
    One data item in database schema.

    An item is either a link to another collection (*link* set; scalar or 1d list of links, no unit)
    or a plain value described by *dtype*, *shape* and optionally *unit*.
    '''
    # type code of the stored value; numeric codes are later passed to np.array(...,dtype=...)
    dtype: Literal['f','i','?','str','bytes','object']='f'
    # unit string understood by astropy.units.Unit, or None for unit-less values
    unit: Optional[str]=None
    # expected array shape ([] = scalar, up to 5 axes); non-positive entries (e.g. -1) are not checked
    shape: pydantic.conlist(item_type=int,min_items=0,max_items=5)=[]
    # name of the linked collection, or None if this item is not a link
    link: Optional[str]=None

    @pydantic.validator("unit")
    def unit_valid(cls,v):
        '''Checks that *v* (if given) can be parsed by astropy.units.Unit; returns it unchanged.'''
        if v is None: return None
        try: au.Unit(v)
        except Exception:
            # the message was missing the f-prefix, so the offending unit was never shown
            print(f'Error validating unit {v} but exception not propagated??')
            raise
        return v

    @pydantic.root_validator
    def link_shape(cls,attrs):
        '''Checks that links are scalar or 1d and carry no unit.'''
        # Fields which failed their own validation are absent from *attrs*;
        # use .get() so that the original validation error is reported instead of a KeyError.
        if attrs.get('link') is None: return attrs
        if len(attrs.get('shape',[]))>1: raise ValueError('links must be either scalar (shape=[]) or 1d array (shape=[num]).')
        if attrs.get('unit') is not None: raise ValueError('unit not permitted with links')
        return attrs

class SchemaSchema(pydantic.BaseModel):
    '''
    Schema of the schema itself; read via parse_obj.

    The root is a mapping {collection name: {attribute name: ItemSchema}}.
    '''
    __root__: typing.Dict[str,typing.Dict[str,ItemSchema]]
    def getRoot(self): return self.__root__

    @pydantic.root_validator
    def links_valid(cls,attrs):
        '''Checks that every link points to a collection defined in the schema.'''
        # __root__ is absent when validation of some item failed; return early so that
        # the real validation error is reported rather than a KeyError from this validator.
        root=attrs.get('__root__')
        if root is None: return attrs
        for T,fields in root.items():
            for f,i in fields.items():
                if i.link is None: continue
                if i.link not in root: raise ValueError(f'{T}.{f}: link to undefined collection {i.link}.')
        # NOTE(review): returns the unwrapped root mapping rather than *attrs*; code elsewhere in this
        # file uses getattr(schema,klass) and schema.dict().keys(), which presumably relies on this
        # (and getRoot() presumably stops working because of it) -- confirm before changing.
        return root

    
import pymongo
# Module-wide handle to the `dms0` database of the MongoDB server at localhost:27017;
# all dms_api_* functions below go through it.
DB=pymongo.MongoClient("localhost",27017).dms0

##
## schema PUT, GET
## 
def dms_api_schema_put(schema,force=False):
    '''
    Stores *schema* (plain dict) as the document of the `schema` collection.

    Raises ValueError if a schema document already exists and *force* is not set; with *force*,
    the existing document is replaced.

    NOTE: pymongo's insert_one is documented to add an `_id` key to the passed dict in-place.
    '''
    coll=DB['schema']
    existing=coll.find_one()
    if existing is not None:
        if not force: raise ValueError('Schema already defined (use force=True if you are sure).')
        coll.delete_one({'_id':existing['_id']})
    # nothing is deleted when no schema was stored yet (delete_one rejects a None filter)
    coll.insert_one(schema)
def dms_api_schema_get(include_id=False):
    '''
    Returns the stored schema document as dict, or None if no schema was stored yet.

    The MongoDB `_id` key is stripped unless *include_id* is set.
    '''
    ret=DB['schema'].find_one()
    # find_one returns None for an empty collection; do not try to strip _id from it
    if ret is None: return None
    if not include_id: ret.pop('_id',None)
    return ret
 
# Round-trip check: load the schema from file, validate it, store it in the database,
# read it back and compare the validated forms.
with open('dms-schema.json',encoding='utf-8') as schemaFile:  # context manager closes the file
    rawSchema=json.load(schemaFile)
schema=SchemaSchema.parse_obj(rawSchema)
schemaDict=schema.dict()
dms_api_schema_put(rawSchema,force=True)
rawSchema2=dms_api_schema_get()
print(rawSchema)
schema2=SchemaSchema.parse_obj(rawSchema2)
print(str(schema)==str(schema2))
#pprint(strschema)
#pprint(schema2)a

{'Beam': {'cs': {'link': 'CrossSection'}, 'length': {'unit': 'm'}, 'height': {'unit': 'm'}, 'density': {'unit': 'kg/m3'}, 'bc_0': {'shape': [3], 'dtype': '?'}, 'bc_1': {'shape': [3], 'dtype': '?'}}, 'CrossSection': {'rve': {'link': 'ConcreteRVE'}, 'rvePositions': {'shape': [-1, 3], 'unit': 'm'}}, 'ConcreteRVE': {'ct': {'link': 'CTScan'}, 'origin': {'shape': [3], 'unit': 'm'}, 'size': {'shape': [3], 'unit': 'um'}, 'materials': {'link': 'MaterialRecord', 'shape': [-1]}}, 'CTScan': {'id': {'dtype': 'str'}, 'image': {'dtype': 'bytes', 'externalize': 'true'}}, 'BeamState': {'beam': {'link': 'Beam'}, 'cs': {'link': 'CrossSection'}, 'npointz': {'dtype': 'i'}, 'csState': {'link': 'CrossSectionState', 'shape': [-1]}}, 'CrossSectionState': {'rveStates': {'link': 'ConcreteRVEState', 'shape': [-1]}, 'bendingMoment': {'unit': 'kN*m'}, 'kappa': {}, 'eps_axial': {'unit': 'um/m'}}, 'ConcreteRVEState': {'rve': {'link': 'ConcreteRVE'}, 'sigmaHom': {'unit': 'MPa'}, 'epsHom': {'unit': 'um/m'}, 'stiffness'

In [102]:

def _is_object_id(o):
    '''
    Decides whether *o* is string representation of bson.objectid.ObjectId,
    i.e. a str of exactly 24 hexadecimal digits.
    '''
    # length alone is not enough: any 24-character string would otherwise be taken for an id
    return isinstance(o,str) and len(o)==24 and all(ch in '0123456789abcdefABCDEF' for ch in o)

def _apply_link(item,o,func):
    '''
    Calls *func* on the link value *o*: once for a scalar link (empty schema shape),
    or on every element for a list link (1d schema shape), returning a list in that case.
    '''
    assert item.link is not None
    rank=len(item.shape)
    assert rank in (0,1)
    if rank==0: return func(o)
    return list(map(func,o))

from collections.abc import Iterable
def _flatten(items, ignore_types=(str, bytes)):
    '''
    Generator yielding the leaves of a possibly nested iterable, depth-first.
    Instances of *ignore_types* are treated as leaves even though they are iterable.
    '''
    for element in items:
        is_nested = isinstance(element, Iterable) and not isinstance(element, ignore_types)
        if not is_nested:
            yield element
            continue
        yield from _flatten(element, ignore_types)

import numpy as np
import collections.abc as abc
# short alias to keep the nested-sequence type annotations below readable
Seq=abc.Sequence
        
@pydantic.validate_arguments()
def _validated_quantity_2(
        item: ItemSchema,
        value: Union[int,float,Seq[int],Seq[float],Seq[Seq[int]],Seq[Seq[float]],Seq[Seq[Seq[int]]],Seq[Seq[Seq[float]]],Seq[Seq[Seq[Seq[int]]]],Seq[Seq[Seq[Seq[float]]]]],
        unit: Optional[str]=None
    ):
    '''
    Converts value and optional unit to either np.array or astropy.units.Quantity — depending on whether the schema has unit or not. Checks 
    
    * dtype compatibility (won't accept floats/complex into an integer array)
    * dimension compatibility (will reject 2d array where schema specifies scalar or 1d array, and similar)
    * shape compatibility (will reject 4-vector where schema specified 3-vector; schema may use -1 in dimension where no check will be done; i.e. 3×? array has shape [3,-1])
    * unit compatibility: whether *unit* and schema unit are compatible; and will convert value to the schema unit before returning
    
    Returns np.array (no unit in the schema) or astropy.unit.Quantity (in schema units).

    Raises ValueError on dtype, dimension, shape or unit presence mismatch.

    NOTE(review): pydantic validates *value* against the Union before this body runs and may
    coerce it to the first matching member (e.g. float to int) — confirm whether that is intended.
    '''
    assert item.link is None
    # 1. create np.array
    # 1a. check numeric type convertibility (must be done item-by-item; perhaps can be optimized later?)
    # A scalar is not iterable, so it is checked as a single item.
    leaves=_flatten(value) if isinstance(value,Seq) else (value,)
    for it in leaves:
        # go through the dtype: newer NumPy rejects plain Python scalars in can_cast
        if not np.can_cast(np.asarray(it).dtype,item.dtype,casting='same_kind'): raise ValueError(f'Type mismatch: item {it} cannot be cast to dtype {item.dtype} (using same_kind)')
    np_val=np.array(value,dtype=item.dtype)
    # 1b. check shape
    if len(item.shape)!=np_val.ndim: raise ValueError(f'Dimension mismatch: {np_val.ndim} (shape {np_val.shape}), should be {len(item.shape)} (shape {item.shape})')
    for d,(expected,actual) in enumerate(zip(item.shape,np_val.shape)):
        # non-positive schema dimension (e.g. -1) means any length is accepted along that axis
        if expected>0 and actual!=expected: raise ValueError(f'Shape mismatch: axis {d}: {actual} (should be {expected})')
    # 2. handle units
    # 2a. schema has unit, data does not; or vice versa
    if (unit is None)!=(item.unit is None): raise ValueError(f'Unit mismatch: value unit is {unit} but schema unit is {item.unit}')
    
    # 2b. no unit, return np_val only
    if item.unit is None: return np_val
    # 2c. has unit, convert to schema unit (will raise exception is units are not compatible) and return au.Quantity
    return (np_val*au.Unit(unit)).to(item.unit)
                     
def _validated_quantity(item: ItemSchema, data):
    '''
    Gets bare value (scalar or sequence) or dict as {'value':..} or {'value':..,'unit':..};
    passes that to _validated_quantity_2, which will do the proper data check and conversions;
    returns validated quantity as either np.array or astropy.units.Quantity.

    Raises ValueError if the dict form carries keys other than value, unit.
    '''
    if isinstance(data,dict):
        if extras:=(data.keys()-{'value','unit'}):
            raise ValueError(f'Quantity has extra keywords: {", ".join(map(str,extras))} (only value, unit allowed).')
        return _validated_quantity_2(item,data['value'],data.get('unit',None))
    # Anything else is a bare value; scalars used to fall through and silently return None.
    return _validated_quantity_2(item,data)
    
def _parse_path(path: str) -> [(str,Optional[int])]:
    '''
    Parses *path* in dot notation, returning list of [(stem,index),...], where index is possibly None. For example:
    
    dot[1].notation → [('dot',1),('notation',None)]

    Empty path gives empty list. Raises ValueError if any component is not of the form stem or stem[digits].
    '''
    import re  # local import: re is not imported at the top of this file
    if path=='': return []
    pat=re.compile(r'''                  # no whitespace allowed in the expression
        (?P<stem>[a-zA-Z][a-zA-Z0-9_]*)  # stem: starts with letter, may continue with letters/numbers/_
        (\[(?P<index>[0-9]+)\])?         # optional index: decimals insides [...]
    ''',re.X)
    def _match_part(p):
        # fullmatch: the whole component must be consumed; match() alone silently accepted
        # trailing garbage such as 'foo[1]bar' or 'foo[x]'
        match=pat.fullmatch(p)
        if match is None: raise ValueError(f'Failed to parse path {path} (component {p}).')
        index=match['index']
        return match['stem'],(None if index is None else int(index))
    # split by ., dot may not appear inside [..] anyway
    return [_match_part(p) for p in path.split('.')]
def _unparse_path(path: [(str,Optional[int])]):
    '''Inverse of _parse_path: joins (stem,index) pairs back into dot notation, writing index as [index] unless it is None.'''
    def _component(stem,index):
        if index is None: return stem
        return stem+f'[{index}]'
    return '.'.join(_component(stem,index) for stem,index in path)
    
@pydantic.validate_arguments(config=dict(arbitrary_types_allowed=True))
def _quantity_to_dict(q: Union[np.ndarray,au.Quantity]) -> dict: 
    '''
    Converts validated quantity to plain dict: {'value':nested list} for np.ndarray,
    {'value':nested list,'unit':unit string} for astropy.units.Quantity.
    '''
    if not isinstance(q,au.Quantity): return {'value':q.tolist()}
    return {'value':q.value.tolist(),'unit':str(q.unit)}
           
def dms_api_object_post(type,data):
    '''
    Creates new object of collection *type* from *data* (dict attribute → value) and returns its database id as str.

    Link attributes may hold ids of existing objects (see _is_object_id) or nested dicts, from which
    the linked objects are created recursively (and inserted before the object linking to them).
    Numeric attributes are validated against the schema and stored in schema units.

    Raises AttributeError for attributes not defined by the schema, TypeError for str/bytes
    attributes of wrong type, and whatever _validated_quantity raises for invalid quantities.
    '''
    # uses schema as compiled above; normally should be cached?
    def _new_object(klass,dta):
        klassSchema=getattr(schema,klass)
        rec=dict()
        for key,val in dta.items():
            if key not in klassSchema: raise AttributeError(f'Invalid attribute {klass}.{key} (hint: {klass} defines: {", ".join(klassSchema.keys())}).')
            item=klassSchema[key]
            if item.link is not None:
                rec[key]=_apply_link(item,val,lambda o: o if _is_object_id(o) else _new_object(item.link,o))
            elif item.dtype in ('str','bytes'):
                T={'str':str,'bytes':bytes}[item.dtype]
                # the builtin type() is shadowed by the *type* parameter here, hence val.__class__
                if not isinstance(val,T): raise TypeError(f'{klass}.{key} must be a {item.dtype} (not a {val.__class__})')
                rec[key]=val
            elif item.dtype=='object':
                # round-trip through JSON, so that only JSON-serializable data is stored
                rec[key]=json.loads(json.dumps(val))
            else:
                # not a link, should validate and unit-convert data
                q=_validated_quantity(item,val)
                rec[key]=_quantity_to_dict(q)
        ins=DB[klass].insert_one(rec)
        return str(ins.inserted_id)
    return _new_object(type,data)
     
# Example: create a ConcreteRVE; the 'materials' and 'ct' links are given as nested dicts
# (not ids), so the linked objects are created along with it. The returned id is printed below.
CRVE_ID=dms_api_object_post('ConcreteRVE',
    {
        'origin':{'value':[1,2,3],'unit':'mm'},
        'size':{'value':[1,2,3],'unit':'km'},
        'materials':[
            {'name':'mat1','props':{'origin':'CZ'}},
            {'name':'mat2','props':{'origin':'DE'}}
        ],
         'ct':{'id':'bar','image':bytes(range(70,80))}
    }
)
print(CRVE_ID)

634e789457fa21906826e7ea


In [94]:
def _resolve_path_head(root: (str,str), path: str) -> ((str,str),str):
    '''
    Resolves path head, descending as far as it can get, and returns (klass,dbId),path_tail.

    *root* is (collection name, object id), *path* is in dot notation (see _parse_path).
    Links are followed from the root; path_tail is None when the whole path was consumed
    (it led to an object), otherwise it is the unconsumed remainder as list of (stem,index),
    starting with the first non-link attribute.

    Raises KeyError if an object on the way is not in the database, and IndexError when
    a scalar link is indexed or a list link is not.
    '''
    import bson  # local import: the file-level import only comes later in the file
    def _descend(klass,dbId,path,level):
        if len(path)==0: return ((klass,dbId),None)
        obj=DB[klass].find_one({'_id':bson.objectid.ObjectId(dbId)})
        # print(f'{" "*level} {path=} {len(path)=} {obj=}')
        if obj is None: raise KeyError(f'No object {klass} with id={dbId} in the database')
        klassSchema=getattr(schema,klass)
        attr,index=path[0]
        item=klassSchema[attr]
        # not a link: cannot descend any further, the rest of the path is the tail
        if item.link is None: return ((klass,dbId),path)
        if index is not None:
            if len(item.shape)==0: raise IndexError(f'{klass}.{attr} is scalar, but was indexed with {index}.')
            linkId=obj[attr][index]
        else:
            if len(item.shape)>0: raise IndexError(f'{klass}.{attr} is a list, but was not indexed.')
            linkId=obj[attr]
        if len(path)==1: return ((item.link,linkId),None) # path leaf
        # descend using linkId: obj[attr][index] is wrong for scalar links, where index is None
        return _descend(klass=item.link,dbId=linkId,path=path[1:],level=level+1)
            
    return _descend(root[0],root[1],path=_parse_path(path),level=0)

# Examples: a path ending in a non-link attribute (tail is returned), and the empty path (resolves to the root itself).
print(_resolve_path_head(root=('ConcreteRVE',CRVE_ID),path='materials[0].name'))
print(_resolve_path_head(root=('ConcreteRVE',CRVE_ID),path=''))

(('MaterialRecord', '634e76c557fa21906826e7e3'), [('name', None)])
(('ConcreteRVE', '634e76c557fa21906826e7e6'), None)


In [119]:
import bson
def dms_api_object_get(root: (str,str), path: str, max_level: int=1) -> dict:
    '''
    Returns the object which *path* (dot notation, relative to *root* = (collection name, object id)) leads to, as dict.

    Non-link attributes are returned as stored; linked objects are resolved recursively and
    nested in the result. Links of objects at depth *max_level* (the returned object has depth 0)
    are left out of the result; negative *max_level* means no depth limit. Every returned
    object carries its database id under '_id'.

    Raises ValueError if *path* leads to an attribute rather than an object, KeyError if an
    object is not found in the database, and AttributeError for stored attributes not in the schema.
    '''
    def _get_object(klass,dbId,level):
        if max_level>=0 and level>max_level: return {}
        obj=DB[klass].find_one({'_id':bson.objectid.ObjectId(dbId)})
        if obj is None: raise KeyError(f'No object {klass} with id={dbId} in the database.')
        klassSchema=getattr(schema,klass)
        ret=dict()
        for key,val in obj.items():
            if key in ('_id',): continue
            if key not in klassSchema: raise AttributeError(f'Invalid stored attribute {klass}.{key} (not in schema).')
            item=klassSchema[key]
            if item.link is not None:
                if level==max_level: continue
                # bind item and level as defaults, so that the closure does not depend on loop state
                def _resolve(o,*,i=item,level=level): return _get_object(i.link,o,level=level+1)
                ret[key]=_apply_link(item,val,_resolve)
            else:
                ret[key]=val
        ret['_id']=dbId
        return ret
    root2,path2=_resolve_path_head(root,path)
    if path2 is not None: raise ValueError(f'Path {path} does not lead to an object (tail: {_unparse_path(path2)}).')
    return _get_object(root2[0],root2[1],level=0)

def dms_api_attr_get(root: (str,str), path: str) -> dict:
    '''
    Returns stored value of the (non-link) attribute which *path* leads to, relative to *root* = (collection name, object id).

    Raises ValueError if the path leads to an object, leaves more than one unresolved component
    or has an index on the leaf; KeyError if the owning object is not in the database.
    '''
    owner,tail=_resolve_path_head(root,path)
    if not tail: raise ValueError(f'Path {path} does leads to an object ({owner[0]}), not an attribute.')
    if len(tail)>1: raise ValueError(f'Path {path} has too long tail ({_unparse_path(tail)}).')
    attr,leafIndex=tail[0]
    if leafIndex is not None: raise ValueError(f'Path {path} has leaf index {leafIndex}.')
    klass,dbId=owner
    obj=DB[klass].find_one({'_id':bson.objectid.ObjectId(dbId)})
    if obj is None: raise KeyError(f'No object {klass} with id={dbId} in the database.')
    # the tail returned by _resolve_path_head starts with a non-link attribute
    item=getattr(schema,klass)[attr]
    assert item.link is None
    return obj[attr]

# Examples: fetch a linked object through an indexed list link, then attributes of the linked object and of the root object.
pprint(dms_api_object_get(root=('ConcreteRVE',CRVE_ID),path='materials[1]',max_level=1))
pprint(dms_api_attr_get(root=('ConcreteRVE',CRVE_ID),path='materials[1].name'))
pprint(dms_api_attr_get(root=('ConcreteRVE',CRVE_ID),path='origin'))

In [132]:
def dms_api_type_list():
    '''Returns list of the top-level keys of schema.dict() (the collection names, judging by the output below).'''
    return [*schema.dict().keys()]

def dms_api_object_list(type: str):
    '''Returns ids (as str) of all objects stored in the collection *type*.'''
    ids=[]
    for rec in DB[type].find():
        ids.append(str(rec['_id']))
    return ids

# Example: print all collection names, then the ids of the objects stored in each collection.
print(dms_api_type_list())
for T in dms_api_type_list():
    print(T,dms_api_object_list(T))

['Beam', 'CrossSection', 'ConcreteRVE', 'CTScan', 'BeamState', 'CrossSectionState', 'ConcreteRVEState', 'MaterialRecord']
Beam []
CrossSection []
ConcreteRVE ['634d6683129a38509f6a9cb0', '634d676b129a38509f6a9cb4', '634d6c9d129a38509f6a9cb8', '634d6ca0129a38509f6a9cbe', '634e592857fa21906826e7b0', '634e652a57fa21906826e7b6', '634e68d357fa21906826e7ba', '634e6bd557fa21906826e7be', '634e6c0457fa21906826e7c2', '634e6c5257fa21906826e7c6', '634e6c8557fa21906826e7ca', '634e745e57fa21906826e7ce', '634e750757fa21906826e7d2', '634e751857fa21906826e7d6', '634e751b57fa21906826e7da', '634e754b57fa21906826e7de', '634e76a957fa21906826e7e2', '634e76c557fa21906826e7e6', '634e789457fa21906826e7ea']
CTScan ['634d6683129a38509f6a9caf', '634d676b129a38509f6a9cb3', '634d6c9d129a38509f6a9cb7', '634d6ca0129a38509f6a9cbd', '634e592857fa21906826e7af', '634e652a57fa21906826e7b5', '634e68d357fa21906826e7b9', '634e6bd557fa21906826e7bd', '634e6c0457fa21906826e7c1', '634e6c5257fa21906826e7c5', '634e6c8557fa21906826