Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 188 additions & 31 deletions src/questdb/dataframe.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,14 @@ cdef enum col_source_t:
col_source_str_lrg_utf8_arrow = 406000
col_source_dt64ns_numpy = 501000
col_source_dt64ns_tz_arrow = 502000
col_source_arr_f64_numpyobj = 601100
col_source_decimal_pyobj = 701100
col_source_decimal32_arrow = 702000
col_source_decimal64_arrow = 703000
col_source_decimal128_arrow = 704000
col_source_decimal256_arrow = 705000
col_source_dt64us_numpy = 601000
col_source_dt64us_tz_arrow = 602000
col_source_arr_f64_numpyobj = 701100
col_source_decimal_pyobj = 801100
col_source_decimal32_arrow = 802000
col_source_decimal64_arrow = 803000
col_source_decimal128_arrow = 804000
col_source_decimal256_arrow = 805000


cdef bint col_source_needs_gil(col_source_t source) noexcept nogil:
Expand Down Expand Up @@ -242,6 +244,8 @@ cdef dict _TARGET_TO_SOURCES = {
col_target_t.col_target_column_ts: {
col_source_t.col_source_dt64ns_numpy,
col_source_t.col_source_dt64ns_tz_arrow,
col_source_t.col_source_dt64us_numpy,
col_source_t.col_source_dt64us_tz_arrow
},
col_target_t.col_target_column_arr_f64: {
col_source_t.col_source_arr_f64_numpyobj,
Expand All @@ -256,6 +260,8 @@ cdef dict _TARGET_TO_SOURCES = {
col_target_t.col_target_at: {
col_source_t.col_source_dt64ns_numpy,
col_source_t.col_source_dt64ns_tz_arrow,
col_source_t.col_source_dt64us_numpy,
col_source_t.col_source_dt64us_tz_arrow,
},
}

Expand Down Expand Up @@ -386,11 +392,22 @@ cdef enum col_dispatch_code_t:
col_target_t.col_target_column_ts + \
col_source_t.col_source_dt64ns_tz_arrow

col_dispatch_code_column_ts__dt64us_numpy = \
col_target_t.col_target_column_ts + col_source_t.col_source_dt64us_numpy
col_dispatch_code_column_ts__dt64us_tz_arrow = \
col_target_t.col_target_column_ts + \
col_source_t.col_source_dt64us_tz_arrow

col_dispatch_code_at__dt64ns_numpy = \
col_target_t.col_target_at + col_source_t.col_source_dt64ns_numpy
col_dispatch_code_at__dt64ns_tz_arrow = \
col_target_t.col_target_at + col_source_t.col_source_dt64ns_tz_arrow

col_dispatch_code_at__dt64us_numpy = \
col_target_t.col_target_at + col_source_t.col_source_dt64us_numpy
col_dispatch_code_at__dt64us_tz_arrow = \
col_target_t.col_target_at + col_source_t.col_source_dt64us_tz_arrow

col_dispatch_code_column_arr_f64__arr_f64_numpyobj = \
col_target_t.col_target_column_arr_f64 + col_source_t.col_source_arr_f64_numpyobj

Expand Down Expand Up @@ -508,6 +525,7 @@ cdef object _NUMPY_INT64 = None
cdef object _NUMPY_FLOAT32 = None
cdef object _NUMPY_FLOAT64 = None
cdef object _NUMPY_DATETIME64_NS = None
cdef object _NUMPY_DATETIME64_US = None
cdef object _NUMPY_OBJECT = None
cdef object _PANDAS = None # module object
cdef object _PANDAS_NA = None # pandas.NA
Expand Down Expand Up @@ -541,6 +559,7 @@ cdef object _dataframe_may_import_deps():
global _NUMPY_FLOAT32
global _NUMPY_FLOAT64
global _NUMPY_DATETIME64_NS
global _NUMPY_DATETIME64_US
global _NUMPY_OBJECT
if _NUMPY is not None:
return
Expand All @@ -567,6 +586,7 @@ cdef object _dataframe_may_import_deps():
_NUMPY_FLOAT32 = type(_NUMPY.dtype('float32'))
_NUMPY_FLOAT64 = type(_NUMPY.dtype('float64'))
_NUMPY_DATETIME64_NS = type(_NUMPY.dtype('datetime64[ns]'))
_NUMPY_DATETIME64_US = type(_NUMPY.dtype('datetime64[us]'))
_NUMPY_OBJECT = type(_NUMPY.dtype('object'))
_PANDAS = pandas
_PANDAS_NA = pandas.NA
Expand Down Expand Up @@ -781,16 +801,47 @@ cdef int64_t _AT_IS_SERVER_NOW = -2
cdef int64_t _AT_IS_SET_BY_COLUMN = -1


cdef str _SUPPORTED_DATETIMES = 'datetime64[ns] or datetime64[ns, tz]'
cdef str _SUPPORTED_DATETIMES = 'datetime64[ns], datetime64[us], datetime64[ns, tz], timestamp[ns][pyarrow], or timestamp[us][pyarrow]'


cdef object _dataframe_is_supported_datetime(object dtype):
if (isinstance(dtype, _NUMPY_DATETIME64_NS) and
(str(dtype) == 'datetime64[ns]')):
return True
if isinstance(dtype, _PANDAS.DatetimeTZDtype):
return dtype.unit == 'ns'
return False
cdef int _dataframe_classify_timestamp_dtype(object dtype) except -1:
    """
    Classify the dtype and determine if it's supported for use as a timestamp.

    Returns:
        > 0 - a value castable to a `col_source_t`.
          0 - dtype is not a supported timestamp datatype.
    """
    cdef object pa_type
    # numpy datetime64 dtypes share one dtype class, so the unit must be
    # confirmed via the string representation.
    if isinstance(dtype, _NUMPY_DATETIME64_NS) and str(dtype) == "datetime64[ns]":
        return col_source_t.col_source_dt64ns_numpy
    if isinstance(dtype, _NUMPY_DATETIME64_US) and str(dtype) == "datetime64[us]":
        return col_source_t.col_source_dt64us_numpy
    if isinstance(dtype, _PANDAS.DatetimeTZDtype):
        # Docs say this should always be nanos, but best assert in case the API changes in the future.
        # https://pandas.pydata.org/docs/reference/api/pandas.DatetimeTZDtype.html
        if dtype.unit != 'ns':
            raise IngressError(
                IngressErrorCode.BadDataFrame,
                f'Unsupported pandas dtype {dtype} unit {dtype.unit}. ' +
                'Raise an issue if you think it should be supported: ' +
                'https://github.com/questdb/py-questdb-client/issues.')
        return col_source_t.col_source_dt64ns_tz_arrow
    if isinstance(dtype, _PANDAS.ArrowDtype):
        pa_type = dtype.pyarrow_dtype
        if pa_type.id == _PYARROW.lib.Type_TIMESTAMP:
            if pa_type.unit == "ns":
                return col_source_t.col_source_dt64ns_tz_arrow
            if pa_type.unit == "us":
                return col_source_t.col_source_dt64us_tz_arrow
            raise IngressError(
                IngressErrorCode.BadDataFrame,
                f'Unsupported arrow dtype {dtype} unit {pa_type.unit}. ' +
                'Raise an issue if you think it should be supported: ' +
                'https://github.com/questdb/py-questdb-client/issues.')
    return 0


cdef ssize_t _dataframe_resolve_at(
Expand Down Expand Up @@ -827,7 +878,7 @@ cdef ssize_t _dataframe_resolve_at(
'Must be one of: None, TimestampNanos, datetime, ' +
'int (column index), str (colum name)')
dtype = df.dtypes.iloc[col_index]
if _dataframe_is_supported_datetime(dtype):
if _dataframe_classify_timestamp_dtype(dtype) != 0:
at_value_out[0] = _AT_IS_SET_BY_COLUMN
col = &cols.d[col_index]
col.setup.meta_target = meta_target_t.meta_target_at
Expand Down Expand Up @@ -954,28 +1005,52 @@ cdef void_int _dataframe_category_series_as_arrow(
f'got a category of {pandas_col.series.dtype.categories.dtype}.')

cdef void_int _dataframe_series_resolve_arrow(PandasCol pandas_col, object arrowtype, col_t *col) except -1:
cdef bint is_decimal_col = False
_dataframe_series_as_arrow(pandas_col, col)
if arrowtype.id == _PYARROW.lib.Type_DECIMAL32:
col.setup.source = col_source_t.col_source_decimal32_arrow
is_decimal_col = True
elif arrowtype.id == _PYARROW.lib.Type_DECIMAL64:
col.setup.source = col_source_t.col_source_decimal64_arrow
is_decimal_col = True
elif arrowtype.id == _PYARROW.lib.Type_DECIMAL128:
col.setup.source = col_source_t.col_source_decimal128_arrow
is_decimal_col = True
elif arrowtype.id == _PYARROW.lib.Type_DECIMAL256:
col.setup.source = col_source_t.col_source_decimal256_arrow
is_decimal_col = True
elif arrowtype.id == _PYARROW.lib.Type_BOOL:
col.setup.source = col_source_t.col_source_bool_arrow
elif arrowtype.id == _PYARROW.lib.Type_LARGE_STRING:
col.setup.source = col_source_t.col_source_str_lrg_utf8_arrow
elif arrowtype.id == _PYARROW.lib.Type_FLOAT:
col.setup.source = col_source_t.col_source_f32_arrow
elif arrowtype.id == _PYARROW.lib.Type_DOUBLE:
col.setup.source = col_source_t.col_source_f64_arrow
elif arrowtype.id == _PYARROW.lib.Type_INT8:
col.setup.source = col_source_t.col_source_i8_arrow
elif arrowtype.id == _PYARROW.lib.Type_INT16:
col.setup.source = col_source_t.col_source_i16_arrow
elif arrowtype.id == _PYARROW.lib.Type_INT32:
col.setup.source = col_source_t.col_source_i32_arrow
elif arrowtype.id == _PYARROW.lib.Type_INT64:
col.setup.source = col_source_t.col_source_i64_arrow
else:
raise IngressError(
IngressErrorCode.BadDataFrame,
f'Unsupported arrow type {arrowtype} for column {pandas_col.name!r}. ' +
'Raise an issue if you think it should be supported: ' +
'https://github.com/questdb/py-questdb-client/issues.')
if arrowtype.scale < 0 or arrowtype.scale > 76:
raise IngressError(
IngressErrorCode.BadDataFrame,
f'Bad column {pandas_col.name!r}: ' +
f'Unsupported decimal scale {arrowtype.scale}: ' +
'Must be in the range 0 to 76 inclusive.')
col.scale = <uint8_t>arrowtype.scale
if is_decimal_col:
if arrowtype.scale < 0 or arrowtype.scale > 76:
raise IngressError(
IngressErrorCode.BadDataFrame,
f'Bad column {pandas_col.name!r}: ' +
f'Unsupported decimal scale {arrowtype.scale}: ' +
'Must be in the range 0 to 76 inclusive.')
col.scale = <uint8_t>arrowtype.scale
else:
col.scale = 0
return 0

cdef inline bint _dataframe_is_float_nan(PyObject* obj) noexcept:
Expand Down Expand Up @@ -1061,7 +1136,15 @@ cdef void_int _dataframe_series_sniff_pyobj(
cdef void_int _dataframe_resolve_source_and_buffers(
PandasCol pandas_col, col_t* col) except -1:
cdef object dtype = pandas_col.dtype
if isinstance(dtype, _NUMPY_BOOL):
cdef int ts_col_source = _dataframe_classify_timestamp_dtype(dtype)
if ts_col_source != 0:
col.setup.source = <col_source_t>ts_col_source
if ((col.setup.source == col_source_t.col_source_dt64ns_numpy) or
(col.setup.source == col_source_t.col_source_dt64us_numpy)):
_dataframe_series_as_pybuf(pandas_col, col)
else:
_dataframe_series_as_arrow(pandas_col, col)
elif isinstance(dtype, _NUMPY_BOOL):
col.setup.source = col_source_t.col_source_bool_numpy
_dataframe_series_as_pybuf(pandas_col, col)
elif isinstance(dtype, _PANDAS.BooleanDtype):
Expand Down Expand Up @@ -1150,14 +1233,6 @@ cdef void_int _dataframe_resolve_source_and_buffers(
f'for column {pandas_col.name} of dtype {dtype}.')
elif isinstance(dtype, _PANDAS.CategoricalDtype):
_dataframe_category_series_as_arrow(pandas_col, col)
elif (isinstance(dtype, _NUMPY_DATETIME64_NS) and
_dataframe_is_supported_datetime(dtype)):
col.setup.source = col_source_t.col_source_dt64ns_numpy
_dataframe_series_as_pybuf(pandas_col, col)
elif (isinstance(dtype, _PANDAS.DatetimeTZDtype) and
_dataframe_is_supported_datetime(dtype)):
col.setup.source = col_source_t.col_source_dt64ns_tz_arrow
_dataframe_series_as_arrow(pandas_col, col)
elif isinstance(dtype, _NUMPY_OBJECT):
_dataframe_series_sniff_pyobj(pandas_col, col)
elif isinstance(dtype, _PANDAS.ArrowDtype):
Expand Down Expand Up @@ -2126,6 +2201,21 @@ cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_numpy(
_ensure_has_gil(gs)
raise c_err_to_py(err)


cdef void_int _dataframe_serialize_cell_column_ts__dt64us_numpy(
        line_sender_buffer* ls_buf,
        qdb_pystr_buf* b,  # unused here; part of the common serializer signature
        col_t* col,
        PyThreadState** gs) except -1:
    # Serialize one cell of a `datetime64[us]` numpy column as a timestamp
    # *column* value (distinct from the designated `at` row timestamp).
    cdef line_sender_error* err = NULL
    # buffers[1] is read as raw int64 values — presumably microseconds since
    # the epoch, matching the `datetime64[us]` unit (TODO confirm upstream).
    cdef int64_t* access = <int64_t*>col.cursor.chunk.buffers[1]
    cdef int64_t cell = access[col.cursor.offset]
    # A NaT sentinel marks a null cell: write nothing for it.
    if cell != _NAT:
        # Values already carry micro precision, so they are passed through
        # to the micros variant with no unit conversion.
        if not line_sender_buffer_column_ts_micros(ls_buf, col.name, cell, &err):
            # Ensure the GIL is held before raising the converted C error.
            _ensure_has_gil(gs)
            raise c_err_to_py(err)


cdef void_int _dataframe_serialize_cell_column_arr_f64__arr_f64_numpyobj(
line_sender_buffer* ls_buf,
qdb_pystr_buf* b,
Expand Down Expand Up @@ -2265,6 +2355,7 @@ cdef void_int _dataframe_serialize_cell_column_decimal__decimal256_arrow(
_ensure_has_gil(gs)
raise c_err_to_py(err)


cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow(
line_sender_buffer* ls_buf,
qdb_pystr_buf* b,
Expand All @@ -2282,6 +2373,23 @@ cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow(
raise c_err_to_py(err)


cdef void_int _dataframe_serialize_cell_column_ts__dt64us_tz_arrow(
        line_sender_buffer* ls_buf,
        qdb_pystr_buf* b,  # unused here; part of the common serializer signature
        col_t* col,
        PyThreadState** gs) except -1:
    # Serialize one cell of an Arrow `timestamp[us]` column as a timestamp
    # *column* value (distinct from the designated `at` row timestamp).
    cdef line_sender_error* err = NULL
    # Arrow encodes nulls via a validity bitmap (checked here) rather than a
    # NaT sentinel value, unlike the numpy variant of this serializer.
    cdef bint valid = _dataframe_arrow_is_valid(&col.cursor)
    cdef int64_t cell
    cdef int64_t* access
    if valid:
        # buffers[1] holds the column's raw int64 values; a null cell
        # (invalid bit) is simply skipped.
        access = <int64_t*>col.cursor.chunk.buffers[1]
        cell = access[col.cursor.offset]
        if not line_sender_buffer_column_ts_micros(ls_buf, col.name, cell, &err):
            # Ensure the GIL is held before raising the converted C error.
            _ensure_has_gil(gs)
            raise c_err_to_py(err)


cdef void_int _dataframe_serialize_cell_at_dt64ns_numpy(
line_sender_buffer* ls_buf,
qdb_pystr_buf* b,
Expand All @@ -2301,6 +2409,25 @@ cdef void_int _dataframe_serialize_cell_at_dt64ns_numpy(
raise c_err_to_py(err)


cdef void_int _dataframe_serialize_cell_at_dt64us_numpy(
        line_sender_buffer* ls_buf,
        qdb_pystr_buf* b,
        col_t* col,
        PyThreadState** gs) except -1:
    # Write the designated "at" row timestamp from a `datetime64[us]` numpy
    # column cell. A NaT cell falls back to a server-assigned timestamp.
    cdef line_sender_error* err = NULL
    cdef int64_t* values = <int64_t*>col.cursor.chunk.buffers[1]
    cdef int64_t micros = values[col.cursor.offset]
    cdef bint ok
    if micros != _NAT:
        # Note: ls_buf will validate against negative numbers.
        ok = line_sender_buffer_at_micros(ls_buf, micros, &err)
    else:
        ok = line_sender_buffer_at_now(ls_buf, &err)
    if not ok:
        _ensure_has_gil(gs)
        raise c_err_to_py(err)


cdef void_int _dataframe_serialize_cell_at_dt64ns_tz_arrow(
line_sender_buffer* ls_buf,
qdb_pystr_buf* b,
Expand All @@ -2323,6 +2450,28 @@ cdef void_int _dataframe_serialize_cell_at_dt64ns_tz_arrow(
raise c_err_to_py(err)


cdef void_int _dataframe_serialize_cell_at_dt64us_tz_arrow(
        line_sender_buffer* ls_buf,
        qdb_pystr_buf* b,  # unused here; part of the common serializer signature
        col_t* col,
        PyThreadState** gs) except -1:
    # Write the designated "at" row timestamp from an Arrow `timestamp[us]`
    # column cell. A null cell falls back to a server-assigned timestamp.
    cdef line_sender_error* err = NULL
    # Arrow encodes nulls via a validity bitmap, not a NaT sentinel.
    cdef bint valid = _dataframe_arrow_is_valid(&col.cursor)
    cdef int64_t* access
    cdef int64_t cell
    if valid:
        # buffers[1] holds the column's raw int64 microsecond values.
        access = <int64_t*>col.cursor.chunk.buffers[1]
        cell = access[col.cursor.offset]
        # Note: ls_buf will validate against negative numbers.
        if not line_sender_buffer_at_micros(ls_buf, cell, &err):
            # Ensure the GIL is held before raising the converted C error.
            _ensure_has_gil(gs)
            raise c_err_to_py(err)
    else:
        # Null timestamp: let the server stamp the row with "now".
        if not line_sender_buffer_at_now(ls_buf, &err):
            _ensure_has_gil(gs)
            raise c_err_to_py(err)


cdef void_int _dataframe_serialize_cell(
line_sender_buffer* ls_buf,
qdb_pystr_buf* b,
Expand Down Expand Up @@ -2421,6 +2570,8 @@ cdef void_int _dataframe_serialize_cell(
_dataframe_serialize_cell_column_str__str_i32_cat(ls_buf, b, col, gs)
elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64ns_numpy:
_dataframe_serialize_cell_column_ts__dt64ns_numpy(ls_buf, b, col, gs)
elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64us_numpy:
_dataframe_serialize_cell_column_ts__dt64us_numpy(ls_buf, b, col, gs)
elif dc == col_dispatch_code_t.col_dispatch_code_column_arr_f64__arr_f64_numpyobj:
_dataframe_serialize_cell_column_arr_f64__arr_f64_numpyobj(ls_buf, b, col)
elif dc == col_dispatch_code_t.col_dispatch_code_column_decimal__decimal_pyobj:
Expand All @@ -2435,10 +2586,16 @@ cdef void_int _dataframe_serialize_cell(
_dataframe_serialize_cell_column_decimal__decimal256_arrow(ls_buf, b, col, gs)
elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64ns_tz_arrow:
_dataframe_serialize_cell_column_ts__dt64ns_tz_arrow(ls_buf, b, col, gs)
elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64us_tz_arrow:
_dataframe_serialize_cell_column_ts__dt64us_tz_arrow(ls_buf, b, col, gs)
elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64ns_numpy:
_dataframe_serialize_cell_at_dt64ns_numpy(ls_buf, b, col, gs)
elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64us_numpy:
_dataframe_serialize_cell_at_dt64us_numpy(ls_buf, b, col, gs)
elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64ns_tz_arrow:
_dataframe_serialize_cell_at_dt64ns_tz_arrow(ls_buf, b, col, gs)
elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64us_tz_arrow:
_dataframe_serialize_cell_at_dt64us_tz_arrow(ls_buf, b, col, gs)
else:
_ensure_has_gil(gs)
raise RuntimeError(f"Unknown column dispatch code: {dc}")
Expand Down
Loading