diff --git a/CHANGES/4359.bugfix b/CHANGES/4359.bugfix new file mode 100644 index 0000000000..6688ff1776 --- /dev/null +++ b/CHANGES/4359.bugfix @@ -0,0 +1,2 @@ +Fix encrypted fields to use json instead of repr/eval and make them fit for ``bulk_update``. +This solves an issue with ``pulpcore-manager rotate-db-key`` corrupting data. diff --git a/pulpcore/app/models/fields.py b/pulpcore/app/models/fields.py index 5dd75bea12..ccbcf51724 100644 --- a/pulpcore/app/models/fields.py +++ b/pulpcore/app/models/fields.py @@ -1,3 +1,4 @@ +import json import logging import os from gettext import gettext as _ @@ -94,14 +95,17 @@ def _fernet(self): with open(settings.DB_ENCRYPTION_KEY, "rb") as key_file: return Fernet(key_file.read()) - def get_db_prep_save(self, value, connection): - value = super().get_db_prep_save(value, connection) + def get_prep_value(self, value): + value = super().get_prep_value(value) if value is not None: - return force_str(self._fernet.encrypt(force_bytes(value))) + assert isinstance(value, str) + value = force_str(_fernet().encrypt(force_bytes(value))) + return value def from_db_value(self, value, expression, connection): if value is not None: - return force_str(self._fernet.decrypt(force_bytes(value))) + value = force_str(_fernet().decrypt(force_bytes(value))) + return value class EncryptedJSONField(JSONField): @@ -136,16 +140,24 @@ def decrypt(self, value): elif isinstance(value, (list, tuple, set)): return [self.decrypt(v) for v in value] - return eval(force_str(self._fernet.decrypt(force_bytes(value)))) + dec_value = force_str(self._fernet.decrypt(force_bytes(value))) + try: + return json.loads(dec_value, cls=self.decoder) + except json.JSONDecodeError: + return eval(dec_value) - def get_db_prep_save(self, value, connection): - value = self.encrypt(value) - return super().get_db_prep_save(value, connection) + def get_prep_value(self, value): + value = super().get_prep_value(value) + if value is not None: + if hasattr(value, "as_sql"): + return value + value = self.encrypt(value) + return value def from_db_value(self, value, expression, connection): if value is not None: - value = super().from_db_value(value, expression, connection) - return self.decrypt(value) + value = self.decrypt(super().from_db_value(value, expression, connection)) + return value @Field.register_lookup diff --git a/pulpcore/tests/unit/models/test_remote.py b/pulpcore/tests/unit/models/test_remote.py index 0a6223defa..159bf00147 100644 --- a/pulpcore/tests/unit/models/test_remote.py +++ b/pulpcore/tests/unit/models/test_remote.py @@ -1,11 +1,18 @@ +import pytest from uuid import uuid4 from unittest.mock import patch, mock_open +from cryptography.fernet import InvalidToken +from django.core.management import call_command from django.db import connection from django.test import TestCase from pulpcore.app.models import Remote -from pulpcore.app.models.fields import EncryptedTextField +from pulpcore.app.models.fields import _fernet, EncryptedTextField + + +TEST_KEY1 = b"hPCIFQV/upbvPRsEpgS7W32XdFA2EQgXnMtyNAekebQ=" +TEST_KEY2 = b"6Xyv+QezAQ+4R870F5qsgKcngzmm46caDB2gyo9qnpc=" class RemoteTestCase(TestCase): @@ -19,7 +26,7 @@ def tearDown(self): @patch( "pulpcore.app.models.fields.open", new_callable=mock_open, - read_data=b"hPCIFQV/upbvPRsEpgS7W32XdFA2EQgXnMtyNAekebQ=", + read_data=TEST_KEY1, ) def test_encrypted_proxy_password(self, mock_file): self.remote = Remote(name=uuid4(), proxy_password="test") @@ -36,3 +43,45 @@ def test_encrypted_proxy_password(self, mock_file): self.assertNotEqual(db_proxy_password, "test") self.assertEqual(proxy_password, "test") self.assertEqual(mock_file.call_count, 2) + + +@pytest.fixture +def fake_fernet(tmp_path, settings): + def _steps(): + yield + key_file.write_bytes(TEST_KEY2 + b"\n" + TEST_KEY1) + _fernet.cache_clear() + yield + key_file.write_bytes(TEST_KEY2) + _fernet.cache_clear() + yield + key_file.write_bytes(TEST_KEY1) + _fernet.cache_clear() + yield + + key_file = tmp_path / "db_symmetric_key" + key_file.write_bytes(TEST_KEY1) + settings.DB_ENCRYPTION_KEY = str(key_file) + _fernet.cache_clear() + yield _steps() + _fernet.cache_clear() + + +@pytest.mark.django_db +def test_rotate_db_key(fake_fernet): + remote = Remote.objects.create(name=uuid4(), proxy_password="test") + + next(fake_fernet) # new + old key + + call_command("rotate-db-key") + + next(fake_fernet) # new key + + del remote.proxy_password + assert remote.proxy_password == "test" + + next(fake_fernet) # old key + + del remote.proxy_password + with pytest.raises(InvalidToken): + remote.proxy_password