Skip to content

Commit

Permalink
messageStorage: convert to async
Browse files Browse the repository at this point in the history
Message stores are more complicated that a sync "fire and forget"
API allows for.
For starters, non trivial stores (say sqlite) can fail during init
and we want to be able to catch that.
Second, we really need to be able to run migrations and such, which
may block (and fail) the activation of the store.

On the plus side, this pushes error handling to the caller rather
than the stores, which is a good thing as that allows us to eventually
push this to the client in the UI, rather than just logging it in the
server on stdout
  • Loading branch information
brunnre8 committed Nov 1, 2022
1 parent f068fd4 commit d62dd3e
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 187 deletions.
6 changes: 3 additions & 3 deletions server/client.ts
Expand Up @@ -147,7 +147,7 @@ class Client {
}

for (const messageStorage of client.messageStorage) {
messageStorage.enable();
messageStorage.enable().catch((e) => log.error(e));
}
}

Expand Down Expand Up @@ -614,7 +614,7 @@ class Client {
}

for (const messageStorage of this.messageStorage) {
messageStorage.deleteChannel(target.network, target.chan);
messageStorage.deleteChannel(target.network, target.chan).catch((e) => log.error(e));
}
}

Expand Down Expand Up @@ -767,7 +767,7 @@ class Client {
});

for (const messageStorage of this.messageStorage) {
messageStorage.close();
messageStorage.close().catch((e) => log.error(e));
}
}

