From cfbd06bd5937dc894c0190b4112a745506b17e5d Mon Sep 17 00:00:00 2001 From: Zifeng Yu Date: Wed, 31 Jul 2024 22:45:01 +0800 Subject: [PATCH] add Timeplus APIs --- .clang-format | 3 +- .github/workflows/timeplus_cpp_ci.yml | 11 +- CMakeLists.txt | 12 +- README.md | 2 +- examples/CMakeLists.txt | 3 + examples/insert_examples.cpp | 58 ++++++ tests/insert-async/CMakeLists.txt | 11 + tests/insert-async/main.cpp | 122 +++++++++++ tests/insert/CMakeLists.txt | 11 + tests/insert/main.cpp | 85 ++++++++ timeplus/CMakeLists.txt | 11 + timeplus/block.h | 2 + timeplus/blocking_queue.h | 244 ++++++++++++++++++++++ timeplus/client.cpp | 46 ++-- timeplus/client.h | 5 +- timeplus/client_pool.cpp | 44 ++++ timeplus/client_pool.h | 81 ++++++++ timeplus/columns/numeric.h | 1 + timeplus/exceptions.h | 8 + timeplus/macros.h | 12 ++ timeplus/query.cpp | 6 +- timeplus/query.h | 3 +- timeplus/timeplus.cpp | 198 ++++++++++++++++++ timeplus/timeplus.h | 66 ++++++ timeplus/timeplus_config.h | 27 +++ ut/CMakeLists.txt | 2 + ut/client_pool_ut.cpp | 46 ++++ ut/roundtrip_tests.cpp | 1 + ut/socket_ut.cpp | 4 + ut/timeplus_ut.cpp | 289 ++++++++++++++++++++++++++ 30 files changed, 1383 insertions(+), 31 deletions(-) create mode 100644 examples/insert_examples.cpp create mode 100644 tests/insert-async/CMakeLists.txt create mode 100644 tests/insert-async/main.cpp create mode 100644 tests/insert/CMakeLists.txt create mode 100644 tests/insert/main.cpp create mode 100644 timeplus/blocking_queue.h create mode 100644 timeplus/client_pool.cpp create mode 100644 timeplus/client_pool.h create mode 100644 timeplus/macros.h create mode 100644 timeplus/timeplus.cpp create mode 100644 timeplus/timeplus.h create mode 100644 timeplus/timeplus_config.h create mode 100644 ut/client_pool_ut.cpp create mode 100644 ut/timeplus_ut.cpp diff --git a/.clang-format b/.clang-format index 3f5f374..1d9337d 100644 --- a/.clang-format +++ b/.clang-format @@ -2,10 +2,11 @@ Language: Cpp BasedOnStyle: Google AccessModifierOffset: -4 -AlignConsecutiveAssignments: true +AlignConsecutiveAssignments: false AllowShortFunctionsOnASingleLine: InlineOnly ColumnLimit: 140 DerivePointerAlignment: false FixNamespaceComments: true +IncludeBlocks: Preserve IndentWidth: 4 PointerAlignment: Left diff --git a/.github/workflows/timeplus_cpp_ci.yml b/.github/workflows/timeplus_cpp_ci.yml index 8666944..0313650 100644 --- a/.github/workflows/timeplus_cpp_ci.yml +++ b/.github/workflows/timeplus_cpp_ci.yml @@ -21,10 +21,10 @@ jobs: fail-fast: false matrix: os: [ubuntu-20.04] - compiler: [clang-10, clang-18, gcc-7, gcc-8, gcc-9] + compiler: [clang-10, clang-18, gcc-8, gcc-9] ssl: [ssl_ON, ssl_OFF] dependencies: [dependencies_BUILT_IN] - timeplusd: [2.3.22] + timeplusd: [2.3.23] include: - compiler: clang-10 @@ -37,11 +37,6 @@ jobs: C_COMPILER: clang-18 CXX_COMPILER: clang++-18 - - compiler: gcc-7 - COMPILER_INSTALL: gcc-7 g++-7 - C_COMPILER: gcc-7 - CXX_COMPILER: g++-7 - - compiler: gcc-8 COMPILER_INSTALL: gcc-8 g++-8 C_COMPILER: gcc-8 @@ -58,7 +53,7 @@ jobs: - dependencies: dependencies_SYSTEM compiler: compiler_SYSTEM os: ubuntu-22.04 - timeplusd: 2.3.22 + timeplusd: 2.3.23 COMPILER_INSTALL: gcc g++ C_COMPILER: gcc CXX_COMPILER: g++ diff --git a/CMakeLists.txt b/CMakeLists.txt index dc44ac6..bc4ff5d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,16 +16,24 @@ OPTION (WITH_SYSTEM_ABSEIL "Use system ABSEIL" OFF) OPTION (WITH_SYSTEM_LZ4 "Use system LZ4" OFF) OPTION (WITH_SYSTEM_CITYHASH "Use system cityhash" OFF) OPTION (WITH_SYSTEM_ZSTD "Use system ZSTD" OFF) -OPTION (DEBUG_DEPENDENCIES "Print debug info about dependencies duting build" ON) +OPTION (DEBUG_DEPENDENCIES "Print debug info about dependencies during build" ON) OPTION (CHECK_VERSION "Check that version number corresponds to git tag, usefull in CI/CD to validate that new version published on GitHub has same version in sources" OFF) OPTION (BUILD_GTEST "Build Google Test" OFF) OPTION (ENABLE_GEOMETRIC_TEST "Enable geometric test" OFF) +OPTION (ENABLE_TRACE_TIMEPLUS_CPP "Enable tracing log" OFF) PROJECT (TIMEPLUS-CLIENT VERSION "${TIMEPLUS_CPP_VERSION}" DESCRIPTION "Timeplus C++ client library" ) +set(CMAKE_EXPORT_COMPILE_COMMANDS 1) + +IF (ENABLE_TRACE_TIMEPLUS_CPP) + ADD_DEFINITIONS(-DTRACE_TIMEPLUS_CPP) +ENDIF() + + USE_CXX17 () USE_OPENSSL () @@ -127,6 +135,8 @@ IF (BUILD_TESTS) INCLUDE_DIRECTORIES (contrib/gtest/include contrib/gtest) SUBDIRS ( tests/simple + tests/insert + tests/insert-async ) ENDIF (BUILD_TESTS) diff --git a/README.md b/README.md index 0041eb1..008ad80 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ Optional dependencies: ```sh $ mkdir build . $ cd build -$ cmake -D CMAKE_BUILD_TYPE=Release .. +$ cmake -D CMAKE_BUILD_TYPE=Release -D BUILD_TESTS=ON -D BUILD_EXAMPLES=ON -D BUILD_GTEST=ON .. $ make ``` diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index e8044d9..534c52c 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -3,3 +3,6 @@ add_executable(timeplus-client main.cpp) target_link_libraries(timeplus-client PRIVATE timeplus-cpp-lib) + +add_executable(insert-examples insert_examples.cpp) +target_link_libraries(insert-examples PRIVATE timeplus-cpp-lib) diff --git a/examples/insert_examples.cpp b/examples/insert_examples.cpp new file mode 100644 index 0000000..d01ebe9 --- /dev/null +++ b/examples/insert_examples.cpp @@ -0,0 +1,58 @@ +#include + +#include + +using namespace timeplus; + +/// Stream to insert is created with DDL: +/// `CREATE STREAM insert_examples(i uint64, v string)` +const std::string TABLE_NAME = "insert_examples"; + +int main() { + TimeplusConfig config; + config.client_options.endpoints.push_back({"localhost", 8463}); + config.max_connections = 3; + config.max_retries = 10; + config.wait_time_before_retry_ms = 1000; + config.task_executors = 1; + + Timeplus tp{std::move(config)}; + + auto block = std::make_shared(); + + auto col_i = std::make_shared(); + col_i->Append(5); + col_i->Append(7); + block->AppendColumn("i", col_i); + + auto col_v = std::make_shared(); + col_v->Append("five"); + col_v->Append("seven"); + block->AppendColumn("v", col_v); + + /// Use synchronous insert API. + auto insert_result = tp.Insert(TABLE_NAME, block, /*idempotent_id=*/"block-1"); + if (insert_result.ok()) { + std::cout << "Synchronous insert suceeded." << std::endl; + } else { + std::cout << "Synchronous insert failed: code=" << insert_result.err_code << " msg=" << insert_result.err_msg << std::endl; + } + + /// Use asynchrounous insert API. + std::atomic done = false; + tp.InsertAsync(TABLE_NAME, block, /*idempotent_id=*/"block-2", [&done](const BaseResult& result) { + const auto& async_insert_result = static_cast(result); + if (async_insert_result.ok()) { + std::cout << "Asynchronous insert suceeded." << std::endl; + } else { + std::cout << "Asynchronous insert failed: code=" << async_insert_result.err_code << " msg=" << async_insert_result.err_msg + << std::endl; + } + done = true; + }); + + while (!done) { + } + + return 0; +} diff --git a/tests/insert-async/CMakeLists.txt b/tests/insert-async/CMakeLists.txt new file mode 100644 index 0000000..b7654a1 --- /dev/null +++ b/tests/insert-async/CMakeLists.txt @@ -0,0 +1,11 @@ +ADD_EXECUTABLE (insert-async-test + main.cpp +) + +TARGET_LINK_LIBRARIES (insert-async-test + timeplus-cpp-lib +) + +IF (MSVC) + TARGET_LINK_LIBRARIES (insert-async-test Crypt32) +ENDIF() diff --git a/tests/insert-async/main.cpp b/tests/insert-async/main.cpp new file mode 100644 index 0000000..eb23943 --- /dev/null +++ b/tests/insert-async/main.cpp @@ -0,0 +1,122 @@ +#include +#include + +#include +#include +#include +#include +#include + +using namespace timeplus; + +const size_t INSERT_BLOCKS = 100'000; +const size_t BLOCKS_PER_BATCH = 1000; + +const std::vector> HOST_PORTS = { + /// Single instance + {"localhost", 8463}, + /// Cluster nodes + {"localhost", 18463}, + {"localhost", 28463}, + {"localhost", 38463}, +}; + +void prepareTable() { + ClientOptions options; + for (const auto& [host, port] : HOST_PORTS) { + options.endpoints.push_back({host, port}); + } + + Client client{options}; + client.Execute("DROP STREAM IF EXISTS insert_async_test;"); + client.Execute("CREATE STREAM insert_async_test (i uint64, s string);"); +} + +auto timestamp() { + auto now = std::chrono::system_clock::now(); + std::time_t now_time = std::chrono::system_clock::to_time_t(now); + std::tm* local_time = std::localtime(&now_time); + return std::put_time(local_time, "%Y-%m-%d %H:%M:%S"); +} + +int main() { + prepareTable(); + + TimeplusConfig config; + for (const auto& [host, port] : HOST_PORTS) { + config.client_options.endpoints.push_back({host, port}); + } + config.max_connections = 4; + config.max_retries = 5; + config.task_executors = 4; + config.task_queue_capacity = BLOCKS_PER_BATCH; /// use large input queue to avoid deadlock on retry failure + + Timeplus tp{std::move(config)}; + + auto block = std::make_shared(); + auto col_i = std::make_shared(std::vector{5, 7, 4, 8}); + auto col_s = std::make_shared( + std::vector{"Before my bed, the moon is bright,", "I think that it is frost on the ground.", + "I raise my head to gaze at the bright moon,", "And lower it to think of my hometown."}); + block->AppendColumn("i", col_i); + block->AppendColumn("s", col_s); + + /// Queue to store failed inserts which need to be resent. + BlockingQueue> insert_failure(BLOCKS_PER_BATCH); + std::atomic insert_success_count{0}; + + auto handle_insert_result = [&insert_failure, &insert_success_count](size_t block_id, const InsertResult& result) { + if (result.ok()) { + insert_success_count.fetch_add(1); + } else { + std::cout << "[" << timestamp() << "]\t Failed to insert block: insert_id=" << block_id << " err=" << result.err_msg + << std::endl; + insert_failure.emplace(block_id, result.block); + } + }; + + auto async_insert_block = [&tp, &handle_insert_result](size_t block_id, BlockPtr block) { + tp.InsertAsync(/*table_name=*/"insert_async_test", block, [block_id, &handle_insert_result](const BaseResult& result) { + const auto& insert_result = static_cast(result); + handle_insert_result(block_id, insert_result); + }); + }; + + auto start_time = std::chrono::high_resolution_clock::now(); + auto last_time = start_time; + for (size_t batch = 0; batch < INSERT_BLOCKS / BLOCKS_PER_BATCH; ++batch) { + insert_success_count = 0; + /// Insert blocks asynchronously. + for (size_t i = 0; i < BLOCKS_PER_BATCH; ++i) { + async_insert_block(i, block); + } + + /// Wait for all blocks of the batch are inserted. + while (insert_success_count.load() != BLOCKS_PER_BATCH) { + if (!insert_failure.empty()) { + /// Re-insert the failed blocks + auto blocks = insert_failure.drain(); + for (auto &[i, b] : blocks) { + async_insert_block(i, b); + } + } + + std::this_thread::yield(); + } + + /// Print insert statistics of the batch. + auto current_time = std::chrono::high_resolution_clock::now(); + std::chrono::duration elapsed = current_time - last_time; + last_time = current_time; + std::cout << "[" << timestamp() << "]\t" << (batch + 1) * BLOCKS_PER_BATCH << " blocks inserted\telapsed = " << elapsed.count() + << " sec\teps = " << static_cast(BLOCKS_PER_BATCH * block->GetRowCount()) / elapsed.count() << std::endl; + } + + /// Print summary. + auto current_time = std::chrono::high_resolution_clock::now(); + std::chrono::duration elapsed = current_time - start_time; + std::cout << "\nInsert Done. Total Events = " << INSERT_BLOCKS * block->GetRowCount() << " Total Time = " << elapsed.count() << " sec" + << std::endl; + + return 0; +} diff --git a/tests/insert/CMakeLists.txt b/tests/insert/CMakeLists.txt new file mode 100644 index 0000000..65be024 --- /dev/null +++ b/tests/insert/CMakeLists.txt @@ -0,0 +1,11 @@ +ADD_EXECUTABLE (insert-test + main.cpp +) + +TARGET_LINK_LIBRARIES (insert-test + timeplus-cpp-lib +) + +IF (MSVC) + TARGET_LINK_LIBRARIES (insert-test Crypt32) +ENDIF() diff --git a/tests/insert/main.cpp b/tests/insert/main.cpp new file mode 100644 index 0000000..d151fde --- /dev/null +++ b/tests/insert/main.cpp @@ -0,0 +1,85 @@ +#include + +#include +#include +#include + +using namespace timeplus; + +const size_t INSERT_BLOCKS = 100'000; + +const std::vector> HOST_PORTS = { + /// Single instance + {"localhost", 8463}, + /// Cluster nodes + {"localhost", 18463}, + {"localhost", 28463}, + {"localhost", 38463}, +}; + +void prepareTable() { + ClientOptions options; + for (const auto& [host, port] : HOST_PORTS) { + options.endpoints.push_back({host, port}); + } + + Client client{options}; + client.Execute("DROP STREAM IF EXISTS insert_test;"); + client.Execute("CREATE STREAM insert_test (i uint64, s string);"); +} + +auto timestamp() { + auto now = std::chrono::system_clock::now(); + std::time_t now_time = std::chrono::system_clock::to_time_t(now); + std::tm* local_time = std::localtime(&now_time); + return std::put_time(local_time, "%Y-%m-%d %H:%M:%S"); +} + +int main() { + prepareTable(); + + TimeplusConfig config; + for (const auto& [host, port] : HOST_PORTS) { + config.client_options.endpoints.push_back({host, port}); + } + config.max_connections = 1; + config.max_retries = 5; + config.task_executors = 0; + Timeplus tp{std::move(config)}; + + auto block = std::make_shared(); + auto col_i = std::make_shared(std::vector{5, 7, 4, 8}); + auto col_s = std::make_shared( + std::vector{"Before my bed, the moon is bright,", "I think that it is frost on the ground.", + "I raise my head to gaze at the bright moon,", "And lower it to think of my hometown."}); + block->AppendColumn("i", col_i); + block->AppendColumn("s", col_s); + + auto start_time = std::chrono::high_resolution_clock::now(); + auto last_time = start_time; + for (size_t i = 1; i <= INSERT_BLOCKS; ++i) { + while (true) { + try { + tp.Insert("insert_test", block); + break; + } catch (const std::exception& ex) { + std::cout << timestamp() << "\t Failed to insert block " << i << " : " << ex.what() << std::endl; + } + } + + if (i % 1000 == 0) { + auto current_time = std::chrono::high_resolution_clock::now(); + std::chrono::duration elapsed = current_time - last_time; + last_time = current_time; + std::cout << "[" << timestamp() << "]\t" << i << " blocks inserted\telapsed = " << elapsed.count() + << " sec\teps = " << 1000.0 * block->GetRowCount() / elapsed.count() << std::endl; + } + } + + auto current_time = std::chrono::high_resolution_clock::now(); + std::chrono::duration elapsed = current_time - start_time; + std::cout << "\nInsert Done. Total Events = " << INSERT_BLOCKS * block->GetRowCount() << " Total Time = " << elapsed.count() << " sec" + << std::endl; + + return 0; +} diff --git a/timeplus/CMakeLists.txt b/timeplus/CMakeLists.txt index 67c24b8..fa5a608 100644 --- a/timeplus/CMakeLists.txt +++ b/timeplus/CMakeLists.txt @@ -31,7 +31,9 @@ SET ( timeplus-cpp-lib-src block.cpp client.cpp + client_pool.cpp query.cpp + timeplus.cpp # Headers base/buffer.h @@ -75,12 +77,16 @@ SET ( timeplus-cpp-lib-src types/types.h block.h + blocking_queue.h client.h + client_pool.h error_codes.h exceptions.h protocol.h query.h server_exception.h + timeplus.h + timeplus_config.h ) if (MSVC) @@ -159,12 +165,16 @@ INSTALL (TARGETS timeplus-cpp-lib # general INSTALL(FILES block.h DESTINATION include/timeplus/) +INSTALL(FILES blocking_queue.h DESTINATION include/timeplus/) INSTALL(FILES client.h DESTINATION include/timeplus/) +INSTALL(FILES client_pool.h DESTINATION include/timeplus/) INSTALL(FILES error_codes.h DESTINATION include/timeplus/) INSTALL(FILES exceptions.h DESTINATION include/timeplus/) INSTALL(FILES server_exception.h DESTINATION include/timeplus/) INSTALL(FILES protocol.h DESTINATION include/timeplus/) INSTALL(FILES query.h DESTINATION include/timeplus/) +INSTALL(FILES timeplus.h DESTINATION include/timeplus/) +INSTALL(FILES timeplus_config.h DESTINATION include/timeplus/) INSTALL(FILES version.h DESTINATION include/timeplus/) # base @@ -175,6 +185,7 @@ INSTALL(FILES base/open_telemetry.h DESTINATION include/timeplus/base/) INSTALL(FILES base/output.h DESTINATION include/timeplus/base/) INSTALL(FILES base/platform.h DESTINATION include/timeplus/base/) INSTALL(FILES base/projected_iterator.h DESTINATION include/timeplus/base/) +INSTALL(FILES base/scope_guard.h DESTINATION include/timeplus/base/) INSTALL(FILES base/singleton.h DESTINATION include/timeplus/base/) INSTALL(FILES base/socket.h DESTINATION include/timeplus/base/) INSTALL(FILES base/string_utils.h DESTINATION include/timeplus/base/) diff --git a/timeplus/block.h b/timeplus/block.h index 650376e..ea1858d 100644 --- a/timeplus/block.h +++ b/timeplus/block.h @@ -105,4 +105,6 @@ class Block { size_t rows_; }; +using BlockPtr = std::shared_ptr; + } diff --git a/timeplus/blocking_queue.h b/timeplus/blocking_queue.h new file mode 100644 index 0000000..c85c8c4 --- /dev/null +++ b/timeplus/blocking_queue.h @@ -0,0 +1,244 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace timeplus { +/// A naive blocking queue implementation. +/// Revisit this + +template > +class BlockingQueue { +public: + explicit BlockingQueue(size_t max_size_) : max_size(max_size_) {} + + /// add will be blocked if queue is full + void add(const T& v) { + { + std::unique_lock lock{mu}; + if (queue.size() >= max_size) cv.wait(lock, [this] { return queue.size() < max_size; }); + + queue.push_back(v); + assert(queue.size() <= max_size); + } + + /// Notify front() we have value + cv.notify_one(); + } + + void add(T&& v) { + { + std::unique_lock lock{mu}; + + if (queue.size() >= max_size) cv.wait(lock, [this] { return queue.size() < max_size; }); + + queue.push_back(std::move(v)); + assert(queue.size() <= max_size); + } + + /// Notify one waiting thread we have one item to consume + cv.notify_one(); + } + + /// \return current size after add and if passed value is added + std::pair tryAdd(T&& v) { + size_t siz = 0; + { + std::unique_lock lock{mu}; + + if (queue.size() == max_size) return {max_size, false}; + + queue.push_back(std::move(v)); + assert(queue.size() <= max_size); + siz = queue.size(); + } + + /// Notify one waiting thread we have one item to consume + cv.notify_one(); + + return {siz, true}; + } + + bool add(T&& v, int64_t timeout_ms) { + { + std::unique_lock lock{mu}; + + if (queue.size() >= max_size) { + auto status = cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this] { return queue.size() < max_size; }); + if (!status) return false; + } + + queue.push_back(std::move(v)); + assert(queue.size() <= max_size); + } + + /// Notify one waiting thread we have one item to consume + cv.notify_one(); + + return true; + } + + template + void emplace(Args&&... args) { + { + std::unique_lock lock{mu}; + + if (queue.size() >= max_size) cv.wait(lock, [this] { return queue.size() < max_size; }); + + queue.emplace_back(std::forward(args)...); + assert(queue.size() <= max_size); + } + + /// Notify one waiting thread we have one item to consume + cv.notify_one(); + } + + template + bool tryEmplace(Args&&... args) { + { + std::unique_lock lock{mu}; + + if (queue.size() == max_size) return false; + + queue.emplace_back(std::forward(args)...); + assert(queue.size() <= max_size); + } + + /// Notify one waiting thread we have one item to consume + cv.notify_one(); + + return true; + } + + template + bool tryEmplace(int64_t timeout_ms, Args&&... args) { + { + std::unique_lock lock{mu}; + + if (queue.size() >= max_size) { + auto status = cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this] { return queue.size() < max_size; }); + if (!status) return false; + } + + queue.emplace_back(std::forward(args)...); + assert(queue.size() <= max_size); + } + + /// Notify one waiting thread we have one item to consume + cv.notify_one(); + + return true; + } + + /// get and pop front + T take() { + std::unique_lock lock{mu}; + + if (queue.empty()) cv.wait(lock, [this] { return !queue.empty(); }); + + assert(!queue.empty()); + T t{std::move(queue.front())}; + queue.pop_front(); + + /// Manually unlocking is done before notifying to avoid waking up + /// the waiting thread only to block again + lock.unlock(); + + /// Notify push/emplace, there is empty slot + cv.notify_one(); + + return t; + } + + /// get and pop front if not timeout + /// return empty if timeout + std::optional take(int64_t timeout_ms) { + std::unique_lock lock{mu}; + + if (queue.empty()) { + auto status = cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this] { return !queue.empty(); }); + if (!status) return {}; + } + + assert(!queue.empty()); + T t{std::move(queue.front())}; + queue.pop_front(); + + /// Manually unlocking is done before notifying to avoid waking up + /// the waiting thread only to block again + lock.unlock(); + + /// Notify push/emplace, there is empty slot + cv.notify_one(); + + return t; + } + + std::optional tryTake() { + std::unique_lock lock{mu}; + + if (queue.empty()) return {}; + + T t{std::move(queue.front())}; + queue.pop_front(); + + /// Manually unlocking is done before notifying to avoid waking up + /// the waiting thread only to block again + lock.unlock(); + + /// Notify push/emplace, there is empty slot + cv.notify_one(); + + return t; + } + + /// Get front. If queue is empty, wait forever for one + const T& peek() const { + std::unique_lock lock{mu}; + + if (queue.empty()) cv.wait(lock, [this] { return !queue.empty(); }); + + assert(!queue.empty()); + return queue.front(); + } + + Queue drain() { + Queue r; + { + std::unique_lock lock{mu}; + /// When queue is empty, we don't want to steal its + /// underlying allocated memory. + if (queue.empty()) return {}; + + r.swap(queue); + assert(queue.empty()); + } + cv.notify_all(); + + return r; + } + + size_t size() const { + std::unique_lock lock{mu}; + return queue.size(); + } + + bool empty() const { + std::unique_lock lock{mu}; + return queue.empty(); + } + +private: + const size_t max_size; + + std::condition_variable cv; + mutable std::mutex mu; + Queue queue; +}; + +template +using BlockingQueuePtr = std::shared_ptr>; +} // namespace timeplus diff --git a/timeplus/client.cpp b/timeplus/client.cpp index 8a2085a..188143e 100644 --- a/timeplus/client.cpp +++ b/timeplus/client.cpp @@ -7,8 +7,8 @@ #include "base/wire_format.h" #include "columns/factory.h" +#include "timeplus/macros.h" -#include #include #include #include @@ -153,7 +153,7 @@ class Client::Impl { void SendCancel(); - void Insert(const std::string& table_name, const std::string& query_id, const Block& block); + void Insert(const std::string& table_name, const std::string& query_id, const Block& block, const std::string& idempotent_id); void Ping(); @@ -306,23 +306,41 @@ std::string NameToQueryString(const std::string &input) return output; } -void Client::Impl::Insert(const std::string& table_name, const std::string& query_id, const Block& block) { +void Client::Impl::Insert(const std::string& table_name, const std::string& query_id, const Block& block, + const std::string& idempotent_id) { if (options_.ping_before_query) { RetryGuard([this]() { Ping(); }); } - std::stringstream fields_section; - const auto num_columns = block.GetColumnCount(); + constexpr std::string_view INSERT_INTO = "INSERT INTO "; + constexpr std::string_view LEFT_PAREN = " ( "; + constexpr std::string_view RIGHT_PAREN_VALUES = " ) VALUES"; - for (unsigned int i = 0; i < num_columns; ++i) { + const size_t num_columns = block.GetColumnCount(); + size_t fields_section_size = num_columns - 1; /// commas between column names + for (size_t i = 0; i < num_columns; ++i) { + fields_section_size += block.GetColumnName(i).size() + 2; + } + size_t query_text_size = INSERT_INTO.size() + table_name.size() + LEFT_PAREN.size() + fields_section_size + RIGHT_PAREN_VALUES.size(); + + std::string query_text; + query_text.reserve(query_text_size + query_text_size / 5); /// pre-allocate with query_text_size * 1.2 + query_text.append(INSERT_INTO).append(table_name).append(LEFT_PAREN); + for (size_t i = 0; i < num_columns; ++i) { if (i == num_columns - 1) { - fields_section << NameToQueryString(block.GetColumnName(i)); + query_text.append(NameToQueryString(block.GetColumnName(i))); } else { - fields_section << NameToQueryString(block.GetColumnName(i)) << ","; + query_text.append(NameToQueryString(block.GetColumnName(i))).append(","); } } + query_text.append(RIGHT_PAREN_VALUES); + + Query query(std::move(query_text), query_id); + + if (!idempotent_id.empty()) { + query.SetSetting("idempotent_id", {idempotent_id}); + } - Query query("INSERT INTO " + table_name + " ( " + fields_section.str() + " ) VALUES", query_id); SendQuery(query); uint64_t server_packet; @@ -376,8 +394,10 @@ void Client::Impl::ResetConnection() { InitializeStreams(socket_factory_->connect(options_, current_endpoint_.value())); if (!Handshake()) { + TRACE("client reset connection: host=%s port=%d res=fail", current_endpoint_->host.c_str(), current_endpoint_->port); throw ProtocolError("fail to connect to " + options_.host); } + TRACE("client reset connection: host=%s port=%d res=success", current_endpoint_->host.c_str(), current_endpoint_->port); } void Client::Impl::ResetConnectionEndpoint() { @@ -1038,12 +1058,12 @@ void Client::Select(const Query& query) { Execute(query); } -void Client::Insert(const std::string& table_name, const Block& block) { - impl_->Insert(table_name, Query::default_query_id, block); +void Client::Insert(const std::string& table_name, const Block& block, const std::string& idempotent_id) { + impl_->Insert(table_name, Query::default_query_id, block, idempotent_id); } -void Client::Insert(const std::string& table_name, const std::string& query_id, const Block& block) { - impl_->Insert(table_name, query_id, block); +void Client::Insert(const std::string& table_name, const std::string& query_id, const Block& block, const std::string& idempotent_id) { + impl_->Insert(table_name, query_id, block, idempotent_id); } void Client::Ping() { diff --git a/timeplus/client.h b/timeplus/client.h index 322f170..eab146e 100644 --- a/timeplus/client.h +++ b/timeplus/client.h @@ -252,8 +252,9 @@ class Client { void Select(const Query& query); /// Intends for insert block of data into a table \p table_name. - void Insert(const std::string& table_name, const Block& block); - void Insert(const std::string& table_name, const std::string& query_id, const Block& block); + /// Insertion will be idempotent when `idempotent_id` is not empty. + void Insert(const std::string& table_name, const Block& block, const std::string & idempotent_id = ""); + void Insert(const std::string& table_name, const std::string & query_id, const Block& block, const std::string & idempotent_id = ""); /// Ping server for aliveness. void Ping(); diff --git a/timeplus/client_pool.cpp b/timeplus/client_pool.cpp new file mode 100644 index 0000000..ec0649e --- /dev/null +++ b/timeplus/client_pool.cpp @@ -0,0 +1,44 @@ +#include + +namespace timeplus { + +ClientPool::ClientPtr ClientPool::Acquire(int64_t timeout_ms) { + auto maybe_client = clients_.take(timeout_ms); + if (!maybe_client.has_value()) { + throw TimeoutError("Can not acquire client before timeout"); + } + + auto [client, valid] = std::move(maybe_client).value(); + try { + /// Lazy init client + if (client == nullptr) { + client = std::make_unique(client_options_); + valid = true; + } + + if (!valid) { + client->ResetConnectionEndpoint(); + valid = true; + } + } catch (const std::exception& ex) { + /// Client can not connect to server. + Release(std::move(client), false); + throw; + } + + return std::move(client); +} + +void ClientPool::GuardedClient::TestConnection() noexcept { + try { + client->Ping(); + valid = true; + } catch (...) { + valid = false; + } + + TRACE("test connection: host=%s port=%d valid=%s", client->GetCurrentEndpoint()->host.c_str(), client->GetCurrentEndpoint()->port, + valid ? "true" : "false"); +} + +} // namespace timeplus diff --git a/timeplus/client_pool.h b/timeplus/client_pool.h new file mode 100644 index 0000000..d687dbe --- /dev/null +++ b/timeplus/client_pool.h @@ -0,0 +1,81 @@ +#pragma once + +#include +#include +#include +#include + +#include + +namespace timeplus { + +/// `ClientPool` is a blocking queue to store idle clients and their validness. An invalid client need reconnect before used for other +/// operations. +class ClientPool { +public: + using ClientPtr = std::unique_ptr; + using Clients = BlockingQueue>; + + ClientPool(ClientOptions client_options, size_t pool_size) + : client_options_(std::move(client_options)), pool_size_(pool_size), clients_(pool_size_) { + for (size_t i = 0; i < pool_size_; ++i) { + clients_.emplace(nullptr, false); + } + } + + ~ClientPool() { assert(clients_.size() == pool_size_); } + + ClientPtr Acquire(int64_t timeout_ms); + + void Release(ClientPtr client, bool valid) { clients_.emplace(std::move(client), valid); } + + /// Mostly RAII to return client and its validness to pool on destruction. + class GuardedClient { + public: + GuardedClient() = default; + GuardedClient(ClientPool* pool, ClientPtr client, bool valid) : client(std::move(client)), valid(valid), pool_(pool) {} + + ~GuardedClient() { + if (pool_) { + pool_->Release(std::move(client), valid); + } + } + + GuardedClient(GuardedClient&& other) noexcept : client(std::move(other.client)), valid(other.valid), pool_(other.pool_) { + other.pool_ = nullptr; + } + + GuardedClient& operator=(GuardedClient&& other) noexcept { + if (this != &other) { + if (pool_ && client) { + pool_->Release(std::move(client), valid); + } + pool_ = other.pool_; + other.pool_ = nullptr; + client = std::move(other.client); + valid = other.valid; + } + return *this; + }; + + void TestConnection() noexcept; + + ClientPtr client; + bool valid; + + private: + ClientPool* pool_{nullptr}; + }; + + GuardedClient GetGuardedClient(int64_t timeout_ms) { + auto client = Acquire(timeout_ms); + return GuardedClient{this, std::move(client), true}; + } + +private: + ClientOptions client_options_; + size_t pool_size_; + Clients clients_; +}; + +} // namespace timeplus diff --git a/timeplus/columns/numeric.h b/timeplus/columns/numeric.h index a81c360..e743050 100644 --- a/timeplus/columns/numeric.h +++ b/timeplus/columns/numeric.h @@ -72,6 +72,7 @@ class ColumnVector : public Column { // using Int128 = absl::int128; using Int64 = int64_t; +using UInt64 = uint64_t; using Int128 = wide::integer<128, signed>; using UInt128 = wide::integer<128, unsigned>; diff --git a/timeplus/exceptions.h b/timeplus/exceptions.h index 0a38304..bb56edd 100644 --- a/timeplus/exceptions.h +++ b/timeplus/exceptions.h @@ -37,6 +37,14 @@ class CompressionError : public Error { using Error::Error; }; +class ConnectionError : public Error { + using Error::Error; +}; + +class TimeoutError : public Error { + using Error::Error; +}; + // Exception received from server. class ServerException : public Error { public: diff --git a/timeplus/macros.h b/timeplus/macros.h new file mode 100644 index 0000000..7ad6bc1 --- /dev/null +++ b/timeplus/macros.h @@ -0,0 +1,12 @@ +#pragma once + +#if defined(NDEBUG) || !defined(TRACE_TIMEPLUS_CPP) +#define TRACE(format, ...) +#else +#define TRACE(format, ...) \ + do { \ + printf("[trace] "); \ + printf(format, __VA_ARGS__); \ + printf("\n"); \ + } while (0) +#endif diff --git a/timeplus/query.cpp b/timeplus/query.cpp index de27702..461163a 100644 --- a/timeplus/query.cpp +++ b/timeplus/query.cpp @@ -13,9 +13,9 @@ Query::Query(const char* query, const char* query_id) { } -Query::Query(const std::string& query, const std::string& query_id) - : query_(query) - , query_id_(query_id) +Query::Query(std::string query, std::string query_id) + : query_(std::move(query)) + , query_id_(std::move(query_id)) { } diff --git a/timeplus/query.h b/timeplus/query.h index 9327acc..3f76779 100644 --- a/timeplus/query.h +++ b/timeplus/query.h @@ -87,10 +87,9 @@ class Query : public QueryEvents { public: Query(); Query(const char* query, const char* query_id = nullptr); - Query(const std::string& query, const std::string& query_id = default_query_id); + Query(std::string query, std::string query_id = default_query_id); ~Query() override; - /// inline const std::string& GetText() const { return query_; } diff --git a/timeplus/timeplus.cpp b/timeplus/timeplus.cpp new file mode 100644 index 0000000..d7e30ff --- /dev/null +++ b/timeplus/timeplus.cpp @@ -0,0 +1,198 @@ +#include + +#include +#include + +#include + +namespace timeplus { + +class Timeplus::Impl { +public: + explicit Impl(TimeplusConfig&& config) + : config_(std::move(config)), client_pool_(CreateClientOptions(), config_.max_connections), tasks_(config_.task_queue_capacity) { + /// Start executors for asynchronous tasks execution. + task_executors_.reserve(config_.task_executors); + for (size_t i = 0; i < config_.task_executors; ++i) { + task_executors_.emplace_back(&Impl::TaskExecutionFunc, this); + } + } + + ~Impl() { + { + std::lock_guard lk{tasks_mutex_}; + tasks_stopped_ = true; + } + tasks_cv_.notify_all(); + + for (auto& exec : task_executors_) { + if (exec.joinable()) { + exec.join(); + } + } + } + + InsertResult Insert(std::string table_name, BlockPtr block, std::string idempotent_id) { + InsertResult result{std::move(table_name), std::move(block), std::move(idempotent_id)}; + ExecuteWithRetries([&result](Client& client) { client.Insert(result.table_name, *result.block, result.idempotent_id); }, result); + return result; + } + + void InsertAsync(std::string&& table_name, BlockPtr&& block, std::string&& idempotent_id, Callback&& callback) { + auto task = std::make_shared(std::move(table_name), std::move(block), std::move(idempotent_id), std::move(callback)); + tasks_.add(std::move(task)); + tasks_cv_.notify_all(); + } + +private: + ClientOptions CreateClientOptions() { + ClientOptions client_options = config_.client_options; + client_options.rethrow_exceptions = true; + client_options.ping_before_query = false; + return client_options; + } + + void ExecuteWithRetries(std::function func, BaseResult& result) { + ClientPool::GuardedClient guarded_client; + for (int retries = config_.max_retries; retries >= 0; --retries) { + try { + if (guarded_client.client == nullptr) { + guarded_client = client_pool_.GetGuardedClient(config_.client_acquire_timeout_ms); + } + + if (!guarded_client.valid) { + /// Invalid client need to reconnect before using. + guarded_client.client->ResetConnectionEndpoint(); + guarded_client.valid = true; + } + + func(*guarded_client.client); + + return; + } catch (const ProtocolError& ex) { + /// Client failed to communicate with server. + guarded_client.TestConnection(); + result.err_code = ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER; + result.err_msg = ex.what(); + } catch (const ServerError& ex) { + guarded_client.TestConnection(); + result.err_code = ex.GetCode(); + result.err_msg = ex.what(); + switch (ex.GetCode()) { + case ErrorCodes::TIMEOUT_EXCEEDED: + case ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT: + break; + default: + /// Non-retriable exceptions returned from server + return; + } + } catch (const std::system_error& ex) { + /// Client can not connect to server. + guarded_client.valid = false; + result.err_code = ErrorCodes::NETWORK_ERROR; + result.err_msg = std::string("Failed to send request to server: ") + ex.what(); + } catch (const std::exception& ex) { + /// Other non-retriabl exceptions + guarded_client.TestConnection(); + result.err_code = ErrorCodes::UNKNOWN_EXCEPTION; + result.err_msg = ex.what(); + return; + } + TRACE("execution failed: code=%d msg=%s remaining_retries=%d", result.err_code, result.err_msg.c_str(), retries); + /// Sleep before next retry. + std::this_thread::sleep_for(std::chrono::milliseconds(config_.wait_time_before_retry_ms)); + } + } + + void TaskExecutionFunc() { + while (true) { + { + std::unique_lock lk{tasks_mutex_}; + tasks_cv_.wait(lk, [this] { return !tasks_.empty() || tasks_stopped_; }); + if (tasks_stopped_) { + return; + } + } + + auto maybe_task = tasks_.take(1000); + if (!maybe_task.has_value()) { + continue; + } + + auto& task = maybe_task.value(); + switch (task->type()) { + case TaskType::Insert: { + auto insert_task = std::static_pointer_cast(task); + auto result = + Insert(std::move(insert_task->table_name), std::move(insert_task->block), std::move(insert_task->idempotent_id)); + insert_task->callback(result); + break; + } + } + } + } + + TimeplusConfig config_; + ClientPool client_pool_; + + /** + *Asynchronous task members + */ + + enum class TaskType { + Insert, + }; + + struct Task { + explicit Task(Callback callback) : callback(std::move(callback)) {} + virtual ~Task() = default; + + virtual TaskType type() const = 0; + Callback callback; + }; + + struct InsertTask : public Task { + InsertTask(std::string table_name, BlockPtr block, std::string idempotent_id, Callback callback) + : Task(std::move(callback)), + table_name(std::move(table_name)), + block(std::move(block)), + idempotent_id(std::move(idempotent_id)) {} + + TaskType type() const override { return TaskType::Insert; } + + std::string table_name; + BlockPtr block; + std::string idempotent_id; + }; + + BlockingQueue> tasks_; + std::vector task_executors_; + std::atomic next_task_id_{0}; + + mutable std::mutex tasks_mutex_; + std::condition_variable tasks_cv_; + bool tasks_stopped_{false}; +}; + +Timeplus::Timeplus(TimeplusConfig config) : impl_(std::make_unique(std::move(config))) { +} + +Timeplus::~Timeplus() = default; + +InsertResult Timeplus::Insert(std::string table_name, BlockPtr block) { + return impl_->Insert(std::move(table_name), std::move(block), /*idempotent_id=*/""); +} + +InsertResult Timeplus::Insert(std::string table_name, BlockPtr block, std::string idempotent_id) { + return impl_->Insert(std::move(table_name), std::move(block), std::move(idempotent_id)); +} + +void Timeplus::InsertAsync(std::string table_name, BlockPtr block, Callback callback) noexcept { + impl_->InsertAsync(std::move(table_name), std::move(block), /*idempotent_id=*/"", std::move(callback)); +} + +void Timeplus::InsertAsync(std::string table_name, BlockPtr block, std::string idempotent_id, Callback callback) noexcept { + impl_->InsertAsync(std::move(table_name), std::move(block), std::move(idempotent_id), std::move(callback)); +} + +} // namespace timeplus diff --git a/timeplus/timeplus.h b/timeplus/timeplus.h new file mode 100644 index 0000000..996e075 --- /dev/null +++ b/timeplus/timeplus.h @@ -0,0 +1,66 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include + +namespace timeplus { + +struct BaseResult { + virtual ~BaseResult() = default; + + bool ok() const { return err_code == ErrorCodes::OK; } + + int err_code = ErrorCodes::OK; + std::string err_msg; +}; + +struct InsertResult : public BaseResult { + InsertResult(std::string table_name, BlockPtr block, std::string idempotent_id) + : table_name(std::move(table_name)), block(std::move(block)), idempotent_id(std::move(idempotent_id)) {} + + std::string table_name; + BlockPtr block; + std::string idempotent_id; +}; + +using Callback = std::function; + +/// `Timeplus` class provides high-level thread-safe APIs for Timeplus operations. +/// Internally, the class maintains a pool of clients and their connection to the server. For every user request, +/// the idle client is acquired from pool, executes the operation and finally put back to pool. +/// +/// The class provides both synchronous and asynchronous APIs for the most operations. +/// It has an internal queue to store the asynchronou requests and starts a number of threads (configured by +/// `TimeplusConfig::task_executors`) to execute asynchronous requests. +class Timeplus { +public: + explicit Timeplus(TimeplusConfig config); + ~Timeplus(); + + /** + * Data ingestion APIs + */ + + /// `Insert` methods insert data to the stream in the synchronous way. The methods return the + /// result when block is successfully insert or excceeds max retry times. User need to check the result + /// and handle the error; usually resend the same data block again with the same idempotent id. + InsertResult Insert(std::string table_name, BlockPtr block); + InsertResult Insert(std::string table_name, BlockPtr block, std::string idempotent_id); + + /// `InsertAsync` methods insert data to the stream asynchronously. The methods store the insert data in + /// an internal queue and return the insert operation ID to the user. When the insert is completed, + /// the callback function will be invoked with the insert result to notify the user. + void InsertAsync(std::string table_name, BlockPtr block, Callback callback) noexcept; + void InsertAsync(std::string table_name, BlockPtr block, std::string idempotent_id, Callback callback) noexcept; + +private: + class Impl; + std::unique_ptr impl_; +}; + +} // namespace timeplus diff --git a/timeplus/timeplus_config.h b/timeplus/timeplus_config.h new file mode 100644 index 0000000..905b2bd --- /dev/null +++ b/timeplus/timeplus_config.h @@ -0,0 +1,27 @@ +#pragma once + +#include + +namespace timeplus { + +struct TimeplusConfig { + /// Options used in creating internal clients. Some of the options may be overwritten for better performance. + ClientOptions client_options; + + /// Max number of connections maintained in pool. + size_t max_connections = 1; + /// Max waiting time to acquire a idle connection from pool. + int64_t client_acquire_timeout_ms = 10000; + + /// Number of retries before retriable error returned to user. + uint32_t max_retries = 5; + /// Amount of time to wait before next retry. + int64_t wait_time_before_retry_ms = 5000; + + /// Capacity of the queue to store asynchronous requests. + size_t task_queue_capacity = 128; + /// Number of threads to execute asynchronous requests. + size_t task_executors = 1; +}; + +} // namespace timeplus diff --git a/ut/CMakeLists.txt b/ut/CMakeLists.txt index 383605a..b638ecc 100644 --- a/ut/CMakeLists.txt +++ b/ut/CMakeLists.txt @@ -3,11 +3,13 @@ SET ( timeplus-cpp-ut-src block_ut.cpp client_ut.cpp + client_pool_ut.cpp columns_ut.cpp column_array_ut.cpp itemview_ut.cpp socket_ut.cpp stream_ut.cpp + timeplus_ut.cpp type_parser_ut.cpp types_ut.cpp utils_ut.cpp diff --git a/ut/client_pool_ut.cpp b/ut/client_pool_ut.cpp new file mode 100644 index 0000000..6823a6e --- /dev/null +++ b/ut/client_pool_ut.cpp @@ -0,0 +1,46 @@ +#include +#include + +#include + +using namespace timeplus; + +TEST(ClientPool, AcquireClient) { + ClientOptions client_options; + client_options.host = "localhost"; + ClientPool pool{client_options, /*pool_size=*/3}; + + auto client1 = pool.Acquire(1000); + ASSERT_NE(client1, nullptr); + + auto client2 = pool.Acquire(1000); + ASSERT_NE(client2, nullptr); + + auto client3 = pool.Acquire(1000); + ASSERT_NE(client3, nullptr); + + ASSERT_THROW(pool.Acquire(100), TimeoutError); + + pool.Release(std::move(client1), /*valid=*/true); + auto client5 = pool.Acquire(1000); + ASSERT_EQ(client1, nullptr); + ASSERT_NE(client5, nullptr); + + pool.Release(std::move(client2), /*valid=*/false); + auto client6 = pool.Acquire(1000); + ASSERT_EQ(client2, nullptr); + ASSERT_NE(client6, nullptr); + + pool.Release(std::move(client3), true); + pool.Release(std::move(client5), true); + pool.Release(std::move(client6), true); +} + +TEST(ClientPool, BadHost) { + ClientOptions client_options; + client_options.host = "badhost"; + client_options.retry_timeout = std::chrono::seconds(0); + ClientPool pool{client_options, /*pool_size=*/3}; + + ASSERT_THROW(pool.Acquire(1000), std::system_error); +} diff --git a/ut/roundtrip_tests.cpp b/ut/roundtrip_tests.cpp index 51758d0..e80558d 100644 --- a/ut/roundtrip_tests.cpp +++ b/ut/roundtrip_tests.cpp @@ -127,6 +127,7 @@ TEST_P(RoundtripCase, MapUUID_Tuple_String_Array_Uint64) { EXPECT_TRUE(CompareRecursive(*map, *result_typed)); } +/// Geometric tests are not supported in Proton #ifdef GEOMETRIC_TESTS_ENABLED TEST_P(RoundtripCase, Point) { diff --git a/ut/socket_ut.cpp b/ut/socket_ut.cpp index c41e19e..ff78b58 100644 --- a/ut/socket_ut.cpp +++ b/ut/socket_ut.cpp @@ -80,6 +80,10 @@ TEST(Socketcase, gaierror) { } TEST(Socketcase, connecttimeout) { +#if defined(__APPLE__) + GTEST_SKIP() << "IPv6 address 100::1 is connectable on macOS"; +#endif + using Clock = std::chrono::steady_clock; try { diff --git a/ut/timeplus_ut.cpp b/ut/timeplus_ut.cpp new file mode 100644 index 0000000..5869f60 --- /dev/null +++ b/ut/timeplus_ut.cpp @@ -0,0 +1,289 @@ +#include +#include +#include +#include +#include + +#include + +#include +#include +#include + +using namespace timeplus; + +TEST(TimeplusInsert, DISABLED_InsertSync) { + const auto* host = "localhost"; + const uint16_t port = 8463; + + ClientOptions client_options; + client_options.host = host; + client_options.port = port; + Client client{client_options}; + /// Create test table. + client.Execute("DROP STREAM IF EXISTS test_timeplus_insert_sync"); + client.Execute("CREATE STREAM test_timeplus_insert_sync(id uint64)"); + + TimeplusConfig config; + config.client_options.host = host; + config.client_options.port = port; + config.max_connections = 1; + Timeplus tp{std::move(config)}; + + auto b = std::make_shared(); + auto id = std::make_shared(); + id->Append(5); + id->Append(7); + id->Append(4); + id->Append(8); + b->AppendColumn("id", id); + + /// Insert for multiple times. + ASSERT_TRUE(tp.Insert("test_timeplus_insert_sync", b).ok()); + ASSERT_TRUE(tp.Insert("test_timeplus_insert_sync", b).ok()); + ASSERT_TRUE(tp.Insert("test_timeplus_insert_sync", b).ok()); + ASSERT_TRUE(tp.Insert("test_timeplus_insert_sync", b).ok()); + ASSERT_TRUE(tp.Insert("test_timeplus_insert_sync", b).ok()); + + const uint64_t VALUE[] = {5, 7, 4, 8}; + size_t row = 0; + while (row != 20U) { + client.Select("SELECT id FROM table(test_timeplus_insert_sync)", [VALUE, &row](const Block& block) { + if (block.GetRowCount() == 0) { + return; + } + EXPECT_EQ(block.GetColumnCount(), 1U); + for (size_t c = 0; c < block.GetRowCount(); ++c, ++row) { + auto col = block[0]->As(); + EXPECT_EQ(VALUE[row % 4], col->At(c)); + } + }); + } +} + +TEST(TimeplusInsert, DISABLED_InsertAsync) { + const auto* host = "localhost"; + const uint16_t port = 8463; + + ClientOptions client_options; + client_options.host = host; + client_options.port = port; + Client client{client_options}; + /// Create test table. + client.Execute("DROP STREAM IF EXISTS test_timeplus_insert_async"); + client.Execute("CREATE STREAM test_timeplus_insert_async(id uint64)"); + + TimeplusConfig config; + config.client_options.endpoints.push_back({host, port}); + config.max_connections = 5; + config.task_executors = 5; + Timeplus tp{std::move(config)}; + + auto b = std::make_shared(); + auto id = std::make_shared(); + id->Append(5); + id->Append(7); + id->Append(4); + id->Append(8); + b->AppendColumn("id", id); + + std::mutex insert_done_mutex; + std::condition_variable insert_done_cv; + uint64_t insert_done = 0; + bool insert_sucess = true; + auto callback = [&](const BaseResult& result) { + const auto& insert_result = dynamic_cast(result); + { + std::lock_guard lk{insert_done_mutex}; + ++insert_done; + if (!insert_result.ok()) { + insert_sucess = false; + } + } + insert_done_cv.notify_one(); + }; + + /// Insert for multiple times. + constexpr uint64_t num_inserts = 100; + for (uint64_t i = 0; i < num_inserts; ++i) { + tp.InsertAsync("test_timeplus_insert_async", b, callback); + } + + std::unique_lock lk{insert_done_mutex}; + insert_done_cv.wait(lk, [&insert_done] { return insert_done == num_inserts; }); + ASSERT_TRUE(insert_sucess); + + const uint64_t VALUE[] = {5, 7, 4, 8}; + size_t row = 0; + while (row != num_inserts * 4) { + client.Select("SELECT id FROM table(test_timeplus_insert_async)", [VALUE, &row](const Block& block) { + if (block.GetRowCount() == 0) { + return; + } + EXPECT_EQ(block.GetColumnCount(), 1U); + for (size_t c = 0; c < block.GetRowCount(); ++c, ++row) { + auto col = block[0]->As(); + EXPECT_EQ(VALUE[row % 4], col->At(c)); + } + }); + } +} + +TEST(TimeplusInsert, DISABLED_IdempotentInsert) { + const auto* host = "localhost"; + const uint16_t port = 8463; + + ClientOptions client_options; + client_options.host = host; + client_options.port = port; + Client client{client_options}; + /// Create test table. + client.Execute("DROP STREAM IF EXISTS test_timeplus_insert_idempotent"); + client.Execute("CREATE STREAM test_timeplus_insert_idempotent(id uint64)"); + + TimeplusConfig config; + config.client_options.host = host; + config.client_options.port = port; + config.max_connections = 10; + config.task_executors = 10; + Timeplus tp{std::move(config)}; + + auto b = std::make_shared(); + auto id = std::make_shared(); + id->Append(5); + id->Append(7); + id->Append(4); + id->Append(8); + b->AppendColumn("id", id); + + std::mutex insert_done_mutex; + std::condition_variable insert_done_cv; + uint64_t insert_done = 0; + bool insert_success = true; + auto callback = [&](const BaseResult& result) { + const auto& insert_result = dynamic_cast(result); + { + std::lock_guard lk{insert_done_mutex}; + ++insert_done; + if (!insert_result.ok()) { + insert_success = false; + } + } + insert_done_cv.notify_one(); + }; + + /// Insert for multiple times. + constexpr uint64_t num_inserts = 100; + constexpr uint64_t num_idempotent_id = 7; + for (uint64_t i = 0; i < num_inserts; ++i) { + tp.InsertAsync("test_timeplus_insert_idempotent", b, + /*idempotent_id=*/"idempotent-id-" + std::to_string(i % num_idempotent_id), callback); + } + + std::unique_lock lk{insert_done_mutex}; + insert_done_cv.wait(lk, [&insert_done] { return insert_done == num_inserts; }); + ASSERT_TRUE(insert_success); + + const uint64_t VALUE[] = {5, 7, 4, 8}; + size_t row = 0; + while (row < num_idempotent_id * 4) { + client.Select("SELECT id FROM table(test_timeplus_insert_idempotent)", [VALUE, &row](const Block& block) { + if (block.GetRowCount() == 0) { + return; + } + EXPECT_EQ(block.GetColumnCount(), 1U); + for (size_t c = 0; c < block.GetRowCount(); ++c, ++row) { + auto col = block[0]->As(); + EXPECT_EQ(VALUE[row % 4], col->At(c)); + } + }); + } + EXPECT_EQ(row, num_idempotent_id * 4); +} + +TEST(TimeplusInsert, InvalidColumnName) { + const auto* host = "localhost"; + const uint16_t port = 8463; + + ClientOptions client_options; + client_options.host = host; + client_options.port = port; + Client client{client_options}; + /// Create test table. + client.Execute("DROP STREAM IF EXISTS test_timeplus_insert_invalid_col_name"); + client.Execute("CREATE STREAM test_timeplus_insert_invalid_col_name(id uint64)"); + + TimeplusConfig config; + config.client_options.endpoints.push_back({host, port}); + config.max_connections = 1; + Timeplus tp{std::move(config)}; + + auto b = std::make_shared(); + auto id = std::make_shared(); + id->Append(5); + id->Append(7); + id->Append(4); + id->Append(8); + b->AppendColumn("id1", id); + + /// Synchronous insert. + auto result = tp.Insert("test_timeplus_insert_invalid_col_name", b); + ASSERT_EQ(result.err_code, ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); + + /// Asynchronous insert. + std::mutex insert_done_mutex; + std::condition_variable insert_done_cv; + bool insert_done = false; + tp.InsertAsync("test_timeplus_insert_invalid_col_name", b, [&](const auto& result) { + EXPECT_EQ(result.err_code, ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); + { + std::lock_guard lk{insert_done_mutex}; + insert_done = true; + } + insert_done_cv.notify_one(); + }); + std::unique_lock lk{insert_done_mutex}; + insert_done_cv.wait(lk, [&] { return insert_done; }); +} + +TEST(TimeplusInsert, InvalidHostPort) { + const auto* host = "100::1"; + const uint16_t port = 5748; + + TimeplusConfig config; + config.client_options.host = host; + config.client_options.port = port; + config.client_options.connection_recv_timeout = std::chrono::milliseconds(500); + config.client_options.connection_send_timeout = std::chrono::milliseconds(500); + config.max_connections = 3; + config.client_acquire_timeout_ms = 1000; + config.wait_time_before_retry_ms = 0; + Timeplus tp{std::move(config)}; + + auto b = std::make_shared(); + auto id = std::make_shared(); + id->Append(5); + id->Append(7); + id->Append(4); + id->Append(8); + b->AppendColumn("id", id); + + /// Synchronous insert. + auto insert_result = tp.Insert("test_timeplus_insert_invalid_host_port", b); + ASSERT_EQ(insert_result.err_code, ErrorCodes::NETWORK_ERROR); + + /// Asynchronous insert. + std::mutex insert_done_mutex; + std::condition_variable insert_done_cv; + bool insert_done = false; + tp.InsertAsync("test_timeplus_insert_invalid_host_port", b, [&](const auto& result) { + EXPECT_EQ(result.err_code, ErrorCodes::NETWORK_ERROR); + { + std::lock_guard lk{insert_done_mutex}; + insert_done = true; + } + insert_done_cv.notify_one(); + }); + std::unique_lock lk{insert_done_mutex}; + insert_done_cv.wait(lk, [&] { return insert_done; }); +}