Skip to content

Commit

Permalink
SERVER-81719: add new kafka loader actor
Browse files Browse the repository at this point in the history
  • Loading branch information
Aadeshp committed Oct 3, 2023
1 parent 6b98aa8 commit d96ffd0
Show file tree
Hide file tree
Showing 7 changed files with 356 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/cast_core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
project(cast_core VERSION 0.0.1 LANGUAGES CXX)

find_package(cppcodec)
find_package(RdKafka)

CreateGennyTargets(
NAME cast_core
Expand All @@ -23,5 +24,7 @@ CreateGennyTargets(
gennylib
cppcodec::cppcodec
simple-beast-client
RdKafka::rdkafka
RdKafka::rdkafka++
TEST_DEPENDS testlib
)
83 changes: 83 additions & 0 deletions src/cast_core/include/cast_core/actors/KafkaLoader.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2023-present MongoDB Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef HEADER_0D8E1871_D41D_4AAA_83DC_1F73C9B6D110_INCLUDED
#define HEADER_0D8E1871_D41D_4AAA_83DC_1F73C9B6D110_INCLUDED

#include <memory>
#include <string_view>

#include <mongocxx/pool.hpp>

#include <gennylib/Actor.hpp>
#include <gennylib/PhaseLoop.hpp>
#include <gennylib/context.hpp>

#include <metrics/metrics.hpp>

#include <librdkafka/rdkafka.h>
#include <librdkafka/rdkafkacpp.h>

namespace genny::actor {

/**
* Generates documents and publishes them to the specified kafka cluster and topic.
*
* ```yaml
* SchemaVersion: 2018-07-01
* Actors:
* - Name: KafkaLoader
* Type: KafkaLoader
* BootstrapServers: localhost:9092
* Topic: example-topic
* Phases:
* - Repeat: 1000
* Document: foo
* ```
*
* Owner: "@10gen/atlas-streams"
*/
class KafkaLoader : public Actor {
public:
explicit KafkaLoader(ActorContext& context);
~KafkaLoader() = default;

void run() override;

static std::string_view defaultName() {
return "KafkaLoader";
}

private:
RdKafka::Conf* makeKafkaConfig() const;

// Kafka bootstrap servers.
std::string _bootstrapServers;

// Kafka topic to publish documents to.
std::string _topic;

// Total number of documents inserted into the kafka topic.
genny::metrics::Operation _inserts;

/** @private */
struct PhaseConfig;
PhaseLoop<PhaseConfig> _loop;
std::unique_ptr<RdKafka::Producer> _producer;
std::string _err;
};

} // namespace genny::actor

#endif // HEADER_0D8E1871_D41D_4AAA_83DC_1F73C9B6D110_INCLUDED
112 changes: 112 additions & 0 deletions src/cast_core/src/KafkaLoader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2023-present MongoDB Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <cast_core/actors/KafkaLoader.hpp>

#include <memory>

#include <yaml-cpp/yaml.h>

#include <bsoncxx/json.hpp>

#include <mongocxx/client.hpp>
#include <mongocxx/collection.hpp>
#include <mongocxx/database.hpp>

#include <boost/log/trivial.hpp>
#include <boost/throw_exception.hpp>

#include <gennylib/Cast.hpp>
#include <gennylib/MongoException.hpp>
#include <gennylib/context.hpp>

#include <value_generators/DocumentGenerator.hpp>

namespace genny::actor {

namespace {

static constexpr int kKafkaFlushTimeoutMs = 10'000;

};

struct KafkaLoader::PhaseConfig {
DocumentGenerator documentExpr;

PhaseConfig(PhaseContext& phaseContext, ActorId id) :
documentExpr{phaseContext["Document"].to<DocumentGenerator>(phaseContext, id)} {}
};

void KafkaLoader::run() {
for (auto&& config : _loop) {
for (const auto&& _ : config) {
auto document = config->documentExpr();
std::string json = bsoncxx::to_json(document.view());

auto inserts = _totalInserts.start();
BOOST_LOG_TRIVIAL(debug) << " KafkaLoader Inserting " << json;

RdKafka::ErrorCode err = _producer->produce(
/* topic */ _topic,
/* partition */ RdKafka::Topic::PARTITION_UA,
/* flags */ RdKafka::Producer::RK_MSG_BLOCK | RdKafka::Producer::RK_MSG_COPY,
/* payload */ const_cast<char*>(json.c_str()),
/* len */ json.size(),
/* key */ nullptr,
/* key_len */ 0,
/* timestamp */ 0,
/* msg_opaque */ nullptr
);

if (err != RdKafka::ERR_NO_ERROR) {
inserts.failure();
BOOST_THROW_EXCEPTION(MongoException(std::to_string(err)));
continue;
}

inserts.addDocuments(1);
inserts.addBytes(document.length());
inserts.success();
}

_producer->flush(kKafkaFlushTimeoutMs);
}
}

KafkaLoader::KafkaLoader(genny::ActorContext& context)
: Actor{context},
_totalInserts{context.operation("Insert", KafkaLoader::id())},
_bootstrapServers{context["BootstrapServers"].to<std::string>()},
_topic{context["Topic"].to<std::string>()},
_producer{RdKafka::Producer::create(makeKafkaConfig(), _err)},
_loop{context, KafkaLoader::id()} {
if (!_producer) {
BOOST_THROW_EXCEPTION(MongoException(_err));
}
}

RdKafka::Conf* KafkaLoader::makeKafkaConfig() const {
RdKafka::Conf* config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
std::string err;

config->set("bootstrap.servers", _bootstrapServers, err);
config->set("queue.buffering.max.ms", "1000", err);

return config;
}

namespace {
auto registerKafkaLoader = Cast::registerDefault<KafkaLoader>();
} // namespace
} // namespace genny::actor
67 changes: 67 additions & 0 deletions src/cast_core/test/KafkaLoader_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2023-present MongoDB Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <bsoncxx/json.hpp>
#include <bsoncxx/builder/stream/document.hpp>

#include <boost/exception/diagnostic_information.hpp>

#include <yaml-cpp/yaml.h>

#include <testlib/KafkaTestFixture.hpp>
#include <testlib/ActorHelper.hpp>
#include <testlib/helpers.hpp>

#include <gennylib/context.hpp>

namespace genny {
namespace {
using namespace genny::testing;
namespace bson_stream = bsoncxx::builder::stream;

//
// ⚠️ There is a "known" failure that you should find and fix as a bit of
// an exercise in reading and testing your Actor. ⚠️
//

TEST_CASE_METHOD(KafkaTestFixture, "KafkaLoader successfully connects to a Kafka broker.", "[standalone][KafkaLoader]") {
NodeSource nodes = NodeSource(R"(
SchemaVersion: 2018-07-01
Clients:
Default:
URI: )" + KafkaTestFixture::connectionUri() + R"(
Actors:
- Name: KafkaLoader
Type: KafkaLoader
BootstrapServers: localhost:9092
Topic: topic-in
Phases:
- Repeat: 1
Document: {foo: {^RandomInt: {min: 0, max: 100}}}
)", __FILE__);


SECTION("Inserts documents into the kafka broker.") {
try {
genny::ActorHelper ah(nodes.root(), 1);
ah.run([](const genny::WorkloadContext& wc) { wc.actors()[0]->run(); });
} catch (const std::exception& e) {
auto diagInfo = boost::diagnostic_information(e);
INFO("CAUGHT " << diagInfo);
FAIL(diagInfo);
}
}
}
} // namespace
} // namespace genny
35 changes: 35 additions & 0 deletions src/testlib/include/testlib/KafkaTestFixture.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2023-present MongoDB Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef HEADER_KAFKATESTFIXTURE_INCLUDED
#define HEADER_KAFKATESTFIXTURE_INCLUDED

#include <string>
#include <mongocxx/client.hpp>
#include <mongocxx/instance.hpp>

namespace genny::testing {

class KafkaTestFixture {
public:
static const std::string kDefaultBootstrapServers;

KafkaTestFixture () {}

static std::string connectionUri();
}; // class KafkaTestFixture

} // namespace genny::testing

#endif // HEADER_KAFKATESTFIXTURE_INCLUDED
39 changes: 39 additions & 0 deletions src/testlib/src/KafkaTestFixture.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2023-present MongoDB Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <testlib/KafkaTestFixture.hpp>

#include <cstdint>
#include <string_view>

#include <boost/log/trivial.hpp>

#include <testlib/helpers.hpp>

namespace genny::testing {

const std::string KafkaTestFixture::kDefaultBootstrapServers = "localhost:9092";

std::string KafkaTestFixture::connectionUri() {
const char* bootstrapServers = getenv("KAFKA_BOOTSTRAP_SERVERS");
if (bootstrapServers != nullptr) {
return std::string(bootstrapServers);
}

std::string defaultBootstrapServers = kDefaultBootstrapServers;
BOOST_LOG_TRIVIAL(info) << "KAFKA_BOOTSTRAP_SERVERS not set, using default value: " << defaultBootstrapServers;
return defaultBootstrapServers;
}

} // namespace genny::testing
17 changes: 17 additions & 0 deletions src/workloads/docs/KafkaLoader.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
SchemaVersion: 2018-07-01
Owner: "@10gen/atlas-streams"
Description: |
KafkaLoader generates random JSON document based on the document generator
provided and publishes them to the kafka server and topic specificed at the
actor level.
Actors:
- Name: KafkaLoader
Type: KafkaLoader
Threads: 100
BootstrapServers: localhost:9092
Topic: example-topic
Phases:
- Phase: 0
Repeat: 1e3
Document: {foo: {^RandomInt: {min: 0, max: 100}}}

0 comments on commit d96ffd0

Please sign in to comment.