Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/run_all_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def main():
build_cxx20_dir.glob(f'**/test_line_sender{exe_suffix}')))

system_test_path = pathlib.Path('system_test') / 'test.py'
qdb_v = '9.0.3' # The version of QuestDB we'll test against.
qdb_v = '9.1.0' # The version of QuestDB we'll test against.

run_cmd('cargo', 'test',
'--', '--nocapture', cwd='questdb-rs')
Expand Down
31 changes: 16 additions & 15 deletions cpp_test/test_line_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,15 +228,16 @@ TEST_CASE("line_sender c api basics")
&err));
CHECK(::line_sender_buffer_at_nanos(buffer, 10000000, &err));
CHECK(server.recv() == 0);
CHECK(::line_sender_buffer_size(buffer) == 382);
CHECK(::line_sender_buffer_size(buffer) == 383);
CHECK(::line_sender_flush(sender, buffer, &err));
::line_sender_buffer_free(buffer);
CHECK(server.recv() == 1);
std::string expect{"test,t1=v1 f1=="};
push_double_to_buffer(expect, 0.5).append(",a1==");
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a2==");
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a3==");
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(" 10000000\n");
push_double_arr_to_buffer(expect, arr_data, 3, shape)
.append(" 10000000n\n");
CHECK(server.msgs(0) == expect);
}

Expand Down Expand Up @@ -329,7 +330,7 @@ TEST_CASE("line_sender c++ api basics")
.at(questdb::ingress::timestamp_nanos{10000000});

CHECK(server.recv() == 0);
CHECK(buffer.size() == 610);
CHECK(buffer.size() == 611);
sender.flush(buffer);
CHECK(server.recv() == 1);
std::string expect{"test,t1=v1,t2= f1=="};
Expand All @@ -340,7 +341,7 @@ TEST_CASE("line_sender c++ api basics")
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a4==");
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a5==");
push_double_arr_to_buffer(expect, arr_data, 1, shapes_1dim)
.append(" 10000000\n");
.append(" 10000000n\n");
CHECK(server.msgs(0) == expect);
}

Expand Down Expand Up @@ -378,12 +379,12 @@ TEST_CASE("line_sender array vector API")

uintptr_t test_shape[] = {12};
CHECK(server.recv() == 0);
CHECK(buffer.size() == 132);
CHECK(buffer.size() == 133);
sender.flush(buffer);
CHECK(server.recv() == 1);
std::string expect{"test,t1=v1,t2= a1=="};
push_double_arr_to_buffer(expect, arr_data, 1, test_shape)
.append(" 10000000\n");
.append(" 10000000n\n");
CHECK(server.msgs(0) == expect);
}

Expand Down Expand Up @@ -427,12 +428,12 @@ TEST_CASE("line_sender array span API")

uintptr_t test_shape[] = {8};
CHECK(server.recv() == 0);
CHECK(buffer.size() == 100);
CHECK(buffer.size() == 101);
sender.flush(buffer);
CHECK(server.recv() == 1);
std::string expect{"test,t1=v1,t2= a1=="};
push_double_arr_to_buffer(expect, expect_arr_data, 1, test_shape)
.append(" 10000000\n");
.append(" 10000000n\n");
CHECK(server.msgs(0) == expect);
}
#endif
Expand Down Expand Up @@ -468,12 +469,12 @@ TEST_CASE("test multiple lines")
.at_now();

CHECK(server.recv() == 0);
CHECK(buffer.size() == 142);
CHECK(buffer.size() == 143);
sender.flush(buffer);
CHECK(server.recv() == 2);
std::string expect{"metric1,t1=val1,t2=val2 f1=t,f2=12345i,f3=="};
push_double_to_buffer(expect, 10.75)
.append(",f4=\"val3\",f5=\"val4\",f6=\"val5\" 111222233333\n");
.append(",f4=\"val3\",f5=\"val4\",f6=\"val5\" 111222233333n\n");
CHECK(server.msgs(0) == expect);
CHECK(
server.msgs(1) == "metric1,tag3=value\\ 3,tag\\ 4=value:4 field5=f\n");
Expand Down Expand Up @@ -917,7 +918,7 @@ TEST_CASE("Opts copy ctor, assignment and move testing.")
}
}

