In [1]:
from __future__ import annotations
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from typing import *

In [2]:
AP_TRUE = 'true'

OP_NOT = 'NOT'
OP_AND = 'AND'
OP_OR = 'OR'
OP_UNTIL = 'UNTIL'
OP_ALWAYS = 'ALWAYS'

type Atomic = str
type Operator = str
type Formula = (Atomic                                  # Proposition
                | tuple[Operator, Formula]              # Unary operation
                | tuple[Operator, Formula, Formula])    # Binary operation

RULES = {}

def __repr__(self) -> str:
        op = type(self).__name__
        next(it := iter(self)) # skip first
        body = ',\n'.join(map(repr, it))
        sep = '\n' + ' ' * (len(op)+1)
        body = sep.join(body.splitlines())
        return f'{op}({body})'

def iter_rule(rule):
    if isinstance(rule, Atomic):
        yield rule
    else:
        _, *args = rule
        yield rule
        for arg in args:
            yield from iter_rule(arg)

def repr_rule(rule):
    if isinstance(rule, Atomic):
        return f'<{rule}>'
    else:
        op = rule[0]
        body = ',\n'.join(map(repr_rule, iter_rule(rule)))
        sep = '\n' + ' ' * (len(op)+1)
        body = sep.join(body.splitlines())
        return f'{op}({body})'


def declare_rule(name, equiv):
    assert name not in RULES, 'Rule already exists'
    RULES[name] = equiv
    return lambda *args: (name, *args)

def expand_rule(rule):
    if isinstance(rule, Atomic):
        return rule
    else:
        op, *args = rule
        return RULES[op](*args) if op in RULES else rule

not_ = lambda arg: (OP_NOT, arg)
and_ = lambda lhs, rhs, *rest: and_((OP_AND, lhs, rhs), *rest) if rest else (OP_AND, lhs, rhs)
or_ = lambda lhs, rhs, *rest: or_((OP_OR, lhs, rhs), *rest) if rest else (OP_OR, lhs, rhs)
until_ = lambda lhs, rhs: (OP_UNTIL, lhs, rhs)
always_ = lambda arg: (OP_ALWAYS, arg)

implies_ = declare_rule('IMPLIES', lambda lhs, rhs: or_(not_(lhs), rhs))
eventually_ = declare_rule('EVENTUALLY', lambda arg: until_(AP_TRUE, arg))

### Implementation

In [3]:
class ImplComplement[R](Protocol):
    def complement(self, s: R) -> R: ...

class ImplIntersect[R](Protocol):
    def intersect(self, s1: R, s2: R) -> R: ...

class ImplUnion[R](Protocol):
    def union(self, s1: R, s2: R) -> R: ...

class ImplMinus[R](Protocol):
    def minus(self, s1: R, s2: R) -> R: ...

class ImplRCI[R](Protocol):
    def rci(self, s: R) -> R: ...

class ImplReachForw(Protocol):
    def reach_forw(self, target: R, constraints: R) -> R: ...

class ImplReachBack(Protocol):
    def reach_back(self, target: R, constraints: R) -> R: ...

# =================================================

class Impl[R](ABC):
    """Implementation Base Class"""

class SetOpImpl[R](Impl[R]):

    @abstractmethod
    def is_empty(self) -> bool: ...

    @abstractmethod
    def membership[P](self, p: P) -> bool: ...

    @abstractmethod
    def complement(self, s: R) -> R: ...

    @abstractmethod
    def instersect(self, s1: R, s2: R) -> R: ...

    def union(self, s1: R, s2: R) -> R:
        return self.complement(self.intersect(self.complement(s1), self.complement(s2)))
    
    def minus(self, s1: R, s2: R) -> R:
        return self.intersect(s1, self.complement(s2))

class TemporalOpImpl[R](Impl[R]):

    @abstractmethod
    def rci(self, target: R): ...

    @abstractmethod
    def reach_forw(self, target: R, constraints: R): ...

    @abstractmethod
    def reach_back(self, target: R, constraints: R): ...

