Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move to ioredis for streaming #26581

Merged
merged 21 commits into from Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -82,6 +82,7 @@
"immutable": "^4.3.0",
"imports-loader": "^1.2.0",
"intl-messageformat": "^10.3.5",
"ioredis": "^5.3.2",
"js-yaml": "^4.1.0",
"jsdom": "^22.1.0",
"lodash": "^4.17.21",
Expand Down Expand Up @@ -118,7 +119,6 @@
"react-swipeable-views": "^0.14.0",
"react-textarea-autosize": "^8.4.1",
"react-toggle": "^4.1.3",
"redis": "^4.6.5",
"redux": "^4.2.1",
"redux-immutable": "^4.0.0",
"redux-thunk": "^2.4.2",
Expand Down
82 changes: 38 additions & 44 deletions streaming/index.js
Expand Up @@ -6,12 +6,12 @@ const url = require('url');

const dotenv = require('dotenv');
const express = require('express');
const Redis = require('ioredis');
const { JSDOM } = require('jsdom');
const log = require('npmlog');
const pg = require('pg');
const dbUrlToConfig = require('pg-connection-string').parse;
const metrics = require('prom-client');
const redis = require('redis');
const uuid = require('uuid');
const WebSocket = require('ws');

Expand All @@ -24,30 +24,12 @@ dotenv.config({
log.level = process.env.LOG_LEVEL || 'verbose';

/**
* @param {Object.<string, any>} defaultConfig
* @param {string} redisUrl
* @param {Object.<string, any>} config
*/
const redisUrlToClient = async (defaultConfig, redisUrl) => {
const config = defaultConfig;

let client;

if (!redisUrl) {
client = redis.createClient(config);
} else if (redisUrl.startsWith('unix://')) {
client = redis.createClient(Object.assign(config, {
socket: {
path: redisUrl.slice(7),
},
}));
} else {
client = redis.createClient(Object.assign(config, {
url: redisUrl,
}));
}

const createRedisClient = async (config) => {
const { redisParams, redisUrl } = config;
const client = new Redis(redisUrl, redisParams);
client.on('error', (err) => log.error('Redis Client Error!', err));
await client.connect();

return client;
};
Expand Down Expand Up @@ -147,23 +129,22 @@ const pgConfigFromEnv = (env) => {
* @returns {Object.<string, any>} configuration for the Redis connection
*/
const redisConfigFromEnv = (env) => {
const redisNamespace = env.REDIS_NAMESPACE || null;
// ioredis *can* transparently add prefixes for us, but it doesn't *in some cases*,
// which means we can't use it. But this is something that should be looked into.
const redisPrefix = env.REDIS_NAMESPACE ? `${env.REDIS_NAMESPACE}:` : '';

const redisParams = {
socket: {
host: env.REDIS_HOST || '127.0.0.1',
port: env.REDIS_PORT || 6379,
},
database: env.REDIS_DB || 0,
host: env.REDIS_HOST || '127.0.0.1',
port: env.REDIS_PORT || 6379,
gmemstr marked this conversation as resolved.
Show resolved Hide resolved
db: env.REDIS_DB || 0,
password: env.REDIS_PASSWORD || undefined,
};

if (redisNamespace) {
redisParams.namespace = redisNamespace;
// redisParams.path takes precedence over host and port.
if (env.REDIS_URL && env.REDIS_URL.startsWith('unix://')) {
gmemstr marked this conversation as resolved.
Show resolved Hide resolved
redisParams.path = env.REDIS_URL.slice(7);
}

const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';

return {
redisParams,
redisPrefix,
Expand All @@ -179,15 +160,15 @@ const startServer = async () => {
const pgPool = new pg.Pool(pgConfigFromEnv(process.env));
const server = http.createServer(app);

const { redisParams, redisUrl, redisPrefix } = redisConfigFromEnv(process.env);

/**
* @type {Object.<string, Array.<function(Object<string, any>): void>>}
*/
const subs = {};

const redisSubscribeClient = await redisUrlToClient(redisParams, redisUrl);
const redisClient = await redisUrlToClient(redisParams, redisUrl);
const redisConfig = redisConfigFromEnv(process.env);
const redisSubscribeClient = await createRedisClient(redisConfig);
const redisClient = await createRedisClient(redisConfig);
const { redisPrefix } = redisConfig;

// Collect metrics from Node.js
metrics.collectDefaultMetrics();
Expand Down Expand Up @@ -277,13 +258,13 @@ const startServer = async () => {
};

/**
* @param {string} message
* @param {string} channel
* @param {string} message
*/
const onRedisMessage = (message, channel) => {
const onRedisMessage = (channel, message) => {
gmemstr marked this conversation as resolved.
Show resolved Hide resolved
const callbacks = subs[channel];

log.silly(`New message on channel ${channel}`);
log.silly(`New message on channel ${redisPrefix}${channel}`);

if (!callbacks) {
return;
Expand All @@ -294,6 +275,7 @@ const startServer = async () => {

callbacks.forEach(callback => callback(json));
};
redisSubscribeClient.on("message", onRedisMessage);
gmemstr marked this conversation as resolved.
Show resolved Hide resolved
gmemstr marked this conversation as resolved.
Show resolved Hide resolved

/**
* @callback SubscriptionListener
Expand All @@ -312,8 +294,14 @@ const startServer = async () => {

if (subs[channel].length === 0) {
log.verbose(`Subscribe ${channel}`);
redisSubscribeClient.subscribe(channel, onRedisMessage);
gmemstr marked this conversation as resolved.
Show resolved Hide resolved
redisSubscriptions.inc();
redisSubscribeClient.subscribe(channel, (err, count) => {
if (err) {
log.error(`Error subscribing to ${channel}`);
}
else {
redisSubscriptions.set(count);
gmemstr marked this conversation as resolved.
Show resolved Hide resolved
}
});
}

subs[channel].push(callback);
Expand All @@ -334,8 +322,14 @@ const startServer = async () => {

if (subs[channel].length === 0) {
log.verbose(`Unsubscribe ${channel}`);
redisSubscribeClient.unsubscribe(channel);
redisSubscriptions.dec();
redisSubscribeClient.unsubscribe(channel, (err, count) => {
if (err) {
log.error(`Error unsubscribing to ${channel}`);
}
else {
redisSubscriptions.set(count);
}
});
delete subs[channel];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's arguable if this should be done in the callback too, as if we unsubscribe but it fails we've not really unsubscribed — I'm fine with it either way though

}
};
Expand Down
101 changes: 46 additions & 55 deletions yarn.lock
Expand Up @@ -1452,6 +1452,11 @@
resolved "https://registry.yarnpkg.com/@humanwhocodes/object-schema/-/object-schema-1.2.1.tgz#b520529ec21d8e5945a1851dfd1c32e94e39ff45"
integrity sha512-ZnQMnLV4e7hDlUvw8H+U8ASL02SS2Gn6+9Ac3wGGLIe7+je2AeAOxPY+izIPJDfFDb7eDjev0Us8MO1iFRN8hA==

"@ioredis/commands@^1.1.1":
version "1.2.0"
resolved "https://registry.yarnpkg.com/@ioredis/commands/-/commands-1.2.0.tgz#6d61b3097470af1fdbbe622795b8921d42018e11"
integrity sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==

"@isaacs/cliui@^8.0.2":
version "8.0.2"
resolved "https://registry.yarnpkg.com/@isaacs/cliui/-/cliui-8.0.2.tgz#b37667b7bc181c168782259bab42474fbf52b550"
Expand Down Expand Up @@ -1786,40 +1791,6 @@
resolved "https://registry.yarnpkg.com/@rails/ujs/-/ujs-7.0.7.tgz#54af8d66160a8a7bf7d8f184703d2bf4b3fab914"
integrity sha512-J2v5Ca7HgejO7diGKiDylaVDQKmbQ5FJih6Oo3hXuBKEuXlcaccJu64lj8MNVLaPVyZx0g4gaOQZQz95QEb/hg==

"@redis/bloom@1.2.0":
version "1.2.0"
resolved "https://registry.yarnpkg.com/@redis/bloom/-/bloom-1.2.0.tgz#d3fd6d3c0af3ef92f26767b56414a370c7b63b71"
integrity sha512-HG2DFjYKbpNmVXsa0keLHp/3leGJz1mjh09f2RLGGLQZzSHpkmZWuwJbAvo3QcRY8p80m5+ZdXZdYOSBLlp7Cg==

"@redis/client@1.5.9":
version "1.5.9"
resolved "https://registry.yarnpkg.com/@redis/client/-/client-1.5.9.tgz#c4ee81bbfedb4f1d9c7c5e9859661b9388fb4021"
integrity sha512-SffgN+P1zdWJWSXBvJeynvEnmnZrYmtKSRW00xl8pOPFOMJjxRR9u0frSxJpPR6Y4V+k54blJjGW7FgxbTI7bQ==
dependencies:
cluster-key-slot "1.1.2"
generic-pool "3.9.0"
yallist "4.0.0"

"@redis/graph@1.1.0":
version "1.1.0"
resolved "https://registry.yarnpkg.com/@redis/graph/-/graph-1.1.0.tgz#cc2b82e5141a29ada2cce7d267a6b74baa6dd519"
integrity sha512-16yZWngxyXPd+MJxeSr0dqh2AIOi8j9yXKcKCwVaKDbH3HTuETpDVPcLujhFYVPtYrngSco31BUcSa9TH31Gqg==

"@redis/json@1.0.4":
version "1.0.4"
resolved "https://registry.yarnpkg.com/@redis/json/-/json-1.0.4.tgz#f372b5f93324e6ffb7f16aadcbcb4e5c3d39bda1"
integrity sha512-LUZE2Gdrhg0Rx7AN+cZkb1e6HjoSKaeeW8rYnt89Tly13GBI5eP4CwDVr+MY8BAYfCg4/N15OUrtLoona9uSgw==

"@redis/search@1.1.3":
version "1.1.3"
resolved "https://registry.yarnpkg.com/@redis/search/-/search-1.1.3.tgz#b5a6837522ce9028267fe6f50762a8bcfd2e998b"
integrity sha512-4Dg1JjvCevdiCBTZqjhKkGoC5/BcB7k9j99kdMnaXFXg8x4eyOIVg9487CMv7/BUVkFLZCaIh8ead9mU15DNng==

"@redis/time-series@1.0.5":
version "1.0.5"
resolved "https://registry.yarnpkg.com/@redis/time-series/-/time-series-1.0.5.tgz#a6d70ef7a0e71e083ea09b967df0a0ed742bc6ad"
integrity sha512-IFjIgTusQym2B5IZJG3XKr5llka7ey84fw/NOYqESP5WUfQs9zz1ww/9+qoz4ka/S6KcGBodzlCeZ5UImKbscg==

"@reduxjs/toolkit@^1.9.5":
version "1.9.5"
resolved "https://registry.yarnpkg.com/@reduxjs/toolkit/-/toolkit-1.9.5.tgz#d3987849c24189ca483baa7aa59386c8e52077c4"
Expand Down Expand Up @@ -4111,7 +4082,7 @@ clone-deep@^4.0.1:
kind-of "^6.0.2"
shallow-clone "^3.0.0"

cluster-key-slot@1.1.2:
cluster-key-slot@^1.1.0:
version "1.1.2"
resolved "https://registry.yarnpkg.com/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz#88ddaa46906e303b5de30d3153b7d9fe0a0c19ac"
integrity sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==
Expand Down Expand Up @@ -4857,6 +4828,11 @@ delegates@^1.0.0:
resolved "https://registry.yarnpkg.com/delegates/-/delegates-1.0.0.tgz#84c6e159b81904fdca59a0ef44cd870d31250f9a"
integrity sha512-bd2L678uiWATM6m5Z1VzNCErI3jiGzt6HGY8OVICs40JQq/HALfbyNJmp0UDakEY4pMMaN0Ly5om/B1VI/+xfQ==

denque@^2.1.0:
version "2.1.0"
resolved "https://registry.yarnpkg.com/denque/-/denque-2.1.0.tgz#e93e1a6569fb5e66f16a3c2a2964617d349d6ab1"
integrity sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==

depd@2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/depd/-/depd-2.0.0.tgz#b696163cc757560d09cf22cc8fad1571b79e76df"
Expand Down Expand Up @@ -6139,11 +6115,6 @@ gauge@^5.0.0:
strip-ansi "^6.0.1"
wide-align "^1.1.5"

generic-pool@3.9.0:
version "3.9.0"
resolved "https://registry.yarnpkg.com/generic-pool/-/generic-pool-3.9.0.tgz#36f4a678e963f4fdb8707eab050823abc4e8f5e4"
integrity sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==

gensync@^1.0.0-beta.2:
version "1.0.0-beta.2"
resolved "https://registry.yarnpkg.com/gensync/-/gensync-1.0.0-beta.2.tgz#32a6ee76c3d7f52d46b2b1ae5d93fea8580a25e0"
Expand Down Expand Up @@ -6823,6 +6794,21 @@ invariant@^2.2.2, invariant@^2.2.4:
dependencies:
loose-envify "^1.0.0"

ioredis@^5.3.2:
version "5.3.2"
resolved "https://registry.yarnpkg.com/ioredis/-/ioredis-5.3.2.tgz#9139f596f62fc9c72d873353ac5395bcf05709f7"
integrity sha512-1DKMMzlIHM02eBBVOFQ1+AolGjs6+xEcM4PDL7NqOS6szq7H9jSaEkIUH6/a5Hl241LzW6JLSiAbNvTQjUupUA==
dependencies:
"@ioredis/commands" "^1.1.1"
cluster-key-slot "^1.1.0"
debug "^4.3.4"
denque "^2.1.0"
lodash.defaults "^4.2.0"
lodash.isarguments "^3.1.0"
redis-errors "^1.2.0"
redis-parser "^3.0.0"
standard-as-callback "^2.1.0"

ip-regex@^2.1.0:
version "2.1.0"
resolved "https://registry.yarnpkg.com/ip-regex/-/ip-regex-2.1.0.tgz#fa78bf5d2e6913c911ce9f819ee5146bb6d844e9"
Expand Down Expand Up @@ -10283,17 +10269,17 @@ redent@^4.0.0:
indent-string "^5.0.0"
strip-indent "^4.0.0"

redis@^4.6.5:
version "4.6.8"
resolved "https://registry.yarnpkg.com/redis/-/redis-4.6.8.tgz#54c5992e8a5ba512506fe9f53142cadc405547e7"
integrity sha512-S7qNkPUYrsofQ0ztWlTHSaK0Qqfl1y+WMIxrzeAGNG+9iUZB4HGeBgkHxE6uJJ6iXrkvLd1RVJ2nvu6H1sAzfQ==
redis-errors@^1.0.0, redis-errors@^1.2.0:
version "1.2.0"
resolved "https://registry.yarnpkg.com/redis-errors/-/redis-errors-1.2.0.tgz#eb62d2adb15e4eaf4610c04afe1529384250abad"
integrity sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==

redis-parser@^3.0.0:
version "3.0.0"
resolved "https://registry.yarnpkg.com/redis-parser/-/redis-parser-3.0.0.tgz#b66d828cdcafe6b4b8a428a7def4c6bcac31c8b4"
integrity sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==
dependencies:
"@redis/bloom" "1.2.0"
"@redis/client" "1.5.9"
"@redis/graph" "1.1.0"
"@redis/json" "1.0.4"
"@redis/search" "1.1.3"
"@redis/time-series" "1.0.5"
redis-errors "^1.0.0"

redux-immutable@^4.0.0:
version "4.0.0"
Expand Down Expand Up @@ -11211,6 +11197,11 @@ stacktrace-js@^2.0.2:
stack-generator "^2.0.5"
stacktrace-gps "^3.0.4"

standard-as-callback@^2.1.0:
version "2.1.0"
resolved "https://registry.yarnpkg.com/standard-as-callback/-/standard-as-callback-2.1.0.tgz#8953fc05359868a77b5b9739a665c5977bb7df45"
integrity sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==

static-extend@^0.1.1:
version "0.1.2"
resolved "https://registry.yarnpkg.com/static-extend/-/static-extend-0.1.2.tgz#60809c39cbff55337226fd5e0b520f341f1fb5c6"
Expand Down Expand Up @@ -12966,16 +12957,16 @@ y18n@^5.0.5:
resolved "https://registry.yarnpkg.com/y18n/-/y18n-5.0.8.tgz#7f4934d0f7ca8c56f95314939ddcd2dd91ce1d55"
integrity sha512-0pfFzegeDWJHJIAmTLRP2DwHjdF5s7jo9tuztdQxAhINCdvS+3nGINqPd00AphqJR/0LhANUS6/+7SCb98YOfA==

yallist@4.0.0, yallist@^4.0.0:
version "4.0.0"
resolved "https://registry.yarnpkg.com/yallist/-/yallist-4.0.0.tgz#9bb92790d9c0effec63be73519e11a35019a3a72"
integrity sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==

yallist@^3.0.2:
version "3.1.1"
resolved "https://registry.yarnpkg.com/yallist/-/yallist-3.1.1.tgz#dbb7daf9bfd8bac9ab45ebf602b8cbad0d5d08fd"
integrity sha512-a4UGQaWPH59mOXUYnAG2ewncQS4i4F43Tv3JoAM+s2VDAmS9NsK8GpDMLrCHPksFT7h3K6TOoUNn2pb7RoXx4g==

yallist@^4.0.0:
version "4.0.0"
resolved "https://registry.yarnpkg.com/yallist/-/yallist-4.0.0.tgz#9bb92790d9c0effec63be73519e11a35019a3a72"
integrity sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==

yaml@^1.10.0:
version "1.10.2"
resolved "https://registry.yarnpkg.com/yaml/-/yaml-1.10.2.tgz#2301c5ffbf12b467de8da2333a459e29e7920e4b"
Expand Down