Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jokester committed Jun 15, 2024
1 parent 5dfcb12 commit bf37132
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 37 deletions.
2 changes: 2 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
# CDN URL 鉴权主/备 KEY
CDN_URL_KEY_A = env.get("CDN_URL_KEY_A", "")
CDN_URL_KEY_B = env.get("CDN_URL_KEY_B", "") # 备 KEY 暂未用到
OPENDAL_SERVICE = env.get('OPENDAL_SERVICE', '')
OPENDAL_GCS_BUCKET = env.get("OPENDAL_GCS_BUCKET", '')
# -----------
# 内容安全
# -----------
Expand Down
5 changes: 5 additions & 0 deletions app/constants/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@
class StorageType(StrType):
OSS = "OSS"
LOCAL_STORAGE = "LOCAL_STORAGE"
OPENDAL = "OPENDAL"


class OpendalStorageService(StrType):
GCS = "GCS" # Google Cloud Storage
40 changes: 40 additions & 0 deletions app/services/file_storage_abstract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from abc import ABC, abstractmethod
import io

import werkzeug
from typing import Union, List, Optional


class FSSException(Exception):
pass


class FSSKeyNotFound(FSSException):
pass


class AbstractStorageService(ABC):
@abstractmethod
def upload(self, path: str, filename: str,
file: io.BufferedReader | werkzeug.wrappers.request.FileStorage | str,
headers: Optional[dict[str, int | str]] = None, progress_callback=None):
"""上传文件"""
pass

Check warning on line 22 in app/services/file_storage_abstract.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_abstract.py#L22

Added line #L22 was not covered by tests

@abstractmethod
def download(self, path: str, filename: str, /, *, local_path=None) -> Optional[io.BytesIO]:
"""下载文件"""
pass

Check warning on line 27 in app/services/file_storage_abstract.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_abstract.py#L27

Added line #L27 was not covered by tests

@abstractmethod
def is_exist(self, path: str, filename: str, process_name: Optional[str] = None) -> bool:
"""检查文件是否存在"""
pass

Check warning on line 32 in app/services/file_storage_abstract.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_abstract.py#L32

Added line #L32 was not covered by tests

@abstractmethod
def delete(self, path: str, filename: Union[List[str], str]):
pass

Check warning on line 36 in app/services/file_storage_abstract.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_abstract.py#L36

Added line #L36 was not covered by tests

@abstractmethod
def sign_url(self, *args, **kwargs) -> str:
pass

Check warning on line 40 in app/services/file_storage_abstract.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_abstract.py#L40

