Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions conf_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,14 @@ func confFromStr(conf string) (*lineSenderConfig, error) {
return nil, NewInvalidConfigStrError("invalid %s value, %q is not a valid int", k, v)
}
senderConf.autoFlushInterval = time.Duration(parsedVal) * time.Millisecond
case "min_throughput", "init_buf_size", "max_buf_size":
case "request_min_throughput", "init_buf_size", "max_buf_size":
parsedVal, err := strconv.Atoi(v)
if err != nil {
return nil, NewInvalidConfigStrError("invalid %s value, %q is not a valid int", k, v)
}

switch k {
case "min_throughput":
case "request_min_throughput":
senderConf.minThroughput = parsedVal
case "init_buf_size":
senderConf.initBufSize = parsedVal
Expand Down
36 changes: 18 additions & 18 deletions conf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,26 @@ func TestParserHappyCases(t *testing.T) {
},
},
{
name: "https with min_throughput",
config: fmt.Sprintf("https::addr=%s;min_throughput=%d;", addr, min_throughput),
name: "https with request_min_throughput",
config: fmt.Sprintf("https::addr=%s;request_min_throughput=%d;", addr, min_throughput),
expected: qdb.ConfigData{
Schema: "https",
KeyValuePairs: map[string]string{
"addr": addr,
"min_throughput": fmt.Sprintf("%d", min_throughput),
"addr": addr,
"request_min_throughput": fmt.Sprintf("%d", min_throughput),
},
},
},
{
name: "https with min_throughput, init_buf_size and tls_verify=unsafe_off",
config: fmt.Sprintf("https::addr=%s;min_throughput=%d;init_buf_size=%d;tls_verify=unsafe_off;", addr, min_throughput, 1024),
name: "https with request_min_throughput, init_buf_size and tls_verify=unsafe_off",
config: fmt.Sprintf("https::addr=%s;request_min_throughput=%d;init_buf_size=%d;tls_verify=unsafe_off;", addr, min_throughput, 1024),
expected: qdb.ConfigData{
Schema: "https",
KeyValuePairs: map[string]string{
"addr": addr,
"min_throughput": fmt.Sprintf("%d", min_throughput),
"init_buf_size": "1024",
"tls_verify": "unsafe_off",
"addr": addr,
"request_min_throughput": fmt.Sprintf("%d", min_throughput),
"init_buf_size": "1024",
"tls_verify": "unsafe_off",
},
},
},
Expand All @@ -153,16 +153,16 @@ func TestParserHappyCases(t *testing.T) {
},
},
{
name: "http with min_throughput, request_timeout, and retry_timeout",
config: fmt.Sprintf("http::addr=%s;min_throughput=%d;request_timeout=%d;retry_timeout=%d;",
name: "http with request_min_throughput, request_timeout, and retry_timeout",
config: fmt.Sprintf("http::addr=%s;request_min_throughput=%d;request_timeout=%d;retry_timeout=%d;",
addr, min_throughput, request_timeout.Milliseconds(), retry_timeout.Milliseconds()),
expected: qdb.ConfigData{
Schema: "http",
KeyValuePairs: map[string]string{
"addr": addr,
"min_throughput": fmt.Sprintf("%d", min_throughput),
"request_timeout": fmt.Sprintf("%d", request_timeout.Milliseconds()),
"retry_timeout": fmt.Sprintf("%d", retry_timeout.Milliseconds()),
"addr": addr,
"request_min_throughput": fmt.Sprintf("%d", min_throughput),
"request_timeout": fmt.Sprintf("%d", request_timeout.Milliseconds()),
"retry_timeout": fmt.Sprintf("%d", retry_timeout.Milliseconds()),
},
},
},
Expand Down Expand Up @@ -373,8 +373,8 @@ func TestHappyCasesFromConf(t *testing.T) {
},
},
{
name: "min_throughput",
config: fmt.Sprintf("http::addr=%s;min_throughput=%d;",
name: "request_min_throughput",
config: fmt.Sprintf("http::addr=%s;request_min_throughput=%d;",
addr, minThroughput),
expectedOpts: []qdb.LineSenderOption{
qdb.WithHttp(),
Expand Down
2 changes: 1 addition & 1 deletion http_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (s *httpLineSender) makeRequest(ctx context.Context) (bool, error) {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.token))
}

// reqTimeout = ( request.len() / min_throughput ) + request_timeout
// reqTimeout = ( request.len() / request_min_throughput ) + request_timeout
// nb: conversion from int to time.Duration is in milliseconds
reqTimeout := time.Duration(s.buf.Len()/s.minThroughputBytesPerSecond)*time.Second + s.requestTimeout
reqCtx, cancel := context.WithTimeout(ctx, reqTimeout)
Expand Down
6 changes: 3 additions & 3 deletions http_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func TestHttpHappyCasesFromConf(t *testing.T) {
addr, pass, user),
},
{
name: "min_throughput",
config: fmt.Sprintf("http::addr=%s;min_throughput=%d;",
name: "request_min_throughput",
config: fmt.Sprintf("http::addr=%s;request_min_throughput=%d;",
addr, min_throughput),
},
{
Expand Down Expand Up @@ -150,7 +150,7 @@ func TestHttpPathologicalCasesFromConf(t *testing.T) {
},
{
name: "negative min throughput",
config: "http::min_throughput=-1;",
config: "http::request_min_throughput=-1;",
expectedErr: "min throughput is negative",
},
{
Expand Down
24 changes: 21 additions & 3 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,10 @@ func WithBearerToken(token string) LineSenderOption {
}
}

// WithRequestTimeout is used in combination with min_throughput
// WithRequestTimeout is used in combination with request_min_throughput
// to set the timeout of an ILP request. Defaults to 10 seconds.
//
// timeout = (request.len() / min_throughput) + request_timeout
// timeout = (request.len() / request_min_throughput) + request_timeout
//
// Only available for the HTTP sender.
func WithRequestTimeout(timeout time.Duration) LineSenderOption {
Expand All @@ -278,7 +278,7 @@ func WithRequestTimeout(timeout time.Duration) LineSenderOption {
// WithMinThroughput is used in combination with request_timeout
// to set the timeout of an ILP request. Defaults to 100KiB/s.
//
// timeout = (request.len() / min_throughput) + request_timeout
// timeout = (request.len() / request_min_throughput) + request_timeout
//
// Only available for the HTTP sender.
func WithMinThroughput(bytesPerSecond int) LineSenderOption {
Expand Down Expand Up @@ -429,27 +429,45 @@ func LineSenderFromEnv(ctx context.Context) (LineSender, error) {
//
// Options:
// http(s) and tcp(s):
//
// -------------------
//
// addr: hostname/port of QuestDB endpoint
//
// init_buf_size: initial growable ILP buffer size in bytes (defaults to 128KiB)
//
// tls_verify: determines if TLS certificates should be validated (defaults to "on", can be set to "unsafe_off")
//
// http(s)-only
//
// ------------
//
// username: for basic authentication
//
// password: for basic authentication
//
// token: bearer token auth (used instead of basic authentication)
//
// auto_flush: determines if auto-flushing is enabled (values "on" or "off", defaults to "on")
//
// auto_flush_rows: auto-flushing is triggered above this row count (defaults to 75000). If set, explicitly implies auto_flush=on. Set to 'off' to disable.
//
// auto_flush_interval: auto-flushing is triggered above this time, in milliseconds (defaults to 1000 milliseconds). If set, explicitly implies auto_flush=on. Set to 'off' to disable.
//
// request_min_throughput: bytes per second, used to calculate each request's timeout (defaults to 100KiB/s)
//
// request_timeout: minimum request timeout in milliseconds (defaults to 10 seconds)
//
// retry_timeout: cumulative maximum millisecond duration spent in retries (defaults to 10 seconds)
//
// max_buf_size: buffer growth limit in bytes. Client errors if breached (default is 100MiB)
//
// tcp(s)-only
//
// -----------
//
// username: KID (key ID) for ECDSA authentication
//
// token: Secret K (D) for ECDSA authentication
func LineSenderFromConf(ctx context.Context, conf string) (LineSender, error) {
c, err := confFromStr(conf)
Expand Down
8 changes: 4 additions & 4 deletions tcp_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ func TestTcpPathologicalCasesFromEnv(t *testing.T) {
expectedErr: "requestTimeout setting is not available",
},
{
name: "min_throughput",
config: "tcp::min_throughput=5;",
name: "request_min_throughput",
config: "tcp::request_min_throughput=5;",
expectedErr: "minThroughput setting is not available",
},
{
Expand Down Expand Up @@ -158,8 +158,8 @@ func TestTcpPathologicalCasesFromConf(t *testing.T) {
expectedErr: "retryTimeout setting is not available",
},
{
name: "min_throughput",
config: "tcp::min_throughput=5;",
name: "request_min_throughput",
config: "tcp::request_min_throughput=5;",
expectedErr: "minThroughput setting is not available",
},
{
Expand Down