Skip to content

Commit

Permalink
Merge branch 'feature/rxjs' into develop
Browse files Browse the repository at this point in the history
* feature/rxjs:
  Convert remaining connections, connection managers, handlers to use rxjs
  Convert irc websocket connection to use rxjs
  Convert graceful shutdown manager to rxjs (improperly)
  Add RxJS
  • Loading branch information
joelpurra committed Feb 18, 2018
2 parents 7a0a0de + 27e75dc commit e9a966a
Show file tree
Hide file tree
Showing 40 changed files with 1,448 additions and 1,009 deletions.
64 changes: 49 additions & 15 deletions index.ts
Expand Up @@ -23,6 +23,9 @@ import {
assert,
} from "check-types";

// NOTE: this is a hack, modifying the global Rx.Observable.prototype.
import "./lib/rxjs-extensions/async-filter";

import fs from "fs";

import pino from "pino";
Expand All @@ -35,7 +38,13 @@ import PinoLogger from "./src/util/pino-logger";

import TwitchApplicationTokenManager from "./src/twitch/authentication/application-token-manager";
import TwitchPollingApplicationTokenConnection from "./src/twitch/authentication/polling-application-token-connection";
import {
ApplicationAccessTokenProviderType,
AugmentedTokenProviderType,
UserAccessTokenProviderType,
} from "./src/twitch/authentication/provider-types";
import TwitchUserTokenManager from "./src/twitch/authentication/user-token-manager";

import TwitchCSRFHelper from "./src/twitch/helper/csrf-helper";
import TwitchRequestHelper from "./src/twitch/helper/request-helper";
import TwitchTokenHelper from "./src/twitch/helper/token-helper";
Expand All @@ -52,6 +61,8 @@ import TwitchIrcConnection from "./src/twitch/irc/irc-connection";
import PollingClientIdConnection from "./src/twitch/polling/connection/polling-clientid-connection";
import TwitchPollingFollowingHandler from "./src/twitch/polling/handler/following";
import TwitchPubSubLoggingHandler from "./src/twitch/pubsub/handler/logging";
import TwitchPubSubPingHandler from "./src/twitch/pubsub/handler/ping";
import TwitchPubSubReconnectHandler from "./src/twitch/pubsub/handler/reconnect";
import TwitchPubSubConnection from "./src/twitch/pubsub/pubsub-connection";

const BOTTEN_NAPPET_DEFAULT_LOGGING_LEVEL = "error";
Expand Down Expand Up @@ -82,7 +93,7 @@ const twitchOAuthTokenVerificationUri = "https://api.twitch.tv/kraken";
const twitchUsersDataUri = "https://api.twitch.tv/helix/users";
const twitchPubSubWebSocketUri = "wss://pubsub-edge.twitch.tv/";
const twitchIrcWebSocketUri = "wss://irc-ws.chat.twitch.tv:443/";
const followingPollingLimit = 10;
const followingPollingLimit = 5;
const twitchAppScopes = [
"channel_feed_read",
];
Expand Down Expand Up @@ -181,7 +192,8 @@ const main = async () => {
};

