Skip to content

Commit

Permalink
fix: improve new minio implementation
Browse files Browse the repository at this point in the history
The team mentioned wanting to ship the new read implementation first
then the new write implementation so this commit enables that.

i'm folding the NewMinioStorageService functionality back into the
MinioStorageService and creating 2 new feature flags one for read and
one for write.

Based on the values of those feature flags the MinioStorageService
will choose an implementation of read/write.

I also removed the old USE_NEW_MINIO feature flag, and modified the
get_appropriate_storage_service function to make it so it no longer
caches the MinioStorageService and instead the MinioStorageService is
caching its internal minio_client since that is the expensive part
of creating a new MinioStorageService instance.

I also moved some code around
  • Loading branch information
joseph-sentry committed Feb 26, 2025
1 parent 015d76e commit 57ea96b
Showing 10 changed files with 471 additions and 567 deletions.
4 changes: 3 additions & 1 deletion shared/rollouts/features.py
Original file line number Diff line number Diff line change
@@ -2,4 +2,6 @@

BUNDLE_THRESHOLD_FLAG = Feature("bundle_threshold_flag")
INCLUDE_GITHUB_COMMENT_ACTIONS_BY_OWNER = Feature("include_github_comment_actions")
USE_NEW_MINIO = Feature("use_new_minio")

READ_NEW_MINIO = Feature("read_new_minio")
WRITE_NEW_MINIO = Feature("write_new_minio")
30 changes: 20 additions & 10 deletions shared/storage/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from shared.config import get_config
from shared.rollouts.features import USE_NEW_MINIO
from shared.rollouts.features import READ_NEW_MINIO, WRITE_NEW_MINIO
from shared.storage.aws import AWSStorageService
from shared.storage.base import BaseStorageService
from shared.storage.fallback import StorageWithFallbackService
from shared.storage.gcp import GCPStorageService
from shared.storage.minio import MinioStorageService
from shared.storage.new_minio import NewMinioStorageService

_storage_service_cache: dict[str, BaseStorageService] = {}

@@ -18,9 +17,8 @@ def get_appropriate_storage_service(
if force_minio:
chosen_storage = "minio"

if repoid and chosen_storage == "minio":
if USE_NEW_MINIO.check_value(repoid, default=False):
chosen_storage = "new_minio"
if chosen_storage == "minio":
return get_minio_storage_service(repoid)

if chosen_storage not in _storage_service_cache:
_storage_service_cache[chosen_storage] = (
@@ -30,6 +28,22 @@ def get_appropriate_storage_service(
return _storage_service_cache[chosen_storage]


def get_minio_storage_service(
repo_id: int | None,
) -> MinioStorageService:
minio_config = get_config("services", "minio", default={})
if repo_id:
new_read = READ_NEW_MINIO.check_value(repo_id, default=False)
new_write = WRITE_NEW_MINIO.check_value(repo_id, default=False)
return MinioStorageService(
minio_config,
new_read=new_read,
new_write=new_write,
)
else:
return MinioStorageService(minio_config)


def _get_appropriate_storage_service_given_storage(
chosen_storage: str,
) -> BaseStorageService:
@@ -45,9 +59,5 @@ def _get_appropriate_storage_service_given_storage(
aws_config = get_config("services", "aws", default={})
aws_service = AWSStorageService(aws_config)
return StorageWithFallbackService(gcp_service, aws_service)
elif chosen_storage == "new_minio":
minio_config = get_config("services", "minio", default={})
return NewMinioStorageService(minio_config)
else:
minio_config = get_config("services", "minio", default={})
return MinioStorageService(minio_config)
raise ValueError(f"Invalid storage service: {chosen_storage}")

Check warning on line 63 in shared/storage/__init__.py

Codecov Notifications / codecov/patch

shared/storage/__init__.py#L63

Added line #L63 was not covered by tests
34 changes: 34 additions & 0 deletions shared/storage/compression.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import gzip
import importlib.metadata
from typing import IO


class GZipStreamReader:
def __init__(self, fileobj: IO[bytes]):
self.data = fileobj

def read(self, size: int = -1, /) -> bytes:
curr_data = self.data.read(size)

if not curr_data:
return b""

return gzip.compress(curr_data)


def zstd_decoded_by_default() -> bool:
try:
version = importlib.metadata.version("urllib3")
except importlib.metadata.PackageNotFoundError:
return False

Check warning on line 23 in shared/storage/compression.py

Codecov Notifications / codecov/patch

shared/storage/compression.py#L22-L23

Added lines #L22 - L23 were not covered by tests

if version < "2.0.0":
return False

distribution = importlib.metadata.metadata("urllib3")

Check warning on line 28 in shared/storage/compression.py

Codecov Notifications / codecov/patch

shared/storage/compression.py#L28

Added line #L28 was not covered by tests
if requires_dist := distribution.get_all("Requires-Dist"):
for req in requires_dist:
if "[zstd]" in req:
return True

Check warning on line 32 in shared/storage/compression.py

Codecov Notifications / codecov/patch

shared/storage/compression.py#L32

Added line #L32 was not covered by tests

return False

Check warning on line 34 in shared/storage/compression.py

Codecov Notifications / codecov/patch

shared/storage/compression.py#L34

Added line #L34 was not covered by tests
Loading
Oops, something went wrong.

0 comments on commit 57ea96b

Please sign in to comment.