From dc4ee9915228238bd24ce67645504f65eaf2f1fd Mon Sep 17 00:00:00 2001 From: Simon Charette Date: Thu, 28 Aug 2025 17:34:06 +0200 Subject: [PATCH 1/3] Refs #27222 -- Implemented BaseDatabaseOperations.return_insert_columns()/fetch_returned_insert_rows(). --- django/db/backends/base/operations.py | 19 ++++++++++++++++++- django/db/backends/mysql/operations.py | 21 --------------------- django/db/backends/postgresql/operations.py | 20 -------------------- django/db/backends/sqlite3/operations.py | 21 --------------------- 4 files changed, 18 insertions(+), 63 deletions(-) diff --git a/django/db/backends/base/operations.py b/django/db/backends/base/operations.py index a95f535bdb5e..f1b0c09abd2d 100644 --- a/django/db/backends/base/operations.py +++ b/django/db/backends/base/operations.py @@ -364,7 +364,24 @@ def return_insert_columns(self, fields): return the SQL and params to append to the INSERT query. The returned fragment should contain a format string to hold the appropriate column. """ - pass + if not fields: + return "", () + columns = [ + "%s.%s" + % ( + self.quote_name(field.model._meta.db_table), + self.quote_name(field.column), + ) + for field in fields + ] + return "RETURNING %s" % ", ".join(columns), () + + def fetch_returned_insert_rows(self, cursor): + """ + Given a cursor object that has just performed an INSERT...RETURNING + statement into a table, return the tuple of returned data. + """ + return cursor.fetchall() def compiler(self, compiler_name): """ diff --git a/django/db/backends/mysql/operations.py b/django/db/backends/mysql/operations.py index 2d6185a2ca26..7dfcd57958eb 100644 --- a/django/db/backends/mysql/operations.py +++ b/django/db/backends/mysql/operations.py @@ -148,13 +148,6 @@ def time_trunc_sql(self, lookup_type, sql, params, tzname=None): else: return f"TIME({sql})", params - def fetch_returned_insert_rows(self, cursor): - """ - Given a cursor object that has just performed an INSERT...RETURNING - statement into a table, return the tuple of returned data. - """ - return cursor.fetchall() - def format_for_duration_arithmetic(self, sql): return "INTERVAL %s MICROSECOND" % sql @@ -182,20 +175,6 @@ def quote_name(self, name): return name # Quoting once is enough. return "`%s`" % name - def return_insert_columns(self, fields): - # MySQL doesn't support an INSERT...RETURNING statement. - if not fields: - return "", () - columns = [ - "%s.%s" - % ( - self.quote_name(field.model._meta.db_table), - self.quote_name(field.column), - ) - for field in fields - ] - return "RETURNING %s" % ", ".join(columns), () - def sql_flush(self, style, tables, *, reset_sequences=False, allow_cascade=False): if not tables: return [] diff --git a/django/db/backends/postgresql/operations.py b/django/db/backends/postgresql/operations.py index a8c073e4186f..7cd868d78951 100644 --- a/django/db/backends/postgresql/operations.py +++ b/django/db/backends/postgresql/operations.py @@ -155,13 +155,6 @@ def bulk_insert_sql(self, fields, placeholder_rows): return f"SELECT * FROM {placeholder_rows}" return super().bulk_insert_sql(fields, placeholder_rows) - def fetch_returned_insert_rows(self, cursor): - """ - Given a cursor object that has just performed an INSERT...RETURNING - statement into a table, return the tuple of returned data. - """ - return cursor.fetchall() - def lookup_cast(self, lookup_type, internal_type=None): lookup = "%s" # Cast text lookups to text to allow things like filter(x__contains=4) @@ -324,19 +317,6 @@ def last_executed_query(self, cursor, sql, params): return cursor.query.decode() return None - def return_insert_columns(self, fields): - if not fields: - return "", () - columns = [ - "%s.%s" - % ( - self.quote_name(field.model._meta.db_table), - self.quote_name(field.column), - ) - for field in fields - ] - return "RETURNING %s" % ", ".join(columns), () - if is_psycopg3: def adapt_integerfield_value(self, value, internal_type): diff --git a/django/db/backends/sqlite3/operations.py b/django/db/backends/sqlite3/operations.py index 4a1693768c7f..1500ae28aa0d 100644 --- a/django/db/backends/sqlite3/operations.py +++ b/django/db/backends/sqlite3/operations.py @@ -84,13 +84,6 @@ def date_extract_sql(self, lookup_type, sql, params): """ return f"django_date_extract(%s, {sql})", (lookup_type.lower(), *params) - def fetch_returned_insert_rows(self, cursor): - """ - Given a cursor object that has just performed an INSERT...RETURNING - statement into a table, return the list of returned data. - """ - return cursor.fetchall() - def format_for_duration_arithmetic(self, sql): """Do nothing since formatting is handled in the custom function.""" return sql @@ -399,20 +392,6 @@ def insert_statement(self, on_conflict=None): return "INSERT OR IGNORE INTO" return super().insert_statement(on_conflict=on_conflict) - def return_insert_columns(self, fields): - # SQLite < 3.35 doesn't support an INSERT...RETURNING statement. - if not fields: - return "", () - columns = [ - "%s.%s" - % ( - self.quote_name(field.model._meta.db_table), - self.quote_name(field.column), - ) - for field in fields - ] - return "RETURNING %s" % ", ".join(columns), () - def on_conflict_suffix_sql(self, fields, on_conflict, update_fields, unique_fields): if ( on_conflict == OnConflict.UPDATE From 292b9e6fe8f23491680d9cc60f328562e2b1c823 Mon Sep 17 00:00:00 2001 From: Simon Charette Date: Fri, 21 Mar 2025 21:50:54 -0400 Subject: [PATCH 2/3] Refs #27222 -- Adapted RETURNING handling to be usable for UPDATE queries. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Renamed existing methods and abstractions used for INSERT … RETURNING to be generic enough to be used in the context of UPDATEs as well. This also consolidates SQL compliant implementations on BaseDatabaseOperations. --- django/db/backends/base/operations.py | 22 +++++------- django/db/backends/oracle/operations.py | 48 ++++++++++++------------- django/db/backends/oracle/utils.py | 2 +- django/db/models/sql/compiler.py | 20 +++++------ docs/releases/6.0.txt | 12 +++++++ 5 files changed, 51 insertions(+), 53 deletions(-) diff --git a/django/db/backends/base/operations.py b/django/db/backends/base/operations.py index f1b0c09abd2d..16a6296f9b5f 100644 --- a/django/db/backends/base/operations.py +++ b/django/db/backends/base/operations.py @@ -208,13 +208,6 @@ def distinct_sql(self, fields, params): else: return ["DISTINCT"], [] - def fetch_returned_insert_columns(self, cursor, returning_params): - """ - Given a cursor object that has just performed an INSERT...RETURNING - statement into a table, return the newly created data. - """ - return cursor.fetchone() - def force_group_by(self): """ Return a GROUP BY clause to use with a HAVING clause when no grouping @@ -358,11 +351,12 @@ def process_clob(self, value): """ return value - def return_insert_columns(self, fields): + def returning_columns(self, fields): """ - For backends that support returning columns as part of an insert query, - return the SQL and params to append to the INSERT query. The returned - fragment should contain a format string to hold the appropriate column. + For backends that support returning columns as part of an insert or + update query, return the SQL and params to append to the query. + The returned fragment should contain a format string to hold the + appropriate column. """ if not fields: return "", () @@ -376,10 +370,10 @@ def return_insert_columns(self, fields): ] return "RETURNING %s" % ", ".join(columns), () - def fetch_returned_insert_rows(self, cursor): + def fetch_returned_rows(self, cursor, returning_params): """ - Given a cursor object that has just performed an INSERT...RETURNING - statement into a table, return the tuple of returned data. + Given a cursor object for a DML query with a RETURNING statement, + return the selected returning rows of tuples. """ return cursor.fetchall() diff --git a/django/db/backends/oracle/operations.py b/django/db/backends/oracle/operations.py index ce9ed7288d57..bc152c4e6ed5 100644 --- a/django/db/backends/oracle/operations.py +++ b/django/db/backends/oracle/operations.py @@ -22,7 +22,7 @@ from django.utils.regex_helper import _lazy_re_compile from .base import Database -from .utils import BulkInsertMapper, InsertVar, Oracle_datetime +from .utils import BoundVar, BulkInsertMapper, Oracle_datetime class DatabaseOperations(BaseDatabaseOperations): @@ -298,12 +298,27 @@ def convert_empty_bytes(value, expression, connection): def deferrable_sql(self): return " DEFERRABLE INITIALLY DEFERRED" - def fetch_returned_insert_columns(self, cursor, returning_params): - columns = [] - for param in returning_params: - value = param.get_value() - columns.append(value[0]) - return tuple(columns) + def returning_columns(self, fields): + if not fields: + return "", () + field_names = [] + params = [] + for field in fields: + field_names.append( + "%s.%s" + % ( + self.quote_name(field.model._meta.db_table), + self.quote_name(field.column), + ) + ) + params.append(BoundVar(field)) + return "RETURNING %s INTO %s" % ( + ", ".join(field_names), + ", ".join(["%s"] * len(params)), + ), tuple(params) + + def fetch_returned_rows(self, cursor, returning_params): + return list(zip(*(param.get_value() for param in returning_params))) def no_limit_value(self): return None @@ -391,25 +406,6 @@ def regex_lookup(self, lookup_type): match_option = "'i'" return "REGEXP_LIKE(%%s, %%s, %s)" % match_option - def return_insert_columns(self, fields): - if not fields: - return "", () - field_names = [] - params = [] - for field in fields: - field_names.append( - "%s.%s" - % ( - self.quote_name(field.model._meta.db_table), - self.quote_name(field.column), - ) - ) - params.append(InsertVar(field)) - return "RETURNING %s INTO %s" % ( - ", ".join(field_names), - ", ".join(["%s"] * len(params)), - ), tuple(params) - def __foreign_key_constraints(self, table_name, recursive): with self.connection.cursor() as cursor: if recursive: diff --git a/django/db/backends/oracle/utils.py b/django/db/backends/oracle/utils.py index 57d97b3f771e..fd7deab9a217 100644 --- a/django/db/backends/oracle/utils.py +++ b/django/db/backends/oracle/utils.py @@ -4,7 +4,7 @@ from .base import Database -class InsertVar: +class BoundVar: """ A late-binding cursor variable that can be passed to Cursor.execute as a parameter, in order to receive the id of the row created by an diff --git a/django/db/models/sql/compiler.py b/django/db/models/sql/compiler.py index f72ba907ad75..73dfa5b87c84 100644 --- a/django/db/models/sql/compiler.py +++ b/django/db/models/sql/compiler.py @@ -1890,7 +1890,7 @@ def as_sql(self): result.append(on_conflict_suffix_sql) # Skip empty r_sql to allow subclasses to customize behavior for # 3rd party backends. Refs #19096. - r_sql, self.returning_params = self.connection.ops.return_insert_columns( + r_sql, self.returning_params = self.connection.ops.returning_columns( self.returning_fields ) if r_sql: @@ -1925,20 +1925,16 @@ def execute_sql(self, returning_fields=None): cursor.execute(sql, params) if not self.returning_fields: return [] + obj_len = len(self.query.objs) if ( self.connection.features.can_return_rows_from_bulk_insert - and len(self.query.objs) > 1 + and obj_len > 1 + ) or ( + self.connection.features.can_return_columns_from_insert and obj_len == 1 ): - rows = self.connection.ops.fetch_returned_insert_rows(cursor) - cols = [field.get_col(opts.db_table) for field in self.returning_fields] - elif self.connection.features.can_return_columns_from_insert: - assert len(self.query.objs) == 1 - rows = [ - self.connection.ops.fetch_returned_insert_columns( - cursor, - self.returning_params, - ) - ] + rows = self.connection.ops.fetch_returned_rows( + cursor, self.returning_params + ) cols = [field.get_col(opts.db_table) for field in self.returning_fields] elif returning_fields and isinstance( returning_field := returning_fields[0], AutoField diff --git a/docs/releases/6.0.txt b/docs/releases/6.0.txt index 94376b999415..a9fe1873aa98 100644 --- a/docs/releases/6.0.txt +++ b/docs/releases/6.0.txt @@ -402,6 +402,18 @@ backends. * :class:`~django.db.backends.base.schema.BaseDatabaseSchemaEditor` and PostgreSQL backends no longer use ``CASCADE`` when dropping a column. +* ``DatabaseOperations.return_insert_columns()`` and + ``DatabaseOperations.fetch_returned_insert_rows()`` methods are renamed to + ``returning_columns()`` and ``fetch_returned_rows()``, respectively, to + denote they can be used in the context ``UPDATE … RETURNING`` statements as + well as ``INSERT … RETURNING``. + +* The ``DatabaseOperations.fetch_returned_insert_columns()`` method is removed + and the ``fetch_returned_rows()`` method replacing + ``fetch_returned_insert_rows()`` expects both a ``cursor`` and + ``returning_params`` to be provided just like + ``fetch_returned_insert_columns()`` did. + Dropped support for MariaDB 10.5 -------------------------------- From 550822bceea227b07445d1852c4376b663c09ea4 Mon Sep 17 00:00:00 2001 From: Rob Hudson Date: Sat, 23 Aug 2025 12:23:53 -0700 Subject: [PATCH 3/3] Fixed #36532 -- Added Content Security Policy view decorators to override or disable policies. Co-authored-by: Natalia <124304+nessita@users.noreply.github.com> --- django/middleware/csp.py | 23 ++++---- django/views/debug.py | 5 ++ django/views/decorators/csp.py | 39 ++++++++++++++ docs/ref/csp.txt | 86 ++++++++++++++++++++++++++++++ docs/releases/6.0.txt | 3 +- docs/topics/async.txt | 2 + tests/decorators/test_csp.py | 95 ++++++++++++++++++++++++++++++++++ tests/middleware/test_csp.py | 69 ++++++++++++++++++++++++ tests/middleware/urls.py | 6 +++ tests/middleware/views.py | 40 ++++++++++++++ 10 files changed, 354 insertions(+), 14 deletions(-) create mode 100644 django/views/decorators/csp.py create mode 100644 tests/decorators/test_csp.py diff --git a/django/middleware/csp.py b/django/middleware/csp.py index e1c66ada5a64..ba08cfff0c1a 100644 --- a/django/middleware/csp.py +++ b/django/middleware/csp.py @@ -1,5 +1,3 @@ -from http import HTTPStatus - from django.conf import settings from django.utils.csp import CSP, LazyNonce, build_policy from django.utils.deprecation import MiddlewareMixin @@ -14,22 +12,21 @@ def process_request(self, request): request._csp_nonce = LazyNonce() def process_response(self, request, response): - # In DEBUG mode, exclude CSP headers for specific status codes that - # trigger the debug view. - exempted_status_codes = { - HTTPStatus.NOT_FOUND, - HTTPStatus.INTERNAL_SERVER_ERROR, - } - if settings.DEBUG and response.status_code in exempted_status_codes: - return response - nonce = get_nonce(request) + + sentinel = object() + if (csp_config := getattr(response, "_csp_config", sentinel)) is sentinel: + csp_config = settings.SECURE_CSP + if (csp_ro_config := getattr(response, "_csp_ro_config", sentinel)) is sentinel: + csp_ro_config = settings.SECURE_CSP_REPORT_ONLY + for header, config in [ - (CSP.HEADER_ENFORCE, settings.SECURE_CSP), - (CSP.HEADER_REPORT_ONLY, settings.SECURE_CSP_REPORT_ONLY), + (CSP.HEADER_ENFORCE, csp_config), + (CSP.HEADER_REPORT_ONLY, csp_ro_config), ]: # If headers are already set on the response, don't overwrite them. # This allows for views to set their own CSP headers as needed. + # An empty config means CSP headers are not added to the response. if config and header not in response: response.headers[str(header)] = build_policy(config, nonce) diff --git a/django/views/debug.py b/django/views/debug.py index 75f30ca60138..5a1b4aee91a0 100644 --- a/django/views/debug.py +++ b/django/views/debug.py @@ -18,6 +18,7 @@ from django.utils.module_loading import import_string from django.utils.regex_helper import _lazy_re_compile from django.utils.version import get_docs_version +from django.views.decorators.csp import csp_override, csp_report_only_override from django.views.decorators.debug import coroutine_functions_to_sensitive_variables # Minimal Django templates engine to render the error templates @@ -59,6 +60,8 @@ def __repr__(self): return repr(self._wrapped) +@csp_override({}) +@csp_report_only_override({}) def technical_500_response(request, exc_type, exc_value, tb, status_code=500): """ Create a technical server error response. The last three arguments are @@ -606,6 +609,8 @@ def get_exception_traceback_frames(self, exc_value, tb): tb = tb.tb_next +@csp_override({}) +@csp_report_only_override({}) def technical_404_response(request, exception): """Create a technical 404 error response. `exception` is the Http404.""" try: diff --git a/django/views/decorators/csp.py b/django/views/decorators/csp.py new file mode 100644 index 000000000000..9033d9cdf0ee --- /dev/null +++ b/django/views/decorators/csp.py @@ -0,0 +1,39 @@ +from functools import wraps + +from asgiref.sync import iscoroutinefunction + + +def _make_csp_decorator(config_attr_name, config_attr_value): + """General CSP override decorator factory.""" + + if not isinstance(config_attr_value, dict): + raise TypeError("CSP config should be a mapping.") + + def decorator(view_func): + @wraps(view_func) + async def _wrapped_async_view(request, *args, **kwargs): + response = await view_func(request, *args, **kwargs) + setattr(response, config_attr_name, config_attr_value) + return response + + @wraps(view_func) + def _wrapped_sync_view(request, *args, **kwargs): + response = view_func(request, *args, **kwargs) + setattr(response, config_attr_name, config_attr_value) + return response + + if iscoroutinefunction(view_func): + return _wrapped_async_view + return _wrapped_sync_view + + return decorator + + +def csp_override(config): + """Override the Content-Security-Policy header for a view.""" + return _make_csp_decorator("_csp_config", config) + + +def csp_report_only_override(config): + """Override the Content-Security-Policy-Report-Only header for a view.""" + return _make_csp_decorator("_csp_ro_config", config) diff --git a/docs/ref/csp.txt b/docs/ref/csp.txt index 3ecee17acda9..5d0a9c0ecbd1 100644 --- a/docs/ref/csp.txt +++ b/docs/ref/csp.txt @@ -154,6 +154,92 @@ with the CSP specification. secure, random nonce that is generated for each request. See detailed explanation in :ref:`csp-nonce`. +Decorators +========== + +.. module:: django.views.decorators.csp + +Django provides decorators to control the Content Security Policy headers on a +per-view basis. These allow overriding or disabling the enforced or report-only +policy for specific views, providing fine-grained control when the global +settings are not sufficient. Applying these overrides fully replaces the base +CSP: they do not merge with existing rules. They can be used alongside the +constants defined in :class:`~django.utils.csp.CSP`. + +.. warning:: + + Weakening or disabling a CSP policy on any page can compromise the security + of the entire site. Because of the "same origin" policy, an attacker could + exploit a vulnerability on one page to access other parts of the site. + +.. function:: csp_override(config)(view) + + Overrides the ``Content-Security-Policy`` header for the decorated view + using directives in the same format as the :setting:`SECURE_CSP` setting. + + The ``config`` argument must be a mapping with the desired CSP directives. + If ``config`` is an empty mapping (``{}``), no CSP enforcement header will + be added to the response returned by that view, effectively disabling CSP + for that view. + + Examples:: + + from django.http import HttpResponse + from django.utils.csp import CSP + from django.views.decorators.csp import csp_override + + + @csp_override( + { + "default-src": [CSP.SELF], + "img-src": [CSP.SELF, "data:"], + } + ) + def my_view(request): + return HttpResponse("Custom Content-Security-Policy header applied") + + + @csp_override({}) + def my_other_view(request): + return HttpResponse("No Content-Security-Policy header added") + + +.. function:: csp_report_only_override(config)(view) + + Overrides the ``Content-Security-Policy-Report-Only`` header for the + decorated view using directives in the same format as the + :setting:`SECURE_CSP_REPORT_ONLY` setting. + + Like :func:`csp_override`, the ``config`` argument must be a mapping with + the desired CSP directives. If ``config`` is an empty mapping (``{}``), no + CSP report-only header will be added to the response returned by that view, + effectively disabling report-only CSP for that view. + + Examples:: + + from django.http import HttpResponse + from django.utils.csp import CSP + from django.views.decorators.csp import csp_report_only_override + + + @csp_report_only_override( + { + "default-src": [CSP.SELF], + "img-src": [CSP.SELF, "data:"], + "report-uri": "https://mysite.com/csp-report/", + } + ) + def my_view(request): + return HttpResponse("Custom Content-Security-Policy-Report-Only header applied") + + + @csp_report_only_override({}) + def my_other_view(request): + return HttpResponse("No Content-Security-Policy-Report-Only header added") + +The examples above assume function-based views. For class-based views, see the +:ref:`guide for decorating class-based views `. + .. _csp-nonce: Nonce usage diff --git a/docs/releases/6.0.txt b/docs/releases/6.0.txt index a9fe1873aa98..c47faec9d586 100644 --- a/docs/releases/6.0.txt +++ b/docs/releases/6.0.txt @@ -72,7 +72,8 @@ The resulting ``Content-Security-Policy`` header would be set to: To get started, follow the :doc:`CSP how-to guide `. For in-depth guidance, see the :ref:`CSP security overview ` and the -:doc:`reference docs `. +:doc:`reference docs `, which include details about decorators to +override or disable policies on a per-view basis. Adoption of Python's modern email API ------------------------------------- diff --git a/docs/topics/async.txt b/docs/topics/async.txt index 8138103f927c..5cf9fd46242a 100644 --- a/docs/topics/async.txt +++ b/docs/topics/async.txt @@ -82,6 +82,8 @@ view functions: * :func:`~django.views.decorators.cache.cache_control` * :func:`~django.views.decorators.cache.never_cache` * :func:`~django.views.decorators.common.no_append_slash` +* :func:`~django.views.decorators.csp.csp_override` +* :func:`~django.views.decorators.csp.csp_report_only_override` * :func:`~django.views.decorators.csrf.csrf_exempt` * :func:`~django.views.decorators.csrf.csrf_protect` * :func:`~django.views.decorators.csrf.ensure_csrf_cookie` diff --git a/tests/decorators/test_csp.py b/tests/decorators/test_csp.py new file mode 100644 index 000000000000..2def6b0f79cb --- /dev/null +++ b/tests/decorators/test_csp.py @@ -0,0 +1,95 @@ +from itertools import product + +from asgiref.sync import iscoroutinefunction + +from django.http import HttpRequest, HttpResponse +from django.test import SimpleTestCase +from django.utils.csp import CSP +from django.views.decorators.csp import csp_override, csp_report_only_override + +basic_config = { + "default-src": [CSP.SELF], +} + + +class CSPOverrideDecoratorTest(SimpleTestCase): + def test_wrapped_sync_function_is_not_coroutine_function(self): + def sync_view(request): + return HttpResponse() + + wrapped_view = csp_override({})(sync_view) + self.assertIs(iscoroutinefunction(wrapped_view), False) + + def test_wrapped_async_function_is_coroutine_function(self): + async def async_view(request): + return HttpResponse() + + wrapped_view = csp_override({})(async_view) + self.assertIs(iscoroutinefunction(wrapped_view), True) + + def test_decorator_requires_mapping(self): + for config, decorator in product( + [None, 0, False, [], [1, 2, 3], 42, {4, 5}], + (csp_override, csp_report_only_override), + ): + with ( + self.subTest(config=config, decorator=decorator), + self.assertRaisesMessage(TypeError, "CSP config should be a mapping"), + ): + decorator(config) + + def test_csp_override(self): + @csp_override(basic_config) + def sync_view(request): + return HttpResponse("OK") + + response = sync_view(HttpRequest()) + self.assertEqual(response._csp_config, basic_config) + self.assertIs(hasattr(response, "_csp_ro_config"), False) + + async def test_csp_override_async_view(self): + @csp_override(basic_config) + async def async_view(request): + return HttpResponse("OK") + + response = await async_view(HttpRequest()) + self.assertEqual(response._csp_config, basic_config) + self.assertIs(hasattr(response, "_csp_ro_config"), False) + + def test_csp_report_only_override(self): + @csp_report_only_override(basic_config) + def sync_view(request): + return HttpResponse("OK") + + response = sync_view(HttpRequest()) + self.assertEqual(response._csp_ro_config, basic_config) + self.assertIs(hasattr(response, "_csp_config"), False) + + async def test_csp_report_only_override_async_view(self): + @csp_report_only_override(basic_config) + async def async_view(request): + return HttpResponse("OK") + + response = await async_view(HttpRequest()) + self.assertEqual(response._csp_ro_config, basic_config) + self.assertIs(hasattr(response, "_csp_config"), False) + + def test_csp_override_both(self): + @csp_override(basic_config) + @csp_report_only_override(basic_config) + def sync_view(request): + return HttpResponse("OK") + + response = sync_view(HttpRequest()) + self.assertEqual(response._csp_config, basic_config) + self.assertEqual(response._csp_ro_config, basic_config) + + async def test_csp_override_both_async_view(self): + @csp_override(basic_config) + @csp_report_only_override(basic_config) + async def async_view(request): + return HttpResponse("OK") + + response = await async_view(HttpRequest()) + self.assertEqual(response._csp_config, basic_config) + self.assertEqual(response._csp_ro_config, basic_config) diff --git a/tests/middleware/test_csp.py b/tests/middleware/test_csp.py index e7a2452240b1..3ab732fe2f52 100644 --- a/tests/middleware/test_csp.py +++ b/tests/middleware/test_csp.py @@ -106,6 +106,75 @@ def test_csp_500_debug_view(self): self.assertNotIn(CSP.HEADER_REPORT_ONLY, response) +@override_settings( + MIDDLEWARE=["django.middleware.csp.ContentSecurityPolicyMiddleware"], + ROOT_URLCONF="middleware.urls", + SECURE_CSP=basic_config, + SECURE_CSP_REPORT_ONLY=basic_config, +) +class CSPMiddlewareWithDecoratedViewsTest(SimpleTestCase): + def test_no_decorators(self): + response = self.client.get("/csp-base/") + self.assertEqual(response[CSP.HEADER_ENFORCE], basic_policy) + self.assertEqual(response[CSP.HEADER_REPORT_ONLY], basic_policy) + + def test_csp_disabled_enforced(self): + """ + `csp_override({})` only disables the enforced CSP header. + """ + response = self.client.get("/csp-disabled-enforced/") + self.assertNotIn(CSP.HEADER_ENFORCE, response) + self.assertEqual(response[CSP.HEADER_REPORT_ONLY], basic_policy) + + def test_csp_report_only_disabled(self): + """ + `csp_report_only_override({})` only disables the report-only header. + """ + response = self.client.get("/csp-disabled-report-only/") + self.assertNotIn(CSP.HEADER_REPORT_ONLY, response) + self.assertEqual(response[CSP.HEADER_ENFORCE], basic_policy) + + def test_csp_disabled_both(self): + """ + Using both CSP decorators with empty mappings will clear both headers. + """ + response = self.client.get("/csp-disabled-both/") + self.assertNotIn(CSP.HEADER_ENFORCE, response) + self.assertNotIn(CSP.HEADER_REPORT_ONLY, response) + + def test_csp_override_enforced(self): + """ + `csp_override` only overrides the enforced header. + """ + response = self.client.get("/csp-override-enforced/") + self.assertEqual( + response[CSP.HEADER_ENFORCE], "default-src 'self'; img-src 'self' data:" + ) + self.assertEqual(response[CSP.HEADER_REPORT_ONLY], basic_policy) + + def test_csp_report_only_override(self): + """ + `csp_report_only_override` only overrides the report-only header. + """ + response = self.client.get("/csp-override-report-only/") + self.assertEqual( + response[CSP.HEADER_REPORT_ONLY], "default-src 'self'; img-src 'self' data:" + ) + self.assertEqual(response[CSP.HEADER_ENFORCE], basic_policy) + + def test_csp_override_both_decorator(self): + """ + Using both CSP decorators overrides both CSP Django settings. + """ + response = self.client.get("/csp-override-both/") + self.assertEqual( + response[CSP.HEADER_ENFORCE], "default-src 'self'; img-src 'self' data:" + ) + self.assertEqual( + response[CSP.HEADER_REPORT_ONLY], "default-src 'self'; img-src 'self' data:" + ) + + @override_settings( ROOT_URLCONF="middleware.urls", SECURE_CSP_REPORT_ONLY={ diff --git a/tests/middleware/urls.py b/tests/middleware/urls.py index 37120c7a544a..bbd68d205075 100644 --- a/tests/middleware/urls.py +++ b/tests/middleware/urls.py @@ -17,5 +17,11 @@ path("csp-report/", views.csp_report_view), path("csp-base/", views.empty_view), path("csp-nonce/", views.csp_nonce), + path("csp-disabled-both/", views.csp_disabled_both), + path("csp-disabled-enforced/", views.csp_disabled_enforced), + path("csp-disabled-report-only/", views.csp_disabled_ro), + path("csp-override-both/", views.csp_override_both), + path("csp-override-enforced/", views.csp_override_enforced), + path("csp-override-report-only/", views.csp_override_report_only), path("csp-500/", views.csp_500), ] diff --git a/tests/middleware/views.py b/tests/middleware/views.py index 6dc3ca24c7e9..716ddec5fdae 100644 --- a/tests/middleware/views.py +++ b/tests/middleware/views.py @@ -3,9 +3,11 @@ from django.http import HttpResponse from django.middleware.csp import get_nonce +from django.utils.csp import CSP from django.utils.decorators import method_decorator from django.views.debug import technical_500_response from django.views.decorators.common import no_append_slash +from django.views.decorators.csp import csp_override, csp_report_only_override from django.views.decorators.csrf import csrf_exempt from django.views.generic import View @@ -29,6 +31,44 @@ def csp_nonce(request): return HttpResponse(get_nonce(request)) +@csp_override({}) +def csp_disabled_enforced(request): + return HttpResponse() + + +@csp_report_only_override({}) +def csp_disabled_ro(request): + return HttpResponse() + + +@csp_override({}) +@csp_report_only_override({}) +def csp_disabled_both(request): + return HttpResponse() + + +csp_policy_override = { + "default-src": [CSP.SELF], + "img-src": [CSP.SELF, "data:"], +} + + +@csp_override(csp_policy_override) +def csp_override_enforced(request): + return HttpResponse() + + +@csp_report_only_override(csp_policy_override) +def csp_override_report_only(request): + return HttpResponse() + + +@csp_override(csp_policy_override) +@csp_report_only_override(csp_policy_override) +def csp_override_both(request): + return HttpResponse() + + def csp_500(request): try: raise Exception