Skip to content

Commit

Permalink
Added 'data_dict' attribute (DataDictDataset) to AbstractVersionedDat…
Browse files Browse the repository at this point in the history
…aset

Signed-off-by: Noam Goldberg <noamgoldberg2@gmail.com>
  • Loading branch information
noamgoldberg committed Mar 29, 2024
1 parent b77de75 commit f41a822
Showing 1 changed file with 101 additions and 1 deletion.
102 changes: 101 additions & 1 deletion kedro/io/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from glob import iglob
from operator import attrgetter
from pathlib import Path, PurePath, PurePosixPath
from typing import Any, Callable, Generic, TypeVar
from typing import Any, Callable, Generic, TypeVar, Union, Type, Dict
from urllib.parse import urlsplit

from cachetools import Cache, cachedmethod
Expand Down Expand Up @@ -464,6 +464,98 @@ def _local_exists(local_filepath: str) -> bool: # SKIP_IF_NO_SPARK
filepath = Path(local_filepath)
return filepath.exists() or any(par.is_file() for par in filepath.parents)

class DataDictDataset(AbstractDataset):
    """
    ``DataDictDataset`` is designed to facilitate the integration of a data
    dictionary with other dataset objects that inherit from
    ``AbstractVersionedDataset``. It wraps an underlying dataset (for example a
    YAML dataset holding column descriptions) and delegates loading, saving,
    describing and existence checks to it.

    The underlying dataset is instantiated lazily, on first access of
    ``data_dict``, by calling the resolved dataset class with ``filepath`` as
    the first positional argument followed by the parsed dataset configuration
    as keyword arguments.

    The primary use case of ``DataDictDataset`` is to enable the configuration
    of a data dictionary directly in the dataset catalog or "underneath" another
    dataset using the 'data_dict' attribute.

    Examples:
        - Instantiating a data dictionary directly in the catalog:
        ::

            data_dict_dataset:
              type: <path-to-DataDictDataset>.DataDictDataset
              dataset: yaml.YAMLDataSet
              filepath: data/01_raw/data_dict.yml

        - Configuring a data dictionary underneath another dataset:
        ::

            dataset_with_data_dict:
              type: pandas.CSVDataSet
              filepath: data/01_raw/data.csv
              # Additional pandas.CSVDataSet kwargs
              data_dict:
                dataset: yaml.YAMLDataSet
                filepath: data/01_raw/data_dict.yml

        - Passing extra options to the underlying dataset. ``__init__`` accepts
          only ``dataset`` and ``filepath``, so extra options have to be nested
          inside a ``dataset`` mapping:
        ::

            data_dict_dataset:
              type: <path-to-DataDictDataset>.DataDictDataset
              dataset:
                type: yaml.YAMLDataSet
                # Additional yaml.YAMLDataSet kwargs
              filepath: data/01_raw/data_dict.yml

    Args:
        dataset: The dataset or the configuration of the dataset with which
            the data dictionary is being configured. This can be a string
            representing the dataset type, a dictionary containing the dataset
            configuration, or a direct reference to a
            ``Type[AbstractVersionedDataset]``. Anything that is not a
            dictionary is wrapped as ``{"type": dataset}`` before parsing.
        filepath: Filepath in POSIX format to a file or a directory where the
            dataset is stored or will be saved to.

    Raises:
        DatasetError: If there's an issue instantiating the data dictionary,
            or if versioning is attempted to be applied to the underlying dataset,
            as ``DataDictDataset`` does not support versioning of the underlying
            dataset. The original exception is attached as ``__cause__``.
    """

    def __init__(
        self,
        # Quoted: ``AbstractVersionedDataset`` is defined further down the module.
        dataset: Union[str, Type["AbstractVersionedDataset"], Dict[str, Any]],
        filepath: str,
    ) -> None:
        try:
            self._data_dict = None
            self._filepath = filepath
            dataset = dataset if isinstance(dataset, dict) else {"type": dataset}
            self._dataset_type, self._dataset_config = parse_dataset_definition(
                dataset
            )
            if VERSION_KEY in self._dataset_config:
                raise DatasetError(
                    f"'{self.__class__.__name__}' does not support versioning of the "
                    f"underlying dataset. Please remove '{VERSIONED_FLAG_KEY}' flag from "
                    f"the dataset definition."
                )
        except Exception as e:
            # Chain the cause so the original traceback is not lost.
            raise DatasetError(
                f"Failed to instantiate data dictionary. Error: {e}"
            ) from e

    @property
    def data_dict(self) -> AbstractDataset:
        """Return the underlying dataset instance, creating it on first access."""
        if self._data_dict is None:
            self._data_dict = self._dataset_type(
                self._filepath, **self._dataset_config
            )
        return self._data_dict

    def _load(self) -> Any:
        """Load the data dictionary through the underlying dataset."""
        return self.data_dict.load()

    def _save(self, data: Any):
        """Save ``data`` through the underlying dataset."""
        return self.data_dict.save(data)

    def _describe(self):
        """Return the description reported by the underlying dataset."""
        return self.data_dict._describe()

    def _exists(self):
        """Return the existence check result of the underlying dataset."""
        return self.data_dict._exists()

    def describe(self):
        """Public alias of ``_describe``."""
        return self._describe()

    def exists(self):
        """Return ``_exists()`` directly."""
        return self._exists()


class AbstractVersionedDataset(AbstractDataset[_DI, _DO], abc.ABC):
"""
Expand Down Expand Up @@ -537,6 +629,7 @@ def __init__(
self._glob_function = glob_function or iglob
# 1 entry for load version, 1 for save version
self._version_cache = Cache(maxsize=2) # type: Cache
self._data_dict = None

# 'key' is set to prevent cache key overlapping for load and save:
# https://cachetools.readthedocs.io/en/stable/#cachetools.cachedmethod
Expand Down Expand Up @@ -662,6 +755,13 @@ def exists(self) -> bool:
)
raise DatasetError(message) from exc

@property
def data_dict(self) -> Union["DataDictDataset", Dict[str, Any], None]:
    """Return the data dictionary attached to this dataset.

    While ``self._data_dict`` holds a non-empty ``dict`` it is treated as
    constructor keyword arguments: a ``DataDictDataset`` is built from it and
    stored back on ``self._data_dict``, so the conversion happens only once.
    Any other value (``None``, an empty ``dict``, or an already-built
    ``DataDictDataset``) is returned unchanged.

    Returns:
        The ``DataDictDataset`` built from the stored configuration, or the
        stored value as-is when there is nothing to build.
    """
    config = self._data_dict
    # An empty mapping carries no configuration, so it is left untouched.
    if isinstance(config, dict) and config:
        self._data_dict = DataDictDataset(**config)
    return self._data_dict

def _release(self) -> None:
super()._release()
self._version_cache.clear()
Expand Down

0 comments on commit f41a822

Please sign in to comment.