thor: Use ImmediateMemory for IPC queue chunks
This gives a huge performance improvement for raw IPC queue submissions. We also remove the helSetupChunk syscall, since it is no longer needed.
avdgrinten committed Apr 27, 2021
1 parent 9d612ea commit bc755d8
Showing 8 changed files with 72 additions and 131 deletions.
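
The heart of the change: instead of user space allocating each chunk and registering it with helSetupChunk, the kernel now allocates a single ImmediateMemory region that holds the queue header, the index ring, and all chunks at fixed cache-line-aligned offsets, and user space simply maps that region. A sketch of the resulting layout, with offsets computed identically in helix/ipc.hpp and kernel/thor/generic/ipc-queue.cpp below (the 64-byte cache-line alignment is what those computations assume):

// Layout of the ImmediateMemory region backing one queue:
//
//   0                                      QueueStruct / HelQueue header (headFutex, ...)
//   sizeof(header)                         int indexQueue[1 << ringShift]
//   chunksOffset (rounded up to 64)        chunk 0: ChunkStruct/HelChunk + chunkSize buffer
//   chunksOffset + reservedPerChunk        chunk 1
//   ...
//   chunksOffset + numChunks * reservedPerChunk == overallSize
//
// For mapping, overallSize is additionally rounded up to a 4 KiB page.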
6 changes: 0 additions & 6 deletions hel/include/hel-syscalls.h
@@ -65,12 +65,6 @@ extern inline __attribute__ (( always_inline )) HelError helCreateQueue(
return error;
};

extern inline __attribute__ (( always_inline )) HelError helSetupChunk(HelHandle queue,
int index, struct HelChunk *chunk, uint32_t flags) {
return helSyscall4(kHelCallSetupChunk, (HelWord)queue, (HelWord)index,
(HelWord)chunk, (HelWord)flags);
};

extern inline __attribute__ (( always_inline )) HelError helCancelAsync(HelHandle handle,
uint64_t async_id) {
return helSyscall2(kHelCallCancelAsync, (HelWord)handle, (HelWord)async_id);
3 changes: 0 additions & 3 deletions hel/include/hel.h
@@ -31,7 +31,6 @@ enum {
kHelCallCloseDescriptor = 21,

kHelCallCreateQueue = 89,
kHelCallSetupChunk = 90,
kHelCallCancelAsync = 92,

kHelCallAllocateMemory = 51,
@@ -612,8 +611,6 @@ HEL_C_LINKAGE HelError helCloseDescriptor(HelHandle universeHandle, HelHandle ha
HEL_C_LINKAGE HelError helCreateQueue(struct HelQueueParameters *params,
HelHandle *handle);

HEL_C_LINKAGE HelError helSetupChunk(HelHandle queue, int index, struct HelChunk *chunk, uint32_t flags);

//! Cancels an ongoing asynchronous operation.
//! @param[in] queueHandle
//! Handle to the queue that the operation was submitted to.
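With helSetupChunk gone, queue setup from user space reduces to two calls: create the queue, then map the kernel-allocated region. A minimal sketch of that path, using only the HelQueueParameters fields that appear elsewhere in this diff (ringShift, numChunks, chunkSize); any remaining fields are zero-initialized, and the concrete values are illustrative:

HelQueueParameters params{}; // zero-init any fields this sketch does not set
params.ringShift = 4;        // 16-entry index ring (illustrative)
params.numChunks = 16;       // matches the helix dispatcher below
params.chunkSize = 4096;

HelHandle queue;
HEL_CHECK(helCreateQueue(&params, &queue));

// Size of the mapping, computed exactly as in helix/ipc.hpp below.
auto chunksOffset = (sizeof(HelQueue) + (sizeof(int) << params.ringShift) + 63) & ~size_t(63);
auto reservedPerChunk = (sizeof(HelChunk) + params.chunkSize + 63) & ~size_t(63);
auto overallSize = chunksOffset + params.numChunks * reservedPerChunk;

void *mapping;
HEL_CHECK(helMapMemory(queue, kHelNullHandle, nullptr,
		0, (overallSize + 0xFFF) & ~size_t(0xFFF),
		kHelMapProtRead | kHelMapProtWrite, &mapping));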
36 changes: 16 additions & 20 deletions hel/include/helix/ipc.hpp
@@ -1,4 +1,3 @@

#ifndef HELIX_HPP
#define HELIX_HPP

@@ -195,30 +194,32 @@ struct Dispatcher {
};
HEL_CHECK(helCreateQueue(&params, &_handle));

auto chunksOffset = (sizeof(HelQueue) + (sizeof(int) << sizeShift) + 63) & ~size_t(63);
auto reservedPerChunk = (sizeof(HelChunk) + params.chunkSize + 63) & ~size_t(63);
auto overallSize = chunksOffset + params.numChunks * reservedPerChunk;

void *mapping;
HEL_CHECK(helMapMemory(_handle, kHelNullHandle, nullptr,
0, (sizeof(HelQueue) + (sizeof(int) << sizeShift) + 0xFFF) & ~size_t(0xFFF),
0, (overallSize + 0xFFF) & ~size_t(0xFFF),
kHelMapProtRead | kHelMapProtWrite, &mapping));

_queue = reinterpret_cast<HelQueue *>(mapping);
auto chunksPtr = reinterpret_cast<std::byte *>(mapping) + chunksOffset;
for(unsigned int i = 0; i < 16; ++i)
_chunks[i] = reinterpret_cast<HelChunk *>(chunksPtr + i * reservedPerChunk);
}

return _handle;
}

void wait() {
while(true) {
// TODO: Initialize all chunks when setting up the queue.
if(_retrieveIndex == _nextIndex) {
assert(_activeChunks < (1 << sizeShift));
if(_activeChunks >= 16)
std::cerr << "\e[35mhelix: Queue is forced to grow to " << _activeChunks
<< " chunks (memory leak?)\e[39m" << std::endl;

auto chunk = reinterpret_cast<HelChunk *>(operator new(sizeof(HelChunk) + 4096));
_chunks[_activeChunks] = chunk;
HEL_CHECK(helSetupChunk(_handle, _activeChunks, chunk, 0));
assert(_activeChunks < 16);

// Reset and enqueue the new chunk.
chunk->progressFutex = 0;
_chunks[_activeChunks]->progressFutex = 0;

_queue->indexQueue[_nextIndex & ((1 << sizeShift) - 1)] = _activeChunks;
_nextIndex = ((_nextIndex + 1) & kHelHeadMask);
@@ -228,15 +229,10 @@ struct Dispatcher {
_activeChunks++;
continue;
}else if (_hadWaiters && _activeChunks < (1 << sizeShift)) {
// std::cerr << "\e[35mhelix: Growing queue to " << _activeChunks
// << " chunks to improve throughput\e[39m" << std::endl;

auto chunk = reinterpret_cast<HelChunk *>(operator new(sizeof(HelChunk) + 4096));
_chunks[_activeChunks] = chunk;
HEL_CHECK(helSetupChunk(_handle, _activeChunks, chunk, 0));
assert(_activeChunks < 16);

// Reset and enqueue the new chunk.
chunk->progressFutex = 0;
_chunks[_activeChunks]->progressFutex = 0;

_queue->indexQueue[_nextIndex & ((1 << sizeShift) - 1)] = _activeChunks;
_nextIndex = ((_nextIndex + 1) & kHelHeadMask);
@@ -335,7 +331,7 @@ struct Dispatcher {
private:
HelHandle _handle;
HelQueue *_queue;
HelChunk *_chunks[1 << sizeShift];
HelChunk *_chunks[16];

int _activeChunks;
bool _hadWaiters;
@@ -348,7 +344,7 @@ struct Dispatcher {
int _lastProgress;

// Per-chunk reference counts.
int _refCounts[1 << sizeShift];
int _refCounts[16];
};

async::run_queue *globalQueue();
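For reference, the (x + 63) & ~size_t(63) idiom used for chunksOffset and reservedPerChunk above rounds x up to the next multiple of 64, just as (x + 0xFFF) & ~size_t(0xFFF) rounds up to the 4 KiB page size; a quick compile-time check of the idiom:

static_assert(((0 + 63) & ~size_t(63)) == 0, "already aligned");
static_assert(((1 + 63) & ~size_t(63)) == 64, "rounds up");
static_assert(((64 + 63) & ~size_t(63)) == 64, "exact multiple stays");
static_assert(((65 + 63) & ~size_t(63)) == 128, "next multiple");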
25 changes: 1 addition & 24 deletions kernel/thor/generic/hel.cpp
@@ -291,7 +291,7 @@ HelError helCreateQueue(HelQueueParameters *paramsPtr, HelHandle *handle) {
return kHelErrIllegalArgs;

auto queue = smarter::allocate_shared<IpcQueue>(*kernelAlloc,
params.ringShift, params.chunkSize);
params.ringShift, params.numChunks, params.chunkSize);
queue->setupSelfPtr(queue);
{
auto irq_lock = frg::guard(&irqMutex());
@@ -304,29 +304,6 @@ HelError helCreateQueue(HelQueueParameters *paramsPtr, HelHandle *handle) {
return kHelErrNone;
}

HelError helSetupChunk(HelHandle queue_handle, int index, HelChunk *chunk, uint32_t flags) {
assert(!flags);
auto this_thread = getCurrentThread();
auto this_universe = this_thread->getUniverse();

smarter::shared_ptr<IpcQueue> queue;
{
auto irq_lock = frg::guard(&irqMutex());
Universe::Guard universe_guard(this_universe->lock);

auto queue_wrapper = this_universe->getDescriptor(universe_guard, queue_handle);
if(!queue_wrapper)
return kHelErrNoDescriptor;
if(!queue_wrapper->is<QueueDescriptor>())
return kHelErrBadDescriptor;
queue = queue_wrapper->get<QueueDescriptor>().queue;
}

queue->setupChunk(index, this_thread->getAddressSpace().lock(), chunk);

return kHelErrNone;
}

HelError helCancelAsync(HelHandle handle, uint64_t async_id) {
auto this_thread = getCurrentThread();
auto this_universe = this_thread->getUniverse();
94 changes: 37 additions & 57 deletions kernel/thor/generic/ipc-queue.cpp
@@ -11,33 +11,24 @@ namespace thor {
// IpcQueue
// ----------------------------------------------------------------------------

IpcQueue::IpcQueue(unsigned int ringShift, size_t)
: _ringShift{ringShift},
_chunks{*kernelAlloc},
IpcQueue::IpcQueue(unsigned int ringShift, unsigned int numChunks, size_t chunkSize)
: _ringShift{ringShift}, _chunkSize{chunkSize}, _chunkOffsets{*kernelAlloc},
_currentIndex{0}, _currentProgress{0}, _anyNodes{false} {
// Setup internal state.
_memory = smarter::allocate_shared<ImmediateMemory>(*kernelAlloc,
sizeof(QueueStruct) + (sizeof(int) << ringShift));
_chunks.resize(1 << _ringShift);
auto chunksOffset = (sizeof(QueueStruct) + (sizeof(int) << ringShift) + 63) & ~size_t(63);
auto reservedPerChunk = (sizeof(ChunkStruct) + chunkSize + 63) & ~size_t(63);
auto overallSize = chunksOffset + numChunks * reservedPerChunk;

// Setup the queue data structure.
auto head = _memory->accessImmediate<QueueStruct>(0);
memset(head, 0, sizeof(QueueStruct));
// Setup internal state.
_memory = smarter::allocate_shared<ImmediateMemory>(*kernelAlloc, overallSize);
_chunkOffsets.resize(numChunks);
for(unsigned int i = 0; i < numChunks; ++i)
_chunkOffsets[i] = chunksOffset + i * reservedPerChunk;

async::detach_with_allocator(*kernelAlloc, _runQueue());
}

bool IpcQueue::validSize(size_t size) {
// TODO: Note that the chunk size is currently hardcoded.
return sizeof(ElementStruct) + size <= 4096;
}

void IpcQueue::setupChunk(size_t index, smarter::shared_ptr<AddressSpace, BindableHandle> space, void *pointer) {
auto irq_lock = frg::guard(&irqMutex());
auto lock = frg::guard(&_mutex);

assert(index < _chunks.size());
_chunks[index] = Chunk{std::move(space), pointer};
return sizeof(ElementStruct) + size <= _chunkSize;
}

void IpcQueue::submit(IpcNode *node) {
@@ -83,8 +74,8 @@ coroutine<void> IpcQueue::_runQueue() {
if(pastCurrentChunk)
break;

auto futexOrError = co_await takeGlobalFutex(
_memory, offsetof(QueueStruct, headFutex),
auto futexOrError = co_await takeGlobalFutex(_memory,
offsetof(QueueStruct, headFutex),
WorkQueue::generalQueue()->take());
if(!futexOrError) {
infoLogger() << "thor: Shutting down IPC queue after fault" << frg::endlog;
@@ -95,21 +86,18 @@ coroutine<void> IpcQueue::_runQueue() {
}

// Lock the chunk.
Chunk *currentChunk;
size_t chunkOffset;
{
auto irqLock = frg::guard(&irqMutex());
auto lock = frg::guard(&_mutex);

size_t iq = + _currentIndex & ((size_t{1} << _ringShift) - 1);
size_t cn = *_memory->accessImmediate<int>(offsetof(QueueStruct, indexQueue) + iq * sizeof(int));
assert(cn < _chunks.size());
assert(_chunks[cn].space);

currentChunk = &_chunks[cn];
assert(cn < _chunkOffsets.size());
chunkOffset = _chunkOffsets[cn];
}
AddressSpaceLockHandle chunkLock{currentChunk->space,
currentChunk->pointer, sizeof(ChunkStruct)};
co_await chunkLock.acquire(WorkQueue::generalQueue()->take());

auto chunkHead = _memory->accessImmediate<ChunkStruct>(chunkOffset);

// This inner loop runs until the chunk is exhausted.
while(true) {
@@ -131,36 +119,31 @@ coroutine<void> IpcQueue::_runQueue() {
progress = _currentProgress;
}

// Compute destination pointer and length of the element.
auto dest = reinterpret_cast<Address>(currentChunk->pointer)
+ offsetof(ChunkStruct, buffer) + _currentProgress;
assert(!(dest & 0x7));

// Compute the overall length of the element.
size_t length = 0;
for(auto source = _nodeQueue.front()->_source; source; source = source->link)
length += (source->size + 7) & ~size_t(7);
assert(length <= currentChunk->bufferSize);
for(auto sgSource = _nodeQueue.front()->_source; sgSource; sgSource = sgSource->link)
length += (sgSource->size + 7) & ~size_t(7);
assert(length <= _chunkSize);

// Check if we need to retire the current chunk.
bool emitElement = true;
if(progress + length <= currentChunk->bufferSize) {
AddressSpaceLockHandle elementLock{currentChunk->space,
reinterpret_cast<void *>(dest), sizeof(ElementStruct) + length};
co_await elementLock.acquire(WorkQueue::generalQueue()->take());

if(progress + length <= _chunkSize) {
// Emit the next element to the current chunk.
auto elementOffset = offsetof(ChunkStruct, buffer) + _currentProgress;
assert(!(elementOffset & 0x7));

ElementStruct element;
memset(&element, 0, sizeof(element));
element.length = length;
element.context = reinterpret_cast<void *>(node->_context);
auto err = elementLock.write(0, &element, sizeof(ElementStruct));
assert(err == Error::success);

size_t disp = sizeof(ElementStruct);
for(auto source = node->_source; source; source = source->link) {
err = elementLock.write(disp, source->pointer, source->size);
assert(err == Error::success);
disp += (source->size + 7) & ~size_t(7);
_memory->writeImmediate(chunkOffset + elementOffset,
&element, sizeof(ElementStruct));

size_t sgOffset = sizeof(ElementStruct);
for(auto sgSource = node->_source; sgSource; sgSource = sgSource->link) {
_memory->writeImmediate(chunkOffset + elementOffset + sgOffset,
sgSource->pointer, sgSource->size);
sgOffset += (sgSource->size + 7) & ~size_t(7);
}
}else{
emitElement = false;
@@ -174,16 +157,13 @@ coroutine<void> IpcQueue::_runQueue() {
newProgressWord = progress | kProgressDone;
}

DirectSpaceAccessor<ChunkStruct> chunkAccessor{chunkLock, 0};

auto progressFutexWord = __atomic_exchange_n(&chunkAccessor.get()->progressFutex,
auto progressFutexWord = __atomic_exchange_n(&chunkHead->progressFutex,
newProgressWord, __ATOMIC_RELEASE);
// If user-space modifies any non-flags field, that's a contract violation.
// TODO: Shut down the queue in this case.
if(progressFutexWord & kProgressWaiters) {
auto fa = reinterpret_cast<Address>(currentChunk->pointer)
+ offsetof(ChunkStruct, progressFutex);
auto identityOrError = resolveGlobalFutex(currentChunk->space.get(), fa);
auto identityOrError = resolveGlobalFutex(_memory.get(),
chunkOffset + offsetof(ChunkStruct, progressFutex));
if(!identityOrError) {
infoLogger() << "thor: Shutting down IPC queue after fault" << frg::endlog;
co_return;
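The emit loop above writes an element header followed by each scatter/gather source, with every source padded to 8 bytes. A standalone model of that serialization; the struct here is reduced to the two fields visible in the diff (length, context) and is an illustration, not the real ABI:

#include <cstddef>
#include <cstring>

struct Element { size_t length; void *context; };
struct Source { const void *pointer; size_t size; const Source *link; };

// Serialize one element into a chunk buffer; returns the bytes consumed.
size_t emitElement(std::byte *buffer, const Source *source, void *context) {
	// Overall payload length: every source rounded up to 8 bytes.
	size_t length = 0;
	for(auto s = source; s; s = s->link)
		length += (s->size + 7) & ~size_t(7);

	Element element{length, context};
	std::memcpy(buffer, &element, sizeof(Element));

	size_t disp = sizeof(Element);
	for(auto s = source; s; s = s->link) {
		std::memcpy(buffer + disp, s->pointer, s->size);
		disp += (s->size + 7) & ~size_t(7); // keep each source 8-byte aligned
	}
	return disp;
}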
4 changes: 0 additions & 4 deletions kernel/thor/generic/main.cpp
@@ -619,10 +619,6 @@ void handleSyscall(SyscallImageAccessor image) {
*image.error() = helCreateQueue((HelQueueParameters *)arg0, &handle);
*image.out0() = handle;
} break;
case kHelCallSetupChunk: {
*image.error() = helSetupChunk((HelHandle)arg0, (int)arg1,
(HelChunk *)arg2, (uint32_t)arg3);
} break;
case kHelCallCancelAsync: {
*image.error() = helCancelAsync((HelHandle)arg0, (uint64_t)arg1);
} break;
20 changes: 3 additions & 17 deletions kernel/thor/generic/thor-internal/ipc-queue.hpp
@@ -89,23 +89,8 @@ struct IpcQueue : CancelRegistry {

using Mutex = frg::ticket_spinlock;

struct Chunk {
Chunk()
: pointer{nullptr} { }

Chunk(smarter::shared_ptr<AddressSpace, BindableHandle> space_, void *pointer_)
: space{std::move(space_)}, pointer{pointer_}, bufferSize{4096} { }

// Pointer (+ address space) to queue chunk struct.
smarter::shared_ptr<AddressSpace, BindableHandle> space;
void *pointer;

// Size of the chunk's buffer.
size_t bufferSize;
};

public:
IpcQueue(unsigned int ringShift, size_t chunkSize);
IpcQueue(unsigned int ringShift, unsigned int numChunks, size_t chunkSize);

IpcQueue(const IpcQueue &) = delete;

@@ -184,8 +169,9 @@ struct IpcQueue : CancelRegistry {
smarter::shared_ptr<ImmediateMemory> _memory;

unsigned int _ringShift;
size_t _chunkSize;

frg::vector<Chunk, KernelAlloc> _chunks;
frg::vector<size_t, KernelAlloc> _chunkOffsets;

// Index into the queue that we are currently processing.
int _currentIndex;
15 changes: 15 additions & 0 deletions kernel/thor/generic/thor-internal/memory-view.hpp
@@ -972,6 +972,21 @@ struct ImmediateMemory final : MemoryView {
reinterpret_cast<std::byte *>(accessor.get()) + misalign);
}

void writeImmediate(uintptr_t offset, void *pointer, size_t size) {
size_t progress = 0;
while(progress < size) {
auto misalign = (offset + progress) & (kPageSize - 1);
auto chunk = frg::min(size - progress, kPageSize - misalign);

auto index = (offset + progress) >> kPageShift;
assert(index < _physicalPages.size());
PageAccessor accessor{_physicalPages[index]};
memcpy(reinterpret_cast<std::byte *>(accessor.get()) + misalign,
reinterpret_cast<std::byte *>(pointer) + progress, chunk);
progress += chunk;
}
}

private:
frg::ticket_spinlock _mutex;

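writeImmediate copies page by page because the region's backing pages need not be physically contiguous: each iteration clamps the copy length to the end of the current page, then advances. A standalone model of the splitting logic (the 4 KiB page size is an assumption matching the usual kPageSize):

#include <algorithm>
#include <cstddef>
#include <cstdio>

constexpr size_t kPageShift = 12; // assumed 4 KiB pages
constexpr size_t kPageSize = size_t{1} << kPageShift;

// Print the (page index, in-page offset, byte count) triples that a
// writeImmediate(offset, ..., size) call would copy.
void planWrite(size_t offset, size_t size) {
	size_t progress = 0;
	while(progress < size) {
		auto misalign = (offset + progress) & (kPageSize - 1);
		auto chunk = std::min(size - progress, kPageSize - misalign);
		std::printf("page %zu, in-page offset %zu, %zu bytes\n",
				(offset + progress) >> kPageShift, misalign, chunk);
		progress += chunk;
	}
}

int main() {
	planWrite(4090, 100); // page 0 @ 4090: 6 bytes; page 1 @ 0: 94 bytes
}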
