Skip to content

Commit

Permalink
asyn-thread: Don't block waiting on resolver threads during multi_done
Browse files Browse the repository at this point in the history
This commit changes the behavior of libcurl to no longer block on
incomplete resolve threads from the parent thread (ie the user's thread
which is the multi perform thread) during multi_done.

Instead, it now orphans those threads by adding them to a master list. The
list is culled periodically by orphans that are about to exit: each one waits
on and destroys the other orphans that are in the process of exiting or have
already exited, which is fast. On global cleanup we wait on and destroy any
remaining threads, which may be slow, but at that point we cannot defer it
any longer.

Ideally we would not wait for orphaned threads at all, but we have to
because after global cleanup the user may choose to unload the shared
library that is/contains libcurl.

Note: thread_wait_resolv (Curl_resolver_wait_resolv) which blocks
waiting for a resolver thread remains unchanged, since that is
essentially the purpose of the function. It is currently called by FTP
and SOCKS code, and continues to be a known issue that those two may
block waiting for async resolves to finish.

Fixes xxxxs://github.com/curl/issues/4852
Closes #xxxx
  • Loading branch information
jay committed Feb 5, 2020
1 parent 8a5c1cf commit 2db0ff6
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 23 deletions.
232 changes: 215 additions & 17 deletions lib/asyn-thread.c
Expand Up @@ -84,23 +84,72 @@ struct resdata {
struct curltime start;
};

/* One node in the doubly linked list of orphaned resolver thread handles. */
struct thread_list {
  /* Handle of the orphaned thread; it is joined when the node is destroyed. */
  curl_thread_t handle;

  /* Becomes true right before the orphaned thread exits. Only the orphaned
     thread itself should set it, and only from within
     signal_orphan_is_exiting(). */
  bool exiting;

  /* Neighbouring nodes; NULL marks either end of the list. */
  struct thread_list *prev;
  struct thread_list *next;
};

/* Orphaned threads: A list of resolver threads that could not be completed in
 * time and so they were abandoned by their parent. The list is culled
 * periodically by soon-to-be exiting orphans, which wait on and destroy those
 * orphans that are in the process of exiting or have since exited (fast). On
 * global cleanup we wait on and destroy any remaining threads, which may be
 * slow but at that point we cannot defer it any longer.
 *
 * The object is private to this file, hence 'static': it must not be exported
 * from the library under an unprefixed name.
 */
static struct orphaned_threads {
  /* Mutex to lock this. To avoid deadlock the thread-specific
     thread_sync_data mutex cannot be used as an inner lock when
     orphaned_threads is locked. */
  curl_mutex_t mutex;

  /* Head and tail of the list of orphaned threads; both NULL when empty. */
  struct thread_list *first;
  struct thread_list *last;

  /* Count of threads in the list that are in the process of exiting or have
     exited (ie the .exiting member of the thread_list item is set true). */
  size_t exiting_count;
} orphaned_threads;

/* Flags for wait_and_destroy_orphaned_threads().
They're documented above the function definition. */
/* Wait on and destroy every orphaned thread in the list. */
#define WAIT_DESTROY_ALL (1<<0)
/* Wait on and destroy only the orphaned threads whose .exiting is true. */
#define WAIT_DESTROY_EXITING_THREADS_ONLY (1<<1)

/* Forward declarations: both helpers are defined further down in this file
   but are used earlier. */
static void wait_and_destroy_orphaned_threads(int flags);
static void signal_orphan_is_exiting(struct thread_list *orphan);


/*
 * Curl_resolver_global_init()
 * Called from curl_global_init() to initialize global resolver environment.
 * Resets the orphaned threads list to its empty state and initializes the
 * mutex that guards it.
 *
 * Returns CURLE_OK on success, or CURLE_FAILED_INIT if the mutex could not be
 * initialized.
 */
int Curl_resolver_global_init(void)
{
  /* start with an empty list and a zero exiting count */
  memset(&orphaned_threads, 0, sizeof(orphaned_threads));

  /* mutex init returns zero on success */
  if(!Curl_mutex_init(&orphaned_threads.mutex))
    return CURLE_OK;

  return CURLE_FAILED_INIT;
}

/*
 * Curl_resolver_global_cleanup()
 * Called from curl_global_cleanup() to destroy global resolver environment.
 * Waits for and destroys all orphaned resolver threads, then destroys the
 * mutex guarding the orphaned threads list. This may block until the
 * remaining orphaned threads have exited.
 */
