Commit 9e4adf7

README.md, features and fixes.
1 parent 4411f02 commit 9e4adf7

File tree

9 files changed: +321 −34 lines changed


.gitignore

Lines changed: 3 additions & 0 deletions

```diff
@@ -158,3 +158,6 @@ cython_debug/
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 .idea/
+
+# poetry
+.python-version
```

README.md

Lines changed: 198 additions & 0 deletions

# py-questdb-query

Fast query over HTTP(S)/CSV for QuestDB

## Installation

To install (or upgrade):

```shell
python3 -m pip install -U git+https://github.com/questdb/py-questdb-query.git#questdb_query
```

If you need to uninstall it, run:

```shell
python3 -m pip uninstall questdb_query
```

## Basic usage: querying into Numpy

To query a database running on localhost, use the `numpy_query` function.

Here's an example querying CPU utilisation data from a `localhost` database:

```python
from questdb_query import numpy_query

np_arrs = numpy_query('''
    select
        timestamp, hostname, datacenter, usage_user, usage_nice
    from
        cpu
    limit 10''')
```

The `np_arrs` object is a Python `dict` which holds one numpy array per column, keyed by column name:

```python
>>> np_arrs
{'timestamp': array(['2016-01-01T00:00:00.000000000', '2016-01-01T00:00:10.000000000',
       '2016-01-01T00:00:20.000000000', '2016-01-01T00:00:30.000000000',
       '2016-01-01T00:00:40.000000000', '2016-01-01T00:00:50.000000000',
       '2016-01-01T00:01:00.000000000', '2016-01-01T00:01:10.000000000',
       '2016-01-01T00:01:20.000000000', '2016-01-01T00:01:30.000000000'],
      dtype='datetime64[ns]'), 'hostname': array(['host_0', 'host_1', 'host_2', 'host_3', 'host_4', 'host_5',
       'host_6', 'host_7', 'host_8', 'host_9'], dtype=object), 'datacenter': array(['ap-southeast-2b', 'eu-west-1b', 'us-west-1b', 'us-west-2c',
       'us-west-2b', 'eu-west-1b', 'eu-west-1b', 'us-west-1a',
       'ap-southeast-2a', 'us-east-1a'], dtype=object), 'usage_user': array([1.39169048, 0.33846369, 0.        , 1.81511203, 0.84273104,
       0.        , 0.        , 0.28085548, 0.        , 1.37192634]), 'usage_nice': array([0.30603088, 1.21496673, 0.        , 0.16688796, 0.        ,
       2.77319521, 0.40332488, 1.81585253, 1.92844804, 2.12841919])}
```

If we want to calculate a (rather nonsensical) weighted average of `usage_user` and `usage_nice`, we can do so by accessing the `numpy` columns directly:

```python
>>> np_arrs['usage_user'].dot(np_arrs['usage_nice'].T)
4.5700692045031985
```

## Querying a remote database

If your database is running on a remote host, specify an endpoint:

```python
from questdb_query import numpy_query, Endpoint

endpoint = Endpoint(host='your.hostname.com', https=True, username='user', password='pass')

np_arrs = numpy_query('select * from cpu limit 10', endpoint)
```

Note how the example above enables HTTPS and specifies a username and password for authentication.

## Querying into Pandas

You can also query into Pandas:

```python
from questdb_query import pandas_query, Endpoint

endpoint = Endpoint(host='your.hostname.com', https=True, username='user', password='pass')

df = pandas_query('select * from cpu limit 1000', endpoint)
```

This allows you, for example, to pre-aggregate results:

```python
>>> df = df[['region', 'usage_user', 'usage_nice']].groupby('region').mean()
>>> df
                usage_user  usage_nice
region
ap-northeast-1    8.163766    6.492334
ap-southeast-1    6.511215    7.341863
ap-southeast-2    6.788770    6.257839
eu-central-1      7.392642    6.416479
eu-west-1         7.213417    7.185956
sa-east-1         7.143568    5.925026
us-east-1         7.620643    7.243553
us-west-1         6.286770    6.531977
us-west-2         6.228692    6.439672
```

You can then switch over to numpy with a simple and fast conversion:

```python
>>> from questdb_query import pandas_to_numpy
>>> np_arrs = pandas_to_numpy(df)
>>> np_arrs
{'usage_user': array([8.16376556, 6.51121543, 6.78876964, 7.3926419 , 7.21341716,
       7.14356839, 7.62064304, 6.28677006, 6.22869169]), 'usage_nice': array([6.49233392, 7.34186348, 6.25783903, 6.41647863, 7.18595643,
       5.92502642, 7.24355328, 6.53197733, 6.43967247]), 'region': array(['ap-northeast-1', 'ap-southeast-1', 'ap-southeast-2',
       'eu-central-1', 'eu-west-1', 'sa-east-1', 'us-east-1', 'us-west-1',
       'us-west-2'], dtype=object)}
```

## Benchmarking

### From code

Each query result also carries a `Stats` object with a performance summary which you can print:

```python
>>> from questdb_query import numpy_query
>>> np_arrs = numpy_query('select * from cpu', chunks=8)
>>> print(np_arrs.query_stats)
Duration: 2.631s
Millions of lines: 5.000
Millions of lines/s: 1.901
MiB: 1332.144
MiB/s: 506.381
```

You can also extract individual fields:

```python
>>> np_arrs.query_stats
Stats(duration_s=2.630711865, line_count=5000000, byte_count=1396853875, throughput_mbs=506.3814407360216, throughput_mlps=1.900626239810569)
>>> np_arrs.query_stats.throughput_mlps
1.900626239810569
```

### From the command line

To get the best performance, it may be useful to try queries with different hardware setups, chunk counts, etc.

You can run the benchmarking tool from the command line:

```bash
$ python3 -m questdb_query.tool --chunks 8 "select * from cpu"
```

```
       hostname          region       datacenter rack              os arch team service service_version service_environment usage_user usage_system usage_idle usage_nice usage_iowait usage_irq usage_softirq usage_steal usage_guest usage_guest_nice           timestamp
0        host_0  ap-southeast-2  ap-southeast-2b   96     Ubuntu16.10  x86  CHI      11               0                test   1.391690     0.000000   2.644812   0.306031     1.194629  0.000000      0.000000    0.726996    0.000000         0.000000 2016-01-01 00:00:00
1        host_1       eu-west-1       eu-west-1b   52  Ubuntu16.04LTS  x64  NYC       7               0          production   0.338464     1.951409   2.455378   1.214967     2.037935  0.000000      1.136997    1.022753    1.711183         0.000000 2016-01-01 00:00:10
2        host_2       us-west-1       us-west-1b   69  Ubuntu16.04LTS  x64  LON       8               1          production   0.000000     2.800873   2.296324   0.000000     1.754139  1.531160      0.662572    0.000000    0.472402         0.312164 2016-01-01 00:00:20
3        host_3       us-west-2       us-west-2c    8  Ubuntu16.04LTS  x86  LON      11               0                test   1.815112     4.412385   2.056344   0.166888     3.507148  3.276577      0.000000    0.000000    0.000000         1.496152 2016-01-01 00:00:30
4        host_4       us-west-2       us-west-2b   83  Ubuntu16.04LTS  x64  NYC       6               0                test   0.842731     3.141248   2.199520   0.000000     2.943054  5.032342      0.391105    1.375450    0.000000         1.236811 2016-01-01 00:00:40
...         ...             ...              ...  ...             ...  ...  ...     ...             ...                 ...        ...          ...        ...        ...          ...       ...           ...         ...         ...              ...                 ...
624995 host_3995  ap-southeast-2  ap-southeast-2a   30  Ubuntu16.04LTS  x86  CHI      19               1             staging  33.238309    82.647341  17.272531  52.707720    71.718564 45.605728    100.000000   22.907723   78.130846        15.652954 2017-08-01 16:52:30
624996 host_3996       us-west-2       us-west-2a   67     Ubuntu15.10  x64  CHI       9               0          production  33.344070    81.922739  16.653731  52.107537    71.844945 45.880606     99.835977   23.045458   76.468930        17.091646 2017-08-01 16:52:40
624997 host_3997       us-west-2       us-west-2b   63     Ubuntu15.10  x86   SF       8               0          production  32.932095    80.662915  14.708377  53.354277    72.265215 44.803275     99.013038   20.375169   78.043473        17.870002 2017-08-01 16:52:50
624998 host_3998       eu-west-1       eu-west-1b   53  Ubuntu16.04LTS  x86  CHI      11               1             staging  31.199818    80.994859  15.051577  51.923123    74.169828 46.453950     99.107213   21.004499   78.341154        18.880808 2017-08-01 16:53:00
624999 host_3999       us-east-1       us-east-1c   87     Ubuntu16.10  x64   SF       8               1          production  30.310735    81.727637  15.413537  51.417897    74.973555 44.882255     98.821672   19.055040   78.094993        19.263652 2017-08-01 16:53:10

[5000000 rows x 21 columns]

Duration: 2.547s
Millions of lines: 5.000
Millions of lines/s: 1.963
MiB: 1332.144
MiB/s: 522.962
```

## Async operation

The `numpy_query` and `pandas_query` functions are wrappers around `async` variants.

If your application already uses `async`, call those variants directly: this lets other parts of your application do work in parallel while the data downloads.

The async functions take the same arguments as their synchronous counterparts.

```python
import asyncio

from questdb_query import Endpoint
from questdb_query.asynchronous import numpy_query


async def main():
    endpoint = Endpoint(host='your.hostname.com', https=True, username='user', password='pass')
    np_arrs = await numpy_query('select * from cpu limit 10', endpoint)
    print(np_arrs)


if __name__ == '__main__':
    asyncio.run(main())
```

questdb_query/__init__.py

Lines changed: 4 additions & 0 deletions

```diff
@@ -5,5 +5,9 @@

 """

+__version__ = '0.1.0'
+
 from .endpoint import Endpoint
 from .errors import QueryError
+from .synchronous import pandas_query, numpy_query
+from .pandas_util import pandas_to_numpy
```

questdb_query/asynchronous.py

Lines changed: 16 additions & 15 deletions

```diff
@@ -5,6 +5,7 @@
 __all__ = ['pandas_query', 'numpy_query']

 import asyncio
+import time
 from concurrent.futures import ThreadPoolExecutor
 from io import BytesIO

@@ -14,6 +15,8 @@

 from .endpoint import Endpoint
 from .errors import QueryError
+from .pandas_util import pandas_to_numpy
+from .stats import Stats


 def _new_session(endpoint):
@@ -79,7 +82,10 @@ def _read_csv():
     try:
         if col_type == 'TIMESTAMP':
             series = df[col_name]
-            series = pd.to_datetime(series)
+            # Drop the UTC timezone during conversion.
+            # This allows `.to_numpy()` on the series to
+            # yield a `datetime64` dtype column.
+            series = pd.to_datetime(series).dt.tz_convert(None)
             df[col_name] = series
     except Exception as e:
         raise ValueError(
@@ -91,11 +97,12 @@ def _read_csv():
     return df, download_bytes


-async def pandas_query(query: str, endpoint: Endpoint = None, chunks: int = 1, *, stats: bool = False) -> pd.DataFrame:
+async def pandas_query(query: str, endpoint: Endpoint = None, chunks: int = 1) -> pd.DataFrame:
     """
     Query QuestDB via CSV to a Pandas DataFrame.
     """
     endpoint = endpoint or Endpoint()
+    start_ts = time.perf_counter_ns()
     with ThreadPoolExecutor(max_workers=chunks) as executor:
         async with _new_session(endpoint) as session:
             result_schema, row_count = await _pre_query(session, endpoint, query)
@@ -113,22 +120,16 @@ async def pandas_query(query: str, endpoint: Endpoint = None, chunks: int = 1, *
             results = await asyncio.gather(*tasks)
             sub_dataframes = [result[0] for result in results]
             df = pd.concat(sub_dataframes)
-            if stats:
-                total_downloaded = sum(result[1] for result in results)
-                return df, total_downloaded
-            else:
-                return df
+    end_ts = time.perf_counter_ns()
+    total_downloaded = sum(result[1] for result in results)
+    df.query_stats = Stats(end_ts - start_ts, row_count, total_downloaded)
+    return df


-async def numpy_query(query: str, endpoint: Endpoint = None, chunks: int = 1, *, stats: bool = False) -> dict[str, np.array]:
+async def numpy_query(query: str, endpoint: Endpoint = None, chunks: int = 1) -> dict[str, np.array]:
     """
     Query and obtain the result as a dict of columns.
     Each column is a numpy array.
     """
-    res = await pandas_query(query, endpoint, chunks, stats=stats)
-    df, stats_res = res if stats else (res, None)
-    # Calling `.to_numpy()` for each column is quite efficient and generally avoids copies.
-    # Pandas already stores columns as numpy.
-    # We go through Pandas as this allows us to get fast CSV parsing.
-    np_arrays = {col_name: df[col_name].to_numpy() for col_name in df}
-    return (np_arrays, stats_res) if stats else np_arrays
+    df = await pandas_query(query, endpoint, chunks)
+    return pandas_to_numpy(df)
```
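The timezone change in the `TIMESTAMP` branch above is the key fix: the CSV export yields tz-aware UTC timestamps, and dropping the timezone lets `.to_numpy()` produce a native `datetime64[ns]` array rather than an object array. A standalone sketch of the difference, using made-up timestamp strings:

```python
import pandas as pd

# ISO-8601 strings with a trailing 'Z', as a CSV parser might produce them.
series = pd.Series(['2016-01-01T00:00:00.000000Z', '2016-01-01T00:00:10.000000Z'])

# Parsing offset-qualified strings yields a tz-aware dtype.
aware = pd.to_datetime(series)
print(aware.dtype)  # datetime64[ns, UTC]

# Dropping the UTC timezone gives a plain datetime64[ns] dtype, so
# `.to_numpy()` returns a numpy datetime64 array instead of objects.
naive = aware.dt.tz_convert(None)
print(naive.dtype)             # datetime64[ns]
print(naive.to_numpy().dtype)  # datetime64[ns]
```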

questdb_query/endpoint.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -2,7 +2,7 @@ class Endpoint:
     """
     HTTP connection parameters into QuestDB
     """
-    def __init__(self, host='127.0.0.1', port=None, https=True, username=None, password=None):
+    def __init__(self, host='127.0.0.1', port=None, https=False, username=None, password=None):
         self.host = host
         self.port = port or (443 if https else 9000)
         self.https = https
```
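With the changed default, a bare `Endpoint()` now points at plain HTTP on QuestDB's default port 9000. A standalone sketch of the port-defaulting logic (mirroring the constructor above, not importing the package):

```python
class Endpoint:
    """Minimal sketch of the connection parameters (mirrors the class above)."""
    def __init__(self, host='127.0.0.1', port=None, https=False,
                 username=None, password=None):
        self.host = host
        # An explicit port wins; otherwise default to 443 for HTTPS, 9000 for HTTP.
        self.port = port or (443 if https else 9000)
        self.https = https
        self.username = username
        self.password = password

print(Endpoint().port)            # → 9000 (plain HTTP, the new default)
print(Endpoint(https=True).port)  # → 443
print(Endpoint(port=9009).port)   # → 9009 (an explicit port always wins)
```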

questdb_query/pandas_util.py

Lines changed: 27 additions & 0 deletions

```python
__all__ = ['pandas_to_numpy']

import numpy as np
import pandas as pd

from .stats import StatsDict


def pandas_to_numpy(df: pd.DataFrame) -> dict[str, np.array]:
    """
    Convert a pandas dataframe into a dict containing numpy arrays, keyed by column name.

    If the index is named, then convert that too.
    """
    # Calling `.to_numpy()` for each column is quite efficient and generally avoids copies.
    # This is because Pandas internally already usually stores columns as numpy.
    np_arrs = {col_name: df[col_name].to_numpy() for col_name in df}

    # If the index is named, then convert that too.
    if df.index.name:
        np_arrs[df.index.name] = df.index.to_numpy()

    # Carry across stats, if these are present.
    if hasattr(df, 'query_stats'):
        np_arrs = StatsDict(np_arrs, df.query_stats)

    return np_arrs
```
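The named-index branch matters after a `groupby`, where the grouping key lives on the DataFrame's index rather than in a column. A hypothetical usage sketch, reimplementing just the core conversion loop (without the `StatsDict` wrapping) so it runs standalone:

```python
import numpy as np
import pandas as pd


def pandas_to_numpy_sketch(df: pd.DataFrame) -> dict[str, np.ndarray]:
    # Per-column `.to_numpy()` generally avoids copies: pandas already
    # stores each column as a numpy array internally.
    np_arrs = {col_name: df[col_name].to_numpy() for col_name in df}
    # A named index (e.g. the key of a groupby) becomes an entry too.
    if df.index.name:
        np_arrs[df.index.name] = df.index.to_numpy()
    return np_arrs


df = pd.DataFrame({'region': ['a', 'a', 'b'], 'usage_user': [1.0, 3.0, 5.0]})
means = df.groupby('region').mean()   # the index is now named 'region'
np_arrs = pandas_to_numpy_sketch(means)
print(sorted(np_arrs))  # ['region', 'usage_user']
```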

questdb_query/stats.py

Lines changed: 60 additions & 0 deletions

```python
__all__ = ['Stats', 'StatsDict']

NS_IN_S = 1e9

STATS_TEMPLATE = '''Duration: {duration_s:.3f}s
Millions of lines: {line_count_millions:.3f}
Millions of lines/s: {throughput_mlps:.3f}
MiB: {byte_count_mib:.3f}
MiB/s: {throughput_mbs:.3f}'''


class Stats:
    def __init__(self, duration_ns: int, line_count: int, byte_count: int):
        self.duration_ns = duration_ns
        self.line_count = line_count
        self.byte_count = byte_count

    @property
    def duration_s(self) -> float:
        """
        How long the query took in seconds.
        """
        return self.duration_ns / NS_IN_S

    @property
    def throughput_mbs(self) -> float:
        """
        How many MiB/s were downloaded and parsed.
        """
        return self.byte_count / self.duration_ns * NS_IN_S / 1024 / 1024

    @property
    def throughput_mlps(self) -> float:
        """
        How many millions of lines per second were parsed.
        """
        return self.line_count / self.duration_ns * NS_IN_S / 1e6

    def __repr__(self) -> str:
        return (f'Stats(duration_s={self.duration_s}, '
                f'line_count={self.line_count}, '
                f'byte_count={self.byte_count}, '
                f'throughput_mbs={self.throughput_mbs}, '
                f'throughput_mlps={self.throughput_mlps})')

    def __str__(self):
        return STATS_TEMPLATE.format(
            duration_s=self.duration_s,
            line_count_millions=self.line_count / 1e6,
            throughput_mbs=self.throughput_mbs,
            byte_count_mib=self.byte_count / 1024 / 1024,
            throughput_mlps=self.throughput_mlps)


class StatsDict(dict):
    """A dict with an additional `.query_stats` attribute."""

    def __init__(self, other: dict, query_stats: Stats):
        super().__init__(other)
        self.query_stats = query_stats
```
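To sanity-check the arithmetic: each property divides a raw counter by the nanosecond duration and rescales. A quick worked example with round numbers (a hypothetical 2-second query):

```python
NS_IN_S = 1e9

duration_ns = 2 * 10**9          # 2 seconds
line_count = 5_000_000           # 5 million lines
byte_count = 1024 * 1024 * 1024  # exactly 1 GiB

# Mirrors Stats.throughput_mlps: millions of lines per second.
throughput_mlps = line_count / duration_ns * NS_IN_S / 1e6
print(throughput_mlps)  # 2.5

# Mirrors Stats.throughput_mbs: MiB per second.
throughput_mbs = byte_count / duration_ns * NS_IN_S / 1024 / 1024
print(throughput_mbs)  # 512.0
```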