class AxesImpl[R](Impl[R]):

    @property
    @abstractmethod
    def ndim(self) -> int: ...

    @abstractmethod
    def axis(self, name: str) -> int: ...

    @abstractmethod
    def axis_name(self, i: int) -> str: ...

    @abstractmethod
    def axis_is_periodic(self, i: int) -> bool: ...

class ShapesImpl[R](SetOpImpl[R], AxesImpl[R]):

    @abstractmethod
    def empty(self) -> R: ...

    @abstractmethod
    def hyperplane(self, normal: list[float], offset: Optional[list[float]] = None, axes: Optional[int|list[int]] = None) -> R: ...

    type TwoNum = tuple[float, float]
    type ThreeNum = tuple[float, float, int]

    def hyperrect(self, bounds: list[TwoNum|ThreeNum]) -> R:              
        s = self.empty()
        for n, row in enumerate(bounds):
            vmin, vmax, i = (row if len(row) == 3 else [*row, n])
            if vmax < vmin and self.axis_is_periodic(i):
                upper_bound = self.hyperplane(normal=[0 if i != j else +1 for j in range(self.ndim)],
                                              offset=[0 if i != j else vmin for j in range(self.ndim)])
                lower_bound = self.hyperplane(normal=[0 if i != j else -1 for j in range(self.ndim)],
                                              offset=[0 if i != j else vmax for j in range(self.ndim)])
                axis_range = self.complement(self.instersect(upper_bound, lower_bound))
            else:
                upper_bound = self.hyperplane(normal=[0 if i != j else +1 for j in range(self.ndim)],
                                              offset=[0 if i != j else vmax for j in range(self.ndim)])
                lower_bound = self.hyperplane(normal=[0 if i != j else -1 for j in range(self.ndim)],
                                              offset=[0 if i != j else vmin for j in range(self.ndim)])
                axis_range = self.instersect(upper_bound, lower_bound)
            s = self.intersect(s, axis_range)
        
        return s

### Sets

In [4]:
@dataclass(slots=True, frozen=True)
class LazySet[I: Impl, R]:

    _sb: Callable[[I], R] # Set Builder

    def __call__(self, impl: I[R]) -> R:
        return self._sb(impl)

    @classmethod
    def complement(cls, s: LazySet[ImplComplement[R], R]) -> LazySet[I, R]:
        return cls(lambda impl: impl.complement(s(impl)))

    @classmethod
    def intersect(cls, s1: Self, s2: Self) -> Self:
        return cls(lambda impl: impl.instersect(s1(impl), s2(impl)))

    @classmethod
    def union(cls, s1: Self, s2: Self) -> Self:
        return cls(lambda impl: impl.union(s1(impl), s2(impl)))
    
    @classmethod
    def minus(cls, s1: Self, s2: Self) -> Self:
        return Set(lambda impl: impl.minus(s1(impl), s2(impl)))

    

class Set[R]:

    type SetBuilder = Callable[[SetOpImpl[R]], R]

    _sb: SetBuilder

    def __init__(self, sb: SetBuilder):
        self._sb = sb

    def __call__(self, impl: SetOpImpl[R]) -> R:
        return self._sb(impl)

    @classmethod
    def complement(cls, s: Self) -> Self:
        return cls(lambda impl: impl.complement(s(impl)))

    @classmethod
    def intersect(cls, s1: Self, s2: Self) -> Self:
        return cls(lambda impl: impl.instersect(s1(impl), s2(impl)))

    @classmethod
    def union(cls, s1: Self, s2: Self) -> Self:
        return cls(lambda impl: impl.union(s1(impl), s2(impl)))
    
    @classmethod
    def minus(cls, s1: Self, s2: Self) -> Self:
        return Set(lambda impl: impl.minus(s1(impl), s2(impl)))

class StoredSet[R](Set[R]):

    def __init__(self, s: R):
        self._sb = lambda impl: s

class ShapesSet[R](Set[R]):

    type SetBuilder = Callable[[ShapesImpl[R]], R]

    _sb: SetBuilder

    def __init__(self, sb: SetBuilder):
        self._sb = sb

