Merge pull request #672 from clement911/master
nozzlegear committed Oct 14, 2021
2 parents 5ca6bc0 + 84b795c commit f9a7d48
Showing 6 changed files with 96 additions and 11 deletions.
26 changes: 25 additions & 1 deletion ShopifySharp.Tests/RetryExecutionPolicies_Tests.cs
@@ -59,7 +59,7 @@ public async Task NonFullLeakyBucketBreachShouldRetryWhenConstructorBoolIsFalse(
try
{
//trip the 5 orders per minute limit on dev stores
foreach (var i in Enumerable.Range(0, 10))
foreach (var i in Enumerable.Range(0, 6))
{
await OrderService.CreateAsync(this.Order);
}
@@ -131,5 +131,29 @@ public async Task LeakyBucketGraphQLBreachShouldAttemptRetry()

Assert.False(caught);
}


[Fact]
public async Task ForegroundRequestsMustRunBeforeBackgroundRequests()
{
var context = RequestContext.Background;
DateTime? backgroundCompletedAt = null;
DateTime? foregroundCompletedAt = null;


OrderService.SetExecutionPolicy(new LeakyBucketExecutionPolicy(getRequestContext: () => context));

//kick off background requests, which will trigger a throttle
var bgTask = Task.WhenAll(Enumerable.Range(0, 50).Select(async _ => await OrderService.ListAsync()))
.ContinueWith(_ => backgroundCompletedAt = DateTime.UtcNow);

//switch to the foreground context; the policy's delegate reads this captured variable, so the following requests jump ahead of the queued background requests
context = RequestContext.Foreground;
var fgTask = Task.WhenAll(Enumerable.Range(0, 10).Select(async _ => await OrderService.ListAsync(new Filters.OrderListFilter { Status = "any" })))
.ContinueWith(_ => foregroundCompletedAt = DateTime.UtcNow);

await Task.WhenAll(bgTask, fgTask);

Assert.True(foregroundCompletedAt < backgroundCompletedAt);
}
}
}
@@ -0,0 +1,30 @@
using System;
using System.Collections.Generic;

namespace ShopifySharp
{
/// <summary>
/// Maintains two internal FIFO queues, one for foreground items and one for background items.
/// The context delegate is consulted at enqueue time to pick the queue; Peek and Dequeue always
/// serve foreground items before background items.
/// </summary>
internal class ContextAwareQueue<T>
{
private readonly Queue<T> _BackgroundQueue = new Queue<T>();

private readonly Queue<T> _ForegroundQueue = new Queue<T>();

private readonly Func<RequestContext> _getContext;

public int Count => _BackgroundQueue.Count + _ForegroundQueue.Count;

public ContextAwareQueue(Func<RequestContext> getContext)
{
_getContext = getContext;
}

public void Enqueue(T i)
{
(_getContext() == RequestContext.Background ? _BackgroundQueue : _ForegroundQueue).Enqueue(i);
}

public T Peek() => _ForegroundQueue.Count > 0 ? _ForegroundQueue.Peek() : _BackgroundQueue.Peek();

public T Dequeue() => _ForegroundQueue.Count > 0 ? _ForegroundQueue.Dequeue() : _BackgroundQueue.Dequeue();
}
}
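For reference, here is a rough usage sketch of the ordering guarantee this queue provides (not part of the diff). ContextAwareQueue is internal, so a snippet like this would only compile from inside ShopifySharp or a test project with InternalsVisibleTo, and the item values are made up for illustration:

using System;
using ShopifySharp;

class ContextAwareQueueSketch
{
    static void Main()
    {
        var context = RequestContext.Background;
        // The delegate is evaluated on every Enqueue call, so changing the captured
        // variable changes which internal queue subsequent items land in.
        var queue = new ContextAwareQueue<string>(() => context);

        queue.Enqueue("background-1");
        queue.Enqueue("background-2");

        context = RequestContext.Foreground;
        queue.Enqueue("foreground-1");

        // Peek/Dequeue drain the foreground queue before touching the background queue,
        // so the item enqueued last comes out first here.
        Console.WriteLine(queue.Dequeue()); // foreground-1
        Console.WriteLine(queue.Dequeue()); // background-1
        Console.WriteLine(queue.Dequeue()); // background-2
    }
}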
@@ -31,25 +31,26 @@ public Request(int cost, CancellationToken cancelToken)
internal double ComputedCurrentlyAvailable => Math.Min(MaximumAvailable,
LastCurrentlyAvailable + ((_getTime() - LastUpdatedAt).TotalSeconds * RestoreRatePerSecond));

private Func<DateTime> _getTime;
private readonly Func<DateTime> _getTime;

private Queue<Request> _waitingRequests = new Queue<Request>();
private readonly ContextAwareQueue<Request> _waitingRequests;

private object _lock = new object();

private CancellationTokenSource _cancelNextSchedule;

public LeakyBucket(int maximumAvailable, int restoreRatePerSecond)
: this(maximumAvailable, restoreRatePerSecond, () => DateTime.UtcNow)
public LeakyBucket(int maximumAvailable, int restoreRatePerSecond, Func<RequestContext> getRequestContext)
: this(maximumAvailable, restoreRatePerSecond, () => DateTime.UtcNow, getRequestContext)
{
}

internal LeakyBucket(int maximumAvailable, int restoreRatePerSecond, Func<DateTime> getTime)
internal LeakyBucket(int maximumAvailable, int restoreRatePerSecond, Func<DateTime> getTime, Func<RequestContext> getRequestContext = null)
{
if (maximumAvailable <= 0 || restoreRatePerSecond <= 0)
throw new ArgumentOutOfRangeException();

_getTime = getTime;
_waitingRequests = new ContextAwareQueue<Request>(getRequestContext ?? (new Func<RequestContext>(() => RequestContext.Foreground)));
MaximumAvailable = maximumAvailable;
RestoreRatePerSecond = restoreRatePerSecond;
LastCurrentlyAvailable = maximumAvailable;
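Note that getRequestContext defaults to null on the internal constructor, in which case every request is treated as Foreground and the bucket keeps its previous FIFO behaviour. A minimal sketch of the two modes, assuming it runs from inside ShopifySharp or its test project and using made-up limits rather than the real REST defaults:

using System;
using ShopifySharp;

class LeakyBucketContextSketch
{
    static void Main()
    {
        // No context delegate: every request is enqueued as Foreground, i.e. plain FIFO.
        var fifoBucket = new LeakyBucket(maximumAvailable: 40, restoreRatePerSecond: 2, getTime: () => DateTime.UtcNow);

        // With a delegate: requests made while it returns Background yield to later Foreground requests.
        var context = RequestContext.Background;
        var prioritizingBucket = new LeakyBucket(40, 2, () => DateTime.UtcNow, () => context);
    }
}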
@@ -23,16 +23,32 @@ public class LeakyBucketExecutionPolicy : IRequestExecutionPolicy
private static ConcurrentDictionary<string, MultiShopifyAPIBucket> _shopAccessTokenToLeakyBucket = new ConcurrentDictionary<string, MultiShopifyAPIBucket>();

private readonly bool _retryRESTOnlyIfLeakyBucketFull;

public LeakyBucketExecutionPolicy(bool retryRESTOnlyIfLeakyBucketFull = true)
private readonly Func<RequestContext> _getRequestContext;

/// <summary>
/// Creates a new LeakyBucketExecutionPolicy.
/// It is not recommended to create multiple instances for different access tokens, because each instance maintains its own leaky bucket state and is not aware of the others.
/// </summary>
/// <param name="retryRESTOnlyIfLeakyBucketFull">Controls whether the request should be retried when Shopify returns an HTTP 429 to a REST request.
/// If true (default), the policy only retries when the 429 is caused by the leaky bucket being full (i.e. no API calls currently available).
/// If false, the policy also retries other kinds of 429. For example, Shopify returns a 429 when too many products are created too quickly on dev stores.
/// </param>
/// <param name="getRequestContext">Returns the current request context, either Foreground or Background.
/// Foreground requests are prioritized to execute before any queued background requests.
/// RequestContext.Foreground can be used for requests where a user is waiting (e.g. loading a web page to show the results of a query).
/// RequestContext.Background can be used for background requests triggered by a job, where no user is waiting.
/// By default, all requests are served in FIFO order.</param>
public LeakyBucketExecutionPolicy(bool retryRESTOnlyIfLeakyBucketFull = true, Func<RequestContext> getRequestContext = null)
{
_retryRESTOnlyIfLeakyBucketFull = retryRESTOnlyIfLeakyBucketFull;
_getRequestContext = getRequestContext ?? new Func<RequestContext>(() => RequestContext.Foreground);
}

public async Task<RequestResult<T>> Run<T>(CloneableRequestMessage baseRequest, ExecuteRequestAsync<T> executeRequestAsync, CancellationToken cancellationToken, int? graphqlQueryCost = null)
{
var accessToken = GetAccessToken(baseRequest);
var bucket = accessToken == null ? null : _shopAccessTokenToLeakyBucket.GetOrAdd(accessToken, _ => new MultiShopifyAPIBucket());
var bucket = accessToken == null ? null : _shopAccessTokenToLeakyBucket.GetOrAdd(accessToken, _ => new MultiShopifyAPIBucket(_getRequestContext));
bool isGraphQL = baseRequest.RequestUri.AbsolutePath.EndsWith("graphql.json");

while (true)
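As a consumer-side usage sketch (not part of this PR): one way an application might feed the context delegate is through an AsyncLocal, so that web requests run as Foreground while scheduled jobs opt into Background. The shop URL and access token below are placeholders:

using System;
using System.Threading;
using ShopifySharp;

static class PolicySetupSketch
{
    // Each logical call flow (web request vs. background job) carries its own context value.
    // The default enum value is Foreground, so flows that never set it keep normal priority.
    private static readonly AsyncLocal<RequestContext> CurrentContext = new AsyncLocal<RequestContext>();

    public static OrderService CreateOrderService()
    {
        var service = new OrderService("example.myshopify.com", "placeholder-access-token");

        // The policy invokes the delegate each time a request is queued, so the value is read per call flow.
        service.SetExecutionPolicy(new LeakyBucketExecutionPolicy(
            retryRESTOnlyIfLeakyBucketFull: true,
            getRequestContext: () => CurrentContext.Value));

        return service;
    }

    public static void MarkCurrentFlowAsBackground() => CurrentContext.Value = RequestContext.Background;
}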
@@ -12,9 +12,15 @@ internal class MultiShopifyAPIBucket
private const int DEFAULT_GRAPHQL_MAX_AVAILABLE = 1_000;
private const int DEFAULT_GRAPHQL_RESTORE_RATE = 50;

private LeakyBucket RESTBucket { get; } = new LeakyBucket(DEFAULT_REST_MAX_AVAILABLE, DEFAULT_REST_RESTORE_RATE);
private LeakyBucket RESTBucket { get; }

private LeakyBucket GraphQLBucket { get; } = new LeakyBucket(DEFAULT_GRAPHQL_MAX_AVAILABLE, DEFAULT_GRAPHQL_RESTORE_RATE);
private LeakyBucket GraphQLBucket { get; }

public MultiShopifyAPIBucket(Func<RequestContext> getRequestContext)
{
RESTBucket = new LeakyBucket(DEFAULT_REST_MAX_AVAILABLE, DEFAULT_REST_RESTORE_RATE, getRequestContext);
GraphQLBucket = new LeakyBucket(DEFAULT_GRAPHQL_MAX_AVAILABLE, DEFAULT_GRAPHQL_RESTORE_RATE, getRequestContext);
}

public async Task WaitForAvailableRESTAsync(CancellationToken cancellationToken)
{
@@ -0,0 +1,8 @@
namespace ShopifySharp
{
public enum RequestContext
{
Foreground,
Background
}
}
