Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add unified conf #342

Merged
merged 10 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions numalogic/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
LightningTrainerConf,
RegistryInfo,
TrainerConf,
UnifiedScoreConf,
)
from numalogic.config.factory import (
ModelFactory,
Expand All @@ -37,4 +38,5 @@
"ThresholdFactory",
"RegistryFactory",
"TrainerConf",
"UnifiedScoreConf",
]
5 changes: 5 additions & 0 deletions numalogic/config/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,8 @@ class NumalogicConf:
@dataclass
class DataConnectorConf:
    """Configuration for a data connector.

    Attributes
    ----------
        source: Identifier of the data source to connect to.
    """

    source: str


@dataclass
class UnifiedScoreConf:
    """Configuration for computing the unified anomaly score.

    Attributes
    ----------
        scoring_function: Aggregation applied over the per-feature anomaly
            scores to produce a single unified score; one of
            "max", "min" or "mean". Defaults to "max".
    """

    scoring_function: str = "max"
7 changes: 4 additions & 3 deletions numalogic/connectors/druid/_druid.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from numalogic.connectors._config import Pivot
from typing import Optional

from numalogic.tools.exceptions import DruidFetcherError

_LOGGER = logging.getLogger(__name__)
TIMEOUT = 10000

Expand Down Expand Up @@ -123,9 +125,8 @@ def fetch(
)
try:
response = self.client.groupby(**query_params)
except Exception:
_LOGGER.exception("Problem with getting response from client")
return pd.DataFrame()
except Exception as err:
raise DruidFetcherError("Druid Exception:\n") from err
s0nicboOm marked this conversation as resolved.
Show resolved Hide resolved
else:
df = response.export_pandas()
if df.empty or df.shape[0] == 0:
Expand Down
6 changes: 6 additions & 0 deletions numalogic/tools/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,9 @@ class DataFormatError(Exception):
"""Raised when the data format is not valid."""

pass


class DruidFetcherError(Exception):
    """Raised when the DruidFetcher fails to fetch data from Druid."""

    pass
3 changes: 2 additions & 1 deletion numalogic/udfs/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from omegaconf import OmegaConf

from numalogic.config import NumalogicConf, RegistryInfo
from numalogic.config import NumalogicConf, RegistryInfo, UnifiedScoreConf

