In [195]:
import pydantic 
from typing import Literal,Optional,Union
import json
import typing
import astropy.units as au
from rich.pretty import pprint

class ItemSchema(pydantic.BaseModel):
    '''One data item in database schema.

    An item is either a plain value (numeric array, string, bytes or JSON-like
    object) or a link to object(s) stored in another collection.
    '''
    # type code: 'f', 'i', '?' are handed to numpy as dtype; 'str', 'bytes' and 'object' are handled separately
    dtype: Literal['f','i','?','str','bytes','object']='f'
    # unit string parseable by astropy.units (None for data without unit)
    unit: Optional[str]=None
    # expected array shape, at most 5 axes; [] is a scalar
    shape: pydantic.conlist(item_type=int,min_items=0,max_items=5)=[]
    # name of the collection this item links to (None when the item is not a link)
    link: Optional[str]=None

    @pydantic.validator("unit")
    def unit_valid(cls,v):
        'Check that the unit string can be parsed by astropy.units; return it unchanged.'
        if v is None: return
        try: au.Unit(v)
        except Exception as e:
            # re-raise as ValueError naming the offending unit, so that pydantic
            # reports it as an ordinary validation error
            raise ValueError(f'Invalid unit {v!r}: {e}') from e
        return v

    @pydantic.root_validator
    def link_shape(cls,attrs):
        'Links must be scalar or 1d and must not carry a unit.'
        # fields which failed their own validation are missing from attrs,
        # hence .get() instead of indexing (a KeyError here would hide the real error)
        if attrs.get('link') is not None:
            if len(attrs.get('shape') or [])>1: raise ValueError('links must be either scalar (shape=[]) or 1d array (shape=[num]).')
            if attrs.get('unit') is not None: raise ValueError('unit not permitted with links')
        return attrs

class SchemaSchema(pydantic.BaseModel):
    'Schema of the schema itself; read via parse_obj'
    # mapping: collection name -> (attribute name -> ItemSchema)
    __root__: typing.Dict[str,typing.Dict[str,ItemSchema]]
    def getRoot(self):
        'Return the root mapping.'
        return self.__root__

    @pydantic.root_validator
    def links_valid(cls,attrs):
        'Check that every link points to a collection defined in this schema.'
        root=attrs.get('__root__')
        # __root__ is missing when it failed field validation; leave reporting of that error to pydantic
        if root is None: return attrs
        for T,fields in root.items():
            for f,i in fields.items():
                if i.link is None: continue
                if i.link not in root: raise ValueError(f'{T}.{f}: link to undefined collection {i.link}.')
        # NOTE(review): the root mapping (not attrs) is returned on purpose-or-accident;
        # code elsewhere in this notebook reads collections via getattr(schema,name),
        # which appears to depend on it -- confirm before changing.
        return root

    
import pymongo
# database handle used by the dms_api_* functions below
# NOTE(review): host, port and database name (dms0) are hard-coded here
DB=pymongo.MongoClient("localhost",27017).dms0

##
## schema PUT, GET
## 
def dms_api_schema_put(schema=None,force=False):
    '''Store *schema* (a mapping) in the 'schema' collection.

    Raises ValueError when no schema is given, or when a schema is already
    stored and *force* is not set. With force=True the stored schema is
    replaced, so that dms_api_schema_get returns the new one.
    '''
    # the default used to be the module-level `schema`, which does not exist yet
    # when this function is defined (NameError on a fresh kernel)
    if schema is None: raise ValueError('No schema given.')
    coll=DB['schema']
    if coll.find_one() is not None:
        if not force: raise ValueError('Schema already defined (use force=True if you are sure).')
        # drop the old document(s); otherwise find_one() keeps returning the stale schema
        coll.delete_many({})
    # insert a shallow copy: insert_one adds an _id key to the document it is given
    coll.insert_one(dict(schema))
def dms_api_schema_get():
    "Return one document from the 'schema' collection, as given by find_one()."
    schemaColl=DB['schema']
    stored=schemaColl.find_one()
    return stored
 
# read the raw schema; the context manager closes the file again
with open('dms-schema.json') as schemaFile: rawSchema=json.load(schemaFile)
# validate, then store the raw (JSON) form in the database
schema=SchemaSchema.parse_obj(rawSchema)
schemaDict=schema.dict()
dms_api_schema_put(rawSchema,force=True)
pprint(dms_api_schema_get())
print(schema)

Beam={'cs': ItemSchema(dtype='f', unit=None, shape=[], link='CrossSection'), 'length': ItemSchema(dtype='f', unit='m', shape=[], link=None), 'height': ItemSchema(dtype='f', unit='m', shape=[], link=None), 'density': ItemSchema(dtype='f', unit='kg/m3', shape=[], link=None), 'bc_0': ItemSchema(dtype='?', unit=None, shape=[3], link=None), 'bc_1': ItemSchema(dtype='?', unit=None, shape=[3], link=None)} CrossSection={'rve': ItemSchema(dtype='f', unit=None, shape=[], link='ConcreteRVE'), 'rvePositions': ItemSchema(dtype='f', unit='m', shape=[-1, 3], link=None)} ConcreteRVE={'ct': ItemSchema(dtype='f', unit=None, shape=[], link='CTScan'), 'origin': ItemSchema(dtype='f', unit='m', shape=[3], link=None), 'size': ItemSchema(dtype='f', unit='um', shape=[3], link=None), 'materials': ItemSchema(dtype='f', unit=None, shape=[-1], link='MaterialRecord')} CTScan={'id': ItemSchema(dtype='str', unit=None, shape=[], link=None), 'image': ItemSchema(dtype='bytes', unit=None, shape=[], link=None)} BeamState=

In [196]:

def _is_object_id(o):
    return isinstance(o,str) and len(o)==24

def _apply_link(item,o,func):
    assert item.link is not None
    assert len(item.shape) in (0,1)
    if len(item.shape)==1: return [func(o_) for o_ in o]
    return func(o)

from collections.abc import Iterable
def _flatten(items, ignore_types=(str, bytes)):
    for x in items:
        if isinstance(x, Iterable) and not isinstance(x, ignore_types): yield from _flatten(x, ignore_types)
        else: yield x

import numpy as np
import collections.abc as abc
# shorthand used in the type annotation of _validated_quantity_2
Seq=abc.Sequence
        
@pydantic.validate_arguments()
def _validated_quantity_2(
        item: ItemSchema,
        value: Union[int,float,Seq[int],Seq[float],Seq[Seq[int]],Seq[Seq[float]],Seq[Seq[Seq[int]]],Seq[Seq[Seq[float]]],Seq[Seq[Seq[Seq[int]]]],Seq[Seq[Seq[Seq[float]]]]],
        unit: Optional[str]=None
    ):
    '''Validate *value* (with optional *unit*) against the schema *item*.

    Returns a numpy array when the schema item has no unit, otherwise an
    astropy Quantity converted to the schema unit.

    Raises ValueError when an element cannot be cast to the schema dtype, when
    the number of dimensions or a fixed axis length does not match, or when only
    one of the schema and the data has a unit. Incompatible units raise in the
    astropy conversion.

    NOTE(review): pydantic may coerce *value* while trying the Union members in
    order (e.g. float to int) before this body runs -- confirm with the pydantic
    version in use.
    '''
    assert item.link is None
    # 1. create np.array
    # 1a. check numeric type convertibility (must be done item-by-item; perhaps can be optimized later?)
    # value is wrapped in a list so that a bare scalar is checked as well;
    # the cast check works on dtypes since NumPy 2 rejects Python scalars in np.can_cast
    for it in _flatten([value]):
        if not np.can_cast(np.asarray(it).dtype,item.dtype,casting='same_kind'): raise ValueError(f'Type mismatch: item {it} cannot be cast to dtype {item.dtype} (using same_kind)')
    np_val=np.array(value,dtype=item.dtype)
    # 1b. check shape: number of axes must agree; axes with a positive schema length must match exactly
    if len(item.shape)!=np_val.ndim: raise ValueError(f'Dimension mismatch: {np_val.ndim} (shape {np_val.shape}), should be {len(item.shape)} (shape {item.shape})')
    for d in range(np_val.ndim):
        if item.shape[d]>0 and np_val.shape[d]!=item.shape[d]: raise ValueError(f'Shape mismatch: axis {d}: {np_val.shape[d]} (should be {item.shape[d]})')
    # 2. handle units
    # 2a. schema has unit, data does not; or vice versa
    if (unit is None)!=(item.unit is None): raise ValueError(f'Unit mismatch: stored unit is {unit} but schema unit is {item.unit}')

    # 2b. no unit, return np_val only
    if item.unit is None: return np_val
    # 2c. has unit, convert to schema unit (will raise exception if units are not compatible) and return au.Quantity
    return (np_val*au.Unit(unit)).to(item.unit)
                     
def _validated_quantity(item: ItemSchema, data):
    '''Validate quantity *data* against the schema *item*.

    *data* is either a dict with the key 'value' and optionally 'unit', or the
    bare value itself (scalar or nested sequence, without unit). Returns what
    _validated_quantity_2 returns; raises ValueError for malformed dicts.
    '''
    if isinstance(data,dict):
        if extras:=(data.keys()-{'value','unit'}):
            raise ValueError(f'Quantity has extra keywords: {", ".join(sorted(map(str,extras)))} (only value, unit allowed).')
        if 'value' not in data: raise ValueError('Quantity is missing the "value" keyword.')
        return _validated_quantity_2(item,data['value'],data.get('unit',None))
    # bare value: scalars are passed on as well (they used to fall through and yield None)
    return _validated_quantity_2(item,data)
    
@pydantic.validate_arguments(config=dict(arbitrary_types_allowed=True))
def _quantity_to_dict(q: Union[np.ndarray,au.Quantity]) -> dict:
    '''Convert an array or a Quantity to a plain dict.

    The dict holds the data as nested lists under 'value'; a Quantity
    additionally gets its unit, as string, under 'unit'.
    '''
    if not isinstance(q,au.Quantity):
        return dict(value=q.tolist())
    return dict(value=q.value.tolist(),unit=str(q.unit))
           
def dms_api_object_post(type,data):
    '''Validate *data* against the schema of collection *type* and store it.

    Link attributes may hold the id of an existing object (see _is_object_id)
    or the data of a new object, which is then created recursively in the
    linked collection. Returns the id (as string) of the inserted object.

    Raises AttributeError for attributes not defined by the schema and
    TypeError for str/bytes attributes of the wrong type; numeric attributes
    are checked by _validated_quantity.
    '''
    # uses schema as compiled above; normally should be cached?
    def _new_object(klass,dta):
        klassSchema=getattr(schema,klass)
        rec=dict()
        for key,val in dta.items():
            if not key in klassSchema: raise AttributeError(f'Invalid attribute {klass}.{key} (hint: {klass} defines: {", ".join(klassSchema)}).')
            item=klassSchema[key]
            if item.link is not None:
                rec[key]=_apply_link(item,val,lambda o: o if _is_object_id(o) else _new_object(item.link,o))
            elif item.dtype in ('str','bytes'):
                T={'str':str,'bytes':bytes}[item.dtype]
                # the builtin type() is shadowed by the *type* parameter here, hence __class__
                if not isinstance(val,T): raise TypeError(f'{klass}.{key} must be a {item.dtype} (not a {val.__class__.__name__})')
                rec[key]=val
            elif item.dtype=='object':
                # round-trip through JSON: fails for data which is not JSON-serializable
                rec[key]=json.loads(json.dumps(val))
            else:
                # not a link, should validate and unit-convert data
                q=_validated_quantity(item,val)
                rec[key]=_quantity_to_dict(q)
        ins=DB[klass].insert_one(rec)
        return str(ins.inserted_id)
    return _new_object(type,data)
     
