feat: allow chunked druid fetch #349

Merged · 4 commits · Feb 23, 2024
186 changes: 157 additions & 29 deletions numalogic/connectors/druid/_druid.py
@@ -1,5 +1,6 @@
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta

import pandas as pd
@@ -10,12 +11,13 @@

from numalogic.connectors._base import DataFetcher
from numalogic.connectors._config import Pivot
from typing import Optional
from typing import Optional, Final

from numalogic.tools.exceptions import DruidFetcherError

_LOGGER = logging.getLogger(__name__)
TIMEOUT = 10000
TIMEOUT: Final[int] = 10000
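# Upper bound on the number of worker threads used by chunked_fetch.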
_MAX_CONCURRENCY: Final[int] = 16


# TODO: pass dictionary of keys and values as dict
@@ -41,6 +43,7 @@ def build_params(
delay: float,
aggregations: Optional[list[str]] = None,
post_aggregations: Optional[list[str]] = None,
reference_dt: Optional[datetime] = None,
) -> dict:
"""

@@ -56,6 +59,8 @@
``pydruid.utils.aggregators`` e.g., ``doublesum``
post_aggregations: A map from post aggregator name to one of the
``pydruid.utils.postaggregator`` e.g., ``QuantilesDoublesSketchToQuantile``.
reference_dt: reference datetime used to calculate the start and end datetimes
    (None means the current UTC time).

Returns: a dict of parameters

@@ -64,7 +69,8 @@
type="and",
fields=[Filter(type="selector", dimension=k, value=v) for k, v in filter_pairs.items()],
)
end_dt = datetime.now(pytz.utc) - timedelta(hours=delay)
reference_dt = reference_dt or datetime.now(pytz.utc)
end_dt = reference_dt - timedelta(hours=delay)
_LOGGER.debug("Querying with end_dt: %s, that is with delay of %s hrs", end_dt, delay)

start_dt = end_dt - timedelta(hours=hours)
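
For intuition, a quick sketch of the window arithmetic above (the timestamps are illustrative, not taken from the PR):

    from datetime import datetime, timedelta
    import pytz

    # With reference_dt anchored at 2024-02-23 12:00 UTC, delay=3 and hours=6
    # yield a query window of 03:00-09:00 UTC on the same day.
    reference_dt = datetime(2024, 2, 23, 12, 0, tzinfo=pytz.utc)
    end_dt = reference_dt - timedelta(hours=3)    # 2024-02-23 09:00 UTC
    start_dt = end_dt - timedelta(hours=6)        # 2024-02-23 03:00 UTC
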
@@ -98,6 +104,8 @@ class DruidFetcher(DataFetcher):

"""

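    # Declaring __slots__ avoids a per-instance __dict__, keeping fetcher objects lightweight.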
__slots__ = ("client",)

def __init__(self, url: str, endpoint: str):
super().__init__(url)
self.client = PyDruid(url, endpoint)
@@ -116,6 +124,28 @@ def fetch(
pivot: Optional[Pivot] = None,
hours: float = 24,
) -> pd.DataFrame:
"""
Fetch data from Druid.

Args:
------
datasource: Data source to query
filter_keys: dimension names to filter on
filter_values: dimension values, paired positionally with filter_keys
dimensions: The dimensions to group by
delay: Added delay to the fetch query from current time.
granularity: Time bucket to aggregate data by hour, day, minute, etc.
aggregations: A map from aggregator name to one of the
``pydruid.utils.aggregators`` e.g., ``doublesum``
post_aggregations: A map from post aggregator name to one of the
``pydruid.utils.postaggregator`` e.g., ``QuantilesDoublesSketchToQuantile``.
group_by: List of columns to group by
pivot: Pivot configuration
hours: Hours from now to fetch.

Returns
-------
Fetched dataframe
"""
_start_time = time.perf_counter()
filter_pairs = make_filter_pairs(filter_keys, filter_values)
query_params = build_params(
@@ -128,33 +158,131 @@ def fetch(
aggregations=aggregations,
post_aggregations=post_aggregations,
)
try:
response = self.client.groupby(**query_params)
except Exception as err:
raise DruidFetcherError("Druid Exception:\n") from err
else:
df = response.export_pandas()
if df.empty or df.shape[0] == 0:
logging.warning("No data found for keys %s", filter_pairs)
return pd.DataFrame()

df["timestamp"] = pd.to_datetime(df["timestamp"]).astype("int64") // 10**6

if group_by:
df = df.groupby(by=group_by).sum().reset_index()

if pivot and pivot.columns:
df = df.pivot(
index=pivot.index,
columns=pivot.columns,
values=pivot.value,
)
df.columns = df.columns.map("{0[1]}".format)
df.reset_index(inplace=True)
df = self._fetch(**query_params)

if df.empty:
    _LOGGER.warning("No data found for keys %s", filter_pairs)
return pd.DataFrame()

df["timestamp"] = pd.to_datetime(df["timestamp"]).astype("int64") // 10**6

if group_by:
df = df.groupby(by=group_by).sum().reset_index()

_end_time = time.perf_counter() - _start_time
_LOGGER.debug("params: %s latency: %.6fs", query_params, _end_time)
return df
if pivot and pivot.columns:
df = df.pivot(
index=pivot.index,
columns=pivot.columns,
values=pivot.value,
)
df.columns = df.columns.map("{0[1]}".format)
df.reset_index(inplace=True)

_end_time = time.perf_counter() - _start_time
_LOGGER.debug("Druid params: %s, fetch time: %.4fs", query_params, _end_time)
return df
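
For reference, a minimal usage sketch of the single-shot fetch. The broker URL, endpoint, datasource, and filter values below are hypothetical, and this assumes DruidFetcher is re-exported from numalogic.connectors.druid:

    from pydruid.utils.aggregators import doublesum

    from numalogic.connectors.druid import DruidFetcher

    # Hypothetical broker address and datasource; adjust for your cluster.
    fetcher = DruidFetcher(url="http://localhost:8888", endpoint="druid/v2")
    df = fetcher.fetch(
        datasource="service-metrics",
        filter_keys=["service"],
        filter_values=["my-svc"],
        dimensions=["service"],
        granularity="minute",
        aggregations={"count": doublesum("count")},
        delay=3.0,
        hours=24,
    )
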

def raw_fetch(self, *args, **kwargs) -> pd.DataFrame:
raise NotImplementedError

def chunked_fetch(
self,
datasource: str,
filter_keys: list[str],
filter_values: list[str],
dimensions: list[str],
delay: float = 3.0,
granularity: str = "minute",
aggregations: Optional[dict] = None,
post_aggregations: Optional[dict] = None,
group_by: Optional[list[str]] = None,
pivot: Optional[Pivot] = None,
hours: int = 24,
chunked_hours: int = 6,
) -> pd.DataFrame:
"""
Fetch data concurrently in chunks and concatenate the results.

Args:
------
datasource: Data source to query
filter_keys: dimension names to filter on
filter_values: dimension values, paired positionally with filter_keys
dimensions: The dimensions to group by
delay: Added delay to the fetch query from current time.
granularity: Time bucket to aggregate data by hour, day, minute, etc.
aggregations: A map from aggregator name to one of the
``pydruid.utils.aggregators`` e.g., ``doublesum``
post_aggregations: A map from post aggregator name to one of the
``pydruid.utils.postaggregator`` e.g., ``QuantilesDoublesSketchToQuantile``.
group_by: List of columns to group by
pivot: Pivot configuration
hours: Hours from now to fetch.
chunked_hours: Hours to fetch in each chunk

Returns
-------
Fetched dataframe

Raises
------
ValueError: If chunked_hours is less than 1
"""
if chunked_hours < 1:
raise ValueError("chunked_hours should be an integer >= 1.")

_start_time = time.perf_counter()
filter_pairs = make_filter_pairs(filter_keys, filter_values)

hours_elapsed = 0
chunked_dfs = []
qparams = []
curr_time = datetime.now(pytz.utc)

while hours_elapsed < hours:
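    # Walk backwards from curr_time in chunked_hours-sized steps; the final
    # chunk is truncated when hours is not a multiple of chunked_hours.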
ref_dt = curr_time - timedelta(hours=hours_elapsed)
qparams.append(
build_params(
datasource=datasource,
dimensions=dimensions,
filter_pairs=filter_pairs,
granularity=granularity,
hours=min(chunked_hours, hours - hours_elapsed),
delay=delay,
aggregations=aggregations,
post_aggregations=post_aggregations,
reference_dt=ref_dt,
)
)
hours_elapsed += chunked_hours

max_threads = min(_MAX_CONCURRENCY, len(qparams))
_LOGGER.debug("Fetching data concurrently with %s threads", max_threads)
with ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = [executor.submit(self._fetch, **params) for params in qparams]
for future in futures:
chunked_dfs.append(future.result())

df = pd.concat(chunked_dfs, axis=0, ignore_index=True)
df["timestamp"] = pd.to_datetime(df["timestamp"]).astype("int64") // 10**6

if group_by:
df = df.groupby(by=group_by).sum().reset_index()

if pivot and pivot.columns:
df = df.pivot(
index=pivot.index,
columns=pivot.columns,
values=pivot.value,
)
df.columns = df.columns.map("{0[1]}".format)
df.reset_index(inplace=True)

_LOGGER.debug("Fetch time: %.4fs", time.perf_counter() - _start_time)
return df
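
Under the same hypothetical setup as the fetch example above, the chunked variant splits the window into chunked_hours-sized queries and runs them in parallel; hours=24 with chunked_hours=6 issues four 6-hour queries and concatenates the results:

    df = fetcher.chunked_fetch(
        datasource="service-metrics",
        filter_keys=["service"],
        filter_values=["my-svc"],
        dimensions=["service"],
        granularity="minute",
        aggregations={"count": doublesum("count")},
        hours=24,
        chunked_hours=6,
    )
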

def _fetch(self, **query_params) -> pd.DataFrame:
try:
response = self.client.groupby(**query_params)
except Exception as err:
raise DruidFetcherError("Druid Exception:\n") from err
return response.export_pandas()