Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LeakyBucketExecutionPolicy support for requests context #672

Merged
merged 1 commit into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion ShopifySharp.Tests/RetryExecutionPolicies_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public async Task NonFullLeakyBucketBreachShouldRetryWhenConstructorBoolIsFalse(
try
{
//trip the 5 orders per minute limit on dev stores
foreach (var i in Enumerable.Range(0, 10))
foreach (var i in Enumerable.Range(0, 6))
{
await OrderService.CreateAsync(this.Order);
}
Expand Down Expand Up @@ -131,5 +131,29 @@ public async Task LeakyBucketGraphQLBreachShouldAttemptRetry()

Assert.False(caught);
}


[Fact]
public async Task ForegroundRequestsMustRunBeforeBackgroundRequests()
{
var context = RequestContext.Background;
DateTime? backgroundCompletedAt = null;
DateTime? foregroundCompletedAt = null;


OrderService.SetExecutionPolicy(new LeakyBucketExecutionPolicy(getRequestContext: () => context));

//kick off background requests, which will trigger a throttle
var bgTask = Task.WhenAll(Enumerable.Range(0, 50).Select(async _ => await OrderService.ListAsync()))
.ContinueWith(_ => backgroundCompletedAt = DateTime.UtcNow);

context = RequestContext.Foreground;
var fgTask = Task.WhenAll(Enumerable.Range(0, 10).Select(async _ => await OrderService.ListAsync(new Filters.OrderListFilter { Status = "any" })))
.ContinueWith(_ => foregroundCompletedAt = DateTime.UtcNow);

await Task.WhenAll(bgTask, fgTask);

Assert.True(foregroundCompletedAt < backgroundCompletedAt);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System;
using System.Collections.Generic;

namespace ShopifySharp
{
internal class ContextAwareQueue<T>
{
private readonly Queue<T> _BackgroundQueue = new Queue<T>();

private readonly Queue<T> _ForegroundQueue = new Queue<T>();

private readonly Func<RequestContext> _getContext;

public int Count => _BackgroundQueue.Count + _ForegroundQueue.Count;

public ContextAwareQueue(Func<RequestContext> getContext)
{
_getContext = getContext;
}

public void Enqueue(T i)
{
(_getContext() == RequestContext.Background ? _BackgroundQueue : _ForegroundQueue).Enqueue(i);
}

public T Peek() => _ForegroundQueue.Count > 0 ? _ForegroundQueue.Peek() : _BackgroundQueue.Peek();

public T Dequeue() => _ForegroundQueue.Count > 0 ? _ForegroundQueue.Dequeue() : _BackgroundQueue.Dequeue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,26 @@ public Request(int cost, CancellationToken cancelToken)
internal double ComputedCurrentlyAvailable => Math.Min(MaximumAvailable,
LastCurrentlyAvailable + ((_getTime() - LastUpdatedAt).TotalSeconds * RestoreRatePerSecond));

private Func<DateTime> _getTime;
private readonly Func<DateTime> _getTime;

private Queue<Request> _waitingRequests = new Queue<Request>();
private readonly ContextAwareQueue<Request> _waitingRequests;

private object _lock = new object();

private CancellationTokenSource _cancelNextSchedule;

public LeakyBucket(int maximumAvailable, int restoreRatePerSecond)
: this(maximumAvailable, restoreRatePerSecond, () => DateTime.UtcNow)
public LeakyBucket(int maximumAvailable, int restoreRatePerSecond, Func<RequestContext> getRequestContext)
: this(maximumAvailable, restoreRatePerSecond, () => DateTime.UtcNow, getRequestContext)
{
}

internal LeakyBucket(int maximumAvailable, int restoreRatePerSecond, Func<DateTime> getTime)
internal LeakyBucket(int maximumAvailable, int restoreRatePerSecond, Func<DateTime> getTime, Func<RequestContext> getRequestContext = null)
{
if (maximumAvailable <= 0 || restoreRatePerSecond <= 0)
throw new ArgumentOutOfRangeException();

_getTime = getTime;
_waitingRequests = new ContextAwareQueue<Request>(getRequestContext ?? (new Func<RequestContext>(() => RequestContext.Foreground)));
MaximumAvailable = maximumAvailable;
RestoreRatePerSecond = restoreRatePerSecond;
LastCurrentlyAvailable = maximumAvailable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,32 @@ public class LeakyBucketExecutionPolicy : IRequestExecutionPolicy
private static ConcurrentDictionary<string, MultiShopifyAPIBucket> _shopAccessTokenToLeakyBucket = new ConcurrentDictionary<string, MultiShopifyAPIBucket>();

private readonly bool _retryRESTOnlyIfLeakyBucketFull;

public LeakyBucketExecutionPolicy(bool retryRESTOnlyIfLeakyBucketFull = true)
private readonly Func<RequestContext> _getRequestContext;

/// <summary>
/// Creates a new LeakyBucketExecutionPolicy.
/// It is not recommended to create multiple instances for different access tokens because each instance maintain that leaky bucket state
/// and is not aware of other instances
/// </summary>
/// <param name="retryRESTOnlyIfLeakyBucketFull">Controls when the request should be retried when a Shopify returns an HTTP 429 to a REST request.
/// If true (default), then the policy only retries if the 429 is due to a empty bucket.
/// If false, then the policy also retries for other types of 429. For example, Shopify will return a 429 if one tries to create too many products too quickly on dev stores
/// </param>
/// <param name="getRequestContext">Indicates the current request context, either Foreground or Background.
/// Foreground requests will be priortized to execute before any background requests can run.
/// RequestContext.Foregroud can be used for requests where a user is waiting (e.g loading a web page to show results of query).
/// RequestContext.Background can be used for background requests triggered by job, where no user is waiting.
/// By default, all requests are served in FIFO order</param>
public LeakyBucketExecutionPolicy(bool retryRESTOnlyIfLeakyBucketFull = true, Func<RequestContext> getRequestContext = null)
{
_retryRESTOnlyIfLeakyBucketFull = retryRESTOnlyIfLeakyBucketFull;
_getRequestContext = getRequestContext ?? new Func<RequestContext>(() => RequestContext.Foreground);
}

public async Task<RequestResult<T>> Run<T>(CloneableRequestMessage baseRequest, ExecuteRequestAsync<T> executeRequestAsync, CancellationToken cancellationToken, int? graphqlQueryCost = null)
{
var accessToken = GetAccessToken(baseRequest);
var bucket = accessToken == null ? null : _shopAccessTokenToLeakyBucket.GetOrAdd(accessToken, _ => new MultiShopifyAPIBucket());
var bucket = accessToken == null ? null : _shopAccessTokenToLeakyBucket.GetOrAdd(accessToken, _ => new MultiShopifyAPIBucket(_getRequestContext));
bool isGraphQL = baseRequest.RequestUri.AbsolutePath.EndsWith("graphql.json");

while (true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,15 @@ internal class MultiShopifyAPIBucket
private const int DEFAULT_GRAPHQL_MAX_AVAILABLE = 1_000;
private const int DEFAULT_GRAPHQL_RESTORE_RATE = 50;

private LeakyBucket RESTBucket { get; } = new LeakyBucket(DEFAULT_REST_MAX_AVAILABLE, DEFAULT_REST_RESTORE_RATE);
private LeakyBucket RESTBucket { get; }

private LeakyBucket GraphQLBucket { get; } = new LeakyBucket(DEFAULT_GRAPHQL_MAX_AVAILABLE, DEFAULT_GRAPHQL_RESTORE_RATE);
private LeakyBucket GraphQLBucket { get; }

public MultiShopifyAPIBucket(Func<RequestContext> getRequestContext)
{
RESTBucket = new LeakyBucket(DEFAULT_REST_MAX_AVAILABLE, DEFAULT_REST_RESTORE_RATE, getRequestContext);
GraphQLBucket = new LeakyBucket(DEFAULT_GRAPHQL_MAX_AVAILABLE, DEFAULT_GRAPHQL_RESTORE_RATE, getRequestContext);
}

public async Task WaitForAvailableRESTAsync(CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace ShopifySharp
{
public enum RequestContext
{
Foreground,
Background
}
}