try {
const twitchApplicationAccessTokenProvider = async () => twitchApplicationTokenManager.getOrWait();
const twitchApplicationAccessTokenProvider: ApplicationAccessTokenProviderType =
async () => twitchApplicationTokenManager.getOrWait();

const twitchUserHelper = new TwitchUserHelper(
rootLogger,
Expand All @@ -206,24 +218,36 @@ const main = async () => {

const userTokenManager = new TwitchUserTokenManager(rootLogger, twitchTokenHelper, twitchUserTokenHelper);

const twitchAugmentedTokenProvider = async () => userTokenManager.get(twitchUserName);
const twitchUserAccessTokenProvider = async () => {
const augmentedToken = await twitchAugmentedTokenProvider();
const twitchAugmentedTokenProvider: AugmentedTokenProviderType =
async () => userTokenManager.get(twitchUserName);
const twitchUserAccessTokenProvider: UserAccessTokenProviderType =
async () => {
const augmentedToken = await twitchAugmentedTokenProvider();

return augmentedToken.token.access_token;
};
return augmentedToken.token.access_token;
};

const twitchAugmentedToken = await twitchAugmentedTokenProvider();
const twitchUserRawToken = twitchAugmentedToken.token;
const twitchUserId = await twitchTokenHelper.getUserIdByRawAccessToken(twitchUserRawToken);

// TODO: use twitchUserIdProvider instead of twitchUserId.
// const twitchUserIdProvider = () => Promise.resolve(twitchUserId);
const allPubSubTopicsForTwitchUserId = [
`channel-bits-events-v1.${twitchUserId}`,
`channel-subscribe-events-v1.${twitchUserId}`,
`channel-commerce-events-v1.${twitchUserId}`,
`whispers.${twitchUserId}`,
];

const followingPollingUri =
`https://api.twitch.tv/kraken/channels/${twitchUserId}/follows?limit=${followingPollingLimit}`;

const twitchPubSubConnection = new TwitchPubSubConnection(rootLogger, twitchPubSubWebSocketUri);
const twitchAllPubSubTopicsForTwitchUserIdConnection = new TwitchPubSubConnection(
rootLogger,
twitchPubSubWebSocketUri,
allPubSubTopicsForTwitchUserId,
twitchUserAccessTokenProvider,
);

const twitchIrcConnection = new TwitchIrcConnection(
rootLogger,
twitchIrcWebSocketUri,
Expand All @@ -232,13 +256,21 @@ const main = async () => {
twitchUserAccessTokenProvider,
);

// TODO: use twitchUserIdProvider instead of twitchUserId.
const twitchPubSubPingHandler = new TwitchPubSubPingHandler(
rootLogger,
twitchAllPubSubTopicsForTwitchUserIdConnection,
);

const twitchPubSubReconnectHandler = new TwitchPubSubReconnectHandler(
rootLogger,
twitchAllPubSubTopicsForTwitchUserIdConnection,
);

const twitchPubSubLoggingHandler = new TwitchPubSubLoggingHandler(
rootLogger,
twitchPubSubConnection,
twitchUserAccessTokenProvider,
twitchUserId,
twitchAllPubSubTopicsForTwitchUserIdConnection,
);

const twitchPollingFollowingConnection = new PollingClientIdConnection(
rootLogger,
twitchAppClientId,
Expand All @@ -249,7 +281,7 @@ const main = async () => {
);

const connectables = [
twitchPubSubConnection,
twitchAllPubSubTopicsForTwitchUserIdConnection,
twitchIrcConnection,
twitchPollingFollowingConnection,
];
Expand Down Expand Up @@ -298,6 +330,8 @@ const main = async () => {
);

const startables = [
twitchPubSubPingHandler,
twitchPubSubReconnectHandler,
twitchPubSubLoggingHandler,
twitchIrcLoggingHandler,
twitchIrcPingHandler,
Expand Down
84 changes: 84 additions & 0 deletions lib/rxios/index.ts
@@ -0,0 +1,84 @@
// https://github.com/davguij/rxios
//
// NOTE: remove this file once rxios targets rxjs v6.
// https://github.com/davguij/rxios/pull/2

/*
The MIT License (MIT)
Copyright (c) 2017 David Guijarro
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

import axios, { AxiosInstance, AxiosPromise, AxiosRequestConfig } from "axios";
import { Observable } from "rxjs";

export interface rxiosConfig extends AxiosRequestConfig {
localCache?: boolean;
}

export class rxios {
private _httpClient: AxiosInstance;

constructor(private options: rxiosConfig = {}) {
this._httpClient = axios.create(options);
}

private _makeRequest<T>(method: string, url: string, queryParams?: object, body?: object) {
let request: AxiosPromise<T>;
switch (method) {
case "GET":
request = this._httpClient.get<T>(url, { params: queryParams });
break;
case "POST":
request = this._httpClient.post<T>(url, body, { params: queryParams });
break;
case "PUT":
request = this._httpClient.put<T>(url, body, { params: queryParams });
break;
case "PATCH":
request = this._httpClient.patch<T>(url, body, { params: queryParams });
break;
case "DELETE":
request = this._httpClient.delete(url, { params: queryParams });
break;

default:
throw new Error("Method not supported");
}
return new Observable<T>((subscriber) => {
request.then((response) => {
subscriber.next(response.data);
subscriber.complete();
}).catch((err: Error) => {
subscriber.error(err);
subscriber.complete();
});
});
}

public get<T>(url: string, queryParams?: object) {
return this._makeRequest<T>("GET", url, queryParams);
}

public post<T>(url: string, body: object, queryParams?: object) {
return this._makeRequest<T>("POST", url, queryParams, body);
}

public put<T>(url: string, body: object, queryParams?: object) {
return this._makeRequest<T>("PUT", url, queryParams, body);
}

public patch<T>(url: string, body: object, queryParams?: object) {
return this._makeRequest<T>("PATCH", url, queryParams, body);
}

public delete(url: string, queryParams?: object) {
return this._makeRequest("DELETE", url, queryParams);
}
}
30 changes: 30 additions & 0 deletions lib/rxjs-extensions/async-filter.ts
@@ -0,0 +1,30 @@
// https://stackoverflow.com/questions/28490700/is-there-an-async-version-of-filter-operator-in-rxjs
// https://stackoverflow.com/a/28561930/907779

// tslint:disable:only-arrow-functions
// tslint:disable:space-before-function-paren

import Rx from "rxjs";

// runs the filters in parallel (order not guaranteed)
// predicate should return an Observable
Rx.Observable.prototype.flatFilter = function (predicate) {
return this.flatMap(function (value, index) {
return predicate(value, index)
.filter(Boolean) // filter falsy values
.map(function () { return value; });
});
};

// runs the filters sequentially (order preserved)
// predicate should return an Observable
Rx.Observable.prototype.concatFilter = function (predicate) {
return this.concatMap(function (value, index) {
return predicate(value, index)
.filter(Boolean) // filter falsy values
.map(function () { return value; });
});
};

// tslint:enable:space-before-function-paren
// tslint:enable:only-arrow-functions
5 changes: 5 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Expand Up @@ -48,6 +48,7 @@
"nedb": "^1.8.0",
"pino": "^4.10.3",
"qs": "^6.5.1",
"rxjs": "^6.0.0-alpha.3",
"ws": "^4.0.0"
},
"devDependencies": {
Expand Down
3 changes: 3 additions & 0 deletions rollup.config.index.js
Expand Up @@ -39,6 +39,9 @@ export default {
"pino",
"qs",
"readline",
"rxios",
"rxjs",
"rxjs/internal/observable/dom/WebSocketSubject",
"ws",
],
input: inputName,
Expand Down
8 changes: 4 additions & 4 deletions src/storage/manager/user-storage-manager.ts
Expand Up @@ -30,9 +30,9 @@ import IUserCamo from "../iuser-camo";
import UserRepositoryClass from "../repository/user-repository";

export default class UserStorageManager {
public _UserRepository: UserRepositoryClass;
public _UserRepository: typeof UserRepositoryClass;
public _logger: PinoLogger;
constructor(logger: PinoLogger, UserRepository: UserRepositoryClass) {
constructor(logger: PinoLogger, UserRepository: typeof UserRepositoryClass) {
assert.hasLength(arguments, 2);
assert.equal(typeof logger, "object");
assert.equal(typeof UserRepository, "function");
Expand All @@ -41,7 +41,7 @@ export default class UserStorageManager {
this._UserRepository = UserRepository;
}

public async getByUsername(username: string) {
public async getByUsername(username: string): Promise<IUser> {
assert.hasLength(arguments, 1);
assert.nonEmptyString(username);

Expand Down Expand Up @@ -93,7 +93,7 @@ export default class UserStorageManager {
return userAfterStoring;
}

public async storeToken(username: string, rawToken: IRawToken): Promise<IUser> {
public async storeToken(username: string, rawToken: IRawToken | null): Promise<IUser> {
assert.hasLength(arguments, 2);
assert.nonEmptyString(username);
// NOTE: rawToken can be null, to "forget" it.
Expand Down
38 changes: 23 additions & 15 deletions src/twitch/authentication/application-token-manager.ts
Expand Up @@ -26,20 +26,25 @@ import axios from "axios";

import PinoLogger from "../../util/pino-logger";
import ConnectionManager from "../connection-manager";
import IConnection from "../iconnection";
import IPollingConnection from "../polling/ipolling-connection";
import IRawToken from "./iraw-token";

export default class ApplicationTokenManager extends ConnectionManager {
public _waitForFirstTokenPromise: Promise<undefined>;
public _tokenHasBeenSet: (() => void) | null;
public _applicationAccessToken: string | null;
public _rawOAuthToken: IRawToken | null;
public _oauthTokenRevocationHeaders: {};
public _oauthTokenRevocationMethod: string;
public _oauthTokenRevocationUri: string;
public _clientId: string;

constructor(logger: PinoLogger, connection: IConnection, clientId: string, oauthTokenRevocationUri: string) {
export default class ApplicationTokenManager extends ConnectionManager<IRawToken, void> {
private _waitForFirstTokenPromise: Promise<undefined>;
private _tokenHasBeenSet: (() => void) | null;
private _applicationAccessToken: string | null;
private _rawOAuthToken: IRawToken | null;
private _oauthTokenRevocationHeaders: {};
private _oauthTokenRevocationMethod: string;
private _oauthTokenRevocationUri: string;
private _clientId: string;

constructor(
logger: PinoLogger,
connection: IPollingConnection<IRawToken, void>,
clientId: string,
oauthTokenRevocationUri: string,
) {
super(logger, connection);

assert.hasLength(arguments, 4);
Expand Down Expand Up @@ -81,7 +86,10 @@ export default class ApplicationTokenManager extends ConnectionManager {
assert.hasLength(arguments, 0);

await super.start();
return this._connection.force(true);

const pollingConnection = this._connection as IPollingConnection<IRawToken, void>;

return pollingConnection.send(undefined);
}

public async stop() {
Expand All @@ -91,7 +99,7 @@ export default class ApplicationTokenManager extends ConnectionManager {
return super.stop();
}

public async _dataHandler(data: any): Promise<void> {
public async _dataHandler(data: IRawToken): Promise<void> {
assert.hasLength(arguments, 1);
assert.equal(typeof data, "object");

Expand All @@ -103,7 +111,7 @@ export default class ApplicationTokenManager extends ConnectionManager {
]);
}

public async _filter(data: any) {
public async _filter(data: IRawToken): Promise<boolean> {
assert.hasLength(arguments, 1);
assert.equal(typeof data, "object");

Expand Down

0 comments on commit e9a966a

Please sign in to comment.