class BoundedSet[R](Set[R]):

    _bounds: dict[str, tuple[float, float]] # <axis-name>: (<vmin>, <vmax>)

    def __init__(self, **bounds):
        self._bounds = bounds

    def _sb(self, impl: ShapesImpl[R]) -> R:
        return impl.hyperrect([(vmin, vmax, impl.axis(name))
                               for name, (vmin, vmax) in self._bounds.items()])

class TemporalSet[R](Set[R]):

    type SetBuilder = Callable[[TemporalOpImpl[R]], R]

    _sb: SetBuilder

    def __init__(self, sb: SetBuilder):
        self._sb = sb


### TLT

In [5]:
from enum import IntEnum

class APPROX(IntEnum):
    UNDER = -1
    EXACT = 0
    OVER = +1

type ApproxType = Optional[APPROX]

type LabelMap[R] = Callable[[Atomic], TLT[R]]

@dataclass(slots=True, frozen=True)
class TLT[R]:
    set: Set[R]
    spec: Formula = '_0'
    approx: ApproxType = APPROX.EXACT

    def eval(self, impl: Impl[R]) -> R:
        return self.set(impl)

    @classmethod
    def construct(cls, spec: Formula, labels: LabelMap[R], **kw_labels: TLT[R]) -> TLT[R]:
        labels |= kw_labels

        if isinstance(spec, Atomic):
            return labels[spec]
        
        op, *args = expand_rule(spec)
        args_tlt = [TLT.construct(arg, labels) for arg in args]

        _set = TLT._construct_set(op, *map(lambda tlt: tlt.set, args_tlt))
        _apprx = TLT._construct_approx(op, *map(lambda tlt: tlt.approx, args_tlt))
        return cls(_set, spec, _apprx)

    def _construct_set(self, op: Operator, *args: Set[R]) -> Set[R]:
        if op == OP_NOT:
            (arg,) = args
            return -arg.set
        
        elif op == OP_AND:
            (lhs, rhs) = args
            return lhs * rhs
        
        elif op == OP_OR:
            (lhs, rhs) = args
            return lhs + rhs
        
        elif op == OP_UNTIL:
            (lhs, rhs) = args
            return TemporalSet(lambda impl: impl.reach_forw(rhs, lhs))
        
        elif op == OP_ALWAYS:
            (arg,) = args
            return TemporalSet(lambda impl: impl.rci(arg))
        
        else:
            raise ValueError(f'Invalid operator: {op}')
        
    def _construct_approx(self, op: Operator, *args: ApproxType) -> ApproxType:
        if op == OP_NOT:
            (a,) = args
            return None if a is None else APPROX(-1 * a)

        elif op == OP_AND:
            (a1, a2) = args
            if None in (a1, a2):
                return None
            elif APPROX.UNDER in (a1, a2):
                return None
            else:
                return APPROX.OVER

        elif op == OP_OR:
            (a1, a2) = args
            if None in (a1, a2):
                return None
            elif a1 == a2:
                return a1
            elif a1 == APPROX.EXACT:
                return a2
            elif a2 == APPROX.EXACT:
                return a1
            else:
                return None
        
        elif op == OP_UNTIL:
            (a1, a2) = args
            return APPROX.EXACT # TODO

        elif op == OP_ALWAYS:
            (a,) = args
            return APPROX.EXACT # TODO

        else:
            raise ValueError(f'Invalid operator: {op}')
    
type TLTLike[R] = Set[R] | TLT[R]

def as_tlt[R](arg: TLTLike[R]):
    if isinstance(arg, Set[R]):
        return TLT(arg)
    else:
        return arg

## User

In [6]:
print('Creating speed limits')
limit_30 = BoundedSet(v=(0.3, 0.6))
limit_50 = BoundedSet(v=(0.4, 1.0))