Added line #L40 was not covered by tests
135 changes: 135 additions & 0 deletions app/services/file_storage_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""
文件存储服务 (基本用于图片)
- 读写文件 (本地或云)
- 为文件生成供客户端使用的URL
代替1.0.7之前的oss+本地存储
"""

from __future__ import annotations

import asyncio
import io
import werkzeug
import opendal
import mimetypes
from typing import Union, Optional, List

from app.constants.storage import StorageType, OpendalStorageService
from asgiref.sync import async_to_sync
from app.utils.logging import logger
from .file_storage_abstract import AbstractStorageService


def create_opendal_storage_service(config: dict[str, str]) -> OpenDalStorageService:
if config['STORAGE_TYPE'] != StorageType.OPENDAL:
raise Exception("unexpected STORAGE_TYPE")

Check warning on line 27 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L26-L27

Added lines #L26 - L27 were not covered by tests

if config.get('OPENDAL_SERVICE') == OpendalStorageService.GCS:
url_builder = GcpUrlBuilder(config)

Check warning on line 30 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L29-L30

Added lines #L29 - L30 were not covered by tests

operator = opendal.AsyncOperator("gcs",

Check warning on line 32 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L32

Added line #L32 was not covered by tests
bucket=config['OPENDAL_GCS_BUCKET'],
root="/")
return OpenDalStorageService(url_builder, operator)

Check warning on line 35 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L35

Added line #L35 was not covered by tests

raise Exception("unsupported STORAGE_PROVIDER: {0}".format(config.get('OPENDAL_SERVICE')))

Check warning on line 37 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L37

Added line #L37 was not covered by tests


def read_key_file(path_or_value: str) -> str:
with open(path_or_value, "r") as f:
return f.read()

Check warning on line 42 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L41-L42

Added lines #L41 - L42 were not covered by tests


class OpenDalStorageService(AbstractStorageService):
"""
各种云存储服务的io
"""

def __init__(self, url_builder: GcpUrlBuilder, operator: opendal.AsyncOperator):
self.url_builder = url_builder
self.operator = operator

Check warning on line 52 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L51-L52

Added lines #L51 - L52 were not covered by tests

def upload(self, path: str, filename: str,
file: io.BufferedReader | werkzeug.wrappers.request.FileStorage | str,
headers: Optional[dict[str, int | str]] = None, progress_callback=None) -> None:
self._sync_upload(path, filename, file, headers or {})

Check warning on line 57 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L57

Added line #L57 was not covered by tests

@async_to_sync
async def _sync_upload(self, path: str, filename: str,
file: io.BufferedReader | werkzeug.wrappers.request.FileStorage | str,
headers: dict[str, int | str]):
blob: bytes = file.encode() if isinstance(file, str) else file.read()

Check warning on line 63 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L63

Added line #L63 was not covered by tests

aliased_headers = {

Check warning on line 65 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L65

Added line #L65 was not covered by tests
'content_type': 'Content-Type',
'cache_control': 'Cache-Control',
'content_disposition': 'Content-Disposition',
}
write_kwargs = {

Check warning on line 70 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L70

Added line #L70 was not covered by tests
k1: headers.get(k1) or headers.get(k2)
for (k1, k2) in aliased_headers.items()
if (k1 in headers) or (k2 in headers)
}
if "content_type" not in write_kwargs:
guessed_type, guessed_encoding = mimetypes.guess_type(filename)
if guessed_type:
write_kwargs['content_type'] = guessed_type
if "cache_control" not in write_kwargs:
write_kwargs["cache_control"] = "private, max-age=31536000, must-revalidate"
await self.operator.write(path + filename, blob, **write_kwargs)

Check warning on line 81 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L75-L81

Added lines #L75 - L81 were not covered by tests

def download(self, path: str, filename: str, /, *, local_path=None) -> Optional[io.BytesIO]:
"""下载文件"""
if local_path:
self._download_to_file(path, filename, local_path)

Check warning on line 86 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L85-L86

Added lines #L85 - L86 were not covered by tests
else:
downloaded: memoryview = self._download_to_memory(path, filename)
return io.BytesIO(downloaded)

Check warning on line 89 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L88-L89

Added lines #L88 - L89 were not covered by tests

@async_to_sync
async def _download_to_memory(self, path: str, filename: str):
return await self.operator.read("{0}/{1}".format(path, filename))

Check warning on line 93 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L93

Added line #L93 was not covered by tests

@async_to_sync
async def _download_to_file(self, path: str, filename: str, local_path: str):
blob = await self.operator.read("{0}/{1}".format(path, filename))
with open(local_path, "wb") as f:
f.write(bytes(blob))

Check warning on line 99 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L97-L99

Added lines #L97 - L99 were not covered by tests

def is_exist(self, path, filename, process_name=None) -> bool:
"""检查文件是否存在"""
return self._is_exist(path, filename)

Check warning on line 103 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L103

Added line #L103 was not covered by tests

@async_to_sync
async def _is_exist(self, path, filename):
metadata = await self.operator.stat(path + filename)

Check warning on line 107 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L107

Added line #L107 was not covered by tests
# FIXME
return True

Check warning on line 109 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L109

Added line #L109 was not covered by tests

def delete(self, path_prefix: str, filename: List[str] | str):
self._sync_delete(path_prefix, filename)

Check warning on line 112 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L112

Added line #L112 was not covered by tests

@async_to_sync
async def _sync_delete(self, path_prefix: str, filename: List[str] | str):
targets = filename if isinstance(filename, list) else [filename]
await asyncio.gather(

Check warning on line 117 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L116-L117

Added lines #L116 - L117 were not covered by tests
*[self.operator.delete(path_prefix + t) for t in targets]
)

def sign_url(self, path_prefix: str, filename: str, expires: int = 3600, process_name: str = None) -> str:
"""生成URL"""
return self.url_builder.create_public_url(path_prefix, filename, expires=expires, process_name=process_name)

Check warning on line 123 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L123

Added line #L123 was not covered by tests


class GcpUrlBuilder:
"""
GCP Cloud Storage的URL生成
"""

def __init__(self, options: dict[str, str]):
self.bucket_name = options['OPENDAL_GCS_BUCKET']

Check warning on line 132 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L132

Added line #L132 was not covered by tests

def create_public_url(self, path_prefix: str, filename: str, /, **kwargs) -> str:
return f"https://storage.cloud.google.com/{self.bucket_name}/{path_prefix}{filename}"

Check warning on line 135 in app/services/file_storage_service.py

View check run for this annotation

Codecov / codecov/patch

app/services/file_storage_service.py#L135

Added line #L135 was not covered by tests

0 comments on commit bf37132

Please sign in to comment.