Skip to content

Commit

Permalink
Improve (%41) read speed by replacing concurrent queue (#28)
Browse files Browse the repository at this point in the history
* Add single producer and single consumer queue using LIFO enumeration capability..

* Use the new queue. Performance improvementr for reads: %41

* Usae inline functions instead of lambda delegates.
  • Loading branch information
koculu authored May 26, 2023
1 parent 5874454 commit 3e960ce
Show file tree
Hide file tree
Showing 10 changed files with 317 additions and 78 deletions.
6 changes: 3 additions & 3 deletions src/ZoneTree.UnitTests/BottomSegmentMergeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ public void IntIntBottomMerge()
}
}

var expected1 = new long[] { 11, 13, 15, 17, 19, 21 };
var expected2 = new long[] { 11, 13, 51, 21 };
var expected1 = new long[] { 21, 19, 17, 15, 13, 11 };
var expected2 = new long[] { 21, 51, 13, 11 };

var sum =
var sum =
m.BottomSegments.Sum(x => x.Length) +
m.InMemoryRecordCount +
m.DiskSegment.Length;
Expand Down
238 changes: 238 additions & 0 deletions src/ZoneTree/Collections/SingleProducerSingleConsumerQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
using System.Collections;

namespace Tenray.ZoneTree.Collections;

/// <summary>
/// Special Queue for ZoneTree.
/// 1. SingleProducerSingleConsumerQueue is
/// - thread-safe for single producer and single consumer.
/// - thread safe for many readers / enumerations
/// 2. enquue method uses lock when it is full which makes it almost lock-free for inserts.
/// 3. dequeue uses lock but the producer almost never hit the lock.
/// 4. Despite this is a FIFO Queue, the enumerator is in LIFO order
/// to optimize record lookup at TryGetFromReadonlySegments.
/// Enqueue/Dequeue items in FIFO order: i1,i2,i3,i4
/// Enumeration in LIFO order: i4,i3,i2,i1
/// </summary>
/// <typeparam name="TQueueItem">Type of the queue item.</typeparam>
public sealed class SingleProducerSingleConsumerQueue<TQueueItem>
: IEnumerable<TQueueItem>
where TQueueItem : class
{
class QueueItemsChunk
{
const int ChunkSize = 16;

/// <summary>
/// Start of the queue inclusive.
/// </summary>
public volatile int Start = 0;

/// <summary>
/// End of the queue exclusive.
/// </summary>
public volatile int End = 0;

public volatile TQueueItem[] Items = new TQueueItem[ChunkSize];

public bool IsEmpty => Start == End;

public int ItemsCount
{
get
{
var size = Items.Length;
return (End + size - Start) % size;
}
}

public IReadOnlyList<TQueueItem> ToFirstInFirstArray()
{
var items = Items;
var size = items.Length;
var end = End;
var start = Start;
var list = new List<TQueueItem>(ItemsCount);

while (start != end)
{
var item = items[start];
if (item == null)
continue;
list.Add(item);
start = (start + 1) % size;
}
return list;
}

public IReadOnlyList<TQueueItem> ToLastInFirstArray()
{
var items = Items;
var size = items.Length;
var end = (size + End - 1) % size;
var start = (size + Start - 1) % size;
var list = new List<TQueueItem>(ItemsCount);

while (start != end)
{
var item = items[end];
if (item == null)
continue;
list.Add(item);
end = (size + end - 1) % size;
}
return list;
}
}

public int Length => Chunk.ItemsCount;

public bool IsEmpty => Chunk.IsEmpty;

volatile QueueItemsChunk Chunk = new();

public SingleProducerSingleConsumerQueue()
{
}

public SingleProducerSingleConsumerQueue(IEnumerable<TQueueItem> list)
{
foreach (var item in list)
{
Enqueue(item);
}
}

/// <summary>
/// Enqueue should not be called more than once at the same time.
/// </summary>
public void Enqueue(TQueueItem item)
{
var chunk = Chunk;
var items = chunk.Items;
var size = items.Length;
var end = chunk.End;
if ((end + 1) % size == chunk.Start)
{
// queue is full or was full.
// lock frequency of enqueue is almost zero due to the exponential size increase.
lock (this)
{
var newItems = new TQueueItem[size * 2];
Array.Copy(items, newItems, size);
if (end < chunk.Start)
{
Array.Copy(items, 0, newItems, size, end);
Array.Fill(newItems, null, 0, end);
end = size + chunk.End;
}
chunk = Chunk = new QueueItemsChunk
{
Items = newItems,
Start = chunk.Start,
End = end
};
items = newItems;
}
size *= 2;
}
items[end] = item;
chunk.End = (end + 1) % size;
}

/// <summary>
/// TryDequeue should not be called more than once at the same time.
/// </summary>
public bool TryDequeue(out TQueueItem item)
{
var chunk = Chunk;
var start = chunk.Start;
var items = chunk.Items;
var size = items.Length;
item = items[start];
if (item == null)
return false;

lock (this)
{
if (!ReferenceEquals(chunk, Chunk))
{
chunk = Chunk;
items = chunk.Items;
size = items.Length;
}
items[start] = null;
chunk.Start = (start + 1) % size;
}
return true;
}

public IReadOnlyList<TQueueItem> ToLastInFirstArray() => Chunk.ToLastInFirstArray();

public IReadOnlyList<TQueueItem> ToFirstInFirstArray() => Chunk.ToFirstInFirstArray();

class LastInFirstEnumerator : IEnumerator<TQueueItem>
{
TQueueItem current;

public TQueueItem Current => current;

object IEnumerator.Current => current;

readonly QueueItemsChunk Chunk;

TQueueItem[] Items;

int Start;

int End;

int Size;

public LastInFirstEnumerator(QueueItemsChunk chunk)
{
Chunk = chunk;
Reset();
}

public void Dispose()
{
}

/// <summary>
/// The enumeration of this quewue is LIFO.
/// </summary>
/// <returns></returns>
public bool MoveNext()
{
do
{
if (Start == End)
return false;
current = Items[End];
End = (Size + End - 1) % Size;
}
while (current == null);
return true;
}

public void Reset()
{
var chunk = Chunk;
Items = chunk.Items;
Size = Items.Length;
Start = (Size + chunk.Start - 1) % Size;
End = (Size + chunk.End - 1) % Size;
}
}

public IEnumerator<TQueueItem> GetEnumerator()
{
return new LastInFirstEnumerator(Chunk);
}

IEnumerator IEnumerable.GetEnumerator()
{
return new LastInFirstEnumerator(Chunk);
}
}
Loading

0 comments on commit 3e960ce

Please sign in to comment.