print('Creating road geometries')
intersection_geometry = TLT.Intersect(
    BoundedSet(x=(4.20/8, 6.87/8), y=(1.20/6, 3.04/6)),
    ShapesSet(lambda impl: impl.hyperplane(normal=[-1, 1],  offset=[4.60/8, 2.67/6])),
    ShapesSet(lambda impl: impl.hyperplane(normal=[1, 1], offset=[6.40/8, 2.67/6])),
)
kyrkogatan_left_geometry = (
    intersection_geometry
    + BoundedSet(x=(0.00/8, 5.33/8), y=(1.85/6, 2.37/6), h=(+np.pi - np.pi/5, -np.pi + np.pi/5)) 
    + BoundedSet(x=(6.00/8, 8.00/8), y=(1.70/6, 2.24/6), h=(+np.pi - np.pi/5, -np.pi + np.pi/5))
)
kyrkogatan_right_geometry = (
    intersection_geometry
    + BoundedSet(x=(0.00/8, 5.33/8), y=(1.20/6, 1.84/6), h=(-np.pi/5, +np.pi/5))
    + BoundedSet(x=(6.00/8, 8.00/8), y=(1.20/6, 1.73/6), h=(-np.pi/5, +np.pi/5))
)
nygatan_down_geometry = (
    intersection_geometry
    + BoundedSet(x=(4.94/8, 5.53/8), y=(2.26/6, 6.00/6), h=(-np.pi/2 - np.pi/5, -np.pi/2 + np.pi/5))
    + BoundedSet(x=(5.07/8, 5.47/8), y=(0.00/6, 1.60/6), h=(-np.pi/2 - np.pi/5, -np.pi/2 + np.pi/5))
)
nygatan_up_geometry = (
    intersection_geometry
    + BoundedSet(x=(5.60/8, 6.14/8), y=(2.27/6, 6.00/6), h=(+np.pi/2 - np.pi/5, +np.pi/2 + np.pi/5))
    + BoundedSet(x=(5.33/8, 5.74/8), y=(0.00/6, 1.60/6), h=(+np.pi/2 - np.pi/5, +np.pi/2 + np.pi/5))
) 

print('Creating streets')
kyrkogatan_vel  = And(Implies(ShapesSet(lambda impl: impl.hyperplane(normal=[-1], offset=[3.01/8])), limit_50),
                    Implies(ShapesSet(lambda impl: impl.hyperplane(normal=[+1], offset=[3.00/8])), limit_30))
nygatan_vel     = And(Implies(ShapesSet(lambda impl: impl.hyperplane(normal=[-1], offset=[4.01/6])), limit_30),
                    Implies(ShapesSet(lambda impl: impl.hyperplane(normal=[+1], offset=[4.00/6])), limit_50))
kyrkogatan_left     = And(kyrkogatan_left_geometry, kyrkogatan_vel)
kyrkogatan_right    = And(kyrkogatan_right_geometry, kyrkogatan_vel)
nygatan_down        = And(nygatan_down_geometry, nygatan_vel)
nygatan_up          = And(nygatan_up_geometry, nygatan_vel) 
kyrkogatan  = Or(kyrkogatan_left, kyrkogatan_right)
nygatan     = Or(nygatan_down, nygatan_up)

print('Creating entry/exit zones')
exit_zone     = BoundedSet(x=(5.67/8, 6.13/8), y=(5.47/6, 5.93/6))
entry_zone    = BoundedSet(x=(1.50/8, 1.95/8), y=(1.87/6, 2.33/6))
parking_start = BoundedSet(x=(2.30/8, 2.75/8), y=(1.87/6, 2.33/6))

print('Creating parking lot')
parking_spot_1 = BoundedSet(x=(2.13/8, 2.40/8), y=(5.54/6, 6.00/6), h=(+np.pi/2 - np.pi/5, +np.pi/2 + np.pi/5))
parking_spot_2 = BoundedSet(x=(3.15/8, 3.47/8), y=(4.33/6, 4.80/6), h=(-np.pi/2 - np.pi/5, -np.pi/2 + np.pi/5))
parking_spot_entry_1 = BoundedSet(x=(2.00/8, 2.53/8), y=(5.33/6, 5.73/6))
parking_spot_entry_2 = BoundedSet(x=(3.02/8, 3.61/8), y=(4.67/6, 5.07/6))

