
New asynchronous task API #1357

Open · wants to merge 19 commits into master

Conversation

@Spacechild1 (Contributor) commented Jul 15, 2021

New API for asynchronous tasks

Quite a few externals try to perform certain tasks asynchronously to avoid blocking the audio thread. They usually have a thread per object and protect shared state with a mutex, which is both wasteful and error-prone. Also, there is currently no safe and easy way to pass data or messages from a helper thread to the main thread. (sys_lock/sys_unlock can deadlock!)

The task API provides a simple and - most importantly - safe alternative. It consists of 7 new functions for externals + 4 new functions for libpd. Of course, the API can also be used for core objects. For example, it would vastly simplify #655.


How does it work?

A task consists of an owning object, user-specified data, a worker function and a completion callback. The basic workflow looks like this:

  1. The user allocates a new task data object, copies the required input (e.g. a filename) and passes it to task_start together with a worker function and a completion callback. task_start returns a new task handle which the user should store in the object, so that the task can be cancelled later if needed.
  2. A worker thread receives the task, executes the worker function and sends the task back to the scheduler.
    In the worker function, the user would typically read the input from the task data object, perform the desired operation and store the result back to the data object.
  3. The scheduler regularly polls for completed tasks and runs their callback functions.
  4. In the callback the user can safely move the result to the owning object, deallocate the task data object and unset the task handle.

A task can also be cancelled. In this case, the callback function will be called with owner set to NULL, preventing the user from accessing the owning Pd object (which might not even exist anymore at this point).

It is also possible to safely send messages from any thread back to the main thread with task_notify.

See src/s_task.c for complete documentation. doc/6.externs contains a comprehensive code example (https://github.com/Spacechild1/pure-data/blob/task/doc/6.externs/taskobj.c) with an example patch (https://github.com/Spacechild1/pure-data/blob/task/doc/6.externs/test-taskobj.pd).
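For orientation, here is a stripped-down sketch of the pattern. The object, struct and method names (t_load, t_loaddata, load_read) are hypothetical and only illustrate the workflow; taskobj.c above is the full, working example:

#include <string.h>
#include "m_pd.h"

typedef struct _loaddata    /* task data object */
{
    char path[MAXPDSTRING]; /* input, copied on the main thread */
    t_sample *buf;          /* result, produced by the worker */
    int nsamples;
} t_loaddata;

typedef struct _load
{
    t_object x_obj;
    t_task *x_task;
    t_sample *x_buf;
} t_load;

/* 2. worker function: runs on a worker thread and only touches 'y' */
static void load_work(t_task *task, t_load *x, t_loaddata *y)
{
    y->buf = /* ... read samples from y->path ... */ 0;
}

/* 4. completion callback: runs on the main (scheduler) thread */
static void load_done(t_load *x, t_loaddata *y)
{
    if (x)  /* NULL if the task has been cancelled */
    {
        x->x_buf = y->buf;  /* move the result to the owner */
        x->x_task = 0;      /* unset the task handle */
    }
    else if (y->buf)    /* cancelled: just clean up */
        freebytes(y->buf, y->nsamples * sizeof(t_sample));
    freebytes(y, sizeof(t_loaddata));
}

/* 1. start the task, e.g. from a "read" message method */
static void load_read(t_load *x, t_symbol *path)
{
    t_loaddata *y = (t_loaddata *)getbytes(sizeof(t_loaddata));
    y->buf = 0;
    y->nsamples = 0;
    strncpy(y->path, path->s_name, MAXPDSTRING - 1);
    y->path[MAXPDSTRING - 1] = 0;
    x->x_task = task_start((t_pd *)x, y,
        (task_workfn)load_work, (task_callback)load_done);
}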

This PR also adds a new command line option -workerthreads <n> which sets the number of worker threads (default: 1). 0 will disable the task system.

Finally, it is also possible to compile Pd without worker thread support (-DPD_WORKERTHREADS=0), e.g. for single-threaded environments. If the task system is disabled, all worker functions run synchronously, but for compatibility reasons the callback function is still deferred (i.e. delayed by 0 milliseconds).


New functions for externals

t_task *task_start(t_pd *owner, void *data, task_workfn workfn, task_callback cb)

description: schedule a new task.

arguments:

  1. t_pd * owner: the Pd object that owns the given task.
  2. void * data: a user-specified, dynamically allocated struct.
  3. task_workfn workfn: a function to be called in the background, or NULL (see below).
  4. task_callback cb: (optional) a function to be called after the task has completed.

returns: a new task handle

If workfn is NULL, the function returns a "dummy task" which you can pass to task_notify in your own threads, i.e. outside the context of a worker function. For example, you might have a network thread and want to handle incoming data on the main thread. After you have joined the thread, you must free the dummy task with task_stop.


t_task *task_spawn(t_pd *owner, void *data, task_workfn workfn, task_callback cb)

description: spawn a new task.
This function behaves just like task_start(), except it runs the worker function in its own thread, so that other tasks can run concurrently. Use this if a task might take a long time - let's say more than 1 second - and you cannot use asynchronous function calls with task_suspend() + task_resume().


int task_stop(t_task *task, int sync)

description: try to stop or cancel a given task.
The task handle will be invalidated and the callback will be called with owner set to NULL.

arguments:

  1. t_task * task: the task that shall be stopped.
  2. int sync: synchronize with worker thread(s).
    true (1): check if the task is currently being executed and if yes, wait for its completion.
    false (0): return immediately without blocking.
    NB: synchronization might only be necessary if the task data object contains references to shared mutable data - which should be avoided in the first place!
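returns: 1 on success, or 0 if the task could not be found (e.g. a stale task handle).

For example, an object would typically stop its pending task in the free method. A sketch, reusing the hypothetical t_load object from the sketch above:

static void load_free(t_load *x)
{
    if (x->x_task)  /* invalidate the pending task; its callback still runs,
                       but with 'owner' set to NULL */
        task_stop(x->x_task, 0);    /* no sync needed: the task data object
                                       holds no shared mutable state */
}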

int task_check(t_task *task)

description: check the task state.
In the worker function you can regularly call task_check() and return early if the task has been cancelled. This makes the overall task system more responsive.

returns: 1 if running, 0 if cancelled.
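For example, a worker function that processes its input in chunks might check for cancellation between chunks (sketch, again with the hypothetical types from above):

static void chunked_work(t_task *task, t_load *x, t_loaddata *y)
{
    while (task_check(task))    /* bail out early if the task has been cancelled */
    {
        int done = 0;
        /* ... process the next chunk and set 'done' when finished ... */
        if (done)
            break;
    }
}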


void task_suspend(t_task *task)

description: suspend current execution.
Call this function after deferring to another thread, typically as the last statement before returning from the worker function. The task will be suspended until the other thread finally calls task_resume(), allowing other tasks to run concurrently. This is useful for interfacing with event loops, existing thread-pools or asynchronous library functions.


void task_resume(t_task *task, t_task_workfn workfn)

description: resume execution.
This resumes a task that has been suspended with task_suspend(). It is always called from another thread.

arguments:

  1. (t_task *) task: the task handle
  2. (t_task_workfn) workfn: (optional) worker function
    The task will continue with the provided worker function; if 'workfn' is NULL, the task is considered finished.
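A sketch of the intended usage with a fictional asynchronous networking library (somelib_download_async as well as the t_dl/t_dldata names are made up):

/* second stage: runs on a worker thread after task_resume() */
static void dl_parse(t_task *task, t_dl *x, t_dldata *y)
{
    /* ... parse the downloaded data stored in 'y' ... */
}

/* completion handler of the fictional library, called on its own thread */
static void dl_handler(t_dldata *y, const char *data, size_t size)
{
    /* ... copy the result into the task data object ... */
    task_resume(y->task, (t_task_workfn)dl_parse);  /* continue with stage 2;
                                                       pass NULL to finish instead */
}

/* first stage: the worker function passed to task_start() */
static void dl_work(t_task *task, t_dl *x, t_dldata *y)
{
    y->task = task;
    somelib_download_async(y->url, dl_handler, y);
    task_suspend(task);     /* last statement: the task stays suspended
                               until dl_handler() calls task_resume() */
}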

void task_notify(t_task *task, void *data, t_task_callback fn)

description: send a message from any thread back to the main thread.

arguments:

  1. (t_task *) task: the associated task handle
  2. (void *) data: message data
  3. (t_task_callback) fn: message function (called on the main thread)

NB: If the underlying task is stopped, the callback will be called with owner set to NULL!


New functions for libpd:

libpd clients might want to manage their own worker thread(s), so we provide the following functions:

void sys_taskqueue_start(struct _pdinstance *pd, int threads)

description: start the task queue system

arguments:

  1. t_pdinstance * pd: the Pd instance
  2. int threads: the number of internal worker threads, or one of the following constants:
    TASKQUEUE_DEFAULT (0): use the default number of internal worker threads
    TASKQUEUE_EXTERNAL (-1): use external worker threads; the libpd client is responsible for managing the worker thread(s) and calling sys_taskqueue_perform.

NOTE: if sys_taskqueue_start() is not called, the tasks are executed synchronously on the audio/scheduler thread. This might be useful for batch/offline processing.


void sys_taskqueue_stop(struct _pdinstance *pd)

description: stop the task queue system


int sys_taskqueue_perform(struct _pdinstance *pd, int nonblocking)

description: perform asynchronous tasks.
This function is called from the external worker thread(s). It can be called from multiple threads at the same time!

arguments:

  1. t_pdinstance * pd: the Pd instance
  2. int nonblocking: call in non-blocking mode.
    true (1): perform a single task, if available, otherwise return immediately.
    false (0): block until the task queue is stopped with sys_taskqueue_stop.

returns: 1 if it did something or 0 if there was nothing to do.
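For illustration, a sketch of how a libpd client could drive the task queue with a single external worker thread (pthreads assumed, error handling omitted):

#include <pthread.h>

static pthread_t workerthread;

static void *workerloop(void *arg)
{
    t_pdinstance *pd = (t_pdinstance *)arg;
    /* blocking mode: only returns after sys_taskqueue_stop() has been called */
    sys_taskqueue_perform(pd, 0);
    return 0;
}

static void start_tasksystem(t_pdinstance *pd)
{
    sys_taskqueue_start(pd, TASKQUEUE_EXTERNAL);
    pthread_create(&workerthread, 0, workerloop, pd);
}

static void stop_tasksystem(t_pdinstance *pd)
{
    sys_taskqueue_stop(pd);
    pthread_join(workerthread, 0);
}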


int sys_taskqueue_running(void);

description: check if the task system is running. (Not sure if this is really useful.)

returns: true (1) or false (0)

Spacechild1 changed the title from "[WIP] task API" to "[WIP] asynchronous task API" on Jul 15, 2021
@Spacechild1 (Contributor, Author) commented Jul 15, 2021

might be relevant for @giuliomoro because it is somewhat complementary to #1261

Spacechild1 added the labels feature (suggestion for an enhancement), subject:API ("External Programming Interface": things concerning development of externals) and subject:core (things concerning the Pd-core) on Nov 20, 2021
Spacechild1 referenced this pull request in Lucarda/pd-flite Jun 2, 2022
@Spacechild1 (Contributor, Author) commented Jun 2, 2022

@millerpuckette Any thoughts on this? I've seen several people struggle writing multi-threaded Pd externals; they typically have nasty race conditions and/or are not real-time safe. An asynchronous task API like this one would make things a lot simpler.

@umlaeute, @danomatika, @Ant1r: what do you think about the API?

@Ant1r (Contributor) commented Jun 3, 2022

I think having such an easy way to delegate tasks to helper thread(s) is a very valuable addition.
I really like the simple and straightforward API.

However... I wonder if it will meet all the use cases; maybe it's a bit too simple (or maybe I'm asking too much ;-))

- multiple task queues:

I think there are cases where tasks should be sent to different task queues, because they compete for different resources.

For example, I can see at least 3 different kinds of resources: CPU, disk and network.
It would be a shame to have to wait for a big file to be downloaded from the network before being able to launch a CPU-consuming task. It can be useful to have multiple threads dedicated to CPU tasks (in order to benefit from a multi-core processor), while there is little point in having multiple threads access the same mass storage volume.

In some cases, an external could benefit from creating its own task queue(s) to optimize these kinds of things.

So it could be good to add to the API the possibility to create new task queues, and change the API to use a t_taskqueue* as the main key instead of t_pdinstance*, such as:

int taskqueue_perform(t_taskqueue *q, int nonblocking);
void taskqueue_poll(t_taskqueue *q);
t_task *task_sched(t_taskqueue *q, t_pd *owner, void *data,  t_task_workfn workfn, t_task_callback cb);

- signal the thread to stop the task before completion

Maybe it's covered by this PR, but I'm not sure: what if you want to cancel a running task, e.g. a large file transfer is running while you destroy the object owning this task? Is it possible to poll t_cancel in the worker thread's loop, if it is written to process limited-size chunks at a time?

- lifetime communication

Use case: my task is running a large file transfer, and I would like to be informed about the progress of the task, e.g. to display a progress bar. How could we allow this too?

@danomatika (Contributor):
@Ant1r Those points also make me think of priority levels... however I think that would add far too much complexity. I could see the need for resource-specific queues, but as you say, it depends on the complexity involved as well. At the very least, having even a single queue would help greatly and formalize externals away from risky thread implementations.

@Ant1r (Contributor) commented Jun 3, 2022

priority levels

Priority can be easily set at the thread creation time, so if you have multiple task queues you can have different priorities as well...

even a single queue would help greatly and formalize externals

Sure enough! I was just concerned about having a scalable API, while keeping the proposed simple interface as the default.

@danomatika (Contributor):
I would suggest some sort of cleanup / free function which can be called on completion or failure. The Apple GCD API calls this a "dispatch finalizer" so you have some sort of guarantee resources will be freed.

@Spacechild1 (Contributor, Author) commented Jun 3, 2022

Thanks for the feedback! Some quick replies:

I think there are cases where tasks should be sent to different task queues, because they compete for different resources.

I thought about that. Currently, the default implementation uses a single worker thread, but adding another thread would be trivial (and the number could be set via a command line option). The libpd API actually allows users to freely distribute worker threads among Pd instances - see the last paragraph in the PR description.

I don't think different priorities would be really necessary. Once you have several threads, they will just do work-stealing.

NOTE: having a thread-pool necessarily implies that async tasks are not guaranteed to execute in the order they have been scheduled. I noticed that I have not mentioned this anywhere in the PR description; it is, however, mentioned in s_task.c:

NOTE: the order in which tasks are executed is undefined! Implementations might
use multiple worker threads which execute tasks concurrently.

It's definitely something to watch out for. However, I don't think it would be a big issue in practice. Typically, most externals would not allow running the same async method concurrently anyway, otherwise things can get confusing pretty quickly.

Now, having dedicated threads for different tasks would make it possible to keep FIFO ordering for scheduled tasks (of the same priority) while still distributing the workload over different threads. It's a nice idea and I have actually considered it. However, I'm not sure it's worth the additional complexity...

It's worth pointing out that SuperCollider only has a single worker thread and the fact that async tasks are executed in order is a fundamental part of the language, so it cannot be changed. In Pd land, we do not have such restrictions (yet), so we're really free to decide what we want to do.

if you have multiple task queues you can have different priorities as well...

Generally, this API is really about non-realtime tasks, i.e. you do not care about meeting deadlines, so I don't think that having different priorities would buy us much.


signal the thread to stop the task before completion

Maybe it's covered by this PR, but I'm not sure: what if you want to cancel a running task, e.g a large file transfer is running while you destroy the object owning this task?

Yes! That's what task_cancel and task_join are for (see the PR description). Objects using the task API are supposed to call task_join in their destructor - which will cancel/join all pending tasks that are associated with this object. It is a very important part of making the API safe!
(SuperCollider's async task API does not support cancellation out of the box and it's a major pain point. We have to learn the lessons from our ancestors :-))


lifetime communication

I think this is outside the scope of the API.

Use case: my task is running a large file transfer, and I would like to be informed about the progress of the task, e.g to display a progress bar. How could we allow this too?

Actually, it's pretty easy to do: write to an (atomic) counter from your worker function and read the counter periodically with a clock.
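Something like this (sketch, with hypothetical object and field names, assuming C11 atomics):

#include <stdatomic.h>

typedef struct _dldata
{
    char url[MAXPDSTRING];
    atomic_int progress;        // 0..100, written by the worker thread
} t_dldata;

typedef struct _dl
{
    t_object x_obj;
    t_task *x_task;
    t_dldata *x_data;
    t_clock *x_clock;
    t_outlet *x_progressout;
} t_dl;

// worker function: update the counter while the transfer progresses
static void dl_work(t_task *task, t_dl *x, t_dldata *y)
{
    int i;
    for (i = 0; i <= 100 && task_check(task); i++)
    {
        // ... transfer the next chunk of y->url ...
        atomic_store(&y->progress, i);
    }
}

// clock method: runs on the main thread
static void dl_tick(t_dl *x)
{
    if (x->x_task)  // task still pending?
    {
        outlet_float(x->x_progressout, atomic_load(&x->x_data->progress));
        clock_delay(x->x_clock, 100);   // poll again in 100 ms
    }
}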

@Spacechild1 (Contributor, Author) commented Jun 3, 2022

I would suggest some sort of cleanup / free function which can be called on completion or failure. The Apple GCD API calls this a "dispatch finalizer" so you have some sort of guarantee resources will be freed.

This is already taken care of, see s_task.c:

4) (task_callback) cb: a function to be called after the task has completed.
 *    This is where the output data can be safely moved to the owning object.
 *    If the task has been cancelled, 'owner' is NULL.
 *    Here you can also safely free the 'data' object!

It's also mentioned in the PR description about task_cancel:

description: try to cancel a given task. The task handle will be invalidated and the callback will be called with owner set to NULL.

And here's an example: https://github.com/pure-data/pure-data/pull/1357/files#diff-a05d1a5f45a158a71cb20972edec39704e3ed3f4d69eda0e8b80aabce4faa26bR86-R114

TL;DR: the callback function is always executed, but if the task has been cancelled, the owner argument is NULL.

@danomatika (Contributor):
TL;DR: the callback function is always executed, but if the task has been cancelled

Ok, then there is no real need for an extra callback if the completion cb is always called.

@Ant1r (Contributor) commented Jun 3, 2022

the default implementation uses a single worker thread, but adding another thread would be trivial

I was really talking about multiple task queues: while I understand that adding other threads to the task queue is easy, there's no possibility to create another independent task queue. It would be super cool if the API could keep open the possibility of doing so later.

In my Pof library, for instance, I have 2 dedicated threads for image loading: one for reading from the local disk, one for downloading from the network. This greatly improves the overall loading speed. If only one thread were used, the average speed would be severely limited by the (usually much lower) network download rate (or by the upload rate of the content provider). Even with multiple threads, every thread would quickly be occupied by "slow" tasks, while fast tasks would be waiting.

@Spacechild1 (Contributor, Author) commented Jun 3, 2022

I was really talking about multiple task queues: while I understand that adding other threads to the task queue is easy, there's no possibility to create another independent task queue. It would be super cool if the API could keep open the possibility of doing so later.

I totally get your point. And as I said, I also considered it. I'm just worried it would complicate both the API and the implementation quite a bit. I'm not even sure it is the best approach.

In my Pof library, for instance, I have 2 dedicated threads for image loading: one for reading from the local disk, one for downloading from the network. This greatly improves the overall loading speed. If only one thread were used, the average speed would be severely limited by the (usually much lower) network download rate (or by the upload rate of the content provider). Even with multiple threads, every thread would quickly be occupied by "slow" tasks, while fast tasks would be waiting.

That's a totally valid objection. Even with multiple threads, the user can easily "saturate" the queue by sending many slow tasks in immediate succession. However, let's examine the actual problem from a higher point of view:

I think the cases we have to worry about are basically all about slow/unbounded I/O (mostly uploading/downloading stuff to/from the web). Generally, it's wasteful for a thread to just sit around and wait (e.g. for a slow network connection). For this very reason, most networking libraries/frameworks provide asynchronous I/O with completion handlers. I think what we actually need is a way to suspend the current worker function - giving other tasks a chance to run - and resume it later when I/O is ready.

Example:

// completion handler for fictional networking library
void myhandler(t_foo_data *user, const char *data, size_t size)
{
    // copy/move result to user data
    // ...
    // finally resume the task
    task_resume(user->task);
}

// worker function for task_sched()
void foo_worker(t_task *task, t_foo *x, t_foo_data *y)
{
    // start an asynchronous download request
    somelib_download_async(y->url, myhandler, y);
    // task_suspend() blocks until task_resume() is called from the completion function above;
    // in the meantime, other pending tasks are executed.
    task_suspend(task);
}

Alternatively, you could also use non-blocking I/O:

#define SLEEP_INTERVAL 10

void foo_worker(t_task *task, t_foo *x, t_foo_data *y)
{
    char buf[1024];
    while (1)
    {
        // socket is non-blocking!
        int result = recv(y->socket, buf, sizeof(buf), 0);
        if (result < 0)
        {
            if (errno == EWOULDBLOCK)
            {
                // task_yield() temporarily suspends our worker function to execute other pending tasks.
                // it returns 1 if other tasks have been executed and 0 if there was nothing to do.
                if (task_yield(task) > 0)
                    continue;
                else
                    // nothing to do - sleep
                    usleep(SLEEP_INTERVAL * 1000);
            }
            else
            {
                // handle error
                return;
            }
        }
        else
        {
            // process data and break if finished
        }
    }
}

// EDIT: alternative (better) solution using poll():

void foo_worker(t_task *task, t_foo *x, t_foo_data *y)
{
    char buf[1024];
    struct pollfd pd;
    pd.events = POLLIN;
    pd.fd = y->socket;
    while (1)
    {
        pd.revents = 0;
        int result = poll(&pd, 1, 0);
        if (result == 0) // timeout
        {
try_again:
            if (task_yield(task) > 0)
                continue; // did something, now try to poll again
            else
            {
                // nothing to do - sleep, but wake up if I/O is ready
                result = poll(&pd, 1, SLEEP_INTERVAL);
                if (result == 0) // timeout
                    goto try_again;
            }
        }
        // read data from socket and break if finished
    }
}

I think this would scale much better than dedicated task queues. You can literally have thousands of slow asynchronous requests and they are all guaranteed to make steady progress. Dedicated task queues only prevent slow tasks from blocking fast tasks, but slow tasks can still block each other!

What do you think?

@Ant1r (Contributor) commented Jun 3, 2022

Yes, task_yield() seems great! task_suspend() / task_resume() too, but maybe less versatile?
Anyway, I agree that it would be a much better workflow than dedicated task queues.

@Spacechild1 (Contributor, Author) commented Jun 3, 2022

Some more ideas:

task_yield could also be useful for long-running, CPU-intensive background tasks. Sometimes you have no idea how long a task is going to take. Instead of having to choose a priority value upfront, you can just yield at regular intervals - which automatically guarantees that other tasks make progress.

If you have a very long CPU-bound task (that can't be broken up into multiple steps), you can spawn a dedicated thread in the worker function, call task_suspend() and finally join the thread; the thread function only needs to call task_resume() when it's done.

@Spacechild1 (Contributor, Author) commented Jun 3, 2022

Yes, task_yield() seems great! task_suspend() / task_resume() too, but maybe less versatile?

They serve different use cases:
task_yield is for synchronous non-blocking I/O or other scenarios where you can yield in regular intervals.
task_suspend + task_resume are for asynchronous I/O or other scenarios, where the worker function is resumed from another thread.

@Spacechild1 (Contributor, Author):
I'll try to come up with a POC for task_suspend/task_resume/task_yield. I already have an idea how to implement it :-)

@Spacechild1 (Contributor, Author) commented Aug 15, 2022

Some updates:

  • I changed the signature for sys_taskqueue_start so that libpd clients can specify the number of (internal) worker threads.

  • I removed sys_taskqueue_setsignalhook because it's a bit too much for now. It's easy to add again later.

  • I removed notes about 1:N and M:N scheduling, i.e. one external thread (pool) serving multiple Pd instances, because it's tricky to implement safely. I think for most use cases it is appropriate to have one (or more) worker threads per Pd instance.

The API has more or less stabilized and the code seems to work fine in my tests, so I think I can drop the WIP label. Of course, I'm always open to more feedback :-)

Spacechild1 changed the title from "[WIP] asynchronous task API" to "New asynchronous task API" on Aug 15, 2022
Commit messages (excerpts):
  • replace MSVC target with MinGW target
  • this may serve as a context for sending notifications from *any* thread back to the main thread via task_notify().
  • this simplifies the logic in task_stop() and taskqueue_stop() a lot. Also, task_stop() is now safe because we can check whether the given task is alive! (For example, an external might accidentally call task_stop() on a stale task handle.) task_stop() now returns 1 on success and 0 if the task could not be found.
@Spacechild1 (Contributor, Author) commented Jun 12, 2023

I have now added task_notify, which allows you to dispatch messages from any thread back to Pd's main thread. Currently, this is not trivially possible! You might think you can just do the following:

sys_lock();
outlet_float(&x->x_obj, f);
sys_unlock();

Unfortunately, this can deadlock! For example, imagine the object attempts to join the thread (Pd is locked!) right before the thread calls the above code.

(Currently, the only safe solution is to write your own thread-safe queue and poll it regularly with a clock.)


Now you can do the following:

void mycallback(t_myobj *x, float *f)
{
    if (x) /* check if owner is still alive */
        outlet_float(&x->x_obj, *f);
    free(f);
}

// thread function:
float *data = malloc(sizeof(float));
*data = f;
task_notify(task, data, (t_task_callback)mycallback);

Finally, the task argument can be a "dummy task" (see task_start), so you can use task_notify even outside the context of a worker function. For example, imagine you have a webserver with an internal network thread and you want to handle incoming messages on Pd's main thread. You can also find examples in doc/6.externs/taskobj.c.

@giuliomoro (Contributor):
I've been following this thread for a while and I like what I've seen.

I like that task_cancel() (which is semantically aligned with pthread_cancel(), which is confusing) is now task_stop(); however, that's still slightly inaccurate, in that the task has to be polling a flag before it decides to stop. Maybe it could be even clearer as task_request_stop()? (FWIW, on Bela we have Bela_requestStop() to set the flag and Bela_stopRequested() to check the flag.)

@Spacechild1 (Contributor, Author) commented Jun 12, 2023

however that's still slightly inaccurate, in that the task has to be polling a flag before it decides to stop.

task_stop actually serves two purposes:

  1. provide a safe way for invalidating a running task, e.g. because the underlying object will be freed.

  2. allow stopping/cancelling a task while it is still in progress (which requires the worker function to poll regularly with task_check)

1 is the more important one of the two, 2 is just the icing on the cake :)

task_request_stop() would make sense for 2, but less so for 1. (After all, the task will be invalidated.)

There is another reason why I renamed task_cancel to task_stop: the same function is also used to stop "dummy tasks", in which case task_cancel or task_request_stop would be even more misleading. (But then again, there could be two distinct API functions for "dummy tasks" instead of overloading task_start and task_stop 🤷‍♂️)
