-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathChannel.cs
60 lines (50 loc) · 2.03 KB
/
Channel.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
using System.Collections.Generic;
using System.Threading;
namespace cschan
{
public class Channel<T> : IChannel<T>
{
private readonly WaitHandle _exitHandle;
private readonly int _timeout;
private readonly Semaphore _sPut;
private readonly Semaphore _sGet;
private readonly object _guard = new object();
private readonly Queue<T> _q = new Queue<T>();
public Channel(int capacity) : this(capacity, new ManualResetEvent(false)) { }
public Channel(int capacity, WaitHandle exitHandle) : this(capacity, exitHandle, Timeout.Infinite) { }
public Channel(int capacity, WaitHandle exitHandle, int timeout)
{
_exitHandle = exitHandle;
_timeout = timeout;
_sPut = new Semaphore(capacity, capacity);
_sGet = new Semaphore(0, capacity);
}
public ChannelResult<T> Put(T item)
{
var waitAny = WaitHandle.WaitAny(new[] { _sPut, _exitHandle }, _timeout);
if (waitAny == 1)
return new ChannelResult<T>(default(T), true, false, true, "Exited");
if (waitAny == WaitHandle.WaitTimeout)
return new ChannelResult<T>(default(T), true, true, false, "Timedout");
lock (_guard)
{
_sGet.Release();
_q.Enqueue(item);
return new ChannelResult<T>(item, false, false, false, "Enqueued");
}
}
public ChannelResult<T> Get()
{
var waitAny = WaitHandle.WaitAny(new[] { _sGet, _exitHandle }, _timeout);
if (waitAny == 1)
return new ChannelResult<T>(default(T), true, false, true, "Exited");
if (waitAny == WaitHandle.WaitTimeout)
return new ChannelResult<T>(default(T), true, true, false, "Timedout");
lock (_guard)
{
_sPut.Release();
return new ChannelResult<T>(_q.Dequeue(), false, false, false, "Dequeued");
}
}
}
}