void Curl_resolver_global_cleanup(void)
{
/* Take ownership of all orphaned resolver threads and wait for them to exit.
This is necessary because the user may choose to unload the shared library
that is/contains libcurl. */
wait_and_destroy_orphaned_threads(WAIT_DESTROY_ALL);
/* The mutex is destroyed only after the orphaned threads have been waited
on, since exiting orphans lock it in signal_orphan_is_exiting(). */
Curl_mutex_destroy(&orphaned_threads.mutex);
}

/*
Expand Down Expand Up @@ -181,6 +230,8 @@ struct thread_data {
unsigned int poll_interval;
time_t interval_end;
struct thread_sync_data tsd;
/* 'reserved' memory must be available in case the thread is orphaned */
void *reserved;
};

static struct thread_sync_data *conn_thread_sync_data(struct connectdata *conn)
Expand Down Expand Up @@ -217,9 +268,9 @@ void destroy_thread_sync_data(struct thread_sync_data * tsd)
/* Initialize resolver thread synchronization data */
static
int init_thread_sync_data(struct thread_data * td,
const char *hostname,
int port,
const struct addrinfo *hints)
const char *hostname,
int port,
const struct addrinfo *hints)
{
struct thread_sync_data *tsd = &td->tsd;

Expand All @@ -242,7 +293,11 @@ int init_thread_sync_data(struct thread_data * td,
if(tsd->mtx == NULL)
goto err_exit;

Curl_mutex_init(tsd->mtx);
if(Curl_mutex_init(tsd->mtx)) {
free(tsd->mtx);
tsd->mtx = NULL;
goto err_exit;
}

#ifdef USE_SOCKETPAIR
/* create socket pair, avoid AF_LOCAL since it doesn't build on Solaris */
Expand Down Expand Up @@ -296,6 +351,7 @@ static unsigned int CURL_STDCALL getaddrinfo_thread(void *arg)
{
struct thread_sync_data *tsd = (struct thread_sync_data*)arg;
struct thread_data *td = tsd->td;
struct thread_list *orphan = NULL;
char service[12];
int rc;
#ifdef USE_SOCKETPAIR
Expand All @@ -320,6 +376,7 @@ static unsigned int CURL_STDCALL getaddrinfo_thread(void *arg)
/* too late, gotta clean up the mess */
Curl_mutex_release(tsd->mtx);
destroy_thread_sync_data(tsd);
orphan = (struct thread_list *)td->reserved;
free(td);
}
else {
Expand All @@ -337,6 +394,9 @@ static unsigned int CURL_STDCALL getaddrinfo_thread(void *arg)
Curl_mutex_release(tsd->mtx);
}

if(orphan)
signal_orphan_is_exiting(orphan);

return 0;
}

Expand All @@ -349,6 +409,7 @@ static unsigned int CURL_STDCALL gethostbyname_thread(void *arg)
{
struct thread_sync_data *tsd = (struct thread_sync_data *)arg;
struct thread_data *td = tsd->td;
struct thread_list *orphan = NULL;

tsd->res = Curl_ipv4_resolve_r(tsd->hostname, tsd->port);

Expand All @@ -363,13 +424,17 @@ static unsigned int CURL_STDCALL gethostbyname_thread(void *arg)
/* too late, gotta clean up the mess */
Curl_mutex_release(tsd->mtx);
destroy_thread_sync_data(tsd);
orphan = (struct thread_list *)td->reserved;
free(td);
}
else {
tsd->done = 1;
Curl_mutex_release(tsd->mtx);
}

if(orphan)
signal_orphan_is_exiting(orphan);

return 0;
}

Expand All @@ -388,25 +453,60 @@ static void destroy_async_data(struct Curl_async *async)
struct connectdata *conn = td->tsd.conn;
#endif

/*
* if the thread is still blocking in the resolve syscall, detach it and
* let the thread do the cleanup...
/* We can't wait any longer for the resolver thread so if it's not done
* then it must be orphaned.
*
* 1) add thread to orphaned threads list
* 2) set thread done (this signals to thread it has been orphaned)
*
* An orphaned thread does most of its own cleanup, and any remaining
* cleanup is handled during global cleanup.
*/

Curl_mutex_acquire(td->tsd.mtx);

