Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve chart performance #7360

Merged
merged 13 commits into from Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
218 changes: 218 additions & 0 deletions migration/1615965918224-chart-v2.ts

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions migration/1615966519402-chart-v2-2.ts
@@ -0,0 +1,22 @@
import {MigrationInterface, QueryRunner} from "typeorm";

export class chartV221615966519402 implements MigrationInterface {
name = 'chartV221615966519402'

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "__chart__active_users" ADD "___local_users" character varying array NOT NULL DEFAULT '{}'::varchar[]`);
await queryRunner.query(`ALTER TABLE "__chart__active_users" ADD "___remote_users" character varying array NOT NULL DEFAULT '{}'::varchar[]`);
await queryRunner.query(`ALTER TABLE "__chart__hashtag" ADD "___local_users" character varying array NOT NULL DEFAULT '{}'::varchar[]`);
await queryRunner.query(`ALTER TABLE "__chart__hashtag" ADD "___remote_users" character varying array NOT NULL DEFAULT '{}'::varchar[]`);
await queryRunner.query(`ALTER TABLE "__chart__test_unique" ADD "___foo" character varying array NOT NULL DEFAULT '{}'::varchar[]`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "__chart__test_unique" DROP COLUMN "___foo"`);
await queryRunner.query(`ALTER TABLE "__chart__hashtag" DROP COLUMN "___remote_users"`);
await queryRunner.query(`ALTER TABLE "__chart__hashtag" DROP COLUMN "___local_users"`);
await queryRunner.query(`ALTER TABLE "__chart__active_users" DROP COLUMN "___remote_users"`);
await queryRunner.query(`ALTER TABLE "__chart__active_users" DROP COLUMN "___local_users"`);
}

}
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -47,7 +47,7 @@
"@koa/router": "9.0.1",
"@sentry/browser": "5.29.2",
"@sentry/tracing": "5.29.2",
"@sinonjs/fake-timers": "6.0.1",
"@sinonjs/fake-timers": "7.0.2",
"@syuilo/aiscript": "0.11.1",
"@types/bcryptjs": "2.4.2",
"@types/bull": "3.15.0",
Expand Down
2 changes: 1 addition & 1 deletion src/daemons/queue-stats.ts
@@ -1,5 +1,5 @@
import Xev from 'xev';
import { deliverQueue, inboxQueue } from '../queue';
import { deliverQueue, inboxQueue } from '../queue/queues';

const ev = new Xev();

Expand Down
4 changes: 4 additions & 0 deletions src/db/postgre.ts
@@ -1,3 +1,7 @@
// https://github.com/typeorm/typeorm/issues/2400
const types = require('pg').types;
types.setTypeParser(20, Number);

import { createConnection, Logger, getConnection } from 'typeorm';
import config from '../config';
import { entities as charts } from '../services/chart/entities';
Expand Down
1 change: 1 addition & 0 deletions src/global.d.ts
@@ -0,0 +1 @@
type FIXME = any;
88 changes: 88 additions & 0 deletions src/misc/before-shutdown.ts
@@ -0,0 +1,88 @@
// https://gist.github.com/nfantone/1eaa803772025df69d07f4dbf5df7e58

'use strict';

/**
* @callback BeforeShutdownListener
* @param {string} [signalOrEvent] The exit signal or event name received on the process.
*/

/**
* System signals the app will listen to initiate shutdown.
* @const {string[]}
*/
const SHUTDOWN_SIGNALS = ['SIGINT', 'SIGTERM'];

/**
* Time in milliseconds to wait before forcing shutdown.
* @const {number}
*/
const SHUTDOWN_TIMEOUT = 15000;

/**
* A queue of listener callbacks to execute before shutting
* down the process.
* @type {BeforeShutdownListener[]}
*/
const shutdownListeners = [];

