Skip to content

Commit

Permalink
Compaction for C++ FASTER (#250)
Browse files Browse the repository at this point in the history
* Scan Iterator over Hybrid Log

This iterator takes in a begin and end address. Calling getNext()
returns a pointer to the next record in the range. If the end has
been reached, then nullptr is returned.

If we need to scan over disk, then the iterator can optionally
scan extra pages and buffer them internally.

* Upsert and Delete contexts for Log Compaction

The Log Compaction algorithm requires that we
 a) Collect records into a temporary faster instance
 b) Delete dead records from this instance
 c) Upsert live records at the tail of the log

To perform the above three we need an Upsert and Delete context.

* Compaction for HybridLog

Implements a 3 phased approach similar to the C# version.
 Phase 1: Collects records from the region to be compacted into
          a mini-FASTER instance.
 Phase 2: Scans records in FASTER's log upto the safe read-only offset, deleting
          them from the mini-FASTER instance.
 Phase 3: Inserts records from the mini-FASTER into the hybrid log as long as
          they don't exist in it's mutable region.

* Unit test for in-memory scan iterator

* Read correct page from disk when scanning

* Simple unit test for compaction

* Fix Windows compile error.

* Fix copyright on compact.h

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
  • Loading branch information
chinkulkarni and badrishc authored Mar 25, 2020
1 parent 7122495 commit c98de54
Show file tree
Hide file tree
Showing 6 changed files with 854 additions and 1 deletion.
134 changes: 134 additions & 0 deletions cc/src/core/compact.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#pragma once

#include "core/async.h"

#include "record.h"

namespace FASTER {
namespace core {

/// Upsert context used by FASTER's compaction algorithm.
///
/// The following are template arguments.
/// K: The type on the key of each record.
/// V: The type on the value stored inside FASTER.
template <class K, class V>
class CompactionUpsert : public IAsyncContext {
public:
// Typedefs on the key and value required internally by FASTER.
typedef K key_t;
typedef V value_t;

// Type signature on the record. Required by the constructor.
typedef Record<K, V> record_t;

/// Constructs and returns a context given a pointer to a record.
CompactionUpsert(record_t* record)
: key_(record->key())
, value_(record->value())
{}

/// Copy constructor. Required for when an Upsert operation goes async
/// inside FASTER.
CompactionUpsert(const CompactionUpsert& from)
: key_(from.key_)
, value_(from.value_)
{}

/// Accessor for the key. Invoked from within FASTER.
inline const K& key() const {
return key_;
}

/// Returns the size of the value. Invoked from within FASTER when creating
/// a new key-value pair (because the key did not map to a value to begin
/// with).
inline static constexpr uint32_t value_size() {
return V::size();
}

/// Stores this context's value into a passed in reference. This is
/// typically invoked from within FASTER when a new record corresponding
/// to the key-value pair is created at the tail of the hybrid log.
inline void Put(V& val) {
new(&val) V(value_);
}

/// Atomically stores this context's value into a passed in reference. This
/// is typically invoked from within FASTER when performing an Upsert on a
/// key-value pair in the HybridLog's mutable region.
inline bool PutAtomic(V& val) {
new(&val) V(value_);
return true;
}

protected:
/// Copies this context into a passed-in pointer if the operation goes
/// asynchronous inside FASTER.
Status DeepCopy_Internal(IAsyncContext*& context_copy) {
return IAsyncContext::DeepCopy_Internal(*this, context_copy);
}

private:
/// The key the upsert must be performed against.
K key_;

/// The value that the key should map to after the Upsert operation.
V value_;
};

/// Delete context used by FASTER's compaction algorithm.
///
/// The following are template arguments.
/// K: The type on the key of each record.
/// V: The type on the value stored inside FASTER.
template <class K, class V>
class CompactionDelete : public IAsyncContext {
public:
// Typedefs on the key and value required internally by FASTER.
typedef K key_t;
typedef V value_t;

// Type signature on the record. Required by the constructor.
typedef Record<K, V> record_t;

/// Constructs and returns a context given a pointer to a record.
CompactionDelete(record_t* record)
: key_(record->key())
{}

/// Copy constructor. Required for when the operation goes async
/// inside FASTER.
CompactionDelete(const CompactionDelete& from)
: key_(from.key_)
{}

/// Accessor for the key. Invoked from within FASTER.
inline const K& key() const {
return key_;
}

/// Returns the size of the value. Invoked from within FASTER when creating
/// a new key-value pair (because the key did not map to a value to begin
/// with).
inline static constexpr uint32_t value_size() {
return V::size();
}

protected:
/// Copies this context into a passed-in pointer if the operation goes
/// asynchronous inside FASTER.
Status DeepCopy_Internal(IAsyncContext*& context_copy) {
return IAsyncContext::DeepCopy_Internal(*this, context_copy);
}

private:
/// The key the delete must be performed against.
K key_;
};

} // namespace core
} // namespace FASTER
183 changes: 182 additions & 1 deletion cc/src/core/faster.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <cstdio>
#include <cstring>
#include <type_traits>
#include <algorithm>