if(!td->tsd.done && td->thread_hnd != curl_thread_t_null) {
struct thread_list *orphan = (struct thread_list *)td->reserved;

Curl_mutex_acquire(&orphaned_threads.mutex);

#ifdef DEBUGBUILD
{
struct thread_list empty;
memset(&empty, 0, sizeof(empty));
DEBUGASSERT(!memcmp(&empty, orphan, sizeof(empty)));
}
#endif

orphan->handle = td->thread_hnd;
orphan->exiting = false;

if(orphaned_threads.last) {
orphaned_threads.last->next = orphan;
orphan->prev = orphaned_threads.last;
}
else {
orphaned_threads.first = orphan;
orphan->prev = NULL;
}
orphaned_threads.last = orphan;
orphan->next = NULL;

Curl_mutex_release(&orphaned_threads.mutex);
}

done = td->tsd.done;
td->tsd.done = 1;

Curl_mutex_release(td->tsd.mtx);

if(!done) {
Curl_thread_destroy(td->thread_hnd);
}
else {
if(done) {
if(td->thread_hnd != curl_thread_t_null)
Curl_thread_join(&td->thread_hnd);

destroy_thread_sync_data(&td->tsd);

free(async->os_specific);
free(td->reserved);
free(td);
}
#ifdef USE_SOCKETPAIR
/*
Expand Down Expand Up @@ -446,9 +546,11 @@ static bool init_resolve_thread(struct connectdata *conn,
conn->async.status = 0;
conn->async.dns = NULL;
td->thread_hnd = curl_thread_t_null;
td->reserved = calloc(1, sizeof(struct thread_list));

if(!init_thread_sync_data(td, hostname, port, hints)) {
if(!td->reserved || !init_thread_sync_data(td, hostname, port, hints)) {
conn->async.os_specific = NULL;
free(td->reserved);
free(td);
goto errno_exit;
}
Expand All @@ -467,7 +569,7 @@ static bool init_resolve_thread(struct connectdata *conn,
td->thread_hnd = Curl_thread_create(gethostbyname_thread, &td->tsd);
#endif

if(!td->thread_hnd) {
if(td->thread_hnd == curl_thread_t_null) {
/* The thread never started, so mark it as done here for proper cleanup. */
td->tsd.done = 1;
err = errno;
Expand Down Expand Up @@ -802,4 +904,100 @@ CURLcode Curl_set_dns_local_ip6(struct Curl_easy *data,
return CURLE_NOT_BUILT_IN;
}

/* Helper function to wait on and destroy some or all orphaned threads.
 *
 * 'flags' selects the operation:
 *
 * WAIT_DESTROY_ALL:
 * Wait on and destroy all orphaned threads. This operation is not safe to
 * specify in code that could run in any thread that may be orphaned (ie any
 * resolver thread). Waiting on all orphaned threads may take some time. This
 * operation must be specified in the call from global cleanup, and ideally
 * nowhere else.
 *
 * WAIT_DESTROY_EXITING_THREADS_ONLY:
 * Wait on and destroy only orphaned threads that are in the process of
 * exiting or have since exited (ie those with .exiting set true). This is
 * fast. If both flags are given, this one takes precedence.
 *
 * When the calling thread owns orphaned_threads.mutex it must not call this
 * function or deadlock may occur.
 */
static void wait_and_destroy_orphaned_threads(int flags)
{
  /* private list of the threads this call is going to join and free */
  struct thread_list *doomed = NULL;

  Curl_mutex_acquire(&orphaned_threads.mutex);

  if(flags & WAIT_DESTROY_EXITING_THREADS_ONLY) {
    struct thread_list *item;
    struct thread_list *tail = NULL;

    if(!orphaned_threads.exiting_count) {
      /* no orphan is marked as exiting, nothing to cull */
      Curl_mutex_release(&orphaned_threads.mutex);
      return;
    }

    item = orphaned_threads.first;
    while(item) {
      /* remember the successor now, item's links are rewritten below */
      struct thread_list *following = item->next;

      if(item->exiting) {
        /* unlink the item from the global orphaned_threads list */
        if(orphaned_threads.first == item)
          orphaned_threads.first = item->next;
        if(orphaned_threads.last == item)
          orphaned_threads.last = item->prev;
        if(item->prev)
          item->prev->next = item->next;
        if(item->next)
          item->next->prev = item->prev;

        /* append the item to the private list */
        item->prev = tail;
        item->next = NULL;
        if(tail)
          tail->next = item;
        else
          doomed = item;
        tail = item;
      }

      item = following;
    }

    orphaned_threads.exiting_count = 0;
  }
  else if(flags & WAIT_DESTROY_ALL) {
    /* take over the complete list and leave the global one empty */
    doomed = orphaned_threads.first;
    orphaned_threads.first = NULL;
    orphaned_threads.last = NULL;
    orphaned_threads.exiting_count = 0;
  }

  Curl_mutex_release(&orphaned_threads.mutex);

  /* Wait and free. Must be done unlocked or there could be deadlock. */
  while(doomed) {
    struct thread_list *current = doomed;
    doomed = current->next;
    Curl_thread_join(&current->handle);
    free(current);
  }
}

/* Helper function that must be called from an orphaned thread right before it
 * exits.
 *
 * It first waits on and destroys the orphans that are already marked as
 * exiting, then marks 'orphan' (the calling thread's own list item) as
 * exiting so that a later cull or the global cleanup can do the same for it.
 */
static void signal_orphan_is_exiting(struct thread_list *orphan)
{
  DEBUGASSERT(orphan->handle && !orphan->exiting);

  /* Cull before taking the lock: the cull function acquires
     orphaned_threads.mutex itself. */
  wait_and_destroy_orphaned_threads(WAIT_DESTROY_EXITING_THREADS_ONLY);

  Curl_mutex_acquire(&orphaned_threads.mutex);
  orphaned_threads.exiting_count += 1;
  orphan->exiting = true;
  Curl_mutex_release(&orphaned_threads.mutex);

  /* 'orphan' must not be touched from here on: once it is marked as exiting
     another thread may wait on this thread and free the list item. */
}

#endif /* CURLRES_THREADED */
9 changes: 6 additions & 3 deletions lib/curl_threads.h
Expand Up @@ -37,16 +37,19 @@
# define curl_mutex_t CRITICAL_SECTION
# define curl_thread_t HANDLE
# define curl_thread_t_null (HANDLE)0
/* The Windows init/destroy macros are set to return 0 (success) even if the
respective Windows API functions do not return values. That makes them
behave the same as pthreads init/destroy which return 0 on success. */
# if !defined(_WIN32_WINNT) || !defined(_WIN32_WINNT_VISTA) || \
(_WIN32_WINNT < _WIN32_WINNT_VISTA) || \
(defined(__MINGW32__) && !defined(__MINGW64_VERSION_MAJOR))
# define Curl_mutex_init(m) InitializeCriticalSection(m)
# define Curl_mutex_init(m) (InitializeCriticalSection(m), 0)
# else
# define Curl_mutex_init(m) InitializeCriticalSectionEx(m, 0, 1)
# define Curl_mutex_init(m) (!InitializeCriticalSectionEx(m, 0, 1))
# endif
# define Curl_mutex_acquire(m) EnterCriticalSection(m)
# define Curl_mutex_release(m) LeaveCriticalSection(m)
# define Curl_mutex_destroy(m) DeleteCriticalSection(m)
# define Curl_mutex_destroy(m) (DeleteCriticalSection(m), 0)
#endif

#if defined(USE_THREADS_POSIX) || defined(USE_THREADS_WIN32)
Expand Down
6 changes: 4 additions & 2 deletions lib/multi.c
Expand Up @@ -552,8 +552,10 @@ static CURLcode multi_done(struct Curl_easy *data,

conn->data = data; /* ensure the connection uses this transfer now */

/* Stop the resolver and free its own resources (but not dns_entry yet). */
Curl_resolver_kill(conn);
/* Cancel the resolver (but not dns_entry yet). We used to call
Curl_resolver_kill here but that blocks waiting for incomplete resolve
threads (eg getaddrinfo has not returned), which may take a while. */
Curl_resolver_cancel(conn);

/* Cleanup possible redirect junk */
Curl_safefree(data->req.newurl);
Expand Down
2 changes: 1 addition & 1 deletion lib/urldata.h
Expand Up @@ -509,7 +509,7 @@ struct Curl_async {
int port;
struct Curl_dns_entry *dns;
int status; /* if done is TRUE, this is the status from the callback */
void *os_specific; /* 'struct thread_data' for Windows */
void *os_specific; /* resolver backend / os specific data */
BIT(done); /* set TRUE when the lookup is complete */
};

Expand Down

0 comments on commit 2db0ff6

Please sign in to comment.