refactor druid connector (#301)
- use upstream pydruid instead of a fork
- add support for the doubles sketch aggregator and post aggregator
- add a query timeout of 10 seconds
- fix logging
---------

Signed-off-by: shrivardhan <shrivardhan92@gmail.com>
cosmic-chichu committed Sep 27, 2023
1 parent 2973dd2 commit dfab26e
Showing 8 changed files with 145 additions and 37 deletions.
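
Taken together, the changes let a single fetch call attach a doubles sketch aggregation, a matching post aggregation, and the 10-second query timeout. A minimal sketch of the new surface (the url, endpoint, datasource, and column names are hypothetical, patterned after the tests in this commit):

from pydruid.utils.postaggregator import Field
from numalogic.connectors.druid import DruidFetcher, aggregators, postaggregator

fetcher = DruidFetcher(url="http://localhost:8888", endpoint="druid/v2/")
df = fetcher.fetch(
    filter_keys=["assetAlias"],
    filter_values=["some.asset.alias"],
    dimensions=["assetAlias", "env"],
    datasource="perf-metrics",
    aggregations={
        # quantilesDoublesSketch aggregation over a (hypothetical) sketch column
        "agg0": aggregators.quantiles_doubles_sketch("valuesDoublesSketch", "agg0", 256)
    },
    post_aggregations={
        # p90 extracted from the sketch aggregation named "agg0"
        "postAgg0": postaggregator.QuantilesDoublesSketchToQuantile(
            field=Field("agg0"), fraction=0.9, output_name="postAgg0"
        )
    },
    hours=2,
)
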
3 changes: 3 additions & 0 deletions numalogic/connectors/druid/__init__.py
@@ -0,0 +1,3 @@
from numalogic.connectors.druid._druid import DruidFetcher, make_filter_pairs, build_params

__all__ = ["DruidFetcher", "make_filter_pairs", "build_params"]
37 changes: 22 additions & 15 deletions numalogic/connectors/druid/_druid.py
@@ -13,6 +13,7 @@
from typing import Optional

_LOGGER = logging.getLogger(__name__)
+TIMEOUT = 10000


# TODO: pass dictionary of keys and values as dict
@@ -30,26 +31,29 @@ def make_filter_pairs(filter_keys: list[str], filter_values: list[str]) -> dict[


def build_params(
-    aggregations: list[str],
    datasource: str,
    dimensions: list[str],
    filter_pairs: dict,
    granularity: str,
    hours: float,
    delay: float,
+    aggregations: Optional[dict] = None,
+    post_aggregations: Optional[dict] = None,
) -> dict:
    """
    Args:
-        aggregations: A map from aggregator name to one of the
-            ``pydruid.utils.aggregators`` e.g., ``doublesum``
        datasource: Data source to query
        dimensions: The dimensions to group by
        filter_pairs: Indicates which rows of
            data to include in the query
        granularity: Time bucket to aggregate data by hour, day, minute, etc.
        hours: Hours from now to skip training.
        delay: Added delay to the fetch query from current time.
+        aggregations: A map from aggregator name to one of the
+            ``pydruid.utils.aggregators``, e.g., ``doublesum``
+        post_aggregations: A map from post aggregator name to one of the
+            ``pydruid.utils.postaggregator``, e.g., ``QuantilesDoublesSketchToQuantile``.
    Returns: a dict of parameters
@@ -66,22 +70,17 @@ def build_params(
    intervals = [f"{start_dt.isoformat()}/{end_dt.isoformat()}"]
    dimension_specs = map(lambda d: DimensionSpec(dimension=d, output_name=d), dimensions)

-    params = {
+    return {
        "datasource": datasource,
        "granularity": granularity,
        "intervals": intervals,
-        "aggregations": aggregations,
+        "aggregations": aggregations or dict(),
+        "post_aggregations": post_aggregations or dict(),
        "filter": _filter,
        "dimensions": dimension_specs,
+        "context": {"timeout": TIMEOUT},
    }
-
-    _LOGGER.debug(
-        "Druid query params: %s",
-        params,
-    )
-
-    return params


class DruidFetcher(DataFetcher):
"""
@@ -105,14 +104,22 @@ def fetch(
        delay: float = 3.0,
        granularity: str = "minute",
        aggregations: Optional[dict] = None,
+        post_aggregations: Optional[dict] = None,
        group_by: Optional[list[str]] = None,
        pivot: Optional[Pivot] = None,
        hours: float = 24,
    ) -> pd.DataFrame:
        _start_time = time.perf_counter()
        filter_pairs = make_filter_pairs(filter_keys, filter_values)
        query_params = build_params(
-            aggregations, datasource, dimensions, filter_pairs, granularity, hours, delay
+            datasource=datasource,
+            dimensions=dimensions,
+            filter_pairs=filter_pairs,
+            granularity=granularity,
+            hours=hours,
+            delay=delay,
+            aggregations=aggregations,
+            post_aggregations=post_aggregations,
        )
        try:
            response = self.client.groupby(**query_params)
@@ -130,7 +137,7 @@
        if group_by:
            df = df.groupby(by=group_by).sum().reset_index()

-        if pivot.columns:
+        if pivot and pivot.columns:
            df = df.pivot(
                index=pivot.index,
                columns=pivot.columns,
@@ -140,7 +147,7 @@
            df.reset_index(inplace=True)

        _end_time = time.perf_counter() - _start_time
-        _LOGGER.debug("Druid query latency: %.6fs", _end_time)
+        _LOGGER.debug("params: %s latency: %.6fs", query_params, _end_time)
        return df

    def raw_fetch(self, *args, **kwargs) -> pd.DataFrame:
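
For reference, a short sketch of what the refactored build_params returns (the datasource and dimension names are hypothetical; the call mirrors test_build_param below):

from numalogic.connectors.druid import build_params, make_filter_pairs

filter_pairs = make_filter_pairs(["ciStatus"], ["false"])
params = build_params(
    datasource="foo",
    dimensions=["bar"],
    filter_pairs=filter_pairs,
    granularity="all",
    hours=24.0,
    delay=3.0,
)
# Every query built this way now carries the timeout:
# params["context"] == {"timeout": 10000}  # milliseconds, i.e. 10 seconds
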
22 changes: 22 additions & 0 deletions numalogic/connectors/druid/aggregators.py
@@ -0,0 +1,22 @@
def quantiles_doubles_sketch(
    raw_column: str, aggregator_name: str, k: int = 128, max_stream_length: int = 1000000000
) -> dict:
    """
    Args:
        raw_column: Name of the column in Druid
        aggregator_name: Arbitrary aggregator name
        k: Controls accuracy; the higher the better. Must be a power of 2 from 2 to 32768
        max_stream_length: This parameter defines the number of items that can be presented to
            each sketch before it may need to move from off-heap to on-heap memory.
    Returns: quantilesDoublesSketch aggregator dict
    """
    return {
        "type": "quantilesDoublesSketch",
        "name": aggregator_name,
        "fieldName": raw_column,
        "k": k,
        "maxStreamLength": max_stream_length,
    }
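
A quick sketch of the dict this helper emits (the column and aggregator names are hypothetical):

from numalogic.connectors.druid import aggregators

agg = aggregators.quantiles_doubles_sketch("latencySketch", "agg0", k=256)
# agg == {
#     "type": "quantilesDoublesSketch",
#     "name": "agg0",
#     "fieldName": "latencySketch",
#     "k": 256,
#     "maxStreamLength": 1000000000,
# }
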
19 changes: 19 additions & 0 deletions numalogic/connectors/druid/postaggregator.py
@@ -0,0 +1,19 @@
from pydruid.utils.postaggregator import Field
from pydruid.utils.postaggregator import Postaggregator


class QuantilesDoublesSketchToQuantile(Postaggregator):
    """Class for building QuantilesDoublesSketchToQuantile post aggregator."""

    def __init__(self, field: Field, fraction: float, output_name=None):
        name = output_name or "quantilesDoublesSketchToQuantile"

        super().__init__(None, None, name)
        self.field = field
        self.fraction = fraction
        self.post_aggregator = {
            "type": "quantilesDoublesSketchToQuantile",
            "name": name,
            "field": self.field.post_aggregator,
            "fraction": self.fraction,
        }
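
A minimal sketch of wiring this post aggregator to an existing sketch aggregation (the aggregation and output names are hypothetical):

from pydruid.utils.postaggregator import Field
from numalogic.connectors.druid.postaggregator import QuantilesDoublesSketchToQuantile

# 90th percentile read out of a doubles sketch aggregation named "agg0"
p90 = QuantilesDoublesSketchToQuantile(
    field=Field("agg0"),
    fraction=0.9,
    output_name="latency_p90",
)
# p90.post_aggregator carries the Druid spec, with "field" taken from
# Field("agg0").post_aggregator and "fraction" set to 0.9.
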
Empty file.
19 changes: 7 additions & 12 deletions poetry.lock


2 changes: 1 addition & 1 deletion pyproject.toml
@@ -43,7 +43,7 @@ pynumaflow = "~0.5"
mlflow-skinny = { version = "^2.0", optional = true }
redis = {extras = ["hiredis"], version = "^5.0", optional = true}
boto3 = { version = "^1.24.64", optional = true }
-pydruid= {git="https://github.com/cosmic-chichu/pydruid.git", branch = "master", optional = true }
+pydruid= { version= "^0.6", optional = true }

[tool.poetry.extras]
mlflow = ["mlflow-skinny"]
80 changes: 71 additions & 9 deletions tests/connectors/test_druid.py
@@ -3,15 +3,23 @@
import unittest
import datetime
from unittest.mock import patch, Mock

+import pydruid.query
from pydruid.client import PyDruid
from pydruid.utils.dimensions import DimensionSpec
-from pydruid.utils.aggregators import doublesum
+from pydruid.utils import aggregators
+from pydruid.utils import postaggregator
from pydruid.utils.filters import Filter
from deepdiff import DeepDiff

from numalogic.connectors._config import Pivot
-from numalogic.connectors.druid import DruidFetcher, make_filter_pairs, build_params
+from numalogic.connectors.druid import (
+    DruidFetcher,
+    make_filter_pairs,
+    build_params,
+    postaggregator as _post_agg,
+    aggregators as _agg,
+)

logging.basicConfig(level=logging.DEBUG)

@@ -35,6 +43,35 @@ def mock_group_by(*_, **__):
    return query


+def mock_group_by_doubles_sketch(*_, **__):
+    """Mock group by response for doubles sketch from druid."""
+    result = [
+        {
+            "event": {
+                "agg0": 4,
+                "assetAlias": "Intuit.identity.authn.signin",
+                "env": "prod",
+                "postAgg0": 21988,
+            },
+            "timestamp": "2023-09-06T07:50:00.000Z",
+            "version": "v1",
+        },
+        {
+            "event": {
+                "agg0": 22,
+                "assetAlias": "Intuit.identity.authn.signin",
+                "env": "prod",
+                "postAgg0": 2237.7999997138977,
+            },
+            "timestamp": "2023-09-06T07:53:00.000Z",
+            "version": "v1",
+        },
+    ]
+    query = pydruid.query.Query(query_dict={}, query_type="groupBy")
+    query.parse(json.dumps(result))
+    return query


class TestDruid(unittest.TestCase):
    start = None
    end = None
@@ -54,17 +91,36 @@ def test_fetch(self):
            filter_keys=["assetId"],
            filter_values=["5984175597303660107"],
            dimensions=["ciStatus"],
-            datasource="customer-interaction-metrics",
-            aggregations={"count": doublesum("count")},
+            datasource="tech-ip-customer-interaction-metrics",
+            aggregations={"count": aggregators.doublesum("count")},
            group_by=["timestamp", "ciStatus"],
-            hours=36,
+            hours=2,
            pivot=Pivot(
                index="timestamp",
                columns=["ciStatus"],
                value=["count"],
            ),
        )
-        self.assertEqual(_out.shape, (2, 2))
+        self.assertEqual((2, 2), _out.shape)

+    @patch.object(PyDruid, "groupby", Mock(return_value=mock_group_by_doubles_sketch()))
+    def test_fetch_double_sketch(self):
+        _out = self.druid.fetch(
+            filter_keys=["assetAlias"],
+            filter_values=["Intuit.accounting.core.qbowebapp"],
+            dimensions=["assetAlias", "env"],
+            datasource="coredevx-rum-perf-metrics",
+            aggregations={
+                "agg0": _agg.quantiles_doubles_sketch("valuesDoublesSketch", "agg0", 256)
+            },
+            post_aggregations={
+                "postAgg0": _post_agg.QuantilesDoublesSketchToQuantile(
+                    output_name="postAgg0", field=postaggregator.Field("agg0"), fraction=0.9
+                )
+            },
+            hours=2,
+        )
+        self.assertEqual((2, 5), _out.shape)

    def test_build_param(self):
        expected = {
@@ -77,8 +133,14 @@ def test_build_param(self):
        }

        filter_pairs = make_filter_pairs(["ciStatus"], ["false"])
-        actual = build_params("", "foo", ["bar"], filter_pairs, "all", 24, 3)
-        actual["intervals"] = ""
+        actual = build_params(
+            datasource="foo",
+            dimensions=["bar"],
+            filter_pairs=filter_pairs,
+            granularity="all",
+            hours=24.0,
+            delay=3,
+        )
        diff = DeepDiff(expected, actual).get("values_changed", {})
        self.assertDictEqual({}, diff)

@@ -89,7 +151,7 @@ def test_fetch_exception(self):
                filter_values=["5984175597303660107"],
                dimensions=["ciStatus"],
                datasource="customer-interaction-metrics",
-                aggregations={"count": doublesum("count")},
+                aggregations={"count": aggregators.doublesum("count")},
                group_by=["timestamp", "ciStatus"],
                hours=36,
                pivot=Pivot(
