"""
Async functions to query QuestDB over HTTP(S) via CSV into Pandas or Numpy.
"""

__all__ = ['pandas_query', 'numpy_query']

import asyncio
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO

import aiohttp
import numpy as np
import pandas as pd

from .endpoint import Endpoint
from .errors import QueryError


def _new_session(endpoint):
    auth = None
    if endpoint.username is not None:
        if endpoint.password is None:
            raise ValueError('Username specified without password')
        auth = aiohttp.BasicAuth(endpoint.username, endpoint.password)
    elif endpoint.password is not None:
        raise ValueError('Password specified without username')
    return aiohttp.ClientSession(auth=auth)


async def _pre_query(session: aiohttp.ClientSession, endpoint: Endpoint, query: str) -> tuple[
        list[tuple[str, tuple[str, object]]], int]:
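    # Ask for just the column metadata and the total row count: 'count=true'
    # includes the count in the response and 'limit=0' should keep QuestDB
    # from sending any data rows.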
    url = f'{endpoint.url}/exec'
    params = [('query', query), ('count', 'true'), ('limit', '0')]
    dtypes_map = {
        'STRING': ('STRING', None),
        'SYMBOL': ('SYMBOL', None),
        'DOUBLE': ('DOUBLE', 'float64'),
        'FLOAT': ('FLOAT', 'float32'),
        'CHAR': ('CHAR', None),
        'TIMESTAMP': ('TIMESTAMP', None)
    }
    async with session.get(url=url, params=params) as resp:
        result = await resp.json()
        if resp.status != 200:
            raise QueryError.from_json(result)
        columns = [
            # Fall back to Pandas dtype inference for any type not listed
            # in `dtypes_map` (e.g. INT, LONG, BOOLEAN).
            (col['name'],
             dtypes_map.get(col['type'].upper(), (col['type'].upper(), None)))
            for col in result['columns']]
        count = result['count']
        return columns, count


async def _query_pandas(
        session: aiohttp.ClientSession,
        executor: ThreadPoolExecutor,
        endpoint: Endpoint,
        query: str,
        result_schema: list[tuple[str, tuple[str, object]]],
        limit_range: tuple[int, int]) -> tuple[pd.DataFrame, int]:
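    # Export one 'limit=lo,hi' slice of the result set as CSV via '/exp'.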
    url = f'{endpoint.url}/exp'
    params = [
        ('query', query),
        ('limit', f'{limit_range[0]},{limit_range[1]}')]
    async with session.get(url=url, params=params) as resp:
        if resp.status != 200:
            raise QueryError.from_json(await resp.json())
        buf = await resp.content.read()
        download_bytes = len(buf)
        buf_reader = BytesIO(buf)
    dtypes = {
        col[0]: col[1][1]
        for col in result_schema
        if col[1][1] is not None}

    def _read_csv():
        df = pd.read_csv(buf_reader, dtype=dtypes, engine='pyarrow')
        # Patch up the column types that can't be requested via `dtype=`.
        for col_name, (col_type, _dtype) in result_schema:
            if col_type != 'TIMESTAMP':
                continue
            series = df[col_name]
            try:
                df[col_name] = pd.to_datetime(series)
            except Exception as e:
                raise ValueError(
                    f'Failed to convert column {col_name} to type '
                    f'{col_type}: {e}\n{series}') from e
        return df

    loop = asyncio.get_running_loop()
    df = await loop.run_in_executor(executor, _read_csv)
    return df, download_bytes


async def pandas_query(
        query: str, endpoint: Endpoint | None = None, chunks: int = 1,
        *, stats: bool = False) -> pd.DataFrame | tuple[pd.DataFrame, int]:
    """
    Query QuestDB via CSV to a Pandas DataFrame.
    Downloads `chunks` parallel `limit` ranges; if `stats` is True, also
    returns the total number of bytes downloaded.
    """
    endpoint = endpoint or Endpoint()
    with ThreadPoolExecutor(max_workers=chunks) as executor:
        async with _new_session(endpoint) as session:
            result_schema, row_count = await _pre_query(session, endpoint, query)
            rows_per_spawn = row_count // chunks
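            # Split the result set into `chunks` contiguous row ranges; the
            # last range absorbs any remainder from the integer division.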
            limit_ranges = [
                (
                    i * rows_per_spawn,
                    ((i + 1) * rows_per_spawn) if i < chunks - 1 else row_count
                )
                for i in range(chunks)]
            tasks = [
                asyncio.ensure_future(_query_pandas(
                    session, executor, endpoint, query, result_schema, limit_range))
                for limit_range in limit_ranges]
            results = await asyncio.gather(*tasks)
            sub_dataframes = [result[0] for result in results]
            # `ignore_index=True` re-numbers rows so chunk boundaries don't
            # produce duplicate index labels.
            df = pd.concat(sub_dataframes, ignore_index=True)
            if stats:
                total_downloaded = sum(result[1] for result in results)
                return df, total_downloaded
            else:
                return df


async def numpy_query(
        query: str, endpoint: Endpoint | None = None, chunks: int = 1,
        *, stats: bool = False
) -> dict[str, np.ndarray] | tuple[dict[str, np.ndarray], int]:
    """
    Query and obtain the result as a dict of columns.
    Each column is a numpy array.
    """
    res = await pandas_query(query, endpoint, chunks, stats=stats)
    df, stats_res = res if stats else (res, None)
    # Calling `.to_numpy()` for each column is quite efficient and generally
    # avoids copies: Pandas already stores columns as numpy arrays.
    # We go through Pandas as this allows us to get fast CSV parsing.
    np_arrays = {col_name: df[col_name].to_numpy() for col_name in df}
    return (np_arrays, stats_res) if stats else np_arrays
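

# A minimal usage sketch, kept as a comment (assumptions: this module is part
# of a package importable as `questdb_query`, a QuestDB instance is reachable
# with the default `Endpoint()` settings, and the 'trades' table name is
# purely illustrative):
#
#     import asyncio
#     from questdb_query import pandas_query
#
#     async def main():
#         df = await pandas_query('SELECT * FROM trades', chunks=4)
#         print(df.head())
#
#     asyncio.run(main())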