#include "device/file_system_disk.h"

Expand All @@ -30,6 +31,8 @@
#include "state_transitions.h"
#include "status.h"
#include "utility.h"
#include "log_scan.h"
#include "compact.h"

using namespace std::chrono_literals;

Expand Down Expand Up @@ -146,6 +149,9 @@ class FasterKv {
Status Recover(const Guid& index_token, const Guid& hybrid_log_token, uint32_t& version,
std::vector<Guid>& session_ids);

/// Log compaction entry method.
bool Compact(uint64_t untilAddress);

/// Truncating the head of the log.
bool ShiftBeginAddress(Address address, GcState::truncate_callback_t truncate_callback,
GcState::complete_callback_t complete_callback);
Expand Down Expand Up @@ -258,6 +264,9 @@ class FasterKv {
void AddHashEntry(HashBucket*& bucket, uint32_t& next_idx, uint8_t version,
HashBucketEntry entry);

Address LogScanForValidity(Address from, faster_t* temp);
bool ContainsKeyInMemory(key_t key, Address offset);

/// Access the current and previous (thread-local) execution contexts.
const ExecutionContext& thread_ctx() const {
return thread_contexts_[Thread::id()].cur();
Expand Down Expand Up @@ -2881,5 +2890,177 @@ inline std::ostream& operator << (std::ostream& out, const FixedPageAddress addr
return out << address.control();
}

/// When invoked, compacts the hybrid-log between the begin address and a
/// passed in offset (`untilAddress`).
template <class K, class V, class D>
bool FasterKv<K, V, D>::Compact(uint64_t untilAddress)
{
// First, initialize a mini FASTER that will store all live records in
// the range [beginAddress, untilAddress).
Address begin = hlog.begin_address.load();
auto size = 2 * (untilAddress - begin.control());
if (size < 0) return false;

auto pSize = PersistentMemoryMalloc<D>::kPageSize;
size = std::max(8 * pSize, size);

if (size % pSize != 0) size += pSize - (size % pSize);

faster_t tempKv(min_table_size_, size, "");
tempKv.StartSession();

// In the first phase of compaction, scan the hybrid-log between addresses
// [beginAddress, untilAddress), adding all live records to the mini FASTER.
// On encountering a tombstone, we try to delete the record from the mini
// instance of FASTER.
int numOps = 0;
ScanIterator<faster_t> iter(&hlog, Buffering::DOUBLE_PAGE, begin,
Address(untilAddress), &disk);
while (true) {
auto r = iter.GetNext();
if (r == nullptr) break;

if (!r->header.tombstone) {
CompactionUpsert<K, V> ctxt(r);
auto cb = [](IAsyncContext* ctxt, Status result) {
CallbackContext<CompactionUpsert<K, V>> context(ctxt);
assert(result == Status::Ok);
};
tempKv.Upsert(ctxt, cb, 0);
} else {
CompactionDelete<K, V> ctxt(r);
auto cb = [](IAsyncContext* ctxt, Status result) {
CallbackContext<CompactionDelete<K, V>> context(ctxt);
assert(result == Status::Ok);
};
tempKv.Delete(ctxt, cb, 0);
}

if (++numOps % 1000 == 0) {
tempKv.Refresh();
Refresh();
}
}

// Scan the remainder of the hybrid log, deleting all encountered records
// from the temporary/mini FASTER instance.
auto upto = LogScanForValidity(Address(untilAddress), &tempKv);

// Finally, scan through all records within the temporary FASTER instance,
// inserting those that don't already exist within FASTER's mutable region.
numOps = 0;
ScanIterator<faster_t> iter2(&tempKv.hlog, Buffering::DOUBLE_PAGE,
tempKv.hlog.begin_address.load(),
tempKv.hlog.GetTailAddress(), &tempKv.disk);
while (true) {
auto r = iter2.GetNext();
if (r == nullptr) break;

if (!r->header.tombstone && !ContainsKeyInMemory(r->key(), upto)) {
CompactionUpsert<K, V> ctxt(r);
auto cb = [](IAsyncContext* ctxt, Status result) {
CallbackContext<CompactionUpsert<K, V>> context(ctxt);
assert(result == Status::Ok);
};

Upsert(ctxt, cb, 0);
}

if (++numOps % 1000 == 0) {
tempKv.Refresh();
Refresh();
}

// The safe-read-only region might have moved forward since the previous
// log scan. If it has, perform another validity scan over the delta.
if (upto < hlog.safe_read_only_address.load()) {
upto = LogScanForValidity(upto, &tempKv);
}
}

tempKv.StopSession();
return true;
}

/// Scans the hybrid log starting at `from` until the safe-read-only address,
/// deleting all encountered records from within a passed in temporary FASTER
/// instance.
///
/// Useful for log compaction where the temporary instance contains potentially
/// live records that were found before `from` on the log. This method will then
/// delete all records from within that instance that are dead because they exist
/// in the safe-read-only region of the main FASTER instance.
///
/// Returns the address upto which the scan was performed.
template <class K, class V, class D>
Address FasterKv<K, V, D>::LogScanForValidity(Address from, faster_t* temp)
{
// Scan upto the safe read only region of the log, deleting all encountered
// records from the temporary instance of FASTER. Since the safe-read-only
// offset can advance while we're scanning, we repeat this operation until
// we converge.
Address sRO = hlog.safe_read_only_address.load();
while (from < sRO) {
int numOps = 0;
ScanIterator<faster_t> iter(&hlog, Buffering::DOUBLE_PAGE, from,
sRO, &disk);
while (true) {
auto r = iter.GetNext();
if (r == nullptr) break;

CompactionDelete<K, V> ctxt(r);
auto cb = [](IAsyncContext* ctxt, Status result) {
CallbackContext<CompactionDelete<K, V>> context(ctxt);
assert(result == Status::Ok);
};
temp->Delete(ctxt, cb, 0);

if (++numOps % 1000 == 0) {
temp->Refresh();
Refresh();
}
}

// Refresh Faster, updating our start and end addresses for the convergence
// check in the while loop above.
Refresh();
from = sRO;
sRO = hlog.safe_read_only_address.load();
}

return sRO;
}

/// Checks if a key exists between a passed in address (`offset`) and the
/// current tail of the hybrid log.
template <class K, class V, class D>
bool FasterKv<K, V, D>::ContainsKeyInMemory(key_t key, Address offset)
{
// First, retrieve the hash table entry corresponding to this key.
KeyHash hash = key.GetHash();
HashBucketEntry _entry;
const AtomicHashBucketEntry* atomic_entry = FindEntry(hash, _entry);
if (!atomic_entry) return false;

HashBucketEntry entry = atomic_entry->load();
Address address = entry.address();

if (address >= offset) {
// Look through the in-memory portion of the log, to find the first record
// (if any) whose key matches.
const record_t* record =
reinterpret_cast<const record_t*>(hlog.Get(address));
if(key != record->key()) {
address = TraceBackForKeyMatch(key, record->header.previous_address(),
offset);
}
}

// If we found a record after the passed in address then we succeeded.
// Otherwise, we failed and so return false.
if (address >= offset) return true;
return false;
}

}
} // namespace FASTER::core
} // namespace FASTER::core
Loading

0 comments on commit c98de54

Please sign in to comment.