Split part of the Task struct into a separate struct InCall
The idea is that this leaves Tasks and OSThreads in one-to-one
correspondence.  The part of a Task that represents a call into
Haskell from C is split into a separate struct InCall, pointed to by
the Task and the TSO bound to it.  A given OSThread/Task thus always
uses the same mutex and condition variable, rather than getting a new
one for each callback.  Conceptually it is simpler, although there are
more types and indirections in a few places now.
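
A minimal sketch of the resulting layout, assembled from the hunks below; only the fields visible in this diff (tso, suspended_tso, task, next, incall, bound) come from the commit, and the remaining fields and their order are illustrative rather than the full definitions in rts/Task.h:

    typedef struct InCall_ {
        StgTSO *tso;            // the TSO bound to this in-call,
                                // or NULL for a worker
        StgTSO *suspended_tso;  // the TSO is parked here while it is
                                // out in a foreign call
        struct Task_ *task;     // back-pointer to the servicing Task
        struct InCall_ *next;   // link on cap->suspended_ccalls
    } InCall;

    typedef struct Task_ {
        OSThreadId id;          // exactly one Task per OSThread now
        struct InCall_ *incall; // the in-call currently being executed
        Mutex lock;             // kept for the Task's whole lifetime,
        Condition cond;         // not re-allocated per callback
        struct Task_ *next;     // link on returning_tasks/spare_workers
        // ... remaining fields omitted
    } Task;

A bound TSO now points at its InCall (tso->bound becomes a struct InCall_*), so code that previously reached the Task directly goes through one more indirection, e.g. tso->bound->task->cap instead of tso->bound->cap.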

This improves callback performance by removing some of the locks that
we had to take when making in-calls.  Now we also keep the current Task
in a thread-local variable if supported by the OS and gcc (currently
only Linux).
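
A sketch of the thread-local fast path, assuming gcc's __thread is available (per the message above, enabled only on Linux for now); the names my_task, myTask and setMyTask are illustrative, not necessarily the identifiers used in rts/Task.h:

    #if defined(linux_HOST_OS) && defined(__GNUC__)
    // gcc's __thread gives each OS thread its own slot, so finding
    // the current Task is a plain memory read: no lock, no
    // pthread_getspecific().
    extern __thread Task *my_task;
    #define myTask()     (my_task)
    #define setMyTask(t) (my_task = (t))
    #else
    // elsewhere, fall back to the OS's thread-local storage API
    // (pthread_getspecific()/pthread_setspecific(), or TlsGetValue()
    // on Windows)
    extern Task *myTask (void);
    extern void setMyTask (Task *task);
    #endif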
simonmar committed Mar 9, 2010
1 parent 4e8b07d commit 7effbbb
Showing 10 changed files with 329 additions and 274 deletions.
2 changes: 1 addition & 1 deletion includes/rts/storage/TSO.h
@@ -114,7 +114,7 @@ typedef struct StgTSO_ {
     StgTSOBlockInfo block_info;
     StgThreadID id;
     int saved_errno;
-    struct Task_* bound;
+    struct InCall_* bound;
     struct Capability_* cap;
     struct StgTRecHeader_ * trec; /* STM transaction record */

40 changes: 19 additions & 21 deletions rts/Capability.c
@@ -173,10 +173,10 @@ STATIC_INLINE void
 newReturningTask (Capability *cap, Task *task)
 {
     ASSERT_LOCK_HELD(&cap->lock);
-    ASSERT(task->return_link == NULL);
+    ASSERT(task->next == NULL);
     if (cap->returning_tasks_hd) {
-        ASSERT(cap->returning_tasks_tl->return_link == NULL);
-        cap->returning_tasks_tl->return_link = task;
+        ASSERT(cap->returning_tasks_tl->next == NULL);
+        cap->returning_tasks_tl->next = task;
     } else {
         cap->returning_tasks_hd = task;
     }
@@ -190,11 +190,11 @@ popReturningTask (Capability *cap)
     Task *task;
     task = cap->returning_tasks_hd;
     ASSERT(task);
-    cap->returning_tasks_hd = task->return_link;
+    cap->returning_tasks_hd = task->next;
     if (!cap->returning_tasks_hd) {
         cap->returning_tasks_tl = NULL;
     }
-    task->return_link = NULL;
+    task->next = NULL;
     return task;
 }
 #endif
@@ -220,7 +220,7 @@ initCapability( Capability *cap, nat i )
     initMutex(&cap->lock);
     cap->running_task = NULL; // indicates cap is free
     cap->spare_workers = NULL;
-    cap->suspended_ccalling_tasks = NULL;
+    cap->suspended_ccalls = NULL;
     cap->returning_tasks_hd = NULL;
     cap->returning_tasks_tl = NULL;
     cap->wakeup_queue_hd = END_TSO_QUEUE;
@@ -342,7 +342,7 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
     ASSERT_LOCK_HELD(&cap->lock);
     ASSERT(task->cap == cap);
     debugTrace(DEBUG_sched, "passing capability %d to %s %p",
-               cap->no, task->tso ? "bound task" : "worker",
+               cap->no, task->incall->tso ? "bound task" : "worker",
               (void *)task->id);
     ACQUIRE_LOCK(&task->lock);
     task->wakeup = rtsTrue;
@@ -398,7 +398,7 @@ releaseCapability_ (Capability* cap,
         // assertion is false: in schedule() we force a yield after
         // ThreadBlocked, but the thread may be back on the run queue
         // by now.
-        task = cap->run_queue_hd->bound;
+        task = cap->run_queue_hd->bound->task;
         giveCapabilityToTask(cap,task);
         return;
     }
@@ -411,7 +411,7 @@
     if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
         debugTrace(DEBUG_sched,
                    "starting new worker on capability %d", cap->no);
-        startWorkerTask(cap, workerStart);
+        startWorkerTask(cap);
         return;
     }
 }
@@ -462,9 +462,7 @@ releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
     // in which case it is not replaced on the spare_worker queue.
     // This happens when the system is shutting down (see
     // Schedule.c:workerStart()).
-    // Also, be careful to check that this task hasn't just exited
-    // Haskell to do a foreign call (task->suspended_tso).
-    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
+    if (!isBoundTask(task) && !task->stopped) {
         task->next = cap->spare_workers;
         cap->spare_workers = task;
     }
@@ -612,7 +610,7 @@ yieldCapability (Capability** pCap, Task *task)
         continue;
     }

-    if (task->tso == NULL) {
+    if (task->incall->tso == NULL) {
         ASSERT(cap->spare_workers != NULL);
         // if we're not at the front of the queue, release it
         // again. This is unlikely to happen.
@@ -655,12 +653,12 @@ wakeupThreadOnCapability (Capability *my_cap,

     // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
     if (tso->bound) {
-        ASSERT(tso->bound->cap == tso->cap);
-        tso->bound->cap = other_cap;
+        ASSERT(tso->bound->task->cap == tso->cap);
+        tso->bound->task->cap = other_cap;
     }
     tso->cap = other_cap;

-    ASSERT(tso->bound ? tso->bound->cap == other_cap : 1);
+    ASSERT(tso->bound ? tso->bound->task->cap == other_cap : 1);

     if (other_cap->running_task == NULL) {
         // nobody is running this Capability, we can add our thread
@@ -781,7 +779,7 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe)
         // that will try to return to code that has been unloaded.
         // We can be a bit more relaxed when this is a standalone
         // program that is about to terminate, and let safe=false.
-        if (cap->suspended_ccalling_tasks && safe) {
+        if (cap->suspended_ccalls && safe) {
             debugTrace(DEBUG_sched,
                        "thread(s) are involved in foreign calls, yielding");
             cap->running_task = NULL;
@@ -871,7 +869,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
 {
     nat i;
     Capability *cap;
-    Task *task;
+    InCall *incall;

     // Each GC thread is responsible for following roots from the
     // Capability of the same number. There will usually be the same
@@ -886,9 +884,9 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
     evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd);
     evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl);
 #endif
-    for (task = cap->suspended_ccalling_tasks; task != NULL;
-         task=task->next) {
-        evac(user, (StgClosure **)(void *)&task->suspended_tso);
+    for (incall = cap->suspended_ccalls; incall != NULL;
+         incall=incall->next) {
+        evac(user, (StgClosure **)(void *)&incall->suspended_tso);
     }

 #if defined(THREADED_RTS)
2 changes: 1 addition & 1 deletion rts/Capability.h
@@ -56,7 +56,7 @@ struct Capability_ {
     // the suspended TSOs easily. Hence, when migrating a Task from
     // the returning_tasks list, we must also migrate its entry from
     // this list.
-    Task *suspended_ccalling_tasks;
+    InCall *suspended_ccalls;

     // One mutable list per generation, so we don't need to take any
     // locks when updating an old-generation thunk. This also lets us
