feat: allow chunked druid fetch (#349)
Client-side fetch time comparison:

- Normal fetch, 72 hours: `0.35 s`
- Chunked fetch, 72 hours, 6-hour chunks: `0.34 s`
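
For reference, a minimal usage sketch of the new `chunked_fetch` API added in this commit. The import path, Druid URL/endpoint, datasource, filter, and dimension names below are illustrative assumptions only; the timings above come from a real deployment, not this snippet.

```python
# Illustrative sketch only: connection details, datasource, and filters are hypothetical.
from pydruid.utils.aggregators import doublesum

from numalogic.connectors.druid import DruidFetcher  # assumed re-export of _druid.DruidFetcher

fetcher = DruidFetcher(url="http://localhost:8888", endpoint="druid/v2")

# Fetch a 72-hour window in 6-hour chunks; the chunks are queried concurrently.
df = fetcher.chunked_fetch(
    datasource="example_metrics",        # hypothetical datasource
    filter_keys=["namespace"],           # hypothetical filter key/value
    filter_values=["my-app"],
    dimensions=["name"],
    aggregations={"count": doublesum("count")},
    granularity="minute",
    hours=72,
    chunked_hours=6,
)
print(df.shape)
```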

---------
Signed-off-by: Avik Basu <ab93@users.noreply.github.com>
ab93 committed Feb 23, 2024
1 parent bf27eb5 commit fb97b3a
Showing 6 changed files with 3,660 additions and 946 deletions.
186 changes: 157 additions & 29 deletions numalogic/connectors/druid/_druid.py
@@ -1,5 +1,6 @@
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta

import pandas as pd
@@ -10,12 +11,13 @@

from numalogic.connectors._base import DataFetcher
from numalogic.connectors._config import Pivot
from typing import Optional
from typing import Optional, Final

from numalogic.tools.exceptions import DruidFetcherError

_LOGGER = logging.getLogger(__name__)
TIMEOUT = 10000
TIMEOUT: Final[int] = 10000
_MAX_CONCURRENCY: Final[int] = 16


# TODO: pass dictionary of keys and values as dict
@@ -41,6 +43,7 @@ def build_params(
delay: float,
aggregations: Optional[list[str]] = None,
post_aggregations: Optional[list[str]] = None,
reference_dt: Optional[datetime] = None,
) -> dict:
"""
@@ -56,6 +59,8 @@
``pydruid.utils.aggregators`` e.g., ``doublesum``
post_aggregations: A map from post aggregator name to one of the
``pydruid.utils.postaggregator`` e.g., ``QuantilesDoublesSketchToQuantile``.
reference_dt: Reference datetime used to calculate the start and end datetimes
(None means the current UTC datetime is used).
Returns: a dict of parameters
@@ -64,7 +69,8 @@
type="and",
fields=[Filter(type="selector", dimension=k, value=v) for k, v in filter_pairs.items()],
)
end_dt = datetime.now(pytz.utc) - timedelta(hours=delay)
reference_dt = reference_dt or datetime.now(pytz.utc)
end_dt = reference_dt - timedelta(hours=delay)
_LOGGER.debug("Querying with end_dt: %s, that is with delay of %s hrs", end_dt, delay)

start_dt = end_dt - timedelta(hours=hours)
@@ -98,6 +104,8 @@ class DruidFetcher(DataFetcher):
"""

__slots__ = ("client",)

def __init__(self, url: str, endpoint: str):
super().__init__(url)
self.client = PyDruid(url, endpoint)
@@ -116,6 +124,28 @@ def fetch(
pivot: Optional[Pivot] = None,
hours: float = 24,
) -> pd.DataFrame:
"""
Fetch data from Druid.
Args:
------
datasource: Data source to query
filter_keys: keys
filter_values: values
dimensions: The dimensions to group by
delay: Added delay to the fetch query from current time.
granularity: Time bucket to aggregate data by hour, day, minute, etc.
aggregations: A map from aggregator name to one of the
``pydruid.utils.aggregators`` e.g., ``doublesum``
post_aggregations: postaggregations map
group_by: List of columns to group by
pivot: Pivot configuration
hours: Hours from now to fetch.
Returns
-------
Fetched dataframe
"""
_start_time = time.perf_counter()
filter_pairs = make_filter_pairs(filter_keys, filter_values)
query_params = build_params(
@@ -128,33 +158,131 @@ def fetch(
aggregations=aggregations,
post_aggregations=post_aggregations,
)
try:
response = self.client.groupby(**query_params)
except Exception as err:
raise DruidFetcherError("Druid Exception:\n") from err
else:
df = response.export_pandas()
if df.empty or df.shape[0] == 0:
logging.warning("No data found for keys %s", filter_pairs)
return pd.DataFrame()

df["timestamp"] = pd.to_datetime(df["timestamp"]).astype("int64") // 10**6

if group_by:
df = df.groupby(by=group_by).sum().reset_index()

if pivot and pivot.columns:
df = df.pivot(
index=pivot.index,
columns=pivot.columns,
values=pivot.value,
)
df.columns = df.columns.map("{0[1]}".format)
df.reset_index(inplace=True)
df = self._fetch(**query_params)

if df.empty or df.shape[0] == 0:
logging.warning("No data found for keys %s", filter_pairs)
return pd.DataFrame()

df["timestamp"] = pd.to_datetime(df["timestamp"]).astype("int64") // 10**6

if group_by:
df = df.groupby(by=group_by).sum().reset_index()

_end_time = time.perf_counter() - _start_time
_LOGGER.debug("params: %s latency: %.6fs", query_params, _end_time)
return df
if pivot and pivot.columns:
df = df.pivot(
index=pivot.index,
columns=pivot.columns,
values=pivot.value,
)
df.columns = df.columns.map("{0[1]}".format)
df.reset_index(inplace=True)

_end_time = time.perf_counter() - _start_time
_LOGGER.debug("Druid params: %s, fetch time: %.4fs", query_params, _end_time)
return df

def raw_fetch(self, *args, **kwargs) -> pd.DataFrame:
raise NotImplementedError

def chunked_fetch(
self,
datasource: str,
filter_keys: list[str],
filter_values: list[str],
dimensions: list[str],
delay: float = 3.0,
granularity: str = "minute",
aggregations: Optional[dict] = None,
post_aggregations: Optional[dict] = None,
group_by: Optional[list[str]] = None,
pivot: Optional[Pivot] = None,
hours: int = 24,
chunked_hours: int = 6,
) -> pd.DataFrame:
"""
Fetch data concurrently, and concatenate the results.
Args:
------
datasource: Data source to query
filter_keys: keys
filter_values: values
dimensions: The dimensions to group by
delay: Added delay to the fetch query from current time.
granularity: Time bucket to aggregate data by hour, day, minute, etc.
aggregations: A map from aggregator name to one of the
``pydruid.utils.aggregators`` e.g., ``doublesum``
post_aggregations: postaggregations map
group_by: List of columns to group by
pivot: Pivot configuration
hours: Hours from now to fetch.
chunked_hours: Hours to fetch in each chunk
Returns
-------
Fetched dataframe
Raises
------
ValueError: If chunked_hours is less than 1
"""
if chunked_hours < 1:
raise ValueError("chunked_hours should be an integer >= 1.")

_start_time = time.perf_counter()
filter_pairs = make_filter_pairs(filter_keys, filter_values)

hours_elapsed = 0
chunked_dfs = []
qparams = []
curr_time = datetime.now(pytz.utc)

while hours_elapsed < hours:
ref_dt = curr_time - timedelta(hours=hours_elapsed)
qparams.append(
build_params(
datasource=datasource,
dimensions=dimensions,
filter_pairs=filter_pairs,
granularity=granularity,
hours=min(chunked_hours, hours - hours_elapsed),
delay=delay,
aggregations=aggregations,
post_aggregations=post_aggregations,
reference_dt=ref_dt,
)
)
hours_elapsed += chunked_hours

max_threads = min(_MAX_CONCURRENCY, len(qparams))
_LOGGER.debug("Fetching data concurrently with %s threads", max_threads)
with ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = [executor.submit(self._fetch, **params) for params in qparams]
for future in futures:
chunked_dfs.append(future.result())

df = pd.concat(chunked_dfs, axis=0, ignore_index=True)
df["timestamp"] = pd.to_datetime(df["timestamp"]).astype("int64") // 10**6

if group_by:
df = df.groupby(by=group_by).sum().reset_index()

if pivot and pivot.columns:
df = df.pivot(
index=pivot.index,
columns=pivot.columns,
values=pivot.value,
)
df.columns = df.columns.map("{0[1]}".format)
df.reset_index(inplace=True)

_LOGGER.debug("Fetch time: %.4fs", time.perf_counter() - _start_time)
return df

def _fetch(self, **query_params) -> pd.DataFrame:
try:
response = self.client.groupby(**query_params)
except Exception as err:
raise DruidFetcherError("Druid Exception:\n") from err
return response.export_pandas()
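
As a standalone illustration (not part of the commit) of how the chunking loop above splits the query window: with `hours=72` and `chunked_hours=6`, twelve parameter sets are built, each anchored at a `reference_dt` stepped back by the hours already covered, and `build_params` then shifts each window back by `delay` hours.

```python
# Standalone sketch of the chunk-window arithmetic used by chunked_fetch/build_params.
from datetime import datetime, timedelta

import pytz

hours, chunked_hours, delay = 72, 6, 3.0
curr_time = datetime.now(pytz.utc)

hours_elapsed = 0
windows = []
while hours_elapsed < hours:
    ref_dt = curr_time - timedelta(hours=hours_elapsed)
    span = min(chunked_hours, hours - hours_elapsed)  # last chunk may be shorter
    end_dt = ref_dt - timedelta(hours=delay)          # same delay shift as build_params
    start_dt = end_dt - timedelta(hours=span)
    windows.append((start_dt, end_dt))
    hours_elapsed += chunked_hours

print(len(windows))  # -> 12 six-hour windows for a 72-hour fetch
```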
