QueueAsyncProcessor

ikopylov edited this page Mar 21, 2015 · 7 revisions

Qoollo QueueAsyncProcessor

One of the most common scenarios in parallel data processing is to put data into a thread-safe blocking queue and then take and process the items on multiple threads. If you want to use this pattern, QueueAsyncProcessor is exactly what you need.

QueueAsyncProcessor pattern
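To make the pattern itself concrete, here is a minimal sketch built only on the standard .NET `BlockingCollection<T>`, not on QueueAsyncProcessor. The class name `BlockingQueueSketch` and the method `ProcessAll` are hypothetical names chosen for illustration, and summing the items stands in for real processing work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical illustration of the pattern; not part of Qoollo's API.
public static class BlockingQueueSketch
{
    // Starts workerCount threads that drain a bounded queue,
    // feeds it the numbers 0..itemCount-1, and returns their sum.
    public static long ProcessAll(int itemCount, int workerCount, int maxQueueSize)
    {
        long sum = 0;
        using (var queue = new BlockingCollection<int>(boundedCapacity: maxQueueSize))
        {
            var workers = new Thread[workerCount];
            for (int i = 0; i < workers.Length; i++)
            {
                workers[i] = new Thread(() =>
                {
                    // Blocks while the queue is empty; the loop ends once
                    // adding is completed and the queue has been drained.
                    foreach (int item in queue.GetConsumingEnumerable())
                        Interlocked.Add(ref sum, item);
                });
                workers[i].Start();
            }

            // Producer side: Add blocks while the queue is full.
            for (int i = 0; i < itemCount; i++)
                queue.Add(i);

            // Signal that no more items will arrive, then wait for the workers.
            queue.CompleteAdding();
            foreach (Thread worker in workers)
                worker.Join();
        }
        return sum;
    }
}
```

For example, `BlockingQueueSketch.ProcessAll(100, 4, 16)` processes the numbers 0 through 99 on four threads and returns 4950. The thread startup, the completion signal, and the joining are all written by hand here; a dedicated processor class is meant to take that bookkeeping off your hands.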

Sample source code:

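using System;
using System.Threading;
// Assumed namespace of QueueAsyncProcessor in the Qoollo.Turbo library; adjust if your version differs
using Qoollo.Turbo.Threading.QueueProcessing;

// Define a subclass of QueueAsyncProcessor parameterized by the element type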
public class DataProcessor: QueueAsyncProcessor<int>
{
    public DataProcessor(int threadCount, int maxQueueSize)
        : base(threadCount: threadCount, maxQueueSize: maxQueueSize, name: "name")
    {
    }

    // Implement the main processing method; it is called for every element taken from the queue
    protected override void Process(int element, object state, CancellationToken token)
    {
        Console.WriteLine(element);
    }
}


// Usage
static void Main()
{
    // Create an instance of our QueueAsyncProcessor subclass
    DataProcessor processor = new DataProcessor(Environment.ProcessorCount, 64 * Environment.ProcessorCount);
    
    // Start our processor
    processor.Start();

    // Add elements for processing
    for (int i = 0; i < 100; i++)
        processor.Add(i);

    // Stop our processor
    processor.Stop(waitForStop: true, letFinishProcess: true, completeAdding: true);
}

If needed, there is also a version of the processor that receives the processing delegate as a constructor parameter (DelegateQueueAsyncProcessor).