from numalogic.connectors import (
ConnectorType,
Expand All @@ -21,6 +21,7 @@
class MLPipelineConf:
    """Configuration for a single ML pipeline within a stream.

    Attributes
    ----------
        pipeline_id: Identifier of this pipeline; defaults to "default".
        metrics: Names of the metrics this pipeline processes.
        unified_scoring_conf: Strategy used to compute the unified anomaly score.
        numalogic_conf: Core numalogic model/trainer configuration.
    """

    pipeline_id: str = "default"
    metrics: list[str] = field(default_factory=list)
    # The classes are zero-arg constructible, so pass them directly as the
    # factory instead of wrapping them in a lambda.
    unified_scoring_conf: UnifiedScoreConf = field(default_factory=UnifiedScoreConf)
    numalogic_conf: NumalogicConf = field(default_factory=NumalogicConf)


Expand Down
18 changes: 16 additions & 2 deletions numalogic/udfs/postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ def __init__(
)
self.postproc_factory = PostprocessFactory()

@staticmethod
def _calculate_unified_score(anomaly_scores, scoring_function):
    """Aggregate per-feature anomaly scores into a single unified score.

    Args:
        anomaly_scores: Array of per-feature anomaly scores.
        scoring_function: Aggregation strategy; one of "max", "min" or "mean".

    Returns
    -------
        The aggregated (scalar) anomaly score.

    Raises
    ------
        NotImplementedError: If an unsupported scoring function is provided.
    """
    _aggregators = {"max": np.max, "min": np.min, "mean": np.mean}
    try:
        return _aggregators[scoring_function](anomaly_scores)
    except KeyError:
        # The original message said "loss function"; this is a scoring function.
        raise NotImplementedError(
            f"Unsupported scoring function provided: {scoring_function}"
        ) from None

@UDF_TIME.time()
def exec(self, keys: list[str], datum: Datum) -> Messages:
"""
Expand Down Expand Up @@ -151,16 +161,20 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
pipeline_id=payload.pipeline_id,
composite_keys=payload.composite_keys,
timestamp=payload.end_ts,
unified_anomaly=np.max(anomaly_scores),
unified_anomaly=self._calculate_unified_score(
anomaly_scores, _conf.unified_scoring_conf.scoring_function
),
data=self._per_feature_score(payload.metrics, anomaly_scores),
metadata=payload.metadata,
)
_LOGGER.info(
"%s - Successfully post-processed, Keys: %s, Scores: %s, Payload: %s",
"%s - Successfully post-processed, Keys: %s, Scores: %s, Payload: %s "
"using strategy: %s",
out_payload.uuid,
out_payload.composite_keys,
out_payload.unified_anomaly,
payload,
_conf.unified_scoring_conf.scoring_function,
)
_LOGGER.info("%s-%s", payload.uuid, out_payload)
messages.append(Message(keys=keys, value=out_payload.to_json(), tags=["output"]))
Expand Down
18 changes: 16 additions & 2 deletions numalogic/udfs/trainer/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,22 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
# Fetch data
df = self.fetch_data(payload)

# Retry the training if df is returning None due to some errors/exception
# while fetching the data
if df is None:
s0nicboOm marked this conversation as resolved.
Show resolved Hide resolved
_increment_counter(
counter=MSG_DROPPED_COUNTER,
labels=(self._vtx, *_metric_label_values),
)
_LOGGER.warning(
"%s - Caught exception/error while fetching from source" " for key: %s",
payload.uuid,
payload.composite_keys,
)
return Messages(Message.to_drop())

# Check if data is sufficient
if df.empty or not self._is_data_sufficient(payload, df):
if not self._is_data_sufficient(payload, df):
_LOGGER.warning(
"%s - Insufficient data found for keys %s, shape: %s",
payload.uuid,
Expand Down Expand Up @@ -369,7 +383,7 @@ def get_feature_arr(
feat_df = feat_df.fillna(fill_value).replace([np.inf, -np.inf], fill_value)
return feat_df.to_numpy(dtype=np.float32), nan_counter, inf_counter

def fetch_data(self, payload: TrainerPayload) -> pd.DataFrame:
def fetch_data(self, payload: TrainerPayload) -> Optional[pd.DataFrame]:
"""
Fetch data from a data connector.

Expand Down
9 changes: 5 additions & 4 deletions numalogic/udfs/trainer/_druid.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from numalogic.config.factory import ConnectorFactory
from numalogic.connectors import DruidFetcherConf
from numalogic.tools.exceptions import ConfigNotFoundError
from numalogic.tools.exceptions import ConfigNotFoundError, DruidFetcherError
from numalogic.tools.types import redis_client_t
from numalogic.udfs._config import PipelineConf
from numalogic.udfs.entities import TrainerPayload
Expand Down Expand Up @@ -80,7 +80,7 @@ def get_druid_fetcher_conf(self, config_id: str, pipeline_id: str) -> DruidFetch
except KeyError as err:
raise ConfigNotFoundError(f"Config with ID {fetcher_id} not found!") from err

def fetch_data(self, payload: TrainerPayload) -> pd.DataFrame:
def fetch_data(self, payload: TrainerPayload) -> Optional[pd.DataFrame]:
"""
Fetch data from druid.

Expand Down Expand Up @@ -127,19 +127,20 @@ def fetch_data(self, payload: TrainerPayload) -> pd.DataFrame:
pivot=_fetcher_conf.pivot,
hours=_conf.numalogic_conf.trainer.train_hours,
)
except Exception:
except DruidFetcherError:
_increment_counter(
counter=FETCH_EXCEPTION_COUNTER,
labels=_metric_label_values,
)
_LOGGER.exception("%s - Error while fetching data from druid", payload.uuid)
return pd.DataFrame()
return None
_end_time = time.perf_counter() - _start_time
_add_summary(
FETCH_TIME_SUMMARY,
labels=_metric_label_values,
data=_end_time,
)

_LOGGER.debug(
"%s - Time taken to fetch data: %.3f sec, df shape: %s",
payload.uuid,
Expand Down
8 changes: 4 additions & 4 deletions numalogic/udfs/trainer/_prom.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pytz

from numalogic.config.factory import ConnectorFactory
from numalogic.tools.exceptions import ConfigNotFoundError
from numalogic.tools.exceptions import ConfigNotFoundError, PrometheusFetcherError
from numalogic.tools.types import redis_client_t
from numalogic.udfs._config import PipelineConf
from numalogic.udfs.entities import TrainerPayload
Expand Down Expand Up @@ -48,7 +48,7 @@ def __init__(
except AttributeError as err:
raise ConfigNotFoundError("Prometheus config not found!") from err

def fetch_data(self, payload: TrainerPayload) -> pd.DataFrame:
def fetch_data(self, payload: TrainerPayload) -> Optional[pd.DataFrame]:
"""
Fetch data from Prometheus/Thanos.

Expand Down Expand Up @@ -84,13 +84,13 @@ def fetch_data(self, payload: TrainerPayload) -> pd.DataFrame:
**dict(zip(_stream_conf.composite_keys, payload.composite_keys)),
},
)
except Exception:
except PrometheusFetcherError:
_increment_counter(
counter=FETCH_EXCEPTION_COUNTER,
labels=_metric_label_values,
)
_LOGGER.exception("%s - Error while fetching data from Prometheus", payload.uuid)
return pd.DataFrame()
return None
_end_time = time.perf_counter() - _start_time
_add_summary(
FETCH_TIME_SUMMARY,
Expand Down
31 changes: 16 additions & 15 deletions tests/connectors/test_druid.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
postaggregator as _post_agg,
aggregators as _agg,
)
from numalogic.tools.exceptions import DruidFetcherError

logging.basicConfig(level=logging.DEBUG)

Expand Down Expand Up @@ -156,21 +157,21 @@ def test_build_param(self):

@patch.object(PyDruid, "groupby", Mock(side_effect=OSError))
def test_fetch_exception(self):
    """fetch() should wrap client failures in DruidFetcherError rather than return a frame."""
    with self.assertRaises(DruidFetcherError):
        # No need to bind the result: the call is expected to raise.
        self.druid.fetch(
            filter_keys=["assetId"],
            filter_values=["5984175597303660107"],
            dimensions=["ciStatus"],
            datasource="customer-interaction-metrics",
            aggregations={"count": aggregators.doublesum("count")},
            group_by=["timestamp", "ciStatus"],
            hours=36,
            pivot=Pivot(
                index="timestamp",
                columns=["ciStatus"],
                value=["count"],
            ),
        )


if __name__ == "__main__":
Expand Down
20 changes: 20 additions & 0 deletions tests/udfs/resources/_config3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,26 @@ stream_confs:
pltrainer_conf:
accelerator: cpu
max_epochs: 5
pipeline1:
pipeline_id: "pipeline1"
metrics: [ "namespace_app_rollouts_http_request_error_rate" ]
numalogic_conf:
model:
name: "Conv1dVAE"
conf:
seq_len: 12
n_features: 1
latent_dim: 1
preprocess:
- name: "StandardScaler"
threshold:
name: "MahalanobisThreshold"
trainer:
train_hours: 3
min_train_size: 100
pltrainer_conf:
accelerator: cpu
max_epochs: 5

redis_conf:
url: "http://localhost:6379"
Expand Down
10 changes: 9 additions & 1 deletion tests/udfs/test_postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
],
"status": Status.ARTIFACT_STALE,
"header": Header.MODEL_INFERENCE,
"artifact_versions": {"pipeline1:StdDevThreshold": "0"},
"artifact_versions": {"pipeline1:StdDevThreshold": "0", "pipeline1:VanillaAE": "0"},
"metadata": {
"tags": {"asset_alias": "data", "asset_id": "123456789", "env": "prd"},
},
Expand Down Expand Up @@ -181,6 +181,14 @@ def test_preprocess_4(self):
payload = TrainerPayload(**orjson.loads(msg[0].value))
self.assertEqual(payload.header, Header.TRAIN_REQUEST)

def test_postprocess_NotImplementedMethod(self):
    """_calculate_unified_score aggregates correctly and rejects unknown strategies."""
    scores = np.asarray([[1, 1], [2, 2]])
    # Table-driven check of every supported aggregation strategy.
    for strategy, expected in (("mean", 1.5), ("min", 1), ("max", 2)):
        self.assertEqual(expected, self.udf._calculate_unified_score(scores, strategy))
    with self.assertRaises(NotImplementedError):
        self.udf._calculate_unified_score(scores, "mesadasd")


if __name__ == "__main__":
unittest.main()
Loading
Loading