ikopylov edited this page Aug 25, 2020 · 1 revision

Qoollo ConcurrentBatchingQueue<T> and BlockingBatchingQueue<T>

A BatchingQueue is a queue in which items are enqueued one by one and dequeued in batches. ConcurrentBatchingQueue<T> has an interface similar to the standard ConcurrentQueue<T>, but it dequeues an array of items and additionally implements the CompleteCurrentBatch method, which makes a partially filled batch available for dequeuing. BlockingBatchingQueue<T> is similar to BlockingCollection<T>, but it wraps ConcurrentBatchingQueue<T>.
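To make the batching semantics concrete, here is a minimal single-threaded sketch of the idea. SimpleBatchingQueue<T> and Demo are hypothetical names introduced only for illustration; this is not the Qoollo implementation, it is not thread-safe, and the bool return value of CompleteCurrentBatch is an assumption of the sketch.

```csharp
using System;
using System.Collections.Generic;

// Simplified, single-threaded model of batching-queue semantics.
// Hypothetical illustration only; NOT the Qoollo implementation.
public sealed class SimpleBatchingQueue<T>
{
    private readonly int _batchSize;
    private readonly Queue<T[]> _completed = new Queue<T[]>();
    private List<T> _current = new List<T>();

    public SimpleBatchingQueue(int batchSize)
    {
        if (batchSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(batchSize));
        _batchSize = batchSize;
    }

    // Items are added one by one; a full batch is sealed automatically.
    public void Enqueue(T item)
    {
        _current.Add(item);
        if (_current.Count == _batchSize)
            CompleteCurrentBatch();
    }

    // Seals a partially filled batch so that it can be dequeued.
    // Returns false when there is nothing to seal (assumption of this sketch).
    public bool CompleteCurrentBatch()
    {
        if (_current.Count == 0)
            return false;
        _completed.Enqueue(_current.ToArray());
        _current = new List<T>();
        return true;
    }

    // Only completed batches can be dequeued.
    public bool TryDequeue(out T[] batch)
    {
        if (_completed.Count > 0)
        {
            batch = _completed.Dequeue();
            return true;
        }
        batch = null;
        return false;
    }
}

public static class Demo
{
    public static void Main()
    {
        var queue = new SimpleBatchingQueue<int>(batchSize: 3);
        queue.Enqueue(1);
        queue.Enqueue(2);

        int[] batch;
        // The batch is not full yet, so nothing can be dequeued
        Console.WriteLine(queue.TryDequeue(out batch)); // False

        // Sealing the partial batch makes its two items available
        queue.CompleteCurrentBatch();
        Console.WriteLine(queue.TryDequeue(out batch)); // True
        Console.WriteLine(batch.Length);                // 2
    }
}
```

The sketch keeps completed batches and the batch under construction in separate containers, which is why a partially filled batch stays invisible to TryDequeue until it fills up or is sealed explicitly.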

Sample code:

// Create queue
var queue = new ConcurrentBatchingQueue<int>(batchSize: 32);

// Enqueue full batch
for (int i = 0; i < 32; i++)
    queue.Enqueue(i);

// Dequeue batch
int[] batch = null;
queue.TryDequeue(out batch);

// Enqueue single item (not enough for one batch)
queue.Enqueue(1);

// Complete the current batch so that it can be dequeued
queue.CompleteCurrentBatch();

// Dequeue batch with single element
queue.TryDequeue(out batch);

// Dispose queue
queue.Dispose();

Typical code for batch processing in a separate thread:

private readonly BlockingBatchingQueue<int> _queue;
private readonly int _timeout;

private void ThreadProc(CancellationToken token)
{
    int[] batch = null;
    try
    {
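        while (!token.IsCancellationRequested)
        {
            // Wait up to _timeout for a completed batch
            if (_queue.TryTake(_timeout, token, out batch))
                ProcessBatch(batch);
            // No batch arrived in time: complete the partially filled batch
            // so that pending items do not wait indefinitely
            else if (_queue.ElementCount > 0)
                _queue.CompleteCurrentBatch();
        }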
    }
    catch (OperationCanceledException)
    {
        if (!token.IsCancellationRequested)
            throw;
    }

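    // After cancellation: drain the batches that are already completed
    while (_queue.TryTake(out batch))
        ProcessBatch(batch);

    // Complete the last partially filled batch and process it, if any
    _queue.CompleteCurrentBatch();

    if (_queue.TryTake(out batch))
        ProcessBatch(batch);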
}

private void ProcessBatch(int[] batch)
{
    // ......
}