Skip to content
Merged
2 changes: 1 addition & 1 deletion apps/api/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -15517,7 +15517,7 @@
}
}
},
"description": "Assembly is Deprecated (cannot instantiate), OR a referenced Asset is Decommissioned (lifecycle disallows attachment), OR a concurrent write to the new Fixture stream conflicted (optimistic concurrency; essentially impossible with UUIDv7 ids)."
"description": "Assembly is Deprecated (cannot instantiate), OR a referenced Asset is Decommissioned (lifecycle disallows attachment), OR a referenced Asset is not currently installed in any Mount (install_asset first), OR a concurrent write to the new Fixture stream conflicted (optimistic concurrency; essentially impossible with UUIDv7 ids)."
},
"422": {
"description": "Request body failed schema validation OR Idempotency-Key was reused with a different request body."
Expand Down
2 changes: 2 additions & 0 deletions apps/api/src/cora/equipment/aggregates/assembly/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
FixtureAssetFamilyMismatchError,
FixtureAssetNotAttachableError,
FixtureAssetNotFoundError,
FixtureAssetNotInstalledError,
FixtureMappingIncompleteError,
FixtureParameterOverridesInvalidError,
InvalidAssemblyNameError,
Expand Down Expand Up @@ -84,6 +85,7 @@
"FixtureAssetFamilyMismatchError",
"FixtureAssetNotAttachableError",
"FixtureAssetNotFoundError",
"FixtureAssetNotInstalledError",
"FixtureMappingIncompleteError",
"FixtureParameterOverridesInvalidError",
"InvalidAssemblyNameError",
Expand Down
21 changes: 21 additions & 0 deletions apps/api/src/cora/equipment/aggregates/assembly/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,27 @@ def __init__(self, asset_id: UUID, current_lifecycle: str) -> None:
self.current_lifecycle = current_lifecycle


class FixtureAssetNotInstalledError(Exception):
"""A referenced Asset is not currently installed in any Mount.

Fires when `proj_equipment_asset_location` carries no row for the
Asset at register_fixture time, i.e., the Asset exists but has
not been physically racked. A Fixture should materialize only
equipment that is already on the floor, so the
install-then-register-fixture choreography is the contract.

Carries the sorted-first offending `asset_id` for deterministic
error responses.
"""

def __init__(self, asset_id: UUID) -> None:
super().__init__(
f"Asset {asset_id} cannot be bound into a Fixture: not currently "
f"installed in any Mount; install_asset first"
)
self.asset_id = asset_id


class FixtureMappingIncompleteError(Exception):
"""`register_fixture`'s slot_asset_bindings does not satisfy
the required cardinality of one or more slots.
Expand Down
15 changes: 14 additions & 1 deletion apps/api/src/cora/equipment/features/register_fixture/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@
genesis and cannot be amended). Empty dict (default) means no
lifecycle info was loaded; the decider skips the guard entirely
(useful for decider unit tests that exercise other invariants).

`mount_id_by_asset_id` maps each referenced asset_id to the Mount
currently holding it (sourced from `proj_equipment_asset_location`),
or `None` when the Asset is not currently installed. The whole field
is `None` when the handler ran without a pool (test path) and the
orphan guard is disabled entirely; this matches the
install_asset / decommission_asset projection-precondition
short-circuit convention. When non-None and an entry maps to
`None`, the decider raises `FixtureAssetNotInstalledError` carrying
the sorted-first orphan id: a Fixture should snapshot only
equipment already on the floor, so install-then-register is
the contract.
"""

from dataclasses import dataclass, field
Expand All @@ -34,7 +46,7 @@

@dataclass(frozen=True)
class RegisterFixtureContext:
"""Snapshot of Assembly + Asset existence + lifecycle checks."""
"""Snapshot of Assembly + Asset existence + lifecycle + install checks."""

assembly_state: Assembly | None
family_ids_by_asset_id: dict[UUID, frozenset[UUID] | None] = field(
Expand All @@ -43,3 +55,4 @@ class RegisterFixtureContext:
lifecycle_by_asset_id: dict[UUID, AssetLifecycle | None] = field(
default_factory=dict[UUID, AssetLifecycle | None]
)
mount_id_by_asset_id: dict[UUID, UUID | None] | None = None
29 changes: 29 additions & 0 deletions apps/api/src/cora/equipment/features/register_fixture/decider.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
-> FixtureAssetNotAttachableError carrying the sorted-first
offending asset_id (mirrors AssetCannotAttachToFixtureError
at attach-time).
- Every referenced Asset must currently be installed in some Mount
(when the handler loaded asset_location info)
-> FixtureAssetNotInstalledError carrying the sorted-first
orphan id. A Fixture should materialize only equipment that is
already on the floor; install_asset is a hard precondition.
- Each TemplateSlot's cardinality is satisfied by the count of
bindings carrying its slot_name
-> FixtureMappingIncompleteError carrying the offending
Expand Down Expand Up @@ -55,6 +60,7 @@
FixtureAssetFamilyMismatchError,
FixtureAssetNotAttachableError,
FixtureAssetNotFoundError,
FixtureAssetNotInstalledError,
FixtureMappingIncompleteError,
FixtureParameterOverridesInvalidError,
SlotCardinality,
Expand Down Expand Up @@ -130,6 +136,11 @@ def decide(
-> FixtureAssetNotAttachableError carrying the sorted-first
offending asset_id (mirrors AssetCannotAttachToFixtureError
at attach-time).
- Every referenced Asset must currently be installed in some Mount
(when the handler loaded asset_location info)
-> FixtureAssetNotInstalledError carrying the sorted-first
orphan id. A Fixture should materialize only equipment that is
already on the floor; install_asset is a hard precondition.
- Each TemplateSlot's cardinality must be satisfied by the count
of bindings carrying its slot_name
-> FixtureMappingIncompleteError carrying the offending
Expand Down Expand Up @@ -189,6 +200,24 @@ def decide(
AssetLifecycle.DECOMMISSIONED.value,
)

# Cross-aggregate guard: every referenced Asset must currently be
# installed in some Mount. A Fixture should snapshot only equipment
# already racked on the floor, so the install-then-register-fixture
# choreography is the contract.
# `mount_id_by_asset_id is None` means the handler ran without a
# pool (test path) and the guard is disabled entirely.
if context.mount_id_by_asset_id is not None:
orphan_asset_ids = sorted(
(
asset_id
for asset_id, mount_id in context.mount_id_by_asset_id.items()
if mount_id is None
),
key=str,
)
if orphan_asset_ids:
raise FixtureAssetNotInstalledError(orphan_asset_ids[0])

slots_by_name = {slot.slot_name.value: slot for slot in assembly.required_slots}
binding_counts: Counter[str] = Counter(
binding.slot_name for binding in command.slot_asset_bindings
Expand Down
42 changes: 34 additions & 8 deletions apps/api/src/cora/equipment/features/register_fixture/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from cora.equipment.features.register_fixture.command import RegisterFixture
from cora.equipment.features.register_fixture.context import RegisterFixtureContext
from cora.equipment.features.register_fixture.decider import decide
from cora.equipment.projections.asset_location import load_asset_location
from cora.infrastructure.event_envelope import to_new_event
from cora.infrastructure.kernel import Kernel
from cora.infrastructure.logging import get_logger
Expand Down Expand Up @@ -130,15 +131,36 @@ async def handler(
now = deps.clock.now()

asset_ids = _referenced_asset_ids(command)
# Split gather across the two aggregate types so pyright keeps
# `assembly_state` narrowed to `Assembly | None` and each item
# in `assets` narrowed to `Asset | None`; a single gather across
# both would widen everything to the union.
assembly_state_task = asyncio.create_task(
load_assembly(deps.event_store, command.assembly_id)
# All three I/O streams run concurrently inside a TaskGroup so
# that a failure in any one of them cancels the siblings before
# the handler returns. Without this discipline, a load_asset
# raise would leak the assembly_state and mount_ids work as
# "task exception never retrieved" warnings and tie up pool
# connections after the request had already errored out.
# Per-asset tasks keep each result narrowed (Asset | None,
# Assembly | None) for pyright; the asset_location stream is
# only scheduled when deps.pool is set (pool=None short-circuit
# preserves the permissive default for the pool-less test path;
# matches install_asset / decommission_asset shape). In
# production deps.pool is always set, so every referenced
# asset_id gets an entry in mount_id_by_asset_id (mount_id when
# installed, None when orphan).
pool = deps.pool
async with asyncio.TaskGroup() as tg:
assembly_task = tg.create_task(load_assembly(deps.event_store, command.assembly_id))
asset_tasks = [tg.create_task(load_asset(deps.event_store, aid)) for aid in asset_ids]
mount_id_tasks: list[asyncio.Task[UUID | None]] | None = (
[tg.create_task(load_asset_location(pool, aid)) for aid in asset_ids]
if pool is not None
else None
)

assembly_state = assembly_task.result()
assets = [t.result() for t in asset_tasks]
mount_ids: list[UUID | None] | None = (
[t.result() for t in mount_id_tasks] if mount_id_tasks is not None else None
)
assets = await asyncio.gather(*(load_asset(deps.event_store, aid) for aid in asset_ids))
assembly_state = await assembly_state_task

family_ids_by_asset_id: dict[UUID, frozenset[UUID] | None] = {
aid: (asset.family_ids if asset is not None else None)
for aid, asset in zip(asset_ids, assets, strict=True)
Expand All @@ -147,10 +169,14 @@ async def handler(
aid: (asset.lifecycle if asset is not None else None)
for aid, asset in zip(asset_ids, assets, strict=True)
}
mount_id_by_asset_id: dict[UUID, UUID | None] | None = (
dict(zip(asset_ids, mount_ids, strict=True)) if mount_ids is not None else None
)
context = RegisterFixtureContext(
assembly_state=assembly_state,
family_ids_by_asset_id=family_ids_by_asset_id,
lifecycle_by_asset_id=lifecycle_by_asset_id,
mount_id_by_asset_id=mount_id_by_asset_id,
)

# Decider raises FixtureAlreadyExistsError defensively when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,11 @@ def _get_handler(request: Request) -> IdempotentHandler:
"description": (
"Assembly is Deprecated (cannot instantiate), OR a "
"referenced Asset is Decommissioned (lifecycle disallows "
"attachment), OR a concurrent write to the new Fixture "
"stream conflicted (optimistic concurrency; essentially "
"impossible with UUIDv7 ids)."
"attachment), OR a referenced Asset is not currently "
"installed in any Mount (install_asset first), OR a "
"concurrent write to the new Fixture stream conflicted "
"(optimistic concurrency; essentially impossible with "
"UUIDv7 ids)."
),
},
status.HTTP_422_UNPROCESSABLE_CONTENT: {
Expand Down
2 changes: 2 additions & 0 deletions apps/api/src/cora/equipment/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
FixtureAssetFamilyMismatchError,
FixtureAssetNotAttachableError,
FixtureAssetNotFoundError,
FixtureAssetNotInstalledError,
FixtureMappingIncompleteError,
FixtureParameterOverridesInvalidError,
InvalidAssemblyNameError,
Expand Down Expand Up @@ -562,6 +563,7 @@ def register_equipment_routes(app: FastAPI) -> None:
AssetAttachedToDifferentFixtureError,
AssetCannotUpdatePartitionRuleError,
FixtureAssetNotAttachableError,
FixtureAssetNotInstalledError,
):
app.add_exception_handler(cannot_transition_cls, _handle_cannot_transition)
for pidinst_state_cls in (
Expand Down
Loading
Loading