Skip to content

Commit

Permalink
Implemented multiworker mode
Browse files Browse the repository at this point in the history
  • Loading branch information
kekekeks committed Apr 10, 2014
1 parent fb09472 commit 3ba30b7
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 40 deletions.
3 changes: 3 additions & 0 deletions EvHttpSharp/EvHttpSharp.csproj
Expand Up @@ -40,7 +40,9 @@
</ItemGroup>
<ItemGroup>
<Compile Include="EventHttpListener.cs" />
<Compile Include="EventHttpMultiworkerListener.cs" />
<Compile Include="EventHttpRequest.cs" />
<Compile Include="IEventHttpListener.cs" />
<Compile Include="Interop\EvBuffer.cs" />
<Compile Include="Interop\EvConnListener.cs" />
<Compile Include="Interop\Event.cs" />
Expand All @@ -57,6 +59,7 @@
<Compile Include="Interop\EvLoopFlags.cs" />
<Compile Include="Interop\IDynLoader.cs" />
<Compile Include="Interop\LinuxLoader.cs" />
<Compile Include="Interop\Platform.cs" />
<Compile Include="Interop\Win32Loader.cs" />
<Compile Include="LibLocator.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
Expand Down
99 changes: 99 additions & 0 deletions EvHttpSharp/EventHttpMultiworkerListener.cs
@@ -0,0 +1,99 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using EvHttpSharp.Interop;

namespace EvHttpSharp
{
public class EventHttpMultiworkerListener : IEventHttpListener
{
private readonly List<EventHttpListener> _workers = new List<EventHttpListener>();
private readonly EventHttpListener.RequestCallback _cb;
private readonly int _workerCount;
private IntPtr _ownedFileDescriptor;


public EventHttpMultiworkerListener(EventHttpListener.RequestCallback cb, int workers)
{
if (workers < 1)
throw new ArgumentException("Invalid number of workers");
_cb = cb;
_workerCount = workers;
}

public void Dispose()
{
_workers.ForEach(x => x.Dispose());
if (_ownedFileDescriptor != IntPtr.Zero)
{
Platform.CloseFileDescriptor(_ownedFileDescriptor);
_ownedFileDescriptor = IntPtr.Zero;
}
}

void CheckAlreadyListening()
{
if (_workers.Count != 0)
throw new InvalidOperationException("Already listening");
}

public void Start(string host, ushort port)
{
var soaddr = new Event.sockaddr_in
{
sin_family = Event.AF_INET,
sin_port = (ushort)IPAddress.HostToNetworkOrder((short)port),
sin_addr = 0,
sin_zero = new byte[8]
};

using (var evBase = Event.EventBaseNew())
using (
var listener = Event.EvConnListenerNewBind(evBase, IntPtr.Zero, IntPtr.Zero, 1u << 3, 256, ref soaddr,
Marshal.SizeOf(soaddr)))
{
if (listener.IsInvalid)
throw new IOException("Unable to bind socket");
_ownedFileDescriptor = listener.FileDescriptor;
}
Start(_ownedFileDescriptor);

}

public void Start(IntPtr sharedSocket)
{
_workers.AddRange(Enumerable.Repeat(0, _workerCount).Select(_ => new EventHttpListener(_cb)));
_workers.ForEach(w => w.Start(sharedSocket));
}

private Task Aggregate(IEnumerable<Task> tasks)
{
var ta = tasks.ToArray();
return Task.Factory.ContinueWhenAll(ta.ToArray(), results =>
{
var exceptions = results.Where(r => r.IsFaulted).Select(r => r.Exception).Cast<Exception>().ToArray();
if (exceptions.Length != 0)
throw new AggregateException(exceptions);
});
}

public Task StopListeningAsync()
{
return Aggregate(_workers.Select(x => x.StopListeningAsync()));
}

public Task WaitForPendingConnections()
{
return Aggregate(_workers.Select(x => x.WaitForPendingConnections()));
}

public Task Shutdown()
{
return Aggregate(_workers.Select(w => w.Shutdown()));
}
}
}
14 changes: 14 additions & 0 deletions EvHttpSharp/IEventHttpListener.cs
@@ -0,0 +1,14 @@
using System;
using System.Threading.Tasks;

