Skip to content

Commit

Permalink
Fix sending of messages to the calling Task in vibe.core.concurrency.…
Browse files Browse the repository at this point in the history
… Semantic change!

This change also slightly changes the semantics in case of boolean message handlers. In particular, if there are message handlers that accept a certain type of message, but all of them return false, the message will be removed from the queue anyway as if it was handled.

Since std.concurrency doesn't really specify a behavior in this case, it should generally be avoided in the first place.
  • Loading branch information
s-ludwig committed May 6, 2013
1 parent bf65a19 commit 42857a7
Showing 1 changed file with 57 additions and 30 deletions.
87 changes: 57 additions & 30 deletions source/vibe/core/task.d
Expand Up @@ -213,8 +213,8 @@ class MessageQueue {

void prioritySend(Variant msg)
{
synchronized(m_mutex){
if( m_priorityQueue.full )
synchronized (m_mutex) {
if (m_priorityQueue.full)
m_priorityQueue.capacity = (m_priorityQueue.capacity * 3) / 2;
m_priorityQueue.put(msg);
}
Expand All @@ -224,71 +224,98 @@ class MessageQueue {
void receive(OPS...)(OPS ops)
{
bool notify;
scope(exit) if( notify ) m_condition.notify();
synchronized(m_mutex){
scope (exit) if (notify) m_condition.notify();

Variant args;
synchronized (m_mutex) {
notify = this.full;
while(true){
while (true) {
import vibe.core.log;
logTrace("looking for messages");
if( receiveQueue(m_priorityQueue, ops) ) return;
if( receiveQueue(m_queue, ops) ) return;
if (receiveQueue(m_priorityQueue, args, ops)) break;
if (receiveQueue(m_queue, args, ops)) break;
logTrace("received no message, waiting..");
m_condition.wait();
}
}

callOps(args, ops);
}

bool receiveTimeout(OPS...)(Duration timeout, OPS ops)
{
import std.datetime;

bool notify;
scope(exit) if( notify ) m_condition.notify();
scope (exit) if (notify) m_condition.notify();
auto limit_time = Clock.currTime(UTC()) + timeout;
synchronized(m_mutex){
Variant args;
synchronized (m_mutex) {
notify = this.full;
while(true){
if( receiveQueue(m_priorityQueue, ops) ) return true;
if( receiveQueue(m_queue, ops) ) return true;
while (true) {
if (receiveQueue(m_priorityQueue, args, ops)) break;
if (receiveQueue(m_queue, args, ops)) break;
auto now = Clock.currTime(UTC());
if( now > limit_time ) return false;
if (now > limit_time) return false;
m_condition.wait(limit_time - now);
}
}

callOps(args, ops);
}

private static bool receiveQueue(OPS...)(ref FixedRingBuffer!Variant queue, OPS ops)
private static bool receiveQueue(OPS...)(ref FixedRingBuffer!Variant queue, ref Variant dst, OPS ops)
{
auto r = queue[];
while(!r.empty){
scope(failure) queue.removeAt(r);
while (!r.empty) {
scope (failure) queue.removeAt(r);
auto msg = r.front;
bool matched;
foreach(i, TO; OPS){
foreach (i, TO; OPS) {
alias ParameterTypeTuple!TO ArgTypes;

static if( ArgTypes.length == 1 ){
static if( is(ArgTypes[0] == Variant) )
matched = callOp(ops[i], msg);
else if( msg.convertsTo!(ArgTypes[0]) )
matched = callOp(ops[i], msg.get!(ArgTypes[0]));
} else if( msg.convertsTo!(Tuple!ArgTypes) ){
matched = callOp(ops[i], msg.get!(Tuple!ArgTypes).expand);
static if (ArgTypes.length == 1) {
static if (is(ArgTypes[0] == Variant)) {
dst = msg;
queue.removeAt(r);
return true;
} else if (msg.convertsTo!(ArgTypes[0])) {
dst = msg;
queue.removeAt(r);
return true;
}
} else if (msg.convertsTo!(Tuple!ArgTypes)) {
dst = msg;
queue.removeAt(r);
return true;
}
if( matched ) break;
}
if( matched ){
queue.removeAt(r);
return true;
}
r.popFront();
}
return false;
}

private static bool callOps(OPS...)(Variant msg, OPS ops)
{
foreach(i, TO; OPS){
alias ParameterTypeTuple!TO ArgTypes;

static if (ArgTypes.length == 1) {
static if (is(ArgTypes[0] == Variant)) {
if (callOp(ops[i], msg)) return true;
} else if (msg.convertsTo!(ArgTypes[0])) {
if (callOp(ops[i], msg.get!(ArgTypes[0]))) return true;
}
} else if (msg.convertsTo!(Tuple!ArgTypes)) {
if (callOp(ops[i], msg.get!(Tuple!ArgTypes).expand)) return true;
}
}
return false;
}

private static bool callOp(OP, ARGS...)(OP op, ARGS args)
{
static if( is(ReturnType!op == bool) ){
static if (is(ReturnType!op == bool)) {
return op(args);
} else {
op(args);
Expand Down

0 comments on commit 42857a7

Please sign in to comment.