Skip to content

Commit

Permalink
feat: add eventsub websocket module (#716)
Browse files Browse the repository at this point in the history
* feat: add eventsub websocket module

* refactor: use more SubscriptionWrapper

* fix: obtain latest sub id on unregister

* fix: ensure transport is set

* fix: avoid NPE on null token

* fix: serialize transport method as lowercase

* chore(eventsub): skip nulls when serializing subscription

* fix: avoid NPE in TwitchEventSocketPool#handleUnsubscription

* fix: use associated token on unregister

* fix: use default token even if null is passed to register

* chore: add websocket_disconnected status

* feat: allow multi-user eventsub ws pool

* fix: stricter pool close behavior

* refactor: simplify eventsub ws pools

* chore: rename package

* refactor: simplify some logic

* docs: explain methods of IEventSubSocket

* docs: explain various eventsocket impl classes

* feat: add meta events for eventsub websockets

* refactor: throw exception on pool usage after close

* refactor: use EventManagerUtils.initializeEventManager

* fix: improve long-term reconnect robustness

* feat: allow custom backoff strategy

* feat: add sub delete failure meta event

* fix: add EventSocketDeleteSubscriptionFailureEvent

* feat: add eventsocket to client builders

* feat: skip retry create sub if 4xx error

* feat: add advancedConfiguration to TwitchEventSocketPool

* chore: log when not retrying

* fix: avoid reconnecting when unused

* fix(builders): apply eventsub proxy via advanced config

* feat: add EventSocketSubscriptionFailureEvent#willRetry

* chore: more annotations
  • Loading branch information
iProdigy committed Feb 15, 2023
1 parent a2c1e58 commit c8599cc
Show file tree
Hide file tree
Showing 38 changed files with 1,931 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
@Slf4j
public class TwitchIdentityProvider extends OAuth2IdentityProvider {

public static final String PROVIDER_NAME = "twitch";

/**
* Constructor
*
Expand All @@ -28,7 +30,7 @@ public class TwitchIdentityProvider extends OAuth2IdentityProvider {
* @param redirectUrl Redirect Url
*/
public TwitchIdentityProvider(String clientId, String clientSecret, String redirectUrl) {
super("twitch", "oauth2", clientId, clientSecret, "https://id.twitch.tv/oauth2/authorize", "https://id.twitch.tv/oauth2/token", redirectUrl);
super(PROVIDER_NAME, "oauth2", clientId, clientSecret, "https://id.twitch.tv/oauth2/authorize", "https://id.twitch.tv/oauth2/token", redirectUrl);

// configuration
this.tokenEndpointPostType = "QUERY";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ protected int getSubscriptionSize(String s) {

@Override
protected TwitchChat createConnection() {
if (closed.get()) throw new IllegalStateException("Chat socket cannot be created after pool was closed!");

// Instantiate with configuration
TwitchChat chat = advancedConfiguration.apply(
TwitchChatBuilder.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ public void onTextMessage(WebSocket ws, String text) {
config.onTextMessage().accept(text);
}

@Override
public void onCloseFrame(WebSocket websocket, WebSocketFrame frame) {
config.onCloseFrame().accept(frame.getCloseCode(), frame.getCloseReason());
}

@Override
public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) {
if (connectionState.get() != WebsocketConnectionState.DISCONNECTING) {
Expand Down Expand Up @@ -248,6 +253,11 @@ public void connect() {
*/
@Synchronized
public void disconnect() {
// Cancel any outstanding reconnect task
Future<?> reconnector = reconnectTask.getAndSet(null);
if (reconnector != null)
reconnector.cancel(false);

WebsocketConnectionState connectionState = this.connectionState.get();

if (connectionState == WebsocketConnectionState.DISCONNECTED) {
Expand Down Expand Up @@ -319,11 +329,6 @@ public void close() throws Exception {
if (backoffClearer != null)
backoffClearer.cancel(false);

// Cancel any outstanding reconnect task
Future<?> reconnector = reconnectTask.getAndSet(null);
if (reconnector != null)
reconnector.cancel(false);

// Disconnect from socket
try {
// This call does not block, so we use CountdownLatch to block
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.time.Duration;
import java.util.Map;
Expand Down Expand Up @@ -56,6 +58,7 @@ public void validate() {
Objects.requireNonNull(onDisconnecting, "onDisconnecting may not be null!");
Objects.requireNonNull(onPreDisconnect, "onPreDisconnect may not be null!");
Objects.requireNonNull(onPostDisconnect, "onPostDisconnect may not be null!");
Objects.requireNonNull(onCloseFrame, "onCloseFrame may not be null!");
}

/**
Expand Down Expand Up @@ -149,6 +152,11 @@ public void validate() {
*/
private Runnable onPostDisconnect = () -> {};

/**
* called after receiving a close frame from the server
*/
private BiConsumer<@NotNull Integer, @Nullable String> onCloseFrame = (code, reason) -> {};

/**
* proxy config
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,14 @@ public abstract class SubscriptionConnectionPool<C, S, T, U> extends AbstractCon
*/
protected final ConcurrentMap<S, C> subscriptions = new ConcurrentHashMap<>();

/**
* Tracks whether the pool has been closed.
*/
protected final AtomicBoolean closed = new AtomicBoolean();

@Override
public T subscribe(S s) {
if (closed.get()) throw new IllegalStateException("Subscription cannot be created after pool was closed!");
C prevConnection = subscriptions.get(s);
if (prevConnection != null) return handleDuplicateSubscription(null, prevConnection, s);
final int size = getSubscriptionSize(s);
Expand All @@ -83,7 +89,7 @@ public U unsubscribe(T t) {
final S request = getRequestFromSubscription(t);
final C connection = subscriptions.remove(request);
final U u = handleUnsubscription(connection, t);
if (connection != null)
if (connection != null && !closed.get())
decrementSubscriptions(connection, getSubscriptionSize(request));
return u;
}
Expand All @@ -101,6 +107,17 @@ protected Iterable<C> getConnections() {
return Collections.unmodifiableCollection(connections);
}

@Override
public void close() {
if (!closed.getAndSet(true)) {
Collection<C> drained = new ArrayList<>(numConnections());
saturatedConnections.removeIf(drained::add);
unsaturatedConnections.keySet().removeIf(drained::add);
drained.forEach(this::disposeConnection);
subscriptions.clear();
}
}

/**
* @return the total number of subscriptions held by all connections
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.github.philippheuer.events4j.core.EventManager;
import com.github.philippheuer.events4j.simple.SimpleEventHandler;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.util.EventManagerUtils;
import com.github.twitch4j.common.util.ThreadUtils;
import lombok.Builder;
import lombok.Getter;
Expand All @@ -28,7 +29,7 @@ public abstract class TwitchModuleConnectionPool<C, X, Y, Z, B> extends Subscrip
*/
@Getter
@Builder.Default
private final EventManager eventManager = createEventManager();
private final EventManager eventManager = EventManagerUtils.initializeEventManager(SimpleEventHandler.class);

/**
* The {@link EventManager} to be used by connections in this pool.
Expand Down Expand Up @@ -85,11 +86,4 @@ protected EventManager getDefaultConnectionEventManager() {
return this.getEventManager();
}

protected static EventManager createEventManager() {
EventManager em = new EventManager();
em.autoDiscovery();
em.setDefaultEventHandler(SimpleEventHandler.class); // Unfortunately hard-coded due to the static nature of @Builder.Default; the user can override later
return em;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Setter;
import org.jetbrains.annotations.Nullable;

@Data
@Setter(AccessLevel.PRIVATE)
Expand All @@ -26,7 +27,10 @@ public class EventSubNotification {

/**
* The value of challenge from the callback verification request must be returned to complete the verification process.
* <p>
* Note: websocket subscriptions do not involve a challenge handshake.
*/
@Nullable
private String challenge;

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.twitch4j.eventsub.condition.EventSubCondition;
import com.github.twitch4j.eventsub.subscriptions.SubscriptionType;
Expand All @@ -17,15 +18,17 @@
import lombok.ToString;
import lombok.With;
import lombok.experimental.Accessors;
import org.jetbrains.annotations.ApiStatus;

import java.time.Instant;
import java.util.Map;

@Data
@Builder
@Builder(toBuilder = true)
@Setter(AccessLevel.PRIVATE)
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class EventSubSubscription {

/**
Expand All @@ -36,6 +39,7 @@ public class EventSubSubscription {
/**
* Status of the subscription.
*/
@Setter(value = AccessLevel.PUBLIC, onMethod_ = { @Deprecated, @ApiStatus.Internal })
private EventSubSubscriptionStatus status;

/**
Expand All @@ -60,6 +64,7 @@ public class EventSubSubscription {
* Object indicating the notification delivery specific information.
*/
@With
@Setter(value = AccessLevel.PUBLIC, onMethod_ = { @Deprecated, @ApiStatus.Internal })
private EventSubTransport transport;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,78 @@ public enum EventSubSubscriptionStatus {
* Designates that the subscription is in an operable state and is valid.
*/
ENABLED,

/**
* Webhook is pending verification of the callback specified in the subscription creation request.
*/
WEBHOOK_CALLBACK_VERIFICATION_PENDING,

/**
* Webhook failed verification of the callback specified in the subscription creation request.
*/
WEBHOOK_CALLBACK_VERIFICATION_FAILED,

/**
* Notification delivery failure rate was too high.
*/
NOTIFICATION_FAILURES_EXCEEDED,

/**
* Authorization for user(s) in the condition was revoked.
*/
AUTHORIZATION_REVOKED,

/**
* The moderator that authorized the subscription is no longer one of the broadcaster’s moderators.
*/
MODERATOR_REMOVED,

/**
* A user in the condition of the subscription was removed.
*/
USER_REMOVED,

/**
* Twitch revoked your subscription because the subscribed to subscription type and version is no longer supported.
*/
VERSION_REMOVED;
VERSION_REMOVED,

/**
* When you connect to the server, you must create a subscription within 10 seconds or the connection is closed.
* <p>
* Note: The time limit is subject to change.
*/
WEBSOCKET_CONNECTION_UNUSED,

/**
* The client closed the connection.
*/
WEBSOCKET_DISCONNECTED,

/**
* You must respond to ping messages with a pong message.
*/
WEBSOCKET_FAILED_PING_PONG,

/**
* Indicates a problem with the server (similar to an HTTP 500 status code).
*/
WEBSOCKET_INTERNAL_ERROR,

/**
* Transient network error.
*/
WEBSOCKET_NETWORK_ERROR,

/**
* Transient network timeout.
*/
WEBSOCKET_NETWORK_TIMEOUT,

/**
* Sending outgoing messages to the server is prohibited, except for pong messages.
*/
WEBSOCKET_RECEIVED_INBOUND_TRAFFIC;

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,77 @@
package com.github.twitch4j.eventsub;

import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.With;
import lombok.extern.jackson.Jacksonized;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.time.Instant;

@Data
@Builder
@Setter(AccessLevel.PRIVATE)
@NoArgsConstructor
@AllArgsConstructor
@Jacksonized
@JsonInclude(JsonInclude.Include.NON_NULL)
public class EventSubTransport {

/**
* The transport method.
*/
@NotNull
private EventSubTransportMethod method;

/**
* The callback URL where the notification should be sent.
* <p>
* Specify this field only if method is set to webhook.
*/
@Nullable
private String callback;

/**
* The secret used for verifying a signature.
* The secret used for verifying a webhook signature.
* <p>
* The secret must be an ASCII string that’s a minimum of 10 characters long and a maximum of 100 characters long.
* <p>
* Specify this field only if method is set to webhook.
*/
@Nullable
private String secret;

/**
* An ID that identifies the WebSocket that notifications are sent to.
* <p>
* Specify this field only if method is set to websocket.
*/
@With
@Nullable
@Setter(value = AccessLevel.PUBLIC, onMethod_ = { @Deprecated, @ApiStatus.Internal })
private String sessionId;

/**
* The UTC date and time that the WebSocket connection was established.
* <p>
* Included only if method is set to websocket.
*/
@Nullable
private Instant connectedAt;

/**
* The UTC date and time that the WebSocket connection was lost.
* <p>
* Included only if method is set to websocket.
*/
@Nullable
private Instant disconnectedAt;

}

0 comments on commit c8599cc

Please sign in to comment.