From ed18626ddd6040d6f713a1d4cea9b14c6ee4efe2 Mon Sep 17 00:00:00 2001 From: aadesh Date: Thu, 31 Aug 2023 16:14:50 +0000 Subject: [PATCH] add new kafka loader actor --- src/cast_core/CMakeLists.txt | 3 + .../include/cast_core/actors/KafkaLoader.hpp | 83 +++++++++++++ src/cast_core/src/KafkaLoader.cpp | 112 ++++++++++++++++++ src/cast_core/test/KafkaLoader_test.cpp | 67 +++++++++++ .../include/testlib/KafkaTestFixture.hpp | 35 ++++++ src/testlib/src/KafkaTestFixture.cpp | 39 ++++++ src/workloads/docs/KafkaLoader.yml | 17 +++ 7 files changed, 356 insertions(+) create mode 100644 src/cast_core/include/cast_core/actors/KafkaLoader.hpp create mode 100644 src/cast_core/src/KafkaLoader.cpp create mode 100644 src/cast_core/test/KafkaLoader_test.cpp create mode 100644 src/testlib/include/testlib/KafkaTestFixture.hpp create mode 100644 src/testlib/src/KafkaTestFixture.cpp create mode 100644 src/workloads/docs/KafkaLoader.yml diff --git a/src/cast_core/CMakeLists.txt b/src/cast_core/CMakeLists.txt index b31ba3534b..9c16b0ebf7 100644 --- a/src/cast_core/CMakeLists.txt +++ b/src/cast_core/CMakeLists.txt @@ -15,6 +15,7 @@ project(cast_core VERSION 0.0.1 LANGUAGES CXX) find_package(cppcodec) +find_package(RdKafka) CreateGennyTargets( NAME cast_core @@ -23,5 +24,7 @@ CreateGennyTargets( gennylib cppcodec::cppcodec simple-beast-client + RdKafka::rdkafka + RdKafka::rdkafka++ TEST_DEPENDS testlib ) diff --git a/src/cast_core/include/cast_core/actors/KafkaLoader.hpp b/src/cast_core/include/cast_core/actors/KafkaLoader.hpp new file mode 100644 index 0000000000..79c0524708 --- /dev/null +++ b/src/cast_core/include/cast_core/actors/KafkaLoader.hpp @@ -0,0 +1,83 @@ +// Copyright 2023-present MongoDB Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef HEADER_0D8E1871_D41D_4AAA_83DC_1F73C9B6D110_INCLUDED +#define HEADER_0D8E1871_D41D_4AAA_83DC_1F73C9B6D110_INCLUDED + +#include +#include + +#include + +#include +#include +#include + +#include + +#include +#include + +namespace genny::actor { + +/** + * Generates documents and publishes them to the specified kafka cluster and topic. + * + * ```yaml + * SchemaVersion: 2018-07-01 + * Actors: + * - Name: KafkaLoader + * Type: KafkaLoader + * BootstrapServers: localhost:9092 + * Topic: example-topic + * Phases: + * - Repeat: 1000 + * Document: foo + * ``` + * + * Owner: "@10gen/atlas-streams" + */ +class KafkaLoader : public Actor { +public: + explicit KafkaLoader(ActorContext& context); + ~KafkaLoader() = default; + + void run() override; + + static std::string_view defaultName() { + return "KafkaLoader"; + } + +private: + RdKafka::Conf* makeKafkaConfig() const; + + // Kafka bootstrap servers. + std::string _bootstrapServers; + + // Kafka topic to publish documents to. + std::string _topic; + + // Total number of documents inserted into the kafka topic. + genny::metrics::Operation _inserts; + + /** @private */ + struct PhaseConfig; + PhaseLoop _loop; + std::unique_ptr _producer; + std::string _err; +}; + +} // namespace genny::actor + +#endif // HEADER_0D8E1871_D41D_4AAA_83DC_1F73C9B6D110_INCLUDED diff --git a/src/cast_core/src/KafkaLoader.cpp b/src/cast_core/src/KafkaLoader.cpp new file mode 100644 index 0000000000..ab8700138c --- /dev/null +++ b/src/cast_core/src/KafkaLoader.cpp @@ -0,0 +1,112 @@ +// Copyright 2023-present MongoDB Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include + +#include + +#include +#include +#include + +#include +#include + +#include +#include +#include + +#include + +namespace genny::actor { + +namespace { + +static constexpr int kKafkaFlushTimeoutMs = 10'000; + +}; + +struct KafkaLoader::PhaseConfig { + DocumentGenerator documentExpr; + + PhaseConfig(PhaseContext& phaseContext, ActorId id) : + documentExpr{phaseContext["Document"].to(phaseContext, id)} {} +}; + +void KafkaLoader::run() { + for (auto&& config : _loop) { + for (const auto&& _ : config) { + auto document = config->documentExpr(); + std::string json = bsoncxx::to_json(document.view()); + + auto inserts = _totalInserts.start(); + BOOST_LOG_TRIVIAL(debug) << " KafkaLoader Inserting " << json; + + RdKafka::ErrorCode err = _producer->produce( + /* topic */ _topic, + /* partition */ RdKafka::Topic::PARTITION_UA, + /* flags */ RdKafka::Producer::RK_MSG_BLOCK | RdKafka::Producer::RK_MSG_COPY, + /* payload */ const_cast(json.c_str()), + /* len */ json.size(), + /* key */ nullptr, + /* key_len */ 0, + /* timestamp */ 0, + /* msg_opaque */ nullptr + ); + + if (err != RdKafka::ERR_NO_ERROR) { + inserts.failure(); + BOOST_THROW_EXCEPTION(MongoException(std::to_string(err))); + continue; + } + + inserts.addDocuments(1); + inserts.addBytes(document.length()); + inserts.success(); + } + + _producer->flush(kKafkaFlushTimeoutMs); + } +} + +KafkaLoader::KafkaLoader(genny::ActorContext& context) + : Actor{context}, + _totalInserts{context.operation("Insert", KafkaLoader::id())}, + _bootstrapServers{context["BootstrapServers"].to()}, + _topic{context["Topic"].to()}, + _producer{RdKafka::Producer::create(makeKafkaConfig(), _err)}, + _loop{context, KafkaLoader::id()} { + if (!_producer) { + BOOST_THROW_EXCEPTION(MongoException(_err)); + } +} + +RdKafka::Conf* KafkaLoader::makeKafkaConfig() const { + RdKafka::Conf* config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + std::string err; + + config->set("bootstrap.servers", _bootstrapServers, err); + config->set("queue.buffering.max.ms", "1000", err); + + return config; +} + +namespace { +auto registerKafkaLoader = Cast::registerDefault(); +} // namespace +} // namespace genny::actor diff --git a/src/cast_core/test/KafkaLoader_test.cpp b/src/cast_core/test/KafkaLoader_test.cpp new file mode 100644 index 0000000000..8e5995849c --- /dev/null +++ b/src/cast_core/test/KafkaLoader_test.cpp @@ -0,0 +1,67 @@ +// Copyright 2023-present MongoDB Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include + +#include + +#include +#include +#include + +#include + +namespace genny { +namespace { +using namespace genny::testing; +namespace bson_stream = bsoncxx::builder::stream; + +// +// ⚠️ There is a "known" failure that you should find and fix as a bit of +// an exercise in reading and testing your Actor. ⚠️ +// + +TEST_CASE_METHOD(KafkaTestFixture, "KafkaLoader successfully connects to a Kafka broker.", "[standalone][KafkaLoader]") { + NodeSource nodes = NodeSource(R"( + SchemaVersion: 2018-07-01 + Clients: + Default: + URI: )" + KafkaTestFixture::connectionUri() + R"( + Actors: + - Name: KafkaLoader + Type: KafkaLoader + BootstrapServers: localhost:9092 + Topic: topic-in + Phases: + - Repeat: 1 + Document: {foo: {^RandomInt: {min: 0, max: 100}}} + )", __FILE__); + + + SECTION("Inserts documents into the kafka broker.") { + try { + genny::ActorHelper ah(nodes.root(), 1); + ah.run([](const genny::WorkloadContext& wc) { wc.actors()[0]->run(); }); + } catch (const std::exception& e) { + auto diagInfo = boost::diagnostic_information(e); + INFO("CAUGHT " << diagInfo); + FAIL(diagInfo); + } + } +} +} // namespace +} // namespace genny diff --git a/src/testlib/include/testlib/KafkaTestFixture.hpp b/src/testlib/include/testlib/KafkaTestFixture.hpp new file mode 100644 index 0000000000..9d0f500adf --- /dev/null +++ b/src/testlib/include/testlib/KafkaTestFixture.hpp @@ -0,0 +1,35 @@ +// Copyright 2023-present MongoDB Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef HEADER_KAFKATESTFIXTURE_INCLUDED +#define HEADER_KAFKATESTFIXTURE_INCLUDED + +#include +#include +#include + +namespace genny::testing { + +class KafkaTestFixture { +public: + static const std::string kDefaultBootstrapServers; + + KafkaTestFixture () {} + + static std::string connectionUri(); +}; // class KafkaTestFixture + +} // namespace genny::testing + +#endif // HEADER_KAFKATESTFIXTURE_INCLUDED diff --git a/src/testlib/src/KafkaTestFixture.cpp b/src/testlib/src/KafkaTestFixture.cpp new file mode 100644 index 0000000000..5c96cd4dcd --- /dev/null +++ b/src/testlib/src/KafkaTestFixture.cpp @@ -0,0 +1,39 @@ +// Copyright 2023-present MongoDB Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include + +#include + +#include + +namespace genny::testing { + +const std::string KafkaTestFixture::kDefaultBootstrapServers = "localhost:9092"; + +std::string KafkaTestFixture::connectionUri() { + const char* bootstrapServers = getenv("KAFKA_BOOTSTRAP_SERVERS"); + if (bootstrapServers != nullptr) { + return std::string(bootstrapServers); + } + + std::string defaultBootstrapServers = kDefaultBootstrapServers; + BOOST_LOG_TRIVIAL(info) << "KAFKA_BOOTSTRAP_SERVERS not set, using default value: " << defaultBootstrapServers; + return defaultBootstrapServers; +} + +} // namespace genny::testing diff --git a/src/workloads/docs/KafkaLoader.yml b/src/workloads/docs/KafkaLoader.yml new file mode 100644 index 0000000000..ea1495b1a9 --- /dev/null +++ b/src/workloads/docs/KafkaLoader.yml @@ -0,0 +1,17 @@ +SchemaVersion: 2018-07-01 +Owner: "@10gen/atlas-streams" +Description: | + KafkaLoader generates random JSON document based on the document generator + provided and publishes them to the kafka server and topic specificed at the + actor level. + +Actors: +- Name: KafkaLoader + Type: KafkaLoader + Threads: 100 + BootstrapServers: localhost:9092 + Topic: example-topic + Phases: + - Phase: 0 + Repeat: 1e3 + Document: {foo: {^RandomInt: {min: 0, max: 100}}}