Skip to content

Commit

Permalink
missed-emails-sending: Move email sending to separate queue.
Browse files Browse the repository at this point in the history
- Add new 'missedmessage_email_senders' queue for sending missed messages emails.
- Add the new worker to process 'missedmessage_email_senders' queue.
- Split aggregation missed messages and sending missed messages email
  to separate queue workers.
- Adapt tests for sending missed emails to the new logic.

Fixes #2607
  • Loading branch information
kkanahin committed Feb 13, 2017
1 parent 96f044c commit 9495622
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 18 deletions.
30 changes: 20 additions & 10 deletions zerver/lib/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from django.template import loader
from django.utils import timezone
from zerver.decorator import statsd_increment, uses_mandrill
from zerver.lib.queue import queue_json_publish
from zerver.models import (
Recipient,
ScheduledJob,
Expand Down Expand Up @@ -298,10 +299,10 @@ def handle_missedmessage_emails(user_profile_id, missed_email_events):
if not receives_offline_notifications(user_profile):
return

messages = [um.message for um in UserMessage.objects.filter(user_profile=user_profile,
message__id__in=message_ids,
flags=~UserMessage.flags.read)]
if not messages:
messages = Message.objects.filter(usermessage__user_profile_id=user_profile,
id__in=message_ids,
usermessage__flags=~UserMessage.flags.read)
if not messages.exists():
return

messages_by_recipient_subject = defaultdict(list) # type: Dict[Tuple[int, Text], List[Message]]
Expand All @@ -320,12 +321,21 @@ def handle_missedmessage_emails(user_profile_id, missed_email_events):

# Send an email per recipient subject pair
for recipient_subject, msg_list in messages_by_recipient_subject.items():
unique_messages = {m.id: m for m in msg_list}
do_send_missedmessage_events_reply_in_zulip(
user_profile,
list(unique_messages.values()),
message_count_by_recipient_subject[recipient_subject],
)
unique_messages_ids = {m.id for m in msg_list}
messages_to_send = {
'user_profile_id': user_profile_id,
'unique_messages_ids': unique_messages_ids,
'message_count': message_count_by_recipient_subject[recipient_subject]
}
queue_json_publish("missedmessage_email_senders", messages_to_send,
lambda messages_to_send: None)

def send_missedmessage_emails(data):
# type: (Mapping[str, Any]) -> None
user_profile = get_user_profile_by_id(data['user_profile_id'])
missed_messages = Message.objects.filter(id__in=data['unique_messages_ids'])
message_count = data['message_count']
do_send_missedmessage_events_reply_in_zulip(user_profile, list(missed_messages), message_count)

@uses_mandrill
def clear_followup_emails_queue(email, mail_client=None):
Expand Down
2 changes: 1 addition & 1 deletion zerver/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,7 @@ def pre_save_message(sender, **kwargs):
message.update_calculated_fields()

def get_context_for_message(message):
# type: (Message) -> Sequence[Message]
# type: (Message) -> QuerySet[Message]
# TODO: Change return type to QuerySet[Message]
return Message.objects.filter(
recipient_id=message.recipient_id,
Expand Down
36 changes: 30 additions & 6 deletions zerver/tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
from __future__ import absolute_import
from __future__ import print_function

from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Tuple, TypeVar, Text
from typing import (Any, Callable, Dict, Iterable, List, Mapping,
Optional, Tuple, TypeVar, Text, Union)
from mock import patch, MagicMock

from django.http import HttpResponse
Expand All @@ -23,7 +24,7 @@
from zerver.models import UserProfile, Recipient, \
Realm, RealmAlias, UserActivity, \
get_user_profile_by_email, get_realm, get_client, get_stream, \
Message, get_unique_open_realm, completely_open
Message, get_unique_open_realm, completely_open, get_context_for_message

from zerver.lib.avatar import get_avatar_url
from zerver.lib.initial_password import initial_password
Expand All @@ -33,7 +34,8 @@
do_change_is_admin, extract_recipients, \
do_set_realm_name, do_deactivate_realm, \
do_change_stream_invite_only
from zerver.lib.notifications import handle_missedmessage_emails
from zerver.lib.notifications import handle_missedmessage_emails, \
send_missedmessage_emails
from zerver.lib.session_user import get_session_dict_user
from zerver.middleware import is_slow_query
from zerver.lib.avatar import avatar_url
Expand Down Expand Up @@ -2220,12 +2222,34 @@ def _get_tokens(self):
# type: () -> List[str]
return [str(random.getrandbits(32)) for _ in range(30)]

def _test_cases(self, tokens, msg_id, body, send_as_user):
# type: (List[str], int, str, bool) -> None
def _get_msgs_ids_with_context(self, msg_id):
# type: (int) -> List[int]
msgs_ids = [msg_id]
message = Message.objects.get(id=msg_id)
context_msgs_ids = get_context_for_message(message).values_list('id', flat=True)
msgs_ids.extend(context_msgs_ids)
return msgs_ids

@patch('zerver.lib.notifications.queue_json_publish')
def _test_cases(self, tokens, msg_id, body, send_as_user, mock_queue_json_publish):
# type: (List[str], int, str, bool, MagicMock) -> None
othello = get_user_profile_by_email('othello@zulip.com')
hamlet = get_user_profile_by_email('hamlet@zulip.com')
handle_missedmessage_emails(hamlet.id, [{'message_id': msg_id}])
msgs_ids = self._get_msgs_ids_with_context(msg_id)
messages_to_send_data = {
'message_count': 1,
'user_profile_id': hamlet.id,
'unique_messages_ids': set(msgs_ids)
}

def check_queue_json_publish(queue_name, data, processor):
# type: (str, Union[Mapping[str, Any], str], Callable[[Any], None]) -> None
self.assertEqual(queue_name, "missedmessage_email_senders")
self.assertEqual(data, messages_to_send_data)

mock_queue_json_publish.side_effect = check_queue_json_publish
handle_missedmessage_emails(hamlet.id, [{'message_id': msg_id}])
send_missedmessage_emails(messages_to_send_data)
msg = mail.outbox[0]
reply_to_addresses = [settings.EMAIL_GATEWAY_PATTERN % (u'mm' + t) for t in tokens]
sender = 'Zulip <{}>'.format(settings.NOREPLY_EMAIL_ADDRESS)
Expand Down
9 changes: 8 additions & 1 deletion zerver/worker/queue_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
from zerver.lib.queue import SimpleQueueClient, queue_json_publish
from zerver.lib.timestamp import timestamp_to_datetime
from zerver.lib.notifications import handle_missedmessage_emails, enqueue_welcome_emails, \
clear_followup_emails_queue, send_local_email_template_with_delay
clear_followup_emails_queue, send_local_email_template_with_delay, \
send_missedmessage_emails
from zerver.lib.actions import do_send_confirmation_email, \
do_update_user_activity, do_update_user_activity_interval, do_update_user_presence, \
internal_send_message, check_send_message, extract_recipients, \
Expand Down Expand Up @@ -208,6 +209,12 @@ def start(self):
# of messages
time.sleep(2 * 60)

@assign_queue('missedmessage_email_senders')
class MissedMessageSendingWorker(QueueProcessingWorker):
def consume(self, data):
# type: (Mapping[str, Any]) -> None
send_missedmessage_emails(data)

@assign_queue('missedmessage_mobile_notifications')
class PushNotificationsWorker(QueueProcessingWorker):
def consume(self, data):
Expand Down

0 comments on commit 9495622

Please sign in to comment.