Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions nextcloudappstore/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@
ACCOUNT_RATE_LIMITS = {
"reset_password": "1/m/ip",
"login_failed": "10/h/ip",
# Shared by /account/ (custom AccountView) and allauth's /accounts/email/.
# Tighter than allauth's default (10/m/user) to mitigate confirmation-mail spam.
"manage_email": "3/h/user",
}

SITE_ID = 1
Expand Down
5 changes: 4 additions & 1 deletion nextcloudappstore/user/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
SPDX-License-Identifier: AGPL-3.0-or-later
"""

from allauth.account.adapter import get_adapter
from allauth.account.utils import (
filter_users_by_email,
)
Expand Down Expand Up @@ -96,7 +97,9 @@ def __init__(self, *args, **kwargs):
self.fields["subscribe_to_news"].initial = self.user.profile.subscribe_to_news

def clean_email(self):
value = self.cleaned_data["email"]
# Defer to the adapter so empty/invalid emails are rejected the same way
# allauth's AddEmailForm rejects them at /accounts/email/.
value = get_adapter().clean_email(self.cleaned_data["email"])
users = filter_users_by_email(value)
if [u for u in users if u.pk != self.instance.pk]:
msg = _("This email address is already associated with another account.")
Expand Down
184 changes: 181 additions & 3 deletions nextcloudappstore/user/tests.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,186 @@
"""
SPDX-FileCopyrightText: 2017 Nextcloud GmbH and Nextcloud contributors
SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors
SPDX-License-Identifier: AGPL-3.0-or-later
"""

from django.test import TestCase # noqa
from unittest.mock import patch

# Create your tests here.
from allauth.account.models import EmailAddress
from allauth.core import ratelimit
from django.contrib.auth import get_user_model
from django.core.cache import cache
from django.test import RequestFactory, TestCase, override_settings
from django.urls import reverse
from email_validator import validate_email as _real_validate_email

from nextcloudappstore.user.facades import create_user

PASSWORD = "Sup3rS3cret!"
PRIMARY_EMAIL = "user@example.com"


def _validate_email_no_dns(email, **kwargs):
# Run the real format/empty checks but skip the DNS-deliverability lookup
# so tests don't depend on external resolvers.
kwargs["check_deliverability"] = False
return _real_validate_email(email, **kwargs)


@override_settings(
ACCOUNT_RATE_LIMITS={
"reset_password": "1/m/ip",
"login_failed": "10/h/ip",
"manage_email": "3/h/user",
},
)
class AccountEmailChangeRateLimitTest(TestCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls._validate_patcher = patch(
"nextcloudappstore.user.adapters.validate_email",
side_effect=_validate_email_no_dns,
)
cls._validate_patcher.start()

@classmethod
def tearDownClass(cls):
cls._validate_patcher.stop()
super().tearDownClass()

def setUp(self):
cache.clear()
self.user = create_user(username="testuser", password=PASSWORD, email=PRIMARY_EMAIL)
self.client.login(username="testuser", password=PASSWORD)
self.url = reverse("user:account")

def _post(self, **overrides):
data = {
"first_name": "Test",
"last_name": "User",
"email": PRIMARY_EMAIL,
"subscribe_to_news": False,
"passwd": PASSWORD,
}
data.update(overrides)
return self.client.post(self.url, data)

def test_no_email_change_does_not_consume_budget(self):
for i in range(10):
response = self._post(first_name=f"Name{i}")
self.assertEqual(response.status_code, 302, f"iteration {i}")

def test_first_email_change_within_limit_succeeds(self):
response = self._post(email="new1@nextcloud.com")
self.assertEqual(response.status_code, 302)
self.assertTrue(
EmailAddress.objects.filter(user=self.user, email="new1@nextcloud.com", verified=False).exists()
)

def test_email_change_blocked_after_limit_exceeded(self):
for i in range(1, 4):
response = self._post(email=f"new{i}@nextcloud.com")
self.assertEqual(response.status_code, 302, f"iteration {i}")

response = self._post(email="new4@nextcloud.com")
self.assertEqual(response.status_code, 429)
self.assertIn("email", response.context["form"].errors)
self.assertFalse(EmailAddress.objects.filter(email="new4@nextcloud.com").exists())

def test_user_email_unchanged_when_rate_limited(self):
for i in range(1, 4):
self._post(email=f"new{i}@nextcloud.com")
before = get_user_model().objects.get(pk=self.user.pk).email

self._post(email="new4@nextcloud.com")
after = get_user_model().objects.get(pk=self.user.pk).email

self.assertEqual(after, before)
self.assertNotEqual(after, "new4@nextcloud.com")

def test_name_changes_persist_when_email_rate_limited(self):
for i in range(1, 4):
self._post(email=f"new{i}@nextcloud.com")

self._post(
email="new4@nextcloud.com",
first_name="NewFirst",
last_name="NewLast",
)

updated = get_user_model().objects.get(pk=self.user.pk)
self.assertEqual(updated.first_name, "NewFirst")
self.assertEqual(updated.last_name, "NewLast")

def test_rate_limit_independent_from_reset_password(self):
for i in range(1, 4):
self._post(email=f"new{i}@nextcloud.com")

request = RequestFactory().post(self.url)
self.assertTrue(ratelimit.consume(request, action="reset_password"))

def test_invalid_form_does_not_consume_budget(self):
for i in range(5):
response = self._post(
email=f"newX{i}@nextcloud.com",
passwd="wrong-password", # nosec
)
self.assertEqual(response.status_code, 200)
self.assertTrue(response.context["form"].errors)

for i in range(1, 4):
response = self._post(email=f"valid{i}@nextcloud.com")
self.assertEqual(response.status_code, 302, f"iteration {i}")

def test_same_email_submission_does_not_consume_budget(self):
for _ in range(10):
response = self._post()
self.assertEqual(response.status_code, 302)

for i in range(1, 4):
response = self._post(email=f"changed{i}@nextcloud.com")
self.assertEqual(response.status_code, 302, f"iteration {i}")

def test_case_only_email_change_does_not_consume_budget(self):
# allauth stores EmailAddress.email lowercased and add_email() lowercases
# on the way in, so submitting the same address in different case is a
# silent no-op. The rate-limit budget must reflect that.
for _ in range(10):
response = self._post(email=PRIMARY_EMAIL.upper())
self.assertEqual(response.status_code, 302)
for i in range(1, 4):
response = self._post(email=f"changed{i}@nextcloud.com")
self.assertEqual(response.status_code, 302, f"iteration {i}")

def test_odoo_called_with_actual_email_when_rate_limited(self):
for i in range(1, 4):
self._post(email=f"new{i}@nextcloud.com")

with patch("nextcloudappstore.user.models.subscribe_user_to_news") as mock_sub:
# Flipping subscribe_to_news triggers the pre_save handler in
# signals.py; it must not be passed the rejected new email.
self._post(email="new4@nextcloud.com", subscribe_to_news=True)

if mock_sub.called:
called_with_email = mock_sub.call_args[0][0]
self.assertNotEqual(called_with_email, "new4@nextcloud.com")

def test_empty_email_rejected_by_form(self):
response = self._post(email="")
self.assertEqual(response.status_code, 200)
self.assertIn("email", response.context["form"].errors)
self.assertFalse(EmailAddress.objects.filter(email="").exists())

def test_malformed_email_rejected_by_form(self):
response = self._post(email="not-an-email")
self.assertEqual(response.status_code, 200)
self.assertIn("email", response.context["form"].errors)

def test_budget_shared_with_allauth_email_view(self):
for i in range(1, 4):
r = self._post(email=f"viaaccount{i}@nextcloud.com")
self.assertEqual(r.status_code, 302, f"iteration {i}")

email_url = reverse("account_email")
r = self.client.post(email_url, {"email": "viaemail@nextcloud.com", "action_add": ""})
self.assertEqual(r.status_code, 429)
29 changes: 26 additions & 3 deletions nextcloudappstore/user/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from allauth.account.models import EmailAddress
from allauth.account.views import PasswordChangeView
from allauth.core import ratelimit
from django.contrib import messages
from django.contrib.auth import logout
from django.contrib.auth.mixins import LoginRequiredMixin
Expand All @@ -13,6 +14,7 @@
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse, reverse_lazy
from django.utils.decorators import method_decorator
from django.utils.translation import gettext_lazy as _
from django.views.decorators.cache import never_cache
from django.views.generic import TemplateView, UpdateView

Expand Down Expand Up @@ -139,12 +141,33 @@ def form_invalid(self, form):
return super().form_invalid(form)

def form_valid(self, form):
message = "Account details saved."

user = self.request.user
# Compare case-insensitively: allauth stores EmailAddress.email lowercased
# and add_email() also lowercases. A case-only "change" is a no-op there,
# so it must not consume the rate-limit budget either.
current_email = EmailAddress.objects.get_primary(user=user).email
new_email = form.cleaned_data["email"]
if new_email != current_email:
email_changed = new_email.lower() != current_email.lower()

if email_changed and not ratelimit.consume(self.request, action="manage_email"):
# Persist name/subscription changes but skip the email change and
# surface an inline error. User.email is intentionally not updated.
# Construct_instance set user.email to the rejected new email in
# memory; refresh it so the profile pre_save signal handler
# (signals.handle_subscription_change) does not leak that value to
# Odoo via instance.user.email when subscribe_to_news flips.
user.refresh_from_db(fields=["email"])
user.first_name = form.cleaned_data["first_name"]
user.last_name = form.cleaned_data["last_name"]
user.save(update_fields=["first_name", "last_name"])
user.profile.subscribe_to_news = form.cleaned_data["subscribe_to_news"]
user.profile.save(update_fields=["subscribe_to_news"])
form.add_error("email", _("Too many email change attempts. Please try again later."))
self.request.session["account_update_failed_count"] = 0
return self.render_to_response(self.get_context_data(form=form), status=429)

message = "Account details saved."
if email_changed:
# Delete other unconfirmed email addresses
EmailAddress.objects.filter(user=user).exclude(primary=True).delete()
# Add new email address, send confirmation email
Expand Down
49 changes: 23 additions & 26 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading