diff --git a/README.md b/README.md index c43ee20c..bf670d79 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ This library makes it easy to insert data into [QuestDB](https://questdb.io/). This client library implements the [InfluxDB Line Protocol]( -https://questdb.io/docs/reference/api/ilp/overview/) (ILP) over TCP or HTTP. +https://questdb.io/docs/reference/api/ilp/overview/) (ILP) over HTTP and TCP. * Implementation is in Rust, with no additional [run-time or link-time dependencies](doc/BUILD.md#pre-requisites-and-dependencies) @@ -24,7 +24,7 @@ streaming use cases). | Protocol | Record Insertion Reporting | Data Insertion Performance | | -------- | -------------------------- | -------------------------- | | **[ILP/HTTP](https://questdb.io/docs/reference/api/ilp/overview/)** | Transaction-level (on flush) | **Excellent** | -| [ILP/TCP](https://questdb.io/docs/reference/api/ilp/overview/)| Errors in logs; Disconnect on error | **Best** (tolerates higher latency networks) | +| [ILP/TCP](https://questdb.io/docs/reference/api/ilp/overview/)| Errors in logs; Disconnect on error | **Best** (tolerates higher-latency networks) | | [CSV Upload via HTTP](https://questdb.io/docs/reference/api/rest/#imp---import-data) | Configurable | Very Good | | [PostgreSQL](https://questdb.io/docs/reference/api/postgres/) | Transaction-level | Good | @@ -33,7 +33,7 @@ See the [flush troubleshooting](doc/CONSIDERATIONS.md) docs for more details on how to debug ILP/TCP. For an overview and code examples, see the -[ILP page of the developer docs](https://questdb.io/docs/develop/insert-data/#influxdb-line-protocol). +[InfluxDB Line Protocol page of the developer docs](https://questdb.io/docs/develop/insert-data/#influxdb-line-protocol). To understand the protocol in more depth, consult the [protocol reference docs](https://questdb.io/docs/reference/api/ilp/overview/). diff --git a/include/questdb/ingress/line_sender.h b/include/questdb/ingress/line_sender.h index a51b06d9..16449d88 100644 --- a/include/questdb/ingress/line_sender.h +++ b/include/questdb/ingress/line_sender.h @@ -96,7 +96,7 @@ typedef enum line_sender_protocol line_sender_protocol_https, } line_sender_protocol; -/* Possible sources of the root certificates used to validate the server's TLS certificate. */ +/** Possible sources of the root certificates used to validate the server's TLS certificate. */ typedef enum line_sender_ca { /** Use the set of root certificates provided by the `webpki` crate. */ line_sender_ca_webpki_roots, @@ -104,10 +104,10 @@ typedef enum line_sender_ca { /** Use the set of root certificates provided by the operating system. */ line_sender_ca_os_roots, - /** Use the set of root certificates provided by both the `webpki` crate and the operating system. */ + /** Combine the set of root certificates provided by the `webpki` crate and the operating system. */ line_sender_ca_webpki_and_os_roots, - /** Use a custom root certificate `.pem` file. */ + /** Use the root certificates provided in a PEM-encoded file. */ line_sender_ca_pem_file, } line_sender_ca; @@ -137,7 +137,7 @@ void line_sender_error_free(line_sender_error*); typedef struct line_sender_utf8 { // Don't initialize fields directly. - // Call `line_sender_utf8_init` instead. + // Call `line_sender_utf8_init()` instead. size_t len; const char* buf; } line_sender_utf8; @@ -178,7 +178,7 @@ line_sender_utf8 line_sender_utf8_assert(size_t len, const char* buf); typedef struct line_sender_table_name { // Don't initialize fields directly. - // Call `line_sender_table_name_init` instead. + // Call `line_sender_table_name_init()` instead. size_t len; const char* buf; } line_sender_table_name; @@ -223,7 +223,7 @@ line_sender_table_name line_sender_table_name_assert( typedef struct line_sender_column_name { // Don't initialize fields directly. - // Call `line_sender_column_name_init` instead. + // Call `line_sender_column_name_init()` instead. size_t len; const char* buf; } line_sender_column_name; @@ -265,16 +265,25 @@ line_sender_column_name line_sender_column_name_assert( /////////// Constructing ILP messages. /** - * Prepare rows for sending via the line sender's `flush` function. - * Buffer objects are re-usable and cleared automatically when flushing. + * Accumulates a batch of rows to be sent via `line_sender_flush()` or its + * variants. A buffer object can be reused after flushing and clearing. */ typedef struct line_sender_buffer line_sender_buffer; -/** Create a buffer for serializing ILP messages. */ +/** + * Construct a `line_sender_buffer` with a `max_name_len` of `127`, which is the + * same as the QuestDB server default. + */ LINESENDER_API line_sender_buffer* line_sender_buffer_new(); -/** Create a buffer for serializing ILP messages. */ +/** + * Construct a `line_sender_buffer` with a custom maximum length for table and + * column names. This should match the `cairo.max.file.name.length` setting of + * the QuestDB server you're connecting to. + * If the server does not configure it, the default is `127`, and you can + * call `line_sender_buffer_new()` instead. + */ LINESENDER_API line_sender_buffer* line_sender_buffer_with_max_name_len(size_t max_name_len); @@ -304,9 +313,9 @@ size_t line_sender_buffer_capacity(const line_sender_buffer* buffer); /** * Mark a rewind point. * This allows undoing accumulated changes to the buffer for one or more - * rows by calling `rewind_to_marker`. + * rows by calling `rewind_to_marker()`. * Any previous marker will be discarded. - * Once the marker is no longer needed, call `clear_marker`. + * Once the marker is no longer needed, call `clear_marker()`. */ LINESENDER_API bool line_sender_buffer_set_marker( @@ -314,7 +323,7 @@ bool line_sender_buffer_set_marker( line_sender_error** err_out); /** - * Undo all changes since the last `set_marker` call. + * Undo all changes since the last `set_marker()` call. * As a side-effect, this also clears the marker. */ LINESENDER_API @@ -334,7 +343,7 @@ void line_sender_buffer_clear_marker( LINESENDER_API void line_sender_buffer_clear(line_sender_buffer* buffer); -/** Number of bytes in the accumulated buffer. */ +/** The number of bytes accumulated in the buffer. */ LINESENDER_API size_t line_sender_buffer_size(const line_sender_buffer* buffer); @@ -343,18 +352,21 @@ LINESENDER_API size_t line_sender_buffer_row_count(const line_sender_buffer* buffer); /** - * The buffer is transactional if sent over HTTP. - * A buffer stops being transactional if it contains rows for multiple tables. + * Tell whether the buffer is transactional. It is transactional iff it contains + * data for at most one table. Additionally, you must send the buffer over HTTP to + * get transactional behavior. */ LINESENDER_API bool line_sender_buffer_transactional(const line_sender_buffer* buffer); /** - * Peek into the accumulated buffer that is to be sent out at the next `flush`. + * Get a string representation of the contents of the buffer. * - * @param[in] buffer Line buffer object. - * @param[out] len_out The length in bytes of the accumulated buffer. - * @return UTF-8 encoded buffer. The buffer is not nul-terminated. + * @param[in] buffer Line sender buffer object. + * @param[out] len_out The length in bytes of the returned string buffer. + * @return UTF-8 encoded buffer with the string representation of the line + * sender buffer's contents. The buffer is not nul-terminated, and the + * length is in the `len_out` parameter. */ LINESENDER_API const char* line_sender_buffer_peek( @@ -362,7 +374,7 @@ const char* line_sender_buffer_peek( size_t* len_out); /** - * Start batching the next row of input for the named table. + * Start recording a new row for the given table. * @param[in] buffer Line buffer object. * @param[in] name Table name. */ @@ -373,9 +385,8 @@ bool line_sender_buffer_table( line_sender_error** err_out); /** - * Append a value for a SYMBOL column. - * Symbol columns must always be written before other columns for any given - * row. + * Record a symbol value for the given column. + * Make sure you record all the symbol columns before any other column type. * @param[in] buffer Line buffer object. * @param[in] name Column name. * @param[in] value Column value. @@ -390,7 +401,7 @@ bool line_sender_buffer_symbol( line_sender_error** err_out); /** - * Append a value for a BOOLEAN column. + * Record a boolean value for the given column. * @param[in] buffer Line buffer object. * @param[in] name Column name. * @param[in] value Column value. @@ -405,7 +416,7 @@ bool line_sender_buffer_column_bool( line_sender_error** err_out); /** - * Append a value for a LONG column. + * Record an integer value for the given column. * @param[in] buffer Line buffer object. * @param[in] name Column name. * @param[in] value Column value. @@ -420,7 +431,7 @@ bool line_sender_buffer_column_i64( line_sender_error** err_out); /** - * Append a value for a DOUBLE column. + * Record a floating-point value for the given column. * @param[in] buffer Line buffer object. * @param[in] name Column name. * @param[in] value Column value. @@ -435,7 +446,7 @@ bool line_sender_buffer_column_f64( line_sender_error** err_out); /** - * Append a value for a STRING column. + * Record a string value for the given column. * @param[in] buffer Line buffer object. * @param[in] name Column name. * @param[in] value Column value. @@ -450,10 +461,10 @@ bool line_sender_buffer_column_str( line_sender_error** err_out); /** - * Append a value for a TIMESTAMP column from nanoseconds. + * Record a nanosecond timestamp value for the given column. * @param[in] buffer Line buffer object. * @param[in] name Column name. - * @param[in] nanos The timestamp in nanoseconds since the unix epoch. + * @param[in] nanos The timestamp in nanoseconds since the Unix epoch. * @param[out] err_out Set on error. * @return true on success, false on error. */ @@ -465,10 +476,10 @@ bool line_sender_buffer_column_ts_nanos( line_sender_error** err_out); /** - * Append a value for a TIMESTAMP column from microseconds. + * Record a microsecond timestamp value for the given column. * @param[in] buffer Line buffer object. * @param[in] name Column name. - * @param[in] micros The timestamp in microseconds since the unix epoch. + * @param[in] micros The timestamp in microseconds since the Unix epoch. * @param[out] err_out Set on error. * @return true on success, false on error. */ @@ -480,17 +491,17 @@ bool line_sender_buffer_column_ts_micros( line_sender_error** err_out); /** - * Complete the row with a timestamp specified as nanoseconds. + * Complete the current row with the designated timestamp in nanoseconds. * - * After this call, you can start batching the next row by calling - * `table` again, or you can send the accumulated batch by - * calling `flush`. + * After this call, you can start recording the next row by calling + * `line_sender_buffer_table()` again, or you can send the accumulated batch + * by calling `line_sender_flush()` or one of its variants. * - * If you want to pass the current system timestamp, - * see `line_sender_now_nanos`. + * If you want to pass the current system timestamp, see + * `line_sender_now_nanos()`. * * @param[in] buffer Line buffer object. - * @param[in] epoch_nanos Number of nanoseconds since 1st Jan 1970 UTC. + * @param[in] epoch_nanos Number of nanoseconds since the Unix epoch. * @param[out] err_out Set on error. * @return true on success, false on error. */ @@ -501,17 +512,17 @@ bool line_sender_buffer_at_nanos( line_sender_error** err_out); /** - * Complete the row with a timestamp specified as microseconds. + * Complete the current row with the designated timestamp in microseconds. * - * After this call, you can start batching the next row by calling - * `table` again, or you can send the accumulated batch by - * calling `flush`. + * After this call, you can start recording the next row by calling + * `line_sender_buffer_table()` again, or you can send the accumulated batch + * by calling `line_sender_flush()` or one of its variants. * - * If you want to pass the current system timestamp, - * see `line_sender_now_micros`. + * If you want to pass the current system timestamp, see + * `line_sender_now_micros()`. * * @param[in] buffer Line buffer object. - * @param[in] epoch_micros Number of microseconds since 1st Jan 1970 UTC. + * @param[in] epoch_micros Number of microseconds since the Unix epoch. * @param[out] err_out Set on error. * @return true on success, false on error. */ @@ -522,21 +533,24 @@ bool line_sender_buffer_at_micros( line_sender_error** err_out); /** - * Complete the row without providing a timestamp. - * The QuestDB instance will insert its own timestamp. + * Complete the current row without providing a timestamp. The QuestDB instance + * will insert its own timestamp. + * + * Letting the server assign the timestamp can be faster since it a reliable way + * to avoid out-of-order operations in the database for maximum ingestion + * throughput. However, it removes the ability to deduplicate rows. * - * This is NOT equivalent to calling `line_sender_buffer_at_nanos` or - * `line_sender_buffer_at_micros` with the current time. - * There's a trade-off: Letting the server assign the timestamp can be faster - * since it a reliable way to avoid out-of-order operations in the database - * for maximum ingestion throughput. - * On the other hand, it removes the ability to deduplicate rows. + * This is NOT equivalent to calling `line_sender_buffer_at_nanos()` or + * `line_sender_buffer_at_micros()` with the current time: the QuestDB server + * will set the timestamp only after receiving the row. If you're flushing + * infrequently, the server-assigned timestamp may be significantly behind the + * time the data was recorded in the buffer. * - * In almost all cases, you should prefer the `line_sender_at_*` functions. + * In almost all cases, you should prefer the `line_sender_buffer_at_*()` functions. * - * After this call, you can start batching the next row by calling - * `table` again, or you can send the accumulated batch by - * calling `flush`. + * After this call, you can start recording the next row by calling `table()` + * again, or you can send the accumulated batch by calling `flush()` or one of + * its variants. * * @param[in] buffer Line buffer object. * @param[out] err_out Set on error. @@ -550,23 +564,33 @@ bool line_sender_buffer_at_now( /////////// Connecting, sending and disconnecting. /** - * Insert data into QuestDB via the InfluxDB Line Protocol. + * Inserts data into QuestDB via the InfluxDB Line Protocol. * - * Batch up rows in `buffer` objects, then call `flush` to send them. + * Batch up rows in a `line_sender_buffer`, then call `line_sender_flush()` or + * one of its variants with this object to send them. */ typedef struct line_sender line_sender; /** - * Accumulates parameters for creating a line sender connection. + * Accumulates parameters for a new `line_sender` object. */ typedef struct line_sender_opts line_sender_opts; /** - * Create a new `line_sender_opts` instance from configuration string. + * Create a new `line_sender_opts` instance from the given configuration string. * The format of the string is: "tcp::addr=host:port;key=value;...;" - * Alongside "tcp" you can also specify "tcps", "http", and "https". - * The accepted set of keys and values is the same as for the opt's API. - * E.g. "https::addr=host:port;username=alice;password=secret;tls_ca=os_roots;" + * Instead of "tcp" you can also specify "tcps", "http", and "https". + * + * The accepted keys match one-for-one with the functions on `line_sender_opts`. + * For example, this is a valid configuration string: + * + * "https::addr=host:port;username=alice;password=secret;" + * + * and there are matching functions `line_sender_opts_username()` and + * `line_sender_opts_password()`. The value for `addr=` is supplied directly to + * `line_sender_opts_new`, so there's no function with a matching name. + * + * For the full list of keys, search this header for `bool line_sender_opts_`. */ LINESENDER_API line_sender_opts* line_sender_opts_from_conf( @@ -574,15 +598,16 @@ line_sender_opts* line_sender_opts_from_conf( line_sender_error** err_out); /** - * Create a new `line_sender_opts` instance from configuration string - * read from the `QDB_CLIENT_CONF` environment variable. + * Create a new `line_sender_opts` instance from the configuration stored in the + * `QDB_CLIENT_CONF` environment variable. */ LINESENDER_API line_sender_opts* line_sender_opts_from_env( line_sender_error** err_out); /** - * A new set of options for a line sender connection. + * Create a new `line_sender_opts` instance with the given protocol, hostname and + * port. * @param[in] protocol The protocol to use. * @param[in] host The QuestDB database host. * @param[in] port The QuestDB ILP TCP port. @@ -594,7 +619,8 @@ line_sender_opts* line_sender_opts_new( uint16_t port); /** - * Variant of line_sender_opts_new that takes a service name for port. + * Create a new `line_sender_opts` instance with the given protocol, hostname and + * service name. */ LINESENDER_API line_sender_opts* line_sender_opts_new_service( @@ -618,11 +644,11 @@ bool line_sender_opts_bind_interface( /** * Set the username for authentication. * - * For TCP this is the `kid` part of the ECDSA key set. + * For TCP, this is the `kid` part of the ECDSA key set. * The other fields are `token` `token_x` and `token_y`. * - * For HTTP this is part of basic authentication. - * Also see `password`. + * For HTTP, this is part of basic authentication. + * See also: `line_sender_opts_password()`. */ LINESENDER_API bool line_sender_opts_username( @@ -632,7 +658,7 @@ bool line_sender_opts_username( /** * Set the password for basic HTTP authentication. - * Also see `username`. + * See also: `line_sender_opts_username()`. */ LINESENDER_API bool line_sender_opts_password( @@ -641,8 +667,8 @@ bool line_sender_opts_password( line_sender_error** err_out); /** - * Token (Bearer) Authentication Parameters for ILP over HTTP, - * or the ECDSA private key for ILP over TCP authentication. + * Set the Token (Bearer) Authentication parameter for HTTP, + * or the ECDSA private key for TCP authentication. */ LINESENDER_API bool line_sender_opts_token( @@ -651,7 +677,7 @@ bool line_sender_opts_token( line_sender_error** err_out); /** - * The ECDSA public key X for ILP over TCP authentication. + * Set the ECDSA public key X for TCP authentication. */ LINESENDER_API bool line_sender_opts_token_x( @@ -660,7 +686,7 @@ bool line_sender_opts_token_x( line_sender_error** err_out); /** - * The ECDSA public key Y for ILP over TCP authentication. + * Set the ECDSA public key Y for TCP authentication. */ LINESENDER_API bool line_sender_opts_token_y( @@ -671,7 +697,7 @@ bool line_sender_opts_token_y( /** * Configure how long to wait for messages from the QuestDB server during * the TLS handshake and authentication process. - * The default is 15 seconds. + * The value is in milliseconds, and the default is 15 seconds. */ LINESENDER_API bool line_sender_opts_auth_timeout( @@ -683,7 +709,7 @@ bool line_sender_opts_auth_timeout( * Set to `false` to disable TLS certificate verification. * This should only be used for debugging purposes as it reduces security. * - * For testing consider specifying a path to a `.pem` file instead via + * For testing, consider specifying a path to a `.pem` file instead via * the `tls_roots` setting. */ LINESENDER_API @@ -716,7 +742,7 @@ bool line_sender_opts_tls_roots( line_sender_error** err_out); /** - * The maximum buffer size that the client will flush to the server. + * Set the maximum buffer size in bytes that the client will flush to the server. * The default is 100 MiB. */ LINESENDER_API @@ -726,8 +752,8 @@ bool line_sender_opts_max_buf_size( line_sender_error** err_out); /** - * Cumulative duration spent in retries. - * The default is 10 seconds. + * Set the cumulative duration spent in retries. + * The value is in milliseconds, and the default is 10 seconds. */ LINESENDER_API bool line_sender_opts_retry_timeout( @@ -736,10 +762,14 @@ bool line_sender_opts_retry_timeout( line_sender_error** err_out); /** - * Minimum expected throughput in bytes per second for HTTP requests. - * If the throughput is lower than this value, the connection will time out. - * The default is 100 KiB/s. - * The value is expressed as a number of bytes per second. + * Set the minimum acceptable throughput while sending a buffer to the server. + * The sender will divide the payload size by this number to determine for how + * long to keep sending the payload before timing out. + * The value is in bytes per second, and the default is 100 KiB/s. + * The timeout calculated from minimum throughput is adedd to the value of + * `request_timeout`. + * + * See also: `line_sender_opts_request_timeout()` */ LINESENDER_API bool line_sender_opts_request_min_throughput( @@ -748,8 +778,11 @@ bool line_sender_opts_request_min_throughput( line_sender_error** err_out); /** - * Grace request timeout before relying on the minimum throughput logic. - * The default is 10 seconds. + * Set the additional time to wait on top of that calculated from the minimum + * throughput. This accounts for the fixed latency of the HTTP request-response + * roundtrip. The value is in milliseconds, and the default is 10 seconds. + * + * See also: `line_sender_opts_request_min_throughput()` */ LINESENDER_API bool line_sender_opts_request_timeout( @@ -776,8 +809,14 @@ LINESENDER_API void line_sender_opts_free(line_sender_opts* opts); /** - * Create the client. - * The client should be accessed by only a single thread a time. + * Create a new line sender instance from the given options object. + * + * In the case of TCP, this synchronously establishes the TCP connection, and + * returns once the connection is fully established. If the connection + * requires authentication or TLS, these will also be completed before + * returning. + * + * The sender should be accessed by only a single thread a time. * @param[in] opts Options for the connection. * @note The opts object is freed. */ @@ -787,13 +826,27 @@ line_sender* line_sender_build( line_sender_error** err_out); /** - * Create a new `line_sender` instance from configuration string. + * Create a new line sender instance from the given configuration string. * The format of the string is: "tcp::addr=host:port;key=value;...;" - * Alongside "tcp" you can also specify "tcps", "http", and "https". - * The accepted set of keys and values is the same as for the opt's API. - * E.g. "https::addr=host:port;username=alice;password=secret;tls_ca=os_roots;" + * Instead of "tcp" you can also specify "tcps", "http", and "https". + * + * The accepted keys match one-for-one with the functions on `line_sender_opts`. + * For example, this is a valid configuration string: + * + * "https::addr=host:port;username=alice;password=secret;" + * + * and there are matching functions `line_sender_opts_username()` and + * `line_sender_opts_password()`. The value for `addr=` is supplied directly to + * `line_sender_opts_new`, so there's no function with a matching name. * * For the full list of keys, search this header for `bool line_sender_opts_`. + * + * In the case of TCP, this synchronously establishes the TCP connection, and + * returns once the connection is fully established. If the connection + * requires authentication or TLS, these will also be completed before + * returning. + * + * The sender should be accessed by only a single thread a time. */ LINESENDER_API line_sender* line_sender_from_conf( @@ -801,15 +854,24 @@ line_sender* line_sender_from_conf( line_sender_error** err_out); /** - * Create a new `line_sender` instance from configuration string read from the + * Create a new `line_sender` instance from the configuration stored in the * `QDB_CLIENT_CONF` environment variable. + * + * In the case of TCP, this synchronously establishes the TCP connection, and + * returns once the connection is fully established. If the connection + * requires authentication or TLS, these will also be completed before + * returning. + * + * The sender should be accessed by only a single thread a time. */ LINESENDER_API line_sender* line_sender_from_env( line_sender_error** err_out); /** - * Check if an error occurred previously and the sender must be closed. + * Tell whether the sender is no longer usable and must be closed. + * This happens when there was an earlier failure. + * This fuction is specific to TCP and is not relevant for HTTP. * @param[in] sender Line sender object. * @return true if an error occurred with a sender and it must be closed. */ @@ -824,10 +886,28 @@ LINESENDER_API void line_sender_close(line_sender* sender); /** - * Send buffer of rows to the QuestDB server. + * Send the given buffer of rows to the QuestDB server, clearing the buffer. * - * The buffer will be automatically cleared, ready for re-use. - * If instead you want to preserve the buffer contents, call `flush_and_keep`. + * After this function returns, the buffer is empty and ready for the next batch. + * If you want to preserve the buffer contents, call `line_sender_flush_and_keep`. + * If you want to ensure the flush is transactional, call + * `line_sender_flush_and_keep_with_flags`. + * + * With ILP-over-HTTP, this function sends an HTTP request and waits for the + * response. If the server responds with an error, it returns a descriptive error. + * In the case of a network error, it retries until it has exhausted the retry time + * budget. + * + * With ILP-over-TCP, the function blocks only until the buffer is flushed to the + * underlying OS-level network socket, without waiting to actually send it to the + * server. In the case of an error, the server will quietly disconnect: consult the + * server logs for error messages. + * + * HTTP should be the first choice, but use TCP if you need to continuously send + * data to the server at a high rate. + * + * To improve the HTTP performance, send larger buffers (with more rows), and + * consider parallelizing writes using multiple senders from multiple threads. * * @param[in] sender Line sender object. * @param[in] buffer Line buffer object. @@ -840,10 +920,12 @@ bool line_sender_flush( line_sender_error** err_out); /** - * Send buffer of rows to the QuestDB server. + * Send the given buffer of rows to the QuestDB server. * - * The buffer will left untouched and must be cleared before re-use. - * To send and clear in one single step, `flush` instead. + * All the data stays in the buffer. Clear the buffer before starting a new batch. + * + * To send and clear in one step, call `line_sender_flush` instead. Also, see the docs + * on that function for more important details on flushing. * @param[in] sender Line sender object. * @param[in] buffer Line buffer object. * @return true on success, false on error. @@ -855,15 +937,22 @@ bool line_sender_flush_and_keep( line_sender_error** err_out); /** - * Variant of `.flush()` that does not clear the buffer and allows for - * transactional flushes. + * Send the batch of rows in the buffer to the QuestDB server, and, if the parameter + * `transactional` is true, ensure the flush will be transactional. + * + * A flush is transactional iff all the rows belong to the same table. This allows + * QuestDB to treat the flush as a single database transaction, because it doesn't + * support transactions spanning multiple tables. Additionally, only ILP-over-HTTP + * supports transactional flushes. + * + * If the flush wouldn't be transactional, this function returns an error and + * doesn't flush any data. * - * A transactional flush is simply a flush that ensures that all rows in - * the ILP buffer refer to the same table, thus allowing the server to - * treat the flush request as a single transaction. + * The function sends an HTTP request and waits for the response. If the server + * responds with an error, it returns a descriptive error. In the case of a network + * error, it retries until it has exhausted the retry time budget. * - * This is because QuestDB does not support transactions spanning multiple - * tables. + * All the data stays in the buffer. Clear the buffer before starting a new batch. */ LINESENDER_API bool line_sender_flush_and_keep_with_flags( @@ -874,11 +963,11 @@ bool line_sender_flush_and_keep_with_flags( /////////// Getting the current timestamp. -/** Get the current time in nanoseconds since the unix epoch (UTC). */ +/** Get the current time in nanoseconds since the Unix epoch (UTC). */ LINESENDER_API int64_t line_sender_now_nanos(); -/** Get the current time in microseconds since the unix epoch (UTC). */ +/** Get the current time in microseconds since the Unix epoch (UTC). */ LINESENDER_API int64_t line_sender_now_micros(); diff --git a/include/questdb/ingress/line_sender.hpp b/include/questdb/ingress/line_sender.hpp index 79115da8..dacde03f 100644 --- a/include/questdb/ingress/line_sender.hpp +++ b/include/questdb/ingress/line_sender.hpp @@ -103,17 +103,17 @@ namespace questdb::ingress /** Use the set of root certificates provided by the operating system. */ os_roots, - /** Use the set of root certificates provided by both the `webpki` crate and the operating system. */ + /** Combine the set of root certificates provided by the `webpki` crate and the operating system. */ webpki_and_os_roots, - /** Use a custom root certificate `.pem` file. */ + /** Use the root certificates provided in a PEM-encoded file. */ pem_file, }; /** * An error that occurred when using the line sender. * - * Call `.what()` to obtain ASCII encoded error message. + * Call `.what()` to obtain the ASCII-encoded error message. */ class line_sender_error : public std::runtime_error { @@ -392,7 +392,7 @@ namespace questdb::ingress return 0; } - /** Number of bytes in the accumulated buffer. */ + /** The number of bytes accumulated in the buffer. */ size_t size() const noexcept { if (_impl) @@ -411,8 +411,9 @@ namespace questdb::ingress } /** - * The buffer is transactional if sent over HTTP. - * A buffer stops being transactional if it contains rows for multiple tables. + * Tell whether the buffer is transactional. It is transactional iff it contains + * data for at most one table. Additionally, you must send the buffer over HTTP to + * get transactional behavior. */ bool transactional() const noexcept { @@ -423,8 +424,7 @@ namespace questdb::ingress } /** - * Get a view of the accumulated buffer. - * This is useful for debugging. + * Get a string representation of the contents of the buffer. */ std::string_view peek() const noexcept { @@ -483,7 +483,7 @@ namespace questdb::ingress } /** - * Start batching the next row of input for the named table. + * Start recording a new row for the given table. * @param name Table name. */ line_sender_buffer& table(table_name_view name) @@ -497,9 +497,8 @@ namespace questdb::ingress } /** - * Append a value for a SYMBOL column. - * Symbol columns must always be written before other columns for any - * given row. + * Record a symbol value for the given column. + * Make sure you record all the symbol columns before any other column type. * @param name Column name. * @param value Column value. */ @@ -520,7 +519,7 @@ namespace questdb::ingress line_sender_buffer& column(column_name_view name, T value) = delete; /** - * Append a value for a BOOLEAN column. + * Record a boolean value for the given column. * @param name Column name. * @param value Column value. */ @@ -536,7 +535,7 @@ namespace questdb::ingress } /** - * Append a value for a LONG column. + * Record an integer value for the given column. * @param name Column name. * @param value Column value. */ @@ -552,7 +551,7 @@ namespace questdb::ingress } /** - * Append a value for a DOUBLE column. + * Record a floating-point value for the given column. * @param name Column name. * @param value Column value. */ @@ -568,7 +567,7 @@ namespace questdb::ingress } /** - * Append a value for a STRING column. + * Record a string value for the given column. * @param name Column name. * @param value Column value. */ @@ -605,6 +604,7 @@ namespace questdb::ingress return column(name, utf8_view{value}); } + /** Record a nanosecond timestamp value for the given column. */ template line_sender_buffer& column( column_name_view name, @@ -614,6 +614,8 @@ namespace questdb::ingress return column(name, nanos); } + /** Record a timestamp value for the given column, specified as a + * `DurationT`. */ template line_sender_buffer& column( column_name_view name, @@ -650,13 +652,15 @@ namespace questdb::ingress } /** - * Complete the row with a timestamp specified as nanoseconds. + * Complete the current row with the designated timestamp in nanoseconds. * - * After this call, you can start batching the next row by calling - * `.table(..)` again, or you can send the accumulated batch by - * calling `.flush(..)`. + * After this call, you can start recording the next row by calling + * `table()` again, or you can send the accumulated batch by calling + * `flush()` or one of its variants. * - * @param timestamp Number of nanoseconds since 1st Jan 1970 UTC. + * If you want to pass the current system timestamp, call `at_now()`. + * + * @param timestamp Number of nanoseconds since the Unix epoch. */ void at(timestamp_nanos timestamp) { @@ -668,13 +672,13 @@ namespace questdb::ingress } /** - * Complete the row with a timestamp specified as microseconds. + * Complete the current row with the designated timestamp in microseconds. * - * After this call, you can start batching the next row by calling - * `.table(..)` again, or you can send the accumulated batch by - * calling `.flush(..)`. + * After this call, you can start recording the next row by calling + * `table()` again, or you can send the accumulated batch by calling + * `flush()` or one of its variants. * - * @param timestamp Number of microseconds since 1st Jan 1970 UTC. + * @param timestamp Number of microseconds since the Unix epoch. */ void at(timestamp_micros timestamp) { @@ -686,17 +690,24 @@ namespace questdb::ingress } /** - * Complete the row without providing a timestamp. - * The QuestDB instance will insert its own timestamp. + * Complete the current row without providing a timestamp. The QuestDB instance + * will insert its own timestamp. + * + * Letting the server assign the timestamp can be faster since it a reliable way + * to avoid out-of-order operations in the database for maximum ingestion + * throughput. However, it removes the ability to deduplicate rows. * - * This is NOT equivalent to calling `at()` with the - * current system time. - * There's a trade-off: Letting the server assign the timestamp - * can be faster since it a reliable way to avoid out-of-order - * operations in the database for maximum ingestion throughput. - * On the other hand, it removes the ability to deduplicate rows. + * This is NOT equivalent to calling `line_sender_buffer_at_nanos()` or + * `line_sender_buffer_at_micros()` with the current time: the QuestDB server + * will set the timestamp only after receiving the row. If you're flushing + * infrequently, the server-assigned timestamp may be significantly behind the + * time the data was recorded in the buffer. * - * In almost all cases, you should prefer the `at()` method. + * In almost all cases, you should prefer the `at()`/`at_now()` methods. + * + * After this call, you can start recording the next row by calling `table()` + * again, or you can send the accumulated batch by calling `flush()` or one of + * its variants. */ void at_now() { @@ -748,11 +759,18 @@ namespace questdb::ingress { public: /** - * Create a new `opts` instance from configuration string. - * The format of the string is: "tcp::addr=host:port;key=value;...;" - * Alongside "tcp" you can also specify "tcps", "http", and "https". - * The accepted set of keys and values is the same as for the opt's API. - * E.g. "https::addr=host:port;username=alice;password=secret;tls_ca=os_roots;" + * Create a new `opts` instance from the given configuration string. + * The format of the string is: "tcp::addr=host:port;key=value;...;" + * Instead of "tcp" you can also specify "tcps", "http", and "https". + * + * The accepted keys match one-for-one with the methods on `opts`. + * For example, this is a valid configuration string: + * + * "https::addr=host:port;username=alice;password=secret;" + * + * and there are matching methods `opts.username()` and `opts.password()`. + * The value for `addr=` is supplied directly to `opts()`, so there's no + * function with a matching name. */ static inline opts from_conf(utf8_view conf) { @@ -762,7 +780,7 @@ namespace questdb::ingress } /** - * Create a new `opts` instance from configuration string read from the + * Create a new `opts` instance from the configuration stored in the * `QDB_CLIENT_CONF` environment variable. */ static inline opts from_env() @@ -777,7 +795,7 @@ namespace questdb::ingress } /** - * A new set of options for a line sender connection. + * Create a new `opts` instance with the given protocol, hostname and port. * @param[in] protocol The protocol to use. * @param[in] host The QuestDB database host. * @param[in] port The QuestDB tcp or http port. @@ -800,7 +818,7 @@ namespace questdb::ingress } /** - * A new set of options for a line sender connection. + * Create a new `opts` instance with the given protocol, hostname and service name. * @param[in] protocol The protocol to use. * @param[in] host The QuestDB database host. * @param[in] port The QuestDB tcp or http port as service name. @@ -869,13 +887,13 @@ namespace questdb::ingress } /** - * Set the username for basic HTTP authentication. + * Set the username for authentication. * * For TCP this is the `kid` part of the ECDSA key set. * The other fields are `token` `token_x` and `token_y`. * * For HTTP this is part of basic authentication. - * Also see `password`. + * See also: `password()`. */ opts& username(utf8_view username) { @@ -888,7 +906,7 @@ namespace questdb::ingress /** * Set the password for basic HTTP authentication. - * Also see `username`. + * See also: `username()`. */ opts& password(utf8_view password) { @@ -900,8 +918,8 @@ namespace questdb::ingress } /** - * Token (Bearer) Authentication Parameters for ILP over HTTP, - * or the ECDSA private key for ILP over TCP authentication. + * Set the Token (Bearer) Authentication parameter for HTTP, + * or the ECDSA private key for TCP authentication. */ opts& token(utf8_view token) { @@ -913,7 +931,7 @@ namespace questdb::ingress } /** - * The ECDSA public key X for ILP over TCP authentication. + * Set the ECDSA public key X for TCP authentication. */ opts& token_x(utf8_view token_x) { @@ -925,7 +943,7 @@ namespace questdb::ingress } /** - * The ECDSA public key Y for ILP over TCP authentication. + * Set the ECDSA public key Y for TCP authentication. */ opts& token_y(utf8_view token_y) { @@ -939,7 +957,7 @@ namespace questdb::ingress /** * Configure how long to wait for messages from the QuestDB server during * the TLS handshake and authentication process. - * The default is 15 seconds. + * The value is in milliseconds, and the default is 15 seconds. */ opts& auth_timeout(uint64_t millis) { @@ -954,7 +972,7 @@ namespace questdb::ingress * Set to `false` to disable TLS certificate verification. * This should only be used for debugging purposes as it reduces security. * - * For testing consider specifying a path to a `.pem` file instead via + * For testing, consider specifying a path to a `.pem` file instead via * the `tls_roots` setting. */ opts& tls_verify(bool verify) @@ -997,7 +1015,7 @@ namespace questdb::ingress } /** - * The maximum buffer size that the client will flush to the server. + * The maximum buffer size in bytes that the client will flush to the server. * The default is 100 MiB. */ opts& max_buf_size(size_t max_buf_size) @@ -1010,8 +1028,8 @@ namespace questdb::ingress } /** - * Cumulative duration spent in retries. - * The default is 10 seconds. + * Set the cumulative duration spent in retries. + * The value is in milliseconds, and the default is 10 seconds. */ opts& retry_timeout(uint64_t millis) { @@ -1023,10 +1041,14 @@ namespace questdb::ingress } /** - * Minimum expected throughput in bytes per second for HTTP requests. - * If the throughput is lower than this value, the connection will time out. - * The default is 100 KiB/s. - * The value is expressed as a number of bytes per second. + * Set the minimum acceptable throughput while sending a buffer to the server. + * The sender will divide the payload size by this number to determine for how + * long to keep sending the payload before timing out. + * The value is in bytes per second, and the default is 100 KiB/s. + * The timeout calculated from minimum throughput is adedd to the value of + * `request_timeout`. + * + * See also: `request_timeout()` */ opts& request_min_throughput(uint64_t bytes_per_sec) { @@ -1038,8 +1060,11 @@ namespace questdb::ingress } /** - * Grace request timeout before relying on the minimum throughput logic. - * The default is 10 seconds. + * Additional time to wait on top of that calculated from the minimum throughput. + * This accounts for the fixed latency of the HTTP request-response roundtrip. + * The value is in milliseconds, and the default is 10 seconds. + * + * See also: `request_min_throughput()` */ opts& request_timeout(uint64_t millis) { @@ -1073,22 +1098,37 @@ namespace questdb::ingress }; /** - * Insert data into QuestDB via the InfluxDB Line Protocol. + * Inserts data into QuestDB via the InfluxDB Line Protocol. * - * A `line_sender` object connects on construction. - * If you want to connect later, wrap it up in an std::optional. + * Batch up rows in a `line_sender_buffer` object, then call + * `.flush()` or one of its variants to send. * - * Batch up rows, then call `.flush()` to send. + * When you use ILP-over-TCP, the `line_sender` object connects on construction. + * If you want to connect later, wrap it in an std::optional. */ class line_sender { public: /** - * Create a new `line_sender` instance from configuration string. + * Create a new line sender instance from the given configuration string. * The format of the string is: "tcp::addr=host:port;key=value;...;" - * Alongside "tcp" you can also specify "tcps", "http", and "https". - * The accepted set of keys and values is the same as for the opt's API. - * E.g. "https::addr=host:port;username=alice;password=secret;tls_ca=os_roots;" + * Instead of "tcp" you can also specify "tcps", "http", and "https". + * + * The accepted keys match one-for-one with the methods on `opts`. + * For example, this is a valid configuration string: + * + * "https::addr=host:port;username=alice;password=secret;" + * + * and there are matching methods `opts.username()` and `opts.password()`. The + * value for `addr=` is supplied directly to `opts()`, so there's no function + * with a matching name. + * + * In the case of TCP, this synchronously establishes the TCP connection, and + * returns once the connection is fully established. If the connection + * requires authentication or TLS, these will also be completed before + * returning. + * + * The sender should be accessed by only a single thread a time. */ static inline line_sender from_conf(utf8_view conf) { @@ -1096,8 +1136,15 @@ namespace questdb::ingress } /** - * Create a new `line_sender` instance from configuration string read from the + * Create a new `line_sender` instance from the configuration stored in the * `QDB_CLIENT_CONF` environment variable. + * + * In the case of TCP, this synchronously establishes the TCP connection, and + * returns once the connection is fully established. If the connection + * requires authentication or TLS, these will also be completed before + * returning. + * + * The sender should be accessed by only a single thread a time. */ static inline line_sender from_env() { @@ -1139,13 +1186,27 @@ namespace questdb::ingress } /** - * Send batched-up rows to the QuestDB server. + * Send the given buffer of rows to the QuestDB server, clearing the buffer. * - * This method automatically clears the buffer. - * See `flush_and_keep` if you want to retain the buffer contents. + * After this function returns, the buffer is empty and ready for the next batch. + * If you want to preserve the buffer contents, call `flush_and_keep()`. If you + * want to ensure the flush is transactional, call `flush_and_keep_with_flags()`. * - * After sending a batch, you can close the connection or begin - * preparing a new batch by calling `.table(..)` on the buffer again. + * With ILP-over-HTTP, this function sends an HTTP request and waits for the + * response. If the server responds with an error, it returns a descriptive error. + * In the case of a network error, it retries until it has exhausted the retry time + * budget. + * + * With ILP-over-TCP, the function blocks only until the buffer is flushed to the + * underlying OS-level network socket, without waiting to actually send it to the + * server. In the case of an error, the server will quietly disconnect: consult the + * server logs for error messages. + * + * HTTP should be the first choice, but use TCP if you need to continuously send + * data to the server at a high rate. + * + * To improve the HTTP performance, send larger buffers (with more rows), and + * consider parallelizing writes using multiple senders from multiple threads. */ void flush(line_sender_buffer& buffer) { @@ -1158,10 +1219,12 @@ namespace questdb::ingress } /** - * Send batched-up rows to the QuestDB server. + * Send the given buffer of rows to the QuestDB server. + * + * All the data stays in the buffer. Clear the buffer before starting a new batch. * - * This method does not affect the buffer and its contains - * will be retained. + * To send and clear in one step, call `flush()` instead. Also, see the docs + * on that method for more important details on flushing. */ void flush_and_keep(const line_sender_buffer& buffer) { @@ -1186,6 +1249,8 @@ namespace questdb::ingress /** * Check if an error occurred previously and the sender must be closed. + * This happens when there was an earlier failure. + * This method is specific to ILP-over-TCP and is not relevant for ILP-over-HTTP. * @return true if an error occurred with a sender and it must be closed. */ bool must_close() const noexcept diff --git a/questdb-rs-ffi/src/lib.rs b/questdb-rs-ffi/src/lib.rs index dd34a80d..a792a1f7 100644 --- a/questdb-rs-ffi/src/lib.rs +++ b/questdb-rs-ffi/src/lib.rs @@ -206,16 +206,16 @@ impl From for Protocol { #[repr(C)] #[derive(Debug, Copy, Clone)] pub enum line_sender_ca { - /// Use the set of root certificates provided by the `webpki` crate. + /// Use the root certificates provided by the `webpki` crate. line_sender_ca_webpki_roots, - /// Use the set of root certificates provided by the operating system. + /// Use the root certificates provided by the operating system. line_sender_ca_os_roots, - /// Use the set of root certificates provided by both the `webpki` crate and the operating system. + /// Combine the root certificates provided by the OS and the `webpki-roots` crate. line_sender_ca_webpki_and_os_roots, - /// Use a custom root certificate `.pem` file. + /// Use the root certificates provided in a PEM-encoded file. line_sender_ca_pem_file, } @@ -536,18 +536,23 @@ pub unsafe extern "C" fn line_sender_column_name_assert( } } -/// Prepare rows for sending via the line sender's `flush` function. -/// Buffer objects are re-usable and cleared automatically when flushing. +/// Accumulates a batch of rows to be sent via `line_sender_flush()` or its +/// variants. A buffer object can be reused after flushing and clearing. pub struct line_sender_buffer(Buffer); -/// Create a buffer for serializing ILP messages. +/// Construct a `line_sender_buffer` with a `max_name_len` of `127`, which is the +/// same as the QuestDB server default. #[no_mangle] pub unsafe extern "C" fn line_sender_buffer_new() -> *mut line_sender_buffer { let buffer = Buffer::new(); Box::into_raw(Box::new(line_sender_buffer(buffer))) } -/// Create a buffer for serializing ILP messages. +/// Construct a `line_sender_buffer` with a custom maximum length for table and +/// column names. This should match the `cairo.max.file.name.length` setting of +/// the QuestDB server you're connecting to. +/// If the server does not configure it, the default is `127`, and you can +/// call `line_sender_buffer_new()` instead. #[no_mangle] pub unsafe extern "C" fn line_sender_buffer_with_max_name_len( max_name_len: size_t, @@ -583,6 +588,7 @@ pub unsafe extern "C" fn line_sender_buffer_clone( /// Pre-allocate to ensure the buffer has enough capacity for at least the /// specified additional byte count. This may be rounded up. +/// This does not allocate if such additional capacity is already satisfied. /// See: `capacity`. #[no_mangle] pub unsafe extern "C" fn line_sender_buffer_reserve( @@ -641,7 +647,7 @@ pub unsafe extern "C" fn line_sender_buffer_clear(buffer: *mut line_sender_buffe buffer.clear(); } -/// Number of bytes in the accumulated buffer. +/// The number of bytes accumulated in the buffer. #[no_mangle] pub unsafe extern "C" fn line_sender_buffer_size(buffer: *const line_sender_buffer) -> size_t { let buffer = unwrap_buffer(buffer); @@ -655,8 +661,9 @@ pub unsafe extern "C" fn line_sender_buffer_row_count(buffer: *const line_sender buffer.row_count() } -/// The buffer is transactional if sent over HTTP. -/// A buffer stops being transactional if it contains rows for multiple tables. +/// Tell whether the buffer is transactional. It is transactional iff it contains +/// data for at most one table. Additionally, you must send the buffer over HTTP to +/// get transactional behavior. #[no_mangle] pub unsafe extern "C" fn line_sender_buffer_transactional( buffer: *const line_sender_buffer, @@ -665,7 +672,7 @@ pub unsafe extern "C" fn line_sender_buffer_transactional( buffer.transactional() } -/// Peek into the accumulated buffer that is to be sent out at the next `flush`. +/// Get a string representation of the contents of the buffer. /// /// @param[in] buffer Line buffer object. /// @param[out] len_out The length in bytes of the accumulated buffer. @@ -681,7 +688,7 @@ pub unsafe extern "C" fn line_sender_buffer_peek( buf.as_ptr() as *const c_char } -/// Start batching the next row of input for the named table. +/// Start recording a new row for the given table. /// @param[in] buffer Line buffer object. /// @param[in] name Table name. #[no_mangle] @@ -695,9 +702,8 @@ pub unsafe extern "C" fn line_sender_buffer_table( true } -/// Append a value for a SYMBOL column. -/// Symbol columns must always be written before other columns for any given -/// row. +/// Record a symbol value for the given column. +/// Make sure you record all the symbol columns before any other column type. /// @param[in] buffer Line buffer object. /// @param[in] name Column name. /// @param[in] value Column value. @@ -715,7 +721,7 @@ pub unsafe extern "C" fn line_sender_buffer_symbol( true } -/// Append a value for a BOOLEAN column. +/// Record a boolean value for the given column. /// @param[in] buffer Line buffer object. /// @param[in] name Column name. /// @param[in] value Column value. @@ -733,7 +739,7 @@ pub unsafe extern "C" fn line_sender_buffer_column_bool( true } -/// Append a value for a LONG column. +/// Record an integer value for the given column. /// @param[in] buffer Line buffer object. /// @param[in] name Column name. /// @param[in] value Column value. @@ -751,7 +757,7 @@ pub unsafe extern "C" fn line_sender_buffer_column_i64( true } -/// Append a value for a DOUBLE column. +/// Record a floating-point value for the given column. /// @param[in] buffer Line buffer object. /// @param[in] name Column name. /// @param[in] value Column value. @@ -769,7 +775,7 @@ pub unsafe extern "C" fn line_sender_buffer_column_f64( true } -/// Append a value for a STRING column. +/// Record a string value for the given column. /// @param[in] buffer Line buffer object. /// @param[in] name Column name. /// @param[in] value Column value. @@ -789,7 +795,7 @@ pub unsafe extern "C" fn line_sender_buffer_column_str( true } -/// Append a value for a TIMESTAMP column from nanoseconds. +/// Record a nanosecond timestamp value for the given column. /// @param[in] buffer Line buffer object. /// @param[in] name Column name. /// @param[in] nanos The timestamp in nanoseconds before or since the unix epoch. @@ -808,7 +814,7 @@ pub unsafe extern "C" fn line_sender_buffer_column_ts_nanos( true } -/// Append a value for a TIMESTAMP column from microseconds. +/// Record a microsecond timestamp value for the given column. /// @param[in] buffer Line buffer object. /// @param[in] name Column name. /// @param[in] micros The timestamp in microseconds before or since the unix epoch. @@ -827,11 +833,14 @@ pub unsafe extern "C" fn line_sender_buffer_column_ts_micros( true } -/// Complete the row with a timestamp specified as nanoseconds. +/// Complete the current row with the designated timestamp in nanoseconds. /// -/// After this call, you can start batching the next row by calling -/// `table` again, or you can send the accumulated batch by -/// calling `flush`. +/// After this call, you can start recording the next row by calling +/// `line_sender_buffer_table()` again, or you can send the accumulated batch +/// by calling `line_sender_flush()` or one of its variants. +/// +/// If you want to pass the current system timestamp, see +/// `line_sender_now_nanos()`. /// /// @param[in] buffer Line buffer object. /// @param[in] epoch_nanos Number of nanoseconds since 1st Jan 1970 UTC. @@ -849,11 +858,14 @@ pub unsafe extern "C" fn line_sender_buffer_at_nanos( true } -/// Complete the row with a timestamp specified as microseconds. +/// Complete the current row with the designated timestamp in microseconds. +/// +/// After this call, you can start recording the next row by calling +/// `line_sender_buffer_table()` again, or you can send the accumulated batch +/// by calling `line_sender_flush()` or one of its variants. /// -/// After this call, you can start batching the next row by calling -/// `table` again, or you can send the accumulated batch by -/// calling `flush`. +/// If you want to pass the current system timestamp, see +/// `line_sender_now_micros()`. /// /// @param[in] buffer Line buffer object. /// @param[in] epoch_micros Number of microseconds since 1st Jan 1970 UTC. @@ -871,12 +883,24 @@ pub unsafe extern "C" fn line_sender_buffer_at_micros( true } -/// Complete the row without providing a timestamp. -/// The QuestDB instance will insert its own timestamp. +/// Complete the current row without providing a timestamp. The QuestDB instance +/// will insert its own timestamp. +/// +/// Letting the server assign the timestamp can be faster since it a reliable way +/// to avoid out-of-order operations in the database for maximum ingestion +/// throughput. However, it removes the ability to deduplicate rows. +/// +/// This is NOT equivalent to calling `line_sender_buffer_at_nanos()` or +/// `line_sender_buffer_at_micros()` with the current time: the QuestDB server +/// will set the timestamp only after receiving the row. If you're flushing +/// infrequently, the server-assigned timestamp may be significantly behind the +/// time the data was recorded in the buffer. /// -/// After this call, you can start batching the next row by calling -/// `table` again, or you can send the accumulated batch by -/// calling `flush`. +/// In almost all cases, you should prefer the `line_sender_buffer_at_*()` functions. +/// +/// After this call, you can start recording the next row by calling `table()` +/// again, or you can send the accumulated batch by calling `flush()` or one of +/// its variants. /// /// @param[in] buffer Line buffer object. /// @param[out] err_out Set on error. @@ -891,14 +915,23 @@ pub unsafe extern "C" fn line_sender_buffer_at_now( true } -/// Accumulates parameters for creating a line sender connection. +/// Accumulates parameters for a new `line_sender` object. pub struct line_sender_opts(SenderBuilder); -/// Create a new `line_sender_opts` instance from configuration string. +/// Create a new `line_sender_opts` instance from the given configuration string. /// The format of the string is: "tcp::addr=host:port;key=value;...;" -/// Alongside "tcp" you can also specify "tcps", "http", and "https". -/// The accepted set of keys and values is the same as for the opt's API. -/// E.g. "https::addr=host:port;username=alice;password=secret;tls_ca=os_roots;" +/// Instead of "tcp" you can also specify "tcps", "http", and "https". +/// +/// The accepted keys match one-for-one with the functions on `line_sender_opts`. +/// For example, this is a valid configuration string: +/// +/// "https::addr=host:port;username=alice;password=secret;" +/// +/// and there are matching functions `line_sender_opts_username()` and +/// `line_sender_opts_password()`. The value for `addr=` is supplied directly to +/// `line_sender_opts_new`, so there's no function with a matching name. +/// +/// For the full list of keys, search this module for `fn line_sender_opts_`. #[no_mangle] pub unsafe extern "C" fn line_sender_opts_from_conf( config: line_sender_utf8, @@ -909,8 +942,8 @@ pub unsafe extern "C" fn line_sender_opts_from_conf( Box::into_raw(Box::new(line_sender_opts(builder))) } -/// Create a new `line_sender_opts` instance from configuration string read -/// from the `QDB_CLIENT_CONF` environment variable. +/// Create a new `line_sender_opts` instance from the configuration stored in the +/// `QDB_CLIENT_CONF` environment variable. #[no_mangle] pub unsafe extern "C" fn line_sender_opts_from_env( err_out: *mut *mut line_sender_error, @@ -919,7 +952,8 @@ pub unsafe extern "C" fn line_sender_opts_from_env( Box::into_raw(Box::new(line_sender_opts(builder))) } -/// A new set of options for a line sender connection for ILP/TCP. +/// Create a new `line_sender_opts` instance with the given protocol, hostname and +/// port. /// @param[in] protocol The protocol to use. /// @param[in] host The QuestDB database host. /// @param[in] port The QuestDB ILP TCP port. @@ -936,7 +970,8 @@ pub unsafe extern "C" fn line_sender_opts_new( Box::into_raw(Box::new(line_sender_opts(builder))) } -/// Variant of line_sender_opts_new that takes a service name for port. +/// Create a new `line_sender_opts` instance with the given protocol, hostname and +/// service name. #[no_mangle] pub unsafe extern "C" fn line_sender_opts_new_service( protocol: line_sender_protocol, @@ -966,11 +1001,11 @@ pub unsafe extern "C" fn line_sender_opts_bind_interface( /// Set the username for authentication. /// -/// For TCP this is the `kid` part of the ECDSA key set. +/// For TCP, this is the `kid` part of the ECDSA key set. /// The other fields are `token` `token_x` and `token_y`. /// -/// For HTTP this is part of basic authentication. -/// Also see `password`. +/// For HTTP, this is part of basic authentication. +/// See also: `line_sender_opts_password()`. #[no_mangle] pub unsafe extern "C" fn line_sender_opts_username( opts: *mut line_sender_opts, @@ -981,7 +1016,7 @@ pub unsafe extern "C" fn line_sender_opts_username( } /// Set the password for basic HTTP authentication. -/// Also see `username`. +/// See also: `line_sender_opts_username()`. #[no_mangle] pub unsafe extern "C" fn line_sender_opts_password( opts: *mut line_sender_opts, @@ -991,8 +1026,8 @@ pub unsafe extern "C" fn line_sender_opts_password( upd_opts!(opts, err_out, password, password.as_str()) } -/// Token (Bearer) Authentication Parameters for ILP over HTTP, -/// or the ECDSA private key for ILP over TCP authentication. +/// Set the Token (Bearer) Authentication parameter for HTTP, +/// or the ECDSA private key for TCP authentication. #[no_mangle] pub unsafe extern "C" fn line_sender_opts_token( opts: *mut line_sender_opts, @@ -1002,7 +1037,7 @@ pub unsafe extern "C" fn line_sender_opts_token( upd_opts!(opts, err_out, token, token.as_str()) } -/// The ECDSA public key X for ILP over TCP authentication. +/// Set the ECDSA public key X for TCP authentication. #[no_mangle] pub unsafe extern "C" fn line_sender_opts_token_x( opts: *mut line_sender_opts, @@ -1012,7 +1047,7 @@ pub unsafe extern "C" fn line_sender_opts_token_x( upd_opts!(opts, err_out, token_x, token_x.as_str()) } -/// The ECDSA public key Y for ILP over TCP authentication. +/// Set the ECDSA public key Y for TCP authentication. #[no_mangle] pub unsafe extern "C" fn line_sender_opts_token_y( opts: *mut line_sender_opts, @@ -1024,7 +1059,7 @@ pub unsafe extern "C" fn line_sender_opts_token_y( /// Configure how long to wait for messages from the QuestDB server during /// the TLS handshake and authentication process. -/// The default is 15000 milliseconds. +/// The value is in milliseconds, and the default is 15 seconds. #[no_mangle] pub unsafe extern "C" fn line_sender_opts_auth_timeout( opts: *mut line_sender_opts, @@ -1064,7 +1099,8 @@ pub unsafe extern "C" fn line_sender_opts_tls_ca( /// Set the path to a custom root certificate `.pem` file. /// This is used to validate the server's certificate during the TLS handshake. /// -/// See notes on how to test with [self-signed certificates](https://github.com/questdb/c-questdb-client/tree/main/tls_certs). +/// See notes on how to test with [self-signed +/// certificates](https://github.com/questdb/c-questdb-client/tree/main/tls_certs). #[no_mangle] pub unsafe extern "C" fn line_sender_opts_tls_roots( opts: *mut line_sender_opts, @@ -1075,7 +1111,7 @@ pub unsafe extern "C" fn line_sender_opts_tls_roots( upd_opts!(opts, err_out, tls_roots, path) } -/// The maximum buffer size that the client will flush to the server. +/// Set the maximum buffer size in bytes that the client will flush to the server. /// The default is 100 MiB. #[no_mangle] pub unsafe extern "C" fn line_sender_opts_max_buf_size( @@ -1086,8 +1122,8 @@ pub unsafe extern "C" fn line_sender_opts_max_buf_size( upd_opts!(opts, err_out, max_buf_size, max_buf_size) } -/// Cumulative duration spent in retries. -/// The default is 10 seconds. +/// Set the cumulative duration spent in retries. +/// The value is in milliseconds, and the default is 10 seconds. #[no_mangle] pub unsafe extern "C" fn line_sender_opts_retry_timeout( opts: *mut line_sender_opts, @@ -1098,10 +1134,14 @@ pub unsafe extern "C" fn line_sender_opts_retry_timeout( upd_opts!(opts, err_out, retry_timeout, retry_timeout) } -/// Minimum expected throughput in bytes per second for HTTP requests. -/// If the throughput is lower than this value, the connection will time out. -/// The default is 100 KiB/s. -/// The value is expressed as a number of bytes per second. +/// Set the minimum acceptable throughput while sending a buffer to the server. +/// The sender will divide the payload size by this number to determine for how +/// long to keep sending the payload before timing out. +/// The value is in bytes per second, and the default is 100 KiB/s. +/// The timeout calculated from minimum throughput is adedd to the value of +/// `request_timeout`. +/// +/// See also: `line_sender_opts_request_timeout()` #[no_mangle] pub unsafe extern "C" fn line_sender_opts_request_min_throughput( opts: *mut line_sender_opts, @@ -1111,8 +1151,11 @@ pub unsafe extern "C" fn line_sender_opts_request_min_throughput( upd_opts!(opts, err_out, request_min_throughput, bytes_per_sec) } -/// Grace request timeout before relying on the minimum throughput logic. -/// The default is 5 seconds. +/// Set the additional time to wait on top of that calculated from the minimum +/// throughput. This accounts for the fixed latency of the HTTP request-response +/// roundtrip. The value is in milliseconds, and the default is 10 seconds. +/// +/// See also: `line_sender_opts_request_min_throughput()` #[no_mangle] pub unsafe extern "C" fn line_sender_opts_request_timeout( opts: *mut line_sender_opts, @@ -1153,19 +1196,20 @@ pub unsafe extern "C" fn line_sender_opts_free(opts: *mut line_sender_opts) { } } -/// Insert data into QuestDB via the InfluxDB Line Protocol. +/// Inserts data into QuestDB via the InfluxDB Line Protocol. /// -/// Batch up rows in `buffer` objects, then call `flush` to send them. +/// Batch up rows in a `line_sender_buffer`, then call `line_sender_flush()` or +/// one of its variants with this object to send them. pub struct line_sender(Sender); -/// Build the line sender. +/// Create a new line sender instance from the given options object. /// -/// In case of TCP, this synchronously establishes the TCP connection, and +/// In the case of TCP, this synchronously establishes the TCP connection, and /// returns once the connection is fully established. If the connection /// requires authentication or TLS, these will also be completed before /// returning. /// -/// The connection should be accessed by only a single thread a time. +/// The sender should be accessed by only a single thread a time. /// /// @param[in] opts Options for the connection. #[no_mangle] @@ -1178,13 +1222,27 @@ pub unsafe extern "C" fn line_sender_build( Box::into_raw(Box::new(line_sender(sender))) } -/// Create a new `line_sender` instance from configuration string. +/// Create a new line sender instance from the given configuration string. /// The format of the string is: "tcp::addr=host:port;key=value;...;" -/// Alongside "tcp" you can also specify "tcps", "http", and "https". -/// The accepted set of keys and values is the same as for the opt's API. -/// E.g. "https::addr=host:port;username=alice;password=secret;tls_ca=os_roots;" +/// Instead of "tcp" you can also specify "tcps", "http", and "https". +/// +/// The accepted keys match one-for-one with the functions on `line_sender_opts`. +/// For example, this is a valid configuration string: +/// +/// "https::addr=host:port;username=alice;password=secret;" +/// +/// and there are matching functions `line_sender_opts_username()` and +/// `line_sender_opts_password()`. The value for `addr=` is supplied directly to +/// `line_sender_opts_new`, so there's no function with a matching name. +/// +/// For the full list of keys, search this header for `bool line_sender_opts_`. /// -/// For the full list of keys, search this file for `fn line_sender_opts_`. +/// In the case of TCP, this synchronously establishes the TCP connection, and +/// returns once the connection is fully established. If the connection +/// requires authentication or TLS, these will also be completed before +/// returning. +/// +/// The sender should be accessed by only a single thread a time. #[no_mangle] pub unsafe extern "C" fn line_sender_from_conf( config: line_sender_utf8, @@ -1199,8 +1257,15 @@ pub unsafe extern "C" fn line_sender_from_conf( Box::into_raw(Box::new(line_sender(sender))) } -/// Create a new `line_sender` instance from configuration string read from the +/// Create a new `line_sender` instance from the configuration stored in the /// `QDB_CLIENT_CONF` environment variable. +/// +/// In the case of TCP, this synchronously establishes the TCP connection, and +/// returns once the connection is fully established. If the connection +/// requires authentication or TLS, these will also be completed before +/// returning. +/// +/// The sender should be accessed by only a single thread a time. #[no_mangle] pub unsafe extern "C" fn line_sender_from_env( err_out: *mut *mut line_sender_error, @@ -1221,7 +1286,9 @@ unsafe fn unwrap_sender_mut<'a>(sender: *mut line_sender) -> &'a mut Sender { &mut (*sender).0 } -/// Check if an error occurred previously and the sender must be closed. +/// Tell whether the sender is no longer usable and must be closed. +/// This happens when there was an earlier failure. +/// This fuction is specific to TCP and is not relevant for HTTP. /// @param[in] sender Line sender object. /// @return true if an error occurred with a sender and it must be closed. #[no_mangle] @@ -1238,10 +1305,28 @@ pub unsafe extern "C" fn line_sender_close(sender: *mut line_sender) { } } -/// Send buffer of rows to the QuestDB server. +/// Send the given buffer of rows to the QuestDB server, clearing the buffer. +/// +/// After this function returns, the buffer is empty and ready for the next batch. +/// If you want to preserve the buffer contents, call `line_sender_flush_and_keep`. +/// If you want to ensure the flush is transactional, call +/// `line_sender_flush_and_keep_with_flags`. +/// +/// With ILP-over-HTTP, this function sends an HTTP request and waits for the +/// response. If the server responds with an error, it returns a descriptive error. +/// In the case of a network error, it retries until it has exhausted the retry time +/// budget. +/// +/// With ILP-over-TCP, the function blocks only until the buffer is flushed to the +/// underlying OS-level network socket, without waiting to actually send it to the +/// server. In the case of an error, the server will quietly disconnect: consult the +/// server logs for error messages. /// -/// The buffer will be automatically cleared, ready for re-use. -/// If instead you want to preserve the buffer contents, call `flush_and_keep`. +/// HTTP should be the first choice, but use TCP if you need to continuously send +/// data to the server at a high rate. +/// +/// To improve the HTTP performance, send larger buffers (with more rows), and +/// consider parallelizing writes using multiple senders from multiple threads. /// /// @param[in] sender Line sender object. /// @param[in] buffer Line buffer object. @@ -1258,10 +1343,12 @@ pub unsafe extern "C" fn line_sender_flush( true } -/// Send buffer of rows to the QuestDB server. +/// Send the given buffer of rows to the QuestDB server. +/// +/// All the data stays in the buffer. Clear the buffer before starting a new batch. /// -/// The buffer will left untouched and must be cleared before re-use. -/// To send and clear in one single step, `flush` instead. +/// To send and clear in one step, call `line_sender_flush` instead. Also, see the docs +/// on that function for more important details on flushing. /// @param[in] sender Line sender object. /// @param[in] buffer Line buffer object. /// @return true on success, false on error. @@ -1277,15 +1364,22 @@ pub unsafe extern "C" fn line_sender_flush_and_keep( true } -/// Variant of `.flush()` that does not clear the buffer and allows for -/// transactional flushes. +/// Send the batch of rows in the buffer to the QuestDB server, and, if the parameter +/// `transactional` is true, ensure the flush will be transactional. +/// +/// A flush is transactional iff all the rows belong to the same table. This allows +/// QuestDB to treat the flush as a single database transaction, because it doesn't +/// support transactions spanning multiple tables. Additionally, only ILP-over-HTTP +/// supports transactional flushes. +/// +/// If the flush wouldn't be transactional, this function returns an error and +/// doesn't flush any data. /// -/// A transactional flush is simply a flush that ensures that all rows in -/// the ILP buffer refer to the same table, thus allowing the server to -/// treat the flush request as a single transaction. +/// The function sends an HTTP request and waits for the response. If the server +/// responds with an error, it returns a descriptive error. In the case of a network +/// error, it retries until it has exhausted the retry time budget. /// -/// This is because QuestDB does not support transactions spanning multiple -/// tables. +/// All the data stays in the buffer. Clear the buffer before starting a new batch. #[no_mangle] pub unsafe extern "C" fn line_sender_flush_and_keep_with_flags( sender: *mut line_sender, @@ -1302,13 +1396,13 @@ pub unsafe extern "C" fn line_sender_flush_and_keep_with_flags( true } -/// Get the current time in nanoseconds since the unix epoch (UTC). +/// Get the current time in nanoseconds since the Unix epoch (UTC). #[no_mangle] pub unsafe extern "C" fn line_sender_now_nanos() -> i64 { TimestampNanos::now().as_i64() } -/// Get the current time in microseconds since the unix epoch (UTC). +/// Get the current time in microseconds since the Unix epoch (UTC). #[no_mangle] pub unsafe extern "C" fn line_sender_now_micros() -> i64 { TimestampMicros::now().as_i64() diff --git a/questdb-rs/Cargo.toml b/questdb-rs/Cargo.toml index fdb9dec4..46c11c9c 100644 --- a/questdb-rs/Cargo.toml +++ b/questdb-rs/Cargo.toml @@ -51,7 +51,7 @@ chrono = "0.4.31" tempfile = "3.2.0" [features] -default = ["tls-webpki-certs"] +default = ["tls-webpki-certs", "ilp-over-http"] # Include support for ILP over HTTP. ilp-over-http = ["dep:ureq", "dep:serde_json", "dep:rand"] diff --git a/questdb-rs/README.md b/questdb-rs/README.md index 290a59cb..bd1b5fc1 100644 --- a/questdb-rs/README.md +++ b/questdb-rs/README.md @@ -1,15 +1,17 @@ # QuestDB Client Library for Rust -Official Rust client for [QuestDB](https://questdb.io/), an open-source SQL database designed to process time-series data, faster. +Official Rust client for [QuestDB](https://questdb.io/), an open-source SQL +database designed to process time-series data, faster. -The client library is designed for fast ingestion of data into QuestDB via the InfluxDB Line Protocol (ILP). +The client library is designed for fast ingestion of data into QuestDB via the +InfluxDB Line Protocol (ILP). * [QuestDB Database docs](https://questdb.io/docs/) -* [ILP docs](https://questdb.io/docs/reference/api/ilp/overview/) +* [Docs on InfluxDB Line Protocol](https://questdb.io/docs/reference/api/ilp/overview/) ## Getting Started -To start using `questdb-rs` add it to your `Cargo.toml`: +To start using `questdb-rs`, add it to your `Cargo.toml`: ```toml [dependencies] @@ -18,9 +20,12 @@ questdb-rs = "4.0.0" ## Docs -See documentation for the [`ingress`](https://docs.rs/questdb-rs/4.0.0/questdb/ingress/) module to insert data into QuestDB via the ILP protocol. +See documentation for the +[`ingress`](https://docs.rs/questdb-rs/4.0.0/questdb/ingress/) module to insert +data into QuestDB via the ILP protocol. -* Latest API docs: [https://docs.rs/questdb-rs/latest/](https://docs.rs/questdb-rs/latest/) +* Latest API docs: + [https://docs.rs/questdb-rs/latest/](https://docs.rs/questdb-rs/latest/) ## Example @@ -48,28 +53,30 @@ fn main() -> Result<()> { ## Crate features -This Rust crate supports a number of optional features. +This Rust crate supports a number of optional features, in most cases linked +to additional library dependencies. -For example, if you want to work with ILP/HTTP and work with Chrono timestamps, -use: +For example, if you want to work with Chrono timestamps, use: ```bash -cargo add questdb-rs --features ilp-over-http chrono +cargo add questdb-rs --features chrono_timestamp ``` ### Default-enabled features -* `tls-webpki-certs`: Use the `webpki-roots` crate for TLS cert verification. +* `ilp-over-http`: Enables ILP/HTTP support via the `ureq` crate. +* `tls-webpki-certs`: Supports using the `webpki-roots` crate for TLS + certificate verification. ### Optional features -These features are opt-in as they bring in additional downstream dependencies. +These features are opt-in: -* `ilp-over-http`: Enables ILP/HTTP support via the `ureq` crate. +* `chrono_timestamp`: Allows specifying timestamps as `chrono::Datetime` objects. * `tls-native-certs`: Supports validating TLS certificates against the OS's certificates store. -* `insecure-skip-verify`: Allows skipping TLS validation. -* `chrono_timestamp`: Allows specifying timestamps as `chrono::Datetime` objects. +* `insecure-skip-verify`: Allows skipping server certificate validation in TLS + (this compromises security). ## C, C++ and Python APIs diff --git a/questdb-rs/src/ingress/mod.md b/questdb-rs/src/ingress/mod.md new file mode 100644 index 00000000..b0af2090 --- /dev/null +++ b/questdb-rs/src/ingress/mod.md @@ -0,0 +1,293 @@ +# Fast Ingestion of Data into QuestDB + +The `ingress` module implements QuestDB's variant of the +[InfluxDB Line Protocol](https://questdb.io/docs/reference/api/ilp/overview/) +(ILP). + +To get started: + +* Use [`Sender::from_conf()`] to get the [`Sender`] object +* Populate a [`Buffer`] with one or more rows of data +* Send the buffer using [`sender.flush()`](Sender::flush) + +```rust no_run +use questdb::{ + Result, + ingress::{ + Sender, + Buffer, + TimestampNanos}}; +fn main() -> Result<()> { + let mut sender = Sender::from_conf("http::addr=localhost:9000;")?; + let mut buffer = Buffer::new(); + buffer + .table("sensors")? + .symbol("id", "toronto1")? + .column_f64("temperature", 20.0)? + .column_i64("humidity", 50)? + .at(TimestampNanos::now())?; + sender.flush(&mut buffer)?; + Ok(()) +} +``` + +# Configuration String + +The easiest way to configure all the available parameters on a line sender is +the configuration string. The general structure is: + +```plain +::addr=host:port;param1=val1;param2=val2;... +``` + +`transport` can be `http`, `https`, `tcp`, or `tcps`. See the full details on +supported parameters in a dedicated section below. + +# Don't Forget to Flush + +The sender and buffer objects are entirely decoupled. This means that the sender +won't get access to the data in the buffer until you explicitly call +[`sender.flush(&mut buffer)`](Sender::flush) or a variant. This may lead to a +pitfall where you drop a buffer that still has some data in it, resulting in +permanent data loss. + +A common technique is to flush periodically on a timer and/or once the buffer +exceeds a certain size. You can check the buffer's size by the calling +[`buffer.len()`](Buffer::len). + +The default `flush()` method clears the buffer after sending its data. If you +want to preserve its contents (for example, to send the same data to multiple +QuestDB instances), call +[`sender.flush_and_keep(&mut buffer)`](Sender::flush_and_keep) instead. + +# Error Handling + +The two supported transport modes, HTTP and TCP, handle errors very differently. +In a nutshell, HTTP is much better at error handling. + +## TCP + +TCP doesn't report errors at all to the sender; instead, the server quietly +disconnects and you'll have to inspect the server logs to get more information +on the reason. When this has happened, the sender transitions into an error +state, and it is permanently unusable. You must drop it and create a new sender. +You can inspect the sender's error state by calling +[`sender.must_close()`](Sender::must_close). + +## HTTP + +HTTP distinguishes between recoverable and non-recoverable errors. For +recoverable ones, it enters a retry loop with exponential backoff, and reports +the error to the caller only after it has exhausted the retry time budget +(configuration parameter: `retry_timeout`). + +`sender.flush()` and variant methods communicate the error in the `Result` +return value. The category of the error is signalled through the +[`ErrorCode`](crate::error::ErrorCode) enum, and it's accompanied with an error +message. + +After the sender has signalled an error, it remains usable. You can handle the +error as appropriate and continue using it. + +# Configuration Parameters + +In the examples below, we'll use configuration strings. We also provide the +[`SenderBuilder`] to programmatically configure the sender. The methods on +[`SenderBuilder`] match one-for-one with the keys in the configuration string. + +## Authentication + +To establish an +[authenticated](https://questdb.io/docs/reference/api/ilp/overview/#authentication) +and TLS-encrypted connection, use the `https` or `tcps` protocol, and use the +configuration options appropriate for the authentication method. + +Here are quick examples of configuration strings for each authentication method +we support: + +### HTTP Token Bearer Authentication + +```no_run +# use questdb::{Result, ingress::Sender}; +# fn main() -> Result<()> { +let mut sender = Sender::from_conf( + "https::addr=localhost:9000;token=Yfym3fgMv0B9;" +)?; +# Ok(()) +# } +``` + +* `token`: the authentication token + +### HTTP Basic Authentication + +```no_run +# use questdb::{Result, ingress::Sender}; +# fn main() -> Result<()> { +let mut sender = Sender::from_conf( + "https::addr=localhost:9000;username=testUser1;password=Yfym3fgMv0B9;" +)?; +# Ok(()) +# } +``` + +* `username`: the username +* `password`: the password + +### TCP Elliptic Curve Digital Signature Algorithm (ECDSA) + +```no_run +# use questdb::{Result, ingress::Sender}; +# fn main() -> Result<()> { +let mut sender = Sender::from_conf( + "tcps::addr=localhost:9009;username=testUser1;token=5UjEA0;token_x=fLKYa9;token_y=bS1dEfy" +)?; +# Ok(()) +# } +``` + +The four ECDSA components are: + +* `username`, aka. _kid_ +* `token`, aka. _d_ +* `token_x`, aka. _x_ +* `token_y`, aka. _y_ + +### Authentication Timeout + +You can specify how long the client should wait for the authentication request +to resolve. The configuration parameter is: + +* `auth_timeout` (milliseconds, default 15 seconds) + +## Encryption on the Wire: TLS + +To enable TLS on the QuestDB Enterprise server, refer to the [QuestDB Enterprise +TLS documentation](https://questdb.io/docs/operations/tls/). + +*Note*: QuestDB Open Source does not support TLS natively. To use TLS with +QuestDB Open Source, use a TLS proxy such as +[HAProxy](http://www.haproxy.org/). + +We support several certification authorities (sources of PKI root certificates). +To select one, use the `tls_ca` config option. These are the supported variants: + +* `tls_ca=webpki_roots;` use the roots provided in the standard Rust crate + [webpki-roots](https://crates.io/crates/webpki-roots) + +* `tls_ca=os_roots;` use the OS-provided certificate store + +* `tls_ca=webpki_and_os_roots;` combine both of the above + +* `tls_roots=/path/to/root-ca.pem;` get the root certificates from the specified + file. Main purpose is for testing with self-signed certificates. _Note:_ this + automatically sets `tls_ca=pem_file`. + +See our notes on [how to generate a self-signed +certificate](https://github.com/questdb/c-questdb-client/tree/main/tls_certs). + +* `tls_verify=unsafe_off;` tells the QuestDB client to ignore all CA roots and + accept any server certificate without checking. You can use it as a last + resort, when you weren't able to apply the above approach with a self-signed + certificate. You should **never use it in production** as it defeats security + and allows a man-in-the middle attack. + +## HTTP Timeouts + +Instead of a fixed timeout value, we use a flexible timeout that depends on the +size of the HTTP request payload (how much data is in the buffer that you're +flushing). You can configure it using two options: + +* `request_min_throughput` (bytes per second, default 100 KiB/s): divide the + payload size by this number to determine for how long to keep sending the + payload before timing out. +* `request_timeout` (milliseconds, default 10 seconds): additional time + allowance to account for the fixed latency of the request-response roundtrip. + +Finally, the client will keep retrying the request if it experiences errors. You +can configure the total time budget for retrying: + +* `retry_timeout` (milliseconds, default 10 seconds) + +# Usage Considerations + +## Transactional Flush + +When using HTTP, you can arrange that each `flush()` call happens within its own +transaction. For this to work, your buffer must contain data that targets only +one table. This is because QuestDB doesn't support multi-table transactions. + +In order to ensure in advance that a flush will be transactional, call +[`sender.flush_and_keep_with_flags(&mut buffer, true)`](Sender::flush_and_keep_with_flags). +This call will refuse to flush a buffer if the flush wouldn't be transactional. + +## When to Choose the TCP Transport? + +As discussed above, the TCP transport mode is raw and simplistic: it doesn't +report any errors to the caller (the server just disconnects), has no automatic +retries, requires manual handling of connection failures, and doesn't support +transactional flushing. + +However, TCP has a lower overhead than HTTP and it's worthwhile to try out as an +alternative in a scenario where you have a constantly high data rate and/or deal +with a high-latency network connection. + +## Sequential Coupling in the Buffer API + +The fluent API of [`Buffer`] has sequential coupling: there's a certain order in +which you are expected to call the methods. For example, you must write the +symbols before the columns, and you must terminate each row by calling either +[`at`](Buffer::at) or [`at_now`](Buffer::at_now). Refer to the [`Buffer`] doc +for the full rules and a flowchart. + +## Optimization: Avoid Revalidating Names + +The client validates every name you provide. To avoid the redundant CPU work of +re-validating the same names on every row, create pre-validated [`ColumnName`] +and [`TableName`] values: + +```no_run +# use questdb::Result; +use questdb::ingress::{ + TableName, + ColumnName, + Buffer, + TimestampNanos}; +# fn main() -> Result<()> { +let mut buffer = Buffer::new(); +let tide_name = TableName::new("tide")?; +let water_level_name = ColumnName::new("water_level")?; +buffer.table(tide_name)?.column_f64(water_level_name, 20.4)?.at(TimestampNanos::now())?; +buffer.table(tide_name)?.column_f64(water_level_name, 17.2)?.at(TimestampNanos::now())?; +# Ok(()) +# } +``` + +## Check out the CONSIDERATIONS Document + +The [Library +considerations](https://github.com/questdb/c-questdb-client/blob/main/doc/CONSIDERATIONS.md) +document covers these topics: + +* Threading +* Differences between the InfluxDB Line Protocol and QuestDB Data Types +* Data Quality +* Client-side checks and server errors +* Flushing +* Disconnections, data errors and troubleshooting + +# Troubleshooting Common Issues + +## Infrequent Flushing + +If the data doesn't appear in the database in a timely manner, you may not be +calling [`flush()`](Sender::flush) often enough. + +## Debug disconnects and inspect errors + +If you're using ILP-over-TCP, it doesn't report any errors to the client. +Instead, on error, the server terminates the connection, and logs any error +messages in [server logs](https://questdb.io/docs/troubleshooting/log/). + +To inspect or log a buffer's contents before you send it, call +[`buffer.as_str()`](Buffer::as_str). diff --git a/questdb-rs/src/ingress/mod.rs b/questdb-rs/src/ingress/mod.rs index 1b40e4ca..0239eac5 100644 --- a/questdb-rs/src/ingress/mod.rs +++ b/questdb-rs/src/ingress/mod.rs @@ -22,163 +22,7 @@ * ******************************************************************************/ -//! # Fast Ingestion of data into QuestDB -//! -//! The `ingress` module implements QuestDB's variant of the -//! [InfluxDB Line Protocol](https://questdb.io/docs/reference/api/ilp/overview/) -//! (ILP) over TCP. -//! -//! To get started: -//! * Connect to QuestDB by creating a [`Sender`] object, usually via [`Sender::from_conf`]. -//! * Populate a [`Buffer`] with one or more rows of data. -//! * Send the buffer via the Sender's [`flush`](Sender::flush) method. -//! -//! ```rust no_run -//! use questdb::{ -//! Result, -//! ingress::{ -//! Sender, -//! Buffer, -//! TimestampNanos}}; -//! -//! fn main() -> Result<()> { -//! let mut sender = Sender::from_conf("http::addr=localhost:9000;")?; -//! let mut buffer = Buffer::new(); -//! buffer -//! .table("sensors")? -//! .symbol("id", "toronto1")? -//! .column_f64("temperature", 20.0)? -//! .column_i64("humidity", 50)? -//! .at(TimestampNanos::now())?; -//! sender.flush(&mut buffer)?; -//! Ok(()) -//! } -//! ``` -//! -//! # Flushing -//! -//! The Sender's [`flush`](Sender::flush) method will clear the buffer -//! which is then reusable for another batch of rows. -//! -//! Dropping the sender will close the connection to QuestDB and any unflushed -//! messages will be lost: In other words, *do not forget to -//! [`flush`](Sender::flush) before closing the connection!* -//! -//! A common technique is to flush periodically on a timer and/or once the -//! buffer exceeds a certain size. -//! You can check the buffer's size by the calling Buffer's [`len`](Buffer::len) -//! method. -//! -//! Note that flushing will automatically clear the buffer's contents. -//! If you'd rather preserve the contents (for example, to send the same data to -//! multiple QuestDB instances), you can call -//! [`flush_and_keep`](Sender::flush_and_keep) instead. -//! -//! # Connection Security Options -//! -//! To establish an [authenticated](https://questdb.io/docs/reference/api/ilp/authenticate) -//! and TLS-encrypted connection, call the SenderBuilder's authentication and tls methods. -//! -//! Here's an example that uses full security with TCP: -//! -//! ```no_run -//! # use questdb::Result; -//! use questdb::ingress::{Protocol, SenderBuilder}; -//! -//! # fn main() -> Result<()> { -//! // See: https://questdb.io/docs/reference/api/ilp/authenticate -//! let mut sender = SenderBuilder::new(Protocol::Tcps, "localhost", 9009) -//! .username("testUser1")? // kid -//! .token("5UjEMuA0Pj5pjK8a-fa24dyIf-Es5mYny3oE_Wmus48")? // d -//! .token_x("fLKYEaoEb9lrn3nkwLDA-M_xnuFOdSt9y0Z7_vWSHLU")? // x -//! .token_y("Dt5tbS1dEDMSYfym3fgMv0B99szno-dFc1rYF9t0aac")? // y -//! .build()?; -//! # Ok(()) -//! # } -//! ``` -//! -//! Note that Open Source QuestDB does not natively support TLS -//! encryption (this is a QuestDB enterprise feature). -//! -//! To use TLS with QuestDB open source, use a TLS proxy such as -//! [HAProxy](http://www.haproxy.org/). -//! -//! For testing, you can use a self-signed certificate and key. -//! -//! See our notes on [how to generate keys that this library will -//! accept](https://github.com/questdb/c-questdb-client/tree/main/tls_certs). -//! -//! From the API, you can then point to a custom CA file: -//! -//! ```no_run -//! # use questdb::Result; -//! use std::path::PathBuf; -//! use questdb::ingress::{SenderBuilder, Protocol, CertificateAuthority}; -//! -//! # fn main() -> Result<()> { -//! let mut sender = SenderBuilder::new(Protocol::Tcps, "localhost", 9009) -//! .tls_roots("/path/to/server_rootCA.pem")? -//! .build()?; -//! # Ok(()) -//! # } -//! ``` -//! -//! # Avoiding revalidating names -//! To avoid re-validating table and column names, consider re-using them across -//! rows. -//! -//! ``` -//! # use questdb::Result; -//! use questdb::ingress::{ -//! TableName, -//! ColumnName, -//! Buffer, -//! TimestampNanos}; -//! -//! # fn main() -> Result<()> { -//! let mut buffer = Buffer::new(); -//! let tide_name = TableName::new("tide")?; -//! let water_level_name = ColumnName::new("water_level")?; -//! buffer.table(tide_name)?.column_f64(water_level_name, 20.4)?.at(TimestampNanos::now())?; -//! buffer.table(tide_name)?.column_f64(water_level_name, 17.2)?.at(TimestampNanos::now())?; -//! # Ok(()) -//! # } -//! ``` -//! -//! # Buffer API sequential coupling -//! -//! Symbols must always be written before rows. See the [`Buffer`] documentation -//! for details. Each row must be terminated with a call to either -//! [`at`](Buffer::at) or [`at_now`](Buffer::at_now). -//! -//! # Considerations -//! -//! The [Library considerations](https://github.com/questdb/c-questdb-client/blob/main/doc/CONSIDERATIONS.md) documentation -//! goes through: -//! * Threading. -//! * Differences between ILP vs QuestDB Data Types. -//! * Data Quality -//! * Client-side checks and server errors -//! * Flushing -//! * Disconnections, data errors and troubleshooting -//! -//! # Troubleshooting Common Issues -//! -//! ## Infrequent Flushing -//! -//! You may not see data appear in a timely manner because you’re not calling -//! the [`flush`](Sender::flush) method often enough. -//! -//! ## Debugging disconnects and inspecting errors -//! -//! The ILP protocol does not send errors back to the client. -//! Instead, on error, the QuestDB server will disconnect and any error messages -//! will be present in the -//! [server logs](https://questdb.io/docs/troubleshooting/log/). -//! -//! If you want to inspect or log a buffer's contents before it is sent, you -//! can call its [`as_str`](Buffer::as_str) method. -//! +#![doc = include_str!("mod.md")] pub use self::timestamp::*; @@ -231,8 +75,8 @@ fn map_io_to_socket_err(prefix: &str, io_err: io::Error) -> Error { /// /// This type simply wraps a `&str`. /// -/// You can use it to construct it explicitly to avoid re-validating the same -/// names over and over. +/// When you pass a `TableName` instead of a plain string to a [`Buffer`] method, +/// it doesn't have to validate it again. This saves CPU cycles. #[derive(Clone, Copy)] pub struct TableName<'a> { name: &'a str, @@ -301,12 +145,12 @@ impl<'a> TableName<'a> { Ok(Self { name }) } - /// Construct an unvalidated table name. + /// Construct a table name without validating it. /// /// This breaks API encapsulation and is only intended for use /// when the the string was already previously validated. /// - /// Invalid table names will be rejected by the QuestDB server. + /// The QuestDB server will reject an invalid table name. pub fn new_unchecked(name: &'a str) -> Self { Self { name } } @@ -316,8 +160,8 @@ impl<'a> TableName<'a> { /// /// This type simply wraps a `&str`. /// -/// You can use it to construct it explicitly to avoid re-validating the same -/// names over and over. +/// When you pass a `ColumnName` instead of a plain string to a [`Buffer`] method, +/// it doesn't have to validate it again. This saves CPU cycles. #[derive(Clone, Copy)] pub struct ColumnName<'a> { name: &'a str, @@ -374,12 +218,12 @@ impl<'a> ColumnName<'a> { Ok(Self { name }) } - /// Construct an unvalidated column name. + /// Construct a column name without validating it. /// /// This breaks API encapsulation and is only intended for use /// when the the string was already previously validated. /// - /// Invalid column names will be rejected by the QuestDB server. + /// The QuestDB server will reject an invalid column name. pub fn new_unchecked(name: &'a str) -> Self { Self { name } } @@ -612,7 +456,7 @@ impl BufferState { } } -/// A reusable buffer to prepare ILP messages. +/// A reusable buffer to prepare a batch of ILP messages. /// /// # Example /// @@ -643,8 +487,7 @@ impl BufferState { /// # } /// ``` /// -/// The buffer can then be sent with the Sender's [`flush`](Sender::flush) -/// method. +/// Send the buffer to QuestDB using [`sender.flush(&mut buffer)`](Sender::flush). /// /// # Sequential Coupling /// The Buffer API is sequentially coupled: @@ -656,11 +499,11 @@ impl BufferState { /// [`column_f64`](Buffer::column_f64), /// [`column_str`](Buffer::column_str), /// [`column_ts`](Buffer::column_ts)). -/// * Symbols must always appear before columns. +/// * Symbols must appear before columns. /// * A row must be terminated with either [`at`](Buffer::at) or /// [`at_now`](Buffer::at_now). /// -/// This diagram might help: +/// This diagram visualizes the sequence: /// /// /// @@ -675,12 +518,13 @@ impl BufferState { /// | [`column_str`](Buffer::column_str) | [`STRING`](https://questdb.io/docs/reference/api/ilp/columnset-types#string) | /// | [`column_ts`](Buffer::column_ts) | [`TIMESTAMP`](https://questdb.io/docs/reference/api/ilp/columnset-types#timestamp) | /// -/// QuestDB supports both `STRING` columns and `SYMBOL` column types. +/// QuestDB supports both `STRING` and `SYMBOL` column types. /// -/// To understand the difference refer to the -/// [QuestDB documentation](https://questdb.io/docs/concept/symbol/), but in -/// short symbols are interned strings that are most suitable for identifiers -/// that you expect to be repeated throughout the column. +/// To understand the difference, refer to the +/// [QuestDB documentation](https://questdb.io/docs/concept/symbol/). In a nutshell, +/// symbols are interned strings, most suitable for identifiers that are repeated many +/// times throughout the column. They offer an advantage in storage space and query +/// performance. /// /// # Inserting NULL values /// @@ -688,13 +532,13 @@ impl BufferState { /// /// # Recovering from validation errors /// -/// If you want to recover from potential validation errors, you can use the -/// [`set_marker`](Buffer::set_marker) method to track a last known good state, -/// append as many rows or parts of rows as you like and then call -/// [`clear_marker`](Buffer::clear_marker) on success. +/// If you want to recover from potential validation errors, call +/// [`buffer.set_marker()`](Buffer::set_marker) to track the last known good state, +/// append as many rows or parts of rows as you like, and then call +/// [`buffer.clear_marker()`](Buffer::clear_marker) on success. /// -/// If there was an error in one of the table names or other, you can use the -/// [`rewind_to_marker`](Buffer::rewind_to_marker) method to go back to the +/// If there was an error in one of the rows, use +/// [`buffer.rewind_to_marker()`](Buffer::rewind_to_marker) to go back to the /// marked last known good state. /// #[derive(Debug, Clone)] @@ -706,8 +550,8 @@ pub struct Buffer { } impl Buffer { - /// Construct an instance with a `max_name_len` of `127`, - /// which is the same as the QuestDB default. + /// Construct a `Buffer` with a `max_name_len` of `127`, which is the same as the + /// QuestDB server default. pub fn new() -> Self { Self { output: String::new(), @@ -717,17 +561,17 @@ impl Buffer { } } - /// Construct with a custom maximum length for table and column names. + /// Construct a `Buffer` with a custom maximum length for table and column names. /// /// This should match the `cairo.max.file.name.length` setting of the /// QuestDB instance you're connecting to. /// - /// If the server does not configure it the default is `127` and you might - /// as well call [`new`](Buffer::new). + /// If the server does not configure it, the default is `127` and you can simply + /// call [`new`](Buffer::new). pub fn with_max_name_len(max_name_len: usize) -> Self { - let mut buffer = Self::new(); - buffer.max_name_len = max_name_len; - buffer + let mut buf = Self::new(); + buf.max_name_len = max_name_len; + buf } /// Pre-allocate to ensure the buffer has enough capacity for at least the @@ -738,7 +582,7 @@ impl Buffer { self.output.reserve(additional); } - /// Number of bytes accumulated in the buffer. + /// The number of bytes accumulated in the buffer. pub fn len(&self) -> usize { self.output.len() } @@ -748,8 +592,9 @@ impl Buffer { self.state.row_count } - /// The buffer is transactional if sent over HTTP. - /// A buffer stops being transactional if it contains rows for multiple tables. + /// Tells whether the buffer is transactional. It is transactional iff it contains + /// data for at most one table. Additionally, you must send the buffer over HTTP to + /// get transactional behavior. pub fn transactional(&self) -> bool { self.state.transactional } @@ -758,13 +603,12 @@ impl Buffer { self.output.is_empty() } - /// Number of bytes that can be written to the buffer before it needs to - /// resize. + /// The total number of bytes the buffer can hold before it needs to resize. pub fn capacity(&self) -> usize { self.output.capacity() } - /// Inspect the contents of the buffer. + /// A string representation of the buffer's contents. Useful for debugging. pub fn as_str(&self) -> &str { &self.output } @@ -851,7 +695,7 @@ impl Buffer { Ok(()) } - /// Begin recording a row for a given table. + /// Begin recording a new row for the given table. /// /// ``` /// # use questdb::Result; @@ -899,7 +743,8 @@ impl Buffer { Ok(self) } - /// Record a symbol for a given column. + /// Record a symbol for the given column. + /// Make sure you record all symbol columns before any other column type. /// /// ``` /// # use questdb::Result; @@ -979,7 +824,7 @@ impl Buffer { Ok(self) } - /// Record a boolean value for a column. + /// Record a boolean value for the given column. /// /// ``` /// # use questdb::Result; @@ -1017,7 +862,7 @@ impl Buffer { Ok(self) } - /// Record an integer value for a column. + /// Record an integer value for the given column. /// /// ``` /// # use questdb::Result; @@ -1058,7 +903,7 @@ impl Buffer { Ok(self) } - /// Record a floating point value for a column. + /// Record a floating point value for the given column. /// /// ``` /// # use questdb::Result; @@ -1097,7 +942,7 @@ impl Buffer { Ok(self) } - /// Record a string value for a column. + /// Record a string value for the given column. /// /// ``` /// # use questdb::Result; @@ -1150,7 +995,7 @@ impl Buffer { Ok(self) } - /// Record a timestamp for a column. + /// Record a timestamp value for the given column. /// /// ``` /// # use questdb::Result; @@ -1219,7 +1064,9 @@ impl Buffer { Ok(self) } - /// Terminate the row with a specified timestamp. + /// Complete the current row with the designated timestamp. After this call, you can + /// start recording the next row by calling [Buffer::table] again, or you can send + /// the accumulated batch by calling [Sender::flush] or one of its variants. /// /// ``` /// # use questdb::Result; @@ -1283,16 +1130,23 @@ impl Buffer { Ok(()) } - /// Terminate the row with a server-specified timestamp. + /// Complete the current row without providing a timestamp. The QuestDB instance + /// will insert its own timestamp. /// - /// This is NOT equivalent to calling [`at`](Buffer::at) with the current time. - /// There's a trade-off: Letting the server assign the timestamp can be faster - /// since it a reliable way to avoid out-of-order operations in the database - /// for maximum ingestion throughput. + /// Letting the server assign the timestamp can be faster since it reliably avoids + /// out-of-order operations in the database for maximum ingestion throughput. However, + /// it removes the ability to deduplicate rows. /// - /// On the other hand, it removes the ability to deduplicate rows. + /// This is NOT equivalent to calling [Buffer::at] with the current time: the QuestDB + /// server will set the timestamp only after receiving the row. If you're flushing + /// infrequently, the server-assigned timestamp may be significantly behind the + /// time the data was recorded in the buffer. /// - /// In almost all cases, you should prefer [`at`](Buffer::at) over this method. + /// In almost all cases, you should prefer the [Buffer::at] function. + /// + /// After this call, you can start recording the next row by calling [Buffer::table] + /// again, or you can send the accumulated batch by calling [Sender::flush] or one of + /// its variants. /// /// ``` /// # use questdb::Result; @@ -1304,11 +1158,6 @@ impl Buffer { /// # Ok(()) /// # } /// ``` - /// - /// The QuestDB instance will set the timestamp once it receives the row. - /// If you're [`flushing`](Sender::flush) infrequently, the timestamp - /// assigned by the server may drift significantly from when the data - /// was recorded in the buffer. pub fn at_now(&mut self) -> Result<()> { self.check_op(Op::At)?; self.output.push('\n'); @@ -1361,7 +1210,8 @@ enum AuthParams { Token(TokenAuthParams), } -/// Possible sources of the root certificates used to validate the server's TLS certificate. +/// Possible sources of the root certificates used to validate the server's TLS +/// certificate. #[derive(PartialEq, Debug, Clone, Copy)] pub enum CertificateAuthority { /// Use the root certificates provided by the @@ -1373,11 +1223,11 @@ pub enum CertificateAuthority { #[cfg(feature = "tls-native-certs")] OsRoots, - /// Use the root certificates provided by both the OS and the `webpki-roots` crate. + /// Combine the root certificates provided by the OS and the `webpki-roots` crate. #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))] WebpkiAndOsRoots, - /// Use the root certificates provided by a PEM-encoded file. + /// Use the root certificates provided in a PEM-encoded file. PemFile, } @@ -1704,12 +1554,10 @@ impl Protocol { } } -/// Accumulate parameters for a new `Sender` instance. -/// -/// The `SenderBuilder` can be created either for ILP/TCP or ILP/HTTP (with the "ilp-over-http" -/// feature enabled). +/// Accumulates parameters for a new `Sender` instance. /// -/// It can also be created from a config string or the `QDB_CLIENT_CONF` environment variable. +/// You can also create the builder from a config string or the `QDB_CLIENT_CONF` +/// environment variable. /// #[cfg_attr( feature = "ilp-over-http", @@ -1781,25 +1629,32 @@ pub struct SenderBuilder { } impl SenderBuilder { - /// Create a new `SenderBuilder` instance from configuration string. + /// Create a new `SenderBuilder` instance from the configuration string. /// /// The format of the string is: `"http::addr=host:port;key=value;...;"`. /// - /// Alongside `"http"` you can also specify `"https"`, `"tcp"`, and `"tcps"`. + /// Instead of `"http"`, you can also specify `"https"`, `"tcp"`, and `"tcps"`. /// - /// HTTP is recommended in most cases as is provides better error feedback - /// allows controlling transactions. TCP can sometimes be faster in higher-latency - /// networks, but misses out on a number of features. + /// We recommend HTTP for most cases because it provides more features, like + /// reporting errors to the client and supporting transaction control. TCP can + /// sometimes be faster in higher-latency networks, but misses a number of + /// features. /// - /// The accepted set of keys and values is the same as for the `SenderBuilder`'s API. + /// The accepted keys match one-for-one with the methods on `SenderBuilder`. + /// For example, this is a valid configuration string: /// - /// E.g. `"https::addr=host:port;username=alice;password=secret;tls_ca=os_roots;"`. + /// "https::addr=host:port;username=alice;password=secret;" /// - /// If you prefer, you can also load the configuration from an environment variable. - /// See [`SenderBuilder::from_env`]. + /// and there are matching methods [SenderBuilder::username] and + /// [SenderBuilder::password]. The value of `addr=` is supplied directly to the + /// `SenderBuilder` constructor, so there's no matching method for that. /// - /// Once a `SenderBuilder` is created from a string (or from the environment variable) - /// it can be further customized before calling [`SenderBuilder::build`]. + /// You can also load the configuration from an environment variable. See + /// [`SenderBuilder::from_env`]. + /// + /// Once you have a `SenderBuilder` instance, you can further customize it + /// before calling [`SenderBuilder::build`], but you can't change any settings + /// that are already set in the config string. pub fn from_conf>(conf: T) -> Result { let conf = conf.as_ref(); let conf = questdb_confstr::parse_conf_str(conf) @@ -1941,8 +1796,8 @@ impl SenderBuilder { Ok(builder) } - /// Create a new `SenderBuilder` instance from configuration string read from the - /// `QDB_CLIENT_CONF` environment variable. + /// Create a new `SenderBuilder` instance from the configuration from the + /// configuration stored in the `QDB_CLIENT_CONF` environment variable. /// /// The format of the string is the same as for [`SenderBuilder::from_conf`]. pub fn from_env() -> Result { @@ -1952,8 +1807,8 @@ impl SenderBuilder { Self::from_conf(conf) } - /// Create a new `SenderBuilder` instance from the provided QuestDB - /// server and port using ILP over the specified protocol. + /// Create a new `SenderBuilder` instance with the provided QuestDB + /// server and port, using ILP over the specified protocol. /// /// ```no_run /// # use questdb::Result; @@ -2021,12 +1876,12 @@ impl SenderBuilder { /// Set the username for authentication. /// - /// For TCP this is the `kid` part of the ECDSA key set. + /// For TCP, this is the `kid` part of the ECDSA key set. /// The other fields are [`token`](SenderBuilder::token), [`token_x`](SenderBuilder::token_x), /// and [`token_y`](SenderBuilder::token_y). /// - /// For HTTP this is part of basic authentication. - /// Also see [`password`](SenderBuilder::password). + /// For HTTP, this is a part of basic authentication. + /// See also: [`password`](SenderBuilder::password). pub fn username(mut self, username: &str) -> Result { self.username .set_specified("username", Some(validate_value(username.to_string())?))?; @@ -2034,29 +1889,29 @@ impl SenderBuilder { } /// Set the password for basic HTTP authentication. - /// Also see [`username`](SenderBuilder::username). + /// See also: [`username`](SenderBuilder::username). pub fn password(mut self, password: &str) -> Result { self.password .set_specified("password", Some(validate_value(password.to_string())?))?; Ok(self) } - /// Token (Bearer) Authentication Parameters for ILP over HTTP, - /// or the ECDSA private key for ILP over TCP authentication. + /// Set the Token (Bearer) Authentication parameter for HTTP, + /// or the ECDSA private key for TCP authentication. pub fn token(mut self, token: &str) -> Result { self.token .set_specified("token", Some(validate_value(token.to_string())?))?; Ok(self) } - /// The ECDSA public key X for ILP over TCP authentication. + /// Set the ECDSA public key X for TCP authentication. pub fn token_x(mut self, token_x: &str) -> Result { self.token_x .set_specified("token_x", Some(validate_value(token_x.to_string())?))?; Ok(self) } - /// The ECDSA public key Y for ILP over TCP authentication. + /// Set the ECDSA public key Y for TCP authentication. pub fn token_y(mut self, token_y: &str) -> Result { self.token_y .set_specified("token_y", Some(validate_value(token_y.to_string())?))?; @@ -2086,7 +1941,7 @@ impl SenderBuilder { /// Set to `false` to disable TLS certificate verification. /// This should only be used for debugging purposes as it reduces security. /// - /// For testing consider specifying a path to a `.pem` file instead via + /// For testing, consider specifying a path to a `.pem` file instead via /// the [`tls_roots`](SenderBuilder::tls_roots) method. #[cfg(feature = "insecure-skip-verify")] pub fn tls_verify(mut self, verify: bool) -> Result { @@ -2106,7 +1961,8 @@ impl SenderBuilder { /// Set the path to a custom root certificate `.pem` file. /// This is used to validate the server's certificate during the TLS handshake. /// - /// See notes on how to test with [self-signed certificates](https://github.com/questdb/c-questdb-client/tree/main/tls_certs). + /// See notes on how to test with [self-signed + /// certificates](https://github.com/questdb/c-questdb-client/tree/main/tls_certs). pub fn tls_roots>(self, path: P) -> Result { let mut builder = self.tls_ca(CertificateAuthority::PemFile)?; let path = path.into(); @@ -2123,7 +1979,7 @@ impl SenderBuilder { Ok(builder) } - /// The maximum buffer size that the client will flush to the server. + /// The maximum buffer size in bytes that the client will flush to the server. /// The default is 100 MiB. pub fn max_buf_size(mut self, value: usize) -> Result { let min = 1024; @@ -2138,8 +1994,8 @@ impl SenderBuilder { } #[cfg(feature = "ilp-over-http")] - /// Cumulative duration spent in retries. - /// The default is 10 seconds. + /// Set the cumulative duration spent in retries. + /// The value is in milliseconds, and the default is 10 seconds. pub fn retry_timeout(mut self, value: Duration) -> Result { if let Some(http) = &mut self.http { http.retry_timeout.set_specified("retry_timeout", value)?; @@ -2153,12 +2009,13 @@ impl SenderBuilder { } #[cfg(feature = "ilp-over-http")] - /// Minimum expected throughput in bytes per second for HTTP requests. - /// If the throughput is lower than this value, the connection will time out. - /// The default is 100 KiB/s. - /// The value is expressed as a number of bytes per second. - /// This is used to calculate additional request timeout, on top of - /// the [`request_timeout`](SenderBuilder::request_timeout). + /// Set the minimum acceptable throughput while sending a buffer to the server. + /// The sender will divide the payload size by this number to determine for how + /// long to keep sending the payload before timing out. + /// The value is in bytes per second, and the default is 100 KiB/s. + /// The timeout calculated from minimum throughput is adedd to the value of + /// [`request_timeout`](SenderBuilder::request_timeout) to get the total timeout + /// value. pub fn request_min_throughput(mut self, value: u64) -> Result { if let Some(http) = &mut self.http { http.request_min_throughput @@ -2173,9 +2030,10 @@ impl SenderBuilder { } #[cfg(feature = "ilp-over-http")] - /// Grace request timeout before relying on the minimum throughput logic. - /// The default is 10 seconds. - /// See [`request_min_throughput`](SenderBuilder::request_min_throughput) for more details. + /// Additional time to wait on top of that calculated from the minimum throughput. + /// This accounts for the fixed latency of the HTTP request-response roundtrip. + /// The value is in milliseconds, and the default is 10 seconds. + /// See also: [`request_min_throughput`](SenderBuilder::request_min_throughput). pub fn request_timeout(mut self, value: Duration) -> Result { if let Some(http) = &mut self.http { http.request_timeout @@ -2390,7 +2248,7 @@ impl SenderBuilder { /// Build the sender. /// - /// In case of TCP, this synchronously establishes the TCP connection, and + /// In the case of TCP, this synchronously establishes the TCP connection, and /// returns once the connection is fully established. If the connection /// requires authentication or TLS, these will also be completed before /// returning. @@ -2623,31 +2481,40 @@ impl F64Serializer { } impl Sender { - /// Create a new `Sender` instance from configuration string. + /// Create a new `Sender` instance from the given configuration string. /// /// The format of the string is: `"http::addr=host:port;key=value;...;"`. /// - /// Alongside `"http"` you can also specify `"https"`, `"tcp"`, and `"tcps"`. - /// - /// HTTP is recommended in most cases as is provides better error feedback - /// allows controlling transactions. TCP can sometimes be faster in higher-latency - /// networks, but misses out on a number of features. + /// Instead of `"http"`, you can also specify `"https"`, `"tcp"`, and `"tcps"`. /// - /// The accepted set of keys and values is the same as for the opt's API. + /// We recommend HTTP for most cases because it provides more features, like + /// reporting errors to the client and supporting transaction control. TCP can + /// sometimes be faster in higher-latency networks, but misses a number of + /// features. /// - /// E.g. `"https::addr=host:port;username=alice;password=secret;tls_ca=os_roots;"`. + /// Keys in the config string correspond to same-named methods on `SenderBuilder`. /// - /// For full list of keys and values, see the [`SenderBuilder`] documentation: - /// The builder API and the configuration string API are equivalent. + /// For the full list of keys and values, see the docs on [`SenderBuilder`]. /// - /// If you prefer, you can also load the configuration from an environment variable. + /// You can also load the configuration from an environment variable. /// See [`Sender::from_env`]. + /// + /// In the case of TCP, this synchronously establishes the TCP connection, and + /// returns once the connection is fully established. If the connection + /// requires authentication or TLS, these will also be completed before + /// returning. pub fn from_conf>(conf: T) -> Result { SenderBuilder::from_conf(conf)?.build() } - /// Create a new `Sender` from the `QDB_CLIENT_CONF` environment variable. - /// The format is the same as that taken by [`Sender::from_conf`]. + /// Create a new `Sender` from the configuration stored in the `QDB_CLIENT_CONF` + /// environment variable. The format is the same as that accepted by + /// [`Sender::from_conf`]. + /// + /// In the case of TCP, this synchronously establishes the TCP connection, and + /// returns once the connection is fully established. If the connection + /// requires authentication or TLS, these will also be completed before + /// returning. pub fn from_env() -> Result { SenderBuilder::from_env()?.build() } @@ -2736,56 +2603,69 @@ impl Sender { Ok(()) } - /// Variant of `.flush()` that does not clear the buffer and allows for - /// transactional flushes. + /// Send the batch of rows in the buffer to the QuestDB server, and, if the + /// `transactional` parameter is true, ensure the flush will be transactional. /// - /// A transactional flush is simply a flush that ensures that all rows in - /// the ILP buffer refer to the same table, thus allowing the server to - /// treat the flush request as a single transaction. + /// A flush is transactional iff all the rows belong to the same table. This allows + /// QuestDB to treat the flush as a single database transaction, because it doesn't + /// support transactions spanning multiple tables. Additionally, only ILP-over-HTTP + /// supports transactional flushes. /// - /// This is because QuestDB does not support transactions spanning multiple - /// tables. + /// If the flush wouldn't be transactional, this function returns an error and + /// doesn't flush any data. /// - /// Note that transactional flushes are only supported for ILP over HTTP. + /// The function sends an HTTP request and waits for the response. If the server + /// responds with an error, it returns a descriptive error. In the case of a network + /// error, it retries until it has exhausted the retry time budget. + /// + /// All the data stays in the buffer. Clear the buffer before starting a new batch. #[cfg(feature = "ilp-over-http")] pub fn flush_and_keep_with_flags(&mut self, buf: &Buffer, transactional: bool) -> Result<()> { self.flush_impl(buf, transactional) } - /// Variant of `.flush()` that does not clear the buffer. + /// Send the given buffer of rows to the QuestDB server. + /// + /// All the data stays in the buffer. Clear the buffer before starting a new batch. + /// + /// To send and clear in one step, call [Sender::flush] instead. pub fn flush_and_keep(&mut self, buf: &Buffer) -> Result<()> { self.flush_impl(buf, false) } - /// Send buffer to the QuestDB server, clearing the buffer. + /// Send the given buffer of rows to the QuestDB server, clearing the buffer. + /// + /// After this function returns, the buffer is empty and ready for the next batch. + /// If you want to preserve the buffer contents, call [Sender::flush_and_keep]. If + /// you want to ensure the flush is transactional, call + /// [Sender::flush_and_keep_with_flags]. /// - /// If sending over HTTP, flushing will send an HTTP request and wait - /// for the response. If the server responds with an error, this function - /// will return a descriptive error. In case of network errors, - /// this function will retry. + /// With ILP-over-HTTP, this function sends an HTTP request and waits for the + /// response. If the server responds with an error, it returns a descriptive error. + /// In the case of a network error, it retries until it has exhausted the retry time + /// budget. /// - /// If sending over TCP, this will block until the buffer is flushed to the - /// network socket. Note that this does not guarantee that the buffer will - /// be sent to the server or that the server has received it. - /// In case of errors the server will disconnect: consult the server logs. + /// With ILP-over-TCP, the function blocks only until the buffer is flushed to the + /// underlying OS-level network socket, without waiting to actually send it to the + /// server. In the case of an error, the server will quietly disconnect: consult the + /// server logs for error messages. /// - /// Prefer HTTP in most cases, but use TCP if you need to continuously + /// HTTP should be the first choice, but use TCP if you need to continuously send /// data to the server at a high rate. /// - /// To improve HTTP performance, send larger buffers (with more rows), - /// and consider parallelizing writes using multiple senders from multiple - /// threads. + /// To improve the HTTP performance, send larger buffers (with more rows), and + /// consider parallelizing writes using multiple senders from multiple threads. pub fn flush(&mut self, buf: &mut Buffer) -> Result<()> { self.flush_impl(buf, false)?; buf.clear(); Ok(()) } - /// The sender is no longer usable and must be dropped. + /// Tell whether the sender is no longer usable and must be dropped. /// - /// This is caused if there was an earlier failure. + /// This happens when there was an earlier failure. /// - /// This method is specific to ILP/TCP and is not relevant for ILP/HTTP. + /// This method is specific to ILP-over-TCP and is not relevant for ILP-over-HTTP. pub fn must_close(&self) -> bool { !self.connected }