Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

events: Add basic downgradeable event queue support. #12926

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions zerver/lib/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,8 @@ def aggregate_unread_data(raw_data: RawUnreadMessagesResult) -> UnreadMessagesRe
def apply_unread_message_event(user_profile: UserProfile,
state: RawUnreadMessagesResult,
message: Dict[str, Any],
flags: List[str]) -> None:
flags: List[str],
skip_database: bool=False) -> None:
message_id = message['id']
if message['type'] == 'stream':
message_type = 'stream'
Expand Down Expand Up @@ -925,7 +926,7 @@ def apply_unread_message_event(user_profile: UserProfile,
)
state['stream_dict'][message_id] = new_row

if stream_id not in state['muted_stream_ids']:
if not skip_database and stream_id not in state['muted_stream_ids']:
# This next check hits the database.
if not topic_is_muted(user_profile, stream_id, topic):
state['unmuted_stream_msgs'].add(message_id)
Expand Down
146 changes: 142 additions & 4 deletions zerver/tests/test_event_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import ujson

from django.http import HttpRequest, HttpResponse
from typing import Any, Callable, Dict, Tuple
from typing import Any, Callable, Dict, Optional, Tuple

from zerver.lib.actions import do_mute_topic, do_change_subscription_property
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import POSTRequestMock
from zerver.models import Recipient, Stream, Subscription, UserProfile, get_stream
from zerver.models import Recipient, Stream, Subscription, UserProfile, get_stream, \
Realm
from zerver.tornado.event_queue import maybe_enqueue_notifications, \
allocate_client_descriptor, ClientDescriptor, \
get_client_descriptor, missedmessage_hook, persistent_queue_filename
Expand Down Expand Up @@ -343,7 +344,10 @@ def test_persistent_queue_filename(self) -> None:
"/home/zulip/tornado/event_queues.9993.last.json")

class EventQueueTest(ZulipTestCase):
def get_client_descriptor(self) -> ClientDescriptor:
def get_client_descriptor(self, last_connection_time: Optional[float]=None,
downgradeable: bool=False) -> ClientDescriptor:
if last_connection_time is None:
last_connection_time = time.time()
hamlet = self.example_user('hamlet')
realm = hamlet.realm
queue_data = dict(
Expand All @@ -352,7 +356,8 @@ def get_client_descriptor(self) -> ClientDescriptor:
client_gravatar=True,
client_type_name='website',
event_types=None,
last_connection_time=time.time(),
last_connection_time=last_connection_time,
downgradeable=downgradeable,
queue_timeout=0,
realm_id=realm.id,
user_profile_id=hamlet.id,
Expand Down Expand Up @@ -551,3 +556,136 @@ def test_collapse_event(self) -> None:

queue.prune(1)
self.verify_to_dict_end_to_end(client)

def test_garbage_collect(self) -> None:
from zerver.tornado.event_queue import clients, gc_event_queues, get_tornado_port
# Last connection an hour ago
expired_client = self.get_client_descriptor(last_connection_time = time.time() - 3600)
# Last connection 5 minutes ago
stale_client = self.get_client_descriptor(last_connection_time = time.time() - 300)
# Last connection 5 minutes ago
new_client = self.get_client_descriptor(last_connection_time = time.time())
downgradeable_client = self.get_client_descriptor(last_connection_time = time.time() - 3600,
downgradeable=True)

expired_ids = [expired_client.event_queue.id]
not_expired_ids = [
stale_client.event_queue.id,
new_client.event_queue.id,
downgradeable_client.event_queue.id]
port = get_tornado_port(Realm.objects.get(id=new_client.realm_id))

self.assertEqual(sorted(list(clients.keys())), sorted(not_expired_ids + expired_ids))
self.assertFalse(downgradeable_client.event_queue.downgraded)
self.assertEqual(len(stale_client.event_queue.queue), 0)
self.assertEqual(len(downgradeable_client.event_queue.queue), 0)
self.assertEqual(len(new_client.event_queue.queue), 0)
self.assertEqual(len(expired_client.event_queue.queue), 0)

# Send a message to a stream we're on
first_message_id = self.send_stream_message(self.example_email('othello'),
"Denmark", content="Test message")
self.assertEqual(len(stale_client.event_queue.queue), 1)
self.assertEqual(len(downgradeable_client.event_queue.queue), 1)
self.assertEqual(len(new_client.event_queue.queue), 1)
self.assertEqual(len(expired_client.event_queue.queue), 1)
stale_client_contents = stale_client.event_queue.contents()

# Should downgrade downgradeable_client, and expire expired_client
gc_event_queues(port)
self.assertEqual(sorted(list(clients.keys())), sorted(not_expired_ids))
self.assertTrue(downgradeable_client.event_queue.downgraded)

self.assertEqual(len(stale_client.event_queue.queue), 1)
self.assertEqual(len(downgradeable_client.event_queue.queue), 1)
self.assertEqual(len(new_client.event_queue.queue), 1)
self.assertEqual(stale_client_contents, stale_client.event_queue.contents())

# Send another message; it should not be added to the downgraded queue.
message_id = self.send_stream_message(self.example_email('othello'),
"Denmark", content="Test message")
to_read_message_id = self.send_stream_message(self.example_email('othello'),
"Denmark", content="Test message")
delete_message_id = self.send_stream_message(self.example_email('othello'),
"Denmark", content="Test message")
# Nothing got added to the queue; instead added to unread_data structure.
self.assertEqual(len(downgradeable_client.event_queue.queue), 1)
self.assertEqual(
sorted(list(downgradeable_client.event_queue.unread_data['stream_dict'].keys())),
sorted([message_id, to_read_message_id, delete_message_id]),
)

# Delete a message, make sure its updated properly
self.login(self.example_email("iago"))
result = self.client_delete('/json/messages/{msg_id}'.format(msg_id=delete_message_id))
self.assert_json_success(result)
self.assertEqual(
sorted(list(downgradeable_client.event_queue.unread_data['stream_dict'].keys())),
sorted([to_read_message_id, message_id]),
)

# Read a message, make sure its updated properly
self.login(self.example_email("hamlet"))
result = self.client_post("/json/messages/flags",
{"messages": ujson.dumps([to_read_message_id]),
"op": "add",
"flag": "read"})
self.assert_json_success(result)
self.assertEqual(
sorted(list(downgradeable_client.event_queue.unread_data['stream_dict'].keys())),
sorted([message_id]),
)

# Do the same test for PMs
to_read_pm_id = self.send_personal_message(self.example_email('othello'),
self.example_email("hamlet"),
content="Test message")
to_keep_pm_id = self.send_personal_message(self.example_email('othello'),
self.example_email("hamlet"),
content="Test message")
to_delete_pm_id = self.send_personal_message(self.example_email('iago'),
self.example_email("hamlet"),
content="Test message")
self.assertEqual(
sorted(list(downgradeable_client.event_queue.unread_data['pm_dict'].keys())),
sorted([to_read_pm_id, to_keep_pm_id, to_delete_pm_id]),
)

self.login(self.example_email("iago"))
result = self.client_delete('/json/messages/{msg_id}'.format(msg_id=to_delete_pm_id))
self.assert_json_success(result)
self.assertEqual(
sorted(list(downgradeable_client.event_queue.unread_data['pm_dict'].keys())),
sorted([to_read_pm_id, to_keep_pm_id]),
)

self.login(self.example_email("hamlet"))
result = self.client_post("/json/messages/flags",
{"messages": ujson.dumps([to_read_pm_id]),
"op": "add",
"flag": "read"})
self.assert_json_success(result)
self.assertEqual(
sorted(list(downgradeable_client.event_queue.unread_data['pm_dict'].keys())),
sorted([to_keep_pm_id]),
)

# TODO: Extensively extend the downgradeable test with message
# editing

# TODO test that very first message being marked as read (currently broken)

contents = downgradeable_client.event_queue.contents()
self.assertFalse(downgradeable_client.event_queue.downgraded)
self.assertEqual(len(contents), 4)
self.assertEqual(contents[0]['type'], 'message')
self.assertEqual(contents[0]['message']['id'], first_message_id)
self.assertEqual(contents[1]['type'], 'delete_message')
self.assertEqual(contents[2]['type'], 'delete_message')
self.assertEqual(contents[3]['type'], 'unread_data')
# Check that our PMs ended up in the final structure properly.
self.assertEqual(len(contents[3]['pms']), 1)
self.assertEqual(contents[3]['pms'][0]['unread_message_ids'], [to_keep_pm_id])
# We don't test every other element, because we're reusing
# code from the original unread_msgs data structure that has
# its own unit tests.