diff --git a/rest-helix/build.gradle b/rest-helix/build.gradle
index f5e388d2a..9019995a8 100644
--- a/rest-helix/build.gradle
+++ b/rest-helix/build.gradle
@@ -7,6 +7,9 @@ dependencies {
api group: 'io.github.openfeign', name: 'feign-hystrix'
api group: 'commons-configuration', name: 'commons-configuration'
+ // Rate Limiting
+ api group: 'com.github.vladimir-bukhtoyarov', name: 'bucket4j-core'
+
// Jackson (JSON)
api group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
diff --git a/rest-helix/src/main/java/com/github/twitch4j/helix/TwitchHelixBuilder.java b/rest-helix/src/main/java/com/github/twitch4j/helix/TwitchHelixBuilder.java
index 0ab86add5..cc6f601f3 100644
--- a/rest-helix/src/main/java/com/github/twitch4j/helix/TwitchHelixBuilder.java
+++ b/rest-helix/src/main/java/com/github/twitch4j/helix/TwitchHelixBuilder.java
@@ -4,8 +4,11 @@
import com.github.philippheuer.credentialmanager.domain.OAuth2Credential;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.config.Twitch4JGlobal;
+import com.github.twitch4j.common.util.ThreadUtils;
import com.github.twitch4j.common.util.TypeConvert;
import com.github.twitch4j.helix.interceptor.TwitchHelixClientIdInterceptor;
+import com.github.twitch4j.helix.interceptor.TwitchHelixDecoder;
+import com.github.twitch4j.helix.interceptor.TwitchHelixHttpClient;
import com.netflix.config.ConfigurationManager;
import feign.Logger;
import feign.Request;
@@ -17,7 +20,9 @@
import feign.slf4j.Slf4jLogger;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
@@ -82,6 +87,12 @@ public class TwitchHelixBuilder {
@With
private ProxyConfig proxyConfig = null;
+ /**
+ * Scheduler Thread Pool Executor
+ */
+ @With
+ private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = null;
+
/**
* Initialize the builder
*
@@ -113,19 +124,22 @@ public TwitchHelix build() {
if (proxyConfig != null)
proxyConfig.apply(clientBuilder);
+ // Executor for rate limiting
+ if (scheduledThreadPoolExecutor == null)
+ scheduledThreadPoolExecutor = ThreadUtils.getDefaultScheduledThreadPoolExecutor("twitch4j-" + RandomStringUtils.random(4, true, true), 1);
+
// Feign
- TwitchHelix client = HystrixFeign.builder()
- .client(new OkHttpClient(clientBuilder.build()))
+ TwitchHelixClientIdInterceptor interceptor = new TwitchHelixClientIdInterceptor(this);
+ return HystrixFeign.builder()
+ .client(new TwitchHelixHttpClient(new OkHttpClient(clientBuilder.build()), scheduledThreadPoolExecutor, interceptor, timeout))
.encoder(new JacksonEncoder(mapper))
- .decoder(new JacksonDecoder(mapper))
+ .decoder(new TwitchHelixDecoder(mapper, interceptor))
.logger(new Slf4jLogger())
.logLevel(logLevel)
.errorDecoder(new TwitchHelixErrorDecoder(new JacksonDecoder()))
- .requestInterceptor(new TwitchHelixClientIdInterceptor(this))
+ .requestInterceptor(interceptor)
.options(new Request.Options(timeout / 3, TimeUnit.MILLISECONDS, timeout, TimeUnit.MILLISECONDS, true))
.retryer(new Retryer.Default(500, timeout, 2))
.target(TwitchHelix.class, baseUrl);
-
- return client;
}
}
diff --git a/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixClientIdInterceptor.java b/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixClientIdInterceptor.java
index f924e60a5..0d9534e18 100644
--- a/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixClientIdInterceptor.java
+++ b/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixClientIdInterceptor.java
@@ -7,10 +7,16 @@
import com.github.twitch4j.helix.TwitchHelixBuilder;
import feign.RequestInterceptor;
import feign.RequestTemplate;
+import io.github.bucket4j.Bandwidth;
+import io.github.bucket4j.Bucket;
+import io.github.bucket4j.Bucket4j;
+import lombok.AccessLevel;
+import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -20,6 +26,14 @@
@Slf4j
public class TwitchHelixClientIdInterceptor implements RequestInterceptor {
+ public static final String AUTH_HEADER = "Authorization";
+ public static final String BEARER_PREFIX = "Bearer ";
+
+ /**
+ * @see Helix Rate Limit Reference
+ */
+ private static final Bandwidth DEFAULT_BANDWIDTH = Bandwidth.simple(800, Duration.ofMinutes(1));
+
/**
* Reference to the Client Builder
*/
@@ -34,11 +48,19 @@ public class TwitchHelixClientIdInterceptor implements RequestInterceptor {
/**
* Access token cache
*/
+ @Getter(value = AccessLevel.PROTECTED)
private final Cache accessTokenCache = Caffeine.newBuilder()
- .expireAfterWrite(15, TimeUnit.MINUTES)
+ .expireAfterAccess(15, TimeUnit.MINUTES)
.maximumSize(10_000)
.build();
+ /**
+ * Rate limit buckets by user/app
+ */
+ private final Cache buckets = Caffeine.newBuilder()
+ .expireAfterAccess(1, TimeUnit.MINUTES)
+ .build();
+
/**
* The default app access token that is used if no oauth was passed by the user
*/
@@ -47,7 +69,7 @@ public class TwitchHelixClientIdInterceptor implements RequestInterceptor {
/**
* The default client id, typically associated with {@link TwitchHelixClientIdInterceptor#defaultAuthToken}
*/
- private String defaultClientId;
+ private volatile String defaultClientId;
/**
* Constructor
@@ -60,8 +82,10 @@ public TwitchHelixClientIdInterceptor(TwitchHelixBuilder twitchHelixBuilder) {
this.defaultClientId = twitchAPIBuilder.getClientId();
this.defaultAuthToken = twitchHelixBuilder.getDefaultAuthToken();
if (defaultAuthToken != null)
- twitchIdentityProvider.getAdditionalCredentialInformation(defaultAuthToken)
- .ifPresent(oauth -> this.defaultClientId = (String) oauth.getContext().get("client_id"));
+ twitchIdentityProvider.getAdditionalCredentialInformation(defaultAuthToken).ifPresent(oauth -> {
+ this.defaultClientId = (String) oauth.getContext().get("client_id");
+ accessTokenCache.put(oauth.getAccessToken(), oauth);
+ });
}
/**
@@ -74,8 +98,8 @@ public void apply(RequestTemplate template) {
String clientId = this.defaultClientId;
// if a oauth token is passed is has to match that client id, default to global client id otherwise (for ie. token verification)
- if (template.headers().containsKey("Authorization")) {
- String oauthToken = template.headers().get("Authorization").iterator().next().substring("Bearer ".length());
+ if (template.headers().containsKey(AUTH_HEADER)) {
+ String oauthToken = template.headers().get(AUTH_HEADER).iterator().next().substring(BEARER_PREFIX.length());
if (oauthToken.isEmpty()) {
String clientSecret = twitchAPIBuilder.getClientSecret();
@@ -88,8 +112,8 @@ public void apply(RequestTemplate template) {
throw new RuntimeException("Failed to generate an app access token as no oauth token was passed to this Helix call", e);
}
- template.removeHeader("Authorization");
- template.header("Authorization", "Bearer " + oauthToken);
+ template.removeHeader(AUTH_HEADER);
+ template.header(AUTH_HEADER, BEARER_PREFIX + oauthToken);
} else {
OAuth2Credential verifiedCredential = accessTokenCache.getIfPresent(oauthToken);
if (verifiedCredential == null) {
@@ -115,11 +139,43 @@ public void apply(RequestTemplate template) {
template.header("User-Agent", twitchAPIBuilder.getUserAgent());
}
+ public void updateRemaining(String token, int remaining) {
+ OAuth2Credential credential = accessTokenCache.getIfPresent(token);
+ if (credential == null) return;
+
+ String key = getKey(credential);
+ if (key == null) return;
+
+ Bucket bucket = getOrInitializeBucket(key);
+ long diff = bucket.getAvailableTokens() - remaining;
+ if (diff > 0) bucket.tryConsumeAsMuchAsPossible(diff);
+ }
+
+ public void clearDefaultToken() {
+ this.defaultAuthToken = null;
+ }
+
+ protected String getKey(OAuth2Credential credential) {
+ String clientId = (String) credential.getContext().get("client_id");
+ return clientId == null ? null : credential.getUserId() == null ? clientId : clientId + "-" + credential.getUserId();
+ }
+
+ protected Bucket getOrInitializeBucket(String key) {
+ return buckets.get(key, k -> Bucket4j.builder().addLimit(DEFAULT_BANDWIDTH).build());
+ }
+
private OAuth2Credential getOrCreateAuthToken() {
if (defaultAuthToken == null)
synchronized (this) {
- if (defaultAuthToken == null)
- return (this.defaultAuthToken = twitchIdentityProvider.getAppAccessToken());
+ if (defaultAuthToken == null) {
+ String clientId = twitchAPIBuilder.getClientId();
+ OAuth2Credential token = twitchIdentityProvider.getAppAccessToken();
+ token.getContext().put("client_id", clientId);
+ getOrInitializeBucket(clientId);
+ accessTokenCache.put(token.getAccessToken(), token);
+ this.defaultClientId = clientId;
+ return this.defaultAuthToken = token;
+ }
}
return this.defaultAuthToken;
diff --git a/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixDecoder.java b/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixDecoder.java
new file mode 100644
index 000000000..fefc32b4b
--- /dev/null
+++ b/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixDecoder.java
@@ -0,0 +1,48 @@
+package com.github.twitch4j.helix.interceptor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import feign.Response;
+import feign.jackson.JacksonDecoder;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.Collection;
+
+import static com.github.twitch4j.helix.interceptor.TwitchHelixClientIdInterceptor.AUTH_HEADER;
+import static com.github.twitch4j.helix.interceptor.TwitchHelixClientIdInterceptor.BEARER_PREFIX;
+
+public class TwitchHelixDecoder extends JacksonDecoder {
+
+ public static final String REMAINING_HEADER = "Ratelimit-Remaining";
+
+ private final TwitchHelixClientIdInterceptor interceptor;
+
+ public TwitchHelixDecoder(ObjectMapper mapper, TwitchHelixClientIdInterceptor interceptor) {
+ super(mapper);
+ this.interceptor = interceptor;
+ }
+
+ @Override
+ public Object decode(Response response, Type type) throws IOException {
+ // track rate limit for token
+ String token = singleFirst(response.request().headers().get(AUTH_HEADER));
+ if (token != null && token.startsWith(BEARER_PREFIX)) {
+ String remaining = singleFirst(response.headers().get(REMAINING_HEADER));
+ if (remaining != null) {
+ try {
+ interceptor.updateRemaining(token.substring(BEARER_PREFIX.length()), Integer.parseInt(remaining));
+ } catch (Exception ignored) {
+ }
+ }
+ }
+
+ // delegate to JacksonDecoder
+ return super.decode(response, type);
+ }
+
+ static String singleFirst(Collection collection) {
+ if (collection == null || collection.size() != 1) return null;
+ return collection.toArray(new String[1])[0];
+ }
+
+}
diff --git a/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixHttpClient.java b/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixHttpClient.java
new file mode 100644
index 000000000..612542bb2
--- /dev/null
+++ b/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixHttpClient.java
@@ -0,0 +1,73 @@
+package com.github.twitch4j.helix.interceptor;
+
+import com.github.philippheuer.credentialmanager.domain.OAuth2Credential;
+import feign.Client;
+import feign.Request;
+import feign.Response;
+import feign.okhttp.OkHttpClient;
+import io.github.bucket4j.Bucket;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static com.github.twitch4j.helix.interceptor.TwitchHelixClientIdInterceptor.AUTH_HEADER;
+import static com.github.twitch4j.helix.interceptor.TwitchHelixClientIdInterceptor.BEARER_PREFIX;
+import static com.github.twitch4j.helix.interceptor.TwitchHelixDecoder.singleFirst;
+
+@Slf4j
+public class TwitchHelixHttpClient implements Client {
+
+ private final Client client;
+ private final ScheduledExecutorService executor;
+ private final TwitchHelixClientIdInterceptor interceptor;
+ private final long timeout;
+
+ public TwitchHelixHttpClient(OkHttpClient client, ScheduledThreadPoolExecutor executor, TwitchHelixClientIdInterceptor interceptor, Integer timeout) {
+ this.client = client;
+ this.executor = executor;
+ this.interceptor = interceptor;
+ this.timeout = timeout == null ? 60 * 1000 : timeout.longValue();
+ }
+
+ @Override
+ public Response execute(Request request, Request.Options options) throws IOException {
+ // Check whether this request should be delayed to conform to rate limits
+ String token = singleFirst(request.headers().get(AUTH_HEADER));
+ if (token != null && token.startsWith(BEARER_PREFIX)) {
+ OAuth2Credential credential = interceptor.getAccessTokenCache().getIfPresent(token.substring(BEARER_PREFIX.length()));
+ if (credential != null) {
+ Bucket bucket = interceptor.getOrInitializeBucket(interceptor.getKey(credential));
+ if (bucket.tryConsume(1)) {
+ // no delay needed
+ return client.execute(request, options);
+ } else {
+ try {
+ // effectively blocking, unfortunately
+ return bucket.asAsyncScheduler().consume(1, executor)
+ .thenApplyAsync(v -> {
+ try {
+ return client.execute(request, options);
+ } catch (IOException e) {
+ log.error("Helix API call execution failed", e);
+ return null;
+ }
+ })
+ .get(timeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ log.error("Throttled Helix API call timed-out before completion", e);
+ return null;
+ }
+ }
+ }
+ }
+
+ // Fallback: just run the http request
+ return client.execute(request, options);
+ }
+
+}
diff --git a/twitch4j/src/main/java/com/github/twitch4j/TwitchClientBuilder.java b/twitch4j/src/main/java/com/github/twitch4j/TwitchClientBuilder.java
index 029533c34..74fb696ba 100644
--- a/twitch4j/src/main/java/com/github/twitch4j/TwitchClientBuilder.java
+++ b/twitch4j/src/main/java/com/github/twitch4j/TwitchClientBuilder.java
@@ -315,6 +315,7 @@ public TwitchClient build() {
.withUserAgent(userAgent)
.withDefaultAuthToken(defaultAuthToken)
.withRequestQueueSize(requestQueueSize)
+ .withScheduledThreadPoolExecutor(scheduledThreadPoolExecutor)
.withTimeout(timeout)
.withProxyConfig(proxyConfig)
.withLogLevel(feignLogLevel)