Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 80 additions & 41 deletions octue/cloud/storage/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from google_crc32c import Checksum

from octue.cloud.credentials import GCPCredentialsManager
from octue.cloud.storage.path import split_bucket_name_from_gs_path


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -46,17 +47,27 @@ def create_bucket(self, name, location=None, allow_existing=False, timeout=_DEFA

self.client.create_bucket(bucket_or_name=name, location=location, timeout=timeout)

def upload_file(self, local_path, bucket_name, path_in_bucket, metadata=None, timeout=_DEFAULT_TIMEOUT):
"""Upload a local file to a Google Cloud bucket at gs://<bucket_name>/<path_in_bucket>.
def upload_file(
self,
local_path,
cloud_path=None,
bucket_name=None,
path_in_bucket=None,
metadata=None,
timeout=_DEFAULT_TIMEOUT,
):
"""Upload a local file to a Google Cloud bucket at gs://<bucket_name>/<path_in_bucket>. Either (`bucket_name`
and `path_in_bucket`) or `cloud_path` must be provided.

:param str local_path:
:param str bucket_name:
:param str path_in_bucket:
:param str|None cloud_path:
:param str|None bucket_name:
:param str|None path_in_bucket:
:param dict metadata:
:param float timeout:
:return None:
"""
blob = self._blob(bucket_name, path_in_bucket)
blob = self._blob(cloud_path, bucket_name, path_in_bucket)

with open(local_path) as f:
blob.crc32c = self._compute_crc32c_checksum(f.read())
Expand All @@ -65,69 +76,85 @@ def upload_file(self, local_path, bucket_name, path_in_bucket, metadata=None, ti
self._update_metadata(blob, metadata)
logger.info("Uploaded %r to Google Cloud at %r.", local_path, blob.public_url)

def upload_from_string(self, string, bucket_name, path_in_bucket, metadata=None, timeout=_DEFAULT_TIMEOUT):
def upload_from_string(
self, string, cloud_path=None, bucket_name=None, path_in_bucket=None, metadata=None, timeout=_DEFAULT_TIMEOUT
):
"""Upload serialised data in string form to a file in a Google Cloud bucket at
gs://<bucket_name>/<path_in_bucket>.
gs://<bucket_name>/<path_in_bucket>. Either (`bucket_name` and `path_in_bucket`) or `cloud_path` must be provided.

:param str string:
:param str bucket_name:
:param str path_in_bucket:
:param str|None cloud_path:
:param str|None bucket_name:
:param str|None path_in_bucket:
:param dict metadata:
:param float timeout:
:return None:
"""
blob = self._blob(bucket_name, path_in_bucket)
blob = self._blob(cloud_path, bucket_name, path_in_bucket)
blob.crc32c = self._compute_crc32c_checksum(string)

blob.upload_from_string(data=string, timeout=timeout)
self._update_metadata(blob, metadata)
logger.info("Uploaded data to Google Cloud at %r.", blob.public_url)

def update_metadata(self, bucket_name, path_in_bucket, metadata):
"""Update the metadata for the given cloud file.
def update_metadata(self, metadata, cloud_path=None, bucket_name=None, path_in_bucket=None):
"""Update the metadata for the given cloud file. Either (`bucket_name` and `path_in_bucket`) or `cloud_path` must
be provided.

:param str bucket_name:
:param str path_in_bucket:
:param dict metadata:
:param str|None cloud_path:
:param str|None bucket_name:
:param str|None path_in_bucket:
:return None:
"""
blob = self._blob(bucket_name, path_in_bucket)
blob = self._blob(cloud_path, bucket_name, path_in_bucket)
self._update_metadata(blob, metadata)

def download_to_file(self, bucket_name, path_in_bucket, local_path, timeout=_DEFAULT_TIMEOUT):
"""Download a file to a file from a Google Cloud bucket at gs://<bucket_name>/<path_in_bucket>.
def download_to_file(
self, local_path, cloud_path=None, bucket_name=None, path_in_bucket=None, timeout=_DEFAULT_TIMEOUT
):
"""Download a file to a file from a Google Cloud bucket at gs://<bucket_name>/<path_in_bucket>. Either
(`bucket_name` and `path_in_bucket`) or `cloud_path` must be provided.

:param str bucket_name:
:param str path_in_bucket:
:param str local_path:
:param str|None cloud_path:
:param str|None bucket_name:
:param str|None path_in_bucket:
:param float timeout:
:return None:
"""
blob = self._blob(bucket_name, path_in_bucket)
blob = self._blob(cloud_path, bucket_name, path_in_bucket)
blob.download_to_filename(local_path, timeout=timeout)
logger.info("Downloaded %r from Google Cloud to %r.", blob.public_url, local_path)

def download_as_string(self, bucket_name, path_in_bucket, timeout=_DEFAULT_TIMEOUT):
"""Download a file to a string from a Google Cloud bucket at gs://<bucket_name>/<path_in_bucket>.
def download_as_string(self, cloud_path=None, bucket_name=None, path_in_bucket=None, timeout=_DEFAULT_TIMEOUT):
"""Download a file to a string from a Google Cloud bucket at gs://<bucket_name>/<path_in_bucket>. Either
(`bucket_name` and `path_in_bucket`) or `cloud_path` must be provided.

:param str bucket_name:
:param str path_in_bucket:
:param str|None cloud_path:
:param str|None bucket_name:
:param str|None path_in_bucket:
:param float timeout:
:return str:
"""
blob = self._blob(bucket_name, path_in_bucket)
blob = self._blob(cloud_path, bucket_name, path_in_bucket)
data = blob.download_as_bytes(timeout=timeout)
logger.info("Downloaded %r from Google Cloud to as string.", blob.public_url)
return data.decode()

