Skip to content

Commit

Permalink
use only one rocksdb instance, fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
antoniofilipovic committed Jun 5, 2024
1 parent fefd4e6 commit 0fb9c04
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 94 deletions.
18 changes: 8 additions & 10 deletions src/coordination/coordinator_instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,18 @@ CoordinatorInstance::CoordinatorInstance(CoordinatorInstanceInitConfig const &co
repl_instance_name);
std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name);
};
auto const coordinator_state_manager_durability_dir = config.durability_dir / "coordinator" / "state_manager";
auto const coordinator_state_manager_durability_dir = config.durability_dir / "state_manager";
memgraph::utils::EnsureDirOrDie(coordinator_state_manager_durability_dir);
CoordinatorStateMachineConfig state_machine_config{config.coordinator_id};
CoordinatorStateMachineConfig state_machine_config{config.coordinator_id, nullptr};
CoordinatorStateManagerConfig state_manager_config{config.coordinator_id, config.coordinator_port, config.bolt_port,
coordinator_state_manager_durability_dir};
coordinator_state_manager_durability_dir, nullptr};

if (FLAGS_coordinator_use_durability) {
auto const log_store_durability_dir = config.durability_dir / "coordinator" / "log_store";
auto const state_machine_durability_dir = config.durability_dir / "coordinator" / "state_machine";
memgraph::utils::EnsureDirOrDie(log_store_durability_dir);
memgraph::utils::EnsureDirOrDie(state_machine_durability_dir);

state_manager_config.log_store_durability_dir_ = log_store_durability_dir;
state_machine_config.state_machine_durability_dir_ = state_machine_durability_dir;
memgraph::utils::EnsureDirOrDie(config.durability_dir / "durability");
std::shared_ptr<kvstore::KVStore> durability_store =
std::make_shared<kvstore::KVStore>(config.durability_dir / "durability");
state_machine_config.durability_store_ = durability_store;
state_manager_config.durability_store_ = durability_store;
}

// Delay constructing of Raft state until everything is constructed in coordinator instance
Expand Down
59 changes: 29 additions & 30 deletions src/coordination/coordinator_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,40 +104,38 @@ bool StoreToDisk(const ptr<log_entry> &clone, const uint64_t slot, bool is_new_l

} // namespace

