Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

[threadpool] Added dynamic concurrent queue implementation

	Replace the big lock around the list+array used to queue work items
	with a dynamic concurrent queue to reduce contention.
  • Loading branch information...
commit 2670761ee7f20ecfb4b49610394aca0f44aa0738 1 parent b8f9fee
Gonzalo Paniagua Javier gonzalop authored
2  mcs/class/corlib/System/Environment.cs
View
@@ -55,7 +55,7 @@ public static class Environment {
* of icalls, do not require an increment.
*/
#pragma warning disable 169
- private const int mono_corlib_version = 94;
+ private const int mono_corlib_version = 95;
#pragma warning restore 169
[ComVisible (true)]
40 mcs/class/corlib/System/MonoCQItem.cs
View
@@ -0,0 +1,40 @@
+//
+// System.MonoCQItem.cs
+//
+// Author:
+// Gonzalo Paniagua Javier (gonzalo@novell.com)
+//
+// Copyright (C) 2011 Novell, Inc (http://www.novell.com)
+//
+// Permission is hereby granted, free of charge, to any person obtaining
+// a copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to
+// permit persons to whom the Software is furnished to do so, subject to
+// the following conditions:
+//
+// The above copyright notice and this permission notice shall be
+// included in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+//
+
+namespace System {
+#pragma warning disable 169
+ internal sealed class MonoCQItem {
+ object [] array;
+ byte [] array_state;
+ int head;
+ int tail;
+
+ }
+#pragma warning disable 169
+}
+
1  mcs/class/corlib/corlib.dll.sources
View
@@ -197,6 +197,7 @@ System/MissingFieldException.cs
System/MissingMemberException.cs
System/MissingMethodException.cs
System/MonoAsyncCall.cs
+System/MonoCQItem.cs
System/MonoCustomAttrs.cs
System/MonoListItem.cs
System/MonoType.cs
2  mono/metadata/Makefile.am
View
@@ -139,6 +139,8 @@ libmonoruntime_la_SOURCES = \
mono-basic-block.c \
mono-basic-block.h \
mono-config.c \
+ mono-cq.c \
+ mono-cq.h \
mono-debug.h \
mono-debug.c \
mono-debug-debugger.h \
2  mono/metadata/appdomain.c
View
@@ -73,7 +73,7 @@
* Changes which are already detected at runtime, like the addition
* of icalls, do not require an increment.
*/
-#define MONO_CORLIB_VERSION 94
+#define MONO_CORLIB_VERSION 95
typedef struct
{
229 mono/metadata/mono-cq.c
View
@@ -0,0 +1,229 @@
+/*
+ * mono-cq.c: concurrent queue
+ *
+ * Authors:
+ * Gonzalo Paniagua Javier (gonzalo@novell.com)
+ *
+ * Copyright (c) 2011 Novell, Inc (http://www.novell.com)
+ */
+
+#include <mono/metadata/object.h>
+#include <mono/metadata/mono-cq.h>
+#include <mono/metadata/mono-mlist.h>
+
+#define CQ_DEBUG(...)
+//#define CQ_DEBUG(...) g_message(__VA_ARGS__)
+
+struct _MonoCQ {
+ MonoMList *head;
+ MonoMList *tail;
+ volatile gint32 count;
+};
+
+/* matches the System.MonoListItem object */
+struct _MonoMList {
+ MonoObject object;
+ MonoMList *next;
+ MonoObject *data;
+};
+
+/* matches the System.MonoCQItem object */
+struct _MonoCQItem {
+ MonoObject object;
+ MonoArray *array; // MonoObjects
+ MonoArray *array_state; // byte array
+ volatile gint32 first;
+ volatile gint32 last;
+};
+
+typedef struct _MonoCQItem MonoCQItem;
+#define CQ_ARRAY_SIZE 64
+
+static MonoVTable *monocq_item_vtable = NULL;
+
+static MonoCQItem *
+mono_cqitem_alloc (void)
+{
+ MonoCQItem *queue;
+ MonoDomain *domain = mono_get_root_domain ();
+
+ if (!monocq_item_vtable) {
+ MonoClass *klass = mono_class_from_name (mono_defaults.corlib, "System", "MonoCQItem");
+ monocq_item_vtable = mono_class_vtable (domain, klass);
+ g_assert (monocq_item_vtable);
+ }
+ queue = (MonoCQItem *) mono_object_new_fast (monocq_item_vtable);
+ queue->array = mono_array_new (domain, mono_defaults.object_class, CQ_ARRAY_SIZE);
+ queue->array_state = mono_array_new (domain, mono_defaults.byte_class, CQ_ARRAY_SIZE);
+ return queue;
+}
+
+MonoCQ *
+mono_cq_create ()
+{
+ MonoCQ *cq;
+
+ cq = g_new0 (MonoCQ, 1);
+ MONO_GC_REGISTER_ROOT (cq->head);
+ MONO_GC_REGISTER_ROOT (cq->tail);
+ cq->head = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
+ cq->tail = cq->head;
+ CQ_DEBUG ("Created %p", cq);
+ return cq;
+}
+
+void
+mono_cq_destroy (MonoCQ *cq)
+{
+ CQ_DEBUG ("Destroy %p", cq);
+ if (!cq)
+ return;
+
+ memset (cq, 0, sizeof (MonoCQ));
+ MONO_GC_UNREGISTER_ROOT (cq->tail);
+ MONO_GC_UNREGISTER_ROOT (cq->head);
+ g_free (cq);
+}
+
+gint32
+mono_cq_count (MonoCQ *cq)
+{
+ if (!cq)
+ return 0;
+
+ CQ_DEBUG ("Count %d", cq->count);
+ return cq->count;
+}
+
+static void
+mono_cq_add_node (MonoCQ *cq)
+{
+ MonoMList *n;
+ MonoMList *prev_tail;
+
+ CQ_DEBUG ("Adding node");
+ n = mono_mlist_alloc ((MonoObject *) mono_cqitem_alloc ());
+ prev_tail = cq->tail;
+ prev_tail->next = n;
+ cq->tail = n;
+}
+
+static gboolean
+mono_cqitem_try_enqueue (MonoCQ *cq, MonoObject *obj)
+{
+ MonoCQItem *queue;
+ MonoMList *tail;
+ gint32 pos;
+
+ tail = cq->tail;
+ queue = (MonoCQItem *) tail->data;
+ do {
+ pos = queue->last;
+ if (pos >= CQ_ARRAY_SIZE) {
+ CQ_DEBUG ("enqueue(): pos >= CQ_ARRAY_SIZE, %d >= %d", pos, CQ_ARRAY_SIZE);
+ return FALSE;
+ }
+
+ if (InterlockedCompareExchange (&queue->last, pos + 1, pos) == pos) {
+ mono_array_setref (queue->array, pos, obj);
+ mono_array_set (queue->array_state, char, pos, TRUE);
+ if ((pos + 1) == CQ_ARRAY_SIZE) {
+ CQ_DEBUG ("enqueue(): pos + 1 == CQ_ARRAY_SIZE, %d. Adding node.", CQ_ARRAY_SIZE);
+ mono_cq_add_node (cq);
+ }
+ return TRUE;
+ }
+ } while (TRUE);
+ g_assert_not_reached ();
+}
+
+void
+mono_cq_enqueue (MonoCQ *cq, MonoObject *obj)
+{
+ if (cq == NULL || obj == NULL)
+ return;
+
+ do {
+ if (mono_cqitem_try_enqueue (cq, obj)) {
+ CQ_DEBUG ("Queued one");
+ InterlockedIncrement (&cq->count);
+ break;
+ }
+ SleepEx (0, FALSE);
+ } while (TRUE);
+}
+
+static void
+mono_cq_remove_node (MonoCQ *cq)
+{
+ MonoMList *old_head;
+
+ CQ_DEBUG ("Removing node");
+ old_head = cq->head;
+ /* Not needed now that array_state is GC memory
+ MonoCQItem *queue;
+ int i;
+ gboolean retry;
+ queue = (MonoCQItem *) old_head->data;
+ do {
+ retry = FALSE;
+ for (i = 0; i < CQ_ARRAY_SIZE; i++) {
+ if (mono_array_get (queue->array_state, char, i) == TRUE) {
+ retry = TRUE;
+ break;
+ }
+ }
+ if (retry)
+ SleepEx (0, FALSE);
+ } while (retry);
+ */
+ while (old_head->next == NULL)
+ SleepEx (0, FALSE);
+ cq->head = old_head->next;
+ old_head = NULL;
+}
+
+static gboolean
+mono_cqitem_try_dequeue (MonoCQ *cq, MonoObject **obj)
+{
+ MonoCQItem *queue;
+ MonoMList *head;
+ gint32 pos;
+
+ head = cq->head;
+ queue = (MonoCQItem *) head->data;
+ do {
+ pos = queue->first;
+ if (pos >= queue->last || pos >= CQ_ARRAY_SIZE)
+ return FALSE;
+
+ if (InterlockedCompareExchange (&queue->first, pos + 1, pos) == pos) {
+ while (mono_array_get (queue->array_state, char, pos) == FALSE) {
+ SleepEx (0, FALSE);
+ }
+ *obj = mono_array_get (queue->array, MonoObject *, pos);
+ mono_array_set (queue->array, MonoObject *, pos, NULL);
+ mono_array_set (queue->array_state, char, pos, FALSE);
+ if ((pos + 1) == CQ_ARRAY_SIZE) {
+ mono_cq_remove_node (cq);
+ }
+ return TRUE;
+ }
+ } while (TRUE);
+ g_assert_not_reached ();
+}
+
+gboolean
+mono_cq_dequeue (MonoCQ *cq, MonoObject **result)
+{
+ while (cq->count > 0) {
+ if (mono_cqitem_try_dequeue (cq, result)) {
+ CQ_DEBUG ("Dequeued one");
+ InterlockedDecrement (&cq->count);
+ return TRUE;
+ }
+ SleepEx (0, FALSE);
+ }
+ return FALSE;
+}
+
22 mono/metadata/mono-cq.h
View
@@ -0,0 +1,22 @@
+#ifndef _MONO_CQ_H
+#define _MONO_CQ_H
+
+#include <config.h>
+#include <glib.h>
+#include <mono/metadata/object.h>
+#include <mono/metadata/gc-internal.h>
+
+G_BEGIN_DECLS
+
+typedef struct _MonoCQ MonoCQ;
+
+MonoCQ *mono_cq_create (void) MONO_INTERNAL;
+void mono_cq_destroy (MonoCQ *cq) MONO_INTERNAL;
+gint mono_cq_count (MonoCQ *cq) MONO_INTERNAL;
+void mono_cq_enqueue (MonoCQ *cq, MonoObject *obj) MONO_INTERNAL;
+gboolean mono_cq_dequeue (MonoCQ *cq, MonoObject **result) MONO_INTERNAL;
+
+G_END_DECLS
+
+#endif
+
169 mono/metadata/threadpool.c
View
@@ -12,12 +12,6 @@
#include <config.h>
#include <glib.h>
-#ifdef MONO_SMALL_CONFIG
-#define QUEUE_LENGTH 16 /* Must be 2^N */
-#else
-#define QUEUE_LENGTH 64 /* Must be 2^N */
-#endif
-
#include <mono/metadata/domain-internals.h>
#include <mono/metadata/profiler-private.h>
#include <mono/metadata/tabledefs.h>
@@ -31,6 +25,7 @@
#include <mono/metadata/marshal.h>
#include <mono/metadata/mono-perfcounters.h>
#include <mono/metadata/socket-io.h>
+#include <mono/metadata/mono-cq.h>
#include <mono/metadata/mono-wsq.h>
#include <mono/io-layer/io-layer.h>
#include <mono/metadata/gc-internal.h>
@@ -131,11 +126,7 @@ typedef struct {
typedef struct {
MonoSemType lock;
- MonoMList *first; /* GC root */
- MonoMList *last;
- MonoMList *unused; /* Up to 20 chunks. GC root */
- gint head;
- gint tail;
+ MonoCQ *queue; /* GC root */
MonoSemType new_job;
volatile gint waiting; /* threads waiting for a work item */
@@ -1169,10 +1160,10 @@ static void
threadpool_init (ThreadPool *tp, int min_threads, int max_threads, void (*async_invoke) (gpointer))
{
memset (tp, 0, sizeof (ThreadPool));
- MONO_SEM_INIT (&tp->lock, 1);
tp->min_threads = min_threads;
tp->max_threads = max_threads;
tp->async_invoke = async_invoke;
+ tp->queue = mono_cq_create ();
MONO_SEM_INIT (&tp->new_job, 0);
}
@@ -1229,15 +1220,11 @@ signal_handler (int signo)
ThreadPool *tp;
tp = &async_tp;
- MONO_SEM_WAIT (&tp->lock);
g_print ("\n-----Non-IO-----\n");
print_pool_info (tp);
- MONO_SEM_POST (&tp->lock);
tp = &async_io_tp;
- MONO_SEM_WAIT (&tp->lock);
g_print ("\n-----IO-----\n");
print_pool_info (tp);
- MONO_SEM_POST (&tp->lock);
alarm (2);
}
#endif
@@ -1272,8 +1259,7 @@ monitor_thread (gpointer data)
break;
if (tp->waiting > 0)
continue;
- MONO_SEM_WAIT (&tp->lock);
- need_one = (tp->head != tp->tail);
+ need_one = (mono_cq_count (tp->queue) > 0);
if (!need_one) {
EnterCriticalSection (&wsqs_lock);
for (i = 0; wsqs != NULL && i < wsqs->len; i++) {
@@ -1286,7 +1272,6 @@ monitor_thread (gpointer data)
}
LeaveCriticalSection (&wsqs_lock);
}
- MONO_SEM_POST (&tp->lock);
if (need_one)
threadpool_start_thread (tp);
}
@@ -1312,13 +1297,6 @@ mono_thread_pool_init ()
}
}
- MONO_GC_REGISTER_ROOT_SINGLE (async_tp.first);
- MONO_GC_REGISTER_ROOT_SINGLE (async_tp.last);
- MONO_GC_REGISTER_ROOT_SINGLE (async_tp.unused);
- MONO_GC_REGISTER_ROOT_SINGLE (async_io_tp.first);
- MONO_GC_REGISTER_ROOT_SINGLE (async_io_tp.unused);
- MONO_GC_REGISTER_ROOT_SINGLE (async_io_tp.last);
-
MONO_GC_REGISTER_ROOT_FIXED (socket_io_data.sock_to_state);
InitializeCriticalSection (&socket_io_data.io_lock);
if (g_getenv ("MONO_THREADS_PER_CPU") != NULL) {
@@ -1469,16 +1447,12 @@ mono_thread_pool_cleanup (void)
if (!(async_tp.pool_status == 0 || async_tp.pool_status == 2)) {
if (!(async_tp.pool_status == 1 && InterlockedCompareExchange (&async_tp.pool_status, 2, 1) == 2)) {
InterlockedExchange (&async_io_tp.pool_status, 2);
- MONO_SEM_WAIT (&async_tp.lock);
threadpool_free_queue (&async_tp);
threadpool_kill_idle_threads (&async_tp);
- MONO_SEM_POST (&async_tp.lock);
socket_io_cleanup (&socket_io_data); /* Empty when DISABLE_SOCKETS is defined */
- MONO_SEM_WAIT (&async_io_tp.lock);
threadpool_free_queue (&async_io_tp);
threadpool_kill_idle_threads (&async_io_tp);
- MONO_SEM_POST (&async_io_tp.lock);
MONO_SEM_DESTROY (&async_io_tp.new_job);
}
}
@@ -1492,42 +1466,6 @@ mono_thread_pool_cleanup (void)
MONO_SEM_DESTROY (&async_tp.new_job);
}
-/* Caller must enter &tp->lock */
-static MonoObject*
-dequeue_job_nolock (ThreadPool *tp)
-{
- MonoObject *ar;
- MonoArray *array;
- MonoMList *list;
-
- list = tp->first;
- do {
- if (mono_runtime_is_shutting_down ())
- return NULL;
- if (!list || tp->head == tp->tail)
- return NULL;
-
- array = (MonoArray *) mono_mlist_get_data (list);
- ar = mono_array_get (array, MonoObject *, tp->head % QUEUE_LENGTH);
- mono_array_set (array, MonoObject *, tp->head % QUEUE_LENGTH, NULL);
- tp->head++;
- if ((tp->head % QUEUE_LENGTH) == 0) {
- list = tp->first;
- tp->first = mono_mlist_next (list);
- if (tp->first == NULL)
- tp->last = NULL;
- if (mono_mlist_length (tp->unused) < 20) {
- /* reuse this chunk */
- tp->unused = mono_mlist_set_next (list, tp->unused);
- }
- tp->head -= QUEUE_LENGTH;
- tp->tail -= QUEUE_LENGTH;
- }
- list = tp->first;
- } while (ar == NULL);
- return ar;
-}
-
static gboolean
threadpool_start_thread (ThreadPool *tp)
{
@@ -1563,35 +1501,12 @@ threadpool_append_job (ThreadPool *tp, MonoObject *ar)
threadpool_append_jobs (tp, &ar, 1);
}
-static MonoMList *
-create_or_reuse_list (ThreadPool *tp)
-{
- MonoMList *list;
- MonoArray *array;
-
- list = NULL;
- if (tp->unused) {
- list = tp->unused;
- tp->unused = mono_mlist_next (list);
- mono_mlist_set_next (list, NULL);
- //TP_DEBUG (tp->nodes_reused++);
- } else {
- array = mono_array_new_cached (mono_get_root_domain (), mono_defaults.object_class, QUEUE_LENGTH);
- list = mono_mlist_alloc ((MonoObject *) array);
- //TP_DEBUG (tp->nodes_created++);
- }
- return list;
-}
-
static void
threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
{
static int job_counter;
- MonoArray *array;
- MonoMList *list;
MonoObject *ar;
gint i;
- gboolean lock_taken = FALSE; /* We won't take the lock when the local queue is used */
if (mono_runtime_is_shutting_down ())
return;
@@ -1612,25 +1527,8 @@ threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
if (!tp->is_io && mono_wsq_local_push (ar))
continue;
- if (!lock_taken) {
- MONO_SEM_WAIT (&tp->lock);
- lock_taken = TRUE;
- }
- if ((tp->tail % QUEUE_LENGTH) == 0) {
- list = create_or_reuse_list (tp);
- if (tp->last != NULL)
- mono_mlist_set_next (tp->last, list);
- tp->last = list;
- if (tp->first == NULL)
- tp->first = tp->last;
- }
-
- array = (MonoArray *) mono_mlist_get_data (tp->last);
- mono_array_setref (array, tp->tail % QUEUE_LENGTH, ar);
- tp->tail++;
+ mono_cq_enqueue (tp->queue, ar);
}
- if (lock_taken)
- MONO_SEM_POST (&tp->lock);
for (i = 0; i < MIN(njobs, tp->max_threads); i++)
pulse_on_new_job (tp);
@@ -1639,45 +1537,24 @@ threadpool_append_jobs (ThreadPool *tp, MonoObject **jobs, gint njobs)
static void
threadpool_clear_queue (ThreadPool *tp, MonoDomain *domain)
{
- MonoMList *current;
- MonoArray *array;
MonoObject *obj;
+ MonoMList *other;
int domain_count;
- int i;
+ other = NULL;
domain_count = 0;
- MONO_SEM_WAIT (&tp->lock);
- current = tp->first;
- while (current) {
- array = (MonoArray *) mono_mlist_get_data (current);
- for (i = 0; i < QUEUE_LENGTH; i++) {
- obj = mono_array_get (array, MonoObject*, i);
- if (obj != NULL && obj->vtable->domain == domain) {
- domain_count++;
- mono_array_setref (array, i, NULL);
- threadpool_jobs_dec (obj);
- }
+ while (mono_cq_dequeue (tp->queue, &obj)) {
+ if (obj != NULL && obj->vtable->domain == domain) {
+ domain_count++;
+ threadpool_jobs_dec (obj);
+ } else if (obj != NULL) {
+ other = mono_mlist_prepend (other, obj);
}
- current = mono_mlist_next (current);
}
- if (!domain_count) {
- MONO_SEM_POST (&tp->lock);
- return;
- }
-
- current = tp->first;
- tp->first = NULL;
- tp->last = NULL;
- tp->head = 0;
- tp->tail = 0;
- MONO_SEM_POST (&tp->lock);
- /* Re-add everything but the nullified elements */
- while (current) {
- array = (MonoArray *) mono_mlist_get_data (current);
- threadpool_append_jobs (tp, mono_array_addr (array, MonoObject *, 0), QUEUE_LENGTH);
- memset (mono_array_addr (array, MonoObject *, 0), 0, sizeof (MonoObject *) * QUEUE_LENGTH);
- current = mono_mlist_next (current);
+ while (other) {
+ threadpool_append_job (tp, (MonoObject *) mono_mlist_get_data (other));
+ other = mono_mlist_next (other);
}
}
@@ -1730,19 +1607,17 @@ mono_thread_pool_remove_domain_jobs (MonoDomain *domain, int timeout)
static void
threadpool_free_queue (ThreadPool *tp)
{
- tp->head = tp->tail = 0;
- tp->first = NULL;
- tp->unused = NULL;
+ mono_cq_destroy (tp->queue);
+ tp->queue = NULL;
}
gboolean
mono_thread_pool_is_queue_array (MonoArray *o)
{
- gpointer obj = o;
+ // gpointer obj = o;
// FIXME: need some fix in sgen code.
- // There are roots at: async*tp.unused (MonoMList) and wsqs [n]->queue (MonoArray)
- return obj == async_tp.first || obj == async_io_tp.first;
+ return FALSE;
}
static MonoWSQ *
@@ -1830,9 +1705,7 @@ dequeue_or_steal (ThreadPool *tp, gpointer *data)
if (mono_runtime_is_shutting_down ())
return FALSE;
TP_DEBUG ("Dequeue");
- MONO_SEM_WAIT (&tp->lock);
- *data = dequeue_job_nolock (tp);
- MONO_SEM_POST (&tp->lock);
+ mono_cq_dequeue (tp->queue, (MonoObject **) data);
if (!tp->is_io && !*data)
try_steal (data, FALSE);
return (*data != NULL);
Please sign in to comment.
Something went wrong with that request. Please try again.