Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions dev/u-wave-dev-server
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,19 @@ const announce = require('u-wave-announce');
const ytSource = require('u-wave-source-youtube');
const scSource = require('u-wave-source-soundcloud');
const recaptchaTestKeys = require('recaptcha-test-keys');
const debug = require('debug')('uwave:dev-server');
const pino = require('pino');
const dotenv = require('dotenv');

dotenv.config();

const logger = pino({ level: 'trace' });

const testTransport = {
name: 'test',
version: '0.0.0',
send(mail, callback) {
mail.message.createReadStream().pipe(concat((message) => {
debug(mail.message.getEnvelope().to, message.toString('utf8'));
logger.info('send test email', { to: mail.message.getEnvelope().to, contents: message.toString('utf8') });
callback(null, {
envelope: mail.message.getEnvelope(),
messageId: mail.message.messageId(),
Expand All @@ -42,6 +44,7 @@ async function start() {
port,
redis: process.env.REDIS_URL,
mongo: process.env.MONGODB_URL ?? 'mongodb://localhost/uwave',
logger: { level: 'trace' },
secret,
mailTransport: testTransport,
timeout: 10,
Expand Down Expand Up @@ -76,10 +79,10 @@ async function start() {
});

await uw.listen();
console.log(`Now listening on ${port}`);
logger.info('Now listening', { port });
}

start().catch((error) => {
console.error(error.stack);
logger.error(error);
process.exit(1);
});
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
"cookie": "^0.5.0",
"cookie-parser": "^1.4.4",
"cors": "^2.8.5",
"debug": "^4.1.1",
"escape-string-regexp": "^4.0.0",
"explain-error": "^1.0.4",
"express": "^4.17.1",
Expand All @@ -55,6 +54,8 @@
"passport": "^0.5.0",
"passport-google-oauth20": "^2.0.0",
"passport-local": "^1.0.0",
"pino": "^7.11.0",
"pino-http": "^7.0.0",
"qs": "^6.9.1",
"random-string": "^0.2.0",
"ratelimiter": "^3.4.0",
Expand All @@ -75,7 +76,6 @@
"@types/cookie": "^0.5.0",
"@types/cookie-parser": "^1.4.2",
"@types/cors": "^2.8.10",
"@types/debug": "^4.1.5",
"@types/express": "^4.17.11",
"@types/has": "^1.0.0",
"@types/htmlescape": "^1.1.1",
Expand All @@ -102,7 +102,6 @@
"@typescript-eslint/parser": "^5.3.0",
"c8": "^7.10.0",
"concat-stream": "^2.0.0",
"cross-env": "^7.0.0",
"delay": "^5.0.0",
"dotenv": "^16.0.0",
"eslint": "^8.2.0",
Expand All @@ -113,6 +112,7 @@
"mocha": "^10.0.0",
"nock": "^13.2.0",
"nodemon": "^2.0.2",
"pino-colada": "^2.2.2",
"recaptcha-test-keys": "^1.0.0",
"sinon": "^14.0.0",
"supertest": "^6.1.3",
Expand All @@ -124,6 +124,6 @@
"test": "npm run tests-only && npm run lint",
"tests-only": "c8 --reporter lcov --src src mocha --exit",
"types": "tsc -p tsconfig.json",
"start": "cross-env DEBUG=uwave:* nodemon dev/u-wave-dev-server"
"start": "nodemon dev/u-wave-dev-server | pino-colada"
}
}
27 changes: 18 additions & 9 deletions src/HttpApi.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
'use strict';

const { URL } = require('url');
const { randomUUID } = require('crypto');
const express = require('express');
const bodyParser = require('body-parser');
const cookieParser = require('cookie-parser');
const cors = require('cors');
const helmet = require('helmet').default;
const http = require('http');
const debug = require('debug')('uwave:http-api');
const pinoHttp = require('pino-http').default;

// routes
const authenticate = require('./routes/authenticate');
Expand Down Expand Up @@ -82,6 +82,10 @@ async function httpApi(uw, options) {
throw new TypeError('"options.onError" must be a function.');
}

const logger = uw.logger.child({
ns: 'uwave:http-api',
});

uw.config.register(optionsSchema['uw:key'], optionsSchema);

/** @type {HttpApiSettings} */
Expand All @@ -93,12 +97,17 @@ async function httpApi(uw, options) {
}
});

debug('setup', runtimeOptions);
logger.debug(runtimeOptions, 'start HttpApi');
uw.httpApi = Object.assign(express.Router(), {
authRegistry: new AuthRegistry(uw.redis),
});

uw.httpApi
.use(pinoHttp({
genReqId: () => randomUUID(),
quietReqLogger: true,
logger,
}))
.use(bodyParser.json())
.use(cookieParser())
.use(uw.passport.initialize())
Expand Down Expand Up @@ -149,12 +158,12 @@ async function httpApi(uw, options) {
* @param {import('./Uwave')} uw
*/
async function errorHandling(uw) {
debug('after');
uw.httpApi.use(errorHandler());
uw.express.use(/** @type {import('express').ErrorRequestHandler} */ (error, req, res, next) => {
debug(error);
next(error);
});
uw.logger.debug({ ns: 'uwave:http-api' }, 'setup HTTP error handling');
uw.httpApi.use(errorHandler({
onError(_req, error) {
uw.logger.error({ err: error, ns: 'uwave:http-api' });
},
}));
}

httpApi.errorHandling = errorHandling;
Expand Down
60 changes: 37 additions & 23 deletions src/SocketServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const sjson = require('secure-json-parse');
const WebSocket = require('ws');
const Ajv = require('ajv').default;
const ms = require('ms');
const debug = require('debug')('uwave:api:sockets');
const { stdSerializers } = require('pino');
const { socketVote } = require('./controllers/booth');
const { disconnectUser } = require('./controllers/users');
const AuthRegistry = require('./AuthRegistry');
Expand Down Expand Up @@ -108,6 +108,8 @@ class SocketServer {

#uw;

#logger;

#redisSubscription;

#wss;
Expand Down Expand Up @@ -165,11 +167,16 @@ class SocketServer {
}

this.#uw = uw;
this.#logger = uw.logger.child({ ns: 'uwave:sockets' }, {
serializers: {
req: stdSerializers.req,
},
});
this.#redisSubscription = uw.redis.duplicate();

this.options = {
/** @type {(socket: import('ws') | undefined, err: Error) => void} */
onError: (socket, err) => {
/** @type {(_socket: import('ws') | undefined, err: Error) => void} */
onError: (_socket, err) => {
throw err;
},
timeout: 30,
Expand All @@ -185,7 +192,7 @@ class SocketServer {
});

this.#redisSubscription.subscribe('uwave', 'v1').catch((error) => {
debug(error);
this.#logger.error(error);
});
this.#redisSubscription.on('message', (channel, command) => {
// this returns a promise, but we don't handle the error case:
Expand All @@ -196,8 +203,8 @@ class SocketServer {
this.#wss.on('error', (error) => {
this.onError(error);
});
this.#wss.on('connection', (socket) => {
this.onSocketConnected(socket);
this.#wss.on('connection', (socket, request) => {
this.onSocketConnected(socket, request);
});