def get_metadata(self, bucket_name, path_in_bucket, timeout=_DEFAULT_TIMEOUT):
"""Get the metadata of the given file in the given bucket.
def get_metadata(self, cloud_path=None, bucket_name=None, path_in_bucket=None, timeout=_DEFAULT_TIMEOUT):
"""Get the metadata of the given file in the given bucket. Either (`bucket_name` and `path_in_bucket`) or
`cloud_path` must be provided.

:param str bucket_name:
:param str path_in_bucket:
:param str|None cloud_path:
:param str|None bucket_name:
:param str|None path_in_bucket:
:param float timeout:
:return dict:
"""
if cloud_path:
bucket_name, path_in_bucket = split_bucket_name_from_gs_path(cloud_path)

bucket = self.client.get_bucket(bucket_or_name=bucket_name)
blob = bucket.get_blob(blob_name=self._strip_leading_slash(path_in_bucket), timeout=timeout)

Expand All @@ -147,27 +174,34 @@ def get_metadata(self, bucket_name, path_in_bucket, timeout=_DEFAULT_TIMEOUT):
"path_in_bucket": path_in_bucket,
}

def delete(self, bucket_name, path_in_bucket, timeout=_DEFAULT_TIMEOUT):
"""Delete the given file from the given bucket.
def delete(self, cloud_path=None, bucket_name=None, path_in_bucket=None, timeout=_DEFAULT_TIMEOUT):
"""Delete the given file from the given bucket. Either (`bucket_name` and `path_in_bucket`) or `cloud_path` must
be provided.

:param str bucket_name:
:param str path_in_bucket:
:param str|None cloud_path:
:param str|None bucket_name:
:param str|None path_in_bucket:
:param float timeout:
:return None:
"""
blob = self._blob(bucket_name, path_in_bucket)
blob = self._blob(cloud_path, bucket_name, path_in_bucket)
blob.delete(timeout=timeout)
logger.info("Deleted %r from Google Cloud.", blob.public_url)

def scandir(self, bucket_name, directory_path, filter=None, timeout=_DEFAULT_TIMEOUT):
"""Yield the blobs belonging to the given "directory" in the given bucket.
def scandir(self, cloud_path=None, bucket_name=None, directory_path=None, filter=None, timeout=_DEFAULT_TIMEOUT):
"""Yield the blobs belonging to the given "directory" in the given bucket. Either (`bucket_name` and
`directory_path`) or `cloud_path` must be provided.

:param str bucket_name:
:param str directory_path:
:param str|None cloud_path:
:param str|None bucket_name:
:param str|None directory_path:
:param callable filter:
:param float timeout:
:yield google.cloud.storage.blob.Blob:
"""
if cloud_path:
bucket_name, directory_path = split_bucket_name_from_gs_path(cloud_path)

bucket = self.client.get_bucket(bucket_or_name=bucket_name)
blobs = bucket.list_blobs(timeout=timeout)
directory_path = self._strip_leading_slash(directory_path)
Expand All @@ -185,13 +219,18 @@ def _strip_leading_slash(self, path):
"""
return path.lstrip("/")

def _blob(self, bucket_name, path_in_bucket):
def _blob(self, cloud_path=None, bucket_name=None, path_in_bucket=None):
"""Instantiate a blob for the given bucket at the given path. Note that this is not synced up with Google Cloud.
Either (`bucket_name` and `path_in_bucket`) or `cloud_path` must be provided.

:param str bucket_name:
:param str path_in_bucket:
:param str|None cloud_path:
:param str|None bucket_name:
:param str|None path_in_bucket:
:return google.cloud.storage.blob.Blob:
"""
if cloud_path:
bucket_name, path_in_bucket = split_bucket_name_from_gs_path(cloud_path)

bucket = self.client.get_bucket(bucket_or_name=bucket_name)
return bucket.blob(blob_name=self._strip_leading_slash(path_in_bucket))

Expand Down
6 changes: 5 additions & 1 deletion octue/mixins/serialisable.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,12 @@ def __init__(self):
self_as_primitive = {}
for name in names_of_attributes_to_serialise:
attribute = getattr(self, name, None)

# Serialise sets as sorted list (JSON doesn't support sets).
self_as_primitive[name] = sorted(attribute) if isinstance(attribute, set) else attribute
if isinstance(attribute, set):
self_as_primitive[name] = sorted(attribute)
else:
self_as_primitive[name] = attribute

# TODO this conversion backward-and-forward is very inefficient but allows us to use the same encoder for
# converting the object to a dict as to strings, which ensures that nested attributes are also cast to
Expand Down
Loading