Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Improve streaming metrics #26299

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
"pg-connection-string": "^2.6.0",
"postcss": "^8.4.24",
"postcss-loader": "^4.3.0",
"prom-client": "^14.2.0",
"prop-types": "^15.8.1",
"punycode": "^2.3.0",
"react": "^18.2.0",
Expand Down
230 changes: 167 additions & 63 deletions streaming/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const { JSDOM } = require('jsdom');
const log = require('npmlog');
const pg = require('pg');
const dbUrlToConfig = require('pg-connection-string').parse;
const metrics = require('prom-client');
const redis = require('redis');
const uuid = require('uuid');
const WebSocket = require('ws');
Expand Down Expand Up @@ -183,6 +184,73 @@ const startServer = async () => {
const redisSubscribeClient = await redisUrlToClient(redisParams, redisUrl);
const redisClient = await redisUrlToClient(redisParams, redisUrl);

// Collect metrics from Node.js
metrics.collectDefaultMetrics();
ClearlyClaire marked this conversation as resolved.
Show resolved Hide resolved

new metrics.Gauge({
name: 'pg_pool_total_connections',
help: 'The total number of clients existing within the pool',
collect() {
this.set(pgPool.totalCount);
},
});

new metrics.Gauge({
name: 'pg_pool_idle_connections',
help: 'The number of clients which are not checked out but are currently idle in the pool',
collect() {
this.set(pgPool.idleCount);
},
});

new metrics.Gauge({
name: 'pg_pool_waiting_queries',
help: 'The number of queued requests waiting on a client when all clients are checked out',
collect() {
this.set(pgPool.waitingCount);
},
});

const connectedClients = new metrics.Gauge({
name: 'connected_clients',
help: 'The number of clients connected to the streaming server',
labelNames: ['type'],
});

connectedClients.set({ type: 'websocket' }, 0);
connectedClients.set({ type: 'eventsource' }, 0);

const connectedChannels = new metrics.Gauge({
name: 'connected_channels',
help: 'The number of channels the streaming server is streaming to',
labelNames: [ 'type', 'channel' ]
ClearlyClaire marked this conversation as resolved.
Show resolved Hide resolved
});

const redisSubscriptions = new metrics.Gauge({
name: 'redis_subscriptions',
help: 'The number of Redis channels the streaming server is subscribed to',
});

// When checking metrics in the browser, the favicon is requested this
// prevents the request from falling through to the API Router, which would
// error for this endpoint:
app.get('/favicon.ico', (req, res) => res.end());
ClearlyClaire marked this conversation as resolved.
Show resolved Hide resolved
ThisIsMissEm marked this conversation as resolved.
Show resolved Hide resolved

app.get('/api/v1/streaming/health', (req, res) => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally this should be changed to /health, but that could break people's monitoring & proxying setups, so I've not made that change for now.

Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want this endpoint to be accessible from the end-user, and when the streaming server is on the main domain, it means it needs to be reachable under /api/v1/streaming/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does a health check endpoint need to be publicly accessible? It's only used by monitoring..?

Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the intent was at some point that apps could check it to know if it was working correctly, rather than attempting to retry a WS connection

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this makes sense, especially in the load balanced environment: say you run multiple instances of streaming, and have nginx load balance them, then the GET /api/v1/streaming/health could hit the healthy instance, an the WS connection could reach an unhealthy instance.

The correct strategy here is exponential reconnection backoff with jitter and max retries before failing to non-streaming connectivity.

Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but as I remember this was the intent behind making this public.

Changing this may be a breaking change, I dont know if this is used by some clients.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, so, I'm leaving it as a public endpoint now, but I think it'd be very wise to announce that it will be deprecated in 4.3.0 or something — giving people time to upgrade away from it, if they're using it.

Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we also want to have /health answering the same response as this one, and people can move to it before its deprecated

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could add in /health now, just wasn't sure if I'd get approval overall for that; hence just pointing it out now that it's technically an internal endpoint, not an external one

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it was added when running a single server was considered the norm, so a single public-facing endpoint kinda made sense. I don't have a very strong opinion on this but having /health could make sense indeed.

res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end('OK');
});