namespace EvHttpSharp
{
public interface IEventHttpListener : IDisposable
{
void Start(string host, ushort port);
void Start(IntPtr sharedSocket);
Task StopListeningAsync();
Task WaitForPendingConnections();
Task Shutdown();
}
}
2 changes: 1 addition & 1 deletion EvHttpSharp/Interop/EvConnListener.cs
Expand Up @@ -36,7 +36,7 @@ public IntPtr FileDescriptor
{
get
{
return Event.RunningOnWindows
return Platform.RunningOnWindows
? Event.EvConnListenerGetFdWindows(this)
: new IntPtr(Event.EvConnListenerGetFdNix(this));
}
Expand Down
22 changes: 19 additions & 3 deletions EvHttpSharp/Interop/Event.cs
Expand Up @@ -9,6 +9,18 @@ namespace EvHttpSharp.Interop
static class Event
{
// ReSharper disable InconsistentNaming
[StructLayout(LayoutKind.Sequential)]
internal struct sockaddr_in
{
internal short sin_family;
internal ushort sin_port;
internal int sin_addr;
[MarshalAs(UnmanagedType.ByValArray, SizeConst = 8)]
internal byte[] sin_zero;
}

public const int AF_INET = 2;

public static class D
{
[UnmanagedFunctionPointer (CallingConvention.Cdecl)]
Expand Down Expand Up @@ -117,6 +129,9 @@ public static class D
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate EvHttpBoundSocket evhttp_bind_listener(EvHttp http, EvConnListener listener);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate EvConnListener evconnlistener_new_bind(
EventBase evBase, IntPtr cb, IntPtr ptr, uint flags, int backlog, ref sockaddr_in sockaddr, int socklen);
#region *nix
[UnmanagedFunctionPointer (CallingConvention.Cdecl)]
public delegate void event_callback_normal(int fd, short events, IntPtr arg);
Expand Down Expand Up @@ -173,6 +188,7 @@ public static class D
[EvImport] public static D.event_free EventFree;
[EvImport] public static D.event_active EventActive;
[EvImport] public static D.evconnlistener_free EvConnListenerFree;
[EvImport] public static D.evconnlistener_new_bind EvConnListenerNewBind;
[EvImport(EvDll.Core, "event_new")] public static D.event_new_windows EventNewWindows;
[EvImport(EvDll.Core, "event_new")] public static D.event_new_normal EventNewNix;
[EvImport (EvDll.Core, "evconnlistener_get_fd")] public static D.evconnlistener_get_fd_windows EvConnListenerGetFdWindows;
Expand Down Expand Up @@ -202,19 +218,19 @@ public static class D

public static EvHttpBoundSocket EvHttpAcceptSocketWithHandle(EvHttp http, IntPtr fd)
{
return RunningOnWindows ? EvHttpAcceptSocketWithHandleWindows(http, fd) : EvHttpAcceptSocketWithHandleNix(http, fd.ToInt32());
return Platform.RunningOnWindows ? EvHttpAcceptSocketWithHandleWindows(http, fd) : EvHttpAcceptSocketWithHandleNix(http, fd.ToInt32());
}

public static EvConnListener EvConnListenerNew(EventBase evBase, IntPtr cb, IntPtr ptr, uint flags, int backlog, IntPtr fd)
{
return RunningOnWindows
return Platform.RunningOnWindows
? EvConnListenerNewWindows(evBase, cb, ptr, flags, backlog, fd)

: EvConnListenerNewNix(evBase, cb, ptr, flags, backlog, fd.ToInt32());

}

public static readonly bool RunningOnWindows = Path.DirectorySeparatorChar == '\\';


public static event Action<int, string> Log;

Expand Down
30 changes: 30 additions & 0 deletions EvHttpSharp/Interop/Platform.cs
@@ -0,0 +1,30 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;

namespace EvHttpSharp.Interop
{
static class Platform
{
public static readonly bool RunningOnWindows = Path.DirectorySeparatorChar == '\\';

[DllImport("libc", EntryPoint = "close")]
private static extern void Close(int fd);

[DllImport("kernel32.dll")]
private static extern void CloseHandle(IntPtr hFile);

public static void CloseFileDescriptor(IntPtr hFile)
{
if (RunningOnWindows)
CloseHandle(hFile);
else
Close(hFile.ToInt32());
}


}
}
92 changes: 56 additions & 36 deletions Tests/SimpleServerTests.cs
Expand Up @@ -19,68 +19,88 @@ public SimpleServerTests()
{
var l = new TcpListener(IPAddress.Loopback, 0);
l.Start();
_freePort = (ushort)((IPEndPoint) l.LocalEndpoint).Port;
_freePort = (ushort) ((IPEndPoint) l.LocalEndpoint).Port;
l.Stop();
_urlBase = "http://127.0.0.1:" + _freePort + "/";
}


void CheckPortIsStillFree()
private void CheckPortIsStillFree()
{
var l = new TcpListener(IPAddress.Loopback, _freePort);
l.Start();
l.Stop();
}

private void WithEachServer(Action<Func<EventHttpListener.RequestCallback, IEventHttpListener>> test)
{
test(cb => new EventHttpListener(cb));
test(cb => new EventHttpMultiworkerListener(cb, 1));
test(cb => new EventHttpMultiworkerListener(cb, 2));
}

[Fact]
public void TestServerListening()
{
using (
var server =
new EventHttpListener(r => r.Respond(System.Net.HttpStatusCode.OK, new Dictionary<string, string>(), new byte[0])))
WithEachServer(listener =>
{
server.Start("127.0.0.1", _freePort);
var wc = new WebClient();

wc.DownloadData(_urlBase);
}
CheckPortIsStillFree();
using (
var server =
listener(
r => r.Respond(System.Net.HttpStatusCode.OK, new Dictionary<string, string>(), new byte[0]))
)
{
server.Start("127.0.0.1", _freePort);
var wc = new WebClient();
wc.DownloadData(_urlBase);
}
CheckPortIsStillFree();
});
}

[Fact]
public void ServerShouldStopListening()
{
using (
var server =
new EventHttpListener(r => r.Respond(System.Net.HttpStatusCode.OK, new Dictionary<string, string>(), new byte[0])))
WithEachServer(listener =>
{
server.Start("127.0.0.1", _freePort);
server.StopListeningAsync().Wait();
CheckPortIsStillFree();
}
using (
var server =
listener(
r => r.Respond(System.Net.HttpStatusCode.OK, new Dictionary<string, string>(), new byte[0]))
)
{
server.Start("127.0.0.1", _freePort);
server.StopListeningAsync().Wait();
CheckPortIsStillFree();
}
});
}

[Fact]
public void ServerShouldWaitForPendingRequests ()
public void ServerShouldWaitForPendingRequests()
{
using (
var server =
new EventHttpListener (r => ThreadPool.QueueUserWorkItem(_ =>
{
Thread.Sleep(1000);
r.Respond(HttpStatusCode.OK, new Dictionary<string, string>(), new byte[0]);
})))
WithEachServer(listener =>
{
server.Start ("127.0.0.1", _freePort);

new WebClient().DownloadDataAsync(new Uri(_urlBase));
Thread.Sleep(100);
var task = server.Shutdown();
Thread.Sleep(500);
Assert.False(task.IsCompleted);
Assert.True(task.Wait(4000));
}
using (var server = listener(r => ThreadPool.QueueUserWorkItem(_ =>
{
Thread.Sleep(1000);
r.Respond(HttpStatusCode.OK, new Dictionary<string, string>(), new byte[0]);
})))
{
server.Start("127.0.0.1", _freePort);
new WebClient().DownloadDataAsync(new Uri(_urlBase));
Thread.Sleep(100);
var task = server.Shutdown();
Thread.Sleep(500);
Assert.False(task.IsCompleted);
Assert.True(task.Wait(4000));
}
});
}

}
}
}

0 comments on commit 3ba30b7

Please sign in to comment.