Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ellar_sqlalchemy/model/typeDecorator/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import typing as t


class ContentTypeValidationError(Exception):
class ContentTypeValidationError(ValueError):
def __init__(
self,
content_type: t.Optional[str] = None,
Expand All @@ -18,14 +18,14 @@ def __init__(
super().__init__(message)


class InvalidFileError(Exception):
class InvalidFileError(ValueError):
pass


class InvalidImageOperationError(Exception):
class InvalidImageOperationError(ValueError):
pass


class MaximumAllowedFileLengthError(Exception):
class MaximumAllowedFileLengthError(ValueError):
def __init__(self, max_length: int) -> None:
super().__init__("Cannot store files larger than: %d bytes" % max_length)
27 changes: 16 additions & 11 deletions ellar_sqlalchemy/model/typeDecorator/file/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
import time
import typing as t
import uuid
from abc import abstractmethod

import sqlalchemy as sa
from ellar.common import UploadFile
from ellar.core.files.storages import BaseStorage
from ellar.core.files.storages.utils import get_valid_filename
from starlette.datastructures import UploadFile

from ellar_sqlalchemy.model.typeDecorator.exceptions import (
ContentTypeValidationError,
Expand All @@ -25,7 +26,10 @@


class FileFieldBase(t.Generic[T]):
FileObject: t.Type[T] = t.cast(t.Type[T], FileObject)
@property
@abstractmethod
def file_object_type(self) -> t.Type[T]:
...

def load_dialect_impl(self, dialect: sa.Dialect) -> t.Any:
if dialect.name == "sqlite":
Expand Down Expand Up @@ -74,7 +78,7 @@ def load_from_str(self, data: str) -> T:
def load(self, data: t.Dict[str, t.Any]) -> T:
if "service_name" in data:
data.pop("service_name")
return self.FileObject(storage=self.storage, **data)
return self.file_object_type(storage=self.storage, **data)

def _guess_content_type(self, file: t.IO) -> str: # type:ignore[type-arg]
content = file.read(1024)
Expand All @@ -97,14 +101,15 @@ def convert_to_file_object(self, file: UploadFile) -> T:
original_filename = file.filename or unique_name

# use python magic to get the content type
content_type = self._guess_content_type(file.file)
content_type = self._guess_content_type(file.file) or ""
extension = guess_extension(content_type)
assert extension

file_size = get_length(file.file)
saved_filename = (
f"{original_filename[:-len(extension)]}_{unique_name[:-8]}{extension}"
)
if extension:
saved_filename = (
f"{original_filename[:-len(extension)]}_{unique_name[:-8]}{extension}"
)
else:
saved_filename = f"{unique_name[:-8]}_{original_filename}"
saved_filename = get_valid_filename(saved_filename)

init_kwargs = self.get_extra_file_initialization_context(file)
Expand All @@ -117,7 +122,7 @@ def convert_to_file_object(self, file: UploadFile) -> T:
file_size=file_size,
saved_filename=saved_filename,
)
return self.FileObject(**init_kwargs)
return self.file_object_type(**init_kwargs)

def process_bind_param_action(
self, value: t.Optional[t.Any], dialect: sa.Dialect
Expand All @@ -138,7 +143,7 @@ def process_bind_param_action(
return json.dumps(value.to_dict())
return value.to_dict()

raise InvalidFileError()
raise InvalidFileError(f"{value} is not supported")

def process_result_value_action(
self, value: t.Optional[t.Any], dialect: sa.Dialect
Expand Down
5 changes: 5 additions & 0 deletions ellar_sqlalchemy/model/typeDecorator/file/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@


class FileField(FileFieldBase[FileObject], sa.TypeDecorator): # type: ignore[type-arg]

"""
Provide SqlAlchemy TypeDecorator for saving files
## Basic Usage
Expand All @@ -28,6 +29,10 @@ def route(file: File[UploadFile]):

"""

@property
def file_object_type(self) -> t.Type[FileObject]:
return FileObject

impl = sa.JSON

def process_bind_param(
Expand Down
5 changes: 1 addition & 4 deletions ellar_sqlalchemy/model/typeDecorator/file/file_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,5 @@ def to_dict(self) -> t.Dict[str, t.Any]:
"service_name": self._storage.service_name(),
}

def __str__(self) -> str:
return f"filename={self.filename}, content_type={self.content_type}, file_size={self.file_size}"

def __repr__(self) -> str:
return str(self)
return f"<{self.__class__.__name__} filename={self.filename}, content_type={self.content_type}, file_size={self.file_size}>"
23 changes: 15 additions & 8 deletions ellar_sqlalchemy/model/typeDecorator/image/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
from io import SEEK_END, BytesIO

import sqlalchemy as sa
from ellar.common import UploadFile
from ellar.core.files.storages import BaseStorage
from PIL import Image
from starlette.datastructures import UploadFile

from ..exceptions import InvalidImageOperationError
from ..file import FileFieldBase
Expand All @@ -18,9 +18,10 @@ class ImageFileField(FileFieldBase[ImageFileObject], sa.TypeDecorator): # type:
## Basic Usage

class MyTable(Base):
image:
ImageFileField.FileObject = sa.Column(ImageFileField(storage=FileSystemStorage('path/to/save/files',
max_size=10*MB), nullable=True)
image: ImageFileField.FileObject = sa.Column(
ImageFileField(storage=FileSystemStorage('path/to/save/files', max_size=10*MB),
nullable=True
)

def route(file: File[UploadFile]):
session = SessionLocal()
Expand All @@ -46,7 +47,6 @@ def route(file: File[UploadFile]):
"""

impl = sa.JSON
FileObject = ImageFileObject

def __init__(
self,
Expand All @@ -60,6 +60,10 @@ def __init__(
super().__init__(*args, storage=storage, max_size=max_size, **kwargs)
self.crop = crop

@property
def file_object_type(self) -> t.Type[ImageFileObject]:
return ImageFileObject

def process_bind_param(
self, value: t.Optional[t.Any], dialect: sa.Dialect
) -> t.Any:
Expand All @@ -73,9 +77,12 @@ def process_result_value(
def get_extra_file_initialization_context(
self, file: UploadFile
) -> t.Dict[str, t.Any]:
with Image.open(file.file) as image:
width, height = image.size
return {"width": width, "height": height}
try:
with Image.open(file.file) as image:
width, height = image.size
return {"width": width, "height": height}
except Exception:
return {"width": None, "height": None}

def crop_image_with_box_sizing(
self, file: UploadFile, crop: t.Optional[CroppingDetails] = None
Expand Down
Empty file.
Binary file added tests/test_type_decorators/fixtures/image.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
165 changes: 165 additions & 0 deletions tests/test_type_decorators/test_file_upload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import os
import uuid
from io import BytesIO
from unittest.mock import patch

import pytest
import sqlalchemy.exc as sa_exc
from ellar.common.datastructures import ContentFile, UploadFile
from ellar.core.files import storages
from starlette.datastructures import Headers

from ellar_sqlalchemy import model
from ellar_sqlalchemy.model.utils import MB


def serialize_file_data(file):
keys = {
"original_filename",
"content_type",
"extension",
"file_size",
"service_name",
}
return {k: v for k, v in file.to_dict().items() if k in keys}


def test_file_column_type(db_service, ignore_base, tmp_path):
path = str(tmp_path / "files")
fs = storages.FileSystemStorage(path)

class File(model.Model):
id: model.Mapped[uuid.uuid4] = model.mapped_column(
"id", model.Integer(), nullable=False, unique=True, primary_key=True
)
file: model.Mapped[model.FileObject] = model.mapped_column(
"file", model.FileField(storage=fs), nullable=False
)

db_service.create_all()
session = db_service.session_factory()
session.add(File(file=ContentFile(b"Testing file column type", name="text.txt")))
session.commit()

file: File = session.execute(model.select(File)).scalar()
assert "content_type=text/plain" in repr(file.file)

data = serialize_file_data(file.file)
assert data == {
"content_type": "text/plain",
"extension": ".txt",
"file_size": 24,
"original_filename": "text.txt",
"service_name": "local",
}

assert os.listdir(path)[0].split(".")[1] == "txt"


def test_file_column_invalid_file_extension(db_service, ignore_base, tmp_path):
fs = storages.FileSystemStorage(str(tmp_path / "files"))

class File(model.Model):
id: model.Mapped[uuid.uuid4] = model.mapped_column(
"id", model.Integer(), nullable=False, unique=True, primary_key=True
)
file: model.Mapped[model.FileObject] = model.mapped_column(
"file",
model.FileField(storage=fs, allowed_content_types=["application/pdf"]),
nullable=False,
)

with pytest.raises(sa_exc.StatementError) as stmt_exc:
db_service.create_all()
session = db_service.session_factory()
session.add(
File(file=ContentFile(b"Testing file column type", name="text.txt"))
)
session.commit()
assert (
str(stmt_exc.value.orig)
== "Content type is not supported text/plain. Valid options are: application/pdf"
)


@patch(
"ellar_sqlalchemy.model.typeDecorator.file.base.magic_mime_from_buffer",
return_value=None,
)
def test_file_column_invalid_file_extension_case_2(
mock_buffer, db_service, ignore_base, tmp_path
):
fs = storages.FileSystemStorage(str(tmp_path / "files"))

class File(model.Model):
id: model.Mapped[uuid.uuid4] = model.mapped_column(
"id", model.Integer(), nullable=False, unique=True, primary_key=True
)
file: model.Mapped[model.FileObject] = model.mapped_column(
"file",
model.FileField(storage=fs, allowed_content_types=["application/pdf"]),
nullable=False,
)

with pytest.raises(sa_exc.StatementError) as stmt_exc:
db_service.create_all()
session = db_service.session_factory()
session.add(
File(
file=UploadFile(
BytesIO(b"Testing file column type"),
size=24,
filename="test.txt",
headers=Headers({"content-type": ""}),
)
)
)
session.commit()
assert mock_buffer.called
assert (
str(stmt_exc.value.orig)
== "Content type is not supported . Valid options are: application/pdf"
)


@patch("ellar_sqlalchemy.model.typeDecorator.file.base.get_length", return_value=MB * 7)
def test_file_column_invalid_file_size_case_2(
mock_buffer, db_service, ignore_base, tmp_path
):
fs = storages.FileSystemStorage(str(tmp_path / "files"))

class File(model.Model):
id: model.Mapped[uuid.uuid4] = model.mapped_column(
"id", model.Integer(), nullable=False, unique=True, primary_key=True
)
file: model.Mapped[model.FileObject] = model.mapped_column(
"file", model.FileField(storage=fs, max_size=MB * 6), nullable=False
)

with pytest.raises(sa_exc.StatementError) as stmt_exc:
db_service.create_all()
session = db_service.session_factory()
session.add(File(file=ContentFile(b"Testing File Size Validation")))
session.commit()
assert mock_buffer.called
assert str(stmt_exc.value.orig) == "Cannot store files larger than: 6291456 bytes"


def test_file_column_invalid_set(db_service, ignore_base, tmp_path):
fs = storages.FileSystemStorage(str(tmp_path / "files"))

class File(model.Model):
id: model.Mapped[uuid.uuid4] = model.mapped_column(
"id", model.Integer(), nullable=False, unique=True, primary_key=True
)
file: model.Mapped[model.FileObject] = model.mapped_column(
"file", model.FileField(storage=fs, max_size=MB * 6), nullable=False
)

db_service.create_all()
session = db_service.session_factory()
with pytest.raises(sa_exc.StatementError) as stmt_exc:
session.add(File(file={}))
session.commit()

assert str(stmt_exc.value.orig) == "{} is not supported"
25 changes: 25 additions & 0 deletions tests/test_type_decorators/test_guid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import uuid

from ellar_sqlalchemy import model


def test_guid_column_type(db_service, ignore_base):
uid = uuid.uuid4()

class Guid(model.Model):
id: model.Mapped[uuid.uuid4] = model.mapped_column(
"id",
model.GUID(),
nullable=False,
unique=True,
primary_key=True,
default=uuid.uuid4,
)

db_service.create_all()
session = db_service.session_factory()
session.add(Guid(id=uid))
session.commit()

guid = session.execute(model.select(Guid)).scalar()
assert guid.id == uid
Loading