app.get('/metrics', async (req, res) => {
try {
res.set('Content-Type', metrics.register.contentType);
res.end(await metrics.register.metrics());
} catch (ex) {
log.error(ex);
res.status(500).end();
}
});

/**
* @param {string[]} channels
* @returns {function(): void}
Expand Down Expand Up @@ -240,6 +308,7 @@ const startServer = async () => {
if (subs[channel].length === 0) {
log.verbose(`Subscribe ${channel}`);
redisSubscribeClient.subscribe(channel, onRedisMessage);
redisSubscriptions.inc();
}

subs[channel].push(callback);
Expand All @@ -261,6 +330,7 @@ const startServer = async () => {
if (subs[channel].length === 0) {
log.verbose(`Unsubscribe ${channel}`);
redisSubscribeClient.unsubscribe(channel);
redisSubscriptions.dec();
delete subs[channel];
}
};
Expand Down Expand Up @@ -434,7 +504,7 @@ const startServer = async () => {

/**
* @param {any} req
* @param {string} channelName
* @param {string|undefined} channelName
* @returns {Promise.<void>}
*/
const checkScopes = (req, channelName) => new Promise((resolve, reject) => {
Expand Down Expand Up @@ -537,10 +607,14 @@ const startServer = async () => {
res.on('close', () => {
unsubscribe(`${redisPrefix}${accessTokenChannelId}`, listener);
unsubscribe(`${redisPrefix}${systemChannelId}`, listener);

connectedChannels.labels({ type: 'eventsource', channel: 'system' }).dec(2);
});

subscribe(`${redisPrefix}${accessTokenChannelId}`, listener);
subscribe(`${redisPrefix}${systemChannelId}`, listener);

connectedChannels.labels({ type: 'eventsource', channel: 'system' }).inc(2);
};

/**
Expand All @@ -554,7 +628,19 @@ const startServer = async () => {
return;
}

accountFromRequest(req).then(() => checkScopes(req, channelNameFromPath(req))).then(() => {
const channelName = channelNameFromPath(req);

// If no channelName can be found for the request, then we should terminate
// the connection, as there's nothing to stream back
if (!channelName) {
const err = new Error('Unknown channel requested');
err.status = 400;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've gone with 400 here, instead of 404, though arguably it could be either.

Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

404 is used for non-existing endpoints. 400 is generic. What about 410 (Gone)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not technically gone, it kind of never existed — basically you could try connecting to /api/v1/streaming/foo and that'd hit here, but only because "foo" is an unknown channel. I went with 400 because 404 didn't really feel like it made sense.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

400 seems fine to me.


next(err);
return;
}

accountFromRequest(req).then(() => checkScopes(req, channelName)).then(() => {
subscribeHttpToSystemChannel(req, res);
}).then(() => {
next();
Expand Down Expand Up @@ -849,6 +935,15 @@ const startServer = async () => {
const streamToHttp = (req, res) => {
const accountId = req.accountId || req.remoteAddress;

const channelName = channelNameFromPath(req);

connectedClients.labels({ type: 'eventsource' }).inc();

// In theory we'll always have a channel name, but channelNameFromPath can return undefined:
if (typeof channelName === 'string') {
connectedChannels.labels({ type: 'eventsource', channel: channelName }).inc();
}

res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-store');
res.setHeader('Transfer-Encoding', 'chunked');
Expand All @@ -859,6 +954,14 @@ const startServer = async () => {

req.on('close', () => {
log.verbose(req.requestId, `Ending stream for ${accountId}`);
// We decrement these counters here instead of in streamHttpEnd as in that
// method we don't have knowledge of the channel names
connectedClients.labels({ type: 'eventsource' }).dec();
// In theory we'll always have a channel name, but channelNameFromPath can return undefined:
if (typeof channelName === 'string') {
connectedChannels.labels({ type: 'eventsource', channel: channelName }).dec();
}

clearInterval(heartbeat);
});

Expand Down Expand Up @@ -913,40 +1016,18 @@ const startServer = async () => {
res.end(JSON.stringify({ error: 'Not found' }));
};

app.use(setRequestId);
app.use(setRemoteAddress);
app.use(allowCrossDomain);
const api = express.Router();

app.get('/api/v1/streaming/health', (req, res) => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end('OK');
});
app.use(api);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally all of the following code would only happen for known eventsource streaming connections, but the spec doesn't allow us the ability to ensure that:

User agents may set ([Accept](https://httpwg.org/specs/rfc7231.html#header.accept), [text/event-stream](https://html.spec.whatwg.org/multipage/iana.html#text/event-stream)) in request's header list.

emphasis my own: https://html.spec.whatwg.org/multipage/server-sent-events.html

(so basically we can't add middleware that checks for the Accept: text/event-stream because the spec didn't mandate that clients send that header)


api.use(setRequestId);
api.use(setRemoteAddress);
api.use(allowCrossDomain);
ClearlyClaire marked this conversation as resolved.
Show resolved Hide resolved

app.get('/metrics', (req, res) => server.getConnections((err, count) => {
res.writeHeader(200, { 'Content-Type': 'application/openmetrics-text; version=1.0.0; charset=utf-8' });
res.write('# TYPE connected_clients gauge\n');
res.write('# HELP connected_clients The number of clients connected to the streaming server\n');
res.write(`connected_clients ${count}.0\n`);
res.write('# TYPE connected_channels gauge\n');
res.write('# HELP connected_channels The number of Redis channels the streaming server is subscribed to\n');
res.write(`connected_channels ${Object.keys(subs).length}.0\n`);
res.write('# TYPE pg_pool_total_connections gauge\n');
res.write('# HELP pg_pool_total_connections The total number of clients existing within the pool\n');
res.write(`pg_pool_total_connections ${pgPool.totalCount}.0\n`);
res.write('# TYPE pg_pool_idle_connections gauge\n');
res.write('# HELP pg_pool_idle_connections The number of clients which are not checked out but are currently idle in the pool\n');
res.write(`pg_pool_idle_connections ${pgPool.idleCount}.0\n`);
res.write('# TYPE pg_pool_waiting_queries gauge\n');
res.write('# HELP pg_pool_waiting_queries The number of queued requests waiting on a client when all clients are checked out\n');
res.write(`pg_pool_waiting_queries ${pgPool.waitingCount}.0\n`);
res.write('# EOF\n');
res.end();
}));

app.use(authenticationMiddleware);
app.use(errorMiddleware);

app.get('/api/v1/streaming/*', (req, res) => {
api.use(authenticationMiddleware);
api.use(errorMiddleware);

api.get('/api/v1/streaming/*', (req, res) => {
channelNameToIds(req, channelNameFromPath(req), req.query).then(({ channelIds, options }) => {
const onSend = streamToHttp(req, res);
const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
Expand Down Expand Up @@ -1141,15 +1222,16 @@ const startServer = async () => {
* @typedef WebSocketSession
* @property {any} socket
* @property {any} request
* @property {Object.<string, { listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
* @property {Object.<string, { channelName: string, listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
*/

/**
* @param {WebSocketSession} session
* @param {string} channelName
* @param {StreamParams} params
* @returns {void}
*/
const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) =>
const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) => {
checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({
channelIds,
options,
Expand All @@ -1162,43 +1244,58 @@ const startServer = async () => {
const stopHeartbeat = subscriptionHeartbeat(channelIds);
const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering);

connectedChannels.labels({ type: 'websocket', channel: channelName }).inc();

subscriptions[channelIds.join(';')] = {
channelName,
listener,
stopHeartbeat,
};
}).catch(err => {
log.verbose(request.requestId, 'Subscription error:', err.toString());
socket.send(JSON.stringify({ error: err.toString() }));
});
}