/**
* Listen for signals and execute given `fn` function once.
* @param {string[]} signals System signals to listen to.
* @param {function(string)} fn Function to execute on shutdown.
*/
const processOnce = (signals, fn) => {
return signals.forEach(sig => process.once(sig, fn));
};

/**
* Sets a forced shutdown mechanism that will exit the process after `timeout` milliseconds.
* @param {number} timeout Time to wait before forcing shutdown (milliseconds)
*/
const forceExitAfter = timeout => () => {
setTimeout(() => {
// Force shutdown after timeout
console.warn(`Could not close resources gracefully after ${timeout}ms: forcing shutdown`);
return process.exit(1);
}, timeout).unref();
};

/**
* Main process shutdown handler. Will invoke every previously registered async shutdown listener
* in the queue and exit with a code of `0`. Any `Promise` rejections from any listener will
* be logged out as a warning, but won't prevent other callbacks from executing.
* @param {string} signalOrEvent The exit signal or event name received on the process.
*/
async function shutdownHandler(signalOrEvent) {
console.warn(`Shutting down: received [${signalOrEvent}] signal`);

for (const listener of shutdownListeners) {
try {
await listener(signalOrEvent);
} catch (err) {
console.warn(`A shutdown handler failed before completing with: ${err.message || err}`);
}
}

return process.exit(0);
}

/**
* Registers a new shutdown listener to be invoked before exiting
* the main process. Listener handlers are guaranteed to be called in the order
* they were registered.
* @param {BeforeShutdownListener} listener The shutdown listener to register.
* @returns {BeforeShutdownListener} Echoes back the supplied `listener`.
*/
export function beforeShutdown(listener) {
shutdownListeners.push(listener);
return listener;
}

// Register shutdown callback that kills the process after `SHUTDOWN_TIMEOUT` milliseconds
// This prevents custom shutdown handlers from hanging the process indefinitely
processOnce(SHUTDOWN_SIGNALS, forceExitAfter(SHUTDOWN_TIMEOUT));

// Register process shutdown callback
// Will listen to incoming signal events and execute all registered handlers in the stack
processOnce(SHUTDOWN_SIGNALS, shutdownHandler);
23 changes: 1 addition & 22 deletions src/queue/index.ts
@@ -1,4 +1,3 @@
import * as Queue from 'bull';
import * as httpSignature from 'http-signature';

import config from '../config';
Expand All @@ -13,22 +12,7 @@ import { queueLogger } from './logger';
import { DriveFile } from '../models/entities/drive-file';
import { getJobInfo } from './get-job-info';
import { IActivity } from '../remote/activitypub/type';

function initializeQueue(name: string, limitPerSec = -1) {
return new Queue(name, {
redis: {
port: config.redis.port,
host: config.redis.host,
password: config.redis.pass,
db: config.redis.db || 0,
},
prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : 'queue',
limiter: limitPerSec > 0 ? {
max: limitPerSec * 5,
duration: 5000
} : undefined
});
}
import { dbQueue, deliverQueue, inboxQueue, objectStorageQueue } from './queues';

export type InboxJobData = {
activity: IActivity,
Expand All @@ -44,11 +28,6 @@ function renderError(e: Error): any {
};
}

export const deliverQueue = initializeQueue('deliver', config.deliverJobPerSec || 128);
export const inboxQueue = initializeQueue('inbox', config.inboxJobPerSec || 16);
export const dbQueue = initializeQueue('db');
export const objectStorageQueue = initializeQueue('objectStorage');

const deliverLogger = queueLogger.createSubLogger('deliver');
const inboxLogger = queueLogger.createSubLogger('inbox');
const dbLogger = queueLogger.createSubLogger('db');
Expand Down
18 changes: 18 additions & 0 deletions src/queue/initialize.ts
@@ -0,0 +1,18 @@
import * as Queue from 'bull';
import config from '../config';