# Demo: store a ConcreteRVE whose linked objects ('materials', 'ct') are given inline;
# the id of the new top-level object is kept for the following cells.
CRVE_ID=dms_api_object_post('ConcreteRVE',
    {
        'origin':{'value':[1,2,3],'unit':'mm'},
        'size':{'value':[1,2,3],'unit':'km'},
        'materials':[
            {'name':'mat1','props':{'origin':'CZ'}},
            {'name':'mat2','props':{'origin':'DE'}}
        ],
         'ct':{'id':'bar','image':bytes(range(70,80))}
    }
)
print(CRVE_ID)

634d6ca0129a38509f6a9cbe


In [197]:
import bson
def dms_api_object_get(type: str, id: str, max_level: int=1):
    '''Fetch object *id* from collection *type*, resolving links recursively.

    The top-level object is level 0. Links of an object at level *max_level*
    are left out of the result; a negative *max_level* puts no limit on the
    depth. Every returned object carries its id under '_id'.

    Raises KeyError when an object is not found and AttributeError when a
    stored attribute is not defined by the schema.
    '''
    def _get_object(klass,dbId,level):
        if max_level>=0 and level>max_level: return {}
        obj=DB[klass].find_one({'_id':bson.objectid.ObjectId(dbId)})
        if obj is None: raise KeyError(f'No object {klass} with id={dbId} in the database')
        klassSchema=getattr(schema,klass)
        ret=dict()
        for key,val in obj.items():
            if key in ('_id',): continue
            if not key in klassSchema: raise AttributeError(f'Invalid stored attribute {klass}.{key} (not in schema)')
            item=klassSchema[key]
            if item.link is not None:
                if level==max_level: continue
                # item and level are bound as defaults, fixing their values for this iteration
                def _resolve(o,*,i=item,level=level): return _get_object(i.link,o,level=level+1)
                ret[key]=_apply_link(item,val,_resolve)
            else:
                ret[key]=val
        ret['_id']=dbId
        return ret
    return _get_object(type,id,level=0)

# Demo: read back the object created above, resolving links up to max_level=2
pprint(dms_api_object_get(type='ConcreteRVE',id=CRVE_ID,max_level=2))
            

In [198]:
def dms_api_attribute_get(type: str, id: str, attr: str):
    '''Return the stored value of attribute *attr* of object *id* in collection *type*.

    Raises KeyError when the schema does not define *attr* or when the object
    has no value stored for it, and ValueError when *attr* is a link.
    '''
    # check against the schema first, so that invalid requests do not query the database
    klassSchema=getattr(schema,type)
    if attr not in klassSchema: raise KeyError(f'{type}.{attr}: no such attribute in the schema')
    item=klassSchema[attr]
    if item.link is not None: raise ValueError(f'{type}.{attr} is a link (not a value)')
    obj=dms_api_object_get(type,id,max_level=0)
    if attr not in obj: raise KeyError(f'{type}.{attr} has no value stored in object {id}')
    return obj[attr]

# TODO: dms_api_attribute_set is not implemented; the draft below is a copy of the getter and never stores *value*.
#def dms_api_attribute_set(type: str, id: str, attr: str, value):
#    obj=dms_api_object_get(type,id,max_level=0)
#    klassSchema=getattr(schema,type)
#    item=klassSchema[attr]
#    if item.link is not None: raise ValueError(f'{type}.{attr} is a link (not a value)')
#    return obj[attr]



# Demo: read a single (non-link) attribute of the object created above
pprint(dms_api_attribute_get(type='ConcreteRVE',id=CRVE_ID,attr='origin'))