Expand Down
15 changes: 15 additions & 0 deletions server/helper.ts
Expand Up @@ -23,6 +23,7 @@ const Helper = {
parseHostmask,
compareHostmask,
compareWithWildcard,
catch_to_error,

password: {
hash: passwordHash,
Expand Down Expand Up @@ -183,3 +184,17 @@ function compareWithWildcard(a: string, b: string) {
const re = new RegExp(`^${user_regex}$`, "i"); // case insensitive
return re.test(b);
}

function catch_to_error(prefix: string, err: any): Error {
let msg: string;

if (err instanceof Error) {
msg = err.message;
} else if (typeof err === "string") {
msg = err;
} else {
msg = err.toString();
}

return new Error(`${prefix}: ${msg}`);
}
2 changes: 1 addition & 1 deletion server/models/chan.ts
Expand Up @@ -260,7 +260,7 @@ class Chan {
}

for (const messageStorage of client.messageStorage) {
messageStorage.index(target.network, targetChannel, msg);
messageStorage.index(target.network, targetChannel, msg).catch((e) => log.error(e));
}
}
loadMessages(client: Client, network: Network) {
Expand Down
81 changes: 37 additions & 44 deletions server/plugins/messageStorage/sqlite.ts
Expand Up @@ -2,11 +2,12 @@ import type {Database} from "sqlite3";

import log from "../../log";
import path from "path";
import fs from "fs";
import fs from "fs/promises";
import Config from "../../config";
import Msg, {Message} from "../../models/msg";
import Client from "../../client";
import Chan, {Channel} from "../../models/chan";
import Helper from "../../helper";
import type {
SearchResponse,
SearchQuery,
Expand Down Expand Up @@ -47,26 +48,26 @@ class SqliteMessageStorage implements ISqliteMessageStorage {
this.isEnabled = false;
}

enable() {
async enable() {
const logsPath = Config.getUserLogsPath();
const sqlitePath = path.join(logsPath, `${this.client.name}.sqlite3`);

try {
fs.mkdirSync(logsPath, {recursive: true});
} catch (e: any) {
log.error("Unable to create logs directory", String(e));

return;
await fs.mkdir(logsPath, {recursive: true});
} catch (e) {
throw Helper.catch_to_error("Unable to create logs directory", e);
}

this.isEnabled = true;

this.database = new sqlite3.Database(sqlitePath);

this.run_migrations().catch((err) => {
log.error("Migration failed", String(err));
try {
await this.run_migrations();
} catch (e) {
this.isEnabled = false;
});
throw Helper.catch_to_error("Migration failed", e);
}
}

async run_migrations() {
Expand Down Expand Up @@ -106,25 +107,26 @@ class SqliteMessageStorage implements ISqliteMessageStorage {
]);
}

close(callback?: (error?: Error | null) => void) {
async close() {
if (!this.isEnabled) {
return;
}

this.isEnabled = false;

this.database.close((err) => {
if (err) {
log.error(`Failed to close sqlite database: ${err.message}`);
}
return new Promise<void>((resolve, reject) => {
this.database.close((err) => {
if (err) {
reject(`Failed to close sqlite database: ${err.message}`);
return;
}

if (callback) {
callback(err);
}
resolve();
});
});
}

index(network: Network, channel: Chan, msg: Msg) {
async index(network: Network, channel: Chan, msg: Msg) {
if (!this.isEnabled) {
return;
}
Expand All @@ -140,26 +142,27 @@ class SqliteMessageStorage implements ISqliteMessageStorage {
return newMsg;
}, {});

this.run(
await this.serialize_run(
"INSERT INTO messages(network, channel, time, type, msg) VALUES(?, ?, ?, ?, ?)",
network.uuid,
channel.name.toLowerCase(),
msg.time.getTime(),
msg.type,
JSON.stringify(clonedMsg)
[
network.uuid,
channel.name.toLowerCase(),
msg.time.getTime(),
msg.type,
JSON.stringify(clonedMsg),
]
);
}

deleteChannel(network: Network, channel: Channel) {
async deleteChannel(network: Network, channel: Channel) {
if (!this.isEnabled) {
return;
}

this.run(
"DELETE FROM messages WHERE network = ? AND channel = ?",
await this.serialize_run("DELETE FROM messages WHERE network = ? AND channel = ?", [
network.uuid,
channel.name.toLowerCase()
);
channel.name.toLowerCase(),
]);
}

/**
Expand All @@ -170,7 +173,7 @@ class SqliteMessageStorage implements ISqliteMessageStorage {
*/
async getMessages(network: Network, channel: Channel): Promise<Message[]> {
if (!this.isEnabled || Config.values.maxHistory === 0) {
return Promise.resolve([]);
return [];
}

// If unlimited history is specified, load 100k messages
Expand All @@ -183,7 +186,7 @@ class SqliteMessageStorage implements ISqliteMessageStorage {
limit
);

return rows.reverse().map((row: any) => {
return rows.reverse().map((row: any): Message => {
const msg = JSON.parse(row.msg);
msg.time = row.time;
msg.type = row.type;
Expand All @@ -192,7 +195,7 @@ class SqliteMessageStorage implements ISqliteMessageStorage {
newMsg.id = this.client.idMsg++;

return newMsg;
}) as Message[];
});
}

async search(query: SearchQuery): Promise<SearchResponse> {
Expand Down Expand Up @@ -243,17 +246,10 @@ class SqliteMessageStorage implements ISqliteMessageStorage {
return this.isEnabled;
}

private run(stmt: string, ...params: any[]) {
this.serialize_run(stmt, params).catch((err) =>
log.error(`failed to run ${stmt}`, String(err))
);
}

private serialize_run(stmt: string, params: any[]): Promise<void> {
return new Promise((resolve, reject) => {
this.database.serialize(() => {
this.database.run(stmt, params, (err) => {

if (err) {
reject(err);
return;
Expand All @@ -265,7 +261,7 @@ class SqliteMessageStorage implements ISqliteMessageStorage {
});
}

private serialize_fetchall(stmt: string, ...params: any[]): Promise<any> {
private serialize_fetchall(stmt: string, ...params: any[]): Promise<any[]> {
return new Promise((resolve, reject) => {
this.database.serialize(() => {
this.database.all(stmt, params, (err, rows) => {
Expand All @@ -281,12 +277,9 @@ class SqliteMessageStorage implements ISqliteMessageStorage {
}

private serialize_get(stmt: string, ...params: any[]): Promise<any> {
const log_id = this.stmt_id();
return new Promise((resolve, reject) => {
this.database.serialize(() => {
this.database.get(stmt, params, (err, row) => {
log.debug(log_id, "callback", stmt);

if (err) {
reject(err);
return;
Expand Down
59 changes: 19 additions & 40 deletions server/plugins/messageStorage/text.ts
@@ -1,9 +1,8 @@
/* eslint-disable @typescript-eslint/restrict-template-expressions */
import fs from "fs";
import fs from "fs/promises";
import path from "path";
import filenamify from "filenamify";

import log from "../../log";
import Config from "../../config";
import {MessageStorage} from "./types";
import Client from "../../client";
Expand All @@ -20,19 +19,17 @@ class TextFileMessageStorage implements MessageStorage {
this.isEnabled = false;
}

enable() {
// eslint-disable-next-line @typescript-eslint/require-await
async enable() {
this.isEnabled = true;
}

close(callback: () => void) {
// eslint-disable-next-line @typescript-eslint/require-await
async close() {
this.isEnabled = false;

if (callback) {
callback();
}
}

index(network: Network, channel: Channel, msg: Message) {
async index(network: Network, channel: Channel, msg: Message) {
if (!this.isEnabled) {
return;
}
Expand All @@ -44,10 +41,9 @@ class TextFileMessageStorage implements MessageStorage {
);

try {
fs.mkdirSync(logPath, {recursive: true});
} catch (e: any) {
log.error("Unable to create logs directory", String(e));
return;
await fs.mkdir(logPath, {recursive: true});
} catch (e) {
throw new Error(`Unable to create logs directory: ${e}`);
}

let line = `[${msg.time.toISOString()}] `;
Expand Down Expand Up @@ -106,35 +102,18 @@ class TextFileMessageStorage implements MessageStorage {

line += "\n";

fs.appendFile(
path.join(logPath, TextFileMessageStorage.getChannelFileName(channel)),
line,
(e) => {
if (e) {
log.error("Failed to write user log", e.message);
}
}
);
}

deleteChannel() {
/* TODO: Truncating text logs is disabled, until we figure out some UI for it
if (!this.isEnabled) {
return;
try {
await fs.appendFile(
path.join(logPath, TextFileMessageStorage.getChannelFileName(channel)),
line
);
} catch (e) {
throw new Error(`Failed to write user log: ${e}`);
}
}

const logPath = path.join(
Config.getUserLogsPath(),
this.client.name,
TextFileMessageStorage.getNetworkFolderName(network),
TextFileMessageStorage.getChannelFileName(channel)
);
fs.truncate(logPath, 0, (e) => {
if (e) {
log.error("Failed to truncate user log", e);
}
});*/
async deleteChannel() {
// Not implemented for text log files
}

getMessages() {
Expand Down
13 changes: 6 additions & 7 deletions server/plugins/messageStorage/types.d.ts
Expand Up @@ -9,13 +9,13 @@ interface MessageStorage {
client: Client;
isEnabled: boolean;

enable(): void;
enable(): Promise<void>;

close(callback?: () => void): void;
close(): Promise<void>;

index(network: Network, channel: Channel, msg: Message): void;
index(network: Network, channel: Channel, msg: Message): Promise<void>;

deleteChannel(network: Network, channel: Channel);
deleteChannel(network: Network, channel: Channel): Promise<void>;

getMessages(network: Network, channel: Channel): Promise<Message[]>;

Expand All @@ -30,12 +30,11 @@ export type SearchQuery = {
};

export type SearchResponse =
| (Omit<SearchQuery, "channelName" | "offset"> & {
| Omit<SearchQuery, "channelName" | "offset"> & {
results: Message[];
target: string;
offset: number;
})
| [];
};

type SearchFunction = (query: SearchQuery) => Promise<SearchResponse>;

Expand Down

0 comments on commit d62dd3e

Please sign in to comment.