Skip to content

Commit

Permalink
Refactor send story job
Browse files Browse the repository at this point in the history
Co-authored-by: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com>
  • Loading branch information
automated-signal and indutny-signal committed Nov 29, 2022
1 parent 20156a2 commit 6ad8b21
Showing 1 changed file with 60 additions and 66 deletions.
126 changes: 60 additions & 66 deletions ts/jobs/helpers/sendStory.ts
Expand Up @@ -27,7 +27,7 @@ import type { UUIDStringType } from '../../types/UUID';
import * as Errors from '../../types/errors';
import dataInterface from '../../sql/Client';
import { SignalService as Proto } from '../../protobuf';
import { getMessageById } from '../../messages/getMessageById';
import { getMessagesById } from '../../messages/getMessagesById';
import {
getSendOptions,
getSendOptionsForRecipients,
Expand Down Expand Up @@ -70,36 +70,68 @@ export async function sendStory(
return;
}

// We want to generate the StoryMessage proto once at the top level so we
// can reuse it but first we'll need textAttachment | fileAttachment.
// This function pulls off the attachment and generates the proto from the
// first message on the list prior to continuing.
const originalStoryMessage = await (async (): Promise<
Proto.StoryMessage | undefined
> => {
const [messageId] = messageIds;
const message = await getMessageById(messageId);
if (!message) {
log.info(
`stories.sendStory(${messageId}): message was not found, maybe because it was deleted. Giving up on sending it`
);
return;
}
const notFound = new Set(messageIds);
const messages = (await getMessagesById(messageIds)).filter(message => {
notFound.delete(message.id);

const distributionId = message.get('storyDistributionListId');
const logId = `stories.sendStory(${timestamp}/${distributionId})`;

const messageConversation = message.getConversation();
if (messageConversation !== conversation) {
log.error(
`stories.sendStory(${timestamp}): Message conversation '${messageConversation?.idForLogging()}' does not match job conversation ${conversation.idForLogging()}`
`${logId}: Message conversation ` +
`'${messageConversation?.idForLogging()}' does not match job ` +
`conversation ${conversation.idForLogging()}`
);
return false;
}

if (message.get('timestamp') !== timestamp) {
log.error(
`${logId}: Message timestamp ${message.get(
'timestamp'
)} does not match job timestamp`
);
return false;
}

if (message.isErased() || message.get('deletedForEveryone')) {
log.info(`${logId}: message was erased. Giving up on sending it`);
return false;
}

return true;
});

for (const messageId of notFound) {
log.info(
`stories.sendStory(${messageId}): message was not found, ` +
'maybe because it was deleted. Giving up on sending it'
);
}

// We want to generate the StoryMessage proto once at the top level so we
// can reuse it but first we'll need textAttachment | fileAttachment.
// This function pulls off the attachment and generates the proto from the
// first message on the list prior to continuing.
let originalStoryMessage: Proto.StoryMessage;
{
const [originalMessageId] = messageIds;
const originalMessage = messages.find(
message => message.id === originalMessageId
);
if (!originalMessage) {
return;
}

const attachments = message.get('attachments') || [];
const attachments = originalMessage.get('attachments') || [];
const [attachment] = attachments;

if (!attachment) {
log.info(
`stories.sendStory(${timestamp}): message does not have any attachments to send. Giving up on sending it`
`stories.sendStory(${timestamp}): original story message does not ` +
'have any attachments to send. Giving up on sending it'
);
return;
}
Expand All @@ -122,17 +154,13 @@ export async function sendStory(
// Some distribution lists need allowsReplies false, some need it set to true
// we create this proto (for the sync message) and also to re-use some of the
// attributes inside it.
return messaging.getStoryMessage({
originalStoryMessage = await messaging.getStoryMessage({
allowsReplies: true,
fileAttachment,
groupV2,
textAttachment,
profileKey,
});
})();

if (!originalStoryMessage) {
return;
}

const canReplyUuids = new Set<string>();
Expand Down Expand Up @@ -171,43 +199,13 @@ export async function sendStory(
// complete, and so we can send a sync message afterwards if we sent the story
// successfully to at least one recipient.
const sendResults = await Promise.allSettled(
messageIds.map(async (messageId: string): Promise<void> => {
const message = await getMessageById(messageId);
if (!message) {
log.info(
`stories.sendStory(${messageId}): message was not found, maybe because it was deleted. Giving up on sending it`
);
return;
}

messages.map(async (message: MessageModel): Promise<void> => {
const distributionId = message.get('storyDistributionListId');
const logId = `stories.sendStory(${timestamp}/${distributionId})`;

if (message.get('timestamp') !== timestamp) {
log.error(
`${logId}: Message timestamp ${message.get(
'timestamp'
)} does not match job timestamp`
);
return;
}

const messageConversation = message.getConversation();
if (messageConversation !== conversation) {
log.error(
`${logId}: Message conversation '${messageConversation?.idForLogging()}' does not match job conversation ${conversation.idForLogging()}`
);
return;
}

if (message.isErased() || message.get('deletedForEveryone')) {
log.info(`${logId}: message was erased. Giving up on sending it`);
return;
}

const listId = message.get('storyDistributionListId');
const receiverId = isGroupV2(messageConversation.attributes)
? messageConversation.id
const receiverId = isGroupV2(conversation.attributes)
? conversation.id
: listId;

if (!receiverId) {
Expand All @@ -217,7 +215,7 @@ export async function sendStory(
return;
}

const distributionList = isGroupV2(messageConversation.attributes)
const distributionList = isGroupV2(conversation.attributes)
? undefined
: await dataInterface.getStoryDistributionWithMembers(receiverId);

Expand Down Expand Up @@ -302,7 +300,7 @@ export async function sendStory(
storyMessage.textAttachment = originalStoryMessage.textAttachment;
storyMessage.group = originalStoryMessage.group;
storyMessage.allowsReplies =
isGroupV2(messageConversation.attributes) ||
isGroupV2(conversation.attributes) ||
Boolean(distributionList?.allowsReplies);

let inMemorySenderKeyInfo = distributionList?.senderKeyInfo;
Expand Down Expand Up @@ -347,11 +345,12 @@ export async function sendStory(
});

// Don't send normal sync messages; a story sync is sent at the end of the process
// eslint-disable-next-line no-param-reassign
message.doNotSendSyncMessage = true;

const messageSendPromise = message.send(
handleMessageSend(innerPromise, {
messageIds: [messageId],
messageIds: [message.id],
sendType: 'story',
}),
saveErrors
Expand Down Expand Up @@ -456,12 +455,7 @@ export async function sendStory(
// messages but we still want to make sure that the sendStateByConversationId
// is kept in sync across all messages.
await Promise.all(
messageIds.map(async messageId => {
const message = await getMessageById(messageId);
if (!message) {
return;
}

messages.map(async message => {
const oldSendStateByConversationId =
message.get('sendStateByConversationId') || {};

Expand Down

0 comments on commit 6ad8b21

Please sign in to comment.