Skip to content

Commit

Permalink
[XRay] Move buffer extents back to the heap
Browse files Browse the repository at this point in the history
Summary:
This change addresses an issue which shows up with the synchronised race
between threads writing into a buffer, and another thread reading the
buffer.

In a lot of cases, we cannot guarantee that threads will always see the
signal to finalise their buffers in time despite the grace periods and
state machine maintained through atomic variables. This change addresses
it by ensuring that the same instance being updated to indicate how much
of the buffer is "used" by the writing thread is the same instance being
read by the thread processing the buffer to be written out to disk or
handled through the iterators.

To do this, we ensure that all the "extents" instances live in their own
backing store, in a different contiguous page from the
buffer-specific backing store. We also take precautions to ensure that
the atomic variables are cache-line-sized to prevent false-sharing from
unnecessarily causing cache contention on unrelated writes/reads.

It's feasible that we may in the future be able to move the storage of
the extents objects into the single backing store, slightly changing the
way to compute the size(s) of the buffers, but in the meantime we'll
settle for the isolation afforded by having a different backing store
for the extents instances.

Reviewers: mboerger

Subscribers: jfb, llvm-commits

Differential Revision: https://reviews.llvm.org/D54684

llvm-svn: 347280
  • Loading branch information
deanberris committed Nov 20, 2018
1 parent 8e0e35a commit ba02cb5
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 22 deletions.
2 changes: 1 addition & 1 deletion compiler-rt/lib/xray/tests/unit/test_helpers.cc
Expand Up @@ -82,7 +82,7 @@ std::string serialize(BufferQueue &Buffers, int32_t Version) {
Serialized.append(reinterpret_cast<const char *>(&HeaderStorage),
sizeof(XRayFileHeader));
Buffers.apply([&](const BufferQueue::Buffer &B) {
auto Size = atomic_load_relaxed(&B.Extents);
auto Size = atomic_load_relaxed(B.Extents);
auto Extents =
createMetadataRecord<MetadataRecord::RecordKinds::BufferExtents>(Size);
Serialized.append(reinterpret_cast<const char *>(&Extents),
Expand Down
43 changes: 38 additions & 5 deletions compiler-rt/lib/xray/xray_buffer_queue.cc
Expand Up @@ -23,7 +23,6 @@
#include <sys/mman.h>

using namespace __xray;
using namespace __sanitizer;

namespace {

Expand Down Expand Up @@ -53,6 +52,18 @@ void incRefCount(BufferQueue::ControlBlock *C) {
atomic_fetch_add(&C->RefCount, 1, memory_order_acq_rel);
}

// ExtentsPadded wraps a single atomic_uint64_t extents counter in a union with
// a kCacheLineSize-byte array, so each instance occupies at least
// kCacheLineSize bytes. Laying instances out back-to-back therefore gives every
// counter its own cache-line-sized slot, which allows us to not worry about
// false-sharing among atomic objects being updated (constantly) by different
// threads.
// NOTE(review): this only holds if the first slot is itself cache-line
// aligned -- confirm against the allocator used for the backing store.
struct ExtentsPadded {
union {
// The counter that is actually read and written through Buffer::Extents.
atomic_uint64_t Extents;
// Not accessed in the code shown here; it pads the union out to
// kCacheLineSize bytes.
unsigned char Storage[kCacheLineSize];
};
};

// Size in bytes of one padded extents slot. Used as the per-buffer stride when
// allocating and indexing the extents backing store.
constexpr size_t kExtentsSize = sizeof(ExtentsPadded);

} // namespace

BufferQueue::ErrorCode BufferQueue::init(size_t BS, size_t BC) {
Expand All @@ -71,13 +82,25 @@ BufferQueue::ErrorCode BufferQueue::init(size_t BS, size_t BC) {
if (BackingStore == nullptr)
return BufferQueue::ErrorCode::NotEnoughMemory;

auto CleanupBackingStore = __sanitizer::at_scope_exit([&, this] {
auto CleanupBackingStore = at_scope_exit([&, this] {
if (Success)
return;
deallocControlBlock(BackingStore, BufferSize, BufferCount);
BackingStore = nullptr;
});

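// Allocate a separate backing store holding enough atomic_uint64_t instances,
// each padded to kExtentsSize bytes, to track the extents of every buffer.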
ExtentsBackingStore = allocControlBlock(kExtentsSize, BufferCount);
if (ExtentsBackingStore == nullptr)
return BufferQueue::ErrorCode::NotEnoughMemory;

auto CleanupExtentsBackingStore = at_scope_exit([&, this] {
if (Success)
return;
deallocControlBlock(ExtentsBackingStore, kExtentsSize, BufferCount);
ExtentsBackingStore = nullptr;
});

Buffers = initArray<BufferRep>(BufferCount);
if (Buffers == nullptr)
return BufferQueue::ErrorCode::NotEnoughMemory;
Expand All @@ -89,18 +112,23 @@ BufferQueue::ErrorCode BufferQueue::init(size_t BS, size_t BC) {
// First, we initialize the refcount in the ControlBlock, which we treat as
// being at the start of the BackingStore pointer.
atomic_store(&BackingStore->RefCount, 1, memory_order_release);
atomic_store(&ExtentsBackingStore->RefCount, 1, memory_order_release);

// Then we initialise the individual buffers that sub-divide the whole backing
// store. Each buffer will start at the `Data` member of the ControlBlock, and
// will be offsets from these locations.
for (size_t i = 0; i < BufferCount; ++i) {
auto &T = Buffers[i];
auto &Buf = T.Buff;
atomic_store(&Buf.Extents, 0, memory_order_release);
auto *E = reinterpret_cast<ExtentsPadded *>(&ExtentsBackingStore->Data +
(kExtentsSize * i));
Buf.Extents = &E->Extents;
atomic_store(Buf.Extents, 0, memory_order_release);
Buf.Generation = generation();
Buf.Data = &BackingStore->Data + (BufferSize * i);
Buf.Size = BufferSize;
Buf.BackingStore = BackingStore;
Buf.ExtentsBackingStore = ExtentsBackingStore;
Buf.Count = BufferCount;
T.Used = false;
}
Expand All @@ -120,6 +148,7 @@ BufferQueue::BufferQueue(size_t B, size_t N,
Mutex(),
Finalizing{1},
BackingStore(nullptr),
ExtentsBackingStore(nullptr),
Buffers(nullptr),
Next(Buffers),
First(Buffers),
Expand All @@ -144,6 +173,7 @@ BufferQueue::ErrorCode BufferQueue::getBuffer(Buffer &Buf) {
}

incRefCount(BackingStore);
incRefCount(ExtentsBackingStore);
Buf = B->Buff;
Buf.Generation = generation();
B->Used = true;
Expand All @@ -159,6 +189,7 @@ BufferQueue::ErrorCode BufferQueue::releaseBuffer(Buffer &Buf) {
if (Buf.Generation != generation() || LiveBuffers == 0) {
Buf = {};
decRefCount(Buf.BackingStore, Buf.Size, Buf.Count);
decRefCount(Buf.ExtentsBackingStore, kExtentsSize, Buf.Count);
return BufferQueue::ErrorCode::Ok;
}

Expand All @@ -176,8 +207,8 @@ BufferQueue::ErrorCode BufferQueue::releaseBuffer(Buffer &Buf) {
B->Buff = Buf;
B->Used = true;
decRefCount(Buf.BackingStore, Buf.Size, Buf.Count);
atomic_store(&B->Buff.Extents,
atomic_load(&Buf.Extents, memory_order_acquire),
decRefCount(Buf.ExtentsBackingStore, kExtentsSize, Buf.Count);
atomic_store(B->Buff.Extents, atomic_load(Buf.Extents, memory_order_acquire),
memory_order_release);
Buf = {};
return ErrorCode::Ok;
Expand All @@ -194,7 +225,9 @@ void BufferQueue::cleanupBuffers() {
B->~BufferRep();
deallocateBuffer(Buffers, BufferCount);
decRefCount(BackingStore, BufferSize, BufferCount);
decRefCount(ExtentsBackingStore, kExtentsSize, BufferCount);
BackingStore = nullptr;
ExtentsBackingStore = nullptr;
Buffers = nullptr;
BufferCount = 0;
BufferSize = 0;
Expand Down
15 changes: 10 additions & 5 deletions compiler-rt/lib/xray/xray_buffer_queue.h
Expand Up @@ -32,10 +32,11 @@ namespace __xray {
class BufferQueue {
public:
/// ControlBlock represents the memory layout of how we interpret the backing
/// store for all buffers managed by a BufferQueue instance. The ControlBlock
/// has the reference count as the first member, sized according to
/// platform-specific cache-line size. We never use the Buffer member of the
/// union, which is only there for compiler-supported alignment and sizing.
/// store for all buffers and extents managed by a BufferQueue instance. The
/// ControlBlock has the reference count as the first member, sized according
/// to platform-specific cache-line size. We never use the Buffer member of
/// the union, which is only there for compiler-supported alignment and
/// sizing.
///
/// This ensures that the `Data` member will be placed at least kCacheLineSize
/// bytes from the beginning of the structure.
Expand All @@ -52,14 +53,15 @@ class BufferQueue {
};

struct Buffer {
atomic_uint64_t Extents{0};
atomic_uint64_t *Extents = nullptr;
uint64_t Generation{0};
void *Data = nullptr;
size_t Size = 0;

private:
friend class BufferQueue;
ControlBlock *BackingStore = nullptr;
ControlBlock *ExtentsBackingStore = nullptr;
size_t Count = 0;
};

Expand Down Expand Up @@ -142,6 +144,9 @@ class BufferQueue {
// The collocated ControlBlock and buffer storage.
ControlBlock *BackingStore;

// The collocated ControlBlock and extents storage.
ControlBlock *ExtentsBackingStore;

// A dynamically allocated array of BufferRep instances.
BufferRep *Buffers;

Expand Down
4 changes: 2 additions & 2 deletions compiler-rt/lib/xray/xray_fdr_controller.h
Expand Up @@ -64,7 +64,7 @@ template <size_t Version = 5> class FDRController {
First = true;
UndoableFunctionEnters = 0;
UndoableTailExits = 0;
atomic_store(&B.Extents, 0, memory_order_release);
atomic_store(B.Extents, 0, memory_order_release);
return true;
}

Expand Down Expand Up @@ -123,7 +123,7 @@ template <size_t Version = 5> class FDRController {
if (First) {
First = false;
W.resetRecord();
atomic_store(&B.Extents, 0, memory_order_release);
atomic_store(B.Extents, 0, memory_order_release);
return setupNewBuffer();
}

Expand Down
14 changes: 7 additions & 7 deletions compiler-rt/lib/xray/xray_fdr_log_writer.h
Expand Up @@ -86,7 +86,7 @@ class FDRLogWriter {
// read the bytes in the buffer will see the writes committed before the
// extents are updated.
atomic_thread_fence(memory_order_release);
atomic_fetch_add(&Buffer.Extents, sizeof(T), memory_order_acq_rel);
atomic_fetch_add(Buffer.Extents, sizeof(T), memory_order_acq_rel);
}

public:
Expand Down Expand Up @@ -116,7 +116,7 @@ class FDRLogWriter {
// read the bytes in the buffer will see the writes committed before the
// extents are updated.
atomic_thread_fence(memory_order_release);
atomic_fetch_add(&Buffer.Extents, Size, memory_order_acq_rel);
atomic_fetch_add(Buffer.Extents, Size, memory_order_acq_rel);
return Size;
}

Expand Down Expand Up @@ -160,7 +160,7 @@ class FDRLogWriter {
// read the bytes in the buffer will see the writes committed before the
// extents are updated.
atomic_thread_fence(memory_order_release);
atomic_fetch_add(&Buffer.Extents, sizeof(R) + sizeof(A),
atomic_fetch_add(Buffer.Extents, sizeof(R) + sizeof(A),
memory_order_acq_rel);
return true;
}
Expand All @@ -185,7 +185,7 @@ class FDRLogWriter {
// read the bytes in the buffer will see the writes committed before the
// extents are updated.
atomic_thread_fence(memory_order_release);
atomic_fetch_add(&Buffer.Extents, sizeof(R) + EventSize,
atomic_fetch_add(Buffer.Extents, sizeof(R) + EventSize,
memory_order_acq_rel);
return true;
}
Expand All @@ -208,21 +208,21 @@ class FDRLogWriter {
// read the bytes in the buffer will see the writes committed before the
// extents are updated.
atomic_thread_fence(memory_order_release);
atomic_fetch_add(&Buffer.Extents, EventSize, memory_order_acq_rel);
atomic_fetch_add(Buffer.Extents, EventSize, memory_order_acq_rel);
return true;
}

char *getNextRecord() const { return NextRecord; }

void resetRecord() {
NextRecord = reinterpret_cast<char *>(Buffer.Data);
atomic_store(&Buffer.Extents, 0, memory_order_release);
atomic_store(Buffer.Extents, 0, memory_order_release);
}

void undoWrites(size_t B) {
DCHECK_GE(NextRecord - B, reinterpret_cast<char *>(Buffer.Data));
NextRecord -= B;
atomic_fetch_sub(&Buffer.Extents, B, memory_order_acq_rel);
atomic_fetch_sub(Buffer.Extents, B, memory_order_acq_rel);
}

}; // namespace __xray
Expand Down
4 changes: 2 additions & 2 deletions compiler-rt/lib/xray/xray_fdr_logging.cc
Expand Up @@ -250,7 +250,7 @@ XRayBuffer fdrIterator(const XRayBuffer B) {
// fence ordering to ensure that writes we expect to have been completed
// before the fence are fully committed before we read the extents.
atomic_thread_fence(memory_order_acquire);
auto BufferSize = atomic_load(&It->Extents, memory_order_acquire);
auto BufferSize = atomic_load(It->Extents, memory_order_acquire);
SerializedBufferSize = BufferSize + sizeof(MetadataRecord);
CurrentBuffer = allocateBuffer(SerializedBufferSize);
if (CurrentBuffer == nullptr)
Expand Down Expand Up @@ -364,7 +364,7 @@ XRayLogFlushStatus fdrLoggingFlush() XRAY_NEVER_INSTRUMENT {
// still use a Metadata record, but fill in the extents instead for the
// data.
MetadataRecord ExtentsRecord;
auto BufferExtents = atomic_load(&B.Extents, memory_order_acquire);
auto BufferExtents = atomic_load(B.Extents, memory_order_acquire);
DCHECK(BufferExtents <= B.Size);
ExtentsRecord.Type = uint8_t(RecordType::Metadata);
ExtentsRecord.RecordKind =
Expand Down

0 comments on commit ba02cb5

Please sign in to comment.