CoordinatorLogStore::CoordinatorLogStore(std::optional<std::filesystem::path> durability_dir) {
CoordinatorLogStore::CoordinatorLogStore(std::shared_ptr<kvstore::KVStore> durability_store)
: durability_store_(std::move(durability_store)) {
ptr<buffer> buf = buffer::alloc(sizeof(uint64_t));
logs_[0] = cs_new<log_entry>(0, buf);

if (durability_dir) {
kv_store_ = std::make_unique<kvstore::KVStore>(durability_dir.value());
}

if (!kv_store_) {
if (!durability_store_) {
spdlog::warn("No durability directory provided, logs will not be persisted to disk");
start_idx_ = 1;
return;
}

int version{0};
auto maybe_version = kv_store_->Get(kLogStoreDurabilityVersion);
auto maybe_version = durability_store_->Get(kLogStoreDurabilityVersion);
if (maybe_version.has_value()) {
version = std::stoi(maybe_version.value());
} else {
spdlog::trace("Assuming first start of log store with durability as version is missing, storing version 1.");
MG_ASSERT(kv_store_->Put(kLogStoreDurabilityVersion, std::to_string(kActiveVersion)),
MG_ASSERT(durability_store_->Put(kLogStoreDurabilityVersion, std::to_string(kActiveVersion)),
"Failed to store version to disk");
version = 1;
}

MG_ASSERT(version <= kActiveVersion && version > 0, "Unsupported version of log store with durability");

auto const maybe_last_log_entry = kv_store_->Get(kLastLogEntry);
auto const maybe_start_idx = kv_store_->Get(kStartIdx);
auto const maybe_last_log_entry = durability_store_->Get(kLastLogEntry);
auto const maybe_start_idx = durability_store_->Get(kStartIdx);
if (!maybe_last_log_entry.has_value() || !maybe_start_idx.has_value()) {
spdlog::trace("No last log entry or start index found on disk, assuming first start of log store with durability");
start_idx_ = 1;
MG_ASSERT(kv_store_->Put(kStartIdx, std::to_string(start_idx_.load())), "Failed to store start index to disk");
MG_ASSERT(kv_store_->Put(kLastLogEntry, std::to_string(start_idx_.load() - 1)),
MG_ASSERT(durability_store_->Put(kStartIdx, std::to_string(start_idx_.load())),
"Failed to store start index to disk");
MG_ASSERT(durability_store_->Put(kLastLogEntry, std::to_string(start_idx_.load() - 1)),
"Failed to store last log entry to disk");
return;
}
Expand All @@ -147,7 +145,7 @@ CoordinatorLogStore::CoordinatorLogStore(std::optional<std::filesystem::path> du

// Compaction might have happened so we might be missing some logs.
for (auto const id : std::ranges::iota_view{start_idx_.load(), last_log_entry + 1}) {
auto const entry = kv_store_->Get(std::string{kLogEntryPrefix} + std::to_string(id));
auto const entry = durability_store_->Get(std::string{kLogEntryPrefix} + std::to_string(id));

MG_ASSERT(entry.has_value(), "Missing entry with id {} in range [{}:{}>", id, start_idx_.load(),
last_log_entry + 1);
Expand Down Expand Up @@ -182,12 +180,11 @@ void CoordinatorLogStore::DeleteLogs(uint64_t start, uint64_t end) {
continue;
}
logs_.erase(entry);
if (kv_store_) {
MG_ASSERT(kv_store_->Delete(std::string{kLogEntryPrefix} + std::to_string(i)),
if (durability_store_) {
MG_ASSERT(durability_store_->Delete(std::string{kLogEntryPrefix} + std::to_string(i)),
"Failed to delete log entry from disk");
}
}
return;
}

uint64_t CoordinatorLogStore::next_slot() const {
Expand Down Expand Up @@ -217,8 +214,8 @@ uint64_t CoordinatorLogStore::append(ptr<log_entry> &entry) {
auto lock = std::lock_guard{logs_lock_};
uint64_t next_slot = start_idx_ + logs_.size() - 1;

if (kv_store_) {
StoreToDisk(clone, next_slot, next_slot == start_idx_ + logs_.size() - 1, *kv_store_);
if (durability_store_) {
StoreToDisk(clone, next_slot, next_slot == start_idx_ + logs_.size() - 1, *durability_store_);
}

logs_[next_slot] = clone;
Expand All @@ -237,8 +234,8 @@ void CoordinatorLogStore::write_at(uint64_t index, ptr<log_entry> &entry) {
}
logs_[index] = clone;

if (kv_store_) {
StoreToDisk(clone, index, index >= start_idx_ - logs_.size() - 1, *kv_store_);
if (durability_store_) {
StoreToDisk(clone, index, index >= start_idx_ - logs_.size() - 1, *durability_store_);
}
}

Expand Down Expand Up @@ -319,8 +316,8 @@ void CoordinatorLogStore::apply_pack(uint64_t index, buffer &pack) {
auto lock = std::lock_guard{logs_lock_};
logs_[cur_idx] = le;
spdlog::trace("Applying pack to log entry with id {}", std::to_string(cur_idx));
if (kv_store_) {
StoreToDisk(le, cur_idx, cur_idx >= start_idx_ + logs_.size() - 1, *kv_store_);
if (durability_store_) {
StoreToDisk(le, cur_idx, cur_idx >= start_idx_ + logs_.size() - 1, *durability_store_);
}
}
}
Expand All @@ -330,8 +327,9 @@ void CoordinatorLogStore::apply_pack(uint64_t index, buffer &pack) {
auto const entry = logs_.upper_bound(0);
if (entry != logs_.end()) {
start_idx_ = entry->first;
if (kv_store_) {
MG_ASSERT(kv_store_->Put(kStartIdx, std::to_string(start_idx_.load())), "Failed to store start index to disk");
if (durability_store_) {
MG_ASSERT(durability_store_->Put(kStartIdx, std::to_string(start_idx_.load())),
"Failed to store start index to disk");
}
} else {
start_idx_ = 1;
Expand All @@ -349,24 +347,25 @@ bool CoordinatorLogStore::compact(uint64_t last_log_index) {
continue;
}
logs_.erase(entry);
if (kv_store_) {
MG_ASSERT(kv_store_->Delete(std::string{kLogEntryPrefix} + std::to_string(ii)),
if (durability_store_) {
MG_ASSERT(durability_store_->Delete(std::string{kLogEntryPrefix} + std::to_string(ii)),
"Failed to delete log entry from disk");
}
}

if (start_idx_ <= last_log_index) {
start_idx_ = last_log_index + 1;
if (kv_store_) {
MG_ASSERT(kv_store_->Put(kStartIdx, std::to_string(start_idx_.load())), "Failed to store start index to disk");
if (durability_store_) {
MG_ASSERT(durability_store_->Put(kStartIdx, std::to_string(start_idx_.load())),
"Failed to store start index to disk");
}
}
return true;
}

bool CoordinatorLogStore::flush() {
if (kv_store_) {
return kv_store_->SyncWal();
if (durability_store_) {
return durability_store_->SyncWal();
}
return true;
}
Expand Down
14 changes: 6 additions & 8 deletions src/coordination/coordinator_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,15 @@ auto SerializeSnapshotCtxToJson(memgraph::coordination::SnapshotCtx const &snaps

namespace memgraph::coordination {

CoordinatorStateMachine::CoordinatorStateMachine(std::optional<std::filesystem::path> durability_dir,
ptr<CoordinatorLogStore> log_store)
: log_store_(std::move(log_store)) {
spdlog::trace("Restoring coordinator state machine with durability.");
if (durability_dir) {
kv_store_ = std::make_unique<kvstore::KVStore>(durability_dir.value());
}
CoordinatorStateMachine::CoordinatorStateMachine(std::shared_ptr<kvstore::KVStore> durability)
: kv_store_(std::move(durability)) {
if (!kv_store_) {
spdlog::info("Storing snapshots only in memory");
return;
}

spdlog::trace("Restoring coordinator state machine with durability.");

int version{0};
auto maybe_version = kv_store_->Get(kSnapshotVersion);
if (maybe_version.has_value()) {
Expand Down Expand Up @@ -138,7 +135,8 @@ CoordinatorStateMachine::CoordinatorStateMachine(std::optional<std::filesystem::
cluster_state_ = snapshots_[last_committed_idx_]->cluster_state_;
// no need for log store, only get last commited index
last_committed_idx_ = last_committed_idx_.load();
spdlog::trace("Last committed index: {}", last_committed_idx_);
// TODO(antoniofilipovic): Remove
spdlog::trace("Last committed index from coordinator state machine: {}", last_committed_idx_);
}

auto CoordinatorStateMachine::MainExists() const -> bool { return cluster_state_.MainExists(); }
Expand Down
21 changes: 11 additions & 10 deletions src/coordination/coordinator_state_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ constexpr int kActiveStateManagerDurabilityVersion = 1;

} // namespace

CoordinatorStateManager::CoordinatorStateManager(CoordinatorStateManagerConfig const &config,
ptr<CoordinatorLogStore> log_store)
CoordinatorStateManager::CoordinatorStateManager(CoordinatorStateManagerConfig const &config)
: my_id_(static_cast<int>(config.coordinator_id_)),
cur_log_store_(std::move(log_store)),
kv_store_(config.state_manager_durability_dir_) {
cur_log_store_(cs_new<CoordinatorLogStore>(config.durability_store_)),
state_manager_durability_(config.state_manager_durability_dir_) {
auto const c2c =
CoordinatorToCoordinatorConfig{config.coordinator_id_, io::network::Endpoint("0.0.0.0", config.bolt_port_),
io::network::Endpoint{"0.0.0.0", static_cast<uint16_t>(config.coordinator_port_)}};
Expand All @@ -54,12 +53,13 @@ CoordinatorStateManager::CoordinatorStateManager(CoordinatorStateManagerConfig c
cluster_config_->get_servers().push_back(my_srv_config_);

int version{0};
auto maybe_version = kv_store_.Get(kStateManagerDurabilityVersionKey);
auto maybe_version = state_manager_durability_.Get(kStateManagerDurabilityVersionKey);
if (maybe_version.has_value()) {
version = std::stoi(maybe_version.value());
} else {
spdlog::trace("Assuming first start of state manager with durability as version is missing, storing version 1.");
MG_ASSERT(kv_store_.Put(kStateManagerDurabilityVersionKey, std::to_string(kActiveStateManagerDurabilityVersion)),
MG_ASSERT(state_manager_durability_.Put(kStateManagerDurabilityVersionKey,
std::to_string(kActiveStateManagerDurabilityVersion)),
"Failed to store version to disk");
version = 1;
}
Expand All @@ -70,7 +70,7 @@ CoordinatorStateManager::CoordinatorStateManager(CoordinatorStateManagerConfig c

auto CoordinatorStateManager::load_config() -> ptr<cluster_config> {
spdlog::trace("Loading cluster config from RocksDb");
auto const maybe_cluster_config = kv_store_.Get(kClusterConfigKey);
auto const maybe_cluster_config = state_manager_durability_.Get(kClusterConfigKey);
if (!maybe_cluster_config.has_value()) {
spdlog::trace("Didn't find anything stored on disk for cluster config.");
return cluster_config_;
Expand All @@ -88,7 +88,7 @@ auto CoordinatorStateManager::save_config(cluster_config const &config) -> void
cluster_config_ = cluster_config::deserialize(*buf);
spdlog::info("Saving cluster config to disk.");
auto json = SerializeClusterConfig(config);
MG_ASSERT(kv_store_.Put(kClusterConfigKey, json.dump()), "Failed to save servers to disk");
MG_ASSERT(state_manager_durability_.Put(kClusterConfigKey, json.dump()), "Failed to save servers to disk");
}

auto CoordinatorStateManager::save_state(srv_state const &state) -> void {
Expand All @@ -98,7 +98,8 @@ auto CoordinatorStateManager::save_state(srv_state const &state) -> void {
{kVotedFor, state.get_voted_for()},
{kElectionTimer, state.is_election_timer_allowed()}};
spdlog::trace("!!!!STORED SERVER STATE TO DISK {}!!!!", server_state_json.dump());
MG_ASSERT(kv_store_.Put(kServerStateKey, server_state_json.dump()), "Couldn't store server state to disk.");
MG_ASSERT(state_manager_durability_.Put(kServerStateKey, server_state_json.dump()),
"Couldn't store server state to disk.");

ptr<buffer> buf = state.serialize();
saved_state_ = srv_state::deserialize(*buf);
Expand All @@ -107,7 +108,7 @@ auto CoordinatorStateManager::save_state(srv_state const &state) -> void {
auto CoordinatorStateManager::read_state() -> ptr<srv_state> {
spdlog::trace("Reading server state in coordinator state manager.");

auto const maybe_server_state = kv_store_.Get(kServerStateKey);
auto const maybe_server_state = state_manager_durability_.Get(kServerStateKey);
if (!maybe_server_state.has_value()) {
spdlog::trace("Didn't find anything stored on disk for server state.");
return saved_state_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <optional>
#include <string>
#include <utility>
#include "kvstore/kvstore.hpp"

#include <fmt/format.h>
#include "json/json.hpp"
Expand Down Expand Up @@ -57,30 +58,26 @@ struct CoordinatorStateManagerConfig {
int coordinator_port_{0};
int bolt_port_{0};
std::filesystem::path state_manager_durability_dir_;
std::optional<std::filesystem::path> log_store_durability_dir_;
std::shared_ptr<kvstore::KVStore> durability_store_;

CoordinatorStateManagerConfig(uint32_t coordinator_id, int coordinator_port, int bolt_port,
std::filesystem::path state_manager_durability_dir,
std::optional<std::filesystem::path> log_store_durability_dir = std::nullopt)
std::shared_ptr<kvstore::KVStore> durability_store)
: coordinator_id_(coordinator_id),
coordinator_port_(coordinator_port),
bolt_port_(bolt_port),
state_manager_durability_dir_(std::move(state_manager_durability_dir)),
log_store_durability_dir_(std::move(log_store_durability_dir)) {
durability_store_(std::move(durability_store)) {
MG_ASSERT(!this->state_manager_durability_dir_.empty(), "State manager durability dir path is empty");
}
};

struct CoordinatorStateMachineConfig {
uint32_t coordinator_id_{0};
std::optional<std::filesystem::path> state_machine_durability_dir_;
std::shared_ptr<kvstore::KVStore> durability_store_;

explicit CoordinatorStateMachineConfig(
uint32_t coordinator_id, std::optional<std::filesystem::path> state_machine_durability_dir = std::nullopt)
: coordinator_id_(coordinator_id), state_machine_durability_dir_(std::move(state_machine_durability_dir)) {
MG_ASSERT(!this->state_machine_durability_dir_.has_value() || !this->state_machine_durability_dir_.value().empty(),
"State machine durability dir path is empty");
}
explicit CoordinatorStateMachineConfig(uint32_t coordinator_id, std::shared_ptr<kvstore::KVStore> durability_store)
: coordinator_id_(coordinator_id), durability_store_(std::move(durability_store)) {}
};

struct ReplicationClientInfo {
Expand Down
1 change: 0 additions & 1 deletion src/coordination/include/coordination/raft_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ class RaftState {
io::network::Endpoint raft_endpoint_;
uint32_t coordinator_id_;

ptr<CoordinatorLogStore> log_store_;
ptr<CoordinatorStateMachine> state_machine_;
ptr<CoordinatorStateManager> state_manager_;

Expand Down
4 changes: 2 additions & 2 deletions src/coordination/include/nuraft/coordinator_log_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ using nuraft::raft_server;
*/
class CoordinatorLogStore : public log_store {
public:
CoordinatorLogStore(std::optional<std::filesystem::path> durability_dir);
CoordinatorLogStore(std::shared_ptr<kvstore::KVStore> durability_store);
CoordinatorLogStore(CoordinatorLogStore const &) = delete;
CoordinatorLogStore &operator=(CoordinatorLogStore const &) = delete;
CoordinatorLogStore(CoordinatorLogStore &&) = delete;
Expand Down Expand Up @@ -77,7 +77,7 @@ class CoordinatorLogStore : public log_store {
mutable std::mutex logs_lock_;
std::atomic<ulong> start_idx_;
std::atomic<ulong> next_idx_;
std::unique_ptr<kvstore::KVStore> kv_store_;
std::shared_ptr<kvstore::KVStore> durability_store_;
};

} // namespace memgraph::coordination
Expand Down
5 changes: 2 additions & 3 deletions src/coordination/include/nuraft/coordinator_state_machine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct SnapshotCtx {

class CoordinatorStateMachine : public state_machine {
public:
CoordinatorStateMachine(std::optional<std::filesystem::path> durability_dir, ptr<CoordinatorLogStore> log_store);
CoordinatorStateMachine(std::shared_ptr<kvstore::KVStore> durability);
CoordinatorStateMachine(CoordinatorStateMachine const &) = delete;
CoordinatorStateMachine &operator=(CoordinatorStateMachine const &) = delete;
CoordinatorStateMachine(CoordinatorStateMachine &&) = delete;
Expand Down Expand Up @@ -117,8 +117,7 @@ class CoordinatorStateMachine : public state_machine {
ptr<snapshot> last_snapshot_;
std::mutex last_snapshot_lock_;

std::unique_ptr<kvstore::KVStore> kv_store_;
ptr<CoordinatorLogStore> log_store_;
std::shared_ptr<kvstore::KVStore> kv_store_;
};

} // namespace memgraph::coordination
Expand Down
4 changes: 2 additions & 2 deletions src/coordination/include/nuraft/coordinator_state_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ using nuraft::state_mgr;

class CoordinatorStateManager : public state_mgr {
public:
explicit CoordinatorStateManager(CoordinatorStateManagerConfig const &config, ptr<CoordinatorLogStore> log_store);
explicit CoordinatorStateManager(CoordinatorStateManagerConfig const &config);

CoordinatorStateManager(CoordinatorStateManager const &) = delete;
CoordinatorStateManager &operator=(CoordinatorStateManager const &) = delete;
Expand Down Expand Up @@ -61,7 +61,7 @@ class CoordinatorStateManager : public state_mgr {
ptr<srv_config> my_srv_config_;
ptr<cluster_config> cluster_config_;
ptr<srv_state> saved_state_;
kvstore::KVStore kv_store_;
kvstore::KVStore state_manager_durability_;
};

} // namespace memgraph::coordination
Expand Down
Loading

0 comments on commit 0fb9c04

Please sign in to comment.