Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
from monitoring.monitorlib.fetch.rid import Flight, FlightDetails, Position
from monitoring.monitorlib.geo import Altitude, LatLngPoint, validate_lat, validate_lng
from monitoring.monitorlib.rid import RIDVersion
from monitoring.monitorlib.schema_validation import F3411_19, F3411_22a
from monitoring.uss_qualifier.configurations.configuration import ParticipantID
from monitoring.uss_qualifier.resources.netrid.evaluation import EvaluationConfiguration
from monitoring.uss_qualifier.scenarios.scenario import PendingCheck, TestScenarioType
Expand Down Expand Up @@ -1351,15 +1350,17 @@ def _evaluate_ua_classification_eu_category(
ValueError: if a test operation wasn't performed correctly by uss_qualifier.
"""

if self._rid_version == RIDVersion.f3411_19:
self._test_scenario.record_note(
key="skip_reason",
message=f"Unsupported version {self._rid_version}: skipping UA classification 'category' field for 'European Union' type",
)
return
if not injected.has_field_with_value("eu_classification"):
# skip if UA classification type is not 'European Union' type
return
def skip_eval(
_: Optional[UAClassificationEUCategory],
__: Optional[UAClassificationEUCategory],
) -> Optional[str]:
if self._rid_version == RIDVersion.f3411_19:
return f"Unsupported version {self._rid_version}"

if not injected.has_field_with_value("eu_classification"):
return "Injected UA classification type is not 'European Union' type"

return None

def cat_value_validator(
val: UAClassificationEUCategory,
Expand Down Expand Up @@ -1387,6 +1388,7 @@ def cat_value_comparator(
UAClassificationEUCategory.EUCategoryUndefined,
cat_value_comparator,
injected,
skip_eval=skip_eval,
**generic_kwargs,
)

Expand All @@ -1401,15 +1403,17 @@ def _evaluate_ua_classification_eu_class(
ValueError: if a test operation wasn't performed correctly by uss_qualifier.
"""

if self._rid_version == RIDVersion.f3411_19:
self._test_scenario.record_note(
key="skip_reason",
message=f"Unsupported version {self._rid_version}: skipping UA classification 'class' field for 'European Union' type",
)
return
if not injected.has_field_with_value("eu_classification"):
# skip if UA classification type is not 'European Union' type
return
def skip_eval(
_: Optional[UAClassificationEUCategory],
__: Optional[UAClassificationEUCategory],
) -> Optional[str]:
if self._rid_version == RIDVersion.f3411_19:
return f"Unsupported version {self._rid_version}"

if not injected.has_field_with_value("eu_classification"):
return "Injected UA classification type is not 'European Union' type"

return None

def class_value_validator(
val: UAClassificationEUClass,
Expand All @@ -1436,6 +1440,7 @@ def class_value_comparator(
UAClassificationEUClass.EUClassUndefined,
class_value_comparator,
injected,
skip_eval=skip_eval,
**generic_kwargs,
)

Expand All @@ -1462,6 +1467,7 @@ def _generic_evaluator(
participant: ParticipantID,
query_timestamp: datetime.datetime,
sp_is_allowed_to_generate_missing: bool = False,
skip_eval: Optional[Callable[[Optional[T], Optional[T]], str]] = None,
):
"""
Generic evaluator of a field. Exactly one of sp_observed or dp_observed must be provided.
Expand All @@ -1486,26 +1492,15 @@ def _generic_evaluator(
participant: participant providing the API through which the value was observed.
query_timestamp: timestamp of the observation query.
sp_is_allowed_to_generate_missing: Boolean to indicate that the SP may generate any value, should no value have been injected.
skip_eval: Function that, if defined, and if it returns a reason for skipping the evaluation, will skip it.


Raises:
ValueError: if a test operation wasn't performed correctly by uss_qualifier.
"""

def dotted_get(obj: Any, key: str) -> Optional[T]:
val: Any = obj
for k in key.split("."):
if val is None:
return val
if isinstance(val, dict) and k in val:
val = val[k]
else:
val = getattr(val, k, None)
return val

injected_val: Optional[T] = dotted_get(injected, injected_field_name)
injected_val: Optional[T] = _dotted_get(injected, injected_field_name)
if injected_val is not None:

if value_validator is not None:
try:
injected_val = value_validator(injected_val)
Expand All @@ -1516,12 +1511,21 @@ def dotted_get(obj: Any, key: str) -> Optional[T]:

observed_val: Optional[T]
if sp_observed is not None:
observed_val = dotted_get(sp_observed, sp_field_name)
observed_val = _dotted_get(sp_observed, sp_field_name)
elif dp_observed is not None:
observed_val = dotted_get(dp_observed, dp_field_name)
observed_val = _dotted_get(dp_observed, dp_field_name)
else:
raise ValueError("No observed flight provided.")

if skip_eval:
skip_reason = skip_eval(injected_val, observed_val)
if skip_reason:
self._test_scenario.record_note(
key="skip_reason",
message=f"Skipping {field_human_name} evaluation: {skip_reason}",
)
return

with self._test_scenario.check(
f"{field_human_name} is exposed correctly",
participant,
Expand Down Expand Up @@ -1600,3 +1604,15 @@ def dotted_get(obj: Any, key: str) -> Optional[T]:
)
},
)


def _dotted_get(obj: Any, key: str) -> Optional[T]:
val: Any = obj
for k in key.split("."):
if val is None:
return val
if isinstance(val, dict) and k in val:
val = val[k]
else:
val = getattr(val, k, None)
return val
Loading