-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Unbounded Mpsc channels issue #19
Comments
Detailed logs on the second problem with only 4 threads:
|
Well, it seems like the bug may be linked to nim-lang/Nim#12695. In particular, the cast may create a copy, and I need to cast via pointers, as Nim may perform union-style type conversion. weave/weave/channels/channels_mpsc_unbounded.nim Lines 66 to 78 in 76db49e
|
|
With some modification proc trySend*[T](chan: var ChannelMpscUnbounded[T], src: sink T): bool =
## Send an item to the back of the channel
## As the channel has unbounded capacity, this should never fail
# Invariant: the queue always holds at least the dummy/stub node,
# so `front` and `back` are never nil.
assert not(chan.front.isNil)
assert not(chan.back.load(moRelaxed).isNil)
log("Channel 0x%.08x sending 0x%.08x (next: 0x%.08x)\n", chan.addr, cast[ByteAddress](src), cast[ByteAddress](src.next))
# Terminate the new node's `next` link before publishing it.
src.next.store(nil, moRelaxed)
# Release fence: the nil store above (and the node's payload) must be
# visible before the node becomes reachable through `back`/`next`.
fence(moRelease)
# Atomically swing `back` to the new node; `oldBack` is the previous tail.
let oldBack = chan.back.exchange(src, moRelaxed)
# Workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695
# And make sure we cast through pointers to avoid leaving wild copies around
# https://github.com/mratsim/weave/issues/19#issuecomment-557705071
let oldBackT = cast[ptr T](oldBack.unsafeAddr)
# Linking the old tail to the new node is what makes it visible to the consumer.
oldBackT.next.store(src, moRelaxed)
discard chan.count.fetchAdd(1, moRelaxed)
return true
proc tryRecv*[T](chan: var ChannelMpscUnbounded[T], dst: var T): bool =
## Try receiving the next item buffered in the channel
## Returns true if successful (channel was not empty)
## NOTE(review): single-consumer side — `chan.front` is read/written
## without atomics, so only one thread may call this; confirm at call sites.
assert not(chan.front.isNil)
assert not(chan.back.load(moRelaxed).isNil)
let first = chan.front # dummy
# Workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695
# And make sure we cast through pointers to avoid leaving wild copies around
# https://github.com/mratsim/weave/issues/19#issuecomment-557705071
let nextTypeErased = first.next.load(moRelaxed)
let next = cast[ptr T](nextTypeErased.unsafeAddr)[]
if next.isNil:
# Nothing linked after the front node: the channel appears empty.
fence(moAcquire)
dst = nil
return false
log("Channel 0x%.08x receiving 0x%.08x (next: 0x%.08x)\n", chan.addr, cast[ByteAddress](next), cast[ByteAddress](next.next))
# Advance the logical head past the consumed node.
chan.front = next
# Warm the cache line holding the link we will read on the next dequeue.
prefetch(first.next.load(moRelaxed))
# Acquire fence: pairs with the release fence in trySend, so the node's
# payload is visible before we hand it to the caller.
fence(moAcquire)
dst = next
discard chan.count.fetchSub(1, moRelaxed)
return true
|
It seems like there is an issue with snmalloc channels (adapted from Pony's) or I adapted them improperly. snmalloc pseudo code from paper void init(T* stub)
{
// Seed the queue with a stub (dummy) node so front/back are never null.
stub->next.store(nullptr, std::memory_order_relaxed);
front = stub;
back.store(stub, std::memory_order_relaxed);
invariant();
}
T* destroy()
{
// Detach and return the remaining chain (starting at front, i.e. the
// stub) so the caller can reclaim the nodes; the queue is left unusable.
T* fnt = front;
back.store(nullptr, std::memory_order_relaxed);
front = nullptr;
return fnt;
}
inline bool is_empty()
{
// The queue is empty when back still points at the front (stub) node.
T* bk = back.load(std::memory_order_relaxed);
return bk == front;
}
void enqueue(T* first, T* last)
{
// Pushes a list of messages to the queue. Each message from first to
// last should be linked together through their next pointers.
invariant();
last->next.store(nullptr, std::memory_order_relaxed);
// Release fence: publish the nodes' contents (and the null terminator
// above) before they become reachable through back / prev->next.
std::atomic_thread_fence(std::memory_order_release);
T* prev = back.exchange(last, std::memory_order_relaxed);
// Linking prev->next is what makes the new nodes visible to the consumer.
prev->next.store(first, std::memory_order_relaxed);
}
std::pair<T*, bool> dequeue()
{
// Returns the front message, or null if not possible to return a message.
invariant();
T* first = front;
T* next = first->next.load(std::memory_order_relaxed);
if (next != nullptr)
{
// Non-empty: advance front; `first` (the old front) is handed back.
front = next;
AAL::prefetch(&(next->next));
assert(front);
// Acquire fence: pairs with the release fence in enqueue, so the
// dequeued message's contents are visible to this thread.
std::atomic_thread_fence(std::memory_order_acquire);
invariant();
return std::pair(first, true);
}
// next == nullptr: queue appears empty (only the front node remains).
return std::pair(nullptr, false);
}
};
} // namespace snmalloc Pony: https://qconlondon.com/london-2016/system/files/presentation-slides/sylvanclebsch.pdf Note that Pony enqueue at the head and dequeue at the tail static bool messageq_push(messageq_t* q, pony_msg_t* first, pony_msg_t* last)
{
// Producer side. Pushes the linked run first..last and returns true if
// the queue was marked empty before the push (tag bit 0 set on head).
atomic_store_explicit(&last->next, NULL, memory_order_relaxed);
// Without that fence, the store to last->next above could be reordered after
// the exchange on the head and after the store to prev->next done by the
// next push, which would result in the pop incorrectly seeing the queue as
// empty.
// Also synchronise with the pop on prev->next.
atomic_thread_fence(memory_order_release);
pony_msg_t* prev = atomic_exchange_explicit(&q->head, last,
memory_order_relaxed);
// Bit 0 of the head pointer encodes "queue was empty".
bool was_empty = ((uintptr_t)prev & 1) != 0;
// Strip the tag bit to recover the real previous-node pointer.
prev = (pony_msg_t*)((uintptr_t)prev & ~(uintptr_t)1);
#ifdef USE_VALGRIND
// Double fence with Valgrind since we need to have prev in scope for the
// synchronisation annotation.
ANNOTATE_HAPPENS_BEFORE(&prev->next);
atomic_thread_fence(memory_order_release);
#endif
// Linking prev->next publishes the new message(s) to the consumer.
atomic_store_explicit(&prev->next, first, memory_order_relaxed);
return was_empty;
}
void ponyint_messageq_init(messageq_t* q)
{
// Allocate the stub (dummy) node that keeps the queue pointers non-null.
pony_msg_t* stub = POOL_ALLOC(pony_msg_t);
stub->index = POOL_INDEX(sizeof(pony_msg_t));
atomic_store_explicit(&stub->next, NULL, memory_order_relaxed);
// Setting tag bit 0 on head marks the queue as empty from the start.
atomic_store_explicit(&q->head, (pony_msg_t*)((uintptr_t)stub | 1),
memory_order_relaxed);
q->tail = stub;
#ifndef PONY_NDEBUG
messageq_size_debug(q);
#endif
}
pony_msg_t* ponyint_thread_messageq_pop(messageq_t* q
#ifdef USE_DYNAMIC_TRACE
, uintptr_t thr
#endif
)
{
// Consumer side: returns the next message, or NULL if the queue is empty.
pony_msg_t* tail = q->tail;
pony_msg_t* next = atomic_load_explicit(&tail->next, memory_order_relaxed);
if(next != NULL)
{
DTRACE2(THREAD_MSG_POP, (uint32_t) next->id, (uintptr_t) thr);
q->tail = next;
// Acquire fence: pairs with the release fence in messageq_push, so the
// message contents are visible before use.
atomic_thread_fence(memory_order_acquire);
#ifdef USE_VALGRIND
ANNOTATE_HAPPENS_AFTER(&tail->next);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(tail);
#endif
// The old tail (stub or previously consumed node) can now be reclaimed.
ponyint_pool_free(tail->index, tail);
}
return next;
}
bool ponyint_messageq_markempty(messageq_t* q)
{
// Try to mark the queue empty by setting tag bit 0 on head, but only
// when head == tail (nothing left to consume). Returns true if the
// queue is (or becomes) marked empty.
pony_msg_t* tail = q->tail;
pony_msg_t* head = atomic_load_explicit(&q->head, memory_order_relaxed);
// Already tagged empty.
if(((uintptr_t)head & 1) != 0)
return true;
// Unconsumed messages remain between tail and head.
if(head != tail)
return false;
// CAS in the tagged pointer; a concurrent push changes head and makes
// the CAS fail, correctly leaving the queue marked non-empty.
head = (pony_msg_t*)((uintptr_t)head | 1);
#ifdef USE_VALGRIND
ANNOTATE_HAPPENS_BEFORE(&q->head);
#endif
return atomic_compare_exchange_strong_explicit(&q->head, &tail, head,
memory_order_release, memory_order_relaxed);
} However, I probably have missed something here but it seems to me like:
|
- they hold on to the last item of a queue (breaking for steal requests) - they require memory management of the dummy node (snmalloc deletes it and its memory doesn't seem to be reclaimed) - they never touch the "back" pointer of the queue when dequeuing, meaning that if an item was last, the queue will still point to it after dequeuing. Pony has an emptiness check via a tagged pointer, and snmalloc does ???
There are two strange issues in the branch https://github.com/mratsim/weave/tree/test-lockless-unbounded-mpsc.
edit: This issue will focus on the unbounded channel issue
The text was updated successfully, but these errors were encountered: