Skip to content

Commit

Permalink
reactions: Select Message FOR UPDATE when adding/removing.
Browse files Browse the repository at this point in the history
This locks the message row while a reaction is being added/removed,
which will handle race conditions caused by deleting the message
at the same time.

We make sure that events work happens outside the transaction,
so that in case there's some problem with the queue processor, the
locks aren't held for too long.

As a nice side-effect, we also handle race conditions from double
adding reactions, because once the message is locked, a duplicate
request will wait till the earlier transaction commits, and hence
will not throw `IntegrityErrors`s (rather, will be handled in our
safety check in the /views code itself), which earlier had to be
handled explicitly.
  • Loading branch information
abhijeetbodas2001 authored and timabbott committed Jun 4, 2021
1 parent efc2f49 commit 90b6fa7
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 11 deletions.
23 changes: 14 additions & 9 deletions zerver/lib/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2215,7 +2215,7 @@ def notify_reaction_update(
stream = Stream.objects.get(id=stream_id)
user_ids |= subscriber_ids_with_stream_history_access(stream)

send_event(user_profile.realm, event, list(user_ids))
transaction.on_commit(lambda: send_event(user_profile.realm, event, list(user_ids)))


def do_add_reaction(
Expand All @@ -2225,20 +2225,20 @@ def do_add_reaction(
emoji_code: str,
reaction_type: str,
) -> None:
"""Should be called while holding a SELECT FOR UPDATE lock
(e.g. via access_message(..., lock_message=True)) on the
Message row, to prevent race conditions.
"""

reaction = Reaction(
user_profile=user_profile,
message=message,
emoji_name=emoji_name,
emoji_code=emoji_code,
reaction_type=reaction_type,
)
try:
reaction.save()
except django.db.utils.IntegrityError: # nocoverage
# This can happen when a race results in the check in views
# code not catching an attempt to double-add a reaction, or
# perhaps if the emoji_name/emoji_code mapping is busted.
raise JsonableError(_("Reaction already exists."))

reaction.save()

notify_reaction_update(user_profile, message, reaction, "add")

Expand All @@ -2250,7 +2250,7 @@ def check_add_reaction(
emoji_code: Optional[str],
reaction_type: Optional[str],
) -> None:
message, user_message = access_message(user_profile, message_id)
message, user_message = access_message(user_profile, message_id, lock_message=True)

if emoji_code is None:
# The emoji_code argument is only required for rare corner
Expand Down Expand Up @@ -2316,13 +2316,18 @@ def check_add_reaction(
def do_remove_reaction(
user_profile: UserProfile, message: Message, emoji_code: str, reaction_type: str
) -> None:
"""Should be called while holding a SELECT FOR UPDATE lock
(e.g. via access_message(..., lock_message=True)) on the
Message row, to prevent race conditions.
"""
reaction = Reaction.objects.filter(
user_profile=user_profile,
message=message,
emoji_code=emoji_code,
reaction_type=reaction_type,
).get()
reaction.delete()

notify_reaction_update(user_profile, message, reaction, "remove")


Expand Down
34 changes: 33 additions & 1 deletion zerver/tests/test_reactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import orjson
from django.http import HttpResponse

from zerver.lib.actions import do_change_stream_invite_only, do_make_stream_web_public
from zerver.lib.actions import (
do_change_stream_invite_only,
do_make_stream_web_public,
notify_reaction_update,
)
from zerver.lib.cache import cache_get, to_dict_cache_key_id
from zerver.lib.emoji import emoji_name_to_emoji_code
from zerver.lib.message import extract_message_dict
Expand Down Expand Up @@ -1023,6 +1027,10 @@ def test_add_event(self) -> None:
}
events: List[Mapping[str, Any]] = []
with self.tornado_redirected_to_list(events, expected_num_events=1):
with mock.patch("zerver.lib.actions.send_event") as m:
m.side_effect = AssertionError(
"Events should be sent only after the transaction commits!"
)
self.api_post(reaction_sender, f"/api/v1/messages/{pm_id}/reactions", reaction_info)

event = events[0]["event"]
Expand Down Expand Up @@ -1084,3 +1092,27 @@ def test_remove_event(self) -> None:
self.assertEqual(event["emoji_name"], reaction_info["emoji_name"])
self.assertEqual(event["emoji_code"], reaction_info["emoji_code"])
self.assertEqual(event["reaction_type"], reaction_info["reaction_type"])

def test_events_sent_after_transaction_commits(self) -> None:
"""
Tests that `send_event` is hooked to `transaction.on_commit`. This is important, because
we don't want to end up holding locks on message rows for too long if the event queue runs
into a problem.
"""
hamlet = self.example_user("hamlet")
self.send_stream_message(hamlet, "Scotland")
message = self.get_last_message()
reaction = Reaction(
user_profile=hamlet,
message=message,
emoji_name="whatever",
emoji_code="whatever",
reaction_type="whatever",
)

with self.tornado_redirected_to_list([], expected_num_events=1):
with mock.patch("zerver.lib.actions.send_event") as m:
m.side_effect = AssertionError(
"Events should be sent only after the transaction commits."
)
notify_reaction_update(hamlet, message, reaction, "stuff")
7 changes: 6 additions & 1 deletion zerver/views/reactions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Optional

from django.db import transaction
from django.http import HttpRequest, HttpResponse
from django.utils.translation import gettext as _

Expand All @@ -12,6 +13,8 @@
from zerver.models import Reaction, UserProfile


# transaction.atomic is required since we use FOR UPDATE queries in access_message
@transaction.atomic
@has_request_variables
def add_reaction(
request: HttpRequest,
Expand All @@ -26,6 +29,8 @@ def add_reaction(
return json_success()


# transaction.atomic is required since we use FOR UPDATE queries in access_message
@transaction.atomic
@has_request_variables
def remove_reaction(
request: HttpRequest,
Expand All @@ -35,7 +40,7 @@ def remove_reaction(
emoji_code: Optional[str] = REQ(default=None),
reaction_type: str = REQ(default="unicode_emoji"),
) -> HttpResponse:
message, user_message = access_message(user_profile, message_id)
message, user_message = access_message(user_profile, message_id, lock_message=True)

if emoji_code is None:
if emoji_name is None:
Expand Down

0 comments on commit 90b6fa7

Please sign in to comment.