Skip to content

Commit

Permalink
CC: Split checkpoint into index & hybrid-log (full combines both).
Browse files Browse the repository at this point in the history
Also store checkpoints by GUID, not by version. Now follows C# behavior.
  • Loading branch information
jahunter-m committed Sep 13, 2018
1 parent d7b3aba commit 276c20f
Show file tree
Hide file tree
Showing 13 changed files with 473 additions and 201 deletions.
5 changes: 3 additions & 2 deletions cc/CMakeLists.txt
Expand Up @@ -63,18 +63,19 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR})
set (FASTER_LINK_LIBS
faster
)

# Set the link libraries to for test compilation
set (FASTER_TEST_LINK_LIBS ${FASTER_LINK_LIBS} gtest)
if(WIN32)
set(FASTER_TEST_LINK_LIBS ${FASTER_TEST_LINK_LIBS} rpcrt4)
else()
set (FASTER_TEST_LINK_LIBS ${FASTER_TEST_LINK_LIBS} stdc++fs uuid tbb gcc aio m stdc++ pthread)
endif()

# Set the link libraries to for benchmark binary compilation
set (FASTER_BENCHMARK_LINK_LIBS ${FASTER_LINK_LIBS})
if(WIN32)
set (FASTER_BENCHMARK_LINK_LIBS ${FASTER_LINK_LIBS} wsock32 Ws2_32)
set (FASTER_BENCHMARK_LINK_LIBS ${FASTER_LINK_LIBS} rpcrt4 wsock32 Ws2_32)
else()
set (FASTER_BENCHMARK_LINK_LIBS ${FASTER_BENCHMARK_LINK_LIBS} stdc++fs uuid tbb gcc aio m stdc++ pthread)
endif()
Expand Down
12 changes: 9 additions & 3 deletions cc/benchmark-dir/benchmark.cc
Expand Up @@ -544,8 +544,13 @@ void run_benchmark(store_t* store, size_t num_threads) {
if(kCheckpointSeconds == 0) {
std::this_thread::sleep_for(std::chrono::seconds(kRunSeconds));
} else {
auto callback = [](uint64_t persistent_serial_num) {
++num_checkpoints;
auto callback = [](Status result, uint64_t persistent_serial_num) {
if(result != Status::Ok) {
printf("Thread %" PRIu32 " reports checkpoint failed.\n",
Thread::id());
} else {
++num_checkpoints;
}
};

auto start_time = std::chrono::high_resolution_clock::now();
Expand All @@ -558,7 +563,8 @@ void run_benchmark(store_t* store, size_t num_threads) {
std::this_thread::sleep_for(std::chrono::seconds(1));
current_time = std::chrono::high_resolution_clock::now();
if(current_time - last_checkpoint_time >= std::chrono::seconds(kCheckpointSeconds)) {
bool success = store->Checkpoint(callback);
Guid token;
bool success = store->Checkpoint(nullptr, callback, token);
if(success) {
printf("Starting checkpoint %" PRIu64 ".\n", checkpoint_num);
++checkpoint_num;
Expand Down
54 changes: 36 additions & 18 deletions cc/playground/sum_store-dir/concurrent_recovery_test.h
Expand Up @@ -45,9 +45,14 @@ class ConcurrentRecoveryTest {
assert(result == Status::Ok);
};

auto persistence_callback = [](uint64_t persistent_serial_num) {
printf("Thread %" PRIu32 " reports persistence until %" PRIu64 "\n",
Thread::id(), persistent_serial_num);
auto hybrid_log_persistence_callback = [](Status result, uint64_t persistent_serial_num) {
if(result != Status::Ok) {
printf("Thread %" PRIu32 " reports checkpoint failed.\n",
Thread::id());
} else {
printf("Thread %" PRIu32 " reports persistence until %" PRIu64 "\n",
Thread::id(), persistent_serial_num);
}
};

// Register thread with the store
Expand All @@ -60,9 +65,10 @@ class ConcurrentRecoveryTest {
RmwContext context{ idx % kNumUniqueKeys, 1 };
store->Rmw(context, callback, idx);
if(idx % kCheckpointInterval == 0 && *num_active_threads == num_threads) {
if(store->Checkpoint(persistence_callback)) {
printf("Thread %" PRIu32 " calling Checkpoint(), %" PRIu32 "\n", Thread::id(),
++(*num_checkpoints));
Guid token;
if(store->Checkpoint(nullptr, hybrid_log_persistence_callback, token)) {
printf("Thread %" PRIu32 " calling Checkpoint(), version = %" PRIu32 ", token = %s\n",
Thread::id(), ++(*num_checkpoints), token.ToString().c_str());
}
}
if(idx % kCompletePendingInterval == 0) {
Expand Down Expand Up @@ -125,15 +131,17 @@ class ConcurrentRecoveryTest {
}
}

void RecoverAndTest(uint32_t cpr_version, uint32_t index_version) {
void RecoverAndTest(const Guid& index_token, const Guid& hybrid_log_token) {
auto callback = [](IAsyncContext* ctxt, Status result) {
CallbackContext<ReadContext> context{ ctxt };
assert(result == Status::Ok);
};

// Recover
uint32_t version;
std::vector<Guid> session_ids;
FASTER::core::Status result = store.Recover(cpr_version, index_version, session_ids);
FASTER::core::Status result = store.Recover(index_token, hybrid_log_token, version,
session_ids);
if(result != FASTER::core::Status::Ok) {
printf("Recovery failed with error %u\n", static_cast<uint8_t>(result));
exit(1);
Expand Down Expand Up @@ -205,9 +213,15 @@ class ConcurrentRecoveryTest {
assert(result == Status::Ok);
};

auto persistence_callback = [](uint64_t persistent_serial_num) {
printf("Thread %" PRIu32 " reports persistence until %" PRIu64 "\n",
Thread::id(), persistent_serial_num);

auto hybrid_log_persistence_callback = [](Status result, uint64_t persistent_serial_num) {
if(result != Status::Ok) {
printf("Thread %" PRIu32 " reports checkpoint failed.\n",
Thread::id());
} else {
printf("Thread %" PRIu32 " reports persistence until %" PRIu64 "\n",
Thread::id(), persistent_serial_num);
}
};

// Register thread with the store
Expand All @@ -220,9 +234,10 @@ class ConcurrentRecoveryTest {
RmwContext context{ idx % kNumUniqueKeys, 1 };
store->Rmw(context, callback, idx);
if(idx % kCheckpointInterval == 0 && *num_active_threads == num_threads) {
if(store->Checkpoint(persistence_callback)) {
printf("Thread %" PRIu32 " calling Checkpoint(), %" PRIu32 "\n", Thread::id(),
++(*num_checkpoints));
Guid token;
if(store->Checkpoint(nullptr, hybrid_log_persistence_callback, token)) {
printf("Thread %" PRIu32 " calling Checkpoint(), version = %" PRIu32 ", token = %s\n",
Thread::id(), ++(*num_checkpoints), token.ToString().c_str());
}
}
if(idx % kCompletePendingInterval == 0) {
Expand All @@ -241,19 +256,22 @@ class ConcurrentRecoveryTest {
printf("Populate successful on thread %" PRIu32 ".\n", Thread::id());
}

void Continue(uint32_t cpr_version, uint32_t index_version) {
void Continue(const Guid& index_token, const Guid& hybrid_log_token) {
// Recover
printf("Recovering version (%" PRIu32 ", %" PRIu32 ")\n", cpr_version, index_version);
printf("Recovering version (index_token = %s, hybrid_log_token = %s)\n",
index_token.ToString().c_str(), hybrid_log_token.ToString().c_str());
uint32_t version;
std::vector<Guid> session_ids;
FASTER::core::Status result = store.Recover(cpr_version, index_version, session_ids);
FASTER::core::Status result = store.Recover(index_token, hybrid_log_token, version,
session_ids);
if(result != FASTER::core::Status::Ok) {
printf("Recovery failed with error %u\n", static_cast<uint8_t>(result));
exit(1);
} else {
printf("Recovery Done!\n");
}

num_checkpoints.store(cpr_version);
num_checkpoints.store(version);
// Some threads may have already completed.
num_threads = session_ids.size();

Expand Down
20 changes: 14 additions & 6 deletions cc/playground/sum_store-dir/single_threaded_recovery_test.h
Expand Up @@ -39,9 +39,14 @@ class SingleThreadedRecoveryTest {
assert(result == Status::Ok);
};

auto persistence_callback = [](uint64_t persistent_serial_num) {
printf("Thread %" PRIu32 " reports persistence until %" PRIu64 "\n",
Thread::id(), persistent_serial_num);
auto hybrid_log_persistence_callback = [](Status result, uint64_t persistent_serial_num) {
if(result != Status::Ok) {
printf("Thread %" PRIu32 " reports checkpoint failed.\n",
Thread::id());
} else {
printf("Thread %" PRIu32 " reports persistence until %" PRIu64 "\n",
Thread::id(), persistent_serial_num);
}
};

// Register thread with FASTER
Expand All @@ -53,7 +58,9 @@ class SingleThreadedRecoveryTest {
store.Rmw(context, callback, idx);

if(idx % kCheckpointInterval == 0) {
store.Checkpoint(persistence_callback);
Guid token;
store.Checkpoint(nullptr, hybrid_log_persistence_callback, token);
printf("Calling Checkpoint(), token = %s\n", token.ToString().c_str());
}
if(idx % kCompletePendingInterval == 0) {
store.CompletePending(false);
Expand All @@ -73,15 +80,16 @@ class SingleThreadedRecoveryTest {
std::getline(std::cin, discard);
}

void RecoverAndTest(uint32_t cpr_version, uint32_t index_version) {
void RecoverAndTest(const Guid& index_token, const Guid& hybrid_log_token) {
auto callback = [](IAsyncContext* ctxt, Status result) {
CallbackContext<ReadContext> context{ ctxt };
assert(result == Status::Ok);
};

// Recover
uint32_t version;
std::vector<Guid> session_ids;
store.Recover(cpr_version, index_version, session_ids);
store.Recover(index_token, hybrid_log_token, version, session_ids);

// Create array for reading
auto read_results = alloc_aligned<uint64_t>(64, sizeof(uint64_t) * kNumUniqueKeys);
Expand Down
19 changes: 10 additions & 9 deletions cc/playground/sum_store-dir/sum_store.cc
Expand Up @@ -14,7 +14,8 @@
int main(int argc, char* argv[]) {
if(argc < 3) {
printf("Usage: sum_store.exe single <operation>\n");
printf("Where <operation> is one of \"populate\", \"recover <version>\", or \"continue\".\n");
printf("Where <operation> is one of \"populate\", \"recover <token>\", or "
"\"continue <token>\".\n");
exit(0);
}

Expand All @@ -34,11 +35,11 @@ int main(int argc, char* argv[]) {
test.Populate();
} else if(task == "recover") {
if(argc != 4) {
printf("Must specify version to recover to.\n");
printf("Must specify token to recover to.\n");
exit(1);
}
uint32_t version = std::atoi(argv[3]);
test.RecoverAndTest(version, version);
Guid token = Guid::Parse(argv[3]);
test.RecoverAndTest(token, token);
}
} else if(type == "concurrent") {
if(argc < 4) {
Expand All @@ -55,18 +56,18 @@ int main(int argc, char* argv[]) {
test.Populate();
} else if(task == "recover") {
if(argc != 5) {
printf("Must specify version to recover to.\n");
printf("Must specify token to recover to.\n");
exit(1);
}
uint32_t version = std::atoi(argv[4]);
test.RecoverAndTest(version, version);
Guid token = Guid::Parse(argv[4]);
test.RecoverAndTest(token, token);
} else if(task == "continue") {
if(argc != 5) {
printf("Must specify version to continue from.\n");
exit(1);
}
uint32_t version = std::atoi(argv[4]);
test.Continue(version, version);
Guid token = Guid::Parse(argv[4]);
test.Continue(token, token);
}

}
Expand Down
71 changes: 60 additions & 11 deletions cc/src/core/checkpoint_state.h
Expand Up @@ -9,6 +9,7 @@
#include "address.h"
#include "guid.h"
#include "malloc_fixed_page_size.h"
#include "status.h"
#include "thread.h"

namespace FASTER {
Expand Down Expand Up @@ -101,45 +102,88 @@ template <class F>
class CheckpointState {
public:
typedef F file_t;
typedef void(*persistence_callback_t)(uint64_t persistent_serial_num);
typedef void(*index_persistence_callback_t)(Status result);
typedef void(*hybrid_log_persistence_callback_t)(Status result, uint64_t persistent_serial_num);

CheckpointState()
: index_checkpoint_started{ false }
, failed{ false }
, flush_pending{ UINT32_MAX }
, persistence_callback{ nullptr } {
, index_persistence_callback{ nullptr }
, hybrid_log_persistence_callback{ nullptr } {
}

void InitializeCheckpoint(uint32_t version, uint64_t table_size, Address log_begin_address,
Address checkpoint_start_address, bool use_snapshot_file,
Address flushed_until_address,
persistence_callback_t persistence_callback_) {
void InitializeIndexCheckpoint(const Guid& token, uint32_t version, uint64_t table_size,
Address log_begin_address, Address checkpoint_start_address,
index_persistence_callback_t callback) {
failed = false;
index_checkpoint_started = false;
continue_tokens.clear();
index_token = token;
hybrid_log_token = Guid{};
index_metadata.Initialize(version, table_size, log_begin_address, checkpoint_start_address);
log_metadata.Reset();
flush_pending = 0;
index_persistence_callback = callback;
hybrid_log_persistence_callback = nullptr;
}

void InitializeHybridLogCheckpoint(const Guid& token, uint32_t version, bool use_snapshot_file,
Address flushed_until_address,
hybrid_log_persistence_callback_t callback) {
failed = false;
index_checkpoint_started = false;
continue_tokens.clear();
index_token = Guid{};
hybrid_log_token = token;
index_metadata.Reset();
log_metadata.Initialize(use_snapshot_file, version, flushed_until_address);
if(use_snapshot_file) {
flush_pending = UINT32_MAX;
} else {
flush_pending = 0;
}
persistence_callback = persistence_callback_;
index_persistence_callback = nullptr;
hybrid_log_persistence_callback = callback;
}

void InitializeCheckpoint(const Guid& token, uint32_t version, uint64_t table_size,
Address log_begin_address, Address checkpoint_start_address,
bool use_snapshot_file, Address flushed_until_address,
index_persistence_callback_t index_persistence_callback_,
hybrid_log_persistence_callback_t hybrid_log_persistence_callback_) {
failed = false;
index_checkpoint_started = false;
continue_tokens.clear();
index_token = token;
hybrid_log_token = token;
index_metadata.Initialize(version, table_size, log_begin_address, checkpoint_start_address);
log_metadata.Initialize(use_snapshot_file, version, flushed_until_address);
if(use_snapshot_file) {
flush_pending = UINT32_MAX;
} else {
flush_pending = 0;
}
index_persistence_callback = index_persistence_callback_;
hybrid_log_persistence_callback = hybrid_log_persistence_callback_;
}

void CheckpointDone() {
assert(!failed);
assert(index_checkpoint_started);
assert(index_token == Guid{} || index_checkpoint_started);
assert(continue_tokens.empty());
assert(flush_pending == 0);
index_metadata.Reset();
log_metadata.Reset();
snapshot_file.Close();
persistence_callback = nullptr;
index_persistence_callback = nullptr;
hybrid_log_persistence_callback = nullptr;
}

inline void InitializeRecover() {
inline void InitializeRecover(const Guid& index_token_, const Guid& hybrid_log_token_) {
failed = false;
index_token = index_token_;
hybrid_log_token = hybrid_log_token_;
}

void RecoverDone() {
Expand All @@ -153,11 +197,16 @@ class CheckpointState {
std::atomic<bool> failed;
IndexMetadata index_metadata;
LogMetadata log_metadata;

Guid index_token;
Guid hybrid_log_token;

/// State used when fold_over_snapshot = false.
file_t snapshot_file;
std::atomic<uint32_t> flush_pending;

persistence_callback_t persistence_callback;
index_persistence_callback_t index_persistence_callback;
hybrid_log_persistence_callback_t hybrid_log_persistence_callback;
std::unordered_map<Guid, uint64_t> continue_tokens;
};

Expand Down

0 comments on commit 276c20f

Please sign in to comment.