Skip to content
ikopylov edited this page Mar 1, 2018 · 1 revision

Qoollo Queues

The Qoollo.Turbo.Queues namespace exposes 4 types of thread-safe blocking queues:

  1. MemoryQueue<T> - queue that stores all elements in memory. This is a subclass of BlockingQueue<T>;
  2. DiskQueue<T> - queue that stores its elements on disk. It supports two types of storage segments:
    • NonPersistentDiskQueueSegment<T> - stored elements are valid only during the current execution of the program, they will all be deleted after the application is restarted;
    • PersistentDiskQueueSegment<T> - stored elements will remain in the queue even after the application is restarted;
  3. TransformationQueue<T> - a special type of queue that transforms its elements on the fly and puts them to the lower-level queue;
  4. LevelingQueue<T> - a special type of queue that allows combining two different queues and working with them as if they were a single queue (e.g. combine MemoryQueue<T> with DiskQueue<T> to achieve high performance and large capacity).

Example:

/// <summary>
/// Serializer used by the disk queue example to write and read <c>int</c> items
/// through a <see cref="BinaryWriter"/> / <see cref="BinaryReader"/> pair.
/// </summary>
class ItemSerializer : IDiskQueueItemSerializer<int>
{
    /// <summary>Reported size of one serialized item: 4 bytes, matching a single Int32.</summary>
    public int ExpectedSizeInBytes => 4;

    /// <summary>Reads one item back from <paramref name="reader"/> as an Int32.</summary>
    /// <param name="reader">Reader positioned at the start of a serialized item.</param>
    /// <returns>The value read from the reader.</returns>
    public int Deserialize(BinaryReader reader) => reader.ReadInt32();

    /// <summary>Writes <paramref name="item"/> to <paramref name="writer"/> as an Int32.</summary>
    /// <param name="writer">Writer that receives the serialized item.</param>
    /// <param name="item">Value to serialize.</param>
    public void Serialize(BinaryWriter writer, int item) => writer.Write(item);
}

/// <summary>
/// Converter used by the transformation queue example: maps the <c>long</c> items
/// seen by the caller to the <c>int</c> items kept in the lower-level queue, and back.
/// </summary>
class TypeConverter : ITransformationQueueConverter<long, int>
{
    /// <summary>Narrows a <c>long</c> item to <c>int</c>.</summary>
    /// <param name="item">Value to convert.</param>
    /// <returns>The same numeric value as an <c>int</c>.</returns>
    /// <exception cref="System.OverflowException">
    /// <paramref name="item"/> is outside the range of <c>int</c>.
    /// </exception>
    public int Convert(long item)
    {
        // A plain (int) cast silently truncates out-of-range values in the default
        // unchecked context, so ConvertBack would later return a different number
        // than the one that was added. Fail loudly instead of corrupting the item.
        return checked((int)item);
    }

    /// <summary>Widens an <c>int</c> item back to <c>long</c>; this conversion is lossless.</summary>
    /// <param name="item">Value to convert back.</param>
    /// <returns>The same numeric value as a <c>long</c>.</returns>
    public long ConvertBack(int item)
    {
        return item;
    }
}

/// <summary>
/// Builds the layered example queue of <c>long</c> items. From top to bottom:
/// a memory queue, a long-to-int transformation, a second memory queue, and a disk queue.
/// </summary>
/// <param name="directoryOnDisk">Directory handed to the disk queue.</param>
/// <returns>The top-level queue that wraps all the layers.</returns>
private static IQueue<long> CreateQueue(string directoryOnDisk)
{
    // Bottom layer: disk queue built on non-persistent segments.
    var segmentFactory = new NonPersistentDiskQueueSegmentFactory<int>(
        capacity: 10000,
        fileNamePrefix: "diskQ",
        serializer: new ItemSerializer());
    var diskLayer = new DiskQueue<int>(
        directoryOnDisk,
        segmentFactory,
        maxSegmentCount: 100,
        backgroundCompaction: true);

    // In-memory buffer in front of the disk queue: up to 5000 already-transformed (int) items.
    var intBuffer = new MemoryQueue<int>(5000);

    // Present the int memory buffer and the disk queue as a single queue.
    var intLevel = new LevelingQueue<int>(
        intBuffer,
        diskLayer,
        LevelingQueueAddingMode.PreserveOrder,
        isBackgroundTransferingEnabled: true);

    // Adapter that converts long items to int on the way down.
    var longToInt = new TransformationQueue<long, int>(intLevel, new TypeConverter());

    // In-memory buffer in front of the transformation: up to 1000 original (long) items.
    var longBuffer = new MemoryQueue<long>(1000);

    // Top layer: the long memory buffer combined with the transformation queue.
    return new LevelingQueue<long>(
        longBuffer,
        longToInt,
        LevelingQueueAddingMode.PreserveOrder,
        isBackgroundTransferingEnabled: true);
}

/// <summary>
/// Example entry point: creates the layered queue, adds one item, takes it back,
/// prints it, and disposes the queue when the using block ends.
/// </summary>
static void Main()
{
    using (IQueue<long> layeredQueue = CreateQueue(@"C:\disk_queue"))
    {
        layeredQueue.Add(100);

        var taken = layeredQueue.Take();
        Console.WriteLine(taken);
    }
}