Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SERVER-81719: add new kafka loader actor #1029

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/cast_core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
project(cast_core VERSION 0.0.1 LANGUAGES CXX)

find_package(cppcodec)
find_package(RdKafka)

CreateGennyTargets(
NAME cast_core
Expand All @@ -23,5 +24,7 @@ CreateGennyTargets(
gennylib
cppcodec::cppcodec
simple-beast-client
RdKafka::rdkafka
RdKafka::rdkafka++
TEST_DEPENDS testlib
)
83 changes: 83 additions & 0 deletions src/cast_core/include/cast_core/actors/KafkaLoader.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2023-present MongoDB Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef HEADER_0D8E1871_D41D_4AAA_83DC_1F73C9B6D110_INCLUDED
#define HEADER_0D8E1871_D41D_4AAA_83DC_1F73C9B6D110_INCLUDED

#include <memory>
#include <string_view>

#include <mongocxx/pool.hpp>

#include <gennylib/Actor.hpp>
#include <gennylib/PhaseLoop.hpp>
#include <gennylib/context.hpp>

#include <metrics/metrics.hpp>

#include <librdkafka/rdkafka.h>
#include <librdkafka/rdkafkacpp.h>

namespace genny::actor {

/**
* Generates documents and publishes them to the specified kafka cluster and topic.
*
* ```yaml
* SchemaVersion: 2018-07-01
* Actors:
* - Name: KafkaLoader
* Type: KafkaLoader
* BootstrapServers: localhost:9092
* Topic: example-topic
* Phases:
* - Repeat: 1000
* Document: foo
* ```
*
* Owner: "@10gen/atlas-streams"
*/
class KafkaLoader : public Actor {
public:
explicit KafkaLoader(ActorContext& context);
~KafkaLoader() = default;

void run() override;

static std::string_view defaultName() {
return "KafkaLoader";
}

private:
RdKafka::Conf* makeKafkaConfig() const;

// Kafka bootstrap servers.
std::string _bootstrapServers;

// Kafka topic to publish documents to.
std::string _topic;

// Total number of documents inserted into the kafka topic.
genny::metrics::Operation _inserts;

/** @private */
struct PhaseConfig;
PhaseLoop<PhaseConfig> _loop;
std::unique_ptr<RdKafka::Producer> _producer;
std::string _err;
};

} // namespace genny::actor

#endif // HEADER_0D8E1871_D41D_4AAA_83DC_1F73C9B6D110_INCLUDED
113 changes: 113 additions & 0 deletions src/cast_core/src/KafkaLoader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2023-present MongoDB Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <cast_core/actors/KafkaLoader.hpp>

#include <memory>

#include <yaml-cpp/yaml.h>

#include <bsoncxx/json.hpp>

#include <mongocxx/client.hpp>
#include <mongocxx/collection.hpp>
#include <mongocxx/database.hpp>

#include <boost/log/trivial.hpp>
#include <boost/throw_exception.hpp>

#include <gennylib/Cast.hpp>
#include <gennylib/MongoException.hpp>
#include <gennylib/context.hpp>

#include <value_generators/DocumentGenerator.hpp>

namespace genny::actor {

namespace {

// Kafka producer flush timeout. Flush is called after each phase.
static constexpr int kKafkaFlushTimeoutMs = 10'000;

};

struct KafkaLoader::PhaseConfig {
DocumentGenerator documentExpr;

PhaseConfig(PhaseContext& phaseContext, ActorId id) :
documentExpr{phaseContext["Document"].to<DocumentGenerator>(phaseContext, id)} {}
};

void KafkaLoader::run() {
for (auto&& config : _loop) {
for (const auto&& _ : config) {
auto document = config->documentExpr();
std::string json = bsoncxx::to_json(document.view());

auto inserts = _totalInserts.start();
BOOST_LOG_TRIVIAL(debug) << " KafkaLoader Inserting " << json;
Comment on lines +58 to +59
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging might introduce unpredictable performance variance, so I think we should do any logging outside of the code path being measured as much as possible:

Suggested change
auto inserts = _totalInserts.start();
BOOST_LOG_TRIVIAL(debug) << " KafkaLoader Inserting " << json;
BOOST_LOG_TRIVIAL(debug) << " KafkaLoader Inserting " << json;
auto inserts = _totalInserts.start();


RdKafka::ErrorCode err = _producer->produce(
/* topic */ _topic,
/* partition */ RdKafka::Topic::PARTITION_UA,
/* flags */ RdKafka::Producer::RK_MSG_BLOCK | RdKafka::Producer::RK_MSG_COPY,
/* payload */ const_cast<char*>(json.c_str()),
/* len */ json.size(),
/* key */ nullptr,
/* key_len */ 0,
/* timestamp */ 0,
/* msg_opaque */ nullptr
);

if (err != RdKafka::ERR_NO_ERROR) {
inserts.failure();
BOOST_THROW_EXCEPTION(std::runtime_error(std::to_string(err)));
continue;
}

inserts.addDocuments(1);
inserts.addBytes(document.length());
inserts.success();
}

_producer->flush(kKafkaFlushTimeoutMs);
}
}

KafkaLoader::KafkaLoader(genny::ActorContext& context)
: Actor{context},
_totalInserts{context.operation("Insert", KafkaLoader::id())},
_bootstrapServers{context["BootstrapServers"].to<std::string>()},
_topic{context["Topic"].to<std::string>()},
_producer{RdKafka::Producer::create(makeKafkaConfig(), _err)},
_loop{context, KafkaLoader::id()} {
if (!_producer) {
BOOST_THROW_EXCEPTION(MongoException(_err));
}
}

RdKafka::Conf* KafkaLoader::makeKafkaConfig() const {
RdKafka::Conf* config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
std::string err;

config->set("bootstrap.servers", _bootstrapServers, err);
config->set("queue.buffering.max.ms", "1000", err);

return config;
}

namespace {
auto registerKafkaLoader = Cast::registerDefault<KafkaLoader>();
} // namespace
} // namespace genny::actor
62 changes: 62 additions & 0 deletions src/cast_core/test/KafkaLoader_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2023-present MongoDB Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <bsoncxx/json.hpp>
#include <bsoncxx/builder/stream/document.hpp>

#include <boost/exception/diagnostic_information.hpp>

#include <yaml-cpp/yaml.h>

#include <testlib/KafkaTestFixture.hpp>
#include <testlib/ActorHelper.hpp>
#include <testlib/helpers.hpp>

#include <gennylib/context.hpp>

namespace genny {
namespace {
using namespace genny::testing;
namespace bson_stream = bsoncxx::builder::stream;

TEST_CASE_METHOD(KafkaTestFixture, "KafkaLoader successfully connects to a Kafka broker.", "[streams][KafkaLoader]") {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An Actor test case would only be run if one of the square-bracketed tags match one of the tags defined in a YAML file in this directory. The existing defined tags are [sharded], [single_node_replset], and [three_node_replset] ([SelfTestActor] is to test Genny code, not Actor code, so it should not be used). The square-bracketed tags are not arbitrary. So we have 2 options here:

  1. Add one of the three existing defined tags above.
  2. Add a new YAML file under the above directory to define either [streams] or [KafkaLoader].

On a related note, I found that this actor test case is also not being run because neither [streams] nor [StreamStatsReporter] are defined tags. But I think this should be fixed in another PR instead of in this PR.

NodeSource nodes = NodeSource(R"(
SchemaVersion: 2018-07-01
Clients:
Default:
URI: )" + KafkaTestFixture::connectionUri() + R"(
Actors:
- Name: KafkaLoader
Type: KafkaLoader
BootstrapServers: localhost:9092
Topic: topic-in
Phases:
- Repeat: 1
Document: {foo: {^RandomInt: {min: 0, max: 100}}}
)", __FILE__);


SECTION("Inserts documents into the kafka broker.") {
try {
genny::ActorHelper ah(nodes.root(), 1);
ah.run([](const genny::WorkloadContext& wc) { wc.actors()[0]->run(); });
} catch (const std::exception& e) {
auto diagInfo = boost::diagnostic_information(e);
INFO("CAUGHT " << diagInfo);
FAIL(diagInfo);
}
}
}
} // namespace
} // namespace genny
35 changes: 35 additions & 0 deletions src/testlib/include/testlib/KafkaTestFixture.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2023-present MongoDB Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef HEADER_KAFKATESTFIXTURE_INCLUDED
#define HEADER_KAFKATESTFIXTURE_INCLUDED

#include <string>
#include <mongocxx/client.hpp>
#include <mongocxx/instance.hpp>

namespace genny::testing {

class KafkaTestFixture {
public:
static const std::string kDefaultBootstrapServers;

KafkaTestFixture () {}

static std::string connectionUri();
}; // class KafkaTestFixture

} // namespace genny::testing

#endif // HEADER_KAFKATESTFIXTURE_INCLUDED
39 changes: 39 additions & 0 deletions src/testlib/src/KafkaTestFixture.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2023-present MongoDB Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <testlib/KafkaTestFixture.hpp>

#include <cstdint>
#include <string_view>

#include <boost/log/trivial.hpp>

#include <testlib/helpers.hpp>

namespace genny::testing {

const std::string KafkaTestFixture::kDefaultBootstrapServers = "localhost:9092";

std::string KafkaTestFixture::connectionUri() {
const char* bootstrapServers = getenv("KAFKA_BOOTSTRAP_SERVERS");
if (bootstrapServers != nullptr) {
return std::string(bootstrapServers);
}

std::string defaultBootstrapServers = kDefaultBootstrapServers;
BOOST_LOG_TRIVIAL(info) << "KAFKA_BOOTSTRAP_SERVERS not set, using default value: " << defaultBootstrapServers;
return defaultBootstrapServers;
}

} // namespace genny::testing
17 changes: 17 additions & 0 deletions src/workloads/docs/KafkaLoader.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
SchemaVersion: 2018-07-01
Owner: "@10gen/atlas-streams"
Description: |
KafkaLoader generates random JSON document based on the document generator
provided and publishes them to the kafka server and topic specificed at the
actor level.
Actors:
- Name: KafkaLoader
Type: KafkaLoader
Threads: 100
BootstrapServers: localhost:9092
Topic: example-topic
Phases:
- Phase: 0
Repeat: 1e3
Document: {foo: {^RandomInt: {min: 0, max: 100}}}