From 6bf3a1422df50612fd4f2e38696cb6b0326fbf0a Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Tue, 25 Nov 2025 14:55:54 +0000 Subject: [PATCH 01/15] some more arrow types supported --- src/questdb/dataframe.pxi | 45 ++++++++++++++++++++++++++------ test/test_dataframe.py | 54 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 8 deletions(-) diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index 1fa6b466..33b87d3e 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -781,7 +781,7 @@ cdef int64_t _AT_IS_SERVER_NOW = -2 cdef int64_t _AT_IS_SET_BY_COLUMN = -1 -cdef str _SUPPORTED_DATETIMES = 'datetime64[ns] or datetime64[ns, tz]' +cdef str _SUPPORTED_DATETIMES = 'datetime64[ns], datetime64[ns, tz] or timestamp[ns][pyarrow]' cdef object _dataframe_is_supported_datetime(object dtype): @@ -790,6 +790,11 @@ cdef object _dataframe_is_supported_datetime(object dtype): return True if isinstance(dtype, _PANDAS.DatetimeTZDtype): return dtype.unit == 'ns' + elif isinstance(dtype, _PANDAS.ArrowDtype): + arrow_type = dtype.pyarrow_dtype + return ( + (arrow_type.id == _PYARROW.lib.Type_TIMESTAMP) and + (arrow_type.unit == 'ns')) return False @@ -954,28 +959,52 @@ cdef void_int _dataframe_category_series_as_arrow( f'got a category of {pandas_col.series.dtype.categories.dtype}.') cdef void_int _dataframe_series_resolve_arrow(PandasCol pandas_col, object arrowtype, col_t *col) except -1: + cdef bint is_decimal_col = False _dataframe_series_as_arrow(pandas_col, col) if arrowtype.id == _PYARROW.lib.Type_DECIMAL32: col.setup.source = col_source_t.col_source_decimal32_arrow + is_decimal_col = True elif arrowtype.id == _PYARROW.lib.Type_DECIMAL64: col.setup.source = col_source_t.col_source_decimal64_arrow + is_decimal_col = True elif arrowtype.id == _PYARROW.lib.Type_DECIMAL128: col.setup.source = col_source_t.col_source_decimal128_arrow + is_decimal_col = True elif arrowtype.id == _PYARROW.lib.Type_DECIMAL256: col.setup.source = col_source_t.col_source_decimal256_arrow + is_decimal_col = True + elif arrowtype.id == _PYARROW.lib.Type_TIMESTAMP: + if arrowtype.unit != "ns": + raise IngressError( + IngressErrorCode.BadDataFrame, + f"Unsupported timestamp unit {arrowtype.unit!r} " + f"for column {pandas_col.name!r}: only 'ns' is supported." + ) + # N.B.: Even if there's a timezone set, + # the data is recorded as UTC, so we're good. + col.setup.source = col_source_t.col_source_dt64ns_tz_arrow + elif arrowtype.id == _PYARROW.lib.Type_LARGE_STRING: + col.setup.source = col_source_t.col_source_str_lrg_utf8_arrow + elif arrowtype.id == _PYARROW.lib.Type_DOUBLE: + col.setup.source = col_source_t.col_source_f64_arrow + elif arrowtype.id == _PYARROW.lib.Type_INT64: + col.setup.source = col_source_t.col_source_i64_arrow else: raise IngressError( IngressErrorCode.BadDataFrame, f'Unsupported arrow type {arrowtype} for column {pandas_col.name!r}. ' + 'Raise an issue if you think it should be supported: ' + 'https://github.com/questdb/py-questdb-client/issues.') - if arrowtype.scale < 0 or arrowtype.scale > 76: - raise IngressError( - IngressErrorCode.BadDataFrame, - f'Bad column {pandas_col.name!r}: ' + - f'Unsupported decimal scale {arrowtype.scale}: ' + - 'Must be in the range 0 to 76 inclusive.') - col.scale = arrowtype.scale + if is_decimal_col: + if arrowtype.scale < 0 or arrowtype.scale > 76: + raise IngressError( + IngressErrorCode.BadDataFrame, + f'Bad column {pandas_col.name!r}: ' + + f'Unsupported decimal scale {arrowtype.scale}: ' + + 'Must be in the range 0 to 76 inclusive.') + col.scale = arrowtype.scale + else: + col.scale = 0 return 0 cdef inline bint _dataframe_is_float_nan(PyObject* obj) noexcept: diff --git a/test/test_dataframe.py b/test/test_dataframe.py index 4576338a..e3c66471 100644 --- a/test/test_dataframe.py +++ b/test/test_dataframe.py @@ -1849,6 +1849,60 @@ def test_f64_np_array(self): b'tbl1 a=' + _array_binary_bytes(np.array([1.0], np.float64)) + b'\n' + b'tbl1 a=' + _array_binary_bytes(np.array([2.0], np.float64)) + b'\n' + b'tbl1 a=' + _array_binary_bytes(np.array([3.0], np.float64)) + b'\n') + + def test_arrow_types(self): + df = pd.DataFrame({ + # Timestamp (no nulls) + "ts": pd.Series( + pa.array( + pd.date_range("2024-01-01", periods=5, freq="s"), + type=pa.timestamp("ns") + ), + dtype="timestamp[ns][pyarrow]" + ), + + # large_string[pyarrow] — one null (row 1) + "sensor_large": pd.Series( + pa.LargeStringArray.from_pandas( + ["alpha", None, "gamma", "delta", "epsilon"] + ), + dtype="large_string[pyarrow]" + ), + + # string[pyarrow] — one null (row 2) + "sensor_small": pd.Series( + pa.array(["foo", "bar", None, "baz", "qux"], type=pa.string()), + dtype="string[pyarrow]" + ), + + # float64[pyarrow] — one null (row 3) + "value_f64": pd.Series( + pa.array([1.1, 2.2, 3.3, None, 5.5], type=pa.float64()), + dtype="float64[pyarrow]" + ), + + # int64[pyarrow] — one null (row 4) + "value_i64": pd.Series( + pa.array([10, 20, 30, 40, None], type=pa.int64()), + dtype="int64[pyarrow]" + ), + }) + + # print(df) + # print(df.dtypes) + + # designated timestamp suffix and line ending + tss = b'n\n' if self.version >= 2 else b'\n' + + exp = ( + b'tbl1 sensor_large="alpha",sensor_small="foo",value_f64' + _float_binary_bytes(1.1, self.version == 1) + b',value_i64=10i 1704067200000000000' + tss + + b'tbl1 sensor_small="bar",value_f64' + _float_binary_bytes(2.2, self.version == 1) + b',value_i64=20i 1704067201000000000' + tss + + b'tbl1 sensor_large="gamma",value_f64' + _float_binary_bytes(3.3, self.version == 1) + b',value_i64=30i 1704067202000000000' + tss + + b'tbl1 sensor_large="delta",sensor_small="baz",value_i64=40i 1704067203000000000' + tss + + b'tbl1 sensor_large="epsilon",sensor_small="qux",value_f64' + _float_binary_bytes(5.5, self.version == 1) + b' 1704067204000000000' + tss) + act = _dataframe(self.version, df, table_name='tbl1', at='ts') + print(f'{act!r}') + self.assertEqual(act, exp) class TestPandasProtocolVersionV1(TestPandasBase.TestPandas): name = 'protocol version 1' From 9a5585886e89c21086502ca546d194461c1de198 Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Tue, 25 Nov 2025 15:12:02 +0000 Subject: [PATCH 02/15] f32 arrow --- test/test_dataframe.py | 68 +++++++++++++++++++++++++++++++++++------- 1 file changed, 57 insertions(+), 11 deletions(-) diff --git a/test/test_dataframe.py b/test/test_dataframe.py index e3c66471..bfbc9c40 100644 --- a/test/test_dataframe.py +++ b/test/test_dataframe.py @@ -1852,7 +1852,6 @@ def test_f64_np_array(self): def test_arrow_types(self): df = pd.DataFrame({ - # Timestamp (no nulls) "ts": pd.Series( pa.array( pd.date_range("2024-01-01", periods=5, freq="s"), @@ -1861,7 +1860,14 @@ def test_arrow_types(self): dtype="timestamp[ns][pyarrow]" ), - # large_string[pyarrow] — one null (row 1) + "ts2": pd.Series( + pa.array( + pd.date_range("2024-01-01", periods=5, freq="s"), + type=pa.timestamp("ns") + ), + dtype="timestamp[ns][pyarrow]" + ), + "sensor_large": pd.Series( pa.LargeStringArray.from_pandas( ["alpha", None, "gamma", "delta", "epsilon"] @@ -1869,19 +1875,21 @@ def test_arrow_types(self): dtype="large_string[pyarrow]" ), - # string[pyarrow] — one null (row 2) "sensor_small": pd.Series( pa.array(["foo", "bar", None, "baz", "qux"], type=pa.string()), dtype="string[pyarrow]" ), - # float64[pyarrow] — one null (row 3) + "value_f32": pd.Series( + pa.array([None, 20.0, 30.25, 40.5, 50.75], type=pa.float32()), + dtype="float64[pyarrow]" + ), + "value_f64": pd.Series( pa.array([1.1, 2.2, 3.3, None, 5.5], type=pa.float64()), dtype="float64[pyarrow]" ), - # int64[pyarrow] — one null (row 4) "value_i64": pd.Series( pa.array([10, 20, 30, 40, None], type=pa.int64()), dtype="int64[pyarrow]" @@ -1891,15 +1899,53 @@ def test_arrow_types(self): # print(df) # print(df.dtypes) + # format a timestamp + def fts(value): + if self.version >= 2: + return f'{value}n'.encode() + else: + value = value // 1000 + return f'{value}t'.encode() + # designated timestamp suffix and line ending - tss = b'n\n' if self.version >= 2 else b'\n' + tsls = b'n\n' if self.version >= 2 else b'\n' exp = ( - b'tbl1 sensor_large="alpha",sensor_small="foo",value_f64' + _float_binary_bytes(1.1, self.version == 1) + b',value_i64=10i 1704067200000000000' + tss + - b'tbl1 sensor_small="bar",value_f64' + _float_binary_bytes(2.2, self.version == 1) + b',value_i64=20i 1704067201000000000' + tss + - b'tbl1 sensor_large="gamma",value_f64' + _float_binary_bytes(3.3, self.version == 1) + b',value_i64=30i 1704067202000000000' + tss + - b'tbl1 sensor_large="delta",sensor_small="baz",value_i64=40i 1704067203000000000' + tss + - b'tbl1 sensor_large="epsilon",sensor_small="qux",value_f64' + _float_binary_bytes(5.5, self.version == 1) + b' 1704067204000000000' + tss) + b'tbl1 ts2=' + fts(1704067200000000000) + + b',sensor_large="alpha",sensor_small="foo",value_f64' + + _float_binary_bytes(1.1, self.version == 1) + + b',value_i64=10i 1704067200000000000' + + tsls + + + b'tbl1 ts2=' + fts(1704067201000000000) + + b',sensor_small="bar",value_f32' + + _float_binary_bytes(20.0, self.version == 1) + + b',value_f64' + + _float_binary_bytes(2.2, self.version == 1) + + b',value_i64=20i 1704067201000000000' + + tsls + + + b'tbl1 ts2=' + fts(1704067202000000000) + + b',sensor_large="gamma",value_f32' + + _float_binary_bytes(30.25, self.version == 1) + + b',value_f64' + + _float_binary_bytes(3.3, self.version == 1) + + b',value_i64=30i 1704067202000000000' + + tsls + + + b'tbl1 ts2=' + fts(1704067203000000000) + + b',sensor_large="delta",sensor_small="baz",value_f32' + + _float_binary_bytes(40.5, self.version == 1) + + b',value_i64=40i 1704067203000000000' + + tsls + + + b'tbl1 ts2=' + fts(1704067204000000000) + + b',sensor_large="epsilon",sensor_small="qux",value_f32' + + _float_binary_bytes(50.75, self.version == 1) + + b',value_f64' + + _float_binary_bytes(5.5, self.version == 1) + + b' 1704067204000000000' + + tsls) act = _dataframe(self.version, df, table_name='tbl1', at='ts') print(f'{act!r}') self.assertEqual(act, exp) From e1a77bd123250d565e2a9b8879432a7c384c2c00 Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Tue, 25 Nov 2025 15:17:53 +0000 Subject: [PATCH 03/15] arrow bool --- src/questdb/dataframe.pxi | 2 ++ test/test_dataframe.py | 17 +++++++++++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index 33b87d3e..e97d8e13 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -973,6 +973,8 @@ cdef void_int _dataframe_series_resolve_arrow(PandasCol pandas_col, object arrow elif arrowtype.id == _PYARROW.lib.Type_DECIMAL256: col.setup.source = col_source_t.col_source_decimal256_arrow is_decimal_col = True + elif arrowtype.id == _PYARROW.lib.Type_BOOL: + col.setup.source = col_source_t.col_source_bool_arrow elif arrowtype.id == _PYARROW.lib.Type_TIMESTAMP: if arrowtype.unit != "ns": raise IngressError( diff --git a/test/test_dataframe.py b/test/test_dataframe.py index bfbc9c40..452f8372 100644 --- a/test/test_dataframe.py +++ b/test/test_dataframe.py @@ -1868,6 +1868,11 @@ def test_arrow_types(self): dtype="timestamp[ns][pyarrow]" ), + "b": pd.Series( + pa.array([True, False, True, True, False], type=pa.bool_()), + dtype="bool[pyarrow]" + ), + "sensor_large": pd.Series( pa.LargeStringArray.from_pandas( ["alpha", None, "gamma", "delta", "epsilon"] @@ -1912,13 +1917,13 @@ def fts(value): exp = ( b'tbl1 ts2=' + fts(1704067200000000000) + - b',sensor_large="alpha",sensor_small="foo",value_f64' + + b',b=t,sensor_large="alpha",sensor_small="foo",value_f64' + _float_binary_bytes(1.1, self.version == 1) + b',value_i64=10i 1704067200000000000' + tsls + b'tbl1 ts2=' + fts(1704067201000000000) + - b',sensor_small="bar",value_f32' + + b',b=f,sensor_small="bar",value_f32' + _float_binary_bytes(20.0, self.version == 1) + b',value_f64' + _float_binary_bytes(2.2, self.version == 1) + @@ -1926,7 +1931,7 @@ def fts(value): tsls + b'tbl1 ts2=' + fts(1704067202000000000) + - b',sensor_large="gamma",value_f32' + + b',b=t,sensor_large="gamma",value_f32' + _float_binary_bytes(30.25, self.version == 1) + b',value_f64' + _float_binary_bytes(3.3, self.version == 1) + @@ -1934,20 +1939,20 @@ def fts(value): tsls + b'tbl1 ts2=' + fts(1704067203000000000) + - b',sensor_large="delta",sensor_small="baz",value_f32' + + b',b=t,sensor_large="delta",sensor_small="baz",value_f32' + _float_binary_bytes(40.5, self.version == 1) + b',value_i64=40i 1704067203000000000' + tsls + b'tbl1 ts2=' + fts(1704067204000000000) + - b',sensor_large="epsilon",sensor_small="qux",value_f32' + + b',b=f,sensor_large="epsilon",sensor_small="qux",value_f32' + _float_binary_bytes(50.75, self.version == 1) + b',value_f64' + _float_binary_bytes(5.5, self.version == 1) + b' 1704067204000000000' + tsls) act = _dataframe(self.version, df, table_name='tbl1', at='ts') - print(f'{act!r}') + # print(f'{act!r}') self.assertEqual(act, exp) class TestPandasProtocolVersionV1(TestPandasBase.TestPandas): From 0cca3abe70c81daf3cd1aadb8586cad044d1aa3c Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Tue, 25 Nov 2025 15:24:42 +0000 Subject: [PATCH 04/15] arrow int16 --- src/questdb/dataframe.pxi | 8 ++++++++ test/test_dataframe.py | 20 +++++++++++++++----- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index e97d8e13..9cc02dc8 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -987,8 +987,16 @@ cdef void_int _dataframe_series_resolve_arrow(PandasCol pandas_col, object arrow col.setup.source = col_source_t.col_source_dt64ns_tz_arrow elif arrowtype.id == _PYARROW.lib.Type_LARGE_STRING: col.setup.source = col_source_t.col_source_str_lrg_utf8_arrow + elif arrowtype.id == _PYARROW.lib.Type_FLOAT: + col.setup.source = col_source_t.col_source_f32_arrow elif arrowtype.id == _PYARROW.lib.Type_DOUBLE: col.setup.source = col_source_t.col_source_f64_arrow + elif arrowtype.id == _PYARROW.lib.Type_INT8: + col.setup.source = col_source_t.col_source_i8_arrow + elif arrowtype.id == _PYARROW.lib.Type_INT16: + col.setup.source = col_source_t.col_source_i16_arrow + elif arrowtype.id == _PYARROW.lib.Type_INT32: + col.setup.source = col_source_t.col_source_i32_arrow elif arrowtype.id == _PYARROW.lib.Type_INT64: col.setup.source = col_source_t.col_source_i64_arrow else: diff --git a/test/test_dataframe.py b/test/test_dataframe.py index 452f8372..1a162cb9 100644 --- a/test/test_dataframe.py +++ b/test/test_dataframe.py @@ -1895,6 +1895,16 @@ def test_arrow_types(self): dtype="float64[pyarrow]" ), + "value_i8": pd.Series( + pa.array([1, None, 3, 4, 5], type=pa.int8()), + dtype="int8[pyarrow]" + ), + + "value_i16": pd.Series( + pa.array([100, 200, 300, 400, None], type=pa.int16()), + dtype="int16[pyarrow]" + ), + "value_i64": pd.Series( pa.array([10, 20, 30, 40, None], type=pa.int64()), dtype="int64[pyarrow]" @@ -1919,7 +1929,7 @@ def fts(value): b'tbl1 ts2=' + fts(1704067200000000000) + b',b=t,sensor_large="alpha",sensor_small="foo",value_f64' + _float_binary_bytes(1.1, self.version == 1) + - b',value_i64=10i 1704067200000000000' + + b',value_i8=1i,value_i16=100i,value_i64=10i 1704067200000000000' + tsls + b'tbl1 ts2=' + fts(1704067201000000000) + @@ -1927,7 +1937,7 @@ def fts(value): _float_binary_bytes(20.0, self.version == 1) + b',value_f64' + _float_binary_bytes(2.2, self.version == 1) + - b',value_i64=20i 1704067201000000000' + + b',value_i16=200i,value_i64=20i 1704067201000000000' + tsls + b'tbl1 ts2=' + fts(1704067202000000000) + @@ -1935,13 +1945,13 @@ def fts(value): _float_binary_bytes(30.25, self.version == 1) + b',value_f64' + _float_binary_bytes(3.3, self.version == 1) + - b',value_i64=30i 1704067202000000000' + + b',value_i8=3i,value_i16=300i,value_i64=30i 1704067202000000000' + tsls + b'tbl1 ts2=' + fts(1704067203000000000) + b',b=t,sensor_large="delta",sensor_small="baz",value_f32' + _float_binary_bytes(40.5, self.version == 1) + - b',value_i64=40i 1704067203000000000' + + b',value_i8=4i,value_i16=400i,value_i64=40i 1704067203000000000' + tsls + b'tbl1 ts2=' + fts(1704067204000000000) + @@ -1949,7 +1959,7 @@ def fts(value): _float_binary_bytes(50.75, self.version == 1) + b',value_f64' + _float_binary_bytes(5.5, self.version == 1) + - b' 1704067204000000000' + + b',value_i8=5i 1704067204000000000' + tsls) act = _dataframe(self.version, df, table_name='tbl1', at='ts') # print(f'{act!r}') From 2e96317cddd113be4c1d3a7e8e1c1dedbf52b49b Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Tue, 25 Nov 2025 15:27:15 +0000 Subject: [PATCH 05/15] arrow int32 --- test/test_dataframe.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/test/test_dataframe.py b/test/test_dataframe.py index 1a162cb9..1589d0f8 100644 --- a/test/test_dataframe.py +++ b/test/test_dataframe.py @@ -1905,6 +1905,11 @@ def test_arrow_types(self): dtype="int16[pyarrow]" ), + "value_i32": pd.Series( + pa.array([1000, 2000, None, 4000, 5000], type=pa.int32()), + dtype="int32[pyarrow]" + ), + "value_i64": pd.Series( pa.array([10, 20, 30, 40, None], type=pa.int64()), dtype="int64[pyarrow]" @@ -1929,7 +1934,7 @@ def fts(value): b'tbl1 ts2=' + fts(1704067200000000000) + b',b=t,sensor_large="alpha",sensor_small="foo",value_f64' + _float_binary_bytes(1.1, self.version == 1) + - b',value_i8=1i,value_i16=100i,value_i64=10i 1704067200000000000' + + b',value_i8=1i,value_i16=100i,value_i32=1000i,value_i64=10i 1704067200000000000' + tsls + b'tbl1 ts2=' + fts(1704067201000000000) + @@ -1937,7 +1942,7 @@ def fts(value): _float_binary_bytes(20.0, self.version == 1) + b',value_f64' + _float_binary_bytes(2.2, self.version == 1) + - b',value_i16=200i,value_i64=20i 1704067201000000000' + + b',value_i16=200i,value_i32=2000i,value_i64=20i 1704067201000000000' + tsls + b'tbl1 ts2=' + fts(1704067202000000000) + @@ -1951,7 +1956,7 @@ def fts(value): b'tbl1 ts2=' + fts(1704067203000000000) + b',b=t,sensor_large="delta",sensor_small="baz",value_f32' + _float_binary_bytes(40.5, self.version == 1) + - b',value_i8=4i,value_i16=400i,value_i64=40i 1704067203000000000' + + b',value_i8=4i,value_i16=400i,value_i32=4000i,value_i64=40i 1704067203000000000' + tsls + b'tbl1 ts2=' + fts(1704067204000000000) + @@ -1959,7 +1964,7 @@ def fts(value): _float_binary_bytes(50.75, self.version == 1) + b',value_f64' + _float_binary_bytes(5.5, self.version == 1) + - b',value_i8=5i 1704067204000000000' + + b',value_i8=5i,value_i32=5000i 1704067204000000000' + tsls) act = _dataframe(self.version, df, table_name='tbl1', at='ts') # print(f'{act!r}') From 8cc9ab2f49678a444b13156c15fb3fd2960708ac Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Tue, 25 Nov 2025 19:07:16 +0000 Subject: [PATCH 06/15] added micros timestamp support and fixed a broken testcase --- src/questdb/dataframe.pxi | 158 ++++++++++++++++++++++++++++++++++---- test/test_dataframe.py | 86 +++++++++++++++++++-- 2 files changed, 222 insertions(+), 22 deletions(-) diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index 9cc02dc8..7f5db626 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -148,12 +148,14 @@ cdef enum col_source_t: col_source_str_lrg_utf8_arrow = 406000 col_source_dt64ns_numpy = 501000 col_source_dt64ns_tz_arrow = 502000 - col_source_arr_f64_numpyobj = 601100 - col_source_decimal_pyobj = 701100 - col_source_decimal32_arrow = 702000 - col_source_decimal64_arrow = 703000 - col_source_decimal128_arrow = 704000 - col_source_decimal256_arrow = 705000 + col_source_dt64us_numpy = 601000 + col_source_dt64us_tz_arrow = 602000 + col_source_arr_f64_numpyobj = 701100 + col_source_decimal_pyobj = 801100 + col_source_decimal32_arrow = 802000 + col_source_decimal64_arrow = 803000 + col_source_decimal128_arrow = 804000 + col_source_decimal256_arrow = 805000 cdef bint col_source_needs_gil(col_source_t source) noexcept nogil: @@ -242,6 +244,8 @@ cdef dict _TARGET_TO_SOURCES = { col_target_t.col_target_column_ts: { col_source_t.col_source_dt64ns_numpy, col_source_t.col_source_dt64ns_tz_arrow, + col_source_t.col_source_dt64us_numpy, + col_source_t.col_source_dt64us_tz_arrow }, col_target_t.col_target_column_arr_f64: { col_source_t.col_source_arr_f64_numpyobj, @@ -256,6 +260,8 @@ cdef dict _TARGET_TO_SOURCES = { col_target_t.col_target_at: { col_source_t.col_source_dt64ns_numpy, col_source_t.col_source_dt64ns_tz_arrow, + col_source_t.col_source_dt64us_numpy, + col_source_t.col_source_dt64us_tz_arrow, }, } @@ -386,11 +392,22 @@ cdef enum col_dispatch_code_t: col_target_t.col_target_column_ts + \ col_source_t.col_source_dt64ns_tz_arrow + col_dispatch_code_column_ts__dt64us_numpy = \ + col_target_t.col_target_column_ts + col_source_t.col_source_dt64us_numpy + col_dispatch_code_column_ts__dt64us_tz_arrow = \ + col_target_t.col_target_column_ts + \ + col_source_t.col_source_dt64us_tz_arrow + col_dispatch_code_at__dt64ns_numpy = \ col_target_t.col_target_at + col_source_t.col_source_dt64ns_numpy col_dispatch_code_at__dt64ns_tz_arrow = \ col_target_t.col_target_at + col_source_t.col_source_dt64ns_tz_arrow + col_dispatch_code_at__dt64us_numpy = \ + col_target_t.col_target_at + col_source_t.col_source_dt64us_numpy + col_dispatch_code_at__dt64us_tz_arrow = \ + col_target_t.col_target_at + col_source_t.col_source_dt64us_tz_arrow + col_dispatch_code_column_arr_f64__arr_f64_numpyobj = \ col_target_t.col_target_column_arr_f64 + col_source_t.col_source_arr_f64_numpyobj @@ -508,6 +525,7 @@ cdef object _NUMPY_INT64 = None cdef object _NUMPY_FLOAT32 = None cdef object _NUMPY_FLOAT64 = None cdef object _NUMPY_DATETIME64_NS = None +cdef object _NUMPY_DATETIME64_US = None cdef object _NUMPY_OBJECT = None cdef object _PANDAS = None # module object cdef object _PANDAS_NA = None # pandas.NA @@ -541,6 +559,7 @@ cdef object _dataframe_may_import_deps(): global _NUMPY_FLOAT32 global _NUMPY_FLOAT64 global _NUMPY_DATETIME64_NS + global _NUMPY_DATETIME64_US global _NUMPY_OBJECT if _NUMPY is not None: return @@ -567,6 +586,7 @@ cdef object _dataframe_may_import_deps(): _NUMPY_FLOAT32 = type(_NUMPY.dtype('float32')) _NUMPY_FLOAT64 = type(_NUMPY.dtype('float64')) _NUMPY_DATETIME64_NS = type(_NUMPY.dtype('datetime64[ns]')) + _NUMPY_DATETIME64_US = type(_NUMPY.dtype('datetime64[us]')) _NUMPY_OBJECT = type(_NUMPY.dtype('object')) _PANDAS = pandas _PANDAS_NA = pandas.NA @@ -788,13 +808,16 @@ cdef object _dataframe_is_supported_datetime(object dtype): if (isinstance(dtype, _NUMPY_DATETIME64_NS) and (str(dtype) == 'datetime64[ns]')): return True - if isinstance(dtype, _PANDAS.DatetimeTZDtype): + elif (isinstance(dtype, _NUMPY_DATETIME64_US) and + (str(dtype) == 'datetime64[us]')): + return True + elif isinstance(dtype, _PANDAS.DatetimeTZDtype): return dtype.unit == 'ns' elif isinstance(dtype, _PANDAS.ArrowDtype): arrow_type = dtype.pyarrow_dtype return ( (arrow_type.id == _PYARROW.lib.Type_TIMESTAMP) and - (arrow_type.unit == 'ns')) + (arrow_type.unit in ('us', 'ns'))) return False @@ -976,15 +999,18 @@ cdef void_int _dataframe_series_resolve_arrow(PandasCol pandas_col, object arrow elif arrowtype.id == _PYARROW.lib.Type_BOOL: col.setup.source = col_source_t.col_source_bool_arrow elif arrowtype.id == _PYARROW.lib.Type_TIMESTAMP: - if arrowtype.unit != "ns": + # N.B.: Even if there's a timezone set, + # the data is recorded as UTC, so we're good. + if arrowtype.unit == 'us': + col.setup.source = col_source_t.col_source_dt64us_tz_arrow + elif arrowtype.unit == 'ns': + col.setup.source = col_source_t.col_source_dt64ns_tz_arrow + else: raise IngressError( IngressErrorCode.BadDataFrame, f"Unsupported timestamp unit {arrowtype.unit!r} " - f"for column {pandas_col.name!r}: only 'ns' is supported." + f"for column {pandas_col.name!r}: only 'us' and 'ns' are supported." ) - # N.B.: Even if there's a timezone set, - # the data is recorded as UTC, so we're good. - col.setup.source = col_source_t.col_source_dt64ns_tz_arrow elif arrowtype.id == _PYARROW.lib.Type_LARGE_STRING: col.setup.source = col_source_t.col_source_str_lrg_utf8_arrow elif arrowtype.id == _PYARROW.lib.Type_FLOAT: @@ -1191,10 +1217,32 @@ cdef void_int _dataframe_resolve_source_and_buffers( _dataframe_category_series_as_arrow(pandas_col, col) elif (isinstance(dtype, _NUMPY_DATETIME64_NS) and _dataframe_is_supported_datetime(dtype)): - col.setup.source = col_source_t.col_source_dt64ns_numpy + unit = dtype.name.split('[')[-1].strip(']') + if unit == 'ns': + col.setup.source = col_source_t.col_source_dt64ns_numpy + elif unit == 'us': + col.setup.source = col_source_t.col_source_dt64us_numpy + else: + raise IngressError( + IngressErrorCode.BadDataFrame, + 'Unsupported unit for datetime64 numpy column type ' + + f'for column {pandas_col.name} of dtype {dtype}. ' + + f' Must be "us" or "ns", not {unit}') + _dataframe_series_as_pybuf(pandas_col, col) + elif (isinstance(dtype, _NUMPY_DATETIME64_US) and + _dataframe_is_supported_datetime(dtype)): + col.setup.source = col_source_t.col_source_dt64us_numpy _dataframe_series_as_pybuf(pandas_col, col) elif (isinstance(dtype, _PANDAS.DatetimeTZDtype) and _dataframe_is_supported_datetime(dtype)): + if dtype.unit != 'ns': + # Docs say this should always be nanos, but best assert. + # https://pandas.pydata.org/docs/reference/api/pandas.DatetimeTZDtype.html + raise IngressError( + IngressErrorCode.BadDataFrame, + f'Unsupported dtype {dtype} unit {dtype.unit} for column {pandas_col.name!r}. ' + + 'Raise an issue if you think it should be supported: ' + + 'https://github.com/questdb/py-questdb-client/issues.') col.setup.source = col_source_t.col_source_dt64ns_tz_arrow _dataframe_series_as_arrow(pandas_col, col) elif isinstance(dtype, _NUMPY_OBJECT): @@ -2165,6 +2213,21 @@ cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_numpy( _ensure_has_gil(gs) raise c_err_to_py(err) + +cdef void_int _dataframe_serialize_cell_column_ts__dt64us_numpy( + line_sender_buffer* ls_buf, + qdb_pystr_buf* b, + col_t* col, + PyThreadState** gs) except -1: + cdef line_sender_error* err = NULL + cdef int64_t* access = col.cursor.chunk.buffers[1] + cdef int64_t cell = access[col.cursor.offset] + if cell != _NAT: + if not line_sender_buffer_column_ts_micros(ls_buf, col.name, cell, &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + + cdef void_int _dataframe_serialize_cell_column_arr_f64__arr_f64_numpyobj( line_sender_buffer* ls_buf, qdb_pystr_buf* b, @@ -2304,6 +2367,7 @@ cdef void_int _dataframe_serialize_cell_column_decimal__decimal256_arrow( _ensure_has_gil(gs) raise c_err_to_py(err) + cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow( line_sender_buffer* ls_buf, qdb_pystr_buf* b, @@ -2321,6 +2385,23 @@ cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow( raise c_err_to_py(err) +cdef void_int _dataframe_serialize_cell_column_ts__dt64us_tz_arrow( + line_sender_buffer* ls_buf, + qdb_pystr_buf* b, + col_t* col, + PyThreadState** gs) except -1: + cdef line_sender_error* err = NULL + cdef bint valid = _dataframe_arrow_is_valid(&col.cursor) + cdef int64_t cell + cdef int64_t* access + if valid: + access = col.cursor.chunk.buffers[1] + cell = access[col.cursor.offset] + if not line_sender_buffer_column_ts_micros(ls_buf, col.name, cell, &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + + cdef void_int _dataframe_serialize_cell_at_dt64ns_numpy( line_sender_buffer* ls_buf, qdb_pystr_buf* b, @@ -2340,6 +2421,25 @@ cdef void_int _dataframe_serialize_cell_at_dt64ns_numpy( raise c_err_to_py(err) +cdef void_int _dataframe_serialize_cell_at_dt64us_numpy( + line_sender_buffer* ls_buf, + qdb_pystr_buf* b, + col_t* col, + PyThreadState** gs) except -1: + cdef line_sender_error* err = NULL + cdef int64_t* access = col.cursor.chunk.buffers[1] + cdef int64_t cell = access[col.cursor.offset] + if cell == _NAT: + if not line_sender_buffer_at_now(ls_buf, &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + else: + # Note: ls_buf will validate against negative numbers. + if not line_sender_buffer_at_micros(ls_buf, cell, &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + + cdef void_int _dataframe_serialize_cell_at_dt64ns_tz_arrow( line_sender_buffer* ls_buf, qdb_pystr_buf* b, @@ -2362,6 +2462,28 @@ cdef void_int _dataframe_serialize_cell_at_dt64ns_tz_arrow( raise c_err_to_py(err) +cdef void_int _dataframe_serialize_cell_at_dt64us_tz_arrow( + line_sender_buffer* ls_buf, + qdb_pystr_buf* b, + col_t* col, + PyThreadState** gs) except -1: + cdef line_sender_error* err = NULL + cdef bint valid = _dataframe_arrow_is_valid(&col.cursor) + cdef int64_t* access + cdef int64_t cell + if valid: + access = col.cursor.chunk.buffers[1] + cell = access[col.cursor.offset] + # Note: ls_buf will validate against negative numbers. + if not line_sender_buffer_at_micros(ls_buf, cell, &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + else: + if not line_sender_buffer_at_now(ls_buf, &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + + cdef void_int _dataframe_serialize_cell( line_sender_buffer* ls_buf, qdb_pystr_buf* b, @@ -2460,6 +2582,8 @@ cdef void_int _dataframe_serialize_cell( _dataframe_serialize_cell_column_str__str_i32_cat(ls_buf, b, col, gs) elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64ns_numpy: _dataframe_serialize_cell_column_ts__dt64ns_numpy(ls_buf, b, col, gs) + elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64us_numpy: + _dataframe_serialize_cell_column_ts__dt64us_numpy(ls_buf, b, col, gs) elif dc == col_dispatch_code_t.col_dispatch_code_column_arr_f64__arr_f64_numpyobj: _dataframe_serialize_cell_column_arr_f64__arr_f64_numpyobj(ls_buf, b, col) elif dc == col_dispatch_code_t.col_dispatch_code_column_decimal__decimal_pyobj: @@ -2474,10 +2598,16 @@ cdef void_int _dataframe_serialize_cell( _dataframe_serialize_cell_column_decimal__decimal256_arrow(ls_buf, b, col, gs) elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64ns_tz_arrow: _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow(ls_buf, b, col, gs) + elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64us_tz_arrow: + _dataframe_serialize_cell_column_ts__dt64us_tz_arrow(ls_buf, b, col, gs) elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64ns_numpy: _dataframe_serialize_cell_at_dt64ns_numpy(ls_buf, b, col, gs) + elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64us_numpy: + _dataframe_serialize_cell_at_dt64us_numpy(ls_buf, b, col, gs) elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64ns_tz_arrow: _dataframe_serialize_cell_at_dt64ns_tz_arrow(ls_buf, b, col, gs) + elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64us_tz_arrow: + _dataframe_serialize_cell_at_dt64us_tz_arrow(ls_buf, b, col, gs) else: _ensure_has_gil(gs) raise RuntimeError(f"Unknown column dispatch code: {dc}") diff --git a/test/test_dataframe.py b/test/test_dataframe.py index 1589d0f8..c0016f4a 100644 --- a/test/test_dataframe.py +++ b/test/test_dataframe.py @@ -1756,14 +1756,6 @@ def test_arrow_chunked_array(self): pandarrow_b = pd.array(chunked_b, dtype='int32[pyarrow]') df = pd.DataFrame({'a': pandarrow_a, 'b': pandarrow_b}) - # Note that this dtype is experimental (currently), - # so we don't support it yet.. but we have everything in place should we - # need to, so - as for now - we just test that we raise a nice error. - with self.assertRaisesRegex( - qi.IngressError, - r"Unsupported arrow type int16 for column 'a'.*github"): - _dataframe(self.version, df, table_name='tbl1', at = qi.ServerTimestamp) - @unittest.skipIf(not fastparquet, 'fastparquet not installed') @with_tmp_dir def test_parquet_roundtrip(self, tmpdir): @@ -1850,6 +1842,84 @@ def test_f64_np_array(self): b'tbl1 a=' + _array_binary_bytes(np.array([2.0], np.float64)) + b'\n' + b'tbl1 a=' + _array_binary_bytes(np.array([3.0], np.float64)) + b'\n') + def test_numpy_micros_col(self): + df = pd.DataFrame({ + 'x': [1, 2, 3], + 'ts1': pd.Series([ + pd.Timestamp(2023, 2, 1, 10, 0, 0), + pd.Timestamp(2023, 2, 2, 12, 30, 15), + pd.Timestamp(2023, 2, 3, 15, 45, 30) + ], dtype='datetime64[us]'), + 'ts2': pd.Series([ + pd.Timestamp(2023, 2, 1, 10, 0, 0), + None, + pd.Timestamp(2023, 2, 3, 15, 45, 30) + ], dtype='datetime64[us]') + }) + + # print(df) + # print(df.dtypes) + + act = _dataframe(self.version, df, table_name='tbl1', at='ts2') + + # format designated timestamp micros + def fdtm(value): + if self.version >= 2: + return f'{value}t\n'.encode() + else: + value = value * 1000 + return f'{value}\n'.encode() + + exp = ( + b'tbl1 x=1i,ts1=1675245600000000t ' + fdtm(1675245600000000) + + b'tbl1 x=2i,ts1=1675341015000000t\n' + + b'tbl1 x=3i,ts1=1675439130000000t ' + fdtm(1675439130000000)) + + # print(repr(exp)) + # print(repr(act)) + self.assertEqual(exp, act) + + def test_arrow_micros_col(self): + df = pd.DataFrame({ + 'x': [1, 2, 3], + 'ts1': pd.Series( + pa.array( + [ + pd.Timestamp("2024-01-01 00:00:00.123456"), + pd.Timestamp("2024-01-01 00:00:01.654321"), + pd.Timestamp("2024-01-01 00:00:02.111111"), + ], + type=pa.timestamp("us") + ), + dtype="timestamp[us][pyarrow]"), + 'ts2': pd.Series( + pa.array( + [ + pd.Timestamp("2024-01-01 00:00:00.123456"), + pd.Timestamp("2024-01-01 00:00:01.654321"), + None + ], + type=pa.timestamp("us") + ), + dtype="timestamp[us][pyarrow]"), + }) + act = _dataframe(self.version, df, table_name='tbl1', at='ts2') + # format designated timestamp micros + def fdtm(value): + if self.version >= 2: + return f'{value}t\n'.encode() + else: + value = value * 1000 + return f'{value}\n'.encode() + + exp = ( + b'tbl1 x=1i,ts1=1704067200123456t ' + fdtm(1704067200123456) + + b'tbl1 x=2i,ts1=1704067201654321t ' + fdtm(1704067201654321) + + b'tbl1 x=3i,ts1=1704067202111111t\n') + # print(repr(exp)) + # print(repr(act)) + self.assertEqual(exp, act) + def test_arrow_types(self): df = pd.DataFrame({ "ts": pd.Series( From aa469423a549833c6e87953663562ec7a959dcc3 Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Wed, 26 Nov 2025 10:34:06 +0000 Subject: [PATCH 07/15] fixed error message with list of supported designated timestamp types --- src/questdb/dataframe.pxi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index 7f5db626..f52f72d1 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -801,7 +801,7 @@ cdef int64_t _AT_IS_SERVER_NOW = -2 cdef int64_t _AT_IS_SET_BY_COLUMN = -1 -cdef str _SUPPORTED_DATETIMES = 'datetime64[ns], datetime64[ns, tz] or timestamp[ns][pyarrow]' +cdef str _SUPPORTED_DATETIMES = 'datetime64[ns], datetime64[us], datetime64[ns, tz], timestamp[ns][pyarrow], or timestamp[us][pyarrow]' cdef object _dataframe_is_supported_datetime(object dtype): From f5272f6887dc37052f879e6342ca95d29a5164ea Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Wed, 26 Nov 2025 10:41:54 +0000 Subject: [PATCH 08/15] moved assert --- src/questdb/dataframe.pxi | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index f52f72d1..bffb3c3f 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -812,7 +812,16 @@ cdef object _dataframe_is_supported_datetime(object dtype): (str(dtype) == 'datetime64[us]')): return True elif isinstance(dtype, _PANDAS.DatetimeTZDtype): - return dtype.unit == 'ns' + # Docs say this should always be nanos, but best assert in case the API changes in the future. + # https://pandas.pydata.org/docs/reference/api/pandas.DatetimeTZDtype.html + if dtype.unit == 'ns': + return True + else: + raise IngressError( + IngressErrorCode.BadDataFrame, + f'Unsupported dtype {dtype} unit {dtype.unit}. ' + + 'Raise an issue if you think it should be supported: ' + + 'https://github.com/questdb/py-questdb-client/issues.') elif isinstance(dtype, _PANDAS.ArrowDtype): arrow_type = dtype.pyarrow_dtype return ( @@ -1235,14 +1244,7 @@ cdef void_int _dataframe_resolve_source_and_buffers( _dataframe_series_as_pybuf(pandas_col, col) elif (isinstance(dtype, _PANDAS.DatetimeTZDtype) and _dataframe_is_supported_datetime(dtype)): - if dtype.unit != 'ns': - # Docs say this should always be nanos, but best assert. - # https://pandas.pydata.org/docs/reference/api/pandas.DatetimeTZDtype.html - raise IngressError( - IngressErrorCode.BadDataFrame, - f'Unsupported dtype {dtype} unit {dtype.unit} for column {pandas_col.name!r}. ' + - 'Raise an issue if you think it should be supported: ' + - 'https://github.com/questdb/py-questdb-client/issues.') + # N.B.: We've already asserted this is 'ns'. col.setup.source = col_source_t.col_source_dt64ns_tz_arrow _dataframe_series_as_arrow(pandas_col, col) elif isinstance(dtype, _NUMPY_OBJECT): From 64ffa7175a8df39daf0a4ce6387939eb0b94591c Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Wed, 26 Nov 2025 11:11:09 +0000 Subject: [PATCH 09/15] refactored and deduplicated timestamp dtype detection logic --- src/questdb/dataframe.pxi | 70 +++++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 36 deletions(-) diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index bffb3c3f..fe7a02c8 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -804,30 +804,44 @@ cdef int64_t _AT_IS_SET_BY_COLUMN = -1 cdef str _SUPPORTED_DATETIMES = 'datetime64[ns], datetime64[us], datetime64[ns, tz], timestamp[ns][pyarrow], or timestamp[us][pyarrow]' -cdef object _dataframe_is_supported_datetime(object dtype): - if (isinstance(dtype, _NUMPY_DATETIME64_NS) and - (str(dtype) == 'datetime64[ns]')): - return True - elif (isinstance(dtype, _NUMPY_DATETIME64_US) and - (str(dtype) == 'datetime64[us]')): - return True +cdef int _dataframe_classify_timestamp_dtype(object dtype) except -1: + """ + Classify the dtype and determine if it's supported for use as a timestamp. + + Returns: + > 0 - a value castable to a `col_source_t`. + 0 - dtype is not a supported timestamp datatype. + """ + cdef object arrow_type + if isinstance(dtype, _NUMPY_DATETIME64_NS) and str(dtype) == "datetime64[ns]": + return col_source_t.col_source_dt64ns_numpy + elif isinstance(dtype, _NUMPY_DATETIME64_US) and str(dtype) == "datetime64[us]": + return col_source_t.col_source_dt64us_numpy elif isinstance(dtype, _PANDAS.DatetimeTZDtype): # Docs say this should always be nanos, but best assert in case the API changes in the future. # https://pandas.pydata.org/docs/reference/api/pandas.DatetimeTZDtype.html if dtype.unit == 'ns': - return True + return col_source_t.col_source_dt64ns_tz_arrow else: raise IngressError( IngressErrorCode.BadDataFrame, - f'Unsupported dtype {dtype} unit {dtype.unit}. ' + + f'Unsupported pandas dtype {dtype} unit {dtype.unit}. ' + 'Raise an issue if you think it should be supported: ' + 'https://github.com/questdb/py-questdb-client/issues.') elif isinstance(dtype, _PANDAS.ArrowDtype): arrow_type = dtype.pyarrow_dtype - return ( - (arrow_type.id == _PYARROW.lib.Type_TIMESTAMP) and - (arrow_type.unit in ('us', 'ns'))) - return False + if arrow_type.id == _PYARROW.lib.Type_TIMESTAMP: + if arrow_type.unit == "ns": + return col_source_t.col_source_dt64ns_tz_arrow + elif arrow_type.unit == "us": + return col_source_t.col_source_dt64us_tz_arrow + else: + raise IngressError( + IngressErrorCode.BadDataFrame, + f'Unsupported arrow dtype {dtype} unit {dtype.unit}. ' + + 'Raise an issue if you think it should be supported: ' + + 'https://github.com/questdb/py-questdb-client/issues.') + return 0 cdef ssize_t _dataframe_resolve_at( @@ -864,7 +878,7 @@ cdef ssize_t _dataframe_resolve_at( 'Must be one of: None, TimestampNanos, datetime, ' + 'int (column index), str (colum name)') dtype = df.dtypes.iloc[col_index] - if _dataframe_is_supported_datetime(dtype): + if _dataframe_classify_timestamp_dtype(dtype) != 0: at_value_out[0] = _AT_IS_SET_BY_COLUMN col = &cols.d[col_index] col.setup.meta_target = meta_target_t.meta_target_at @@ -1224,29 +1238,13 @@ cdef void_int _dataframe_resolve_source_and_buffers( f'for column {pandas_col.name} of dtype {dtype}.') elif isinstance(dtype, _PANDAS.CategoricalDtype): _dataframe_category_series_as_arrow(pandas_col, col) - elif (isinstance(dtype, _NUMPY_DATETIME64_NS) and - _dataframe_is_supported_datetime(dtype)): - unit = dtype.name.split('[')[-1].strip(']') - if unit == 'ns': - col.setup.source = col_source_t.col_source_dt64ns_numpy - elif unit == 'us': - col.setup.source = col_source_t.col_source_dt64us_numpy + elif _dataframe_classify_timestamp_dtype(dtype) != 0: + col.setup.source = _dataframe_classify_timestamp_dtype(dtype) + if ((col.setup.source == col_source_t.col_source_dt64ns_numpy) or + (col.setup.source == col_source_t.col_source_dt64us_numpy)): + _dataframe_series_as_pybuf(pandas_col, col) else: - raise IngressError( - IngressErrorCode.BadDataFrame, - 'Unsupported unit for datetime64 numpy column type ' + - f'for column {pandas_col.name} of dtype {dtype}. ' + - f' Must be "us" or "ns", not {unit}') - _dataframe_series_as_pybuf(pandas_col, col) - elif (isinstance(dtype, _NUMPY_DATETIME64_US) and - _dataframe_is_supported_datetime(dtype)): - col.setup.source = col_source_t.col_source_dt64us_numpy - _dataframe_series_as_pybuf(pandas_col, col) - elif (isinstance(dtype, _PANDAS.DatetimeTZDtype) and - _dataframe_is_supported_datetime(dtype)): - # N.B.: We've already asserted this is 'ns'. - col.setup.source = col_source_t.col_source_dt64ns_tz_arrow - _dataframe_series_as_arrow(pandas_col, col) + _dataframe_series_as_arrow(pandas_col, col) elif isinstance(dtype, _NUMPY_OBJECT): _dataframe_series_sniff_pyobj(pandas_col, col) elif isinstance(dtype, _PANDAS.ArrowDtype): From e228e7146815020a7dc8122297de4a8938bd51e1 Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Wed, 26 Nov 2025 11:20:05 +0000 Subject: [PATCH 10/15] avoiding to call the _dataframe_classify_timestamp_dtype function twice --- src/questdb/dataframe.pxi | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index fe7a02c8..b847d1b9 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -1149,7 +1149,15 @@ cdef void_int _dataframe_series_sniff_pyobj( cdef void_int _dataframe_resolve_source_and_buffers( PandasCol pandas_col, col_t* col) except -1: cdef object dtype = pandas_col.dtype - if isinstance(dtype, _NUMPY_BOOL): + cdef int ts_col_source = _dataframe_classify_timestamp_dtype(dtype) + if ts_col_source != 0: + col.setup.source = ts_col_source + if ((col.setup.source == col_source_t.col_source_dt64ns_numpy) or + (col.setup.source == col_source_t.col_source_dt64us_numpy)): + _dataframe_series_as_pybuf(pandas_col, col) + else: + _dataframe_series_as_arrow(pandas_col, col) + elif isinstance(dtype, _NUMPY_BOOL): col.setup.source = col_source_t.col_source_bool_numpy _dataframe_series_as_pybuf(pandas_col, col) elif isinstance(dtype, _PANDAS.BooleanDtype): @@ -1238,13 +1246,6 @@ cdef void_int _dataframe_resolve_source_and_buffers( f'for column {pandas_col.name} of dtype {dtype}.') elif isinstance(dtype, _PANDAS.CategoricalDtype): _dataframe_category_series_as_arrow(pandas_col, col) - elif _dataframe_classify_timestamp_dtype(dtype) != 0: - col.setup.source = _dataframe_classify_timestamp_dtype(dtype) - if ((col.setup.source == col_source_t.col_source_dt64ns_numpy) or - (col.setup.source == col_source_t.col_source_dt64us_numpy)): - _dataframe_series_as_pybuf(pandas_col, col) - else: - _dataframe_series_as_arrow(pandas_col, col) elif isinstance(dtype, _NUMPY_OBJECT): _dataframe_series_sniff_pyobj(pandas_col, col) elif isinstance(dtype, _PANDAS.ArrowDtype): From 9c93f74211471f43f0233754d400bbec498b25e5 Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Wed, 26 Nov 2025 11:31:52 +0000 Subject: [PATCH 11/15] added tests for pyarrow string/largestring as symbol types --- test/test_dataframe.py | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/test/test_dataframe.py b/test/test_dataframe.py index c0016f4a..66d343b7 100644 --- a/test/test_dataframe.py +++ b/test/test_dataframe.py @@ -1857,9 +1857,6 @@ def test_numpy_micros_col(self): ], dtype='datetime64[us]') }) - # print(df) - # print(df.dtypes) - act = _dataframe(self.version, df, table_name='tbl1', at='ts2') # format designated timestamp micros @@ -1874,9 +1871,6 @@ def fdtm(value): b'tbl1 x=1i,ts1=1675245600000000t ' + fdtm(1675245600000000) + b'tbl1 x=2i,ts1=1675341015000000t\n' + b'tbl1 x=3i,ts1=1675439130000000t ' + fdtm(1675439130000000)) - - # print(repr(exp)) - # print(repr(act)) self.assertEqual(exp, act) def test_arrow_micros_col(self): @@ -1916,8 +1910,6 @@ def fdtm(value): b'tbl1 x=1i,ts1=1704067200123456t ' + fdtm(1704067200123456) + b'tbl1 x=2i,ts1=1704067201654321t ' + fdtm(1704067201654321) + b'tbl1 x=3i,ts1=1704067202111111t\n') - # print(repr(exp)) - # print(repr(act)) self.assertEqual(exp, act) def test_arrow_types(self): @@ -1986,9 +1978,6 @@ def test_arrow_types(self): ), }) - # print(df) - # print(df.dtypes) - # format a timestamp def fts(value): if self.version >= 2: @@ -2037,9 +2026,34 @@ def fts(value): b',value_i8=5i,value_i32=5000i 1704067204000000000' + tsls) act = _dataframe(self.version, df, table_name='tbl1', at='ts') - # print(f'{act!r}') self.assertEqual(act, exp) + def test_arrow_strings_as_symbols(self): + df = pd.DataFrame({ + "sym_large": pd.Series( + pa.LargeStringArray.from_pandas( + ["alpha", None, "gamma", "delta", "epsilon"] + ), + dtype="large_string[pyarrow]" + ), + + "sym_small": pd.Series( + pa.array(["foo", "bar", None, "baz", "qux"], type=pa.string()), + dtype="string[pyarrow]" + ) + }) + + act = _dataframe(self.version, df, table_name='tbl1', symbols=('sym_large', 'sym_small'), at=qi.ServerTimestamp) + exp = ( + b'tbl1,sym_large=alpha,sym_small=foo\n' + b'tbl1,sym_small=bar\n' + b'tbl1,sym_large=gamma\n' + b'tbl1,sym_large=delta,sym_small=baz\n' + b'tbl1,sym_large=epsilon,sym_small=qux\n' + ) + self.assertEqual(exp, act) + + class TestPandasProtocolVersionV1(TestPandasBase.TestPandas): name = 'protocol version 1' version = 1 From bf2d62a310246fa2053fa9fce6785f687746c07e Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Wed, 26 Nov 2025 11:34:57 +0000 Subject: [PATCH 12/15] exception fix --- src/questdb/dataframe.pxi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index b847d1b9..0c436d40 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -838,7 +838,7 @@ cdef int _dataframe_classify_timestamp_dtype(object dtype) except -1: else: raise IngressError( IngressErrorCode.BadDataFrame, - f'Unsupported arrow dtype {dtype} unit {dtype.unit}. ' + + f'Unsupported arrow dtype {dtype} unit {arrow_type.unit}. ' + 'Raise an issue if you think it should be supported: ' + 'https://github.com/questdb/py-questdb-client/issues.') return 0 From ee3e674c2bc94dcd30526b8e16141607f6796e63 Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Wed, 26 Nov 2025 11:52:37 +0000 Subject: [PATCH 13/15] fixed broken datatype in test --- test/test_dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_dataframe.py b/test/test_dataframe.py index 66d343b7..5ff090d1 100644 --- a/test/test_dataframe.py +++ b/test/test_dataframe.py @@ -1949,7 +1949,7 @@ def test_arrow_types(self): "value_f32": pd.Series( pa.array([None, 20.0, 30.25, 40.5, 50.75], type=pa.float32()), - dtype="float64[pyarrow]" + dtype="float32[pyarrow]" ), "value_f64": pd.Series( From a4660cdfc551bb16f8116c69f036c0b6768f68d3 Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Wed, 26 Nov 2025 11:58:01 +0000 Subject: [PATCH 14/15] fixed whitespace issues --- test/test_dataframe.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/test_dataframe.py b/test/test_dataframe.py index 5ff090d1..d92dca11 100644 --- a/test/test_dataframe.py +++ b/test/test_dataframe.py @@ -1859,7 +1859,7 @@ def test_numpy_micros_col(self): act = _dataframe(self.version, df, table_name='tbl1', at='ts2') - # format designated timestamp micros + # format designated timestamp micros def fdtm(value): if self.version >= 2: return f'{value}t\n'.encode() @@ -1898,7 +1898,8 @@ def test_arrow_micros_col(self): dtype="timestamp[us][pyarrow]"), }) act = _dataframe(self.version, df, table_name='tbl1', at='ts2') - # format designated timestamp micros + + # format designated timestamp micros def fdtm(value): if self.version >= 2: return f'{value}t\n'.encode() From 13a76d52ef0dbb60ad5d6facc94924ef940a6cc3 Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Wed, 26 Nov 2025 12:08:32 +0000 Subject: [PATCH 15/15] removed dead code --- src/questdb/dataframe.pxi | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index 0c436d40..d69cebd5 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -1021,19 +1021,6 @@ cdef void_int _dataframe_series_resolve_arrow(PandasCol pandas_col, object arrow is_decimal_col = True elif arrowtype.id == _PYARROW.lib.Type_BOOL: col.setup.source = col_source_t.col_source_bool_arrow - elif arrowtype.id == _PYARROW.lib.Type_TIMESTAMP: - # N.B.: Even if there's a timezone set, - # the data is recorded as UTC, so we're good. - if arrowtype.unit == 'us': - col.setup.source = col_source_t.col_source_dt64us_tz_arrow - elif arrowtype.unit == 'ns': - col.setup.source = col_source_t.col_source_dt64ns_tz_arrow - else: - raise IngressError( - IngressErrorCode.BadDataFrame, - f"Unsupported timestamp unit {arrowtype.unit!r} " - f"for column {pandas_col.name!r}: only 'us' and 'ns' are supported." - ) elif arrowtype.id == _PYARROW.lib.Type_LARGE_STRING: col.setup.source = col_source_t.col_source_str_lrg_utf8_arrow elif arrowtype.id == _PYARROW.lib.Type_FLOAT: