Skip to content

Commit

Permalink
Move pubsub topic manager to a separate file
Browse files Browse the repository at this point in the history
  • Loading branch information
joelpurra committed Jan 18, 2018
1 parent 2e48990 commit 7d72b62
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 46 deletions.
52 changes: 32 additions & 20 deletions index.js
@@ -1,39 +1,51 @@
const Promise = require("bluebird");

const TwitchPubSubConnection = require("./src/twitch-pubsub-connection");
const TwitchPubSubConnection = require("./src/twitch/pubsub-connection");
const TwitchPubSubManager = require("./src/twitch/pubsub-manager");

// TODO: better token/config handling.
const twitchWebSocketUri = "wss://pubsub-edge.twitch.tv/";
const twitchAppAccessToken = process.env["TWITCH_APP_ACCESS_TOKEN"];
const twitchUserAccessToken = process.env["TWITCH_USER_ACCESS_TOKEN"];
const twitchChannelId = 148460096;
const MAX_LISTEN_TIME_MILLISECONDS = 5 * 60 * 1000;
const MAX_LISTEN_TIME_MILLISECONDS = 5 * 1000;

const twitchPubSubConnection = new TwitchPubSubConnection(twitchWebSocketUri);
const twitchPubSubManager = new TwitchPubSubManager(twitchPubSubConnection, twitchChannelId, twitchUserAccessToken);

twitchPubSubConnection.connect().then(() => {
console.log("Connected.");
Promise.resolve().then(() => twitchPubSubConnection.connect()).then(() => {
const disconnect = (error) => twitchPubSubConnection.disconnect().then(() => {
if (error) {
console.error("Disconnected.", error);
} else {
console.log("Disconnected.");
}

const disconnect = () => twitchPubSubConnection.disconnect().then(() => {
console.log("Disconnected.");
return undefined;
});

return Promise.resolve().then(() => {
const topics = [`channel-bits-events-v1.${twitchChannelId}`, `channel-subscribe-events-v1.${twitchChannelId}`, `channel-commerce-events-v1.${twitchChannelId}`, `whispers.${twitchChannelId}`];
const dataHandler = (topic, data) => {
console.log("dataHandler", topic, JSON.stringify(data, null, 2));
};
return Promise.resolve().then(() => twitchPubSubManager.start()).then(() => {
console.log("Started.");

return twitchPubSubConnection.listen(twitchUserAccessToken, topics, dataHandler).then((killSwitch) => {
console.log(`Listening for ${MAX_LISTEN_TIME_MILLISECONDS} milliseconds.`);
const stop = (error) => twitchPubSubManager.stop().then(() => {
if (error) {
console.error("Stopped.", error);
} else {
console.log("Stopped.");
}

return Promise.delay(MAX_LISTEN_TIME_MILLISECONDS).then(() => killSwitch(), () => killSwitch());
return undefined;
});
}).then(() => disconnect(), (error) => {
console.error("Error.", error);

return disconnect();
}).then(() => {
process.exit();
});
console.log(`Online for ${MAX_LISTEN_TIME_MILLISECONDS} milliseconds.`);

// TODO: perform more work here.
return Promise.delay(MAX_LISTEN_TIME_MILLISECONDS).then(() => stop(), (error) => stop(error));
}).then(() => disconnect(), (error) => disconnect(error))
}).then(() => {
process.exit();
}, (error) => {
console.log("Error.", error);

process.exit(1);
});
53 changes: 27 additions & 26 deletions src/twitch-pubsub-connection.js → src/twitch/pubsub-connection.js
Expand Up @@ -3,16 +3,16 @@ const Promise = require("bluebird");

const WebSocket = require("ws");

module.exports = class TwitchPubSubConnection {
module.exports = class PubSubConnection {
constructor(uri) {
this.uri = uri;
this._uri = uri;

this.ws = null;
this.connected = false;
this._ws = null;
this._connected = false;
}

connect() {
assert(!this.connected);
assert(!this._connected);

return new Promise((resolve, reject) => {
const onOpen = () => {
Expand All @@ -21,7 +21,7 @@ module.exports = class TwitchPubSubConnection {
};
const message = JSON.stringify(data);

this.ws.send(message);
this._ws.send(message);
}

const onError = (e) => {
Expand All @@ -31,11 +31,11 @@ module.exports = class TwitchPubSubConnection {
}

const onClose = () => {
this.connected = false;
this._connected = false;

unregisterListeners();

this.ws = null;
this._ws = null;

}

Expand All @@ -44,43 +44,43 @@ module.exports = class TwitchPubSubConnection {
const data = JSON.parse(message);

if (data.type === "PONG") {
this.connected = true;
this._connected = true;
resolve();
}

unregisterListeners();
}

const registerListeners = () => {
this.ws.once("open", onOpen);
this.ws.once("error", onError);
this.ws.once("close", onClose);
this.ws.once("message", onMessage);
this._ws.once("open", onOpen);
this._ws.once("error", onError);
this._ws.once("close", onClose);
this._ws.once("message", onMessage);
}

const unregisterListeners = () => {
this.ws.removeListener("open", onOpen);
this.ws.removeListener("error", onError);
this.ws.removeListener("message", onMessage);
this._ws.removeListener("open", onOpen);
this._ws.removeListener("error", onError);
this._ws.removeListener("message", onMessage);
}

this.ws = new WebSocket(this.uri);
this._ws = new WebSocket(this._uri);

registerListeners();
});
}

disconnect() {
assert(this.connected);
assert(this._connected);

return new Promise.try(() => {
// TODO: ensure the connection was closed.
this.ws.close();
this._ws.close();
});
}

listen(userAccessToken, topics, dataHandler) {
assert(this.connected);
assert(this._connected);

return new Promise((resolve, reject) => {
const onMessage = (message) => {
Expand All @@ -93,7 +93,8 @@ module.exports = class TwitchPubSubConnection {

if (!topics.includes(data.data.topic)) {
return;
}w
}
w

const messageData = JSON.parse(data.data.message);

Expand All @@ -118,11 +119,11 @@ module.exports = class TwitchPubSubConnection {
reject(new Error(`Bad type: ${JSON.stringify(data.type)}`));
}

this.ws.removeListener("message", onListen);
this.ws.on("message", onMessage);
this._ws.removeListener("message", onListen);
this._ws.on("message", onMessage);

const killSwitch = () => {
this.ws.removeListener("message", onMessage);
this._ws.removeListener("message", onMessage);
};

resolve(killSwitch);
Expand All @@ -140,9 +141,9 @@ module.exports = class TwitchPubSubConnection {
};
const message = JSON.stringify(data);

this.ws.on("message", onListen);
this._ws.on("message", onListen);

this.ws.send(message);
this._ws.send(message);
});
}
}
41 changes: 41 additions & 0 deletions src/twitch/pubsub-manager.js
@@ -0,0 +1,41 @@
const Promise = require("bluebird");

module.exports = class PubSubManager {
constructor(pubSubConnection, channelId, userAccessToken) {
this._pubSubConnection = pubSubConnection;
this._channelId = channelId;
this._userAccessToken = userAccessToken;

// TODO: one class per listen-topic, or one class per concern?
this._topics = [`channel-bits-events-v1.${this._channelId}`, `channel-subscribe-events-v1.${this._channelId}`, `channel-commerce-events-v1.${this._channelId}`, `whispers.${this._channelId}`];

this._killSwitch = null;
}

start() {
return this._pubSubConnection.listen(this._userAccessToken, this._topics, this._dataHandler.bind(this)).then((killSwitch) => {
this._killSwitch = killSwitch;
}).tapCatch(() => this._executeKillSwitch());
}

stop() {
// TODO: assert killSwitch?
return Promise.try(() => {
if (typeof this._killSwitch === "function") {
this._executeKillSwitch();
}
});
}

_dataHandler(topic, data) {
console.log("dataHandler", topic, JSON.stringify(data, null, 2));
}

_executeKillSwitch() {
return Promise.try(() => {
const killSwitch = this._killSwitch;
this._killSwitch = null;
killSwitch();
});
}
}

0 comments on commit 7d72b62

Please sign in to comment.