const removeSubscription = (subscriptions, channelIds, request) => {
log.verbose(request.requestId, `Ending stream from ${channelIds.join(', ')} for ${request.accountId}`);

const subscription = subscriptions[channelIds.join(';')];

if (!subscription) {
return;
}

channelIds.forEach(channelId => {
unsubscribe(`${redisPrefix}${channelId}`, subscription.listener);
});

connectedChannels.labels({ type: 'websocket', channel: subscription.channelName }).dec();
subscription.stopHeartbeat();

delete subscriptions[channelIds.join(';')];
}

/**
* @param {WebSocketSession} session
* @param {string} channelName
* @param {StreamParams} params
* @returns {void}
*/
const unsubscribeWebsocketFromChannel = ({ socket, request, subscriptions }, channelName, params) =>
const unsubscribeWebsocketFromChannel = ({ socket, request, subscriptions }, channelName, params) => {
channelNameToIds(request, channelName, params).then(({ channelIds }) => {
log.verbose(request.requestId, `Ending stream from ${channelIds.join(', ')} for ${request.accountId}`);

const subscription = subscriptions[channelIds.join(';')];
removeSubscription(subscriptions, channelIds, request);
}).catch(err => {
log.verbose(request.requestId, 'Unsubscribe error:', err);

if (!subscription) {
return;
// If we have a socket that is alive and open still, send the error back to the client:
// FIXME: In other parts of the code ws === socket
if (socket.isAlive && socket.readyState === socket.OPEN) {
socket.send(JSON.stringify({ error: "Error unsubscribing from channel" }));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was in theory another case where you could attempt to send a message to a closed or closing websocket; This is the same check we have in streamToWs but with the addition of checking that the socket is alive based on the heartbeats.

}

const { listener, stopHeartbeat } = subscription;

channelIds.forEach(channelId => {
unsubscribe(`${redisPrefix}${channelId}`, listener);
});

stopHeartbeat();

delete subscriptions[channelIds.join(';')];
}).catch(err => {
log.verbose(request.requestId, 'Unsubscription error:', err);
socket.send(JSON.stringify({ error: err.toString() }));
});
}

/**
* @param {WebSocketSession} session
Expand All @@ -1219,16 +1316,20 @@ const startServer = async () => {
subscribe(`${redisPrefix}${systemChannelId}`, listener);

subscriptions[accessTokenChannelId] = {
channelName: 'system',
listener,
stopHeartbeat: () => {
},
};

subscriptions[systemChannelId] = {
channelName: 'system',
listener,
stopHeartbeat: () => {
},
};

connectedChannels.labels({ type: 'websocket', channel: 'system' }).inc(2);
};

/**
Expand All @@ -1255,6 +1356,8 @@ const startServer = async () => {
ws.isAlive = true;
});

connectedClients.labels({ type: 'websocket' }).inc();

/**
* @type {WebSocketSession}
*/
Expand All @@ -1265,17 +1368,18 @@ const startServer = async () => {
};

const onEnd = () => {
const keys = Object.keys(session.subscriptions);
const subscriptions = Object.keys(session.subscriptions);

keys.forEach(channelIds => {
const { listener, stopHeartbeat } = session.subscriptions[channelIds];
subscriptions.forEach(channelIds => {
removeSubscription(session.subscriptions, channelIds.split(';'), req)
});

channelIds.split(';').forEach(channelId => {
unsubscribe(`${redisPrefix}${channelId}`, listener);
});
// ensure garbage collection:
session.socket = null;
session.request = null;
session.subscriptions = {};
ClearlyClaire marked this conversation as resolved.
Show resolved Hide resolved

stopHeartbeat();
});
connectedClients.labels({ type: 'websocket' }).dec();
};

ws.on('close', onEnd);
Expand Down