Skip to content
This repository was archived by the owner on Oct 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion athena/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// @flow
const debug = require('debug')('athena');
import createWorker from '../shared/bull/create-worker';
// Our job-processing worker server
import processMessageNotification from './queues/message-notification';
import processMessageNotification from './queues/new-message-in-thread';
import processMentionNotification from './queues/mention-notification';
import processDirectMessageNotification from './queues/direct-message-notification';
import processReactionNotification from './queues/reaction-notification';
Expand Down Expand Up @@ -57,6 +58,7 @@ const server = createWorker({

console.log(
`🗄 Queues open for business ${(process.env.NODE_ENV === 'production' &&
// $FlowIssue
`at ${process.env.COMPOSE_REDIS_URL}:${process.env.COMPOSE_REDIS_PORT}`) ||
'locally'}`
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// @flow
const debug = require('debug')('athena:send-message-notification-email');
import addQueue from '../utils/addQueue';
import { SEND_NEW_MESSAGE_EMAIL } from './constants';
import { getNotifications } from '../models/notification';
import groupReplies from '../utils/group-replies';
import getEmailStatus from '../utils/get-email-status';
const debug = require('debug')('athena::send-message-notification-email');
import addQueue from '../../utils/addQueue';
import { SEND_NEW_MESSAGE_EMAIL } from '../constants';
import { getNotifications } from '../../models/notification';
import groupReplies from './group-replies';
import getEmailStatus from '../../utils/get-email-status';

const IS_PROD = process.env.NODE_ENV === 'production';
// Change buffer in dev to 10 seconds vs 3 minutes in prod
Expand All @@ -13,69 +13,95 @@ const BUFFER = IS_PROD ? 180000 : 10000;
const MAX_WAIT = 600000;

// Called when the buffer time is over to actually send an email
const timedOut = recipient => {
const timedOut = async recipient => {
const threadsInScope = timeouts[recipient.email].threads;

// array of DBNotifications
const notificationsInScope = timeouts[recipient.email].notifications;

// Clear timeout buffer for this recipient
delete timeouts[recipient.email];
debug(
`send notification email for ${threadsInScope.length} threads to @${recipient.username} (${recipient.email})`
);

// Make sure we should be sending an email to this user
return getEmailStatus(recipient.userId, 'newMessageInThreads')
.then(shouldGetEmail => {
if (!shouldGetEmail) {
debug(`@${recipient.username} should not get email, aborting`);
return;
}
debug(`@${recipient.username} should get email, getting notifications`);
return getNotifications(
notificationsInScope.map(notification => notification.id)
);
})
.then(notifications => {
if (!notifications) return;
debug('notifications loaded, finding unseen threads');
const unseenThreadIds = notifications
.filter(notification => !notification.isSeen && !notification.isRead)
.map(notification => notification.context.id);
if (unseenThreadIds.length === 0) {
debug('aborting, no unseen threads');
return;
const shouldGetEmail = await getEmailStatus(
recipient.userId,
'newMessageInThreads'
);
if (!shouldGetEmail) {
debug(`@${recipient.username} should not get email, aborting`);
return;
}

debug(`@${recipient.username} should get email, getting notifications`);
const notifications = await getNotifications(
notificationsInScope.map(notification => notification.id)
);
if (!notifications || notifications.length === 0) {
debug('No notifications in scope');
return;
}

debug('notifications loaded, finding unseen threads');
const unseenThreadIds = notifications
.filter(notification => !notification.isSeen && !notification.isRead)
.map(notification => notification.context.id);

if (unseenThreadIds.length === 0) {
debug('aborting, no unseen threads');
return;
}
debug('filter unseen threads, merge replies');

// Convert threads to object, merge replies to same thread
const threads = threadsInScope
.filter(thread => unseenThreadIds.includes(thread.id))
.reduce((map, thread) => {
if (!map[thread.id]) {
map[thread.id] = thread;
return map;
}
debug('filter unseen threads, merge replies');
// Convert threads to object, merge replies to same thread
const threads = threadsInScope
.filter(thread => unseenThreadIds.includes(thread.id))
.reduce((map, thread) => {
if (!map[thread.id]) {
map[thread.id] = thread;
return map;
}
map[thread.id] = {
...map[thread.id],
replies: map[thread.id].replies.concat(thread.replies),
};
return map;
}, {});
debug('group replies');
// Group replies by sender, turn it back into an array
const threadsWithGroupedReplies = Object.keys(threads).map(threadId => ({
...threads[threadId],
replies: groupReplies(threads[threadId].replies),
}));
debug(`adding email for @${recipient.username} to queue`);
return addQueue(SEND_NEW_MESSAGE_EMAIL, {
to: recipient.email,
user: {
displayName: recipient.name,
username: recipient.username,
userId: recipient.userId,
},
threads: threadsWithGroupedReplies,
});
});
map[thread.id] = {
...map[thread.id],
replies: map[thread.id].replies.concat(thread.replies),
};
return map;
}, {});

debug('group replies');
// Group replies by sender, turn it back into an array
const threadKeys = Object.keys(threads);

const threadsWithGroupedRepliesPromises = threadKeys.map(async threadId => ({
...threads[threadId],
replies: await groupReplies(threads[threadId].replies),
repliesCount: threads[threadId].replies.length,
}));

const threadsWithGroupedReplies = await Promise.all([
...threadsWithGroupedRepliesPromises,
]).catch(err => console.log('error grouping threads and replies', err));

const filteredThreadsWithGroupedReplies =
threadsWithGroupedReplies &&
threadsWithGroupedReplies.length > 0 &&
threadsWithGroupedReplies.filter(thread => thread.replies.length > 0);

// this would happen if someone sends a message in a thread then deletes that message
if (
filteredThreadsWithGroupedReplies &&
filteredThreadsWithGroupedReplies.length === 0
) {
debug('no threads with at least one reply');
return;
}

debug(`adding email for @${recipient.username} to queue`);
return addQueue(SEND_NEW_MESSAGE_EMAIL, {
recipient,
threads: filteredThreadsWithGroupedReplies,
});
};

type Timeouts = {
Expand Down Expand Up @@ -122,11 +148,11 @@ const bufferMessageNotificationEmail = (
threads: [thread],
notifications: [notification],
};

// If we already have a timeout going
} else {
// If we already have a timeout going
debug(`timeout exists for ${recipient.email}, clearing`);
clearTimeout(timeouts[recipient.email].timeout);

debug(`adding new thread to ${recipient.email}'s threads`);
timeouts[recipient.email].threads.push(thread);
timeouts[recipient.email].notifications.push(notification);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@
const debug = require('debug')(
'athena:queue:format-and-buffer-notification-email'
);
import { getCommunityById } from '../../models/community';
import { getChannelById } from '../../models/channel';
import { toPlainText, toState } from 'shared/draft-utils';
import bufferNotificationEmail from '../queues/buffer-message-notification-email';
import bufferNotificationEmail from './buffer-email';

type UserType = {
id: string,
profilePhoto: string,
name: string,
username: string,
};
type MessageType = {
id: string,
content: {
body: string,
},
Expand All @@ -22,9 +26,13 @@ type RecipientType = {
name: string,
};
type NotificationType = {};
type ThreadType = {};
type ThreadType = {
id: string,
channelId: string,
communityId: string,
};

export const formatAndBufferNotificationEmail = (
export default async (
recipient: RecipientType,
thread: ThreadType,
user: UserType,
Expand All @@ -43,16 +51,22 @@ export const formatAndBufferNotificationEmail = (
return Promise.resolve();
}

const { communityId, channelId, ...restOfThread } = thread;

return bufferNotificationEmail(
recipient,
{
...thread,
...restOfThread,
community: await getCommunityById(communityId),
channel: await getChannelById(channelId),
replies: [
{
id: message.id,
sender: {
id: user.id,
profilePhoto: user.profilePhoto,
name: user.name,
username: user.username,
},
content: {
body:
Expand All @@ -66,5 +80,3 @@ export const formatAndBufferNotificationEmail = (
notification
);
};

export default formatAndBufferNotificationEmail;
51 changes: 51 additions & 0 deletions athena/queues/new-message-in-thread/group-replies.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { getMessageById } from '../../models/message';

export default async replies => {
let newReplies = [];

const replyPromises = replies.map(
async reply => await getMessageById(reply.id)
);

// get all the messages for this thread
const messageRecords = await Promise.all(replyPromises).catch(err =>
console.log('error getting reply promises', err)
);

// filter deleted ones and sort them by recency
const filteredMessageRecords = messageRecords
.filter(message => !message.deletedAt)
.sort((a, b) => a.timestamp > b.timestamp);

const filteredPromises = filteredMessageRecords.map((message, index) => {
const reply = replies.filter(r => r.id === message.id)[0];
const body =
message.messageType === 'media'
? `<p class='reply-img-container'><img class='reply-img' src='${reply
.content.body}?w=600&dpr=2' /></p>`
: `<p class='reply'>${reply.content.body}</p>`;

const newGroup = {
...reply,
content: {
body,
},
};

if (index === 0) return newReplies.push(newGroup);
if (
newReplies[newReplies.length - 1] &&
newReplies[newReplies.length - 1].sender.id === message.senderId
) {
newReplies[newReplies.length - 1].content.body += body;
return;
} else {
newReplies.push(newGroup);
return;
}
});

return await Promise.all([filteredPromises])
.then(() => newReplies)
.catch(err => console.log('error getting filteredPromises', err));
};
Loading