parking_lot_geometry = Set(+ BoundedSet(x=(1.20/8, 2.10/8), y=(2.73/6, 6.00/6)),  # left side
                        # + BoundedSet(x=(1.20/8, 4.40/8), y=(3.27/6, 3.74/6)),  # bottom
                        + BoundedSet(x=(3.75/8, 4.60/8), y=(3.33/6, 5.47/6)),  # right side
                        + BoundedSet(x=(1.20/8, 4.60/8), y=(4.87/6, 5.47/6)),  # top
                        + BoundedSet(x=(1.20/8, 2.10/8), y=(2.73/6, 3.50/6)),  # entry inner
                        + BoundedSet(x=(1.30/8, 1.95/8), y=(2.13/6, 3.00/6)))  # entry out
parking_spots = Or(parking_spot_1, parking_spot_2)
parking_spots_entry = Or(parking_spot_entry_1, parking_spot_entry_2)
parking_lot = ManyOr(And(parking_lot_geometry, limit_30), parking_spots_entry, parking_spots)

print('Environment created!')
    # return dict(entry_zone=entry_zone,          exit_zone=exit_zone,
    #             kyrkogatan=kyrkogatan,          kyrkogatan_left=kyrkogatan_left,    kyrkogatan_right=kyrkogatan_right,
    #             nygatan=nygatan,                nygatan_up=nygatan_up,              nygatan_down=nygatan_down,
    #             parking_start=parking_start,    parking_lot=parking_lot,            parking_spots=parking_spots)

In [7]:
import numpy as np
import numpy.typing as npt
import hj_reachability as hj

type Repr = npt.NDArray[Any]

@dataclass
class LevelSetImpl(SetOpImpl[Repr], TemporalOpImpl[Repr], ShapesImpl[Repr]):

    grid: hj.Grid
    timeline: np.ndarray
    dynamics: hj.Dynamics

    solver_settings = hj.SolverSettings.with_accuracy("low")
    
    def empty(self):
        return np.ones(self.grid.shape)

    def complement(self, vf):
        return -vf

    def intersect(self, vf_1, vf_2):
        return np.maximum(vf_1, vf_2)

    def union(self, vf_1, vf_2):
        return np.minimum(vf_1, vf_2)
    
    def reach_forw(self, target, constraints=None):
        vf = hj.solve(self.solver_settings,
                      self.dynamics,
                      self.grid,
                      self.timeline,
                      target,
                      None if constraints is None else constraints)
        return np.flip(np.asarray(vf), axis=0)
        
    def reach_back(self, target, constraints=None):
        vf = hj.solve(self.solver_settings,
                      self.dynamics,
                      self.grid,
                      -self.timeline,
                      target,
                      None if constraints is None else constraints)
        return np.flip(np.asarray(vf), axis=0)

    def rci(self, target):
        target = self._make_tube(target)
        aux_target = np.ones_like(target)
        aux_target[-1, ...] = target[-1, ...]
        aux_constraint = target
        vf = hj.solver(self.solver_settings,
                       self.dynamics,
                       self.grid,
                       self.timeline,
                       aux_target,
                       aux_constraint)
        return np.asarray(np.flip(vf, axis=0))

    def hyperplane(self, normal, offset, axes=None):
        vf = self.empty()
        axes = axes or list(range(self.ndim))
        xs = lambda i: self.states[..., i]
        for i, k, m in zip(axes, normal, offset, strict=True):
            vf += k*( xs(i) - m )
        return vf
    
    def _is_invariant(self, vf):
        return len(vf.shape) != len(self.timeline.shape + self.grid.shape)

    def _make_tube(self, vf):
        if self._is_invariant():    
            vf = np.concatenate([vf[np.newaxis, ...]] * len(self.timeline))
        return vf


TypeError: Cannot create a consistent method resolution
order (MRO) for bases SetOpImpl, TemporalOpImpl, ShapesImpl

In [None]:
I = LevelSetImpl([2, 2], [[-1, -1], [+1, +1]])
I.set_periodic(0)

phi = Until(Or('kyrkogatan', 'nygatan'), 'exit_zone')
node = I.solve(phi)