this.#pinger = setInterval(() => {
Expand All @@ -206,13 +213,13 @@ class SocketServer {

this.recountGuests = debounce(() => {
this.recountGuestsInternal().catch((error) => {
debug('counting guests failed:', error);
this.#logger.error({ err: error }, 'counting guests failed');
});
}, ms('2 seconds'));

this.#clientActions = {
sendChat: (user, message) => {
debug('sendChat', user, message);
this.#logger.trace({ user, message }, 'sendChat');
this.#uw.chat.send(user, message);
},
vote: (user, direction) => {
Expand Down Expand Up @@ -475,10 +482,11 @@ class SocketServer {

/**
* @param {import('ws')} socket
* @param {import('http').IncomingMessage} request
* @private
*/
onSocketConnected(socket) {
debug('new connection');
onSocketConnected(socket, request) {
this.#logger.info({ req: request }, 'new connection');

socket.on('error', (error) => {
this.onSocketError(socket, error);
Expand All @@ -492,7 +500,7 @@ class SocketServer {
* @private
*/
onSocketError(socket, error) {
debug('socket error:', error);
this.#logger.warn({ err: error }, 'socket error');

this.options.onError(socket, error);
}
Expand All @@ -502,7 +510,7 @@ class SocketServer {
* @private
*/
onError(error) {
debug('server error:', error);
this.#logger.error({ err: error }, 'server error');

this.options.onError(undefined, error);
}
Expand Down Expand Up @@ -533,10 +541,9 @@ class SocketServer {
this.remove(connection);
});
connection.on('authenticate', async (user) => {
debug('connecting', user.id, user.username);
const isReconnect = await connection.isReconnect(user);
this.#logger.info({ userId: user.id, isReconnect }, 'authenticated socket');
if (isReconnect) {
debug('is reconnection');
const previousConnection = this.getLostConnection(user);
if (previousConnection) this.remove(previousConnection);
}
Expand All @@ -562,11 +569,11 @@ class SocketServer {
const connection = new AuthedConnection(this.#uw, socket, user);
connection.on('close', ({ banned }) => {
if (banned) {
debug('removing connection after ban', user.id, user.username);
this.#logger.info({ userId: user.id }, 'removing connection after ban');
this.remove(connection);
disconnectUser(this.#uw, user._id);
} else {
debug('lost connection', user.id, user.username);
this.#logger.info({ userId: user.id }, 'lost connection');
this.replace(connection, this.createLostConnection(user));
}
});
Expand All @@ -577,7 +584,7 @@ class SocketServer {
* @param {import('type-fest').JsonValue} data
*/
(command, data) => {
debug('command', user.id, user.username, command, data);
this.#logger.trace({ userId: user.id, command, data }, 'command');
if (has(this.#clientActions, command)) {
// Ignore incorrect input
const validate = this.#clientActionSchemas[command];
Expand All @@ -604,7 +611,7 @@ class SocketServer {
createLostConnection(user) {
const connection = new LostConnection(this.#uw, user, this.options.timeout);
connection.on('close', () => {
debug('left', user.id, user.username);
this.#logger.info({ userId: user.id }, 'user left');
this.remove(connection);
// Only register that the user left if they didn't have another connection
// still open.
Expand All @@ -622,7 +629,8 @@ class SocketServer {
* @private
*/
add(connection) {
debug('adding', String(connection));
const userId = 'user' in connection ? connection.user.id : null;
this.#logger.trace({ type: connection.constructor.name, userId }, 'add connection');

this.#connections.push(connection);
this.recountGuests();
Expand All @@ -635,7 +643,8 @@ class SocketServer {
* @private
*/
remove(connection) {
debug('removing', String(connection));
const userId = 'user' in connection ? connection.user.id : null;
this.#logger.trace({ type: connection.constructor.name, userId }, 'remove connection');

const i = this.#connections.indexOf(connection);
this.#connections.splice(i, 1);
Expand Down Expand Up @@ -677,7 +686,7 @@ class SocketServer {
}
const { command, data } = json;

debug(channel, command, data);
this.#logger.trace({ channel, command, data }, 'server message');

if (channel === 'v1') {
this.broadcast(command, data);
Expand Down Expand Up @@ -735,10 +744,15 @@ class SocketServer {
* @param {import('type-fest').JsonValue} data Command data.
*/
broadcast(command, data) {
debug('broadcast', command, data);
this.#logger.trace({
command,
data,
to: this.#connections.map((connection) => (
'user' in connection ? connection.user.id : null
)),
}, 'broadcast');

this.#connections.forEach((connection) => {
debug(' to', connection.toString());
connection.send(command, data);
});
}
Expand Down
Loading