Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[threadpool] Rework delegate begin/end invoke enqueuing #1706

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 12 additions & 2 deletions mcs/class/System/System.Diagnostics/Process.cs
Expand Up @@ -38,6 +38,7 @@
using System.ComponentModel.Design;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Runtime.Remoting.Messaging;
using System.Security.Permissions;
using System.Collections.Generic;
using System.Security;
Expand Down Expand Up @@ -1315,7 +1316,7 @@ enum AsyncModes {
}

[StructLayout (LayoutKind.Sequential)]
sealed class ProcessAsyncReader
sealed class ProcessAsyncReader : IThreadPoolWorkItem
{
/*
The following fields match those of SocketAsyncResult.
Expand Down Expand Up @@ -1352,7 +1353,7 @@ sealed class ProcessAsyncReader
bool err_out; // true -> stdout, false -> stderr
internal int error;
public int operation = 8; // MAGIC NUMBER: see Socket.cs:AsyncOperation
public object ares;
public AsyncResult async_result;
public int EndCalled;

// These fields are not in SocketAsyncResult
Expand Down Expand Up @@ -1456,6 +1457,15 @@ void Flush (bool last)
public void Close () {
stream.Close ();
}

void IThreadPoolWorkItem.ExecuteWorkItem()
{
async_result.Invoke ();
}

void IThreadPoolWorkItem.MarkAborted(ThreadAbortException tae)
{
}
}

AsyncModes async_mode;
Expand Down
10 changes: 5 additions & 5 deletions mcs/class/System/System.Net.Sockets/Socket.cs
Expand Up @@ -70,13 +70,13 @@ public partial class Socket : IDisposable

int linger_timeout;

/* the field "safe_handle" is looked up by name by the runtime */
SafeSocketHandle safe_handle;

AddressFamily address_family;
SocketType socket_type;
ProtocolType protocol_type;

/* the field "safe_handle" is looked up by name by the runtime */
internal SafeSocketHandle safe_handle;

/*
* This EndPoint is used when creating new endpoints. Because
* there are many types of EndPoints possible,
Expand Down Expand Up @@ -1806,7 +1806,7 @@ static int Receive_internal (SafeSocketHandle safeHandle, WSABUF[] bufarray, Soc
[MethodImplAttribute (MethodImplOptions.InternalCall)]
extern static int Receive_internal (IntPtr sock, WSABUF[] bufarray, SocketFlags flags, out int error);

static int Receive_internal (SafeSocketHandle safeHandle, byte[] buffer, int offset, int count, SocketFlags flags, out int error)
internal static int Receive_internal (SafeSocketHandle safeHandle, byte[] buffer, int offset, int count, SocketFlags flags, out int error)
{
try {
safeHandle.RegisterForBlockingSyscall ();
Expand Down Expand Up @@ -2348,7 +2348,7 @@ static int Send_internal (SafeSocketHandle safeHandle, WSABUF[] bufarray, Socket
[MethodImplAttribute (MethodImplOptions.InternalCall)]
extern static int Send_internal (IntPtr sock, WSABUF[] bufarray, SocketFlags flags, out int error);

static int Send_internal (SafeSocketHandle safeHandle, byte[] buf, int offset, int count, SocketFlags flags, out int error)
internal static int Send_internal (SafeSocketHandle safeHandle, byte[] buf, int offset, int count, SocketFlags flags, out int error)
{
try {
safeHandle.RegisterForBlockingSyscall ();
Expand Down
24 changes: 23 additions & 1 deletion mcs/class/System/System.Net.Sockets/SocketAsyncResult.cs
Expand Up @@ -35,7 +35,7 @@
namespace System.Net.Sockets
{
[StructLayout (LayoutKind.Sequential)]
internal sealed class SocketAsyncResult: IAsyncResult
internal sealed class SocketAsyncResult: IAsyncResult, IThreadPoolWorkItem
{
/* Same structure in the runtime. Keep this in sync with
* MonoSocketAsyncResult in metadata/socket-io.h and
Expand Down Expand Up @@ -301,5 +301,27 @@ public void Complete (Socket s, int total)
this.total = total;
Complete ();
}

void IThreadPoolWorkItem.ExecuteWorkItem()
{
switch (operation) {
case SocketOperation.Receive:
total = Socket.Receive_internal (socket.safe_handle, Buffer, Offset, Size, SockFlags, out error);
break;
case SocketOperation.Send:
total = Socket.Send_internal (socket.safe_handle, Buffer, Offset, Size, SockFlags, out error);
break;
}

async_result.Invoke ();

if (completed && callback != null) {
ThreadPool.UnsafeQueueCustomWorkItem (new AsyncResult (state => callback ((IAsyncResult) state), this, false), false);
}
}

void IThreadPoolWorkItem.MarkAborted(ThreadAbortException tae)
{
}
}
}
1 change: 0 additions & 1 deletion mcs/class/System/System.dll.sources
Expand Up @@ -725,7 +725,6 @@ ReferenceSources/SRCategoryAttribute.cs
ReferenceSources/SystemNetworkCredential.cs
ReferenceSources/WebHeaderCollectionType.cs
ReferenceSources/Win32Exception.cs
ReferenceSources/MonoSocketRuntimeWorkItem.cs

../../../external/referencesource/System/misc/PrivilegedConfigurationManager.cs
../../../external/referencesource/System/regex/system/text/regularexpressions/Regex.cs
Expand Down
18 changes: 0 additions & 18 deletions mcs/class/corlib/ReferenceSources/MonoRuntimeWorkItem.cs

This file was deleted.

Expand Up @@ -39,7 +39,7 @@ namespace System.Runtime.Remoting.Messaging {

[System.Runtime.InteropServices.ComVisible (true)]
[StructLayout (LayoutKind.Sequential)]
public class AsyncResult : IAsyncResult, IMessageSink {
public class AsyncResult : IAsyncResult, IMessageSink, IThreadPoolWorkItem {

#pragma warning disable 169, 414, 649
object async_state;
Expand Down Expand Up @@ -185,5 +185,17 @@ internal MonoMethodMessage CallMessage
get { return call_message; }
set { call_message = value; }
}

void IThreadPoolWorkItem.ExecuteWorkItem()
{
Invoke ();
}

void IThreadPoolWorkItem.MarkAborted(ThreadAbortException tae)
{
}

[MethodImplAttribute(MethodImplOptions.InternalCall)]
internal extern object Invoke ();
}
}
1 change: 0 additions & 1 deletion mcs/class/corlib/corlib.dll.sources
Expand Up @@ -1066,7 +1066,6 @@ ReferenceSources/EncodingTable.cs
ReferenceSources/TypeNameParser.cs
ReferenceSources/RuntimeType.cs
ReferenceSources/RemotingFieldCachedData.cs
ReferenceSources/MonoRuntimeWorkItem.cs
ReferenceSources/MethodBase.cs
ReferenceSources/RuntimeHandles.cs
ReferenceSources/CompareInfo.cs
Expand Down
7 changes: 2 additions & 5 deletions mono/metadata/console-unix.c
Expand Up @@ -225,8 +225,6 @@ do_console_cancel_event (void)
MonoClass *klass;
MonoDelegate *load_value;
MonoMethod *method;
MonoMethodMessage *msg;
MonoMethod *im;
MonoVTable *vtable;

/* FIXME: this should likely iterate all the domains, instead */
Expand All @@ -252,9 +250,8 @@ do_console_cancel_event (void)
klass = load_value->object.vtable->klass;
method = mono_class_get_method_from_name (klass, "BeginInvoke", -1);
g_assert (method != NULL);
im = mono_get_delegate_invoke (method->klass);
msg = mono_method_call_message_new (method, NULL, im, NULL, NULL);
mono_thread_pool_add ((MonoObject *) load_value, msg, NULL, NULL);

mono_thread_pool_begin_invoke (domain, (MonoObject*) load_value, method, NULL);
}

static int need_cancel = FALSE;
Expand Down
9 changes: 3 additions & 6 deletions mono/metadata/icall-def.h
Expand Up @@ -413,9 +413,6 @@ ICALL_TYPE(MAC_IFACE_PROPS, "System.Net.NetworkInformation.MacOsIPInterfacePrope
ICALL(MAC_IFACE_PROPS_1, "ParseRouteInfo_internal", ves_icall_System_Net_NetworkInformation_MacOsIPInterfaceProperties_ParseRouteInfo_internal)
#endif

ICALL_TYPE(MSRWI, "System.Net.Sockets.MonoSocketRuntimeWorkItem", MSRWI_1)
ICALL(MSRWI_1, "ExecuteWorkItem", ves_icall_System_Net_Sockets_MonoSocketRuntimeWorkItem_ExecuteWorkItem)

ICALL_TYPE(SOCK, "System.Net.Sockets.Socket", SOCK_1)
ICALL(SOCK_1, "Accept_internal(intptr,int&,bool)", ves_icall_System_Net_Sockets_Socket_Accept_internal)
ICALL(SOCK_2, "Available_internal(intptr,int&)", ves_icall_System_Net_Sockets_Socket_Available_internal)
Expand Down Expand Up @@ -707,6 +704,9 @@ ICALL_TYPE(ACTS, "System.Runtime.Remoting.Activation.ActivationServices", ACTS_1
ICALL(ACTS_1, "AllocateUninitializedClassInstance", ves_icall_System_Runtime_Activation_ActivationServices_AllocateUninitializedClassInstance)
ICALL(ACTS_2, "EnableProxyActivation", ves_icall_System_Runtime_Activation_ActivationServices_EnableProxyActivation)

ICALL_TYPE(ARES, "System.Runtime.Remoting.Messaging.AsyncResult", ARES_1)
ICALL(ARES_1, "Invoke", ves_icall_System_Runtime_Remoting_Messaging_AsyncResult_Invoke)

ICALL_TYPE(MONOMM, "System.Runtime.Remoting.Messaging.MonoMethodMessage", MONOMM_1)
ICALL(MONOMM_1, "InitMessage", ves_icall_MonoMethodMessage_InitMessage)

Expand Down Expand Up @@ -886,9 +886,6 @@ ICALL(MONIT_6, "Monitor_try_enter", ves_icall_System_Threading_Monitor_Monitor_t
ICALL(MONIT_7, "Monitor_wait", ves_icall_System_Threading_Monitor_Monitor_wait)
ICALL(MONIT_9, "try_enter_with_atomic_var", ves_icall_System_Threading_Monitor_Monitor_try_enter_with_atomic_var)

ICALL_TYPE(MRWI, "System.Threading.MonoRuntimeWorkItem", MRWI_1)
ICALL(MRWI_1, "ExecuteWorkItem", ves_icall_System_Threading_MonoRuntimeWorkItem_ExecuteWorkItem)

ICALL_TYPE(MUTEX, "System.Threading.Mutex", MUTEX_1)
ICALL(MUTEX_1, "CreateMutex_internal(bool,string,bool&)", ves_icall_System_Threading_Mutex_CreateMutex_internal)
ICALL(MUTEX_2, "OpenMutex_internal(string,System.Security.AccessControl.MutexRights,System.IO.MonoIOError&)", ves_icall_System_Threading_Mutex_OpenMutex_internal)
Expand Down
25 changes: 10 additions & 15 deletions mono/metadata/marshal.c
Expand Up @@ -2055,13 +2055,9 @@ mono_marshal_emit_thread_force_interrupt_checkpoint (MonoMethodBuilder *mb)
static MonoAsyncResult *
mono_delegate_begin_invoke (MonoDelegate *delegate, gpointer *params)
{
MonoMethodMessage *msg;
MonoDelegate *async_callback;
MonoMulticastDelegate *mcast_delegate;
MonoObject *state;
MonoMethod *im;
MonoClass *klass;
MonoMethod *method = NULL, *method2 = NULL;
MonoMethod *method;

g_assert (delegate);
mcast_delegate = (MonoMulticastDelegate *) delegate;
Expand All @@ -2077,6 +2073,9 @@ mono_delegate_begin_invoke (MonoDelegate *delegate, gpointer *params)
/* If the target is a proxy, make a direct call. Is proxy's work
// to make the call asynchronous.
*/
MonoMethodMessage *msg;
MonoDelegate *async_callback;
MonoObject *state;
MonoAsyncResult *ares;
MonoObject *exc;
MonoArray *out_args;
Expand All @@ -2100,16 +2099,12 @@ mono_delegate_begin_invoke (MonoDelegate *delegate, gpointer *params)

klass = delegate->object.vtable->klass;

method = mono_get_delegate_invoke (klass);
method2 = mono_class_get_method_from_name (klass, "BeginInvoke", -1);
if (method2)
method = method2;
g_assert (method != NULL);

im = mono_get_delegate_invoke (method->klass);
msg = mono_method_call_message_new (method, params, im, &async_callback, &state);
method = mono_class_get_method_from_name (klass, "BeginInvoke", -1);
if (!method)
method = mono_get_delegate_invoke (klass);
g_assert (method);

return mono_thread_pool_add ((MonoObject *)delegate, msg, async_callback, state);
return mono_thread_pool_begin_invoke (mono_domain_get (), (MonoObject*) delegate, method, params);
}

#ifndef DISABLE_JIT
Expand Down Expand Up @@ -2824,7 +2819,7 @@ mono_delegate_end_invoke (MonoDelegate *delegate, gpointer *params)
} else
#endif
{
res = mono_thread_pool_finish (ares, &out_args, &exc);
res = mono_thread_pool_end_invoke (ares, &out_args, &exc);
}

if (exc) {
Expand Down
3 changes: 3 additions & 0 deletions mono/metadata/object-internals.h
Expand Up @@ -646,6 +646,9 @@ mono_async_result_new (MonoDomain *domain, HANDLE handle,
MonoObject *
mono_async_result_invoke (MonoAsyncResult *ares, MonoObject **exc);

MonoObject *
ves_icall_System_Runtime_Remoting_Messaging_AsyncResult_Invoke (MonoAsyncResult *this);

MonoWaitHandle *
mono_wait_handle_new (MonoDomain *domain, HANDLE handle);

Expand Down
12 changes: 11 additions & 1 deletion mono/metadata/object.c
Expand Up @@ -6100,6 +6100,16 @@ mono_async_result_invoke (MonoAsyncResult *ares, MonoObject **exc)
return res;
}

MonoObject *
ves_icall_System_Runtime_Remoting_Messaging_AsyncResult_Invoke (MonoAsyncResult *this)
{
MonoObject *exc = NULL;
MonoObject *res = mono_async_result_invoke (this, &exc);
if (exc)
mono_raise_exception ((MonoException*) exc);
return res;
}

void
mono_message_init (MonoDomain *domain,
MonoMethodMessage *this,
Expand Down Expand Up @@ -6471,7 +6481,7 @@ mono_method_call_message_new (MonoMethod *method, gpointer *params, MonoMethod *
mono_array_setref (msg->args, i, arg);
}

if (cb != NULL && state != NULL) {
if (params != NULL && cb != NULL && state != NULL) {
*cb = *((MonoDelegate **)params [i]);
i++;
*state = *((MonoObject **)params [i]);
Expand Down