From eec71e0043a7063e963d470149f61e5719699971 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micka=C3=ABl=20Misbach?= Date: Wed, 18 Jun 2025 14:22:16 +0200 Subject: [PATCH] [uss_qualifier/netrid/common_dictionary_evaluator] Add to generic evaluator optional ability to skip evaluation; factor away 'dotted_get' --- .../netrid/common_dictionary_evaluator.py | 84 +++++++++++-------- 1 file changed, 50 insertions(+), 34 deletions(-) diff --git a/monitoring/uss_qualifier/scenarios/astm/netrid/common_dictionary_evaluator.py b/monitoring/uss_qualifier/scenarios/astm/netrid/common_dictionary_evaluator.py index 75258e144e..af235406bb 100644 --- a/monitoring/uss_qualifier/scenarios/astm/netrid/common_dictionary_evaluator.py +++ b/monitoring/uss_qualifier/scenarios/astm/netrid/common_dictionary_evaluator.py @@ -37,7 +37,6 @@ from monitoring.monitorlib.fetch.rid import Flight, FlightDetails, Position from monitoring.monitorlib.geo import Altitude, LatLngPoint, validate_lat, validate_lng from monitoring.monitorlib.rid import RIDVersion -from monitoring.monitorlib.schema_validation import F3411_19, F3411_22a from monitoring.uss_qualifier.configurations.configuration import ParticipantID from monitoring.uss_qualifier.resources.netrid.evaluation import EvaluationConfiguration from monitoring.uss_qualifier.scenarios.scenario import PendingCheck, TestScenarioType @@ -1351,15 +1350,17 @@ def _evaluate_ua_classification_eu_category( ValueError: if a test operation wasn't performed correctly by uss_qualifier. """ - if self._rid_version == RIDVersion.f3411_19: - self._test_scenario.record_note( - key="skip_reason", - message=f"Unsupported version {self._rid_version}: skipping UA classification 'category' field for 'European Union' type", - ) - return - if not injected.has_field_with_value("eu_classification"): - # skip if UA classification type is not 'European Union' type - return + def skip_eval( + _: Optional[UAClassificationEUCategory], + __: Optional[UAClassificationEUCategory], + ) -> Optional[str]: + if self._rid_version == RIDVersion.f3411_19: + return f"Unsupported version {self._rid_version}" + + if not injected.has_field_with_value("eu_classification"): + return "Injected UA classification type is not 'European Union' type" + + return None def cat_value_validator( val: UAClassificationEUCategory, @@ -1387,6 +1388,7 @@ def cat_value_comparator( UAClassificationEUCategory.EUCategoryUndefined, cat_value_comparator, injected, + skip_eval=skip_eval, **generic_kwargs, ) @@ -1401,15 +1403,17 @@ def _evaluate_ua_classification_eu_class( ValueError: if a test operation wasn't performed correctly by uss_qualifier. """ - if self._rid_version == RIDVersion.f3411_19: - self._test_scenario.record_note( - key="skip_reason", - message=f"Unsupported version {self._rid_version}: skipping UA classification 'class' field for 'European Union' type", - ) - return - if not injected.has_field_with_value("eu_classification"): - # skip if UA classification type is not 'European Union' type - return + def skip_eval( + _: Optional[UAClassificationEUCategory], + __: Optional[UAClassificationEUCategory], + ) -> Optional[str]: + if self._rid_version == RIDVersion.f3411_19: + return f"Unsupported version {self._rid_version}" + + if not injected.has_field_with_value("eu_classification"): + return "Injected UA classification type is not 'European Union' type" + + return None def class_value_validator( val: UAClassificationEUClass, @@ -1436,6 +1440,7 @@ def class_value_comparator( UAClassificationEUClass.EUClassUndefined, class_value_comparator, injected, + skip_eval=skip_eval, **generic_kwargs, ) @@ -1462,6 +1467,7 @@ def _generic_evaluator( participant: ParticipantID, query_timestamp: datetime.datetime, sp_is_allowed_to_generate_missing: bool = False, + skip_eval: Optional[Callable[[Optional[T], Optional[T]], str]] = None, ): """ Generic evaluator of a field. Exactly one of sp_observed or dp_observed must be provided. @@ -1486,26 +1492,15 @@ def _generic_evaluator( participant: participant providing the API through which the value was observed. query_timestamp: timestamp of the observation query. sp_is_allowed_to_generate_missing: Boolean to indicate that the SP may generate any value, should no value have been injected. + skip_eval: Function that, if defined, and if it returns a reason for skipping the evaluation, will skip it. Raises: ValueError: if a test operation wasn't performed correctly by uss_qualifier. """ - def dotted_get(obj: Any, key: str) -> Optional[T]: - val: Any = obj - for k in key.split("."): - if val is None: - return val - if isinstance(val, dict) and k in val: - val = val[k] - else: - val = getattr(val, k, None) - return val - - injected_val: Optional[T] = dotted_get(injected, injected_field_name) + injected_val: Optional[T] = _dotted_get(injected, injected_field_name) if injected_val is not None: - if value_validator is not None: try: injected_val = value_validator(injected_val) @@ -1516,12 +1511,21 @@ def dotted_get(obj: Any, key: str) -> Optional[T]: observed_val: Optional[T] if sp_observed is not None: - observed_val = dotted_get(sp_observed, sp_field_name) + observed_val = _dotted_get(sp_observed, sp_field_name) elif dp_observed is not None: - observed_val = dotted_get(dp_observed, dp_field_name) + observed_val = _dotted_get(dp_observed, dp_field_name) else: raise ValueError("No observed flight provided.") + if skip_eval: + skip_reason = skip_eval(injected_val, observed_val) + if skip_reason: + self._test_scenario.record_note( + key="skip_reason", + message=f"Skipping {field_human_name} evaluation: {skip_reason}", + ) + return + with self._test_scenario.check( f"{field_human_name} is exposed correctly", participant, @@ -1600,3 +1604,15 @@ def dotted_get(obj: Any, key: str) -> Optional[T]: ) }, ) + + +def _dotted_get(obj: Any, key: str) -> Optional[T]: + val: Any = obj + for k in key.split("."): + if val is None: + return val + if isinstance(val, dict) and k in val: + val = val[k] + else: + val = getattr(val, k, None) + return val