Skip to content

Commit

Permalink
Optimize UDF logs (#370)
Browse files Browse the repository at this point in the history
Observe UDF logs and optimize them as much as possible.

---------

Signed-off-by: Gulshan Bhatia <gulshan_bhatia@intuit.com>
  • Loading branch information
gulshan02 committed May 6, 2024
1 parent 6975394 commit 8bba9ca
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 3 deletions.
1 change: 1 addition & 0 deletions numalogic/udfs/postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
y_features=y_features,
y_unified=y_unified,
a_features=a_features.tolist(),
artifact_versions=payload.artifact_versions,
execution_time_secs=round(time.perf_counter() - _start_time, 4),
)

Expand Down
1 change: 0 additions & 1 deletion numalogic/udfs/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
_increment_counter(SOURCE_COUNTER, labels=("config", *_metric_label_values))
preproc_clf = self._load_model_from_config(_conf.numalogic_conf.preprocess)
payload = replace(payload, status=Status.ARTIFACT_FOUND)
logger = logger.bind(model_from_config=preproc_clf)
try:
x_scaled = self.compute(model=preproc_clf, input_=payload.get_data())

Expand Down
2 changes: 1 addition & 1 deletion numalogic/udfs/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ def ack_insufficient_data(self, key: KEYS, uuid: str, train_records: int) -> boo
key=key,
)
return False
_struct_log.debug("Acknowledging insufficient data for the key", uuid, key)
_struct_log.debug("Acknowledging insufficient data for the key", uuid=uuid, key=key)
return True

def ack_read(
Expand Down
5 changes: 4 additions & 1 deletion numalogic/udfs/trainer/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def compute(
trainer_transform: Optional[artifact_t] = None,
threshold_clf: Optional[artifact_t] = None,
numalogic_cfg: Optional[NumalogicConf] = None,
logger=None,
) -> dict[str, KeyedArtifact]:
"""
Train the model on the given input data.
Expand Down Expand Up @@ -128,7 +129,8 @@ def compute(

if threshold_clf:
threshold_clf.fit(train_reconerr)
_struct_log.debug("Fit data using threshold model")
if logger:
logger.debug("Fit data using threshold model")
dict_artifacts["threshold_clf"] = KeyedArtifact(
dkeys=[numalogic_cfg.threshold.name],
artifact=threshold_clf,
Expand Down Expand Up @@ -255,6 +257,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
trainer_transform=trainer_transform,
threshold_clf=thresh_clf,
numalogic_cfg=_conf.numalogic_conf,
logger=logger,
)
# Save artifacts

Expand Down
9 changes: 9 additions & 0 deletions tests/udfs/test_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,15 @@ def test_TrainMsgDeduplicator_exception_1(self):
train_dedup.ack_insufficient_data([*self.keys, "pipeline1"], "some-uuid", train_records=180)
self.assertLogs("RedisError")

@patch("redis.Redis.hset", Mock(side_effect=mock_druid_fetch_data()))
def test_TrainMsgDeduplicator_insufficent_data(self):
    """Check that acknowledging insufficient data emits the expected debug log.

    ``redis.Redis.hset`` is replaced with a mock for the duration of the test.
    Log records at DEBUG level and above are captured and the last captured
    line is checked for the acknowledgement message.
    """
    with self.assertLogs(level="DEBUG") as log:
        train_dedup = TrainMsgDeduplicator(REDIS_CLIENT)
        train_dedup.ack_insufficient_data(
            [*self.keys, "pipeline1"], "some-uuid", train_records=180
        )
    # ``assertLogs(...)`` called as a plain statement only builds a context
    # manager and verifies nothing; ``assertIn`` actually checks the message.
    self.assertIn("Acknowledging insufficient data for the key", log.output[-1])

@patch("redis.Redis.hgetall", Mock(side_effect=RedisError))
def test_TrainMsgDeduplicator_exception_2(self):
train_dedup = TrainMsgDeduplicator(REDIS_CLIENT)
Expand Down

0 comments on commit 8bba9ca

Please sign in to comment.