export function initialize(name: string, limitPerSec = -1) {
return new Queue(name, {
redis: {
port: config.redis.port,
host: config.redis.host,
password: config.redis.pass,
db: config.redis.db || 0,
},
prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : 'queue',
limiter: limitPerSec > 0 ? {
max: limitPerSec * 5,
duration: 5000
} : undefined
});
}
7 changes: 7 additions & 0 deletions src/queue/queues.ts
@@ -0,0 +1,7 @@
import config from '../config';
import { initialize as initializeQueue } from './initialize';

export const deliverQueue = initializeQueue('deliver', config.deliverJobPerSec || 128);
export const inboxQueue = initializeQueue('inbox', config.inboxJobPerSec || 16);
export const dbQueue = initializeQueue('db');
export const objectStorageQueue = initializeQueue('objectStorage');
18 changes: 15 additions & 3 deletions src/services/chart/charts/classes/active-users.ts
Expand Up @@ -17,6 +17,18 @@ export default class ActiveUsersChart extends Chart<ActiveUsersLog> {
return {};
}

@autobind
protected aggregate(logs: ActiveUsersLog[]): ActiveUsersLog {
return {
local: {
users: logs.reduce((a, b) => a.concat(b.local.users), [] as ActiveUsersLog['local']['users']),
},
remote: {
users: logs.reduce((a, b) => a.concat(b.remote.users), [] as ActiveUsersLog['remote']['users']),
},
};
}

@autobind
protected async fetchActual(): Promise<DeepPartial<ActiveUsersLog>> {
return {};
Expand All @@ -25,11 +37,11 @@ export default class ActiveUsersChart extends Chart<ActiveUsersLog> {
@autobind
public async update(user: User) {
const update: Obj = {
count: 1
users: [user.id]
};

await this.incIfUnique({
await this.inc({
[Users.isLocalUser(user) ? 'local' : 'remote']: update
}, 'users', user.id);
});
}
}
22 changes: 22 additions & 0 deletions src/services/chart/charts/classes/drive.ts
Expand Up @@ -27,6 +27,28 @@ export default class DriveChart extends Chart<DriveLog> {
};
}

@autobind
protected aggregate(logs: DriveLog[]): DriveLog {
return {
local: {
totalCount: logs[0].local.totalCount,
totalSize: logs[0].local.totalSize,
incCount: logs.reduce((a, b) => a + b.local.incCount, 0),
incSize: logs.reduce((a, b) => a + b.local.incSize, 0),
decCount: logs.reduce((a, b) => a + b.local.decCount, 0),
decSize: logs.reduce((a, b) => a + b.local.decSize, 0),
},
remote: {
totalCount: logs[0].remote.totalCount,
totalSize: logs[0].remote.totalSize,
incCount: logs.reduce((a, b) => a + b.remote.incCount, 0),
incSize: logs.reduce((a, b) => a + b.remote.incSize, 0),
decCount: logs.reduce((a, b) => a + b.remote.decCount, 0),
decSize: logs.reduce((a, b) => a + b.remote.decSize, 0),
},
};
}

