From 1e4fa449a25fdf848272a4a27431c2d3d076f0e9 Mon Sep 17 00:00:00 2001 From: Gulshan Bhatia Date: Thu, 25 Apr 2024 18:16:10 -0700 Subject: [PATCH 1/5] remove Pipeline object from preprocess logs Signed-off-by: Gulshan Bhatia --- numalogic/udfs/preprocess.py | 1 - 1 file changed, 1 deletion(-) diff --git a/numalogic/udfs/preprocess.py b/numalogic/udfs/preprocess.py index ee1cec8f..d91de0ba 100644 --- a/numalogic/udfs/preprocess.py +++ b/numalogic/udfs/preprocess.py @@ -179,7 +179,6 @@ def exec(self, keys: list[str], datum: Datum) -> Messages: _increment_counter(SOURCE_COUNTER, labels=("config", *_metric_label_values)) preproc_clf = self._load_model_from_config(_conf.numalogic_conf.preprocess) payload = replace(payload, status=Status.ARTIFACT_FOUND) - logger = logger.bind(model_from_config=preproc_clf) try: x_scaled = self.compute(model=preproc_clf, input_=payload.get_data()) From e6f15b2587a852e05552aa2a56d2ef903cff5fea Mon Sep 17 00:00:00 2001 From: Gulshan Bhatia Date: Tue, 30 Apr 2024 15:26:00 -0700 Subject: [PATCH 2/5] add artifact version in logs for postprocess Signed-off-by: Gulshan Bhatia --- numalogic/udfs/postprocess.py | 1 + 1 file changed, 1 insertion(+) diff --git a/numalogic/udfs/postprocess.py b/numalogic/udfs/postprocess.py index 36af8195..1d44b9fd 100644 --- a/numalogic/udfs/postprocess.py +++ b/numalogic/udfs/postprocess.py @@ -207,6 +207,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages: y_features=y_features, y_unified=y_unified, a_features=a_features.tolist(), + artifact_versions=payload.artifact_versions, execution_time_secs=round(time.perf_counter() - _start_time, 4), ) From 8688bf9cf7eb97c0e341554f5a0c182fd1e85451 Mon Sep 17 00:00:00 2001 From: Gulshan Bhatia Date: Thu, 2 May 2024 13:18:48 -0700 Subject: [PATCH 3/5] use logger object in trainer UDF for compute method Signed-off-by: Gulshan Bhatia --- numalogic/udfs/trainer/_base.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/numalogic/udfs/trainer/_base.py b/numalogic/udfs/trainer/_base.py index 8ab91cd3..3f080268 100644 --- a/numalogic/udfs/trainer/_base.py +++ b/numalogic/udfs/trainer/_base.py @@ -79,6 +79,7 @@ def compute( trainer_transform: Optional[artifact_t] = None, threshold_clf: Optional[artifact_t] = None, numalogic_cfg: Optional[NumalogicConf] = None, + logger=None, ) -> dict[str, KeyedArtifact]: """ Train the model on the given input data. @@ -128,7 +129,7 @@ def compute( if threshold_clf: threshold_clf.fit(train_reconerr) - _struct_log.debug("Fit data using threshold model") + logger.debug("Fit data using threshold model") dict_artifacts["threshold_clf"] = KeyedArtifact( dkeys=[numalogic_cfg.threshold.name], artifact=threshold_clf, @@ -255,6 +256,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages: trainer_transform=trainer_transform, threshold_clf=thresh_clf, numalogic_cfg=_conf.numalogic_conf, + logger=logger, ) # Save artifacts From 1d5d035486632e279b4738649292858634dbe567 Mon Sep 17 00:00:00 2001 From: Gulshan Bhatia Date: Thu, 2 May 2024 13:35:58 -0700 Subject: [PATCH 4/5] check for NoneType logger object Signed-off-by: Gulshan Bhatia --- numalogic/udfs/trainer/_base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/numalogic/udfs/trainer/_base.py b/numalogic/udfs/trainer/_base.py index 3f080268..c71b3759 100644 --- a/numalogic/udfs/trainer/_base.py +++ b/numalogic/udfs/trainer/_base.py @@ -129,7 +129,8 @@ def compute( if threshold_clf: threshold_clf.fit(train_reconerr) - logger.debug("Fit data using threshold model") + if logger: + logger.debug("Fit data using threshold model") dict_artifacts["threshold_clf"] = KeyedArtifact( dkeys=[numalogic_cfg.threshold.name], artifact=threshold_clf, From 31d7897d87a273c6fb6908c51262da0b1ffe99e1 Mon Sep 17 00:00:00 2001 From: Gulshan Bhatia Date: Mon, 6 May 2024 10:16:36 -0700 Subject: [PATCH 5/5] log for ack_insuff_data and add unit test Signed-off-by: Gulshan Bhatia --- numalogic/udfs/tools.py | 2 +- tests/udfs/test_trainer.py | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/numalogic/udfs/tools.py b/numalogic/udfs/tools.py index b5541db4..83075f59 100644 --- a/numalogic/udfs/tools.py +++ b/numalogic/udfs/tools.py @@ -295,7 +295,7 @@ def ack_insufficient_data(self, key: KEYS, uuid: str, train_records: int) -> boo key=key, ) return False - _struct_log.debug("Acknowledging insufficient data for the key", uuid, key) + _struct_log.debug("Acknowledging insufficient data for the key", uuid=uuid, key=key) return True def ack_read( diff --git a/tests/udfs/test_trainer.py b/tests/udfs/test_trainer.py index 2eb14f79..86e19d52 100644 --- a/tests/udfs/test_trainer.py +++ b/tests/udfs/test_trainer.py @@ -425,6 +425,15 @@ def test_TrainMsgDeduplicator_exception_1(self): train_dedup.ack_insufficient_data([*self.keys, "pipeline1"], "some-uuid", train_records=180) self.assertLogs("RedisError") + @patch("redis.Redis.hset", Mock(side_effect=mock_druid_fetch_data())) + def test_TrainMsgDeduplicator_insufficent_data(self): + with self.assertLogs(level="DEBUG") as log: + train_dedup = TrainMsgDeduplicator(REDIS_CLIENT) + train_dedup.ack_insufficient_data( + [*self.keys, "pipeline1"], "some-uuid", train_records=180 + ) + self.assertLogs("Acknowledging insufficient data for the key", log.output[-1]) + @patch("redis.Redis.hgetall", Mock(side_effect=RedisError)) def test_TrainMsgDeduplicator_exception_2(self): train_dedup = TrainMsgDeduplicator(REDIS_CLIENT)