TEST_CASE("Test timestamp column.")
TEST_CASE("Test timestamp column V1.")
{
questdb::ingress::test::mock_server server;
questdb::ingress::line_sender sender{questdb::ingress::opts{
Expand All @@ -943,8 +944,8 @@ TEST_CASE("Test timestamp column.")
.at(now_nanos_ts);

std::stringstream ss;
ss << "test ts1=12345t,ts2=" << now_micros << "t,ts3=" << now_nanos << "n "
<< now_nanos << "\n";
ss << "test ts1=12345t,ts2=" << now_micros << "t,ts3=" << (now_nanos / 1000)
<< "t " << now_nanos << "\n";
const auto exp = ss.str();
CHECK(buffer.peek() == exp);

Expand Down Expand Up @@ -1193,11 +1194,11 @@ TEST_CASE("line sender protocol version v2")
.at(questdb::ingress::timestamp_nanos{10000000});

CHECK(server.recv() == 0);
CHECK(buffer.size() == 38);
CHECK(buffer.size() == 39);
sender.flush(buffer);
CHECK(server.recv() == 1);
std::string expect{"test,t1=v1,t2= f1=="};
push_double_to_buffer(expect, 0.5).append(" 10000000\n");
push_double_to_buffer(expect, 0.5).append(" 10000000n\n");
CHECK(server.msgs(0) == expect);
}

Expand Down
50 changes: 28 additions & 22 deletions questdb-rs/src/ingress/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
******************************************************************************/
use crate::ingress::ndarr::{check_and_get_array_bytes_size, ArrayElementSealed};
use crate::ingress::{
ndarr, ArrayElement, DebugBytes, NdArrayView, ProtocolVersion, Timestamp, TimestampNanos,
ARRAY_BINARY_FORMAT_TYPE, DOUBLE_BINARY_FORMAT_TYPE, MAX_ARRAY_DIMS, MAX_NAME_LEN_DEFAULT,
ndarr, ArrayElement, DebugBytes, NdArrayView, ProtocolVersion, Timestamp, TimestampMicros,
TimestampNanos, ARRAY_BINARY_FORMAT_TYPE, DOUBLE_BINARY_FORMAT_TYPE, MAX_ARRAY_DIMS,
MAX_NAME_LEN_DEFAULT,
};
use crate::{error, Error};
use std::fmt::{Debug, Formatter};
Expand Down Expand Up @@ -1147,19 +1148,19 @@ impl Buffer {
{
self.write_column_key(name)?;
let timestamp: Timestamp = value.try_into()?;
let mut buf = itoa::Buffer::new();
match timestamp {
Timestamp::Micros(ts) => {
let printed = buf.format(ts.as_i64());
self.output.extend_from_slice(printed.as_bytes());
self.output.push(b't');
}
Timestamp::Nanos(ts) => {
let printed = buf.format(ts.as_i64());
self.output.extend_from_slice(printed.as_bytes());
self.output.push(b'n');
let (number, suffix) = match (self.protocol_version, timestamp) {
(ProtocolVersion::V1, _) => {
let timestamp: TimestampMicros = timestamp.try_into()?;
(timestamp.as_i64(), b't')
}
}
(_, Timestamp::Micros(ts)) => (ts.as_i64(), b't'),
(_, Timestamp::Nanos(ts)) => (ts.as_i64(), b'n'),
};

let mut buf = itoa::Buffer::new();
let printed = buf.format(number);
self.output.extend_from_slice(printed.as_bytes());
self.output.push(suffix);
Ok(self)
}

Expand Down Expand Up @@ -1209,23 +1210,28 @@ impl Buffer {
self.check_op(Op::At)?;
let timestamp: Timestamp = timestamp.try_into()?;

// https://github.com/rust-lang/rust/issues/115880
let timestamp: crate::Result<TimestampNanos> = timestamp.try_into();
let timestamp: TimestampNanos = timestamp?;
let (number, termination) = match (self.protocol_version, timestamp) {
(ProtocolVersion::V1, _) => {
let timestamp: crate::Result<TimestampNanos> = timestamp.try_into();
(timestamp?.as_i64(), "\n")
}
(_, Timestamp::Micros(micros)) => (micros.as_i64(), "t\n"),
(_, Timestamp::Nanos(nanos)) => (nanos.as_i64(), "n\n"),
};

let epoch_nanos = timestamp.as_i64();
if epoch_nanos < 0 {
if number < 0 {
return Err(error::fmt!(
InvalidTimestamp,
"Timestamp {} is negative. It must be >= 0.",
epoch_nanos
number
));
}

let mut buf = itoa::Buffer::new();
let printed = buf.format(epoch_nanos);
let printed = buf.format(number);
self.output.push(b' ');
self.output.extend_from_slice(printed.as_bytes());
self.output.push(b'\n');
self.output.extend_from_slice(termination.as_bytes());
self.state.op_case = OpCase::MayFlushOrTable;
self.state.row_count += 1;
Ok(())
Expand Down
8 changes: 7 additions & 1 deletion questdb-rs/src/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,10 +723,16 @@ fn _test_sender_auto_detect_protocol_version(
)?,
Some(_) => server.send_settings_response()?,
}

let designated_ts = if expect_version == ProtocolVersion::V1 {
" 10000000\n"
} else {
" 10000000n\n"
};
let exp = &[
b"test,t1=v1 ",
crate::tests::sender::f64_to_bytes("f1", 0.5, expect_version).as_slice(),
b" 10000000\n",
designated_ts.as_bytes(),
]
.concat();
let req = server.recv_http_q()?;
Expand Down
80 changes: 70 additions & 10 deletions questdb-rs/src/tests/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,19 @@ fn test_basics(
.at(ts_nanos)?;

assert_eq!(server.recv_q()?, 0);
let (ts3_num, ts3_suffix, dts_suffix) = if version == ProtocolVersion::V1 {
(ts_nanos_num / 1000, "t", "")
} else {
(ts_nanos_num, "n", "n")
};
let exp = &[
"test,t1=v1 ".as_bytes(),
f64_to_bytes("f1", 0.5, version).as_slice(),
format!(
",ts1=12345t,ts2={}t,ts3={ts_nanos_num}n {ts_nanos_num}\n",
ts_micros_num
",ts1=12345t,\
ts2={ts_micros_num}t,\
ts3={ts3_num}{ts3_suffix} \
{ts_nanos_num}{dts_suffix}\n"
)
.as_bytes(),
]
Expand Down Expand Up @@ -139,7 +146,7 @@ fn test_array_f64_basic() -> TestResult {
&1.0f64.to_le_bytes(),
&2.0f64.to_le_bytes(),
&3.0f64.to_le_bytes(),
format!(" {}\n", ts.as_i64()).as_bytes(),
format!(" {}n\n", ts.as_i64()).as_bytes(),
]
.concat();

Expand Down Expand Up @@ -217,7 +224,7 @@ fn test_array_f64_for_ndarray() -> TestResult {
",arr3d=".as_bytes(),
array_header3d,
array_data3d.as_slice(),
format!(" {}\n", ts.as_i64()).as_bytes(),
format!(" {}n\n", ts.as_i64()).as_bytes(),
]
.concat();

Expand Down Expand Up @@ -506,12 +513,12 @@ fn test_bad_key(
}

#[test]
fn test_timestamp_overloads() -> TestResult {
fn test_timestamp_overloads_v1() -> TestResult {
use std::time::SystemTime;

let tbl_name = TableName::new("tbl_name")?;

let mut buffer = Buffer::new(ProtocolVersion::V2);
let mut buffer = Buffer::new(ProtocolVersion::V1);
buffer
.table(tbl_name)?
.column_ts("a", TimestampMicros::new(12345))?
Expand All @@ -538,7 +545,7 @@ fn test_timestamp_overloads() -> TestResult {
)?)?;

let exp = concat!(
"tbl_name a=12345t,b=-100000000t,c=12345678n,d=-12345678n,e=-1t,f=-10000n 1000\n",
"tbl_name a=12345t,b=-100000000t,c=12345t,d=-12345t,e=-1t,f=-10t 1000\n",
"tbl_name a=1000000t 5000000000\n"
)
.as_bytes();
Expand All @@ -547,6 +554,48 @@ fn test_timestamp_overloads() -> TestResult {
Ok(())
}

#[test]
fn test_timestamp_overloads_v2() -> TestResult {
use std::time::SystemTime;

let tbl_name = TableName::new("tbl_name")?;

let mut buffer = Buffer::new(ProtocolVersion::V2);
buffer
.table(tbl_name)?
.column_ts("a", TimestampMicros::new(12345))?
.column_ts("b", TimestampMicros::new(-100000000))?
.column_ts("c", TimestampNanos::new(12345678))?
.column_ts("d", TimestampNanos::new(-12345678))?
.column_ts("e", Timestamp::Micros(TimestampMicros::new(-1)))?
.column_ts("f", Timestamp::Nanos(TimestampNanos::new(-10000)))?
.at(TimestampMicros::new(1))?;
buffer
.table(tbl_name)?
.column_ts(
"a",
TimestampMicros::from_systemtime(
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs(1))
.unwrap(),
)?,
)?
.at(TimestampNanos::from_systemtime(
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs(5))
.unwrap(),
)?)?;

let exp = concat!(
"tbl_name a=12345t,b=-100000000t,c=12345678n,d=-12345678n,e=-1t,f=-10000n 1t\n",
"tbl_name a=1000000t 5000000000n\n"
)
.as_bytes();
assert_eq!(buffer.as_bytes(), exp);

Ok(())
}

#[cfg(feature = "chrono_timestamp")]
#[test]
fn test_chrono_timestamp() -> TestResult {
Expand All @@ -559,7 +608,7 @@ fn test_chrono_timestamp() -> TestResult {
let mut buffer = Buffer::new(ProtocolVersion::V2);
buffer.table(tbl_name)?.column_ts("a", ts)?.at(ts)?;

let exp = b"tbl_name a=1000000000n 1000000000\n";
let exp = b"tbl_name a=1000000000n 1000000000n\n";
assert_eq!(buffer.as_bytes(), exp);

Ok(())
Expand Down Expand Up @@ -633,11 +682,17 @@ fn test_tls_with_file_ca(
.column_f64("f1", 0.5)?
.at(TimestampNanos::new(10000000))?;

let designated_ts = if version == ProtocolVersion::V1 {
" 10000000\n"
} else {
" 10000000n\n"
};

assert_eq!(server.recv_q()?, 0);
let exp = &[
"test,t1=v1 ".as_bytes(),
f64_to_bytes("f1", 0.5, version).as_slice(),
" 10000000\n".as_bytes(),
designated_ts.as_bytes(),
]
.concat();
assert_eq!(buffer.as_bytes(), exp);
Expand Down Expand Up @@ -740,10 +795,15 @@ fn test_tls_insecure_skip_verify(
.at(TimestampNanos::new(10000000))?;

assert_eq!(server.recv_q()?, 0);
let designated_ts = if version == ProtocolVersion::V1 {
" 10000000\n"
} else {
" 10000000n\n"
};
let exp = &[
"test,t1=v1 ".as_bytes(),
f64_to_bytes("f1", 0.5, version).as_slice(),
" 10000000\n".as_bytes(),
designated_ts.as_bytes(),
]
.concat();
assert_eq!(buffer.as_bytes(), exp);
Expand Down
Loading