Skip to content
This repository was archived by the owner on May 5, 2025. It is now read-only.

Commit 87f3a30

Browse files
committed
fix: improve new minio implementation
The team mentioned wanting to ship the new read implementation first then the new write implementation so this commit enables that. i'm folding the NewMinioStorageService functionality back into the MinioStorageService and creating 2 new feature flags one for read and one for write. Based on the values of those feature flags the MinioStorageService will choose an implementation of read/write. I also removed the old USE_NEW_MINIO feature flag, and modified the get_appropriate_storage_service function to make it so it no longer caches the MinioStorageService and instead the MinioStorageService is caching its internal minio_client since that is the expensive part of creating a new MinioStorageService instance. I also moved some code around
1 parent ec4b5fb commit 87f3a30

File tree

10 files changed

+467
-676
lines changed

10 files changed

+467
-676
lines changed

shared/rollouts/features.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,6 @@
22

33
BUNDLE_THRESHOLD_FLAG = Feature("bundle_threshold_flag")
44
INCLUDE_GITHUB_COMMENT_ACTIONS_BY_OWNER = Feature("include_github_comment_actions")
5-
USE_NEW_MINIO = Feature("use_new_minio")
5+
6+
READ_NEW_MINIO = Feature("read_new_minio")
7+
WRITE_NEW_MINIO = Feature("write_new_minio")

shared/storage/__init__.py

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,7 @@
1-
from typing import Literal
2-
31
from shared.config import get_config
4-
from shared.rollouts.features import USE_NEW_MINIO
5-
from shared.storage.aws import AWSStorageService
2+
from shared.rollouts.features import READ_NEW_MINIO, WRITE_NEW_MINIO
63
from shared.storage.base import BaseStorageService
7-
from shared.storage.fallback import StorageWithFallbackService
8-
from shared.storage.gcp import GCPStorageService
94
from shared.storage.minio import MinioStorageService
10-
from shared.storage.new_minio import NewMinioStorageService
115

126
_storage_service_cache: dict[str, BaseStorageService] = {}
137

@@ -16,24 +10,20 @@ def get_appropriate_storage_service(
1610
repoid: int | None = None,
1711
force_minio=False,
1812
) -> BaseStorageService:
19-
chosen_storage = "minio"
20-
if repoid and USE_NEW_MINIO.check_value(repoid, default=False):
21-
chosen_storage = "new_minio"
22-
23-
if chosen_storage not in _storage_service_cache:
24-
_storage_service_cache[chosen_storage] = (
25-
_get_appropriate_storage_service_given_storage(chosen_storage)
13+
return get_minio_storage_service(repoid)
14+
15+
16+
def get_minio_storage_service(
17+
repo_id: int | None,
18+
) -> MinioStorageService:
19+
minio_config = get_config("services", "minio", default={})
20+
if repo_id:
21+
new_read = READ_NEW_MINIO.check_value(repo_id, default=False)
22+
new_write = WRITE_NEW_MINIO.check_value(repo_id, default=False)
23+
return MinioStorageService(
24+
minio_config,
25+
new_read=new_read,
26+
new_write=new_write,
2627
)
27-
28-
return _storage_service_cache[chosen_storage]
29-
30-
31-
def _get_appropriate_storage_service_given_storage(
32-
chosen_storage: Literal["minio", "new_minio"],
33-
) -> BaseStorageService:
34-
if chosen_storage == "new_minio":
35-
minio_config = get_config("services", "minio", default={})
36-
return NewMinioStorageService(minio_config)
37-
elif chosen_storage == "minio":
38-
minio_config = get_config("services", "minio", default={})
28+
else:
3929
return MinioStorageService(minio_config)

shared/storage/compression.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import gzip
2+
import importlib.metadata
3+
from typing import IO
4+
5+
6+
class GZipStreamReader:
7+
def __init__(self, fileobj: IO[bytes]):
8+
self.data = fileobj
9+
10+
def read(self, size: int = -1, /) -> bytes:
11+
curr_data = self.data.read(size)
12+
13+
if not curr_data:
14+
return b""
15+
16+
return gzip.compress(curr_data)
17+
18+
19+
def zstd_decoded_by_default() -> bool:
20+
try:
21+
version = importlib.metadata.version("urllib3")
22+
except importlib.metadata.PackageNotFoundError:
23+
return False
24+
25+
if version < "2.0.0":
26+
return False
27+
28+
distribution = importlib.metadata.metadata("urllib3")
29+
if requires_dist := distribution.get_all("Requires-Dist"):
30+
for req in requires_dist:
31+
if "[zstd]" in req:
32+
return True
33+
34+
return False

0 commit comments

Comments
 (0)