Skip to content

Commit

Permalink
Move the twitch per-user pubsub api to a separate file
Browse files Browse the repository at this point in the history
  • Loading branch information
joelpurra committed Apr 2, 2018
1 parent b1dbb1c commit 7a3953c
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 90 deletions.
73 changes: 43 additions & 30 deletions src/backend/src/main/authenticated-application-main.ts
Expand Up @@ -68,11 +68,12 @@ import IIncomingWhisperEvent from "../twitch/polling/event/iincoming-whisper-eve
import IPollingCheermotesResponse from "../twitch/polling/handler/icheermotes-polling-response";
import IPollingFollowingResponse from "../twitch/polling/handler/ifollowing-polling-response";
import IPollingStreamingResponse from "../twitch/polling/handler/istreaming-polling-response";

import perUserHandlersMain from "./per-user-handlers-main";
import twitchPerUserPubSubApi from "./twitch-per-user-pubsub-api";

export default async function authenticatedApplicationMain(
export default async function backendAuthenticatedApplicationMain(
config: Config,
mainLogger: PinoLogger,
rootLogger: PinoLogger,
gracefulShutdownManager: GracefulShutdownManager,
messageQueuePublisher: MessageQueuePublisher,
Expand All @@ -81,6 +82,8 @@ export default async function authenticatedApplicationMain(
twitchCSRFHelper: TwitchCSRFHelper,
twitchTokenHelper: TwitchTokenHelper,
): Promise<void> {
const authenticatedApplicationMainLogger = rootLogger.child("backendAuthenticatedApplicationMain");

const twitchApplicationAccessTokenProvider: ApplicationAccessTokenProviderType =
async () => twitchApplicationTokenManager.getOrWait();

Expand Down Expand Up @@ -274,53 +277,63 @@ export default async function authenticatedApplicationMain(

await Bluebird.map(connectables, async (connectable) => connectable.connect());

mainLogger.info("Connected.");
authenticatedApplicationMainLogger.info("Connected.");

const disconnect = async (incomingError?: Error) => {
await Bluebird.map(connectables, async (connectable) => {
try {
connectable.disconnect();
} catch (error) {
mainLogger.error(error, connectable, "Swallowed error while disconnecting.");
authenticatedApplicationMainLogger.error(error, connectable, "Swallowed error while disconnecting.");
}
});

if (incomingError) {
mainLogger.error(incomingError, "Disconnected.");
authenticatedApplicationMainLogger.error(incomingError, "Disconnected.");

throw incomingError;
}

mainLogger.info("Disconnected.");
authenticatedApplicationMainLogger.info("Disconnected.");

return undefined;
};

try {
await perUserHandlersMain(
config,
mainLogger,
rootLogger,
gracefulShutdownManager,
messageQueuePublisher,
twitchIrcConnection,
twitchPollingFollowingConnection,
twitchPollingStreamingConnection,
twitchPollingCheermotesConnection,
twitchAllPubSubTopicsForTwitchUserIdConnection,
twitchMessageQueueSingleItemJsonTopicsSubscriberForIIncomingPubSubEvent,
twitchMessageQueueSingleItemJsonTopicsSubscriberForITwitchIncomingIrcCommand,
twitchMessageQueueSingleItemJsonTopicsSubscriberForITwitchOutgoingIrcCommand,
twitchMessageQueueSingleItemJsonTopicsSubscriberForIIncomingFollowingEvent,
twitchMessageQueueSingleItemJsonTopicsSubscriberForIIncomingStreamingEvent,
twitchMessageQueueSingleItemJsonTopicsSubscriberForIIncomingCheermotesEvent,
twitchMessageQueueSingleItemJsonTopicsSubscriberForIIncomingCheeringEvent,
twitchMessageQueueSingleItemJsonTopicsSubscriberForIIncomingWhisperEvent,
twitchMessageQueueSingleItemJsonTopicsSubscriberForIIncomingSubscriptionEvent,
vidyMessageQueueSingleItemJsonTopicsSubscriberForIOutgoingSearchCommand,
vidyMessageQueueSingleItemJsonTopicsSubscriberForIIncomingSearchResultEvent,
twitchUserId,
);
await Promise.all([
twitchPerUserPubSubApi(
config,
rootLogger,
gracefulShutdownManager,
messageQueuePublisher,
twitchAllPubSubTopicsForTwitchUserIdConnection,
twitchUserId,
),

perUserHandlersMain(
config,
authenticatedApplicationMainLogger,
rootLogger,
gracefulShutdownManager,
messageQueuePublisher,
twitchIrcConnection,
twitchPollingFollowingConnection,
twitchPollingStreamingConnection,
twitchPollingCheermotesConnection,
twitchMessageQueueSingleItemJsonTopicsSubscriberForIIncomingPubSubEvent,
twitchMessageQueueSingleItemJsonTopicsSubscriberForITwitchIncomingIrcCommand,
twitchMessageQueueSingleItemJsonTopicsSubscriberForITwitchOutgoingIrcCommand,
twitchMessageQueueSingleItemJsonTopicsSubscriberForIIncomingFollowingEvent,
twitchMessageQueueSingleItemJsonTopicsSubscriberForIIncomingStreamingEvent,
twitchMessageQueueSingleItemJsonTopicsSubscriberForIIncomingCheermotesEvent,
twitchMessageQueueSingleItemJsonTopicsSubscriberForIIncomingCheeringEvent,
twitchMessageQueueSingleItemJsonTopicsSubscriberForIIncomingWhisperEvent,
twitchMessageQueueSingleItemJsonTopicsSubscriberForIIncomingSubscriptionEvent,
vidyMessageQueueSingleItemJsonTopicsSubscriberForIOutgoingSearchCommand,
vidyMessageQueueSingleItemJsonTopicsSubscriberForIIncomingSearchResultEvent,
twitchUserId,
),
]);

await disconnect();
} catch (error) {
Expand Down
9 changes: 4 additions & 5 deletions src/backend/src/main/main.ts
Expand Up @@ -50,9 +50,9 @@ import TwitchCSRFHelper from "../twitch/helper/csrf-helper";
import TwitchRequestHelper from "../twitch/helper/request-helper";
import TwitchTokenHelper from "../twitch/helper/token-helper";

import managerMain from "./manager-main";
import backendManagerMain from "./manager-main";

export default async function main(
export default async function backendMain(
logger: PinoLogger,
gracefulShutdownManager: GracefulShutdownManager,
messageQueuePublisher: MessageQueuePublisher,
Expand All @@ -71,7 +71,7 @@ export default async function main(

config.validate();

const backendLogger = logger.child("backend");
const backendLogger = logger.child("backendMain");

const databaseConnection = new DatabaseConnection(backendLogger, config.databaseUri);

Expand Down Expand Up @@ -108,10 +108,9 @@ export default async function main(
config.twitchAppClientId,
);

await managerMain(
await backendManagerMain(
config,
backendLogger,
backendLogger,
gracefulShutdownManager,
databaseConnection,
messageQueueAllRawTopicsSubscriber,
Expand Down
16 changes: 8 additions & 8 deletions src/backend/src/main/managed-main.ts
Expand Up @@ -31,11 +31,10 @@ import TwitchCSRFHelper from "../twitch/helper/csrf-helper";
import TwitchRequestHelper from "../twitch/helper/request-helper";
import TwitchTokenHelper from "../twitch/helper/token-helper";

import authenticatedApplicationMain from "./authenticated-application-main";
import backendAuthenticatedApplicationMain from "./authenticated-application-main";

export default async function managedMain(
export default async function backendManagedMain(
config: Config,
mainLogger: PinoLogger,
rootLogger: PinoLogger,
gracefulShutdownManager: GracefulShutdownManager,
messageQueuePublisher: MessageQueuePublisher,
Expand All @@ -45,31 +44,32 @@ export default async function managedMain(
twitchPollingApplicationTokenConnection: TwitchPollingApplicationTokenConnection,
twitchApplicationTokenManager: TwitchApplicationTokenManager,
): Promise<void> {
const backendManagedMainLogger = rootLogger.child("backendManagedMain");

await twitchPollingApplicationTokenConnection.connect();
await twitchApplicationTokenManager.start();
await twitchApplicationTokenManager.getOrWait();

mainLogger.info("Application authenticated.");
backendManagedMainLogger.info("Application authenticated.");

const disconnectAuthentication = async (incomingError?: Error) => {
await twitchApplicationTokenManager.stop();
await twitchPollingApplicationTokenConnection.disconnect();

if (incomingError) {
mainLogger.error(incomingError, "Unauthenticated.");
backendManagedMainLogger.error(incomingError, "Unauthenticated.");

throw incomingError;
}

mainLogger.info("Unauthenticated.");
backendManagedMainLogger.info("Unauthenticated.");

return undefined;
};

try {
await authenticatedApplicationMain(
await backendAuthenticatedApplicationMain(
config,
mainLogger,
rootLogger,
gracefulShutdownManager,
messageQueuePublisher,
Expand Down
20 changes: 10 additions & 10 deletions src/backend/src/main/manager-main.ts
Expand Up @@ -36,11 +36,10 @@ import TwitchTokenHelper from "../twitch/helper/token-helper";

import DistributedEventManager from "../distributed-events/distributed-event-manager";
import DistributedEventRepository from "../storage/repository/distributed-event-repository";
import managedMain from "./managed-main";
import backendManagedMain from "./managed-main";

export default async function managerMain(
export default async function backendManagerMain(
config: Config,
mainLogger: PinoLogger,
rootLogger: PinoLogger,
gracefulShutdownManager: GracefulShutdownManager,
databaseConnection: DatabaseConnection,
Expand All @@ -52,44 +51,45 @@ export default async function managerMain(
twitchPollingApplicationTokenConnection: TwitchPollingApplicationTokenConnection,
twitchApplicationTokenManager: TwitchApplicationTokenManager,
): Promise<void> {
const backendManagerMainLogger = rootLogger.child("backendManagerMain");

await databaseConnection.connect();
await messageQueueAllRawTopicsSubscriber.connect();

// TODO: ensure event distributed event manager starts sooner?
const distributedEventStorageManager = new DistributedEventStorageManager(
mainLogger,
backendManagerMainLogger,
DistributedEventRepository,
);
const distributedEventManager = new DistributedEventManager(
mainLogger,
backendManagerMainLogger,
messageQueueAllRawTopicsSubscriber,
distributedEventStorageManager,
);

await distributedEventManager.start();

mainLogger.info("Managed.");
backendManagerMainLogger.info("Managed.");

const shutdown = async (incomingError?: Error) => {
await distributedEventManager.stop();
await messageQueueAllRawTopicsSubscriber.disconnect();
await databaseConnection.disconnect();

if (incomingError) {
mainLogger.error(incomingError, "Unmanaged.");
backendManagerMainLogger.error(incomingError, "Unmanaged.");

throw incomingError;
}

mainLogger.info("Unmanaged.");
backendManagerMainLogger.info("Unmanaged.");

return undefined;
};

try {
await managedMain(
await backendManagedMain(
config,
mainLogger,
rootLogger,
gracefulShutdownManager,
messageQueuePublisher,
Expand Down
37 changes: 0 additions & 37 deletions src/backend/src/main/per-user-handlers-main.ts
Expand Up @@ -66,7 +66,6 @@ import TwitchWhisperIrcReplyHandler from "../twitch/polling/handler/whisper-irc-
import IncomingCheeringCommandEventTranslator from "../twitch/polling/event-handler/incoming-cheering-event-translator";
import IncomingCheermotesCommandEventTranslator from "../twitch/polling/event-handler/incoming-cheermotes-event-translator";
import IncomingFollowingCommandEventTranslator from "../twitch/polling/event-handler/incoming-following-event-translator";
import IncomingPubSubEventTranslator from "../twitch/polling/event-handler/incoming-pubsub-event-translator";
import IncomingStreamingCommandEventTranslator from "../twitch/polling/event-handler/incoming-streaming-event-translator";
import IncomingSubscriptionCommandEventTranslator from "../twitch/polling/event-handler/incoming-subscription-event-translator";
/* tslint:enable max-line-length */
Expand All @@ -87,11 +86,6 @@ import VidyAuthenticatedRequest from "../../vidy/request/authenticated-request";
import IncomingWhisperCommandEventTranslator from "../twitch/polling/event-handler/incoming-whisper-event-translator";
import IIncomingWhisperEvent from "../twitch/polling/event/iincoming-whisper-event";

import TwitchPubSubLoggingHandler from "../twitch/pubsub/handler/logging";
import TwitchPubSubPingHandler from "../twitch/pubsub/handler/ping";
import TwitchPubSubReconnectHandler from "../twitch/pubsub/handler/reconnect";
import TwitchPubSubConnection from "../twitch/pubsub/pubsub-connection";

export default async function perUserHandlersMain(
config: Config,
mainLogger: PinoLogger,
Expand All @@ -102,7 +96,6 @@ export default async function perUserHandlersMain(
twitchPollingFollowingConnection: PollingClientIdConnection<IPollingFollowingResponse>,
twitchPollingStreamingConnection: PollingClientIdConnection<IPollingStreamingResponse>,
twitchPollingCheermotesConnection: PollingClientIdConnection<IPollingCheermotesResponse>,
twitchAllPubSubTopicsForTwitchUserIdConnection: TwitchPubSubConnection,
twitchMessageQueueSingleItemJsonTopicsSubscriberForIIncomingPubSubEvent:
MessageQueueSingleItemJsonTopicsSubscriber<IIncomingPubSubEvent>,
twitchMessageQueueSingleItemJsonTopicsSubscriberForITwitchIncomingIrcCommand:
Expand All @@ -127,32 +120,6 @@ export default async function perUserHandlersMain(
MessageQueueSingleItemJsonTopicsSubscriber<VidyIIncomingSearchResultEvent>,
twitchUserId: number,
): Promise<void> {
const twitchPubSubPingHandler = new TwitchPubSubPingHandler(
rootLogger,
twitchAllPubSubTopicsForTwitchUserIdConnection,
);
const twitchPubSubReconnectHandler = new TwitchPubSubReconnectHandler(
rootLogger,
twitchAllPubSubTopicsForTwitchUserIdConnection,
);
const twitchPubSubLoggingHandler = new TwitchPubSubLoggingHandler(
rootLogger,
twitchAllPubSubTopicsForTwitchUserIdConnection,
);

const messageQueueTopicPublisherForIIncomingPubSubEvent =
new MessageQueueTopicPublisher<IIncomingPubSubEvent>(
rootLogger,
messageQueuePublisher,
config.topicTwitchIncomingPubSubEvent,
);

const twitchIncomingPubSubEventTranslator = new IncomingPubSubEventTranslator(
rootLogger,
twitchAllPubSubTopicsForTwitchUserIdConnection,
messageQueueTopicPublisherForIIncomingPubSubEvent,
);

const twitchIrcReconnectHandler = new TwitchIrcReconnectHandler(
rootLogger,
twitchIrcConnection,
Expand Down Expand Up @@ -392,10 +359,6 @@ export default async function perUserHandlersMain(

const startables: IStartableStoppable[] = [
twitchIrcReconnectHandler,
twitchPubSubPingHandler,
twitchPubSubReconnectHandler,
twitchPubSubLoggingHandler,
twitchIncomingPubSubEventTranslator,
twitchIrcLoggingHandler,
twitchIrcPingHandler,
twitchIrcGreetingHandler,
Expand Down

0 comments on commit 7a3953c

Please sign in to comment.