Skip to content

Commit

Permalink
[feat][resotocore] Allow accessing detailed model information (#1816)
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias committed Nov 8, 2023
1 parent 6a8eed7 commit 06c1d95
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 43 deletions.
73 changes: 73 additions & 0 deletions resotocore/resotocore/model/exportable_model.py
@@ -0,0 +1,73 @@
from typing import List

from resotocore.model.model import (
Model,
ComplexKind,
predefined_kinds_by_name,
Kind,
SimpleKind,
ArrayKind,
DictionaryKind,
TransformKind,
Property,
)
from resotocore.types import Json, JsonElement


def json_export_simple_schema(model: Model) -> List[Json]:
def export_simple(kind: SimpleKind) -> Json:
result = kind.as_json()
result["type"] = "simple"
return result

def export_property(prop: Property, kind: Kind) -> Json:
def prop_kind(kd: Kind) -> JsonElement:
if isinstance(kd, TransformKind):
assert kd.source_kind is not None, "Source of TransformKind is None!"
return prop_kind(kd.source_kind)
elif isinstance(kd, ArrayKind):
return dict(type="array", items=prop_kind(kd.inner))
elif isinstance(kd, DictionaryKind):
return dict(type="dictionary", key=prop_kind(kd.key_kind), value=prop_kind(kd.value_kind))
elif isinstance(kd, ComplexKind):
return dict(type="object", fqn=kd.fqn)
elif isinstance(kd, SimpleKind):
return dict(type="simple", fqn=kd.fqn)
else:
raise ValueError(f"Can not handle kind {kd.fqn}")

p = dict(
kind=prop_kind(kind),
required=prop.required,
description=prop.description,
metadata=prop.metadata,
)
return p

def export_complex(kind: ComplexKind) -> Json:
return dict(
type="object",
fqn=kind.fqn,
bases=kind.bases,
allow_unknown_props=kind.allow_unknown_props,
predecessor_kinds=kind.predecessor_kinds(),
successor_kinds=kind.successor_kinds,
aggregate_root=kind.aggregate_root,
metadata=kind.metadata,
properties={prop.name: export_property(prop, kind) for prop, kind in kind.all_props_with_kind()},
)

def export_kind(kind: Kind) -> Json:
if isinstance(kind, SimpleKind):
return export_simple(kind)
elif isinstance(kind, ComplexKind):
return export_complex(kind)
elif isinstance(kind, ArrayKind):
return {"type": "array", "items": export_kind(kind.inner)}
elif isinstance(kind, DictionaryKind):
return {"type": "dictionary", "key": export_kind(kind.key_kind), "value": export_kind(kind.value_kind)}
else:
raise ValueError(f"Unexpected kind: {kind}")

# filter out predefined properties
return [export_kind(k) for k in model.kinds.values() if k.fqn not in predefined_kinds_by_name]
141 changes: 113 additions & 28 deletions resotocore/resotocore/model/model.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import copy
import json
import re
import sys
Expand Down Expand Up @@ -42,6 +43,7 @@
from resotolib.utils import is_env_var_string

T = TypeVar("T")
AvailableEdgeTypes: List[EdgeType] = ["default", "delete"]


def check_type_fn(t: type, type_name: str) -> ValidationFn:
Expand Down Expand Up @@ -303,7 +305,15 @@ def from_json(js: Json, _: type = object, **kwargs: object) -> Kind:
successor_kinds = js.get("successor_kinds")
aggregate_root = js.get("aggregate_root", True)
metadata = js.get("metadata")
return ComplexKind(js["fqn"], bases, props, allow_unknown_props, successor_kinds, aggregate_root, metadata)
return ComplexKind(
fqn=js["fqn"],
bases=bases,
properties=props,
allow_unknown_props=allow_unknown_props,
successor_kinds=successor_kinds,
aggregate_root=aggregate_root,
metadata=metadata,
)
else:
raise JSONDecodeError("Given type can not be read.", json.dumps(js), 0)

Expand Down Expand Up @@ -848,6 +858,7 @@ def __init__(
self.successor_kinds = successor_kinds or {}
self.aggregate_root = aggregate_root
self.metadata = metadata or {}
self._predecessor_kinds: Dict[EdgeType, List[str]] = {}
self.__prop_by_name = {prop.name: prop for prop in properties}
self.__resolved = False
self.__resolved_kinds: Dict[str, Tuple[Property, Kind]] = {}
Expand All @@ -857,6 +868,23 @@ def __init__(
self.__property_by_path: List[ResolvedProperty] = []
self.__synthetic_props: List[ResolvedProperty] = []

def copy(
self,
*,
bases: Optional[List[str]] = None,
properties: Optional[List[Property]] = None,
successor_kinds: Optional[Dict[EdgeType, List[str]]] = None,
predecessor_kinds: Optional[Dict[EdgeType, List[str]]] = None,
metadata: Optional[Json] = None,
) -> ComplexKind:
result = copy.copy(self)
result.bases = bases or self.bases
result.properties = properties or self.properties
result.successor_kinds = successor_kinds or self.successor_kinds
result._predecessor_kinds = predecessor_kinds or self._predecessor_kinds
result.metadata = metadata or self.metadata
return result

def resolve(self, model: Dict[str, Kind]) -> None:
if not self.__resolved:
self.__resolved = True
Expand Down Expand Up @@ -886,9 +914,18 @@ def resolve(self, model: Dict[str, Kind]) -> None:

# property path -> kind
self.__property_by_path = ComplexKind.resolve_properties(self, model)

self.__synthetic_props = [p for p in self.__property_by_path if p.prop.synthetic]

# resolve predecessor kinds
self._predecessor_kinds = {
edge_type: [
kind.fqn
for kind in model.values()
if isinstance(kind, ComplexKind) and self.fqn in kind.successor_kinds.get(edge_type, [])
]
for edge_type in AvailableEdgeTypes
}

def __eq__(self, other: Any) -> bool:
if isinstance(other, ComplexKind):
return (
Expand Down Expand Up @@ -940,6 +977,9 @@ def all_props_with_kind(self) -> List[Tuple[Property, Kind]]:
def synthetic_props(self) -> List[ResolvedProperty]:
return self.__synthetic_props

def predecessor_kinds(self) -> Dict[EdgeType, List[str]]:
return self._predecessor_kinds

def transitive_complex_types(self, with_bases: bool = True, with_properties: bool = True) -> List[ComplexKind]:
result: Dict[str, ComplexKind] = {}
visited: Set[str] = set()
Expand Down Expand Up @@ -1174,6 +1214,7 @@ def path_for(
date_kind = DateKind("date")
datetime_kind = DateTimeKind("datetime")
duration_kind = DurationKind("duration")
empty_complex_kind = ComplexKind("empty", [], [], allow_unknown_props=True)

# Define synthetic properties in the metadata section (not defined in the model)
# The predefined_properties kind is used to define the properties there.
Expand Down Expand Up @@ -1234,28 +1275,29 @@ def path_for(
class Model:
@staticmethod
def empty() -> Model:
return Model({})
return Model({}, [])

@staticmethod
def from_kinds(kinds: List[Kind]) -> Model:
all_kinds = kinds + predefined_kinds
kind_dict = {kind.fqn: kind for kind in all_kinds}
for kind in all_kinds:
kind.resolve(kind_dict)
return Model(kind_dict)

def __init__(self, kinds: Dict[str, Kind]):
self.kinds = kinds
self.__property_kind_by_path: List[ResolvedProperty] = list(
resolved = list(
# several complex kinds might have the same property
# reduce the list by hash over the path.
{
r.path: r
for c in kinds.values()
for c in all_kinds
if isinstance(c, ComplexKind) and c.aggregate_root
for r in c.resolved_properties()
}.values()
)
return Model(kind_dict, resolved)

def __init__(self, kinds: Dict[str, Kind], property_kind_by_path: List[ResolvedProperty]):
self.kinds = kinds
self.__property_kind_by_path: List[ResolvedProperty] = property_kind_by_path

def __contains__(self, name_or_object: Union[str, Json]) -> bool:
if isinstance(name_or_object, str):
Expand Down Expand Up @@ -1408,55 +1450,98 @@ def update_is_valid(from_kind: Kind, to_kind: Kind) -> None:
if check_overlap:
check_overlap_for([to_js(a) for a in updated.values()])

return Model(updated)
return Model.from_kinds(list(updated.values()))

def flat_kinds(self) -> List[Kind]:
def flat_kinds(self, lookup_model: Optional[Model] = None) -> Model:
"""
Returns a list of all kinds. The hierarchy of complex kinds is flattened:
- all properties of all base kinds.
- all metadata merged in hierarchy.
- all successor kinds combined in hierarchy.
"""
cpl: Dict[str, ComplexKind] = {kind.fqn: kind for kind in self.complex_kinds()}
model = lookup_model if lookup_model else self

def get_complex(name: str) -> ComplexKind:
return ck if isinstance(ck := model.get(name), ComplexKind) else empty_complex_kind

def all_bases(kind: ComplexKind) -> Set[str]:
bases: Set[str] = set()
for base, base_kind in kind.resolved_bases().items():
bases.add(base)
bases |= all_bases(base_kind)
return bases

def all_props(kind: ComplexKind) -> Dict[str, Property]:
props_by_name = {}
for props in [all_props(cpl[fqn]) for fqn in kind.bases] + [{p.name: p for p in kind.properties}]:
for props in [all_props(get_complex(fqn)) for fqn in kind.bases] + [{p.name: p for p in kind.properties}]:
for key, value in props.items():
props_by_name[key] = value
return props_by_name

def all_metadata(kind: ComplexKind) -> Dict[str, Any]:
metadata = {}
for meta in [all_metadata(cpl[fqn]) for fqn in kind.bases] + [kind.metadata]:
for meta in [all_metadata(get_complex(fqn)) for fqn in kind.bases] + [kind.metadata]:
for key, value in meta.items():
metadata[key] = value
return metadata

def all_successor_kinds(kind: ComplexKind) -> Dict[EdgeType, List[str]]:
successor_kinds = kind.successor_kinds.copy()
for base in kind.bases:
for edge_type, succ in all_successor_kinds(cpl[base]).items():
for edge_type, succ in all_successor_kinds(get_complex(base)).items():
successor_kinds[edge_type] = successor_kinds.get(edge_type, []) + succ
return successor_kinds

result: List[Kind] = []
def all_predecessor_kinds(kind: ComplexKind) -> Dict[EdgeType, List[str]]:
predecessor_kinds = kind.predecessor_kinds().copy()
for base in kind.bases:
for edge_type, succ in all_predecessor_kinds(get_complex(base)).items():
predecessor_kinds[edge_type] = predecessor_kinds.get(edge_type, []) + succ
return predecessor_kinds

result: Dict[str, Kind] = {}
for kind in self.kinds.values():
if isinstance(kind, ComplexKind):
result.append(
ComplexKind(
fqn=kind.fqn,
bases=kind.bases,
properties=list(all_props(kind).values()),
allow_unknown_props=kind.allow_unknown_props,
successor_kinds=all_successor_kinds(kind),
aggregate_root=kind.aggregate_root,
metadata=all_metadata(kind),
)
result[kind.fqn] = kind.copy(
bases=list(all_bases(kind)),
properties=list(all_props(kind).values()),
successor_kinds=all_successor_kinds(kind),
predecessor_kinds=all_predecessor_kinds(kind),
metadata=all_metadata(kind),
)
else:
result.append(kind)
return result
result[kind.fqn] = kind
return Model(result, self.__property_kind_by_path)

def filter_complex(
self, filter_fn: Callable[[ComplexKind], bool], with_bases: bool = True, with_prop_types: bool = True
) -> Model:
"""
Returns a model that contains only the aggregates for which the filter function returns true.
"""
visited: Set[str] = set()
kinds: Dict[str, Kind] = {}

def add_kind(cpl: ComplexKind) -> None:
if cpl.fqn in visited:
return
visited.add(cpl.fqn)
kinds[cpl.fqn] = cpl
# add bases
if with_bases:
for _, base in cpl.resolved_bases().items():
add_kind(base)
# add prop types
if with_prop_types:
for prop in cpl.resolved_properties():
if isinstance(prop.kind, ComplexKind):
add_kind(prop.kind)

for kind in self.kinds.values():
if isinstance(kind, ComplexKind) and filter_fn(kind):
add_kind(kind)

return Model(kinds, self.__property_kind_by_path)


@frozen
Expand Down
37 changes: 37 additions & 0 deletions resotocore/resotocore/static/api-doc.yaml
Expand Up @@ -207,6 +207,43 @@ paths:
schema:
type: boolean
default: false
- name: kind
description: "Only return information about the defined kinds. Comma separated list."
in: query
explode: false
schema:
type: string
default: null
- name: filter
description: "Only return information about kinds that include given string. Comma separated list."
in: query
explode: false
schema:
type: string
default: null
- name: with_bases
description: "Render all base classes. Only together with kind or filter"
in: query
schema:
type: boolean
default: false
- name: format
description: "The format of the returned json"
in: query
schema:
type: string
default: null
enum:
- schema
- simple
- name: with_property_kinds
description: "Render types of property values. Only together with kind or filter"
in: query
schema:
type: boolean
default: false


responses:
"200":
description: "The list of all kinds."
Expand Down

0 comments on commit 06c1d95

Please sign in to comment.