From 12745349581041341a5cb69fadbfd4b8b108413f Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Fri, 3 Oct 2025 18:01:51 +0100 Subject: [PATCH 1/8] designated timestamp suffix --- questdb-rs/src/ingress/buffer.rs | 41 ++++++++++++++++++++---------- questdb-rs/src/tests/sender.rs | 2 +- system_test/questdb_line_sender.py | 9 +++++++ system_test/test.py | 20 ++++++++++++++- 4 files changed, 56 insertions(+), 16 deletions(-) diff --git a/questdb-rs/src/ingress/buffer.rs b/questdb-rs/src/ingress/buffer.rs index 68edc67d..b4557686 100644 --- a/questdb-rs/src/ingress/buffer.rs +++ b/questdb-rs/src/ingress/buffer.rs @@ -23,7 +23,7 @@ ******************************************************************************/ use crate::ingress::ndarr::{check_and_get_array_bytes_size, ArrayElementSealed}; use crate::ingress::{ - ndarr, ArrayElement, DebugBytes, NdArrayView, ProtocolVersion, Timestamp, TimestampNanos, + ndarr, ArrayElement, DebugBytes, NdArrayView, ProtocolVersion, Timestamp, ARRAY_BINARY_FORMAT_TYPE, DOUBLE_BINARY_FORMAT_TYPE, MAX_ARRAY_DIMS, MAX_NAME_LEN_DEFAULT, }; use crate::{error, Error}; @@ -1209,23 +1209,36 @@ impl Buffer { self.check_op(Op::At)?; let timestamp: Timestamp = timestamp.try_into()?; - // https://github.com/rust-lang/rust/issues/115880 - let timestamp: crate::Result = timestamp.try_into(); - let timestamp: TimestampNanos = timestamp?; + let (number, suffix) = match timestamp { + Timestamp::Micros(micros) => { + let epoch_micros = micros.as_i64(); + if epoch_micros < 0 { + return Err(error::fmt!( + InvalidTimestamp, + "Microsecond timestamp {} is negative. It must be >= 0.", + epoch_micros + )); + } + (epoch_micros, "t\n") + } + Timestamp::Nanos(nanos) => { + let epoch_nanos = nanos.as_i64(); + if epoch_nanos < 0 { + return Err(error::fmt!( + InvalidTimestamp, + "Nanosecond timestamp {} is negative. It must be >= 0.", + epoch_nanos + )); + } + (epoch_nanos, "\n") + } + }; - let epoch_nanos = timestamp.as_i64(); - if epoch_nanos < 0 { - return Err(error::fmt!( - InvalidTimestamp, - "Timestamp {} is negative. It must be >= 0.", - epoch_nanos - )); - } let mut buf = itoa::Buffer::new(); - let printed = buf.format(epoch_nanos); + let printed = buf.format(number); self.output.push(b' '); self.output.extend_from_slice(printed.as_bytes()); - self.output.push(b'\n'); + self.output.extend_from_slice(suffix.as_bytes()); self.state.op_case = OpCase::MayFlushOrTable; self.state.row_count += 1; Ok(()) diff --git a/questdb-rs/src/tests/sender.rs b/questdb-rs/src/tests/sender.rs index 0a62e582..32653789 100644 --- a/questdb-rs/src/tests/sender.rs +++ b/questdb-rs/src/tests/sender.rs @@ -538,7 +538,7 @@ fn test_timestamp_overloads() -> TestResult { )?)?; let exp = concat!( - "tbl_name a=12345t,b=-100000000t,c=12345678n,d=-12345678n,e=-1t,f=-10000n 1000\n", + "tbl_name a=12345t,b=-100000000t,c=12345678n,d=-12345678n,e=-1t,f=-10000n 1t\n", "tbl_name a=1000000t 5000000000\n" ) .as_bytes(); diff --git a/system_test/questdb_line_sender.py b/system_test/questdb_line_sender.py index 55e0fcb0..56476d44 100644 --- a/system_test/questdb_line_sender.py +++ b/system_test/questdb_line_sender.py @@ -787,6 +787,12 @@ def at(self, timestamp: int): self._impl, timestamp) + def at_micros(self, timestamp: int): + _error_wrapped_call( + _DLL.line_sender_buffer_at_micros, + self._impl, + timestamp) + def __del__(self): _DLL.line_sender_buffer_free(self._impl) @@ -923,6 +929,9 @@ def at_now(self): def at(self, timestamp: int): self._buffer.at(timestamp) + def at_micros(self, timestamp: int): + self._buffer.at_micros(timestamp) + def flush(self, buffer: Optional[Buffer] = None, clear=True, transactional=None): if (buffer is None) and not clear: raise ValueError( diff --git a/system_test/test.py b/system_test/test.py index 9465d630..e2756db9 100755 --- a/system_test/test.py +++ b/system_test/test.py @@ -355,11 +355,29 @@ def test_neg_at(self): at_ts_ns = -10000000 with self.assertRaisesRegex(qls.SenderError, r'Bad call to'): with self._mk_linesender() as sender: - with self.assertRaisesRegex(qls.SenderError, r'.*Timestamp .* is negative.*'): + with self.assertRaisesRegex(qls.SenderError, r'.*Nanosecond timestamp .* is negative.*'): (sender .table(table_name) .symbol('a', 'A') .at(at_ts_ns)) + + def test_micros_at(self): + if QDB_FIXTURE.version <= (6, 0, 7, 1): + self.skipTest('No support for user-provided timestamps.') + return + table_name = uuid.uuid4().hex + at_ts_ns = 1647357688714369403 + at_ts_us = at_ts_ns // 1000 + pending = None + with self._mk_linesender() as sender: + (sender + .table(table_name) + .symbol('a', 'A') + .at_micros(at_ts_us)) + pending = sender.buffer.peek() + resp = retry_check_table(table_name, log_ctx=pending) + exp_dataset = [['A', ns_to_qdb_date(at_ts_ns)]] + self.assertEqual(resp['dataset'], exp_dataset) def test_timestamp_col(self): if QDB_FIXTURE.version <= (6, 0, 7, 1): From 6e9a01e6536f725d21d902288f9443e1bb329ef4 Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Tue, 7 Oct 2025 11:44:32 +0100 Subject: [PATCH 2/8] maintaining old logic for V1, always suffix for V2 (also designated TS) --- questdb-rs/src/ingress/buffer.rs | 67 ++++++++++++++------------------ 1 file changed, 30 insertions(+), 37 deletions(-) diff --git a/questdb-rs/src/ingress/buffer.rs b/questdb-rs/src/ingress/buffer.rs index b4557686..b87688bf 100644 --- a/questdb-rs/src/ingress/buffer.rs +++ b/questdb-rs/src/ingress/buffer.rs @@ -23,8 +23,9 @@ ******************************************************************************/ use crate::ingress::ndarr::{check_and_get_array_bytes_size, ArrayElementSealed}; use crate::ingress::{ - ndarr, ArrayElement, DebugBytes, NdArrayView, ProtocolVersion, Timestamp, - ARRAY_BINARY_FORMAT_TYPE, DOUBLE_BINARY_FORMAT_TYPE, MAX_ARRAY_DIMS, MAX_NAME_LEN_DEFAULT, + ndarr, ArrayElement, DebugBytes, NdArrayView, ProtocolVersion, Timestamp, TimestampMicros, + TimestampNanos, ARRAY_BINARY_FORMAT_TYPE, DOUBLE_BINARY_FORMAT_TYPE, MAX_ARRAY_DIMS, + MAX_NAME_LEN_DEFAULT, }; use crate::{error, Error}; use std::fmt::{Debug, Formatter}; @@ -1147,19 +1148,19 @@ impl Buffer { { self.write_column_key(name)?; let timestamp: Timestamp = value.try_into()?; - let mut buf = itoa::Buffer::new(); - match timestamp { - Timestamp::Micros(ts) => { - let printed = buf.format(ts.as_i64()); - self.output.extend_from_slice(printed.as_bytes()); - self.output.push(b't'); - } - Timestamp::Nanos(ts) => { - let printed = buf.format(ts.as_i64()); - self.output.extend_from_slice(printed.as_bytes()); - self.output.push(b'n'); + let (number, suffix) = match (self.protocol_version, timestamp) { + (ProtocolVersion::V1, _) => { + let timestamp: TimestampMicros = timestamp.try_into()?; + (timestamp.as_i64(), b't') } - } + (_, Timestamp::Micros(ts)) => (ts.as_i64(), b't'), + (_, Timestamp::Nanos(ts)) => (ts.as_i64(), b'n'), + }; + + let mut buf = itoa::Buffer::new(); + let printed = buf.format(number); + self.output.extend_from_slice(printed.as_bytes()); + self.output.push(suffix); Ok(self) } @@ -1209,36 +1210,28 @@ impl Buffer { self.check_op(Op::At)?; let timestamp: Timestamp = timestamp.try_into()?; - let (number, suffix) = match timestamp { - Timestamp::Micros(micros) => { - let epoch_micros = micros.as_i64(); - if epoch_micros < 0 { - return Err(error::fmt!( - InvalidTimestamp, - "Microsecond timestamp {} is negative. It must be >= 0.", - epoch_micros - )); - } - (epoch_micros, "t\n") - } - Timestamp::Nanos(nanos) => { - let epoch_nanos = nanos.as_i64(); - if epoch_nanos < 0 { - return Err(error::fmt!( - InvalidTimestamp, - "Nanosecond timestamp {} is negative. It must be >= 0.", - epoch_nanos - )); - } - (epoch_nanos, "\n") + let (number, termination) = match (self.protocol_version, timestamp) { + (ProtocolVersion::V1, _) => { + let timestamp: crate::Result = timestamp.try_into(); + (timestamp?.as_i64(), "\n") } + (_, Timestamp::Micros(micros)) => (micros.as_i64(), "t\n"), + (_, Timestamp::Nanos(nanos)) => (nanos.as_i64(), "n\n"), }; + if number < 0 { + return Err(error::fmt!( + InvalidTimestamp, + "Timestamp {} is negative. It must be >= 0.", + number + )); + } + let mut buf = itoa::Buffer::new(); let printed = buf.format(number); self.output.push(b' '); self.output.extend_from_slice(printed.as_bytes()); - self.output.extend_from_slice(suffix.as_bytes()); + self.output.extend_from_slice(termination.as_bytes()); self.state.op_case = OpCase::MayFlushOrTable; self.state.row_count += 1; Ok(()) From 0ad34ce4b9dd0f995027d274644abb28f47e6855 Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Tue, 7 Oct 2025 11:53:00 +0100 Subject: [PATCH 3/8] fixed up the Rust unit tests --- questdb-rs/src/tests/http.rs | 8 +++- questdb-rs/src/tests/sender.rs | 78 ++++++++++++++++++++++++++++++---- 2 files changed, 76 insertions(+), 10 deletions(-) diff --git a/questdb-rs/src/tests/http.rs b/questdb-rs/src/tests/http.rs index d66242db..d19b9f09 100644 --- a/questdb-rs/src/tests/http.rs +++ b/questdb-rs/src/tests/http.rs @@ -723,10 +723,16 @@ fn _test_sender_auto_detect_protocol_version( )?, Some(_) => server.send_settings_response()?, } + + let designated_ts = if expect_version == ProtocolVersion::V1 { + " 10000000\n" + } else { + " 10000000n\n" + }; let exp = &[ b"test,t1=v1 ", crate::tests::sender::f64_to_bytes("f1", 0.5, expect_version).as_slice(), - b" 10000000\n", + designated_ts.as_bytes(), ] .concat(); let req = server.recv_http_q()?; diff --git a/questdb-rs/src/tests/sender.rs b/questdb-rs/src/tests/sender.rs index 32653789..4f63b2ed 100644 --- a/questdb-rs/src/tests/sender.rs +++ b/questdb-rs/src/tests/sender.rs @@ -86,12 +86,19 @@ fn test_basics( .at(ts_nanos)?; assert_eq!(server.recv_q()?, 0); + let (ts3_num, ts3_suffix, dts_suffix) = if version == ProtocolVersion::V1 { + (ts_nanos_num / 1000, "t", "") + } else { + (ts_nanos_num, "n", "n") + }; let exp = &[ "test,t1=v1 ".as_bytes(), f64_to_bytes("f1", 0.5, version).as_slice(), format!( - ",ts1=12345t,ts2={}t,ts3={ts_nanos_num}n {ts_nanos_num}\n", - ts_micros_num + ",ts1=12345t,\ + ts2={ts_micros_num}t,\ + ts3={ts3_num}{ts3_suffix} \ + {ts_nanos_num}{dts_suffix}\n" ) .as_bytes(), ] @@ -139,7 +146,7 @@ fn test_array_f64_basic() -> TestResult { &1.0f64.to_le_bytes(), &2.0f64.to_le_bytes(), &3.0f64.to_le_bytes(), - format!(" {}\n", ts.as_i64()).as_bytes(), + format!(" {}n\n", ts.as_i64()).as_bytes(), ] .concat(); @@ -217,7 +224,7 @@ fn test_array_f64_for_ndarray() -> TestResult { ",arr3d=".as_bytes(), array_header3d, array_data3d.as_slice(), - format!(" {}\n", ts.as_i64()).as_bytes(), + format!(" {}n\n", ts.as_i64()).as_bytes(), ] .concat(); @@ -506,7 +513,49 @@ fn test_bad_key( } #[test] -fn test_timestamp_overloads() -> TestResult { +fn test_timestamp_overloads_v1() -> TestResult { + use std::time::SystemTime; + + let tbl_name = TableName::new("tbl_name")?; + + let mut buffer = Buffer::new(ProtocolVersion::V1); + buffer + .table(tbl_name)? + .column_ts("a", TimestampMicros::new(12345))? + .column_ts("b", TimestampMicros::new(-100000000))? + .column_ts("c", TimestampNanos::new(12345678))? + .column_ts("d", TimestampNanos::new(-12345678))? + .column_ts("e", Timestamp::Micros(TimestampMicros::new(-1)))? + .column_ts("f", Timestamp::Nanos(TimestampNanos::new(-10000)))? + .at(TimestampMicros::new(1))?; + buffer + .table(tbl_name)? + .column_ts( + "a", + TimestampMicros::from_systemtime( + SystemTime::UNIX_EPOCH + .checked_add(Duration::from_secs(1)) + .unwrap(), + )?, + )? + .at(TimestampNanos::from_systemtime( + SystemTime::UNIX_EPOCH + .checked_add(Duration::from_secs(5)) + .unwrap(), + )?)?; + + let exp = concat!( + "tbl_name a=12345t,b=-100000000t,c=12345t,d=-12345t,e=-1t,f=-10t 1000\n", + "tbl_name a=1000000t 5000000000\n" + ) + .as_bytes(); + assert_eq!(buffer.as_bytes(), exp); + + Ok(()) +} + +#[test] +fn test_timestamp_overloads_v2() -> TestResult { use std::time::SystemTime; let tbl_name = TableName::new("tbl_name")?; @@ -539,7 +588,7 @@ fn test_timestamp_overloads() -> TestResult { let exp = concat!( "tbl_name a=12345t,b=-100000000t,c=12345678n,d=-12345678n,e=-1t,f=-10000n 1t\n", - "tbl_name a=1000000t 5000000000\n" + "tbl_name a=1000000t 5000000000n\n" ) .as_bytes(); assert_eq!(buffer.as_bytes(), exp); @@ -559,7 +608,7 @@ fn test_chrono_timestamp() -> TestResult { let mut buffer = Buffer::new(ProtocolVersion::V2); buffer.table(tbl_name)?.column_ts("a", ts)?.at(ts)?; - let exp = b"tbl_name a=1000000000n 1000000000\n"; + let exp = b"tbl_name a=1000000000n 1000000000n\n"; assert_eq!(buffer.as_bytes(), exp); Ok(()) @@ -633,11 +682,17 @@ fn test_tls_with_file_ca( .column_f64("f1", 0.5)? .at(TimestampNanos::new(10000000))?; + let designated_ts = if version == ProtocolVersion::V1 { + " 10000000\n" + } else { + " 10000000n\n" + }; + assert_eq!(server.recv_q()?, 0); let exp = &[ "test,t1=v1 ".as_bytes(), f64_to_bytes("f1", 0.5, version).as_slice(), - " 10000000\n".as_bytes(), + designated_ts.as_bytes(), ] .concat(); assert_eq!(buffer.as_bytes(), exp); @@ -740,10 +795,15 @@ fn test_tls_insecure_skip_verify( .at(TimestampNanos::new(10000000))?; assert_eq!(server.recv_q()?, 0); + let designated_ts = if version == ProtocolVersion::V1 { + " 10000000\n" + } else { + " 10000000n\n" + }; let exp = &[ "test,t1=v1 ".as_bytes(), f64_to_bytes("f1", 0.5, version).as_slice(), - " 10000000\n".as_bytes(), + designated_ts.as_bytes(), ] .concat(); assert_eq!(buffer.as_bytes(), exp); From 4fc220eb7b9b5fcb52a86a47dfb6055741fe4145 Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Tue, 7 Oct 2025 12:34:01 +0100 Subject: [PATCH 4/8] fixed up C++ tests --- cpp_test/test_line_sender.cpp | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/cpp_test/test_line_sender.cpp b/cpp_test/test_line_sender.cpp index f6ed3ebd..441692fd 100644 --- a/cpp_test/test_line_sender.cpp +++ b/cpp_test/test_line_sender.cpp @@ -228,7 +228,7 @@ TEST_CASE("line_sender c api basics") &err)); CHECK(::line_sender_buffer_at_nanos(buffer, 10000000, &err)); CHECK(server.recv() == 0); - CHECK(::line_sender_buffer_size(buffer) == 382); + CHECK(::line_sender_buffer_size(buffer) == 383); CHECK(::line_sender_flush(sender, buffer, &err)); ::line_sender_buffer_free(buffer); CHECK(server.recv() == 1); @@ -236,7 +236,8 @@ TEST_CASE("line_sender c api basics") push_double_to_buffer(expect, 0.5).append(",a1=="); push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a2=="); push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a3=="); - push_double_arr_to_buffer(expect, arr_data, 3, shape).append(" 10000000\n"); + push_double_arr_to_buffer(expect, arr_data, 3, shape) + .append(" 10000000n\n"); CHECK(server.msgs(0) == expect); } @@ -329,7 +330,7 @@ TEST_CASE("line_sender c++ api basics") .at(questdb::ingress::timestamp_nanos{10000000}); CHECK(server.recv() == 0); - CHECK(buffer.size() == 610); + CHECK(buffer.size() == 611); sender.flush(buffer); CHECK(server.recv() == 1); std::string expect{"test,t1=v1,t2= f1=="}; @@ -340,7 +341,7 @@ TEST_CASE("line_sender c++ api basics") push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a4=="); push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a5=="); push_double_arr_to_buffer(expect, arr_data, 1, shapes_1dim) - .append(" 10000000\n"); + .append(" 10000000n\n"); CHECK(server.msgs(0) == expect); } @@ -378,12 +379,12 @@ TEST_CASE("line_sender array vector API") uintptr_t test_shape[] = {12}; CHECK(server.recv() == 0); - CHECK(buffer.size() == 132); + CHECK(buffer.size() == 133); sender.flush(buffer); CHECK(server.recv() == 1); std::string expect{"test,t1=v1,t2= a1=="}; push_double_arr_to_buffer(expect, arr_data, 1, test_shape) - .append(" 10000000\n"); + .append(" 10000000n\n"); CHECK(server.msgs(0) == expect); } @@ -427,12 +428,12 @@ TEST_CASE("line_sender array span API") uintptr_t test_shape[] = {8}; CHECK(server.recv() == 0); - CHECK(buffer.size() == 100); + CHECK(buffer.size() == 101); sender.flush(buffer); CHECK(server.recv() == 1); std::string expect{"test,t1=v1,t2= a1=="}; push_double_arr_to_buffer(expect, expect_arr_data, 1, test_shape) - .append(" 10000000\n"); + .append(" 10000000n\n"); CHECK(server.msgs(0) == expect); } #endif @@ -468,12 +469,12 @@ TEST_CASE("test multiple lines") .at_now(); CHECK(server.recv() == 0); - CHECK(buffer.size() == 142); + CHECK(buffer.size() == 143); sender.flush(buffer); CHECK(server.recv() == 2); std::string expect{"metric1,t1=val1,t2=val2 f1=t,f2=12345i,f3=="}; push_double_to_buffer(expect, 10.75) - .append(",f4=\"val3\",f5=\"val4\",f6=\"val5\" 111222233333\n"); + .append(",f4=\"val3\",f5=\"val4\",f6=\"val5\" 111222233333n\n"); CHECK(server.msgs(0) == expect); CHECK( server.msgs(1) == "metric1,tag3=value\\ 3,tag\\ 4=value:4 field5=f\n"); @@ -917,7 +918,7 @@ TEST_CASE("Opts copy ctor, assignment and move testing.") } } -TEST_CASE("Test timestamp column.") +TEST_CASE("Test timestamp column V1.") { questdb::ingress::test::mock_server server; questdb::ingress::line_sender sender{questdb::ingress::opts{ @@ -943,8 +944,8 @@ TEST_CASE("Test timestamp column.") .at(now_nanos_ts); std::stringstream ss; - ss << "test ts1=12345t,ts2=" << now_micros << "t,ts3=" << now_nanos << "n " - << now_nanos << "\n"; + ss << "test ts1=12345t,ts2=" << now_micros << "t,ts3=" << (now_nanos / 1000) + << "t " << now_nanos << "\n"; const auto exp = ss.str(); CHECK(buffer.peek() == exp); @@ -1193,11 +1194,11 @@ TEST_CASE("line sender protocol version v2") .at(questdb::ingress::timestamp_nanos{10000000}); CHECK(server.recv() == 0); - CHECK(buffer.size() == 38); + CHECK(buffer.size() == 39); sender.flush(buffer); CHECK(server.recv() == 1); std::string expect{"test,t1=v1,t2= f1=="}; - push_double_to_buffer(expect, 0.5).append(" 10000000\n"); + push_double_to_buffer(expect, 0.5).append(" 10000000n\n"); CHECK(server.msgs(0) == expect); } From a1c0a581ab315fb1ffd5d3d4b0d886d88d487c36 Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Tue, 7 Oct 2025 12:43:10 +0100 Subject: [PATCH 5/8] fixed up Python integration tests --- ci/run_all_tests.py | 2 +- system_test/test.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ci/run_all_tests.py b/ci/run_all_tests.py index 21d5b3ea..e753ed17 100644 --- a/ci/run_all_tests.py +++ b/ci/run_all_tests.py @@ -43,7 +43,7 @@ def main(): build_cxx20_dir.glob(f'**/test_line_sender{exe_suffix}'))) system_test_path = pathlib.Path('system_test') / 'test.py' - qdb_v = '9.0.3' # The version of QuestDB we'll test against. + qdb_v = '9.1.0' # The version of QuestDB we'll test against. run_cmd('cargo', 'test', '--', '--nocapture', cwd='questdb-rs') diff --git a/system_test/test.py b/system_test/test.py index e2756db9..724c4340 100755 --- a/system_test/test.py +++ b/system_test/test.py @@ -355,7 +355,7 @@ def test_neg_at(self): at_ts_ns = -10000000 with self.assertRaisesRegex(qls.SenderError, r'Bad call to'): with self._mk_linesender() as sender: - with self.assertRaisesRegex(qls.SenderError, r'.*Nanosecond timestamp .* is negative.*'): + with self.assertRaisesRegex(qls.SenderError, r'.*Timestamp .* is negative.*'): (sender .table(table_name) .symbol('a', 'A') From e898ff14640dfbb04c6fbc606045198debb4af48 Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Thu, 9 Oct 2025 14:35:09 +0100 Subject: [PATCH 6/8] tests for upcoming server functionality - cross-tested against questdb#6220 --- system_test/test.py | 42 ++++++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/system_test/test.py b/system_test/test.py index 724c4340..d29396be 100755 --- a/system_test/test.py +++ b/system_test/test.py @@ -63,14 +63,6 @@ def retry_check_table(*args, **kwargs): return QDB_FIXTURE.retry_check_table(*args, **kwargs) -def ns_to_qdb_date(at_ts_ns): - # We first need to match QuestDB's internal microsecond resolution. - at_ts_us = int(at_ts_ns / 1000.0) - at_ts_sec = at_ts_us / 1000000.0 - at_td = datetime.datetime.utcfromtimestamp(at_ts_sec) - return at_td.isoformat() + 'Z' - - # Valid keys, but not registered with the QuestDB fixture. AUTH_UNRECOGNIZED = dict( username="testUser2", @@ -112,6 +104,30 @@ def _mk_linesender(self): QDB_FIXTURE.host, QDB_FIXTURE.http_server_port if QDB_FIXTURE.http else QDB_FIXTURE.line_tcp_port, **kwargs) + + def _ns_to_qdb_date(self, at_ts_ns, exp_nanos=True): + # We first need to match QuestDB's internal microsecond resolution. + at_ts_us = at_ts_ns // 1000 + trimmed_ns = at_ts_ns % 1000 + at_ts_sec = at_ts_us / 1000000.0 + at_td = datetime.datetime.fromtimestamp(at_ts_sec, datetime.UTC) + extra_precision = '' + if exp_nanos and self.clean_nanos_supported: + extra_precision = f'{trimmed_ns:03}' + return at_td.replace(tzinfo=None).isoformat() + extra_precision + 'Z' + + @property + def clean_nanos_supported(self) -> bool: + """True if the QuestDB server supports nanos and also respects the client's precision.""" + if QDB_FIXTURE.version <= (9, 1, 0): + return False + + if QDB_FIXTURE.http: + return QDB_FIXTURE.protocol_version != qls.ProtocolVersion.V1 + elif QDB_FIXTURE.protocol_version is None: + return False # TCP defaults to ProtocolVersion.V1 + else: + return QDB_FIXTURE.protocol_version >= qls.ProtocolVersion.V2 @property def expected_protocol_version(self) -> qls.ProtocolVersion: @@ -344,7 +360,7 @@ def test_at(self): .at(at_ts_ns)) pending = sender.buffer.peek() resp = retry_check_table(table_name, log_ctx=pending) - exp_dataset = [['A', ns_to_qdb_date(at_ts_ns)]] + exp_dataset = [['A', self._ns_to_qdb_date(at_ts_ns)]] self.assertEqual(resp['dataset'], exp_dataset) def test_neg_at(self): @@ -376,7 +392,7 @@ def test_micros_at(self): .at_micros(at_ts_us)) pending = sender.buffer.peek() resp = retry_check_table(table_name, log_ctx=pending) - exp_dataset = [['A', ns_to_qdb_date(at_ts_ns)]] + exp_dataset = [['A', self._ns_to_qdb_date(at_ts_ns, exp_nanos=False)]] self.assertEqual(resp['dataset'], exp_dataset) def test_timestamp_col(self): @@ -768,12 +784,13 @@ def _test_example(self, bin_name, table_name, tls=False): # Check inserted data. resp = retry_check_table(table_name) + exp_ts_type = 'TIMESTAMP_NS' if self.clean_nanos_supported else 'TIMESTAMP' exp_columns = [ {'name': 'symbol', 'type': 'SYMBOL'}, {'name': 'side', 'type': 'SYMBOL'}, {'name': 'price', 'type': 'DOUBLE'}, {'name': 'amount', 'type': 'DOUBLE'}, - {'name': 'timestamp', 'type': 'TIMESTAMP'}] + {'name': 'timestamp', 'type': exp_ts_type}] self.assertEqual(resp['columns'], exp_columns) exp_dataset = [['ETH-USD', @@ -853,10 +870,11 @@ def _test_array_example(self, bin_name, table_name): args.extend(['127.0.0.1', str(port)]) subprocess.check_call(args, cwd=bin_path.parent) resp = retry_check_table(table_name) + exp_ts_type = 'TIMESTAMP_NS' if self.clean_nanos_supported else 'TIMESTAMP' exp_columns = [ {'name': 'symbol', 'type': 'SYMBOL'}, {'dim': 3, 'elemType': 'DOUBLE', 'name': 'order_book', 'type': 'ARRAY'}, - {'name': 'timestamp', 'type': 'TIMESTAMP'}] + {'name': 'timestamp', 'type': exp_ts_type}] self.assertEqual(resp['columns'], exp_columns) exp_dataset = [['BTC-USD', [[[48123.5, 2.4], [48124.0, 1.8], [48124.5, 0.9]], From 5595c3f0e5bd5fe9699016123803a00089d562e4 Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Thu, 9 Oct 2025 15:10:56 +0100 Subject: [PATCH 7/8] more test coverage --- system_test/questdb_line_sender.py | 21 ++++++++++++++------- system_test/test.py | 24 ++++++++++++++++-------- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/system_test/questdb_line_sender.py b/system_test/questdb_line_sender.py index 56476d44..53d25de3 100644 --- a/system_test/questdb_line_sender.py +++ b/system_test/questdb_line_sender.py @@ -627,6 +627,11 @@ def __init__(self, micros: int): self.value = micros +class TimestampNanos: + def __init__(self, nanos: int): + self.value = nanos + + class Buffer: def __init__(self, protocol_version: ProtocolVersion, init_buf_size=65536, max_name_len=127, ): self._impl = _DLL.line_sender_buffer_with_max_name_len( @@ -637,12 +642,8 @@ def __init__(self, protocol_version: ProtocolVersion, init_buf_size=65536, max_n def __len__(self): return _DLL.line_sender_buffer_size(self._impl) - def peek(self) -> str: - #  This is a hacky way of doing it because it copies the whole buffer. - # Instead the `buffer` should be made to support the buffer protocol: - # https://docs.python.org/3/c-api/buffer.html - # This way we would not need to `bytes(..)` the object to keep it alive. - # Then we could call `PyMemoryView_FromObject`. + def peek(self) -> bytes: + # Copy buffer view = _DLL.line_sender_buffer_peek(self._impl) if view.len: c_buf = ctypes.cast(view.buf, c_char_p) # uint8_t* → char* @@ -709,6 +710,12 @@ def column( self._impl, _column_name(name), value.value) + elif isinstance(value, TimestampNanos): + _error_wrapped_call( + _DLL.line_sender_buffer_column_ts_nanos, + self._impl, + _column_name(name), + value.value) elif isinstance(value, datetime): micros_epoch = int(value.timestamp()) * 1e6 + value.microsecond _error_wrapped_call( @@ -908,7 +915,7 @@ def symbol(self, name: str, value: str): def column( self, name: str, - value: Union[bool, int, float, str, TimestampMicros, datetime]): + value: Union[bool, int, float, str, TimestampMicros, TimestampNanos, datetime]): self._buffer.column(name, value) return self diff --git a/system_test/test.py b/system_test/test.py index d29396be..831a8493 100755 --- a/system_test/test.py +++ b/system_test/test.py @@ -105,20 +105,20 @@ def _mk_linesender(self): QDB_FIXTURE.http_server_port if QDB_FIXTURE.http else QDB_FIXTURE.line_tcp_port, **kwargs) - def _ns_to_qdb_date(self, at_ts_ns, exp_nanos=True): + def _ns_to_qdb_date(self, at_ts_ns, exp_nanos: bool): # We first need to match QuestDB's internal microsecond resolution. at_ts_us = at_ts_ns // 1000 trimmed_ns = at_ts_ns % 1000 at_ts_sec = at_ts_us / 1000000.0 at_td = datetime.datetime.fromtimestamp(at_ts_sec, datetime.UTC) extra_precision = '' - if exp_nanos and self.clean_nanos_supported: + if exp_nanos: extra_precision = f'{trimmed_ns:03}' return at_td.replace(tzinfo=None).isoformat() + extra_precision + 'Z' @property - def clean_nanos_supported(self) -> bool: - """True if the QuestDB server supports nanos and also respects the client's precision.""" + def client_driven_nanos_supported(self) -> bool: + """True if the QuestDB server supports nanos and also respects the client's precision for the designated timestamp.""" if QDB_FIXTURE.version <= (9, 1, 0): return False @@ -357,10 +357,14 @@ def test_at(self): (sender .table(table_name) .symbol('a', 'A') + .column('b', qls.TimestampNanos(at_ts_ns)) .at(at_ts_ns)) pending = sender.buffer.peek() resp = retry_check_table(table_name, log_ctx=pending) - exp_dataset = [['A', self._ns_to_qdb_date(at_ts_ns)]] + exp_dataset = [[ + 'A', + self._ns_to_qdb_date(at_ts_ns, exp_nanos=self.client_driven_nanos_supported), + self._ns_to_qdb_date(at_ts_ns, exp_nanos=self.client_driven_nanos_supported)]] self.assertEqual(resp['dataset'], exp_dataset) def test_neg_at(self): @@ -389,10 +393,14 @@ def test_micros_at(self): (sender .table(table_name) .symbol('a', 'A') + .column('b', qls.TimestampMicros(at_ts_us)) .at_micros(at_ts_us)) pending = sender.buffer.peek() resp = retry_check_table(table_name, log_ctx=pending) - exp_dataset = [['A', self._ns_to_qdb_date(at_ts_ns, exp_nanos=False)]] + exp_dataset = [[ + 'A', + self._ns_to_qdb_date(at_ts_ns, exp_nanos=False), + self._ns_to_qdb_date(at_ts_ns, exp_nanos=False)]] self.assertEqual(resp['dataset'], exp_dataset) def test_timestamp_col(self): @@ -784,7 +792,7 @@ def _test_example(self, bin_name, table_name, tls=False): # Check inserted data. resp = retry_check_table(table_name) - exp_ts_type = 'TIMESTAMP_NS' if self.clean_nanos_supported else 'TIMESTAMP' + exp_ts_type = 'TIMESTAMP_NS' if self.client_driven_nanos_supported else 'TIMESTAMP' exp_columns = [ {'name': 'symbol', 'type': 'SYMBOL'}, {'name': 'side', 'type': 'SYMBOL'}, @@ -870,7 +878,7 @@ def _test_array_example(self, bin_name, table_name): args.extend(['127.0.0.1', str(port)]) subprocess.check_call(args, cwd=bin_path.parent) resp = retry_check_table(table_name) - exp_ts_type = 'TIMESTAMP_NS' if self.clean_nanos_supported else 'TIMESTAMP' + exp_ts_type = 'TIMESTAMP_NS' if self.client_driven_nanos_supported else 'TIMESTAMP' exp_columns = [ {'name': 'symbol', 'type': 'SYMBOL'}, {'dim': 3, 'elemType': 'DOUBLE', 'name': 'order_book', 'type': 'ARRAY'}, From db91ac724c400d3bad9a2e2acb8b2ddb5bb61ceb Mon Sep 17 00:00:00 2001 From: Adam Cimarosti Date: Thu, 9 Oct 2025 16:12:59 +0100 Subject: [PATCH 8/8] CI fix --- system_test/test.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/system_test/test.py b/system_test/test.py index 831a8493..e60562c7 100755 --- a/system_test/test.py +++ b/system_test/test.py @@ -110,11 +110,14 @@ def _ns_to_qdb_date(self, at_ts_ns, exp_nanos: bool): at_ts_us = at_ts_ns // 1000 trimmed_ns = at_ts_ns % 1000 at_ts_sec = at_ts_us / 1000000.0 - at_td = datetime.datetime.fromtimestamp(at_ts_sec, datetime.UTC) + + # Commented out for now. Uncomment when CI catches up to a newer Python version. + # at_td = datetime.datetime.fromtimestamp(at_ts_sec, datetime.UTC).replace(tzinfo=None) + at_td = datetime.datetime.utcfromtimestamp(at_ts_sec) extra_precision = '' if exp_nanos: extra_precision = f'{trimmed_ns:03}' - return at_td.replace(tzinfo=None).isoformat() + extra_precision + 'Z' + return at_td.isoformat() + extra_precision + 'Z' @property def client_driven_nanos_supported(self) -> bool: