diff --git a/tests/unit/forklift/test_legacy.py b/tests/unit/forklift/test_legacy.py index 2a81315a25eb..8fd337e68a4a 100644 --- a/tests/unit/forklift/test_legacy.py +++ b/tests/unit/forklift/test_legacy.py @@ -1,6 +1,7 @@ # SPDX-License-Identifier: Apache-2.0 import base64 +import builtins import hashlib import io import json @@ -2031,6 +2032,95 @@ def test_upload_succeeds_custom_project_size_limit( ("example", "1.0", "add source file example-1.0.tar.gz", user), ] + def test_upload_fails_with_oserror_on_metadata_write( + self, tmpdir, monkeypatch, pyramid_config, db_request + ): + monkeypatch.setattr(tempfile, "tempdir", str(tmpdir)) + monkeypatch.setattr( + legacy, "_is_valid_dist_file", lambda *a, **kw: (True, None) + ) + + user = UserFactory.create() + EmailFactory.create(user=user) + project = ProjectFactory.create() + release = ReleaseFactory.create(project=project, version="1.0") + RoleFactory.create(user=user, project=project) + + filename = "{}-{}-py2.py3-none-any.whl".format( + project.normalized_name.replace("-", "_"), + release.version, + ) + + db_request.user = user + pyramid_config.testing_securitypolicy(identity=user) + + db_request.user_agent = "warehouse-tests/6.6.6" + + wheel_metadata = dedent( + """ + Metadata-Version: 2.1 + Name: {project.name} + Version: {release.version} + """ + ).encode("utf-8") + + wheel_testdata = _get_whl_testdata( + name=project.normalized_name.replace("-", "_"), version=release.version + ) + wheel_md5 = hashlib.md5(wheel_testdata).hexdigest() + + content = FieldStorage() + content.filename = filename + content.file = io.BytesIO(wheel_testdata) + content.type = "application/octet-stream" + + db_request.POST = MultiDict( + { + "metadata_version": "2.1", + "name": project.name, + "version": release.version, + "filetype": "bdist_wheel", + "pyversion": "py2.py3", + "content": content, + "md5_digest": wheel_md5, + "wheel_metadata_version": "1.0", + "wheel_metadata": base64.b64encode(wheel_metadata).decode("ascii"), + } + ) + + storage_service = pretend.stub(store=lambda path, file_path, *, meta: None) + db_request.find_service = pretend.call_recorder( + lambda svc, name=None, context=None: { + IFileStorage: storage_service, + }.get(svc) + ) + + # Patch open to raise OSError + original_open = builtins.open + + def mock_open(file, mode="r", *args, **kwargs): + if str(file).endswith(".metadata"): + raise OSError("Filename too long") + return original_open(file, mode, *args, **kwargs) + + monkeypatch.setattr("builtins.open", mock_open) + + with pytest.raises(HTTPBadRequest) as excinfo: + legacy.file_upload(db_request) + + resp = excinfo.value + + assert resp.status_code == 400 + assert resp.status == f"400 Filename is too long: '{filename}'" + + assert db_request.metrics.increment.calls == [ + pretend.call("warehouse.upload.attempt"), + pretend.call( + "warehouse.upload.failed", + tags=["reason:filename-too-long", "filetype:bdist_wheel"], + ), + ] + def test_upload_fails_with_previously_used_filename( self, pyramid_config, db_request ): diff --git a/warehouse/forklift/legacy.py b/warehouse/forklift/legacy.py index 5422785f6322..4e2745c7a026 100644 --- a/warehouse/forklift/legacy.py +++ b/warehouse/forklift/legacy.py @@ -1536,8 +1536,22 @@ def file_upload(request): filename=filename, metadata_filename=metadata_filename ), ) - with open(temporary_filename + ".metadata", "wb") as fp: - fp.write(wheel_metadata_contents) + try: + with open(temporary_filename + ".metadata", "wb") as fp: + fp.write(wheel_metadata_contents) + except OSError: + request.metrics.increment( + "warehouse.upload.failed", + tags=[ + "reason:filename-too-long", + f"filetype:{form.filetype.data}", + ], + ) + raise _exc_with_message( + HTTPBadRequest, + f"Filename is too long: '{filename}'", + ) + metadata_file_hashes = { "sha256": hashlib.sha256(), "blake2_256": hashlib.blake2b(digest_size=256 // 8),