From 3ba30b7501c6a01b5c54e209ddb3ec80892f56e9 Mon Sep 17 00:00:00 2001 From: Nikita Tsukanov Date: Thu, 10 Apr 2014 23:24:04 +0400 Subject: [PATCH] Implemented multiworker mode --- EvHttpSharp/EvHttpSharp.csproj | 3 + EvHttpSharp/EventHttpMultiworkerListener.cs | 99 +++++++++++++++++++++ EvHttpSharp/IEventHttpListener.cs | 14 +++ EvHttpSharp/Interop/EvConnListener.cs | 2 +- EvHttpSharp/Interop/Event.cs | 22 ++++- EvHttpSharp/Interop/Platform.cs | 30 +++++++ Tests/SimpleServerTests.cs | 92 +++++++++++-------- 7 files changed, 222 insertions(+), 40 deletions(-) create mode 100644 EvHttpSharp/EventHttpMultiworkerListener.cs create mode 100644 EvHttpSharp/IEventHttpListener.cs create mode 100644 EvHttpSharp/Interop/Platform.cs diff --git a/EvHttpSharp/EvHttpSharp.csproj b/EvHttpSharp/EvHttpSharp.csproj index d69f41d..e451058 100644 --- a/EvHttpSharp/EvHttpSharp.csproj +++ b/EvHttpSharp/EvHttpSharp.csproj @@ -40,7 +40,9 @@ + + @@ -57,6 +59,7 @@ + diff --git a/EvHttpSharp/EventHttpMultiworkerListener.cs b/EvHttpSharp/EventHttpMultiworkerListener.cs new file mode 100644 index 0000000..45f7d08 --- /dev/null +++ b/EvHttpSharp/EventHttpMultiworkerListener.cs @@ -0,0 +1,99 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net; +using System.Runtime.InteropServices; +using System.Threading.Tasks; +using EvHttpSharp.Interop; + +namespace EvHttpSharp +{ + public class EventHttpMultiworkerListener : IEventHttpListener + { + private readonly List _workers = new List(); + private readonly EventHttpListener.RequestCallback _cb; + private readonly int _workerCount; + private IntPtr _ownedFileDescriptor; + + + public EventHttpMultiworkerListener(EventHttpListener.RequestCallback cb, int workers) + { + if (workers < 1) + throw new ArgumentException("Invalid number of workers"); + _cb = cb; + _workerCount = workers; + } + + public void Dispose() + { + _workers.ForEach(x => x.Dispose()); + if (_ownedFileDescriptor != IntPtr.Zero) + { + Platform.CloseFileDescriptor(_ownedFileDescriptor); + _ownedFileDescriptor = IntPtr.Zero; + } + } + + void CheckAlreadyListening() + { + if (_workers.Count != 0) + throw new InvalidOperationException("Already listening"); + } + + public void Start(string host, ushort port) + { + var soaddr = new Event.sockaddr_in + { + sin_family = Event.AF_INET, + sin_port = (ushort)IPAddress.HostToNetworkOrder((short)port), + sin_addr = 0, + sin_zero = new byte[8] + }; + + using (var evBase = Event.EventBaseNew()) + using ( + var listener = Event.EvConnListenerNewBind(evBase, IntPtr.Zero, IntPtr.Zero, 1u << 3, 256, ref soaddr, + Marshal.SizeOf(soaddr))) + { + if (listener.IsInvalid) + throw new IOException("Unable to bind socket"); + _ownedFileDescriptor = listener.FileDescriptor; + } + Start(_ownedFileDescriptor); + + } + + public void Start(IntPtr sharedSocket) + { + _workers.AddRange(Enumerable.Repeat(0, _workerCount).Select(_ => new EventHttpListener(_cb))); + _workers.ForEach(w => w.Start(sharedSocket)); + } + + private Task Aggregate(IEnumerable tasks) + { + var ta = tasks.ToArray(); + return Task.Factory.ContinueWhenAll(ta.ToArray(), results => + { + var exceptions = results.Where(r => r.IsFaulted).Select(r => r.Exception).Cast().ToArray(); + if (exceptions.Length != 0) + throw new AggregateException(exceptions); + }); + } + + public Task StopListeningAsync() + { + return Aggregate(_workers.Select(x => x.StopListeningAsync())); + } + + public Task WaitForPendingConnections() + { + return Aggregate(_workers.Select(x => x.WaitForPendingConnections())); + } + + public Task Shutdown() + { + return Aggregate(_workers.Select(w => w.Shutdown())); + } + } +} diff --git a/EvHttpSharp/IEventHttpListener.cs b/EvHttpSharp/IEventHttpListener.cs new file mode 100644 index 0000000..44395a2 --- /dev/null +++ b/EvHttpSharp/IEventHttpListener.cs @@ -0,0 +1,14 @@ +using System; +using System.Threading.Tasks; + +namespace EvHttpSharp +{ + public interface IEventHttpListener : IDisposable + { + void Start(string host, ushort port); + void Start(IntPtr sharedSocket); + Task StopListeningAsync(); + Task WaitForPendingConnections(); + Task Shutdown(); + } +} \ No newline at end of file diff --git a/EvHttpSharp/Interop/EvConnListener.cs b/EvHttpSharp/Interop/EvConnListener.cs index 7c7d0a2..4729ce4 100644 --- a/EvHttpSharp/Interop/EvConnListener.cs +++ b/EvHttpSharp/Interop/EvConnListener.cs @@ -36,7 +36,7 @@ public IntPtr FileDescriptor { get { - return Event.RunningOnWindows + return Platform.RunningOnWindows ? Event.EvConnListenerGetFdWindows(this) : new IntPtr(Event.EvConnListenerGetFdNix(this)); } diff --git a/EvHttpSharp/Interop/Event.cs b/EvHttpSharp/Interop/Event.cs index 057c272..bcb6b58 100644 --- a/EvHttpSharp/Interop/Event.cs +++ b/EvHttpSharp/Interop/Event.cs @@ -9,6 +9,18 @@ namespace EvHttpSharp.Interop static class Event { // ReSharper disable InconsistentNaming + [StructLayout(LayoutKind.Sequential)] + internal struct sockaddr_in + { + internal short sin_family; + internal ushort sin_port; + internal int sin_addr; + [MarshalAs(UnmanagedType.ByValArray, SizeConst = 8)] + internal byte[] sin_zero; + } + + public const int AF_INET = 2; + public static class D { [UnmanagedFunctionPointer (CallingConvention.Cdecl)] @@ -117,6 +129,9 @@ public static class D [UnmanagedFunctionPointer(CallingConvention.Cdecl)] public delegate EvHttpBoundSocket evhttp_bind_listener(EvHttp http, EvConnListener listener); + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] + public delegate EvConnListener evconnlistener_new_bind( + EventBase evBase, IntPtr cb, IntPtr ptr, uint flags, int backlog, ref sockaddr_in sockaddr, int socklen); #region *nix [UnmanagedFunctionPointer (CallingConvention.Cdecl)] public delegate void event_callback_normal(int fd, short events, IntPtr arg); @@ -173,6 +188,7 @@ public static class D [EvImport] public static D.event_free EventFree; [EvImport] public static D.event_active EventActive; [EvImport] public static D.evconnlistener_free EvConnListenerFree; + [EvImport] public static D.evconnlistener_new_bind EvConnListenerNewBind; [EvImport(EvDll.Core, "event_new")] public static D.event_new_windows EventNewWindows; [EvImport(EvDll.Core, "event_new")] public static D.event_new_normal EventNewNix; [EvImport (EvDll.Core, "evconnlistener_get_fd")] public static D.evconnlistener_get_fd_windows EvConnListenerGetFdWindows; @@ -202,19 +218,19 @@ public static class D public static EvHttpBoundSocket EvHttpAcceptSocketWithHandle(EvHttp http, IntPtr fd) { - return RunningOnWindows ? EvHttpAcceptSocketWithHandleWindows(http, fd) : EvHttpAcceptSocketWithHandleNix(http, fd.ToInt32()); + return Platform.RunningOnWindows ? EvHttpAcceptSocketWithHandleWindows(http, fd) : EvHttpAcceptSocketWithHandleNix(http, fd.ToInt32()); } public static EvConnListener EvConnListenerNew(EventBase evBase, IntPtr cb, IntPtr ptr, uint flags, int backlog, IntPtr fd) { - return RunningOnWindows + return Platform.RunningOnWindows ? EvConnListenerNewWindows(evBase, cb, ptr, flags, backlog, fd) : EvConnListenerNewNix(evBase, cb, ptr, flags, backlog, fd.ToInt32()); } - public static readonly bool RunningOnWindows = Path.DirectorySeparatorChar == '\\'; + public static event Action Log; diff --git a/EvHttpSharp/Interop/Platform.cs b/EvHttpSharp/Interop/Platform.cs new file mode 100644 index 0000000..6afeb12 --- /dev/null +++ b/EvHttpSharp/Interop/Platform.cs @@ -0,0 +1,30 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Runtime.InteropServices; +using System.Text; + +namespace EvHttpSharp.Interop +{ + static class Platform + { + public static readonly bool RunningOnWindows = Path.DirectorySeparatorChar == '\\'; + + [DllImport("libc", EntryPoint = "close")] + private static extern void Close(int fd); + + [DllImport("kernel32.dll")] + private static extern void CloseHandle(IntPtr hFile); + + public static void CloseFileDescriptor(IntPtr hFile) + { + if (RunningOnWindows) + CloseHandle(hFile); + else + Close(hFile.ToInt32()); + } + + + } +} diff --git a/Tests/SimpleServerTests.cs b/Tests/SimpleServerTests.cs index 442d101..ae79c47 100644 --- a/Tests/SimpleServerTests.cs +++ b/Tests/SimpleServerTests.cs @@ -19,68 +19,88 @@ public SimpleServerTests() { var l = new TcpListener(IPAddress.Loopback, 0); l.Start(); - _freePort = (ushort)((IPEndPoint) l.LocalEndpoint).Port; + _freePort = (ushort) ((IPEndPoint) l.LocalEndpoint).Port; l.Stop(); _urlBase = "http://127.0.0.1:" + _freePort + "/"; } - void CheckPortIsStillFree() + private void CheckPortIsStillFree() { var l = new TcpListener(IPAddress.Loopback, _freePort); l.Start(); l.Stop(); } + private void WithEachServer(Action> test) + { + test(cb => new EventHttpListener(cb)); + test(cb => new EventHttpMultiworkerListener(cb, 1)); + test(cb => new EventHttpMultiworkerListener(cb, 2)); + } + [Fact] public void TestServerListening() { - using ( - var server = - new EventHttpListener(r => r.Respond(System.Net.HttpStatusCode.OK, new Dictionary(), new byte[0]))) + WithEachServer(listener => { - server.Start("127.0.0.1", _freePort); - var wc = new WebClient(); - - wc.DownloadData(_urlBase); - } - CheckPortIsStillFree(); + using ( + var server = + listener( + r => r.Respond(System.Net.HttpStatusCode.OK, new Dictionary(), new byte[0])) + ) + { + server.Start("127.0.0.1", _freePort); + var wc = new WebClient(); + + wc.DownloadData(_urlBase); + } + CheckPortIsStillFree(); + }); } [Fact] public void ServerShouldStopListening() { - using ( - var server = - new EventHttpListener(r => r.Respond(System.Net.HttpStatusCode.OK, new Dictionary(), new byte[0]))) + WithEachServer(listener => { - server.Start("127.0.0.1", _freePort); - server.StopListeningAsync().Wait(); - CheckPortIsStillFree(); - } + + using ( + var server = + listener( + r => r.Respond(System.Net.HttpStatusCode.OK, new Dictionary(), new byte[0])) + ) + { + server.Start("127.0.0.1", _freePort); + server.StopListeningAsync().Wait(); + CheckPortIsStillFree(); + } + }); } [Fact] - public void ServerShouldWaitForPendingRequests () + public void ServerShouldWaitForPendingRequests() { - using ( - var server = - new EventHttpListener (r => ThreadPool.QueueUserWorkItem(_ => - { - Thread.Sleep(1000); - r.Respond(HttpStatusCode.OK, new Dictionary(), new byte[0]); - }))) + WithEachServer(listener => { - server.Start ("127.0.0.1", _freePort); - - new WebClient().DownloadDataAsync(new Uri(_urlBase)); - Thread.Sleep(100); - var task = server.Shutdown(); - Thread.Sleep(500); - Assert.False(task.IsCompleted); - Assert.True(task.Wait(4000)); - } + using (var server = listener(r => ThreadPool.QueueUserWorkItem(_ => + { + Thread.Sleep(1000); + r.Respond(HttpStatusCode.OK, new Dictionary(), new byte[0]); + }))) + { + server.Start("127.0.0.1", _freePort); + + new WebClient().DownloadDataAsync(new Uri(_urlBase)); + Thread.Sleep(100); + var task = server.Shutdown(); + + Thread.Sleep(500); + Assert.False(task.IsCompleted); + Assert.True(task.Wait(4000)); + } + }); } } -} +} \ No newline at end of file