diff --git a/ellar_sqlalchemy/model/typeDecorator/exceptions.py b/ellar_sqlalchemy/model/typeDecorator/exceptions.py index 6bda771..330e395 100644 --- a/ellar_sqlalchemy/model/typeDecorator/exceptions.py +++ b/ellar_sqlalchemy/model/typeDecorator/exceptions.py @@ -1,7 +1,7 @@ import typing as t -class ContentTypeValidationError(Exception): +class ContentTypeValidationError(ValueError): def __init__( self, content_type: t.Optional[str] = None, @@ -18,14 +18,14 @@ def __init__( super().__init__(message) -class InvalidFileError(Exception): +class InvalidFileError(ValueError): pass -class InvalidImageOperationError(Exception): +class InvalidImageOperationError(ValueError): pass -class MaximumAllowedFileLengthError(Exception): +class MaximumAllowedFileLengthError(ValueError): def __init__(self, max_length: int) -> None: super().__init__("Cannot store files larger than: %d bytes" % max_length) diff --git a/ellar_sqlalchemy/model/typeDecorator/file/base.py b/ellar_sqlalchemy/model/typeDecorator/file/base.py index 6391f23..6fa89cf 100644 --- a/ellar_sqlalchemy/model/typeDecorator/file/base.py +++ b/ellar_sqlalchemy/model/typeDecorator/file/base.py @@ -2,11 +2,12 @@ import time import typing as t import uuid +from abc import abstractmethod import sqlalchemy as sa +from ellar.common import UploadFile from ellar.core.files.storages import BaseStorage from ellar.core.files.storages.utils import get_valid_filename -from starlette.datastructures import UploadFile from ellar_sqlalchemy.model.typeDecorator.exceptions import ( ContentTypeValidationError, @@ -25,7 +26,10 @@ class FileFieldBase(t.Generic[T]): - FileObject: t.Type[T] = t.cast(t.Type[T], FileObject) + @property + @abstractmethod + def file_object_type(self) -> t.Type[T]: + ... def load_dialect_impl(self, dialect: sa.Dialect) -> t.Any: if dialect.name == "sqlite": @@ -74,7 +78,7 @@ def load_from_str(self, data: str) -> T: def load(self, data: t.Dict[str, t.Any]) -> T: if "service_name" in data: data.pop("service_name") - return self.FileObject(storage=self.storage, **data) + return self.file_object_type(storage=self.storage, **data) def _guess_content_type(self, file: t.IO) -> str: # type:ignore[type-arg] content = file.read(1024) @@ -97,14 +101,15 @@ def convert_to_file_object(self, file: UploadFile) -> T: original_filename = file.filename or unique_name # use python magic to get the content type - content_type = self._guess_content_type(file.file) + content_type = self._guess_content_type(file.file) or "" extension = guess_extension(content_type) - assert extension - file_size = get_length(file.file) - saved_filename = ( - f"{original_filename[:-len(extension)]}_{unique_name[:-8]}{extension}" - ) + if extension: + saved_filename = ( + f"{original_filename[:-len(extension)]}_{unique_name[:-8]}{extension}" + ) + else: + saved_filename = f"{unique_name[:-8]}_{original_filename}" saved_filename = get_valid_filename(saved_filename) init_kwargs = self.get_extra_file_initialization_context(file) @@ -117,7 +122,7 @@ def convert_to_file_object(self, file: UploadFile) -> T: file_size=file_size, saved_filename=saved_filename, ) - return self.FileObject(**init_kwargs) + return self.file_object_type(**init_kwargs) def process_bind_param_action( self, value: t.Optional[t.Any], dialect: sa.Dialect @@ -138,7 +143,7 @@ def process_bind_param_action( return json.dumps(value.to_dict()) return value.to_dict() - raise InvalidFileError() + raise InvalidFileError(f"{value} is not supported") def process_result_value_action( self, value: t.Optional[t.Any], dialect: sa.Dialect diff --git a/ellar_sqlalchemy/model/typeDecorator/file/field.py b/ellar_sqlalchemy/model/typeDecorator/file/field.py index 993fac3..6f7ea7f 100644 --- a/ellar_sqlalchemy/model/typeDecorator/file/field.py +++ b/ellar_sqlalchemy/model/typeDecorator/file/field.py @@ -7,6 +7,7 @@ class FileField(FileFieldBase[FileObject], sa.TypeDecorator): # type: ignore[type-arg] + """ Provide SqlAlchemy TypeDecorator for saving files ## Basic Usage @@ -28,6 +29,10 @@ def route(file: File[UploadFile]): """ + @property + def file_object_type(self) -> t.Type[FileObject]: + return FileObject + impl = sa.JSON def process_bind_param( diff --git a/ellar_sqlalchemy/model/typeDecorator/file/file_info.py b/ellar_sqlalchemy/model/typeDecorator/file/file_info.py index 973478a..84f4422 100644 --- a/ellar_sqlalchemy/model/typeDecorator/file/file_info.py +++ b/ellar_sqlalchemy/model/typeDecorator/file/file_info.py @@ -42,8 +42,5 @@ def to_dict(self) -> t.Dict[str, t.Any]: "service_name": self._storage.service_name(), } - def __str__(self) -> str: - return f"filename={self.filename}, content_type={self.content_type}, file_size={self.file_size}" - def __repr__(self) -> str: - return str(self) + return f"<{self.__class__.__name__} filename={self.filename}, content_type={self.content_type}, file_size={self.file_size}>" diff --git a/ellar_sqlalchemy/model/typeDecorator/image/field.py b/ellar_sqlalchemy/model/typeDecorator/image/field.py index 2126cf4..6f70027 100644 --- a/ellar_sqlalchemy/model/typeDecorator/image/field.py +++ b/ellar_sqlalchemy/model/typeDecorator/image/field.py @@ -2,9 +2,9 @@ from io import SEEK_END, BytesIO import sqlalchemy as sa +from ellar.common import UploadFile from ellar.core.files.storages import BaseStorage from PIL import Image -from starlette.datastructures import UploadFile from ..exceptions import InvalidImageOperationError from ..file import FileFieldBase @@ -18,9 +18,10 @@ class ImageFileField(FileFieldBase[ImageFileObject], sa.TypeDecorator): # type: ## Basic Usage class MyTable(Base): - image: - ImageFileField.FileObject = sa.Column(ImageFileField(storage=FileSystemStorage('path/to/save/files', - max_size=10*MB), nullable=True) + image: ImageFileField.FileObject = sa.Column( + ImageFileField(storage=FileSystemStorage('path/to/save/files', max_size=10*MB), + nullable=True + ) def route(file: File[UploadFile]): session = SessionLocal() @@ -46,7 +47,6 @@ def route(file: File[UploadFile]): """ impl = sa.JSON - FileObject = ImageFileObject def __init__( self, @@ -60,6 +60,10 @@ def __init__( super().__init__(*args, storage=storage, max_size=max_size, **kwargs) self.crop = crop + @property + def file_object_type(self) -> t.Type[ImageFileObject]: + return ImageFileObject + def process_bind_param( self, value: t.Optional[t.Any], dialect: sa.Dialect ) -> t.Any: @@ -73,9 +77,12 @@ def process_result_value( def get_extra_file_initialization_context( self, file: UploadFile ) -> t.Dict[str, t.Any]: - with Image.open(file.file) as image: - width, height = image.size - return {"width": width, "height": height} + try: + with Image.open(file.file) as image: + width, height = image.size + return {"width": width, "height": height} + except Exception: + return {"width": None, "height": None} def crop_image_with_box_sizing( self, file: UploadFile, crop: t.Optional[CroppingDetails] = None diff --git a/tests/test_type_decorators/__init__.py b/tests/test_type_decorators/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_type_decorators/fixtures/image.png b/tests/test_type_decorators/fixtures/image.png new file mode 100644 index 0000000..94ef3eb Binary files /dev/null and b/tests/test_type_decorators/fixtures/image.png differ diff --git a/tests/test_type_decorators/test_file_upload.py b/tests/test_type_decorators/test_file_upload.py new file mode 100644 index 0000000..01785e1 --- /dev/null +++ b/tests/test_type_decorators/test_file_upload.py @@ -0,0 +1,165 @@ +import os +import uuid +from io import BytesIO +from unittest.mock import patch + +import pytest +import sqlalchemy.exc as sa_exc +from ellar.common.datastructures import ContentFile, UploadFile +from ellar.core.files import storages +from starlette.datastructures import Headers + +from ellar_sqlalchemy import model +from ellar_sqlalchemy.model.utils import MB + + +def serialize_file_data(file): + keys = { + "original_filename", + "content_type", + "extension", + "file_size", + "service_name", + } + return {k: v for k, v in file.to_dict().items() if k in keys} + + +def test_file_column_type(db_service, ignore_base, tmp_path): + path = str(tmp_path / "files") + fs = storages.FileSystemStorage(path) + + class File(model.Model): + id: model.Mapped[uuid.uuid4] = model.mapped_column( + "id", model.Integer(), nullable=False, unique=True, primary_key=True + ) + file: model.Mapped[model.FileObject] = model.mapped_column( + "file", model.FileField(storage=fs), nullable=False + ) + + db_service.create_all() + session = db_service.session_factory() + session.add(File(file=ContentFile(b"Testing file column type", name="text.txt"))) + session.commit() + + file: File = session.execute(model.select(File)).scalar() + assert "content_type=text/plain" in repr(file.file) + + data = serialize_file_data(file.file) + assert data == { + "content_type": "text/plain", + "extension": ".txt", + "file_size": 24, + "original_filename": "text.txt", + "service_name": "local", + } + + assert os.listdir(path)[0].split(".")[1] == "txt" + + +def test_file_column_invalid_file_extension(db_service, ignore_base, tmp_path): + fs = storages.FileSystemStorage(str(tmp_path / "files")) + + class File(model.Model): + id: model.Mapped[uuid.uuid4] = model.mapped_column( + "id", model.Integer(), nullable=False, unique=True, primary_key=True + ) + file: model.Mapped[model.FileObject] = model.mapped_column( + "file", + model.FileField(storage=fs, allowed_content_types=["application/pdf"]), + nullable=False, + ) + + with pytest.raises(sa_exc.StatementError) as stmt_exc: + db_service.create_all() + session = db_service.session_factory() + session.add( + File(file=ContentFile(b"Testing file column type", name="text.txt")) + ) + session.commit() + assert ( + str(stmt_exc.value.orig) + == "Content type is not supported text/plain. Valid options are: application/pdf" + ) + + +@patch( + "ellar_sqlalchemy.model.typeDecorator.file.base.magic_mime_from_buffer", + return_value=None, +) +def test_file_column_invalid_file_extension_case_2( + mock_buffer, db_service, ignore_base, tmp_path +): + fs = storages.FileSystemStorage(str(tmp_path / "files")) + + class File(model.Model): + id: model.Mapped[uuid.uuid4] = model.mapped_column( + "id", model.Integer(), nullable=False, unique=True, primary_key=True + ) + file: model.Mapped[model.FileObject] = model.mapped_column( + "file", + model.FileField(storage=fs, allowed_content_types=["application/pdf"]), + nullable=False, + ) + + with pytest.raises(sa_exc.StatementError) as stmt_exc: + db_service.create_all() + session = db_service.session_factory() + session.add( + File( + file=UploadFile( + BytesIO(b"Testing file column type"), + size=24, + filename="test.txt", + headers=Headers({"content-type": ""}), + ) + ) + ) + session.commit() + assert mock_buffer.called + assert ( + str(stmt_exc.value.orig) + == "Content type is not supported . Valid options are: application/pdf" + ) + + +@patch("ellar_sqlalchemy.model.typeDecorator.file.base.get_length", return_value=MB * 7) +def test_file_column_invalid_file_size_case_2( + mock_buffer, db_service, ignore_base, tmp_path +): + fs = storages.FileSystemStorage(str(tmp_path / "files")) + + class File(model.Model): + id: model.Mapped[uuid.uuid4] = model.mapped_column( + "id", model.Integer(), nullable=False, unique=True, primary_key=True + ) + file: model.Mapped[model.FileObject] = model.mapped_column( + "file", model.FileField(storage=fs, max_size=MB * 6), nullable=False + ) + + with pytest.raises(sa_exc.StatementError) as stmt_exc: + db_service.create_all() + session = db_service.session_factory() + session.add(File(file=ContentFile(b"Testing File Size Validation"))) + session.commit() + assert mock_buffer.called + assert str(stmt_exc.value.orig) == "Cannot store files larger than: 6291456 bytes" + + +def test_file_column_invalid_set(db_service, ignore_base, tmp_path): + fs = storages.FileSystemStorage(str(tmp_path / "files")) + + class File(model.Model): + id: model.Mapped[uuid.uuid4] = model.mapped_column( + "id", model.Integer(), nullable=False, unique=True, primary_key=True + ) + file: model.Mapped[model.FileObject] = model.mapped_column( + "file", model.FileField(storage=fs, max_size=MB * 6), nullable=False + ) + + db_service.create_all() + session = db_service.session_factory() + with pytest.raises(sa_exc.StatementError) as stmt_exc: + session.add(File(file={})) + session.commit() + + assert str(stmt_exc.value.orig) == "{} is not supported" diff --git a/tests/test_type_decorators/test_guid.py b/tests/test_type_decorators/test_guid.py new file mode 100644 index 0000000..06dab19 --- /dev/null +++ b/tests/test_type_decorators/test_guid.py @@ -0,0 +1,25 @@ +import uuid + +from ellar_sqlalchemy import model + + +def test_guid_column_type(db_service, ignore_base): + uid = uuid.uuid4() + + class Guid(model.Model): + id: model.Mapped[uuid.uuid4] = model.mapped_column( + "id", + model.GUID(), + nullable=False, + unique=True, + primary_key=True, + default=uuid.uuid4, + ) + + db_service.create_all() + session = db_service.session_factory() + session.add(Guid(id=uid)) + session.commit() + + guid = session.execute(model.select(Guid)).scalar() + assert guid.id == uid diff --git a/tests/test_type_decorators/test_image_upload.py b/tests/test_type_decorators/test_image_upload.py new file mode 100644 index 0000000..c13f773 --- /dev/null +++ b/tests/test_type_decorators/test_image_upload.py @@ -0,0 +1,229 @@ +import os +import uuid +from pathlib import Path + +import pytest +import sqlalchemy.exc as sa_exc +from ellar.common.datastructures import ContentFile, UploadFile +from ellar.core.files import storages +from starlette.datastructures import Headers + +from ellar_sqlalchemy import model +from ellar_sqlalchemy.model.utils import get_length + +fixtures_dir = Path(__file__).parent / "fixtures" + + +def get_image_upload(file, *, filename): + return UploadFile( + file, + filename=filename, + size=get_length(file), + headers=Headers({"content-type": ""}), + ) + + +def serialize_file_data(file): + keys = { + "original_filename", + "content_type", + "extension", + "file_size", + "service_name", + "height", + "width", + } + return {k: v for k, v in file.to_dict().items() if k in keys} + + +def test_image_column_type(db_service, ignore_base, tmp_path): + path = str(tmp_path / "images") + fs = storages.FileSystemStorage(path) + + class Image(model.Model): + id: model.Mapped[uuid.uuid4] = model.mapped_column( + "id", model.Integer(), nullable=False, unique=True, primary_key=True + ) + image: model.Mapped[model.ImageFileObject] = model.mapped_column( + "image", model.ImageFileField(storage=fs), nullable=False + ) + + with open(fixtures_dir / "image.png", mode="rb+") as fp: + db_service.create_all() + session = db_service.session_factory() + session.add(Image(image=get_image_upload(filename="image.png", file=fp))) + session.commit() + + file: Image = session.execute(model.select(Image)).scalar() + + data = serialize_file_data(file.image) + assert data == { + "content_type": "image/png", + "extension": ".png", + "file_size": 1590279, + "height": 1080, + "original_filename": "image.png", + "service_name": "local", + "width": 1080, + } + + assert os.listdir(path)[0].split(".")[1] == "png" + + +def test_image_file_with_cropping_details_set_on_column( + db_service, ignore_base, tmp_path +): + fs = storages.FileSystemStorage(str(tmp_path / "images")) + + class Image2(model.Model): + id: model.Mapped[uuid.uuid4] = model.mapped_column( + "id", model.Integer(), nullable=False, unique=True, primary_key=True + ) + image: model.Mapped[model.ImageFileObject] = model.mapped_column( + "image", + model.ImageFileField( + storage=fs, + crop=model.CroppingDetails(x=100, y=200, height=400, width=400), + ), + nullable=False, + ) + + with open(fixtures_dir / "image.png", mode="rb+") as fp: + db_service.create_all() + session = db_service.session_factory() + session.add(Image2(image=get_image_upload(filename="image.png", file=fp))) + session.commit() + + image_2: Image2 = session.execute(model.select(Image2)).scalar() + + data = serialize_file_data(image_2.image) + assert data == { + "content_type": "image/png", + "extension": ".png", + "file_size": 108477, + "height": 400, + "original_filename": "image.png", + "service_name": "local", + "width": 400, + } + + +def test_image_file_with_cropping_details_on_set(db_service, ignore_base, tmp_path): + fs = storages.FileSystemStorage(str(tmp_path / "images")) + + class Image3(model.Model): + id: model.Mapped[uuid.uuid4] = model.mapped_column( + "id", model.Integer(), nullable=False, unique=True, primary_key=True + ) + image: model.Mapped[model.ImageFileObject] = model.mapped_column( + "image", model.ImageFileField(storage=fs), nullable=False + ) + + db_service.create_all() + session = db_service.session_factory() + + with open(fixtures_dir / "image.png", mode="rb+") as fp: + file = get_image_upload(filename="image.png", file=fp) + image_input = (file, model.CroppingDetails(x=100, y=200, height=400, width=400)) + + session.add(Image3(image=image_input)) + session.commit() + + image_3: Image3 = session.execute(model.select(Image3)).scalar() + + data = serialize_file_data(image_3.image) + assert data == { + "content_type": "image/png", + "extension": ".png", + "file_size": 108477, + "height": 400, + "original_filename": "image.png", + "service_name": "local", + "width": 400, + } + + +def test_image_file_with_cropping_details_override(db_service, ignore_base, tmp_path): + fs = storages.FileSystemStorage(str(tmp_path / "images")) + + class Image4(model.Model): + id: model.Mapped[uuid.uuid4] = model.mapped_column( + "id", model.Integer(), nullable=False, unique=True, primary_key=True + ) + image: model.Mapped[model.ImageFileObject] = model.mapped_column( + "image", + model.ImageFileField( + storage=fs, + crop=model.CroppingDetails(x=100, y=200, height=400, width=400), + ), + nullable=False, + ) + + db_service.create_all() + session = db_service.session_factory() + + with open(fixtures_dir / "image.png", mode="rb+") as fp: + file = get_image_upload(filename="image.png", file=fp) + image_input = (file, model.CroppingDetails(x=100, y=200, height=300, width=300)) + + session.add(Image4(image=image_input)) + session.commit() + + image_4: Image4 = session.execute(model.select(Image4)).scalar() + + data = serialize_file_data(image_4.image) + assert data == { + "content_type": "image/png", + "extension": ".png", + "file_size": 54508, + "height": 300, + "original_filename": "image.png", + "service_name": "local", + "width": 300, + } + + +def test_image_column_invalid_set(db_service, ignore_base, tmp_path): + fs = storages.FileSystemStorage(str(tmp_path / "files")) + + class Image3(model.Model): + id: model.Mapped[uuid.uuid4] = model.mapped_column( + "id", model.Integer(), nullable=False, unique=True, primary_key=True + ) + image: model.Mapped[model.ImageFileObject] = model.mapped_column( + "image", model.ImageFileField(storage=fs), nullable=False + ) + + db_service.create_all() + session = db_service.session_factory() + + with pytest.raises(sa_exc.StatementError) as stmt_exc: + invalid_input = (ContentFile(b"Invalid Set"), {}) + session.add(Image3(image=invalid_input)) + session.commit() + assert str(stmt_exc.value.orig) == ( + "Invalid data was provided for ImageFileField. " + "Accept values: UploadFile or (UploadFile, CroppingDetails)" + ) + + +def test_image_column_invalid_set_case_2(db_service, ignore_base, tmp_path): + fs = storages.FileSystemStorage(str(tmp_path / "files")) + + class Image3(model.Model): + id: model.Mapped[uuid.uuid4] = model.mapped_column( + "id", model.Integer(), nullable=False, unique=True, primary_key=True + ) + image: model.Mapped[model.ImageFileObject] = model.mapped_column( + "image", model.ImageFileField(storage=fs), nullable=False + ) + + db_service.create_all() + session = db_service.session_factory() + + with pytest.raises(sa_exc.StatementError) as stmt_exc: + session.add(Image3(image=ContentFile(b"Not an image"))) + session.commit() + assert str(stmt_exc.value.orig) == ( + "Content type is not supported text/plain. Valid options are: image/jpeg, image/png" + ) diff --git a/tests/test_type_decorators/test_ipaddress.py b/tests/test_type_decorators/test_ipaddress.py new file mode 100644 index 0000000..203eaa5 --- /dev/null +++ b/tests/test_type_decorators/test_ipaddress.py @@ -0,0 +1,17 @@ +from ellar_sqlalchemy import model + + +def test_ipaddress_column_type(db_service, ignore_base): + ip = "192.0.2.1" + + class IPAddress(model.Model): + id = model.Column(model.Integer, primary_key=True) + ip = model.Column(model.GenericIP) + + db_service.create_all() + session = db_service.session_factory() + session.add(IPAddress(ip=ip)) + session.commit() + + ip_address = session.execute(model.select(IPAddress)).scalar() + assert str(ip_address.ip) == ip