diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi
index 1fa6b466..d69cebd5 100644
--- a/src/questdb/dataframe.pxi
+++ b/src/questdb/dataframe.pxi
@@ -148,12 +148,14 @@ cdef enum col_source_t:
     col_source_str_lrg_utf8_arrow = 406000
     col_source_dt64ns_numpy = 501000
     col_source_dt64ns_tz_arrow = 502000
-    col_source_arr_f64_numpyobj = 601100
-    col_source_decimal_pyobj = 701100
-    col_source_decimal32_arrow = 702000
-    col_source_decimal64_arrow = 703000
-    col_source_decimal128_arrow = 704000
-    col_source_decimal256_arrow = 705000
+    col_source_dt64us_numpy = 601000
+    col_source_dt64us_tz_arrow = 602000
+    col_source_arr_f64_numpyobj = 701100
+    col_source_decimal_pyobj = 801100
+    col_source_decimal32_arrow = 802000
+    col_source_decimal64_arrow = 803000
+    col_source_decimal128_arrow = 804000
+    col_source_decimal256_arrow = 805000
 
 
 cdef bint col_source_needs_gil(col_source_t source) noexcept nogil:
@@ -242,6 +244,8 @@ cdef dict _TARGET_TO_SOURCES = {
     col_target_t.col_target_column_ts: {
         col_source_t.col_source_dt64ns_numpy,
         col_source_t.col_source_dt64ns_tz_arrow,
+        col_source_t.col_source_dt64us_numpy,
+        col_source_t.col_source_dt64us_tz_arrow,
     },
     col_target_t.col_target_column_arr_f64: {
         col_source_t.col_source_arr_f64_numpyobj,
@@ -256,6 +260,8 @@ cdef dict _TARGET_TO_SOURCES = {
     col_target_t.col_target_at: {
         col_source_t.col_source_dt64ns_numpy,
         col_source_t.col_source_dt64ns_tz_arrow,
+        col_source_t.col_source_dt64us_numpy,
+        col_source_t.col_source_dt64us_tz_arrow,
     },
 }
@@ -386,11 +392,22 @@ cdef enum col_dispatch_code_t:
         col_target_t.col_target_column_ts + \
         col_source_t.col_source_dt64ns_tz_arrow
 
+    col_dispatch_code_column_ts__dt64us_numpy = \
+        col_target_t.col_target_column_ts + col_source_t.col_source_dt64us_numpy
+    col_dispatch_code_column_ts__dt64us_tz_arrow = \
+        col_target_t.col_target_column_ts + \
+        col_source_t.col_source_dt64us_tz_arrow
+
     col_dispatch_code_at__dt64ns_numpy = \
         col_target_t.col_target_at + col_source_t.col_source_dt64ns_numpy
     col_dispatch_code_at__dt64ns_tz_arrow = \
         col_target_t.col_target_at + col_source_t.col_source_dt64ns_tz_arrow
 
+    col_dispatch_code_at__dt64us_numpy = \
+        col_target_t.col_target_at + col_source_t.col_source_dt64us_numpy
+    col_dispatch_code_at__dt64us_tz_arrow = \
+        col_target_t.col_target_at + col_source_t.col_source_dt64us_tz_arrow
+
     col_dispatch_code_column_arr_f64__arr_f64_numpyobj = \
         col_target_t.col_target_column_arr_f64 + col_source_t.col_source_arr_f64_numpyobj
 
@@ -508,6 +525,7 @@ cdef object _NUMPY_INT64 = None
 cdef object _NUMPY_FLOAT32 = None
 cdef object _NUMPY_FLOAT64 = None
 cdef object _NUMPY_DATETIME64_NS = None
+cdef object _NUMPY_DATETIME64_US = None
 cdef object _NUMPY_OBJECT = None
 cdef object _PANDAS = None  # module object
 cdef object _PANDAS_NA = None  # pandas.NA
@@ -541,6 +559,7 @@ cdef object _dataframe_may_import_deps():
     global _NUMPY_FLOAT32
     global _NUMPY_FLOAT64
     global _NUMPY_DATETIME64_NS
+    global _NUMPY_DATETIME64_US
    global _NUMPY_OBJECT
     if _NUMPY is not None:
         return
@@ -567,6 +586,7 @@ cdef object _dataframe_may_import_deps():
     _NUMPY_FLOAT32 = type(_NUMPY.dtype('float32'))
     _NUMPY_FLOAT64 = type(_NUMPY.dtype('float64'))
     _NUMPY_DATETIME64_NS = type(_NUMPY.dtype('datetime64[ns]'))
+    _NUMPY_DATETIME64_US = type(_NUMPY.dtype('datetime64[us]'))
     _NUMPY_OBJECT = type(_NUMPY.dtype('object'))
     _PANDAS = pandas
     _PANDAS_NA = pandas.NA
@@ -781,16 +801,47 @@ cdef int64_t _AT_IS_SERVER_NOW = -2
 cdef int64_t _AT_IS_SET_BY_COLUMN = -1
 
 
-cdef str _SUPPORTED_DATETIMES = 'datetime64[ns] or datetime64[ns, tz]'
+cdef str _SUPPORTED_DATETIMES = 'datetime64[ns], datetime64[us], datetime64[ns, tz], timestamp[ns][pyarrow], or timestamp[us][pyarrow]'
 
 
-cdef object _dataframe_is_supported_datetime(object dtype):
-    if (isinstance(dtype, _NUMPY_DATETIME64_NS) and
-            (str(dtype) == 'datetime64[ns]')):
-        return True
-    if isinstance(dtype, _PANDAS.DatetimeTZDtype):
-        return dtype.unit == 'ns'
-    return False
+cdef int _dataframe_classify_timestamp_dtype(object dtype) except -1:
+    """
+    Classify the dtype and determine if it's supported for use as a timestamp.
+
+    Returns:
+        > 0 - a value castable to a `col_source_t`.
+          0 - dtype is not a supported timestamp datatype.
+    """
+    cdef object arrow_type
+    if isinstance(dtype, _NUMPY_DATETIME64_NS) and str(dtype) == "datetime64[ns]":
+        return col_source_t.col_source_dt64ns_numpy
+    elif isinstance(dtype, _NUMPY_DATETIME64_US) and str(dtype) == "datetime64[us]":
+        return col_source_t.col_source_dt64us_numpy
+    elif isinstance(dtype, _PANDAS.DatetimeTZDtype):
+        # The docs say this should always be nanos, but best to check in case the API changes in the future.
+        # https://pandas.pydata.org/docs/reference/api/pandas.DatetimeTZDtype.html
+        if dtype.unit == 'ns':
+            return col_source_t.col_source_dt64ns_tz_arrow
+        else:
+            raise IngressError(
+                IngressErrorCode.BadDataFrame,
+                f'Unsupported pandas dtype {dtype} unit {dtype.unit}. ' +
+                'Raise an issue if you think it should be supported: ' +
+                'https://github.com/questdb/py-questdb-client/issues.')
+    elif isinstance(dtype, _PANDAS.ArrowDtype):
+        arrow_type = dtype.pyarrow_dtype
+        if arrow_type.id == _PYARROW.lib.Type_TIMESTAMP:
+            if arrow_type.unit == "ns":
+                return col_source_t.col_source_dt64ns_tz_arrow
+            elif arrow_type.unit == "us":
+                return col_source_t.col_source_dt64us_tz_arrow
+            else:
+                raise IngressError(
+                    IngressErrorCode.BadDataFrame,
+                    f'Unsupported arrow dtype {dtype} unit {arrow_type.unit}. ' +
+                    'Raise an issue if you think it should be supported: ' +
+                    'https://github.com/questdb/py-questdb-client/issues.')
+    return 0
 
 
 cdef ssize_t _dataframe_resolve_at(
@@ -827,7 +878,7 @@ cdef ssize_t _dataframe_resolve_at(
                 'Must be one of: None, TimestampNanos, datetime, ' +
                 'int (column index), str (colum name)')
     dtype = df.dtypes.iloc[col_index]
-    if _dataframe_is_supported_datetime(dtype):
+    if _dataframe_classify_timestamp_dtype(dtype) != 0:
         at_value_out[0] = _AT_IS_SET_BY_COLUMN
         col = &cols.d[col_index]
         col.setup.meta_target = meta_target_t.meta_target_at
@@ -954,28 +1005,52 @@
             f'got a category of {pandas_col.series.dtype.categories.dtype}.')
 
 
 cdef void_int _dataframe_series_resolve_arrow(PandasCol pandas_col, object arrowtype, col_t *col) except -1:
+    cdef bint is_decimal_col = False
     _dataframe_series_as_arrow(pandas_col, col)
     if arrowtype.id == _PYARROW.lib.Type_DECIMAL32:
         col.setup.source = col_source_t.col_source_decimal32_arrow
+        is_decimal_col = True
     elif arrowtype.id == _PYARROW.lib.Type_DECIMAL64:
         col.setup.source = col_source_t.col_source_decimal64_arrow
+        is_decimal_col = True
     elif arrowtype.id == _PYARROW.lib.Type_DECIMAL128:
         col.setup.source = col_source_t.col_source_decimal128_arrow
+        is_decimal_col = True
     elif arrowtype.id == _PYARROW.lib.Type_DECIMAL256:
         col.setup.source = col_source_t.col_source_decimal256_arrow
+        is_decimal_col = True
+    elif arrowtype.id == _PYARROW.lib.Type_BOOL:
+        col.setup.source = col_source_t.col_source_bool_arrow
+    elif arrowtype.id == _PYARROW.lib.Type_LARGE_STRING:
+        col.setup.source = col_source_t.col_source_str_lrg_utf8_arrow
+    elif arrowtype.id == _PYARROW.lib.Type_FLOAT:
+        col.setup.source = col_source_t.col_source_f32_arrow
+    elif arrowtype.id == _PYARROW.lib.Type_DOUBLE:
+        col.setup.source = col_source_t.col_source_f64_arrow
+    elif arrowtype.id == _PYARROW.lib.Type_INT8:
+        col.setup.source = col_source_t.col_source_i8_arrow
+    elif arrowtype.id == _PYARROW.lib.Type_INT16:
+        col.setup.source = col_source_t.col_source_i16_arrow
+    elif arrowtype.id == _PYARROW.lib.Type_INT32:
+        col.setup.source = col_source_t.col_source_i32_arrow
+    elif arrowtype.id == _PYARROW.lib.Type_INT64:
+        col.setup.source = col_source_t.col_source_i64_arrow
     else:
         raise IngressError(
             IngressErrorCode.BadDataFrame,
             f'Unsupported arrow type {arrowtype} for column {pandas_col.name!r}. ' +
             'Raise an issue if you think it should be supported: ' +
             'https://github.com/questdb/py-questdb-client/issues.')
-    if arrowtype.scale < 0 or arrowtype.scale > 76:
-        raise IngressError(
-            IngressErrorCode.BadDataFrame,
-            f'Bad column {pandas_col.name!r}: ' +
-            f'Unsupported decimal scale {arrowtype.scale}: ' +
-            'Must be in the range 0 to 76 inclusive.')
-    col.scale = arrowtype.scale
+    if is_decimal_col:
+        if arrowtype.scale < 0 or arrowtype.scale > 76:
+            raise IngressError(
+                IngressErrorCode.BadDataFrame,
+                f'Bad column {pandas_col.name!r}: ' +
+                f'Unsupported decimal scale {arrowtype.scale}: ' +
+                'Must be in the range 0 to 76 inclusive.')
+        col.scale = arrowtype.scale
+    else:
+        col.scale = 0
     return 0
 
 
 cdef inline bint _dataframe_is_float_nan(PyObject* obj) noexcept:
@@ -1061,7 +1136,15 @@ cdef void_int _dataframe_series_sniff_pyobj(
 cdef void_int _dataframe_resolve_source_and_buffers(
         PandasCol pandas_col, col_t* col) except -1:
     cdef object dtype = pandas_col.dtype
-    if isinstance(dtype, _NUMPY_BOOL):
+    cdef int ts_col_source = _dataframe_classify_timestamp_dtype(dtype)
+    if ts_col_source != 0:
+        col.setup.source = ts_col_source
+        if ((col.setup.source == col_source_t.col_source_dt64ns_numpy) or
+                (col.setup.source == col_source_t.col_source_dt64us_numpy)):
+            _dataframe_series_as_pybuf(pandas_col, col)
+        else:
+            _dataframe_series_as_arrow(pandas_col, col)
+    elif isinstance(dtype, _NUMPY_BOOL):
         col.setup.source = col_source_t.col_source_bool_numpy
         _dataframe_series_as_pybuf(pandas_col, col)
     elif isinstance(dtype, _PANDAS.BooleanDtype):
@@ -1150,14 +1233,6 @@ cdef void_int _dataframe_resolve_source_and_buffers(
             f'for column {pandas_col.name} of dtype {dtype}.')
     elif isinstance(dtype, _PANDAS.CategoricalDtype):
         _dataframe_category_series_as_arrow(pandas_col, col)
-    elif (isinstance(dtype, _NUMPY_DATETIME64_NS) and
-            _dataframe_is_supported_datetime(dtype)):
-        col.setup.source = col_source_t.col_source_dt64ns_numpy
-        _dataframe_series_as_pybuf(pandas_col, col)
-    elif (isinstance(dtype, _PANDAS.DatetimeTZDtype) and
-            _dataframe_is_supported_datetime(dtype)):
-        col.setup.source = col_source_t.col_source_dt64ns_tz_arrow
-        _dataframe_series_as_arrow(pandas_col, col)
     elif isinstance(dtype, _NUMPY_OBJECT):
         _dataframe_series_sniff_pyobj(pandas_col, col)
     elif isinstance(dtype, _PANDAS.ArrowDtype):
@@ -2126,6 +2201,21 @@ cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_numpy(
         _ensure_has_gil(gs)
         raise c_err_to_py(err)
 
+
+cdef void_int _dataframe_serialize_cell_column_ts__dt64us_numpy(
+        line_sender_buffer* ls_buf,
+        qdb_pystr_buf* b,
+        col_t* col,
+        PyThreadState** gs) except -1:
+    cdef line_sender_error* err = NULL
+    cdef int64_t* access = col.cursor.chunk.buffers[1]
+    cdef int64_t cell = access[col.cursor.offset]
+    if cell != _NAT:
+        if not line_sender_buffer_column_ts_micros(ls_buf, col.name, cell, &err):
+            _ensure_has_gil(gs)
+            raise c_err_to_py(err)
+
+
 cdef void_int _dataframe_serialize_cell_column_arr_f64__arr_f64_numpyobj(
         line_sender_buffer* ls_buf,
         qdb_pystr_buf* b,
         col_t* col,
@@ -2265,6 +2355,7 @@ cdef void_int _dataframe_serialize_cell_column_decimal__decimal256_arrow(
         _ensure_has_gil(gs)
         raise c_err_to_py(err)
 
+
 cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow(
         line_sender_buffer* ls_buf,
         qdb_pystr_buf* b,
@@ -2282,6 +2373,23 @@ cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow(
         raise c_err_to_py(err)
 
 
+cdef void_int _dataframe_serialize_cell_column_ts__dt64us_tz_arrow(
+        line_sender_buffer* ls_buf,
+        qdb_pystr_buf* b,
+        col_t* col,
+        PyThreadState** gs) except -1:
+    cdef line_sender_error* err = NULL
+    cdef bint valid = _dataframe_arrow_is_valid(&col.cursor)
+    cdef int64_t cell
+    cdef int64_t* access
+    if valid:
+        access = col.cursor.chunk.buffers[1]
+        cell = access[col.cursor.offset]
+        if not line_sender_buffer_column_ts_micros(ls_buf, col.name, cell, &err):
+            _ensure_has_gil(gs)
+            raise c_err_to_py(err)
+
+
 cdef void_int _dataframe_serialize_cell_at_dt64ns_numpy(
         line_sender_buffer* ls_buf,
         qdb_pystr_buf* b,
@@ -2301,6 +2409,25 @@
             raise c_err_to_py(err)
 
 
+cdef void_int _dataframe_serialize_cell_at_dt64us_numpy(
+        line_sender_buffer* ls_buf,
+        qdb_pystr_buf* b,
+        col_t* col,
+        PyThreadState** gs) except -1:
+    cdef line_sender_error* err = NULL
+    cdef int64_t* access = col.cursor.chunk.buffers[1]
+    cdef int64_t cell = access[col.cursor.offset]
+    if cell == _NAT:
+        if not line_sender_buffer_at_now(ls_buf, &err):
+            _ensure_has_gil(gs)
+            raise c_err_to_py(err)
+    else:
+        # Note: ls_buf will validate against negative numbers.
+        if not line_sender_buffer_at_micros(ls_buf, cell, &err):
+            _ensure_has_gil(gs)
+            raise c_err_to_py(err)
+
+
 cdef void_int _dataframe_serialize_cell_at_dt64ns_tz_arrow(
         line_sender_buffer* ls_buf,
         qdb_pystr_buf* b,
@@ -2323,6 +2450,28 @@
             raise c_err_to_py(err)
 
 
+cdef void_int _dataframe_serialize_cell_at_dt64us_tz_arrow(
+        line_sender_buffer* ls_buf,
+        qdb_pystr_buf* b,
+        col_t* col,
+        PyThreadState** gs) except -1:
+    cdef line_sender_error* err = NULL
+    cdef bint valid = _dataframe_arrow_is_valid(&col.cursor)
+    cdef int64_t* access
+    cdef int64_t cell
+    if valid:
+        access = col.cursor.chunk.buffers[1]
+        cell = access[col.cursor.offset]
+        # Note: ls_buf will validate against negative numbers.
+        if not line_sender_buffer_at_micros(ls_buf, cell, &err):
+            _ensure_has_gil(gs)
+            raise c_err_to_py(err)
+    else:
+        if not line_sender_buffer_at_now(ls_buf, &err):
+            _ensure_has_gil(gs)
+            raise c_err_to_py(err)
+
+
 cdef void_int _dataframe_serialize_cell(
         line_sender_buffer* ls_buf,
         qdb_pystr_buf* b,
@@ -2421,6 +2570,8 @@
         _dataframe_serialize_cell_column_str__str_i32_cat(ls_buf, b, col, gs)
     elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64ns_numpy:
         _dataframe_serialize_cell_column_ts__dt64ns_numpy(ls_buf, b, col, gs)
+    elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64us_numpy:
+        _dataframe_serialize_cell_column_ts__dt64us_numpy(ls_buf, b, col, gs)
     elif dc == col_dispatch_code_t.col_dispatch_code_column_arr_f64__arr_f64_numpyobj:
         _dataframe_serialize_cell_column_arr_f64__arr_f64_numpyobj(ls_buf, b, col)
     elif dc == col_dispatch_code_t.col_dispatch_code_column_decimal__decimal_pyobj:
@@ -2435,10 +2586,16 @@
         _dataframe_serialize_cell_column_decimal__decimal256_arrow(ls_buf, b, col, gs)
     elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64ns_tz_arrow:
         _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow(ls_buf, b, col, gs)
+    elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64us_tz_arrow:
+        _dataframe_serialize_cell_column_ts__dt64us_tz_arrow(ls_buf, b, col, gs)
     elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64ns_numpy:
         _dataframe_serialize_cell_at_dt64ns_numpy(ls_buf, b, col, gs)
+    elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64us_numpy:
+        _dataframe_serialize_cell_at_dt64us_numpy(ls_buf, b, col, gs)
     elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64ns_tz_arrow:
         _dataframe_serialize_cell_at_dt64ns_tz_arrow(ls_buf, b, col, gs)
+    elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64us_tz_arrow:
+        _dataframe_serialize_cell_at_dt64us_tz_arrow(ls_buf, b, col, gs)
     else:
         _ensure_has_gil(gs)
         raise RuntimeError(f"Unknown column dispatch code: {dc}")
diff --git a/test/test_dataframe.py b/test/test_dataframe.py
index 4576338a..d92dca11 100644
--- a/test/test_dataframe.py
+++ b/test/test_dataframe.py
@@ -1756,14 +1756,6 @@ def test_arrow_chunked_array(self):
         pandarrow_b = pd.array(chunked_b, dtype='int32[pyarrow]')
         df = pd.DataFrame({'a': pandarrow_a, 'b': pandarrow_b})
 
-        # Note that this dtype is experimental (currently),
-        # so we don't support it yet.. but we have everything in place should we
-        # need to, so - as for now - we just test that we raise a nice error.
-        with self.assertRaisesRegex(
-                qi.IngressError,
-                r"Unsupported arrow type int16 for column 'a'.*github"):
-            _dataframe(self.version, df, table_name='tbl1', at = qi.ServerTimestamp)
-
     @unittest.skipIf(not fastparquet, 'fastparquet not installed')
     @with_tmp_dir
     def test_parquet_roundtrip(self, tmpdir):
@@ -1849,6 +1841,219 @@ def test_f64_np_array(self):
             b'tbl1 a=' + _array_binary_bytes(np.array([1.0], np.float64)) + b'\n' +
             b'tbl1 a=' + _array_binary_bytes(np.array([2.0], np.float64)) + b'\n' +
             b'tbl1 a=' + _array_binary_bytes(np.array([3.0], np.float64)) + b'\n')
+
+    def test_numpy_micros_col(self):
+        df = pd.DataFrame({
+            'x': [1, 2, 3],
+            'ts1': pd.Series([
+                pd.Timestamp(2023, 2, 1, 10, 0, 0),
+                pd.Timestamp(2023, 2, 2, 12, 30, 15),
+                pd.Timestamp(2023, 2, 3, 15, 45, 30)
+            ], dtype='datetime64[us]'),
+            'ts2': pd.Series([
+                pd.Timestamp(2023, 2, 1, 10, 0, 0),
+                None,
+                pd.Timestamp(2023, 2, 3, 15, 45, 30)
+            ], dtype='datetime64[us]')
+        })
+
+        act = _dataframe(self.version, df, table_name='tbl1', at='ts2')
+
+        # Format the designated timestamp in micros.
+        def fdtm(value):
+            if self.version >= 2:
+                return f'{value}t\n'.encode()
+            else:
+                value = value * 1000
+                return f'{value}\n'.encode()
+
+        exp = (
+            b'tbl1 x=1i,ts1=1675245600000000t ' + fdtm(1675245600000000) +
+            b'tbl1 x=2i,ts1=1675341015000000t\n' +
+            b'tbl1 x=3i,ts1=1675439130000000t ' + fdtm(1675439130000000))
+        self.assertEqual(exp, act)
+
+    def test_arrow_micros_col(self):
+        df = pd.DataFrame({
+            'x': [1, 2, 3],
+            'ts1': pd.Series(
+                pa.array(
+                    [
+                        pd.Timestamp("2024-01-01 00:00:00.123456"),
+                        pd.Timestamp("2024-01-01 00:00:01.654321"),
+                        pd.Timestamp("2024-01-01 00:00:02.111111"),
+                    ],
+                    type=pa.timestamp("us")
+                ),
+                dtype="timestamp[us][pyarrow]"),
+            'ts2': pd.Series(
+                pa.array(
+                    [
+                        pd.Timestamp("2024-01-01 00:00:00.123456"),
+                        pd.Timestamp("2024-01-01 00:00:01.654321"),
+                        None
+                    ],
+                    type=pa.timestamp("us")
+                ),
+                dtype="timestamp[us][pyarrow]"),
+        })
+        act = _dataframe(self.version, df, table_name='tbl1', at='ts2')
+
+        # Format the designated timestamp in micros.
+        def fdtm(value):
+            if self.version >= 2:
+                return f'{value}t\n'.encode()
+            else:
+                value = value * 1000
+                return f'{value}\n'.encode()
+
+        exp = (
+            b'tbl1 x=1i,ts1=1704067200123456t ' + fdtm(1704067200123456) +
+            b'tbl1 x=2i,ts1=1704067201654321t ' + fdtm(1704067201654321) +
+            b'tbl1 x=3i,ts1=1704067202111111t\n')
+        self.assertEqual(exp, act)
+
+    def test_arrow_types(self):
+        df = pd.DataFrame({
+            "ts": pd.Series(
+                pa.array(
+                    pd.date_range("2024-01-01", periods=5, freq="s"),
+                    type=pa.timestamp("ns")
+                ),
+                dtype="timestamp[ns][pyarrow]"
+            ),
+
+            "ts2": pd.Series(
+                pa.array(
+                    pd.date_range("2024-01-01", periods=5, freq="s"),
+                    type=pa.timestamp("ns")
+                ),
+                dtype="timestamp[ns][pyarrow]"
+            ),
+
+            "b": pd.Series(
+                pa.array([True, False, True, True, False], type=pa.bool_()),
+                dtype="bool[pyarrow]"
+            ),
+
+            "sensor_large": pd.Series(
+                pa.LargeStringArray.from_pandas(
+                    ["alpha", None, "gamma", "delta", "epsilon"]
+                ),
+                dtype="large_string[pyarrow]"
+            ),
+
+            "sensor_small": pd.Series(
+                pa.array(["foo", "bar", None, "baz", "qux"], type=pa.string()),
+                dtype="string[pyarrow]"
+            ),
+
+            "value_f32": pd.Series(
+                pa.array([None, 20.0, 30.25, 40.5, 50.75], type=pa.float32()),
+                dtype="float32[pyarrow]"
+            ),
+
+            "value_f64": pd.Series(
+                pa.array([1.1, 2.2, 3.3, None, 5.5], type=pa.float64()),
+                dtype="float64[pyarrow]"
+            ),
+
+            "value_i8": pd.Series(
+                pa.array([1, None, 3, 4, 5], type=pa.int8()),
+                dtype="int8[pyarrow]"
+            ),
+
+            "value_i16": pd.Series(
+                pa.array([100, 200, 300, 400, None], type=pa.int16()),
+                dtype="int16[pyarrow]"
+            ),
+
+            "value_i32": pd.Series(
+                pa.array([1000, 2000, None, 4000, 5000], type=pa.int32()),
+                dtype="int32[pyarrow]"
+            ),
+
+            "value_i64": pd.Series(
+                pa.array([10, 20, 30, 40, None], type=pa.int64()),
+                dtype="int64[pyarrow]"
+            ),
+        })
+
+        # Format a timestamp column value.
+        def fts(value):
+            if self.version >= 2:
+                return f'{value}n'.encode()
+            else:
+                value = value // 1000
+                return f'{value}t'.encode()
+
+        # Designated timestamp suffix and line ending.
+        tsls = b'n\n' if self.version >= 2 else b'\n'
+
+        exp = (
+            b'tbl1 ts2=' + fts(1704067200000000000) +
+            b',b=t,sensor_large="alpha",sensor_small="foo",value_f64' +
+            _float_binary_bytes(1.1, self.version == 1) +
+            b',value_i8=1i,value_i16=100i,value_i32=1000i,value_i64=10i 1704067200000000000' +
+            tsls +
+
+            b'tbl1 ts2=' + fts(1704067201000000000) +
+            b',b=f,sensor_small="bar",value_f32' +
+            _float_binary_bytes(20.0, self.version == 1) +
+            b',value_f64' +
+            _float_binary_bytes(2.2, self.version == 1) +
+            b',value_i16=200i,value_i32=2000i,value_i64=20i 1704067201000000000' +
+            tsls +
+
+            b'tbl1 ts2=' + fts(1704067202000000000) +
+            b',b=t,sensor_large="gamma",value_f32' +
+            _float_binary_bytes(30.25, self.version == 1) +
+            b',value_f64' +
+            _float_binary_bytes(3.3, self.version == 1) +
+            b',value_i8=3i,value_i16=300i,value_i64=30i 1704067202000000000' +
+            tsls +
+
+            b'tbl1 ts2=' + fts(1704067203000000000) +
+            b',b=t,sensor_large="delta",sensor_small="baz",value_f32' +
+            _float_binary_bytes(40.5, self.version == 1) +
+            b',value_i8=4i,value_i16=400i,value_i32=4000i,value_i64=40i 1704067203000000000' +
+            tsls +
+
+            b'tbl1 ts2=' + fts(1704067204000000000) +
+            b',b=f,sensor_large="epsilon",sensor_small="qux",value_f32' +
+            _float_binary_bytes(50.75, self.version == 1) +
+            b',value_f64' +
+            _float_binary_bytes(5.5, self.version == 1) +
+            b',value_i8=5i,value_i32=5000i 1704067204000000000' +
+            tsls)
+        act = _dataframe(self.version, df, table_name='tbl1', at='ts')
+        self.assertEqual(act, exp)
+
+    def test_arrow_strings_as_symbols(self):
+        df = pd.DataFrame({
+            "sym_large": pd.Series(
+                pa.LargeStringArray.from_pandas(
+                    ["alpha", None, "gamma", "delta", "epsilon"]
+                ),
+                dtype="large_string[pyarrow]"
+            ),
+
+            "sym_small": pd.Series(
+                pa.array(["foo", "bar", None, "baz", "qux"], type=pa.string()),
+                dtype="string[pyarrow]"
+            )
+        })
+
+        act = _dataframe(self.version, df, table_name='tbl1', symbols=('sym_large', 'sym_small'), at=qi.ServerTimestamp)
+        exp = (
+            b'tbl1,sym_large=alpha,sym_small=foo\n'
+            b'tbl1,sym_small=bar\n'
+            b'tbl1,sym_large=gamma\n'
+            b'tbl1,sym_large=delta,sym_small=baz\n'
+            b'tbl1,sym_large=epsilon,sym_small=qux\n'
+        )
+        self.assertEqual(exp, act)
+
 
 class TestPandasProtocolVersionV1(TestPandasBase.TestPandas):
     name = 'protocol version 1'
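
Usage note: a minimal sketch of what this patch enables, namely sending a pandas DataFrame with microsecond-resolution timestamp columns without first casting them to datetime64[ns]. The connection string (localhost:9000) and the table name ('trades') are illustrative assumptions, not part of the change; the Sender.from_conf and sender.dataframe calls are the client's existing public API.

    import pandas as pd
    from questdb.ingress import Sender

    df = pd.DataFrame({
        'price': [100.5, 101.25],
        # A datetime64[us] column now maps directly to a QuestDB timestamp;
        # previously only datetime64[ns] (and its tz-aware variant) was accepted.
        'ts': pd.Series([
            pd.Timestamp('2024-01-01 00:00:00.123456'),
            pd.Timestamp('2024-01-01 00:00:01.654321'),
        ], dtype='datetime64[us]'),
    })

    with Sender.from_conf('http::addr=localhost:9000;') as sender:
        sender.dataframe(df, table_name='trades', at='ts')

The same applies to timestamp[us][pyarrow] columns, which take the Arrow code path exercised by test_arrow_micros_col above.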