Skip to content

Commit

Permalink
updates to message auditing
Browse files Browse the repository at this point in the history
  • Loading branch information
andris9 committed Sep 29, 2019
1 parent 389d08e commit 83bab19
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 22 deletions.
12 changes: 11 additions & 1 deletion api.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const UserHandler = require('./lib/user-handler');
const MailboxHandler = require('./lib/mailbox-handler');
const MessageHandler = require('./lib/message-handler');
const StorageHandler = require('./lib/storage-handler');
const AuditHandler = require('./lib/audit-handler');
const ImapNotifier = require('./lib/imap-notifier');
const db = require('./lib/db');
const certs = require('./lib/certs');
Expand Down Expand Up @@ -41,6 +42,7 @@ let userHandler;
let mailboxHandler;
let messageHandler;
let storageHandler;
let auditHandler;
let notifier;
let loggelf;

Expand Down Expand Up @@ -464,6 +466,14 @@ module.exports = done => {
loggelf: message => loggelf(message)
});

auditHandler = new AuditHandler({
database: db.database,
users: db.users,
gridfs: db.gridfs,
bucket: 'audit',
loggelf: message => loggelf(message)
});

server.loggelf = message => loggelf(message);

usersRoutes(db, server, userHandler);
Expand All @@ -480,7 +490,7 @@ module.exports = done => {
authRoutes(db, server, userHandler);
autoreplyRoutes(db, server);
submitRoutes(db, server, messageHandler, userHandler);
auditRoutes(db, server);
auditRoutes(db, server, auditHandler);
domainaliasRoutes(db, server);
dkimRoutes(db, server);

Expand Down
12 changes: 2 additions & 10 deletions lib/api/audit.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const tools = require('../tools');
const roles = require('../roles');
const ObjectID = require('mongodb').ObjectID;

module.exports = (db, server) => {
module.exports = (db, server, auditHandler) => {
/**
* @api {post} /audit Create new audit
* @apiName PostAudit
Expand Down Expand Up @@ -88,19 +88,11 @@ module.exports = (db, server) => {
// permissions check
req.validate(roles.can(req.role).updateAny('audit'));

let audit = new ObjectID();
let user = new ObjectID(result.value.user);
let start = result.value.start;
let end = result.value.end;

let now = new Date();
await db.database.collection('tasks').insertOne({
task: 'audit',
locked: false,
lockedUntil: now,
created: now,
status: 'queued',
audit,
let audit = await auditHandler.create({
user,
start,
end
Expand Down
109 changes: 103 additions & 6 deletions lib/audit-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class AuditHandler {
this.options = options || {};

this.database = options.database;
this.users = options.user || options.database;
this.gridfs = options.gridfs || options.database;

this.loggelf = options.loggelf || (() => false);
Expand All @@ -20,6 +21,92 @@ class AuditHandler {
});
}

async create(options) {
options = options || {};

if (!options.user || !ObjectID.isValid(options.user)) {
let err = new Error('Missing user ID');
err.code = 'InputValidationError';
throw err;
}

let auditData = {
user: typeof options.user === 'string' ? new ObjectID(options.user) : options.user,
start: options.start, // Date or null
end: options.end, // Date or null
'import.status': 'queued'
};

let r = await this.database.collection('audits').insertOne(auditData);
if (!r.insertedId) {
let err = new Error();
err.code = 'InternalDatabaseError';
throw err;
}

auditData._id = r.insertedId;

try {
// NB! this user might not exist anymore, so do not check if any users were updated or not
await this.users.collection('users').updateOne(
{
_id: auditData.user
},
{
$addToSet: {
audit: auditData._id
}
}
);
} catch (err) {
// try to rollback
err.code = err.code = 'InternalDatabaseError';

try {
await this.database.collection('audits').deleteOne({ _id: auditData._id });
} catch (e) {
// ignore
}

throw err;
}

try {
let now = new Date();
await this.database.collection('tasks').insertOne({
task: 'audit',
locked: false,
lockedUntil: now,
created: now,
status: 'queued',
audit: auditData._id,
user: auditData.user,
start: auditData.start,
end: auditData.end
});
} catch (err) {
// try to rollback
err.code = err.code = 'InternalDatabaseError';

try {
await this.database.collection('audits').deleteOne({ _id: auditData._id });
} catch (e) {
// ignore
}

throw err;
}

return auditData._id;
}

/**
* Store message to audit GridFS
*
* @param {ObjectID} audit ID of the audit session
* @param {Mixed} message Either a Buffer, an Array of Buffers or a Stream
* @param {Object} metadata Metadata for the stored message
*/
async store(audit, message, metadata) {
if (!message) {
throw new Error('Missing message content');
Expand All @@ -36,10 +123,6 @@ class AuditHandler {
metadata.date = metadata.date || new Date();

return new Promise((resolve, reject) => {
if (!Buffer.isBuffer(message) && typeof message.pipe !== 'function') {
return reject(new Error('Invalid message content'));
}

let stream = this.gridstore.openUploadStreamWithId(id, null, {
contentType: 'message/rfc822',
metadata
Expand All @@ -48,8 +131,22 @@ class AuditHandler {
stream.once('finish', () => resolve(id));

if (Buffer.isBuffer(message)) {
// store as a buffer
return stream.end(message);
message = [message];
}

let writeChunks = async () => {
// write chunk by chunk
for (let chunk of message) {
if (stream.write(chunk) === false) {
await new Promise(resolve => {
stream.once('drain', resolve());
});
}
}
};

if (Array.isArray(message)) {
return writeChunks().catch(err => reject(err));
}

message.on('error', err => {
Expand Down
42 changes: 37 additions & 5 deletions lib/tasks/audit.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,26 @@ let run = async (taskData, options) => {
}

let processMessage = async messageData => {
console.log(messageData);

let builder = messageHandler.indexer.rebuild(messageData.mimeTree);
if (!builder || builder.type !== 'stream' || !builder.value) {
return false;
}

let auditMessage = await auditHandler.store(taskData.audit, builder.value, {});
let auditMessage = await auditHandler.store(taskData.audit, builder.value, {
date: messageData.idate,
msgid: messageData.msgid,
header: messageData.mimeTree && messageData.mimeTree.parsedHeader,
ha: messageData.ha,
info: messageData.meta
});

return auditMessage;
};

let copied = 0;
let failed = 0;
let status = 'imported'; //expect to complete successfully

let processMessages = async collection => {
let cursor = await db.users.collection(collection).find(query, {
projection: {
Expand All @@ -65,6 +74,7 @@ let run = async (taskData, options) => {
messageData._id,
auditMessage
);
copied++;
} catch (err) {
log.error(
'Tasks',
Expand All @@ -75,6 +85,7 @@ let run = async (taskData, options) => {
'Failed to process message',
err.message
);
failed++;
}
}
await cursor.close();
Expand All @@ -88,13 +99,34 @@ let run = async (taskData, options) => {
'Failed to fetch stored messages',
err.message
);

err.code = 'InternalDatabaseError';
throw err;
}
};

await processMessages('messages');
await processMessages('archive');
try {
await processMessages('messages');
} catch (err) {
status = 'import failed';
}

try {
await processMessages('archive');
} catch (err) {
status = 'import failed';
}

await db.database.collection('audits').updateOne(
{ _id: taskData.audit },
{
$set: {
'import.status': status,
'import.copied': copied,
'import.failed': failed
}
}
);

log.verbose('Tasks', 'task=audit id=%s user=%s message=%s', taskData._id, taskData.user, `Copied user messages for auditing`);
return true;
Expand Down
1 change: 1 addition & 0 deletions tasks.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ module.exports.start = callback => {

auditHandler = new AuditHandler({
database: db.database,
users: db.users,
gridfs: db.gridfs,
bucket: 'audit',
loggelf: message => loggelf(message)
Expand Down

0 comments on commit 83bab19

Please sign in to comment.