Skip to content

Commit

Permalink
Merge pull request #1472 from matrix-org/tadzik/rmau-limits
Browse files Browse the repository at this point in the history
Implement MAB's UserActivityTracker and blocking the bridge upon exceeding the user limit
  • Loading branch information
Half-Shot committed Oct 8, 2021
2 parents 9a366d0 + c0e3c2e commit b54eadc
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 10 deletions.
1 change: 1 addition & 0 deletions changelog.d/1472.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add optional bridge blocking upon exceeding a monthly active user limit
9 changes: 9 additions & 0 deletions config.sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,15 @@ ircService:
# how much time needs to pass between the reply and the original message to switch to the long format
shortReplyTresholdSeconds: 300

# Maximum number of montly active users, beyond which the bridge gets blocked (both ways)
# RMAUlimit: 100

# userActivity:
# The "grace period" before we start counting users as active
# minUserActiveDays: 1
# Time before users are considered inactive again
# inactiveAfterDays: 30

ircHandler:
# Should we attempt to match an IRC side mention (nickaname match)
# with the nickname's owner's matrixId, if we are bridging them?
Expand Down
9 changes: 9 additions & 0 deletions config.schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,15 @@ properties:
type: "number"
passwordEncryptionKeyPath:
type: "string"
RMAUlimit:
type: "integer"
userActivity:
type: "object"
properties:
minUserActiveDays:
type: "number"
inactiveAfterDays:
type: "number"
matrixHandler:
type: "object"
properties:
Expand Down
65 changes: 60 additions & 5 deletions src/bridge/IrcBridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ import {
AppService,
Rules,
ActivityTracker,
BridgeBlocker,
UserActivityState,
UserActivityTracker,
UserActivityTrackerConfig,
} from "matrix-appservice-bridge";
import { IrcAction } from "../models/IrcAction";
import { DataStore } from "../datastore/DataStore";
Expand Down Expand Up @@ -95,6 +99,7 @@ export class IrcBridge {
networkId: string;
}>;
private privacyProtection: PrivacyProtection;
private bridgeBlocker?: BridgeBlocker;

constructor(
public readonly config: BridgeConfig,
Expand All @@ -121,8 +126,9 @@ export class IrcBridge {
if (this.config.database.engine === "nedb") {
const dirPath = this.config.database.connectionString.substring("nedb://".length);
bridgeStoreConfig = {
roomStore: `${dirPath}/rooms.db`,
userStore: `${dirPath}/users.db`,
roomStore: `${dirPath}/rooms.db`,
userStore: `${dirPath}/users.db`,
userActivityStore: `${dirPath}/user-activity.db`,
};
}
else {
Expand Down Expand Up @@ -215,6 +221,10 @@ export class IrcBridge {
httpMaxSizeBytes: this.config.advanced?.maxTxnSize ?? TXN_SIZE_DEFAULT,
});
this.roomConfigs = new RoomConfig(this.bridge, this.config.ircService.perRoomConfig);

if (this.config.ircService.RMAUlimit) {
this.bridgeBlocker = new BridgeBlocker(this.config.ircService.RMAUlimit);
}
}

public async onConfigChanged(newConfig: BridgeConfig) {
Expand Down Expand Up @@ -415,6 +425,11 @@ export class IrcBridge {
labels: ["server"]
});

const bridgeBlocked = metrics.addGauge({
name: "bridge_blocked",
help: "Is the bridge currently blocking messages",
});

metrics.addCollector(() => {
this.ircServers.forEach((server) => {
reconnQueue.set({server: server.domain},
Expand Down Expand Up @@ -473,6 +488,8 @@ export class IrcBridge {
Object.entries(ircMetrics).forEach((kv) => {
ircHandlerCalls.inc({method: kv[0]}, kv[1]);
});

bridgeBlocked.set(this.bridgeBlocker?.isBlocked ? 1 : 0);
});

metrics.addCollector(async () => {
Expand Down Expand Up @@ -569,12 +586,14 @@ export class IrcBridge {
await this.bridge.loadDatabases();
const userStore = this.bridge.getUserStore();
const roomStore = this.bridge.getRoomStore();
const userActivityStore = this.bridge.getUserActivityStore();
log.info("Using NeDBDataStore for Datastore");
if (!userStore || !roomStore) {
throw Error('Could not load userStore or roomStore');
if (!userStore || !roomStore || !userActivityStore) {
throw Error('Could not load user(Activity)Store or roomStore');
}
this.dataStore = new NeDBDataStore(
userStore,
userActivityStore,
roomStore,
this.config.homeserver.domain,
pkeyPath,
Expand Down Expand Up @@ -636,6 +655,22 @@ export class IrcBridge {
throw Error("No IRC servers specified.");
}

const uatConfig = {
...UserActivityTrackerConfig.DEFAULT,
};
if (this.config.ircService.userActivity?.minUserActiveDays !== undefined) {
uatConfig.minUserActiveDays = this.config.ircService.userActivity.minUserActiveDays;
}
if (this.config.ircService.userActivity?.inactiveAfterDays !== undefined) {
uatConfig.inactiveAfterDays = this.config.ircService.userActivity.inactiveAfterDays;
}
this.bridge.opts.controller.userActivityTracker = new UserActivityTracker(
uatConfig,
await this.getStore().getUserActivity(),
(changes) => this.onUserActivityChanged(changes),
);
this.bridgeBlocker?.checkLimits(this.bridge.opts.controller.userActivityTracker.countActiveUsers().allUsers);

// run the bridge (needs to be done prior to configure IRC side)
await this.bridge.listen(port, this.config.homeserver.bindHostname, undefined, this.appservice);
log.info(`Listening on ${this.config.homeserver.bindHostname || "0.0.0.0"}:${port}`)
Expand Down Expand Up @@ -744,6 +779,7 @@ export class IrcBridge {
});

log.info("Startup complete.");

this.startedUp = true;
}

Expand Down Expand Up @@ -849,6 +885,10 @@ export class IrcBridge {
}

public async sendMatrixAction(room: MatrixRoom, from: MatrixUser, action: MatrixAction): Promise<void> {
if (this.bridgeBlocker?.isBlocked) {
log.info("Bridge is blocked, dropping Matrix action");
return;
}
const intent = this.bridge.getIntent(from.userId);
const extraContent: Record<string, unknown> = {};
if (action.replyEvent) {
Expand Down Expand Up @@ -955,6 +995,10 @@ export class IrcBridge {
}

public onEvent(request: BridgeRequestEvent) {
if (this.bridgeBlocker?.isBlocked) {
log.info("Bridge is blocked, dropping Matrix event");
return;
}
request.outcomeFrom(this._onEvent(request));
}

Expand Down Expand Up @@ -1297,11 +1341,15 @@ export class IrcBridge {
}

public async sendIrcAction(ircRoom: IrcRoom, bridgedClient: BridgedClient, action: IrcAction) {
if (this.bridgeBlocker?.isBlocked) {
log.info("Bridge is blocked, dropping IRC action");
return;
}
log.info(
"Sending IRC message in %s as %s (connected=%s)",
ircRoom.channel, bridgedClient.nick, Boolean(bridgedClient.status === BridgedClientStatus.CONNECTED)
);
return bridgedClient.sendAction(ircRoom, action);
await bridgedClient.sendAction(ircRoom, action);
}

public async getBotClient(server: IrcServer) {
Expand Down Expand Up @@ -1477,4 +1525,11 @@ export class IrcBridge {
const current = await this.dataStore.getRoomCount();
return current >= limit;
}

private onUserActivityChanged(userActivity: UserActivityState) {
for (const userId of userActivity.changed) {
this.getStore().storeUserActivity(userId, userActivity.dataSet.users[userId]);
}
this.bridgeBlocker?.checkLimits(userActivity.activeUsers);
}
}
5 changes: 5 additions & 0 deletions src/config/BridgeConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ export interface BridgeConfig {
[userIdOrDomain: string]: "admin";
};
perRoomConfig?: RoomConfigConfig;
RMAUlimit?: number;
userActivity?: {
minUserActiveDays?: number;
inactiveAfterDays?: number;
};
};
sentry?: {
enabled: boolean;
Expand Down
10 changes: 9 additions & 1 deletion src/datastore/DataStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

import { MatrixRoom, MatrixUser, RoomBridgeStoreEntry as Entry} from "matrix-appservice-bridge";
import {
MatrixRoom, MatrixUser,
RoomBridgeStoreEntry as Entry,
UserActivity, UserActivitySet
} from "matrix-appservice-bridge";
import { IrcRoom } from "../models/IrcRoom";
import { IrcClientConfig } from "../models/IrcClientConfig";
import { IrcServer, IrcServerConfig } from "../irc/IrcServer";
Expand Down Expand Up @@ -157,6 +161,10 @@ export interface DataStore {

storeUserFeatures(userId: string, features: UserFeatures): Promise<void>;

getUserActivity(): Promise<UserActivitySet>;

storeUserActivity(userId: string, activity: UserActivity): Promise<void>;

storePass(userId: string, domain: string, pass: string): Promise<void>;

removePass(userId: string, domain: string): Promise<void>;
Expand Down
17 changes: 15 additions & 2 deletions src/datastore/NedbDataStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ import { IrcRoom } from "../models/IrcRoom";
import { IrcClientConfig, IrcClientConfigSeralized } from "../models/IrcClientConfig"
import { getLogger } from "../logging";

import { MatrixRoom, MatrixUser, RemoteUser, RemoteRoom,
UserBridgeStore, RoomBridgeStore, RoomBridgeStoreEntry as Entry } from "matrix-appservice-bridge";
import {
MatrixRoom, MatrixUser, RemoteUser, RemoteRoom,
UserBridgeStore, UserActivityStore,
RoomBridgeStore, RoomBridgeStoreEntry as Entry,
UserActivity, UserActivitySet
} from "matrix-appservice-bridge";
import { DataStore, RoomOrigin, ChannelMappings, UserFeatures } from "./DataStore";
import { IrcServer, IrcServerConfig } from "../irc/IrcServer";
import { StringCrypto } from "./StringCrypto";
Expand All @@ -35,6 +39,7 @@ export class NeDBDataStore implements DataStore {
private cryptoStore?: StringCrypto;
constructor(
private userStore: UserBridgeStore,
private userActivityStore: UserActivityStore,
private roomStore: RoomBridgeStore,
private bridgeDomain: string,
pkeyPath?: string) {
Expand Down Expand Up @@ -586,6 +591,14 @@ export class NeDBDataStore implements DataStore {
await this.userStore.setMatrixUser(matrixUser);
}

public async getUserActivity(): Promise<UserActivitySet> {
return this.userActivityStore.getActivitySet();
}

public async storeUserActivity(userId: string, activity: UserActivity) {
this.userActivityStore.storeUserActivity(userId, activity);
}

public async storePass(userId: string, domain: string, pass: string) {
const config = await this.getIrcClientConfig(userId, domain);
if (!config) {
Expand Down
24 changes: 22 additions & 2 deletions src/datastore/postgres/PgDataStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import {
MatrixRoom,
RemoteRoom,
RoomBridgeStoreEntry as Entry,
MatrixRoomData
MatrixRoomData,
UserActivitySet,
UserActivity,
} from "matrix-appservice-bridge";
import { DataStore, RoomOrigin, ChannelMappings, UserFeatures } from "../DataStore";
import { IrcRoom } from "../../models/IrcRoom";
Expand Down Expand Up @@ -52,7 +54,7 @@ interface RoomRecord {
export class PgDataStore implements DataStore {
private serverMappings: {[domain: string]: IrcServer} = {};

public static readonly LATEST_SCHEMA = 6;
public static readonly LATEST_SCHEMA = 7;
private pgPool: Pool;
private hasEnded = false;
private cryptoStore?: StringCrypto;
Expand Down Expand Up @@ -556,6 +558,24 @@ export class PgDataStore implements DataStore {
await this.pgPool.query(statement, [userId, JSON.stringify(features)]);
}

public async getUserActivity(): Promise<UserActivitySet> {
const res = await this.pgPool.query('SELECT * FROM user_activity');
const users: {[mxid: string]: UserActivity} = {};
for (const row of res.rows) {
users[row['user_id']] = row['data'];
}
return { users };
}

public async storeUserActivity(userId: string, activity: UserActivity) {
const stmt = PgDataStore.BuildUpsertStatement(
'user_activity',
'(user_id)',
['user_id', 'data'],
);
await this.pgPool.query(stmt, [userId, JSON.stringify(activity)]);
}

public async storePass(userId: string, domain: string, pass: string, encrypt = true): Promise<void> {
let password = pass;
if (encrypt) {
Expand Down
10 changes: 10 additions & 0 deletions src/datastore/postgres/schema/v7.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { PoolClient } from "pg";

export async function runSchema(connection: PoolClient) {
await connection.query(`
CREATE TABLE user_activity (
user_id TEXT UNIQUE,
data JSON
);
`);
}

0 comments on commit b54eadc

Please sign in to comment.