diff --git a/numalogic/udfs/postprocess.py b/numalogic/udfs/postprocess.py index 36af8195..1d44b9fd 100644 --- a/numalogic/udfs/postprocess.py +++ b/numalogic/udfs/postprocess.py @@ -207,6 +207,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages: y_features=y_features, y_unified=y_unified, a_features=a_features.tolist(), + artifact_versions=payload.artifact_versions, execution_time_secs=round(time.perf_counter() - _start_time, 4), ) diff --git a/numalogic/udfs/preprocess.py b/numalogic/udfs/preprocess.py index ee1cec8f..d91de0ba 100644 --- a/numalogic/udfs/preprocess.py +++ b/numalogic/udfs/preprocess.py @@ -179,7 +179,6 @@ def exec(self, keys: list[str], datum: Datum) -> Messages: _increment_counter(SOURCE_COUNTER, labels=("config", *_metric_label_values)) preproc_clf = self._load_model_from_config(_conf.numalogic_conf.preprocess) payload = replace(payload, status=Status.ARTIFACT_FOUND) - logger = logger.bind(model_from_config=preproc_clf) try: x_scaled = self.compute(model=preproc_clf, input_=payload.get_data()) diff --git a/numalogic/udfs/tools.py b/numalogic/udfs/tools.py index b5541db4..83075f59 100644 --- a/numalogic/udfs/tools.py +++ b/numalogic/udfs/tools.py @@ -295,7 +295,7 @@ def ack_insufficient_data(self, key: KEYS, uuid: str, train_records: int) -> boo key=key, ) return False - _struct_log.debug("Acknowledging insufficient data for the key", uuid, key) + _struct_log.debug("Acknowledging insufficient data for the key", uuid=uuid, key=key) return True def ack_read( diff --git a/numalogic/udfs/trainer/_base.py b/numalogic/udfs/trainer/_base.py index 8ab91cd3..c71b3759 100644 --- a/numalogic/udfs/trainer/_base.py +++ b/numalogic/udfs/trainer/_base.py @@ -79,6 +79,7 @@ def compute( trainer_transform: Optional[artifact_t] = None, threshold_clf: Optional[artifact_t] = None, numalogic_cfg: Optional[NumalogicConf] = None, + logger=None, ) -> dict[str, KeyedArtifact]: """ Train the model on the given input data. @@ -128,7 +129,8 @@ def compute( if threshold_clf: threshold_clf.fit(train_reconerr) - _struct_log.debug("Fit data using threshold model") + if logger: + logger.debug("Fit data using threshold model") dict_artifacts["threshold_clf"] = KeyedArtifact( dkeys=[numalogic_cfg.threshold.name], artifact=threshold_clf, @@ -255,6 +257,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages: trainer_transform=trainer_transform, threshold_clf=thresh_clf, numalogic_cfg=_conf.numalogic_conf, + logger=logger, ) # Save artifacts diff --git a/tests/udfs/test_trainer.py b/tests/udfs/test_trainer.py index 2eb14f79..86e19d52 100644 --- a/tests/udfs/test_trainer.py +++ b/tests/udfs/test_trainer.py @@ -425,6 +425,15 @@ def test_TrainMsgDeduplicator_exception_1(self): train_dedup.ack_insufficient_data([*self.keys, "pipeline1"], "some-uuid", train_records=180) self.assertLogs("RedisError") + @patch("redis.Redis.hset", Mock(side_effect=mock_druid_fetch_data())) + def test_TrainMsgDeduplicator_insufficent_data(self): + with self.assertLogs(level="DEBUG") as log: + train_dedup = TrainMsgDeduplicator(REDIS_CLIENT) + train_dedup.ack_insufficient_data( + [*self.keys, "pipeline1"], "some-uuid", train_records=180 + ) + self.assertLogs("Acknowledging insufficient data for the key", log.output[-1]) + @patch("redis.Redis.hgetall", Mock(side_effect=RedisError)) def test_TrainMsgDeduplicator_exception_2(self): train_dedup = TrainMsgDeduplicator(REDIS_CLIENT)