diff --git a/docs/source/datafile.rst b/docs/source/datafile.rst
index 915d7d320..8b323839c 100644
--- a/docs/source/datafile.rst
+++ b/docs/source/datafile.rst
@@ -12,3 +12,134 @@ the following main attributes:
 - ``sequence`` - a sequence number of this file within its cluster (if sequences are appropriate)
 - ``tags`` - a space-separated string or iterable of tags relevant to this file
 - ``timestamp`` - a posix timestamp associated with the file, in seconds since epoch, typically when it was created but could relate to a relevant time point for the data
+
+
+-----
+Usage
+-----
+
+``Datafile`` can be used functionally or as a context manager. When used as a context manager, it is analogous to the
+builtin ``open`` context manager: entering the context yields the datafile together with a file pointer to its local
+contents. On exiting the context (the ``with`` block), the datafile is closed locally and, if it is a cloud datafile,
+the cloud object is updated with any data or metadata changes.
+
+
+.. image:: images/datafile_use_cases.png
+
+
+Example A
+---------
+**Scenario:** Download a cloud object, calculate Octue metadata from its contents, and add the new metadata to the cloud object
+
+**Starting point:** Object in cloud with or without Octue metadata
+
+**Goal:** Object in cloud with updated metadata
+
+.. code-block:: python
+
+    from octue.resources import Datafile
+
+
+    project_name = "my-project"
+    bucket_name = "my-bucket"
+    datafile_path = "path/to/data.csv"
+
+    with Datafile.from_cloud(project_name, bucket_name, datafile_path, mode="r") as (datafile, f):
+        data = f.read()
+        new_metadata = metadata_calculating_function(data)
+
+        datafile.timestamp = new_metadata["timestamp"]
+        datafile.cluster = new_metadata["cluster"]
+        datafile.sequence = new_metadata["sequence"]
+        datafile.tags = new_metadata["tags"]
+
+
+Example B
+---------
+**Scenario:** Add or update Octue metadata on an existing cloud object *without downloading its content*
+
+**Starting point:** A cloud object with or without Octue metadata
+
+**Goal:** Object in cloud with updated metadata
+
+.. code-block:: python
+
+    from datetime import datetime
+    from octue.resources import Datafile
+
+
+    project_name = "my-project"
+    bucket_name = "my-bucket"
+    datafile_path = "path/to/data.csv"
+
+    datafile = Datafile.from_cloud(project_name, bucket_name, datafile_path)
+
+    datafile.timestamp = datetime.now()
+    datafile.cluster = 0
+    datafile.sequence = 3
+    datafile.tags = {"manufacturer:Vestas", "output:1MW"}
+
+    datafile.to_cloud()  # Or, datafile.update_cloud_metadata()
+
+
+Example C
+---------
+**Scenario:** Read in the contents and Octue metadata of an existing cloud object without intending to update it in the cloud
+
+**Starting point:** A cloud object with Octue metadata
+
+**Goal:** Cloud object data (contents) and metadata held in local variables
+
+.. code-block:: python
+
+    from octue.resources import Datafile
+
+
+    project_name = "my-project"
+    bucket_name = "my-bucket"
+    datafile_path = "path/to/data.csv"
+
+    datafile = Datafile.from_cloud(project_name, bucket_name, datafile_path)
+
+    with datafile.open("r") as f:
+        data = f.read()
+
+    metadata = datafile.metadata()
+
+
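+As a variation on the examples above (not pictured in the use-case diagram), the metadata sync on exit can be
+suppressed by passing ``update_cloud_metadata=False``, so that only changed file contents are pushed back to the
+cloud. A minimal sketch, reusing the hypothetical project, bucket, and path names from the examples above:
+
+.. code-block:: python
+
+    from octue.resources import Datafile
+
+
+    with Datafile.from_cloud(
+        "my-project", "my-bucket", "path/to/data.csv", mode="a", update_cloud_metadata=False
+    ) as (datafile, f):
+        f.write("Some more data.")
+
+    # On exiting the context, the appended contents are uploaded; the cloud metadata is left unchanged.
+
+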
+Example D
+---------
+**Scenario:** Create a new cloud object from local data, adding Octue metadata
+
+**Starting point:** A local file (or content data in a local variable) with Octue metadata stored in local variables
+
+**Goal:** A new object in the cloud with data and Octue metadata
+
+For creating new data in a new local file:
+
+.. code-block:: python
+
+    from octue.resources import Datafile
+
+
+    sequence = 2
+    tags = {"cleaned:True", "type:linear"}
+
+
+    with Datafile(path="path/to/local/file.dat", timestamp=None, sequence=sequence, tags=tags, mode="w") as (datafile, f):
+        f.write("This is some cleaned data.")
+
+    datafile.to_cloud(project_name="my-project", bucket_name="my-bucket", path_in_bucket="path/to/data.dat")
+
+
+For existing data in an existing local file:
+
+.. code-block:: python
+
+    from octue.resources import Datafile
+
+
+    sequence = 2
+    tags = {"cleaned:True", "type:linear"}
+
+    datafile = Datafile(path="path/to/local/file.dat", timestamp=None, sequence=sequence, tags=tags)
+    datafile.to_cloud(project_name="my-project", bucket_name="my-bucket", path_in_bucket="path/to/data.dat")
diff --git a/docs/source/images/datafile_use_cases.png b/docs/source/images/datafile_use_cases.png
new file mode 100644
index 000000000..972e10bd5
Binary files /dev/null and b/docs/source/images/datafile_use_cases.png differ
diff --git a/octue/cloud/storage/client.py b/octue/cloud/storage/client.py
index abf9976b4..d4ba9f087 100644
--- a/octue/cloud/storage/client.py
+++ b/octue/cloud/storage/client.py
@@ -27,6 +27,7 @@ def __init__(self, project_name, credentials=OCTUE_MANAGED_CREDENTIALS):
             credentials = credentials
 
         self.client = storage.Client(project=project_name, credentials=credentials)
+        self.project_name = project_name
 
     def create_bucket(self, name, location=None, allow_existing=False, timeout=_DEFAULT_TIMEOUT):
         """Create a new bucket. If the bucket already exists, and `allow_existing` is `True`, do nothing; if it is
@@ -82,6 +83,17 @@ def upload_from_string(self, string, bucket_name, path_in_bucket, metadata=None,
         self._update_metadata(blob, metadata)
         logger.info("Uploaded data to Google Cloud at %r.", blob.public_url)
 
+    def update_metadata(self, bucket_name, path_in_bucket, metadata):
+        """Update the metadata for the given cloud file.
+
+        :param str bucket_name:
+        :param str path_in_bucket:
+        :param dict metadata:
+        :return None:
+        """
+        blob = self._blob(bucket_name, path_in_bucket)
+        self._update_metadata(blob, metadata)
+
     def download_to_file(self, bucket_name, path_in_bucket, local_path, timeout=_DEFAULT_TIMEOUT):
         """Download a file from a Google Cloud bucket at gs://<bucket_name>/<path_in_bucket> to a local file.
 
@@ -118,14 +130,22 @@ def get_metadata(self, bucket_name, path_in_bucket, timeout=_DEFAULT_TIMEOUT):
         """
         bucket = self.client.get_bucket(bucket_or_name=bucket_name)
         blob = bucket.get_blob(blob_name=self._strip_leading_slash(path_in_bucket), timeout=timeout)
-        metadata = blob._properties
-
-        # Get timestamps from blob rather than properties so they are datetime.datetime objects rather than strings.
-        metadata["updated"] = blob.updated
-        metadata["timeCreated"] = blob.time_created
-        metadata["timeDeleted"] = blob.time_deleted
-        metadata["customTime"] = blob.custom_time
-        return metadata
+
+        if blob is None:
+            return None
+
+        return {
+            "custom_metadata": blob.metadata or {},
+            "crc32c": blob.crc32c,
+            "size": blob.size,
+            "updated": blob.updated,
+            "time_created": blob.time_created,
+            "time_deleted": blob.time_deleted,
+            "custom_time": blob.custom_time,
+            "project_name": self.project_name,
+            "bucket_name": bucket_name,
+            "path_in_bucket": path_in_bucket,
+        }
 
     def delete(self, bucket_name, path_in_bucket, timeout=_DEFAULT_TIMEOUT):
         """Delete the given file from the given bucket.
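A minimal sketch of how the new client methods above might be exercised (the project, bucket, and path names are
placeholders):

.. code-block:: python

    from octue.cloud.storage import GoogleCloudStorageClient

    client = GoogleCloudStorageClient(project_name="my-project")

    # Overwrite the custom metadata on an existing blob without touching its contents.
    client.update_metadata("my-bucket", "path/to/data.csv", metadata={"cluster": 0, "sequence": 2})

    # get_metadata now returns a structured dictionary, or None if the blob doesn't exist.
    metadata = client.get_metadata("my-bucket", "path/to/data.csv")
    if metadata is not None:
        print(metadata["custom_metadata"], metadata["crc32c"], metadata["size"])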
diff --git a/octue/exceptions.py b/octue/exceptions.py
index 61b943ecf..09afba66c 100644
--- a/octue/exceptions.py
+++ b/octue/exceptions.py
@@ -82,3 +82,9 @@ class AttributeConflict(OctueSDKException):
 
 class MissingServiceID(OctueSDKException):
     """Raise when a specific ID for a service is expected to be provided, but is missing or None."""
+
+
+class CloudLocationNotSpecified(OctueSDKException):
+    """Raise when attempting to interact with a cloud resource implicitly but the details of its location are
+    missing.
+    """
diff --git a/octue/mixins/hashable.py b/octue/mixins/hashable.py
index ae4140805..e29afdd8f 100644
--- a/octue/mixins/hashable.py
+++ b/octue/mixins/hashable.py
@@ -21,8 +21,8 @@ class Hashable:
     _ATTRIBUTES_TO_HASH = None
     _HASH_TYPE = "CRC32C"
 
-    def __init__(self, hash_value=None, *args, **kwargs):
-        self._hash_value = hash_value
+    def __init__(self, immutable_hash_value=None, *args, **kwargs):
+        self._immutable_hash_value = immutable_hash_value
         self._ATTRIBUTES_TO_HASH = self._ATTRIBUTES_TO_HASH or []
         super().__init__(*args, **kwargs)
 
@@ -40,11 +40,10 @@ class Holder(cls):
     @property
     def hash_value(self):
         """Get the hash of the instance."""
-        if self._hash_value:
-            return self._hash_value
+        if self._immutable_hash_value is None:
+            return self._calculate_hash()
 
-        self._hash_value = self._calculate_hash()
-        return self._hash_value
+        return self._immutable_hash_value
 
     @hash_value.setter
     def hash_value(self, value):
@@ -53,14 +52,17 @@ def hash_value(self, value):
         :param str value:
         :return None:
         """
-        self._hash_value = value
+        if self._immutable_hash_value is not None:
+            raise ValueError(f"The hash of {self!r} is immutable - hash_value cannot be set.")
+
+        self._immutable_hash_value = value
 
     def reset_hash(self):
         """Reset the hash value to the calculated hash (rather than whatever value has been set).
 
         :return None:
         """
-        self._hash_value = self._calculate_hash()
+        self._immutable_hash_value = None
 
     def _calculate_hash(self, hash_=None):
         """Calculate the hash of the sorted attributes in self._ATTRIBUTES_TO_HASH. If hash_ is not None and is
diff --git a/octue/mixins/identifiable.py b/octue/mixins/identifiable.py
index f3f06730d..96c9969e6 100644
--- a/octue/mixins/identifiable.py
+++ b/octue/mixins/identifiable.py
@@ -25,27 +25,7 @@ def __init__(self, *args, id=None, name=None, **kwargs):
         """Constructor for Identifiable class"""
         self._name = name
         super().__init__(*args, **kwargs)
-
-        # Store a boolean record of whether this object was created with a previously-existing uuid or was created new.
-        self._created = True if id is None else False
-
-        if isinstance(id, uuid.UUID):
-            # If it's a uuid, stringify it
-            id = str(id)
-
-        elif isinstance(id, str):
-            # If it's a string (or something similar which can be converted to UUID) check it's valid
-            try:
-                id = str(uuid.UUID(id))
-            except ValueError:
-                raise InvalidInputException(f"Value of id '{id}' is not a valid uuid string or instance of class UUID")
-
-        elif id is not None:
-            raise InvalidInputException(
-                f"Value of id '{id}' must be a valid uuid string, an instance of class UUID or None"
-            )
-
-        self._id = id or gen_uuid()
+        self._set_id(id)
 
     def __str__(self):
         return f"{self.__class__.__name__} {self._id}"
@@ -60,3 +40,32 @@ def id(self):
     @property
     def name(self):
         return self._name
+
+    def _set_id(self, value):
+        """Set the ID to the given value.
+
+        :param str|uuid.UUID|None value:
+        :return None:
+        """
+        # Store a boolean record of whether this object was created with a previously-existing uuid or was created new.
+ self._created = True if value is None else False + + if isinstance(value, uuid.UUID): + # If it's a uuid, stringify it + value = str(value) + + elif isinstance(value, str): + # If it's a string (or something similar which can be converted to UUID) check it's valid + try: + value = str(uuid.UUID(value)) + except ValueError: + raise InvalidInputException( + f"Value of id '{value}' is not a valid uuid string or instance of class UUID" + ) + + elif value is not None: + raise InvalidInputException( + f"Value of id '{value}' must be a valid uuid string, an instance of class UUID or None" + ) + + self._id = value or gen_uuid() diff --git a/octue/resources/datafile.py b/octue/resources/datafile.py index 573ddeb67..e47245066 100644 --- a/octue/resources/datafile.py +++ b/octue/resources/datafile.py @@ -1,4 +1,5 @@ import datetime +import functools import logging import os import tempfile @@ -7,7 +8,7 @@ from octue.cloud import storage from octue.cloud.storage import GoogleCloudStorageClient from octue.cloud.storage.path import CLOUD_STORAGE_PROTOCOL -from octue.exceptions import AttributeConflict, FileNotFoundException, InvalidInputException +from octue.exceptions import AttributeConflict, CloudLocationNotSpecified, FileNotFoundException, InvalidInputException from octue.mixins import Filterable, Hashable, Identifiable, Loggable, Pathable, Serialisable, Taggable from octue.mixins.hashable import EMPTY_STRING_HASH_VALUE from octue.utils import isfile @@ -56,7 +57,10 @@ class Datafile(Taggable, Serialisable, Pathable, Loggable, Identifiable, Hashabl :param int sequence: A sequence number of this file within its cluster (if sequences are appropriate) :param str tags: Space-separated string of tags relevant to this file :param bool skip_checks: - + :param str mode: if using as a context manager, open the datafile for reading/editing in this mode (the mode + options are the same as for the builtin open function) + :param bool update_cloud_metadata: if using as a context manager and this is True, update the cloud metadata of + the datafile when the context is exited :return None: """ @@ -82,12 +86,14 @@ def __init__( sequence=SEQUENCE_DEFAULT, tags=TAGS_DEFAULT, skip_checks=True, + mode="r", + update_cloud_metadata=True, **kwargs, ): super().__init__( id=id, - name=kwargs.get("name"), - hash_value=kwargs.get("hash_value"), + name=kwargs.pop("name", None), + immutable_hash_value=kwargs.pop("immutable_hash_value", None), logger=logger, tags=tags, path=path, @@ -106,6 +112,14 @@ def __init__( self.check(**kwargs) self._cloud_metadata = {} + self._open_attributes = {"mode": mode, "update_cloud_metadata": update_cloud_metadata, **kwargs} + + def __enter__(self): + self._open_context_manager = self.open(**self._open_attributes) + return self, self._open_context_manager.__enter__() + + def __exit__(self, exc_type, exc_val, exc_tb): + self._open_context_manager.__exit__(exc_type, exc_val, exc_tb) def __lt__(self, other): if not isinstance(other, Datafile): @@ -142,7 +156,16 @@ def deserialise(cls, serialised_datafile, path_from=None): return datafile @classmethod - def from_cloud(cls, project_name, bucket_name, datafile_path, allow_overwrite=False, **kwargs): + def from_cloud( + cls, + project_name, + bucket_name, + datafile_path, + allow_overwrite=False, + mode="r", + update_cloud_metadata=True, + **kwargs, + ): """Instantiate a Datafile from a previously-persisted Datafile in Google Cloud storage. 
        To instantiate a Datafile from a regular file on Google Cloud storage, the usage is the same, but a meaningful
        value for each of the instantiated Datafile's attributes can be included in the kwargs (a "regular" file is a
        file that has been uploaded without Octue metadata).

        :param str project_name:
        :param str bucket_name:
        :param str datafile_path: path to file represented by datafile
        :param bool allow_overwrite: if `True`, allow attributes of the datafile to be overwritten by values given in kwargs
+        :param str mode: if using as a context manager, open the datafile for reading/editing in this mode (the mode
+            options are the same as for the builtin open function)
+        :param bool update_cloud_metadata: if using as a context manager and this is True, update the cloud metadata of
+            the datafile when the context is exited
        :return Datafile:
        """
-        metadata = GoogleCloudStorageClient(project_name).get_metadata(bucket_name, datafile_path)
-        custom_metadata = metadata.get("metadata") or {}
+        datafile = cls(timestamp=None, path=storage.path.generate_gs_path(bucket_name, datafile_path))
+        datafile.get_cloud_metadata(project_name, bucket_name, datafile_path)
+        custom_metadata = datafile._cloud_metadata.get("custom_metadata", {})
 
         if not allow_overwrite:
-            for attribute_name, attribute_value in kwargs.items():
-
-                if custom_metadata.get(attribute_name) == attribute_value:
-                    continue
-
-                raise AttributeConflict(
-                    f"The value {custom_metadata.get(attribute_name)!r} of the {cls.__name__} attribute "
-                    f"{attribute_name!r} conflicts with the value given in kwargs {attribute_value!r}. If you wish to "
-                    f"overwrite the attribute value, set `allow_overwrite` to `True`."
-                )
+            cls._check_for_attribute_conflict(custom_metadata, **kwargs)
 
-        cluster = kwargs.get("cluster", custom_metadata.get("cluster", CLUSTER_DEFAULT))
-        sequence = kwargs.get("sequence", custom_metadata.get("sequence", SEQUENCE_DEFAULT))
         timestamp = kwargs.get("timestamp", custom_metadata.get("timestamp"))
 
-        if isinstance(cluster, str):
-            cluster = int(cluster)
-
-        if isinstance(sequence, str):
-            sequence = int(sequence)
-
         if isinstance(timestamp, str):
             timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S.%f%z")
 
-        datafile = cls(
-            timestamp=timestamp,
-            id=kwargs.get("id", custom_metadata.get("id", ID_DEFAULT)),
-            path=storage.path.generate_gs_path(bucket_name, datafile_path),
-            hash_value=metadata.get("crc32c", EMPTY_STRING_HASH_VALUE),
-            cluster=cluster,
-            sequence=sequence,
-            tags=kwargs.get("tags", custom_metadata.get("tags", TAGS_DEFAULT)),
-        )
-
-        datafile._cloud_metadata = metadata
-        datafile._cloud_metadata["project_name"] = project_name
+        datafile._set_id(kwargs.pop("id", custom_metadata.get("id", ID_DEFAULT)))
+        datafile.path = storage.path.generate_gs_path(bucket_name, datafile_path)
+        datafile.timestamp = timestamp
+        datafile.hash_value = datafile._cloud_metadata.get("crc32c", EMPTY_STRING_HASH_VALUE)
+        datafile.cluster = kwargs.pop("cluster", custom_metadata.get("cluster", CLUSTER_DEFAULT))
+        datafile.sequence = kwargs.pop("sequence", custom_metadata.get("sequence", SEQUENCE_DEFAULT))
+        datafile.tags = kwargs.pop("tags", custom_metadata.get("tags", TAGS_DEFAULT))
+        datafile._open_attributes = {"mode": mode, "update_cloud_metadata": update_cloud_metadata, **kwargs}
 
         return datafile
 
-    def to_cloud(self, project_name, bucket_name, path_in_bucket):
+    def to_cloud(self, project_name=None, bucket_name=None, path_in_bucket=None, update_cloud_metadata=True):
        """Upload a datafile to Google Cloud Storage.
 
-        :param str project_name:
-        :param str bucket_name:
-        :param str path_in_bucket:
+        :param str|None project_name:
+        :param str|None bucket_name:
+        :param str|None path_in_bucket:
+        :param bool update_cloud_metadata: if True, also update the cloud object's metadata if it differs from the
+            local metadata
        :return str: gs:// path for datafile
        """
-        GoogleCloudStorageClient(project_name=project_name).upload_file(
-            local_path=self.get_local_path(),
+        project_name, bucket_name, path_in_bucket = self._get_cloud_location(project_name, bucket_name, path_in_bucket)
+        self.get_cloud_metadata(project_name, bucket_name, path_in_bucket)
+
+        storage_client = GoogleCloudStorageClient(project_name=project_name)
+
+        # If the datafile's file has been changed locally, overwrite its cloud copy.
+        if self._cloud_metadata.get("crc32c") != self.hash_value:
+            storage_client.upload_file(
+                local_path=self.get_local_path(),
+                bucket_name=bucket_name,
+                path_in_bucket=path_in_bucket,
+                metadata=self.metadata(),
+            )
+
+        if update_cloud_metadata:
+            # If the datafile's metadata has been changed locally, update the cloud file's metadata.
+            local_metadata = self.metadata()
+
+            if self._cloud_metadata.get("custom_metadata") != local_metadata:
+                self.update_cloud_metadata(project_name, bucket_name, path_in_bucket)
+
+        return storage.path.generate_gs_path(bucket_name, path_in_bucket)
+
+    def get_cloud_metadata(self, project_name=None, bucket_name=None, path_in_bucket=None):
+        """Get the cloud metadata for the datafile and store it on the instance, casting the cluster and sequence
+        fields to integers.
+
+        :param str|None project_name:
+        :param str|None bucket_name:
+        :param str|None path_in_bucket:
+        :return None:
+        """
+        project_name, bucket_name, path_in_bucket = self._get_cloud_location(project_name, bucket_name, path_in_bucket)
+        cloud_metadata = GoogleCloudStorageClient(project_name).get_metadata(bucket_name, path_in_bucket)
+
+        if cloud_metadata is None:
+            return None
+
+        custom_metadata = cloud_metadata["custom_metadata"]
+
+        if custom_metadata.get("cluster") is not None:
+            custom_metadata["cluster"] = int(custom_metadata["cluster"])
+
+        if custom_metadata.get("sequence") is not None:
+            custom_metadata["sequence"] = int(custom_metadata["sequence"])
+
+        self._cloud_metadata = cloud_metadata
+
+    def update_cloud_metadata(self, project_name=None, bucket_name=None, path_in_bucket=None):
+        """Update the cloud metadata for the datafile.
+
+        :param str|None project_name:
+        :param str|None bucket_name:
+        :param str|None path_in_bucket:
+        :return None:
+        """
+        project_name, bucket_name, path_in_bucket = self._get_cloud_location(project_name, bucket_name, path_in_bucket)
+
+        GoogleCloudStorageClient(project_name=project_name).update_metadata(
             bucket_name=bucket_name,
             path_in_bucket=path_in_bucket,
             metadata=self.metadata(),
         )
 
-        return storage.path.generate_gs_path(bucket_name, path_in_bucket)
+        self._store_cloud_location(project_name, bucket_name, path_in_bucket)
 
     @property
     def name(self):
@@ -243,6 +307,10 @@ def timestamp(self, value):
 
     @property
     def posix_timestamp(self):
+        """Get the timestamp of the datafile in posix format.
+
+        :return float:
+        """
         if self.timestamp is None:
             return None
 
@@ -267,12 +335,7 @@ def _last_modified(self):
     @property
     def size_bytes(self):
         if self._path_is_in_google_cloud_storage:
-            size = self._cloud_metadata.get("size")
-
-            if size is None:
-                return None
-
-            return int(size)
+            return self._cloud_metadata.get("size")
 
         return os.path.getsize(self.absolute_path)
 
@@ -304,8 +367,21 @@ def get_local_path(self):
         )
 
         TEMPORARY_LOCAL_FILE_CACHE[self.absolute_path] = temporary_local_path
+
+        # Now use the hash value of the local file instead of the cloud file.
+        self.reset_hash()
         return temporary_local_path
 
+    def clear_from_file_cache(self):
+        """Clear the datafile from the temporary local file cache, if it is in there. If datafile.get_local_path is
+        called again and the datafile is a cloud datafile, the file will be re-downloaded to a new temporary local
+        path, allowing any independent cloud updates to be synced locally.
+
+        :return None:
+        """
+        if self.absolute_path in TEMPORARY_LOCAL_FILE_CACHE:
+            del TEMPORARY_LOCAL_FILE_CACHE[self.absolute_path]
+
     def _get_extension_from_path(self, path=None):
         """Gets extension of a file, either from a provided file path or from self.path field"""
         path = path or self.path
@@ -315,13 +391,66 @@ def _calculate_hash(self):
         """Calculate the hash of the file."""
         hash = Checksum()
 
-        with open(self.absolute_path, "rb") as f:
+        with open(self.get_local_path(), "rb") as f:
             # Read and update hash value in blocks of 4K.
             for byte_block in iter(lambda: f.read(4096), b""):
                 hash.update(byte_block)
 
         return super()._calculate_hash(hash)
 
+    def _get_cloud_location(self, project_name=None, bucket_name=None, path_in_bucket=None):
+        """Get the cloud location details for the datafile, allowing the keyword arguments to override any stored
+        values.
+
+        :param str|None project_name:
+        :param str|None bucket_name:
+        :param str|None path_in_bucket:
+        :raise octue.exceptions.CloudLocationNotSpecified: if an exact cloud location isn't provided and isn't
+            available implicitly (i.e. the Datafile wasn't loaded from the cloud previously)
+        :return (str, str, str):
+        """
+        try:
+            project_name = project_name or self._cloud_metadata["project_name"]
+            bucket_name = bucket_name or self._cloud_metadata["bucket_name"]
+            path_in_bucket = path_in_bucket or self._cloud_metadata["path_in_bucket"]
+        except KeyError:
+            raise CloudLocationNotSpecified(
+                f"{self!r} wasn't previously loaded from the cloud so doesn't have an implicit cloud location - "
+                f"please specify its exact location (its project_name, bucket_name, and path_in_bucket)."
+            )
+
+        return project_name, bucket_name, path_in_bucket
+
+    def _store_cloud_location(self, project_name, bucket_name, path_in_bucket):
+        """Store the cloud location of the datafile.
+
+        :param str project_name:
+        :param str bucket_name:
+        :param str path_in_bucket:
+        :return None:
+        """
+        self._cloud_metadata["project_name"] = project_name
+        self._cloud_metadata["bucket_name"] = bucket_name
+        self._cloud_metadata["path_in_bucket"] = path_in_bucket
+
+    @classmethod
+    def _check_for_attribute_conflict(cls, custom_metadata, **kwargs):
+        """Raise an error if there is a conflict between the custom metadata and the kwargs.
+
+        :param dict custom_metadata:
+        :raise octue.exceptions.AttributeConflict: if any of the custom metadata conflicts with kwargs
+        :return None:
+        """
+        for attribute_name, attribute_value in kwargs.items():
+
+            if custom_metadata.get(attribute_name) == attribute_value:
+                continue
+
+            raise AttributeConflict(
+                f"The value {custom_metadata.get(attribute_name)!r} of the {cls.__name__} attribute "
+                f"{attribute_name!r} conflicts with the value given in kwargs {attribute_value!r}. If you wish to "
+                f"overwrite the attribute value, set `allow_overwrite` to `True`."
+            )
+
     def check(self, size_bytes=None, sha=None, last_modified=None, extension=None):
         """Check file presence and integrity"""
         # TODO Check consistency of size_bytes input against self.size_bytes property for a file if we have one
@@ -343,62 +472,11 @@ def exists(self):
 
     @property
     def open(self):
-        """Context manager to handle the opening and closing of a Datafile.
-
-        If opened in write mode, the manager will attempt to determine if the folder path exists and, if not, will
-        create the folder structure required to write the file.
-
-        Use it like:
-        ```
-        my_datafile = Datafile(timestamp=None, path='subfolder/subsubfolder/my_datafile.json)
-        with my_datafile.open('w') as fp:
-            fp.write("{}")
-        ```
-        This is equivalent to the standard python:
-        ```
-        my_datafile = Datafile(timestamp=None, path='subfolder/subsubfolder/my_datafile.json)
-        os.makedirs(os.path.split(my_datafile.absolute_path)[0], exist_ok=True)
-        with open(my_datafile.absolute_path, 'w') as fp:
-            fp.write("{}")
-        ```
-        """
-        datafile = self
-
-        class DataFileContextManager:
-            def __init__(obj, mode="r", **kwargs):
-                obj.mode = mode
-                obj.kwargs = kwargs
-                obj.fp = None
-                obj.path = None
-
-            def __enter__(obj):
-                """Open the datafile, first downloading it from the cloud if necessary.
-
-                :return io.TextIOWrapper:
-                """
-                obj.path = datafile.get_local_path()
-
-                if "w" in obj.mode:
-                    os.makedirs(os.path.split(obj.path)[0], exist_ok=True)
-
-                obj.fp = open(obj.path, obj.mode, **obj.kwargs)
-                return obj.fp
-
-            def __exit__(obj, *args):
-                """Close the datafile, updating the corresponding file in the cloud if necessary.
-
-                :return None:
-                """
-                if obj.fp is not None:
-                    obj.fp.close()
-
-                if datafile.is_in_cloud and any(character in obj.mode for character in {"w", "a", "x", "+", "U"}):
-                    datafile.to_cloud(
-                        datafile._cloud_metadata["project_name"],
-                        *storage.path.split_bucket_name_from_gs_path(datafile.absolute_path),
-                    )
-
-        return DataFileContextManager
+        """Get a context manager for handling the opening and closing of the datafile for reading/editing.
+
+        :return functools.partial: the octue.resources.datafile._DatafileContextManager class with this datafile
+            bound as its first argument
+        """
+        return functools.partial(_DatafileContextManager, self)
 
     def metadata(self):
         """Get the datafile's metadata in a serialised form.
@@ -412,3 +490,67 @@ def metadata(self):
             "sequence": self.sequence,
             "tags": self.tags.serialise(to_string=True),
         }
+
+
+class _DatafileContextManager:
+    """A context manager for opening datafiles for reading and writing locally or from the cloud. Its usage is
+    analogous to the builtin open context manager. If opening a local datafile in write mode, the manager will attempt
+    to determine if the folder path exists and, if not, will create the folder structure required to write the file.
+
+    Usage:
+    ```
+    my_datafile = Datafile(timestamp=None, path='subfolder/subsubfolder/my_datafile.json')
+    with my_datafile.open('w') as fp:
+        fp.write("{}")
+    ```
+
+    This is equivalent to the standard python:
+    ```
+    my_datafile = Datafile(timestamp=None, path='subfolder/subsubfolder/my_datafile.json')
+    os.makedirs(os.path.split(my_datafile.absolute_path)[0], exist_ok=True)
+    with open(my_datafile.absolute_path, 'w') as fp:
+        fp.write("{}")
+    ```
+
+    :param octue.resources.datafile.Datafile datafile:
+    :param str mode: open the datafile for reading/editing in this mode (the mode options are the same as for the
+        builtin open function)
+    :param bool update_cloud_metadata: if this is True, update the cloud metadata of the datafile when the context is
+        exited
+    :return None:
+    """
+
+    MODIFICATION_MODES = {"w", "a", "x", "+", "U"}
+
+    def __init__(self, datafile, mode="r", update_cloud_metadata=True, **kwargs):
+        self.datafile = datafile
+        self.mode = mode
+        self._update_cloud_metadata = update_cloud_metadata
+        self.kwargs = kwargs
+        self._fp = None
+        self.path = None
+
+    def __enter__(self):
+        """Open the datafile, first downloading it from the cloud if necessary.
+
+        :return io.TextIOWrapper:
+        """
+        self.path = self.datafile.get_local_path()
+
+        if "w" in self.mode:
+            os.makedirs(os.path.split(self.path)[0], exist_ok=True)
+
+        self._fp = open(self.path, self.mode, **self.kwargs)
+        return self._fp
+
+    def __exit__(self, *args):
+        """Close the datafile, updating the corresponding file in the cloud if necessary and its metadata if
+        self._update_cloud_metadata is True.
+
+        :return None:
+        """
+        if self._fp is not None:
+            self._fp.close()
+
+        if self.datafile.is_in_cloud and any(character in self.mode for character in self.MODIFICATION_MODES):
+            self.datafile.to_cloud(update_cloud_metadata=self._update_cloud_metadata)
diff --git a/tests/mixins/test_hashable.py b/tests/mixins/test_hashable.py
index 7fae21b73..edfb13924 100644
--- a/tests/mixins/test_hashable.py
+++ b/tests/mixins/test_hashable.py
@@ -79,3 +79,10 @@ def test_reset_hash(self):
         type_with_hash.reset_hash()
 
         self.assertEqual(type_with_hash.hash_value, original_calculated_hash)
+
+    def test_hash_value_cannot_be_set_if_hashable_has_immutable_hash_value(self):
+        """Test that the hash value of a hashable instance with an immutable hash value cannot be set."""
+        hashable = Hashable(immutable_hash_value="blue")
+
+        with self.assertRaises(ValueError):
+            hashable.hash_value = "red"
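The immutable-hash behaviour exercised by the test above can be summarised with a minimal sketch (constructing
``Hashable`` directly, as the test does):

.. code-block:: python

    from octue.mixins import Hashable

    # An immutable hash supplied up front (e.g. a cloud file's CRC32C checksum) is returned as-is.
    hashable = Hashable(immutable_hash_value="blue")
    assert hashable.hash_value == "blue"

    # Setting the hash again raises, because the immutable value is already set.
    try:
        hashable.hash_value = "red"
    except ValueError:
        print("hash_value is immutable")

    # reset_hash clears the immutable value; the hash is then recalculated from _ATTRIBUTES_TO_HASH on access.
    hashable.reset_hash()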
diff --git a/tests/resources/test_datafile.py b/tests/resources/test_datafile.py
index d5a3600a9..d05841c1a 100644
--- a/tests/resources/test_datafile.py
+++ b/tests/resources/test_datafile.py
@@ -9,7 +9,7 @@
 from octue import exceptions
 from octue.cloud.storage import GoogleCloudStorageClient
 from octue.mixins import MixinBase, Pathable
-from octue.resources import Datafile
+from octue.resources.datafile import TEMPORARY_LOCAL_FILE_CACHE, Datafile
 from octue.resources.tag import TagSet
 from tests import TEST_BUCKET_NAME, TEST_PROJECT_NAME
 from ..base import BaseTestCase
@@ -25,9 +25,36 @@ def setUp(self):
         self.path_from = MyPathable(path=os.path.join(self.data_path, "basic_files", "configuration", "test-dataset"))
         self.path = os.path.join("path-within-dataset", "a_test_file.csv")
 
+    def tearDown(self):
+        TEMPORARY_LOCAL_FILE_CACHE.clear()
+
     def create_valid_datafile(self):
         return Datafile(timestamp=None, path_from=self.path_from, path=self.path, skip_checks=False)
 
+    def create_datafile_in_cloud(
+        self,
+        project_name=TEST_PROJECT_NAME,
+        bucket_name=TEST_BUCKET_NAME,
+        path_in_bucket="cloud_file.txt",
+        contents="some text",
+        **kwargs,
+    ):
+        """Create a datafile in the cloud. Any metadata attributes can be set via kwargs.
+
+        :param str project_name:
+        :param str bucket_name:
+        :param str path_in_bucket:
+        :param str contents:
+        :return (Datafile, str, str, str, str):
+        """
+        with tempfile.NamedTemporaryFile("w", delete=False) as temporary_file:
+            temporary_file.write(contents)
+
+        timestamp = kwargs.pop("timestamp", None)
+        datafile = Datafile(path=temporary_file.name, timestamp=timestamp, **kwargs)
+        datafile.to_cloud(project_name=project_name, bucket_name=bucket_name, path_in_bucket=path_in_bucket)
+        return datafile, project_name, bucket_name, path_in_bucket, contents
+
     def test_instantiates(self):
         """Ensures a Datafile instantiates using only a path and generates a uuid ID"""
         df = Datafile(timestamp=None, path="a_path")
@@ -170,133 +197,158 @@ def test_from_cloud_with_bare_file(self):
 
     def test_from_cloud_with_datafile(self):
         """Test that a Datafile can be constructed from a file on Google Cloud storage with custom metadata."""
-        path_in_bucket = "file_to_upload.txt"
-
-        with tempfile.NamedTemporaryFile("w", delete=False) as temporary_file:
-            temporary_file.write("[1, 2, 3]")
-
-        datafile = Datafile(
+        datafile, project_name, bucket_name, path_in_bucket, _ = self.create_datafile_in_cloud(
             timestamp=datetime.now(tz=timezone.utc),
-            path=temporary_file.name,
             cluster=0,
             sequence=1,
             tags={"blah:shah:nah", "blib", "glib"},
         )
-        datafile.to_cloud(project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, path_in_bucket=path_in_bucket)
 
-        persisted_datafile = Datafile.from_cloud(
-            project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, datafile_path=path_in_bucket
-        )
+        downloaded_datafile = Datafile.from_cloud(project_name, bucket_name, path_in_bucket)
 
-        self.assertEqual(persisted_datafile.path, f"gs://{TEST_BUCKET_NAME}/{path_in_bucket}")
-        self.assertEqual(persisted_datafile.id, datafile.id)
-        self.assertEqual(persisted_datafile.timestamp, datafile.timestamp)
-        self.assertEqual(persisted_datafile.hash_value, datafile.hash_value)
-        self.assertEqual(persisted_datafile.cluster, datafile.cluster)
-        self.assertEqual(persisted_datafile.sequence, datafile.sequence)
-        self.assertEqual(persisted_datafile.tags, datafile.tags)
-        self.assertEqual(persisted_datafile.size_bytes, datafile.size_bytes)
-        self.assertTrue(isinstance(persisted_datafile._last_modified, float))
+        self.assertEqual(downloaded_datafile.path, f"gs://{TEST_BUCKET_NAME}/{path_in_bucket}")
+        self.assertEqual(downloaded_datafile.id, datafile.id)
+        self.assertEqual(downloaded_datafile.timestamp, datafile.timestamp)
+        self.assertEqual(downloaded_datafile.hash_value, datafile.hash_value)
+        self.assertEqual(downloaded_datafile.cluster, datafile.cluster)
+        self.assertEqual(downloaded_datafile.sequence, datafile.sequence)
+        self.assertEqual(downloaded_datafile.tags, datafile.tags)
+        self.assertEqual(downloaded_datafile.size_bytes, datafile.size_bytes)
+        self.assertTrue(isinstance(downloaded_datafile._last_modified, float))
 
     def test_from_cloud_with_overwrite(self):
         """Test that a datafile can be instantiated from the cloud and have its attributes overwritten if new values
         are given in kwargs.
""" - path_in_bucket = "file_to_upload.txt" - - with tempfile.NamedTemporaryFile("w", delete=False) as temporary_file: - temporary_file.write("[1, 2, 3]") - - Datafile(timestamp=None, path=temporary_file.name).to_cloud( - project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, path_in_bucket=path_in_bucket - ) + datafile, project_name, bucket_name, path_in_bucket, _ = self.create_datafile_in_cloud() new_id = str(uuid.uuid4()) - new_datafile = Datafile.from_cloud( - project_name=TEST_PROJECT_NAME, - bucket_name=TEST_BUCKET_NAME, + downloaded_datafile = Datafile.from_cloud( + project_name=project_name, + bucket_name=bucket_name, datafile_path=path_in_bucket, allow_overwrite=True, id=new_id, ) - self.assertEqual(new_datafile.id, new_id) + self.assertEqual(downloaded_datafile.id, new_id) + self.assertNotEqual(datafile.id, downloaded_datafile.id) def test_from_cloud_with_overwrite_when_disallowed_results_in_error(self): """Test that attempting to overwrite the attributes of a datafile instantiated from the cloud when not allowed results in an error. """ - path_in_bucket = "my-file.txt" - - with tempfile.NamedTemporaryFile("w", delete=False) as temporary_file: - temporary_file.write("[1, 2, 3]") - - Datafile(timestamp=None, path=temporary_file.name).to_cloud( - project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, path_in_bucket=path_in_bucket - ) + _, project_name, bucket_name, path_in_bucket, _ = self.create_datafile_in_cloud() with self.assertRaises(exceptions.AttributeConflict): Datafile.from_cloud( - project_name=TEST_PROJECT_NAME, - bucket_name=TEST_BUCKET_NAME, + project_name=project_name, + bucket_name=bucket_name, datafile_path=path_in_bucket, allow_overwrite=False, id=str(uuid.uuid4()), ) - def test_to_cloud_updates_cloud_files(self): - """Test that calling Datafile.to_cloud on a datafile that is already cloud-based updates it in the cloud.""" - path_in_bucket = "file_to_upload.txt" - - with tempfile.NamedTemporaryFile("w", delete=False) as temporary_file: - temporary_file.write("[1, 2, 3]") + def test_to_cloud_updates_cloud_metadata(self): + """Test that calling Datafile.to_cloud on a datafile that is already cloud-based updates its metadata in the + cloud. 
+        """
+        datafile, project_name, bucket_name, path_in_bucket, _ = self.create_datafile_in_cloud(cluster=0)
 
-        datafile = Datafile(timestamp=None, path=temporary_file.name, cluster=0)
+        datafile.cluster = 3
+        datafile.to_cloud(project_name, bucket_name, path_in_bucket)
 
-        datafile.to_cloud(project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, path_in_bucket=path_in_bucket)
+        self.assertEqual(Datafile.from_cloud(project_name, bucket_name, path_in_bucket).cluster, 3)
 
+    def test_to_cloud_does_not_update_cloud_metadata_if_update_cloud_metadata_is_false(self):
+        """Test that calling Datafile.to_cloud with `update_cloud_metadata=False` doesn't update the cloud metadata."""
+        datafile, project_name, bucket_name, path_in_bucket, _ = self.create_datafile_in_cloud(cluster=0)
         datafile.cluster = 3
 
-        datafile.to_cloud(project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, path_in_bucket=path_in_bucket)
+        with patch("octue.resources.datafile.Datafile.update_cloud_metadata") as mock:
+            datafile.to_cloud(project_name, bucket_name, path_in_bucket, update_cloud_metadata=False)
+            self.assertFalse(mock.called)
 
-        self.assertEqual(
-            Datafile.from_cloud(
-                project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, datafile_path=path_in_bucket
-            ).cluster,
-            3,
-        )
+        self.assertEqual(Datafile.from_cloud(project_name, bucket_name, path_in_bucket).cluster, 0)
 
-    def test_get_local_path(self):
-        """Test that a file in the cloud can be temporarily downloaded and its local path returned."""
-        file_contents = "[1, 2, 3]"
+    def test_to_cloud_does_not_update_metadata_if_no_metadata_change_has_been_made(self):
+        """Test that Datafile.to_cloud does not try to update the cloud metadata if no metadata change has been made."""
+        _, project_name, bucket_name, path_in_bucket, _ = self.create_datafile_in_cloud(cluster=0)
 
-        with tempfile.NamedTemporaryFile("w", delete=False) as temporary_file:
-            temporary_file.write(file_contents)
+        datafile = Datafile.from_cloud(project_name, bucket_name, path_in_bucket)
 
-        Datafile(timestamp=None, path=temporary_file.name).to_cloud(
-            project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, path_in_bucket="nope.txt"
-        )
+        with patch("octue.resources.datafile.Datafile.update_cloud_metadata") as mock:
+            datafile.to_cloud()
+            self.assertFalse(mock.called)
 
-        datafile = Datafile.from_cloud(
-            project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, datafile_path="nope.txt"
-        )
+    def test_to_cloud_raises_error_if_no_cloud_location_provided_and_datafile_not_from_cloud(self):
+        """Test that trying to send a datafile to the cloud with no cloud location provided when the datafile was not
+        constructed from a cloud file results in a cloud location error.
+        """
+        datafile = Datafile(path="hello.txt", timestamp=None)
+
+        with self.assertRaises(exceptions.CloudLocationNotSpecified):
+            datafile.to_cloud()
+
+    def test_to_cloud_works_with_implicit_cloud_location_if_cloud_location_previously_provided(self):
+        """Test that datafile.to_cloud works with an implicit cloud location if the cloud location has previously been
+        provided.
+        """
+        _, project_name, bucket_name, path_in_bucket, _ = self.create_datafile_in_cloud()
+        datafile = Datafile.from_cloud(project_name, bucket_name, path_in_bucket)
+        datafile.to_cloud()
+
+    def test_to_cloud_does_not_try_to_update_file_if_no_change_has_been_made_locally(self):
+        """Test that Datafile.to_cloud does not try to update the cloud file if no change has been made locally."""
+        datafile, project_name, bucket_name, path_in_bucket, _ = self.create_datafile_in_cloud(cluster=0)
+
+        with patch("octue.cloud.storage.client.GoogleCloudStorageClient.upload_file") as mock:
+            datafile.to_cloud()
+            self.assertFalse(mock.called)
+
+    def test_update_cloud_metadata(self):
+        """Test that a cloud datafile's metadata can be updated."""
+        _, project_name, bucket_name, path_in_bucket, _ = self.create_datafile_in_cloud()
+
+        new_datafile = Datafile(path="glib.txt", timestamp=None, cluster=32)
+        new_datafile.update_cloud_metadata(project_name, bucket_name, path_in_bucket)
+
+        self.assertEqual(Datafile.from_cloud(project_name, bucket_name, path_in_bucket).cluster, 32)
+
+    def test_update_cloud_metadata_works_with_implicit_cloud_location_if_cloud_location_previously_provided(self):
+        """Test that datafile.update_cloud_metadata works with an implicit cloud location if the cloud location has
+        been previously provided.
+        """
+        _, project_name, bucket_name, path_in_bucket, _ = self.create_datafile_in_cloud()
+
+        datafile = Datafile.from_cloud(project_name, bucket_name, path_in_bucket)
+        datafile.cluster = 32
+        datafile.update_cloud_metadata()
+
+        self.assertEqual(Datafile.from_cloud(project_name, bucket_name, path_in_bucket).cluster, 32)
+
+    def test_update_cloud_metadata_raises_error_if_no_cloud_location_provided_and_datafile_not_from_cloud(self):
+        """Test that trying to update a cloud datafile's metadata with no cloud location provided when the datafile
+        was not constructed from a cloud file results in a cloud location error.
+        """
+        datafile = Datafile(path="hello.txt", timestamp=None)
+
+        with self.assertRaises(exceptions.CloudLocationNotSpecified):
+            datafile.update_cloud_metadata()
+
+    def test_get_local_path(self):
+        """Test that a file in the cloud can be temporarily downloaded and its local path returned."""
+        _, project_name, bucket_name, path_in_bucket, contents = self.create_datafile_in_cloud()
+        datafile = Datafile.from_cloud(project_name, bucket_name, path_in_bucket)
 
         with open(datafile.get_local_path()) as f:
-            self.assertEqual(f.read(), file_contents)
+            self.assertEqual(f.read(), contents)
 
     def test_get_local_path_with_cached_file_avoids_downloading_again(self):
         """Test that attempting to download a cached file avoids downloading it again."""
-        with tempfile.NamedTemporaryFile("w", delete=False) as temporary_file:
-            temporary_file.write("[1, 2, 3]")
-
-        Datafile(timestamp=None, path=temporary_file.name).to_cloud(
-            project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, path_in_bucket="nope.txt"
-        )
-
-        datafile = Datafile.from_cloud(
-            project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, datafile_path="nope.txt"
-        )
+        _, project_name, bucket_name, path_in_bucket, _ = self.create_datafile_in_cloud()
+        datafile = Datafile.from_cloud(project_name, bucket_name, path_in_bucket)
 
         # Download for first time.
        datafile.get_local_path()
 
@@ -335,37 +387,16 @@ def test_open_with_writing_local_file(self):
 
     def test_open_with_reading_cloud_file(self):
         """Test that a cloud datafile can be opened for reading."""
-        file_contents = "[1, 2, 3]"
-
-        with tempfile.NamedTemporaryFile("w", delete=False) as temporary_file:
-            temporary_file.write(file_contents)
-
-        Datafile(timestamp=None, path=temporary_file.name).to_cloud(
-            project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, path_in_bucket="nope.txt"
-        )
-
-        datafile = Datafile.from_cloud(
-            project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, datafile_path="nope.txt"
-        )
+        _, project_name, bucket_name, path_in_bucket, contents = self.create_datafile_in_cloud()
+        datafile = Datafile.from_cloud(project_name, bucket_name, path_in_bucket)
 
         with datafile.open() as f:
-            self.assertEqual(f.read(), file_contents)
+            self.assertEqual(f.read(), contents)
 
     def test_open_with_writing_to_cloud_file(self):
         """Test that a cloud datafile can be opened for writing and that both the remote and local copies are updated."""
-        original_file_contents = "[1, 2, 3]"
-        filename = "nope.txt"
-
-        with tempfile.NamedTemporaryFile("w", delete=False) as temporary_file:
-            temporary_file.write(original_file_contents)
-
-        Datafile(timestamp=None, path=temporary_file.name).to_cloud(
-            project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, path_in_bucket=filename
-        )
-
-        datafile = Datafile.from_cloud(
-            project_name=TEST_PROJECT_NAME, bucket_name=TEST_BUCKET_NAME, datafile_path=filename
-        )
+        _, project_name, bucket_name, path_in_bucket, original_contents = self.create_datafile_in_cloud()
+        datafile = Datafile.from_cloud(project_name, bucket_name, path_in_bucket)
 
         new_file_contents = "nanana"
 
@@ -374,16 +405,16 @@ def test_open_with_writing_to_cloud_file(self):
 
         # Check that the cloud file isn't updated until the context manager is closed.
         self.assertEqual(
-            GoogleCloudStorageClient(project_name=TEST_PROJECT_NAME).download_as_string(
-                bucket_name=TEST_BUCKET_NAME, path_in_bucket=filename
+            GoogleCloudStorageClient(project_name=project_name).download_as_string(
+                bucket_name=bucket_name, path_in_bucket=path_in_bucket
             ),
-            original_file_contents,
+            original_contents,
         )
 
         # Check that the cloud file has now been updated.
         self.assertEqual(
-            GoogleCloudStorageClient(project_name=TEST_PROJECT_NAME).download_as_string(
-                bucket_name=TEST_BUCKET_NAME, path_in_bucket=filename
+            GoogleCloudStorageClient(project_name=project_name).download_as_string(
+                bucket_name=bucket_name, path_in_bucket=path_in_bucket
             ),
             new_file_contents,
         )
@@ -449,3 +480,36 @@ def test_posix_timestamp(self):
         datafile.timestamp = datetime(1970, 1, 1)
 
         self.assertEqual(datafile.posix_timestamp, 0)
+
+    def test_datafile_as_context_manager(self):
+        """Test that Datafile can be used as a context manager to manage local changes."""
+        temporary_file = tempfile.NamedTemporaryFile("w", delete=False)
+        contents = "Here is the content."
+
+        with Datafile(path=temporary_file.name, timestamp=None, mode="w") as (datafile, f):
+            f.write(contents)
+
+        # Check that the local file has been updated.
+        with datafile.open() as f:
+            self.assertEqual(f.read(), contents)
+
+    def test_datafile_from_cloud_as_context_manager(self):
+        """Test that Datafile.from_cloud can be used as a context manager to manage cloud changes."""
+        _, project_name, bucket_name, path_in_bucket, original_content = self.create_datafile_in_cloud()
+        new_contents = "Here is the new content."
+ self.assertNotEqual(original_content, new_contents) + + with Datafile.from_cloud(project_name, bucket_name, path_in_bucket, mode="w") as (datafile, f): + datafile.add_tags("blue") + f.write(new_contents) + + # Check that the cloud metadata has been updated. + re_downloaded_datafile = Datafile.from_cloud(project_name, bucket_name, path_in_bucket) + self.assertTrue("blue" in re_downloaded_datafile.tags) + + # The file cache must be cleared so the modified cloud file is downloaded. + re_downloaded_datafile.clear_from_file_cache() + + # Check that the cloud file has been updated. + with re_downloaded_datafile.open() as f: + self.assertEqual(f.read(), new_contents)