@autobind
protected async fetchActual(): Promise<DeepPartial<DriveLog>> {
const [localCount, remoteCount, localSize, remoteSize] = await Promise.all([
Expand Down
11 changes: 11 additions & 0 deletions src/services/chart/charts/classes/federation.ts
Expand Up @@ -20,6 +20,17 @@ export default class FederationChart extends Chart<FederationLog> {
};
}

@autobind
protected aggregate(logs: FederationLog[]): FederationLog {
return {
instance: {
total: logs[0].instance.total,
inc: logs.reduce((a, b) => a + b.instance.inc, 0),
dec: logs.reduce((a, b) => a + b.instance.dec, 0),
},
};
}

@autobind
protected async fetchActual(): Promise<DeepPartial<FederationLog>> {
const [total] = await Promise.all([
Expand Down
18 changes: 15 additions & 3 deletions src/services/chart/charts/classes/hashtag.ts
Expand Up @@ -17,6 +17,18 @@ export default class HashtagChart extends Chart<HashtagLog> {
return {};
}

@autobind
protected aggregate(logs: HashtagLog[]): HashtagLog {
return {
local: {
users: logs.reduce((a, b) => a.concat(b.local.users), [] as HashtagLog['local']['users']),
},
remote: {
users: logs.reduce((a, b) => a.concat(b.remote.users), [] as HashtagLog['remote']['users']),
},
};
}

@autobind
protected async fetchActual(): Promise<DeepPartial<HashtagLog>> {
return {};
Expand All @@ -25,11 +37,11 @@ export default class HashtagChart extends Chart<HashtagLog> {
@autobind
public async update(hashtag: string, user: User) {
const update: Obj = {
count: 1
users: [user.id]
};

await this.incIfUnique({
await this.inc({
[Users.isLocalUser(user) ? 'local' : 'remote']: update
}, 'users', user.id, hashtag);
}, hashtag);
}
}
44 changes: 44 additions & 0 deletions src/services/chart/charts/classes/instance.ts
Expand Up @@ -36,6 +36,50 @@ export default class InstanceChart extends Chart<InstanceLog> {
};
}

@autobind
protected aggregate(logs: InstanceLog[]): InstanceLog {
return {
requests: {
failed: logs.reduce((a, b) => a + b.requests.failed, 0),
succeeded: logs.reduce((a, b) => a + b.requests.succeeded, 0),
received: logs.reduce((a, b) => a + b.requests.received, 0),
},
notes: {
total: logs[0].notes.total,
inc: logs.reduce((a, b) => a + b.notes.inc, 0),
dec: logs.reduce((a, b) => a + b.notes.dec, 0),
diffs: {
reply: logs.reduce((a, b) => a + b.notes.diffs.reply, 0),
renote: logs.reduce((a, b) => a + b.notes.diffs.renote, 0),
normal: logs.reduce((a, b) => a + b.notes.diffs.normal, 0),
},
},
users: {
total: logs[0].users.total,
inc: logs.reduce((a, b) => a + b.users.inc, 0),
dec: logs.reduce((a, b) => a + b.users.dec, 0),
},
following: {
total: logs[0].following.total,
inc: logs.reduce((a, b) => a + b.following.inc, 0),
dec: logs.reduce((a, b) => a + b.following.dec, 0),
},
followers: {
total: logs[0].followers.total,
inc: logs.reduce((a, b) => a + b.followers.inc, 0),
dec: logs.reduce((a, b) => a + b.followers.dec, 0),
},
drive: {
totalFiles: logs[0].drive.totalFiles,
totalUsage: logs[0].drive.totalUsage,
incFiles: logs.reduce((a, b) => a + b.drive.incFiles, 0),
incUsage: logs.reduce((a, b) => a + b.drive.incUsage, 0),
decFiles: logs.reduce((a, b) => a + b.drive.decFiles, 0),
decUsage: logs.reduce((a, b) => a + b.drive.decUsage, 0),
},
};
}

@autobind
protected async fetchActual(group: string): Promise<DeepPartial<InstanceLog>> {
const [
Expand Down
11 changes: 11 additions & 0 deletions src/services/chart/charts/classes/network.ts
Expand Up @@ -15,6 +15,17 @@ export default class NetworkChart extends Chart<NetworkLog> {
return {};
}

@autobind
protected aggregate(logs: NetworkLog[]): NetworkLog {
return {
incomingRequests: logs.reduce((a, b) => a + b.incomingRequests, 0),
outgoingRequests: logs.reduce((a, b) => a + b.outgoingRequests, 0),
totalTime: logs.reduce((a, b) => a + b.totalTime, 0),
incomingBytes: logs.reduce((a, b) => a + b.incomingBytes, 0),
outgoingBytes: logs.reduce((a, b) => a + b.outgoingBytes, 0),
};
}

@autobind
protected async fetchActual(): Promise<DeepPartial<NetworkLog>> {
return {};
Expand Down