Skip to content
ikopylov edited this page Mar 1, 2018 · 1 revision

MonitorObject

MonitorObject is an object-oriented abstraction over the BCL methods Monitor.Wait(), Monitor.Pulse() and Monitor.PulseAll(). It lets a thread wait until some condition is fulfilled. It supports CancellationToken out of the box, which has always been a problem when using the Monitor.Wait() methods directly.

Simple example:

// Monitor used by Process to wait on and by Notify to signal changes of _count
private MonitorObject _monitor = new MonitorObject();
// Counter read by the wait predicate in Process and decremented by Notify
private volatile int _count = 1;

/// <summary>
/// Waits inside the monitor until <c>_count</c> becomes 0 and then performs the processing step.
/// </summary>
/// <param name="timeout">Timeout handed to <c>MonitorObject.Enter</c> (units are not shown here; presumably milliseconds, confirm against MonitorObject).</param>
/// <param name="token">Cancellation token handed to <c>MonitorObject.Enter</c>.</param>
/// <returns><c>true</c> when processing was performed; <c>false</c> when the wait timed out first.</returns>
public bool Process(int timeout, CancellationToken token)
{
    // Enter the synchronized section; the using block disposes the waiter on every exit path
    using (var waiter = _monitor.Enter(timeout, token))
    {
        // Wait until _count becomes 0; 'this' is passed as state so the lambda needs no closure
        if (!waiter.Wait(state => state._count == 0, this))
            return false; // Timeout happened

        // Perform some processing now that _count has become 0
        Console.WriteLine("Processing");
        return true;
    }
}
/// <summary>
/// Decrements <c>_count</c> and signals one waiter on the monitor that the value has changed.
/// </summary>
public void Notify()
{
    // Atomically subtract one from the counter
    Interlocked.Add(ref _count, -1);
    // Wake a single waiter so it can re-evaluate its condition
    _monitor.Pulse();
}

Another example (blocking queue):

/// <summary>
/// Bounded blocking queue built on two <c>MonitorObject</c> instances:
/// one that producers wait on while the queue is full, and one that consumers wait on while it is empty.
/// </summary>
/// <typeparam name="T">Type of the stored items.</typeparam>
public class MonitorThreadSafeQueue<T>
{
    // Producers enter and wait on this monitor; TryTake pulses it after removing an item
    private readonly Threading.MonitorObject _monitorFull = new Threading.MonitorObject();
    // Consumers enter and wait on this monitor; TryAdd pulses it after adding an item
    private readonly Threading.MonitorObject _monitorEmpty = new Threading.MonitorObject();
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    // Upper bound on the number of items, fixed at construction
    private readonly int _maxCount;
    // Item counter maintained with Interlocked operations alongside _queue
    private volatile int _itemCount = 0;

    /// <summary>
    /// Creates a queue that holds at most <paramref name="boundingCapacity"/> items.
    /// </summary>
    /// <param name="boundingCapacity">Maximum number of items; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="boundingCapacity"/> is less than 1.</exception>
    public MonitorThreadSafeQueue(int boundingCapacity)
    {
        // A non-positive capacity would make the "_itemCount < _maxCount" check in TryAdd fail forever
        if (boundingCapacity < 1)
            throw new ArgumentOutOfRangeException(nameof(boundingCapacity), boundingCapacity, "Bounding capacity must be positive.");

        _maxCount = boundingCapacity;
    }

    /// <summary>
    /// Adds an item, waiting for free space while the queue is at capacity.
    /// </summary>
    /// <param name="value">Item to enqueue.</param>
    /// <param name="timeout">Timeout handed to <c>MonitorObject.Enter</c> (units not shown here; presumably milliseconds, confirm against MonitorObject).</param>
    /// <param name="token">Cancellation token handed to <c>MonitorObject.Enter</c>.</param>
    /// <returns><c>true</c> if the item was enqueued; <c>false</c> if <c>waiter.Wait()</c> returned false first (presumably on timeout).</returns>
    public bool TryAdd(T value, int timeout, CancellationToken token)
    {
        bool result = false;
        using (var waiter = _monitorFull.Enter(timeout, token))
        {
            // Check the condition before the first wait, then re-check after every wake-up
            do
            {
                if (_itemCount < _maxCount)
                {
                    // Enqueue first, then publish the new count, so a positive count implies a stored item
                    _queue.Enqueue(value);
                    Interlocked.Increment(ref _itemCount);
                    result = true;
                    break;
                }
            }
            while (waiter.Wait());
        }

        // Signal a consumer only after the producers' section has been left
        if (result)
            _monitorEmpty.Pulse();
        return result;
    }

    /// <summary>
    /// Removes an item, waiting for one to arrive while the queue is empty.
    /// </summary>
    /// <param name="value">Receives the dequeued item, or <c>default(T)</c> when nothing was taken.</param>
    /// <param name="timeout">Timeout handed to <c>MonitorObject.Enter</c> (units not shown here; presumably milliseconds, confirm against MonitorObject).</param>
    /// <param name="token">Cancellation token handed to <c>MonitorObject.Enter</c>.</param>
    /// <returns><c>true</c> if an item was taken; <c>false</c> if <c>waiter.Wait()</c> returned false first (presumably on timeout).</returns>
    public bool TryTake(out T value, int timeout, CancellationToken token)
    {
        bool result = false;
        value = default(T);

        using (var waiter = _monitorEmpty.Enter(timeout, token))
        {
            // Check the condition before the first wait, then re-check after every wake-up
            do
            {
                if (_itemCount > 0)
                {
                    // TryAdd enqueues before incrementing the count, so an item is expected to be present here
                    _queue.TryDequeue(out value);
                    Interlocked.Decrement(ref _itemCount);
                    result = true;
                    break;
                }
            }
            while (waiter.Wait());
        }

        // Signal a producer only after the consumers' section has been left
        if (result)
            _monitorFull.Pulse();
        return result;
    }
}