Skip to content

Commit

Permalink
Merge pull request #1020 from clement911/master
Browse files Browse the repository at this point in the history
Using a fifo semaphore to ensure that requests are granted access to …
  • Loading branch information
clement911 committed Feb 19, 2024
2 parents 3f722c7 + 32758ff commit c693018
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 3 deletions.
@@ -0,0 +1,37 @@
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ShopifySharp
{
internal class FifoSemaphore
{
private SemaphoreSlim _semaphore;

private ConcurrentQueue<TaskCompletionSource<bool>> queue = new ();

public int QueueCount => queue.Count;

public FifoSemaphore(int initialCount, int maxConcurrency)
{
_semaphore = new SemaphoreSlim(initialCount, maxConcurrency);
}

public Task WaitAsync(CancellationToken cancellationToken = default)
{
var tcs = new TaskCompletionSource<bool>();
queue.Enqueue(tcs);
_semaphore.WaitAsync(cancellationToken).ContinueWith(t =>
{
if (queue.TryDequeue(out var popped))
popped.SetResult(true);
});
return tcs.Task;
}

public void Release()
{
_semaphore.Release();
}
}
}
Expand Up @@ -9,7 +9,7 @@ internal class LeakyBucket
private class Request
{
public int cost;
public SemaphoreSlim semaphore = new SemaphoreSlim(0, 1);
public FifoSemaphore fifoSemaphore = new FifoSemaphore(0, 1);
public CancellationToken cancelToken;

public Request(int cost, CancellationToken cancelToken)
Expand Down Expand Up @@ -109,7 +109,7 @@ public async Task WaitForAvailableAsync(int requestCost, CancellationToken cance
if (_waitingRequests.Count == 1)
ScheduleTryGrantNextPendingRequest(r);
}
await r.semaphore.WaitAsync(cancellationToken);
await r.fifoSemaphore.WaitAsync(cancellationToken);
}

private void ScheduleTryGrantNextPendingRequest(Request r)
Expand All @@ -132,7 +132,7 @@ private void TryGrantNextPendingRequest()
var r = _waitingRequests.Dequeue();
if (!r.cancelToken.IsCancellationRequested)
{
r.semaphore.Release();
r.fifoSemaphore.Release();
ConsumeAvailable(r);
}
}
Expand Down

0 comments on commit c693018

Please sign in to comment.