diff --git a/changelog/19.breaking.md b/changelog/19.breaking.md new file mode 100644 index 0000000..5dcdf4c --- /dev/null +++ b/changelog/19.breaking.md @@ -0,0 +1,3 @@ +- Required `db_dir` to be passed when initialising [pandas_openscm.db.reader.OpenSCMDBReader][]. This is required to support portable databases +- Renamed `out_column_type` to `out_columns_type` in [pandas_openscm.io.load_timeseries_csv][] for consistency with the rest of the API +- Bumped the minimum supported version of [filelock](https://py-filelock.readthedocs.io/) to 3.12.3, as only this version handles automatic creation of directories for the lock diff --git a/changelog/19.feature.md b/changelog/19.feature.md new file mode 100644 index 0000000..291912d --- /dev/null +++ b/changelog/19.feature.md @@ -0,0 +1,3 @@ +- Made the database portable by only storing relative paths in the file map. This allows the database to be converted to an archive with [pandas_openscm.db.OpenSCMDB.to_gzipped_tar_archive][] and then unpacked elsewhere with [pandas_openscm.db.OpenSCMDB.from_gzipped_tar_archive][] +- Added [pandas_openscm.db.path_handling][] to clarify how we handle paths internally to support portability +- Added support for specifying the name of the output columns via [pandas_openscm.db.OpenSCMDB.load][], [pandas_openscm.db.reader.OpenSCMDBReader.load][] and [pandas_openscm.io.load_timeseries_csv][] diff --git a/changelog/19.improvement.md b/changelog/19.improvement.md new file mode 100644 index 0000000..41d604f --- /dev/null +++ b/changelog/19.improvement.md @@ -0,0 +1,2 @@ +- Added the explicit [pandas_openscm.db.backends][] module to handle the backends we support more clearly +- Added [pandas_openscm.db.backends.DataBackendOptions.guess_backend][] and [pandas_openscm.db.backends.IndexBackendOptions.guess_backend][] to allow for more convenient inference of the backend to use with different files diff --git a/changelog/19.trivial.md b/changelog/19.trivial.md new file mode 100644 index 
0000000..18fcaaf --- /dev/null +++ b/changelog/19.trivial.md @@ -0,0 +1 @@ +Moved DATA_BACKENDS and INDEX_BACKENDS to [pandas_openscm.db.backends][], out of the top level [pandas_openscm.db][] module diff --git a/docs/how-to-guides/how-to-use-openscmdb.py b/docs/how-to-guides/how-to-use-openscmdb.py index 7e4ca3e..3effb7d 100644 --- a/docs/how-to-guides/how-to-use-openscmdb.py +++ b/docs/how-to-guides/how-to-use-openscmdb.py @@ -26,6 +26,7 @@ import concurrent.futures import contextlib import itertools +import tarfile import tempfile import traceback from functools import partial @@ -200,6 +201,82 @@ # %% [markdown] # ## Advanced topics +# %% [markdown] +# ### Sharing the database +# +# If you need to share a database, +# you can zip it and pass it to someone else. + +# %% [markdown] +# We start by putting some data in a database. + +# %% +top_level_dir = Path(tempfile.mkdtemp()) + +# %% +db_start = OpenSCMDB( + db_dir=top_level_dir / "start", + backend_data=DATA_BACKENDS.get_instance("csv"), + backend_index=INDEX_BACKENDS.get_instance("csv"), +) +db_start.save(df_timeseries_like) + +# %% [markdown] +# Then we create a gzipped tar archive of our database. + +# %% +gzipped = top_level_dir / "db_archive.tar.gz" +db_start.to_gzipped_tar_archive(gzipped) + +# %% [markdown] +# To demonstrate that this does not rely on the original data, +# we delete the original database. + +# %% +db_start.delete() + +# %% [markdown] +# We can inspect the tar file's contents. + +# %% +with tarfile.open(gzipped) as tar: + print(f"{tar.getmembers()=}") + +# %% [markdown] +# A new database can be initialised from the gzipped tar archive. + +# %% +db_moved = OpenSCMDB.from_gzipped_tar_archive( + gzipped, + db_dir=top_level_dir / "moved", +) +db_moved + +# %% [markdown] +# As above, we remove the archive +# to demonstrate that there is no reliance on it +# for the following operations. 
+ +# %% +gzipped.unlink() + +# %% [markdown] +# You can then use this database like normal, +# but now from the new location +# (whether on your machine or someone else's). + +# %% +db_moved.load() + +# %% +db_moved.load(pix.isin(unit="J")) + +# %% [markdown] +# We clean up the files before moving onto the next demonstration. + +# %% +db_moved.delete() + # %% [markdown] # ### Grouping data # diff --git a/pyproject.toml b/pyproject.toml index 71e2ae3..ba6befb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,7 +41,7 @@ Issues = "https://github.com/openscm/pandas-openscm/issues" [project.optional-dependencies] db = [ - "filelock>=3.0.0", + "filelock>=3.12.3", ] db-full = [ "netcdf4>=1.7.2", diff --git a/src/pandas_openscm/db/__init__.py b/src/pandas_openscm/db/__init__.py index 3f97266..ca5dd92 100644 --- a/src/pandas_openscm/db/__init__.py +++ b/src/pandas_openscm/db/__init__.py @@ -4,8 +4,7 @@ from __future__ import annotations -from attrs import frozen - +from pandas_openscm.db.backends import DATA_BACKENDS, INDEX_BACKENDS from pandas_openscm.db.csv import CSVDataBackend, CSVIndexBackend from pandas_openscm.db.feather import FeatherDataBackend, FeatherIndexBackend from pandas_openscm.db.in_memory import InMemoryDataBackend, InMemoryIndexBackend @@ -13,125 +12,6 @@ from pandas_openscm.db.netcdf import netCDFDataBackend, netCDFIndexBackend from pandas_openscm.db.openscm_db import AlreadyInDBError, EmptyDBError, OpenSCMDB - -@frozen -class DataBackendOptions: - """A collection of data back-end options""" - - options: tuple[ # type hint doesn't work properly, but ok - tuple[str, type[OpenSCMDBDataBackend]], ... - ] - """ - Options - - The first element of each option is the option's short name. - The second element is the class that matches that option. 
- """ - - def get_instance(self, option: str) -> OpenSCMDBDataBackend: - """ - Get an instance of one of the options - - Parameters - ---------- - option - Option for which to get a data back-end instance - - Returns - ------- - : - Initialised instance - - Raises - ------ - KeyError - The option is not supported - """ - for short_name, option_cls in self.options: - if short_name == option: - return option_cls() - - msg = ( - f"{option=} is not supported. " - f"Available options: {tuple(v[1] for v in self.options)}" - ) - raise KeyError(msg) - - -DATA_BACKENDS = DataBackendOptions( - ( # type: ignore # using class with protocol doesn't work properly - ("csv", CSVDataBackend), - ("feather", FeatherDataBackend), - ("in_memory", InMemoryDataBackend), - ("netCDF", netCDFDataBackend), - # Other options to consider: - # - # - pretty netCDF, where we try and save the data with dimensions where possible - # - # - HDF5: https://pandas.pydata.org/docs/user_guide/io.html#hdf5-pytables - # - sqllite - ) -) -"""Inbuilt data back-ends""" - - -@frozen -class IndexBackendOptions: - """A collection of index back-end options""" - - options: tuple[tuple[str, type[OpenSCMDBIndexBackend]], ...] - """ - Options - - The first element of each option is the option's short name. - The second element is the class that matches that option. - """ - - def get_instance(self, option: str) -> OpenSCMDBIndexBackend: - """ - Get an instance of one of the options - - Parameters - ---------- - option - Option for which to get a index back-end instance - - Returns - ------- - : - Initialised instance - - Raises - ------ - KeyError - The option is not supported - """ - for short_name, option_cls in self.options: - if short_name == option: - return option_cls() - - msg = ( - f"{option=} is not supported. 
" - f"Available options: {tuple(v[1] for v in self.options)}" - ) - raise KeyError(msg) - - -INDEX_BACKENDS = IndexBackendOptions( - ( # type: ignore # using class with protocol doesn't work properly - ("csv", CSVIndexBackend), - ("feather", FeatherIndexBackend), - ("in_memory", InMemoryIndexBackend), - ("netCDF", netCDFIndexBackend), - # Other options to consider: - # - # - HDF5: https://pandas.pydata.org/docs/user_guide/io.html#hdf5-pytables - # - sqllite - ) -) -"""Inbuilt index back-ends""" - - __all__ = [ "DATA_BACKENDS", "INDEX_BACKENDS", diff --git a/src/pandas_openscm/db/backends.py b/src/pandas_openscm/db/backends.py new file mode 100644 index 0000000..2d2976e --- /dev/null +++ b/src/pandas_openscm/db/backends.py @@ -0,0 +1,201 @@ +""" +Available back-ends + +This is just a shortcut/convenience module +""" + +from __future__ import annotations + +from pathlib import Path + +from attrs import frozen + +from pandas_openscm.db.csv import CSVDataBackend, CSVIndexBackend +from pandas_openscm.db.feather import FeatherDataBackend, FeatherIndexBackend +from pandas_openscm.db.in_memory import InMemoryDataBackend, InMemoryIndexBackend +from pandas_openscm.db.interfaces import OpenSCMDBDataBackend, OpenSCMDBIndexBackend +from pandas_openscm.db.netcdf import netCDFDataBackend, netCDFIndexBackend + + +@frozen +class DataBackendOptions: + """A collection of data back-end options""" + + options: tuple[ # type hint doesn't work properly, but ok + tuple[str, type[OpenSCMDBDataBackend]], ... + ] + """ + Options + + The first element of each option is the option's short name. + The second element is the class that matches that option. 
+ """ + + def get_instance(self, option: str) -> OpenSCMDBDataBackend: + """ + Get an instance of one of the options + + Parameters + ---------- + option + Option for which to get a data back-end instance + + Returns + ------- + : + Initialised instance + + Raises + ------ + KeyError + The option is not supported + """ + for short_name, option_cls in self.options: + if short_name == option: + return option_cls() + + msg = ( + f"{option=} is not supported. " + f"Available options: {tuple(v[1] for v in self.options)}" + ) + raise KeyError(msg) + + def guess_backend(self, data_file_name: str) -> OpenSCMDBDataBackend: + """ + Guess backend from a file name + + Parameters + ---------- + data_file_name + Name of the data file from which to guess the backend + + Returns + ------- + : + Guessed backend + + Raises + ------ + ValueError + The backend could not be guessed from `data_file_name` + """ + ext = Path(data_file_name).suffix + for _, option_cls in self.options: + option = option_cls() + if ext == option.ext: + return option + + known_options_and_extensions = [(v[0], v[1]().ext) for v in self.options] + msg = ( + f"Could not guess backend from {data_file_name=!r}. " + "The file's extension does not match any of the available options: " + f"{known_options_and_extensions=}" + ) + raise ValueError(msg) + + +DATA_BACKENDS = DataBackendOptions( + ( # type: ignore # using class with protocol doesn't work properly + ("csv", CSVDataBackend), + ("feather", FeatherDataBackend), + ("in_memory", InMemoryDataBackend), + ("netCDF", netCDFDataBackend), + # Other options to consider: + # + # - pretty netCDF, where we try and save the data with dimensions where possible + # + # - HDF5: https://pandas.pydata.org/docs/user_guide/io.html#hdf5-pytables + # - sqllite + ) +) +"""Inbuilt data back-ends""" + + +@frozen +class IndexBackendOptions: + """A collection of index back-end options""" + + options: tuple[tuple[str, type[OpenSCMDBIndexBackend]], ...] 
+ """ + Options + + The first element of each option is the option's short name. + The second element is the class that matches that option. + """ + + def get_instance(self, option: str) -> OpenSCMDBIndexBackend: + """ + Get an instance of one of the options + + Parameters + ---------- + option + Option for which to get a index back-end instance + + Returns + ------- + : + Initialised instance + + Raises + ------ + KeyError + The option is not supported + """ + for short_name, option_cls in self.options: + if short_name == option: + return option_cls() + + msg = ( + f"{option=} is not supported. " + f"Available options: {tuple(v[1] for v in self.options)}" + ) + raise KeyError(msg) + + def guess_backend(self, index_file_name: str) -> OpenSCMDBIndexBackend: + """ + Guess backend from a file name + + Parameters + ---------- + index_file_name + Name of the index file from which to guess the backend + + Returns + ------- + : + Guessed backend + + Raises + ------ + ValueError + The backend could not be guessed from `index_file_name` + """ + ext = Path(index_file_name).suffix + for _, option_cls in self.options: + option = option_cls() + if ext == option.ext: + return option + + known_options_and_extensions = [(v[0], v[1]().ext) for v in self.options] + msg = ( + f"Could not guess backend from {index_file_name=!r}. 
" + "The file's extension does not match any of the available options: " + f"{known_options_and_extensions=}" + ) + raise ValueError(msg) + + +INDEX_BACKENDS = IndexBackendOptions( + ( # type: ignore # using class with protocol doesn't work properly + ("csv", CSVIndexBackend), + ("feather", FeatherIndexBackend), + ("in_memory", InMemoryIndexBackend), + ("netCDF", netCDFIndexBackend), + # Other options to consider: + # + # - HDF5: https://pandas.pydata.org/docs/user_guide/io.html#hdf5-pytables + # - sqllite + ) +) +"""Inbuilt index back-ends""" diff --git a/src/pandas_openscm/db/loading.py b/src/pandas_openscm/db/loading.py index dcd2bbf..926ea7e 100644 --- a/src/pandas_openscm/db/loading.py +++ b/src/pandas_openscm/db/loading.py @@ -33,8 +33,10 @@ def load_data( # noqa: PLR0913 backend_data: OpenSCMDBDataBackend, db_index: pd.DataFrame, db_file_map: pd.Series[Path], # type: ignore # pandas type hints confused about what they support + db_dir: Path, selector: pd.Index[Any] | pd.MultiIndex | pix.selectors.Selector | None = None, out_columns_type: type | None = None, + out_columns_name: str | None = None, parallel_op_config: ParallelOpConfig | None = None, progress: bool = False, max_workers: int | None = None, @@ -53,6 +55,9 @@ def load_data( # noqa: PLR0913 db_file_map File map of the database from which to load + db_dir + The directory in which the database lives + selector Selector to use to choose the data to load @@ -61,6 +66,16 @@ def load_data( # noqa: PLR0913 If not supplied, we don't set the output columns' type. + out_columns_name + The name for the columns in the output. + + If not supplied, we don't set the output columns' name. + + This can also be set with + [pd.DataFrame.rename_axis][pandas.DataFrame.rename_axis] + but we provide it here for convenience + (and in case you couldn't find this trick for ages, like us). 
+ parallel_op_config Configuration for executing the operation in parallel with progress bars @@ -97,7 +112,7 @@ def load_data( # noqa: PLR0913 else: index_to_load = mi_loc(db_index, selector) - files_to_load = (Path(v) for v in db_file_map[index_to_load["file_id"].unique()]) + files_to_load = (db_dir / v for v in db_file_map[index_to_load["file_id"].unique()]) loaded_l = load_data_files( files_to_load=files_to_load, backend_data=backend_data, @@ -132,6 +147,9 @@ def load_data( # noqa: PLR0913 if out_columns_type is not None: res.columns = res.columns.astype(out_columns_type) + if out_columns_name is not None: + res = res.rename_axis(out_columns_name, axis="columns") + return res diff --git a/src/pandas_openscm/db/netcdf.py b/src/pandas_openscm/db/netcdf.py index b8d5ed8..8079e24 100644 --- a/src/pandas_openscm/db/netcdf.py +++ b/src/pandas_openscm/db/netcdf.py @@ -96,7 +96,13 @@ def save_data(self, data: pd.DataFrame, data_file: Path) -> None: # Resetting the index will also give each timeseries a unique ID data_rs = data.reset_index() timeseries_coord_info = {self.timeseries_dim: data_rs.index.values} - time_coord_info = {"time": data.columns} + if data.columns.name is None: + time_dim = "time" + else: + time_dim = data.columns.name + + time_coord_info = {time_dim: data.columns.values} + data_index_xr = metadata_df_to_xr( data_rs[data.index.names], timeseries_id_coord=xr.Coordinates(timeseries_coord_info), @@ -104,7 +110,7 @@ def save_data(self, data: pd.DataFrame, data_file: Path) -> None: ) data_values_xr = xr.DataArray( data, - dims=[self.timeseries_dim, "time"], + dims=[self.timeseries_dim, time_dim], coords=xr.Coordinates(timeseries_coord_info | time_coord_info), ) data_xr = xr.merge([data_index_xr, data_values_xr.to_dataset(name="values")]) diff --git a/src/pandas_openscm/db/openscm_db.py b/src/pandas_openscm/db/openscm_db.py index 6b79861..62c8ccb 100644 --- a/src/pandas_openscm/db/openscm_db.py +++ b/src/pandas_openscm/db/openscm_db.py @@ -4,6 +4,7 @@ 
from __future__ import annotations +import tarfile import warnings from pathlib import Path from typing import TYPE_CHECKING, Any @@ -11,6 +12,7 @@ import pandas as pd from attrs import define, field +from pandas_openscm.db.backends import DATA_BACKENDS, INDEX_BACKENDS from pandas_openscm.db.deleting import delete_files from pandas_openscm.db.interfaces import OpenSCMDBDataBackend, OpenSCMDBIndexBackend from pandas_openscm.db.loading import ( @@ -19,6 +21,7 @@ load_db_index, load_db_metadata, ) +from pandas_openscm.db.path_handling import DBPath from pandas_openscm.db.reader import OpenSCMDBReader from pandas_openscm.db.rewriting import make_move_plan, rewrite_files from pandas_openscm.db.saving import save_data @@ -225,6 +228,7 @@ def create_reader( res = OpenSCMDBReader( backend_data=self.backend_data, + db_dir=self.db_dir, db_index=db_index, db_file_map=db_file_map, lock=lock, @@ -287,7 +291,67 @@ def delete( max_workers=max_workers, ) - def get_new_data_file_path(self, file_id: int) -> Path: + @classmethod + def from_gzipped_tar_archive( + cls, + tar_archive: Path, + db_dir: Path, + backend_data: OpenSCMDBDataBackend | None = None, + backend_index: OpenSCMDBIndexBackend | None = None, + ) -> OpenSCMDB: + """ + Initialise from a gzipped tar archive + + This also unpacks the files to disk + + Parameters + ---------- + tar_archive + Tar archive from which to initialise + + db_dir + Directory in which to unpack the database + + backend_data + Backend to use for handling the data + + backend_index + Backend to use for handling the index + + Returns + ------- + : + Initialised database + """ + with tarfile.open(tar_archive, "r") as tar: + for member in tar.getmembers(): + if not member.isreg(): + # Only extract files + continue + # Extract to the db_dir + member.name = Path(member.name).name + tar.extract(member, db_dir) + if backend_index is None and member.name.startswith("index"): + backend_index = INDEX_BACKENDS.guess_backend(member.name) + + if backend_data is 
None and not any( + member.name.startswith(v) for v in ["index", "filemap"] + ): + backend_data = DATA_BACKENDS.guess_backend(member.name) + + if backend_data is None: # pragma: no cover + # Should be impossible to get here + raise TypeError(backend_data) + + if backend_index is None: # pragma: no cover + # Should be impossible to get here + raise TypeError(backend_index) + + res = cls(backend_data=backend_data, backend_index=backend_index, db_dir=db_dir) + + return res + + def get_new_data_file_path(self, file_id: int) -> DBPath: """ Get the path in which to write a new data file @@ -299,7 +363,7 @@ def get_new_data_file_path(self, file_id: int) -> Path: Returns ------- : - File in which to write the new data + Information about the path in which to write the new data Raises ------ @@ -311,7 +375,7 @@ def get_new_data_file_path(self, file_id: int) -> Path: if file_path.exists(): raise FileExistsError(file_path) - return file_path + return DBPath.from_abs_path_and_db_dir(abs=file_path, db_dir=self.db_dir) def load( # noqa: PLR0913 self, @@ -319,6 +383,7 @@ def load( # noqa: PLR0913 *, index_file_lock: filelock.BaseFileLock | None = None, out_columns_type: type | None = None, + out_columns_name: str | None = None, parallel_op_config: ParallelOpConfig | None = None, progress: bool = False, max_workers: int | None = None, @@ -341,6 +406,16 @@ def load( # noqa: PLR0913 If not supplied, we don't set the output columns' type. + out_columns_name + The name for the columns in the output. + + If not supplied, we don't set the output columns' name. + + This can also be set with + [pd.DataFrame.rename_axis][pandas.DataFrame.rename_axis] + but we provide it here for convenience + (and in case you couldn't find this trick for ages, like us). 
+ parallel_op_config Configuration for executing the operation in parallel with progress bars @@ -391,8 +466,10 @@ def load( # noqa: PLR0913 backend_data=self.backend_data, db_index=index, db_file_map=file_map, + db_dir=self.db_dir, selector=selector, out_columns_type=out_columns_type, + out_columns_name=out_columns_name, parallel_op_config=parallel_op_config, progress=progress, max_workers=max_workers, @@ -644,6 +721,7 @@ def save( # noqa: PLR0913 file_map_start=file_map_db, data_to_write=data, get_new_data_file_path=self.get_new_data_file_path, + db_dir=self.db_dir, ) # As needed, re-write files without deleting the old files @@ -706,3 +784,27 @@ def save( # noqa: PLR0913 progress=progress, max_workers=max_workers, ) + + def to_gzipped_tar_archive(self, out_file: Path, mode: str = "w:gz") -> Path: + """ + Convert to a gzipped tar archive + + Parameters + ---------- + out_file + File in which to write the output + + mode + Mode to use to open `out_file` + + Returns + ------- + : + Path to the gzipped tar archive + + This is the same as `out_file`, but is returned for convenience. + """ + with tarfile.open(out_file, mode=mode) as tar: + tar.add(self.db_dir, arcname="db") + + return out_file diff --git a/src/pandas_openscm/db/path_handling.py b/src/pandas_openscm/db/path_handling.py new file mode 100644 index 0000000..4278f2f --- /dev/null +++ b/src/pandas_openscm/db/path_handling.py @@ -0,0 +1,75 @@ +""" +Functionality for handling paths + +In order to make our databases portable, +we need to be a bit smarter than just using raw paths. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import attr +from attrs import define, field + + +@define +class DBPath: + """ + Database-related path + + Carries the information required to write paths with certainty + and keep the database portable. 
+ """ + + abs: Path + """The absolute path for the file""" + + rel_db: Path = field() + """The path relative to the database's directory""" + + @rel_db.validator + def rel_db_validator(self, attribute: attr.Attribute[Any], value: Path) -> None: + """ + Validate the value of `rel_db` + + Parameters + ---------- + attribute + Attribute being set + + value + Value to use + + Raises + ------ + AssertionError + `value` is not within `self.abs` + """ + if not str(self.abs).endswith(str(value)): + msg = ( + f"{attribute.name} value, {value!r}, " + f"is not a sub-path of {self.abs=!r}" + ) + raise AssertionError(msg) + + @classmethod + def from_abs_path_and_db_dir(cls, abs: Path, db_dir: Path) -> DBPath: + """ + Initialise from an absolute path and a database directory + + Parameters + ---------- + abs + Absolute path + + db_dir + Database directory + + Returns + ------- + : + Initialised `DBPath` + """ + return cls(abs=abs, rel_db=abs.relative_to(db_dir)) diff --git a/src/pandas_openscm/db/reader.py b/src/pandas_openscm/db/reader.py index 6f746d4..49c6484 100644 --- a/src/pandas_openscm/db/reader.py +++ b/src/pandas_openscm/db/reader.py @@ -32,7 +32,8 @@ class OpenSCMDBReader: Reader for reading data out of a database created with `OpenSCMDB` Holds the database file map and index in memory, - so this can be faster for repeated reads. + which can make repeated read operations faster + than using an `OpenSCMDB` instance. """ backend_data: OpenSCMDBDataBackend = field(kw_only=True) @@ -40,6 +41,11 @@ class OpenSCMDBReader: The backend for reading data from disk """ + db_dir: Path = field(kw_only=True) + """ + The directory in which the database lives + """ + db_file_map: pd.Series[Path] = field(kw_only=True) # type: ignore # pandas type hints confused about what they support """ The file map of the database from which we are reading. 
@@ -85,11 +91,12 @@ def metadata(self) -> pd.MultiIndex: """ return convert_db_index_to_metadata(db_index=self.db_index) - def load( + def load( # noqa: PLR0913 self, selector: pd.Index[Any] | pd.MultiIndex | pix.selectors.Selector | None = None, *, out_columns_type: type | None = None, + out_columns_name: str | None = None, parallel_op_config: ParallelOpConfig | None = None, progress: bool = False, max_workers: int | None = None, @@ -107,6 +114,16 @@ def load( If not supplied, we don't set the output columns' type. + out_columns_name + The name for the columns in the output. + + If not supplied, we don't set the output columns' name. + + This can also be set with + [pd.DataFrame.rename_axis][pandas.DataFrame.rename_axis] + but we provide it here for convenience + (and in case you couldn't find this trick for ages, like us). + parallel_op_config Configuration for executing the operation in parallel with progress bars @@ -142,8 +159,10 @@ def load( backend_data=self.backend_data, db_index=self.db_index, db_file_map=self.db_file_map, + db_dir=self.db_dir, selector=selector, out_columns_type=out_columns_type, + out_columns_name=out_columns_name, parallel_op_config=parallel_op_config, progress=progress, max_workers=max_workers, diff --git a/src/pandas_openscm/db/rewriting.py b/src/pandas_openscm/db/rewriting.py index 67ba176..1ab89ef 100644 --- a/src/pandas_openscm/db/rewriting.py +++ b/src/pandas_openscm/db/rewriting.py @@ -17,6 +17,7 @@ from attrs import define from pandas_openscm.db.interfaces import OpenSCMDBDataBackend +from pandas_openscm.db.path_handling import DBPath from pandas_openscm.index_manipulation import ( unify_index_levels_check_index_types, update_index_from_candidates, @@ -180,7 +181,8 @@ def make_move_plan( index_start: pd.DataFrame, file_map_start: pd.Series[Path], # type: ignore # pandas confused about ability to support Path data_to_write: pd.DataFrame, - get_new_data_file_path: Callable[[int], Path], + get_new_data_file_path: Callable[[int], 
DBPath], + db_dir: Path, ) -> MovePlan: """ Make a plan for moving data around to make room for new data @@ -196,6 +198,12 @@ def make_move_plan( data_to_write Data that is going to be written in the database + get_new_data_file_path + Callable which, given an integer, returns the path info for the new data file + + db_dir + Database directory + Returns ------- : @@ -229,7 +237,7 @@ def make_move_plan( # (would be even more efficient to just update the file IDs, # but that would create a coupling I can't get my head around right now). delete_file_ids = full_overwrite.index[full_overwrite] - delete_paths = file_map_start.loc[delete_file_ids] + delete_paths = (db_dir / v for v in file_map_start.loc[delete_file_ids]) moved_index = index_start[~index_start["file_id"].isin(delete_file_ids)] file_map_out = file_map_start.loc[moved_index["file_id"].unique()] @@ -246,7 +254,7 @@ def make_move_plan( full_overwrite_file_ids = full_overwrite.index[full_overwrite] partial_overwrite_file_ids = partial_overwrite.index[partial_overwrite] file_ids_to_delete = np.union1d(full_overwrite_file_ids, partial_overwrite_file_ids) - delete_paths = file_map_start.loc[file_ids_to_delete] + delete_paths = (db_dir / v for v in file_map_start.loc[file_ids_to_delete]) file_id_map = {} max_file_id_start = file_map_start.index.max() @@ -259,12 +267,13 @@ def make_move_plan( ): new_file_id = max_file_id_start + 1 + increment - file_map_out.loc[new_file_id] = get_new_data_file_path(new_file_id) + new_db_path = get_new_data_file_path(new_file_id) + file_map_out.loc[new_file_id] = new_db_path.rel_db rewrite_actions_l.append( ReWriteAction( - from_file=file_map_start.loc[file_id_old], - to_file=file_map_out.loc[new_file_id], + from_file=db_dir / file_map_start.loc[file_id_old], + to_file=new_db_path.abs, locator=fiddf.index.droplevel("file_id"), ) ) diff --git a/src/pandas_openscm/db/saving.py b/src/pandas_openscm/db/saving.py index d1cd059..a911cf2 100644 --- a/src/pandas_openscm/db/saving.py +++ 
b/src/pandas_openscm/db/saving.py @@ -15,6 +15,7 @@ from attrs import define from pandas_openscm.db.interfaces import OpenSCMDBDataBackend, OpenSCMDBIndexBackend +from pandas_openscm.db.path_handling import DBPath from pandas_openscm.index_manipulation import ( unify_index_levels_check_index_types, ) @@ -63,7 +64,7 @@ def save_data( # noqa: PLR0913 data: pd.DataFrame, *, backend_data: OpenSCMDBDataBackend, - get_new_data_file_path: Callable[[int], Path], + get_new_data_file_path: Callable[[int], DBPath], backend_index: OpenSCMDBIndexBackend, index_file: Path, file_map_file: Path, @@ -84,8 +85,20 @@ def save_data( # noqa: PLR0913 data Data to save - db - Database in which to save the data + backend_data + Backend to use to save the data + + get_new_data_file_path + Callable which, given an integer, returns the path info for the new data file + + backend_index + Backend to use to save the index + + index_file + File in which to save the index + + file_map_file + File in which to save the file map index_non_data Index that is already in the database but isn't related to data. @@ -94,7 +107,7 @@ def save_data( # noqa: PLR0913 before we write the database's index. file_map_non_data - File map that is already in the database but isn't related to data. + File map that is already in the database but isn't related to `data`. If supplied, this is combined with the file map generated for `data` before we write the database's file map. 
@@ -179,9 +192,9 @@ def save_data( # noqa: PLR0913 for increment, (_, df) in enumerate(grouper): file_id = min_file_id + increment - new_file_path = get_new_data_file_path(file_id) + new_db_path = get_new_data_file_path(file_id) - file_map_out.loc[file_id] = new_file_path # type: ignore # pandas types confused about what they support + file_map_out.loc[file_id] = new_db_path.rel_db # type: ignore # pandas types confused about what they support if index_non_data_unified_index is None: df_index_unified = df.index else: @@ -202,7 +215,7 @@ def save_data( # noqa: PLR0913 info=df, info_kind=DBFileType.DATA, backend=backend_data, - save_path=new_file_path, + save_path=new_db_path.abs, ) ) diff --git a/src/pandas_openscm/io.py b/src/pandas_openscm/io.py index cbc08df..edbcd74 100644 --- a/src/pandas_openscm/io.py +++ b/src/pandas_openscm/io.py @@ -13,7 +13,8 @@ def load_timeseries_csv( fp: Path, lower_column_names: bool = True, index_columns: list[str] | None = None, - out_column_type: type | None = None, + out_columns_type: type | None = None, + out_columns_name: str | None = None, ) -> pd.DataFrame: """ Load a CSV holding timeseries @@ -42,11 +43,21 @@ def load_timeseries_csv( In future, if not provided, we will try and infer the columns based on whether they look like time columns or not. - out_column_type + out_columns_type The type to apply to the output columns that are not part of the index. If not supplied, the raw type returned by pandas is returned. + out_columns_name + The name for the columns in the output. + + If not supplied, the raw name returned by pandas is returned. + + This can also be set with + [pd.DataFrame.rename_axis][pandas.DataFrame.rename_axis] + but we provide it here for convenience + (and in case you couldn't find this trick for ages, like us). 
+ Returns ------- : @@ -62,7 +73,10 @@ def load_timeseries_csv( out = out.set_index(index_columns) - if out_column_type is not None: - out.columns = out.columns.astype(out_column_type) + if out_columns_type is not None: + out.columns = out.columns.astype(out_columns_type) + + if out_columns_name is not None: + out = out.rename_axis(out_columns_name, axis="columns") return out diff --git a/tests/integration/database/test_integration_database_make_move_plan.py b/tests/integration/database/test_integration_database_make_move_plan.py index 3f230ab..82ae8d3 100644 --- a/tests/integration/database/test_integration_database_make_move_plan.py +++ b/tests/integration/database/test_integration_database_make_move_plan.py @@ -35,7 +35,10 @@ def test_make_move_plan_no_overwrite(tmpdir): columns=["scenario", "variable", "unit", "file_id"], ).set_index(["scenario", "variable", "unit"]) file_map_start = pd.Series( - [db.get_new_data_file_path(fid) for fid in index_start["file_id"].unique()], + [ + db.get_new_data_file_path(fid).rel_db + for fid in index_start["file_id"].unique() + ], index=pd.Index(index_start["file_id"].unique(), name="file_id"), ) @@ -67,6 +70,7 @@ def test_make_move_plan_no_overwrite(tmpdir): file_map_start=file_map_start, data_to_write=data_to_write, get_new_data_file_path=db.get_new_data_file_path, + db_dir=db.db_dir, ) assert_move_plan_equal(res, exp) @@ -90,7 +94,10 @@ def test_make_move_plan_full_overwrite(tmpdir): columns=["scenario", "variable", "unit", "file_id"], ).set_index(["scenario", "variable", "unit"]) file_map_start = pd.Series( - [db.get_new_data_file_path(fid) for fid in index_start["file_id"].unique()], + [ + db.get_new_data_file_path(fid).rel_db + for fid in index_start["file_id"].unique() + ], index=pd.Index(index_start["file_id"].unique(), name="file_id"), ) @@ -110,7 +117,7 @@ def test_make_move_plan_full_overwrite(tmpdir): exp_moved_file_ids = [0] # 1 will be overwritten i.e. 
schedule to delete exp_moved_file_map = pd.Series( - [db.get_new_data_file_path(file_id) for file_id in exp_moved_file_ids], + [db.get_new_data_file_path(file_id).rel_db for file_id in exp_moved_file_ids], index=pd.Index(exp_moved_file_ids, name="file_id"), ) @@ -130,7 +137,7 @@ def test_make_move_plan_full_overwrite(tmpdir): moved_index=exp_moved_index, moved_file_map=exp_moved_file_map, rewrite_actions=None, - delete_paths=(file_map_start.loc[1],), + delete_paths=(db.db_dir / file_map_start.loc[1],), ) res = make_move_plan( @@ -138,6 +145,7 @@ def test_make_move_plan_full_overwrite(tmpdir): file_map_start=file_map_start, data_to_write=data_to_write, get_new_data_file_path=db.get_new_data_file_path, + db_dir=db.db_dir, ) assert_move_plan_equal(res, exp) @@ -163,7 +171,10 @@ def test_make_move_plan_partial_overwrite(tmpdir): columns=["scenario", "variable", "unit", "file_id"], ).set_index(["scenario", "variable", "unit"]) file_map_start = pd.Series( - [db.get_new_data_file_path(fid) for fid in index_start["file_id"].unique()], + [ + db.get_new_data_file_path(fid).rel_db + for fid in index_start["file_id"].unique() + ], index=pd.Index(index_start["file_id"].unique(), name="file_id"), ) @@ -192,7 +203,7 @@ def test_make_move_plan_partial_overwrite(tmpdir): exp_moved_file_ids = [0, 3] # 1 deleted, 2 re-written then deleted exp_moved_file_map = pd.Series( - [db.get_new_data_file_path(file_id) for file_id in exp_moved_file_ids], + [db.get_new_data_file_path(file_id).rel_db for file_id in exp_moved_file_ids], index=pd.Index(exp_moved_file_ids, name="file_id"), ) @@ -217,7 +228,7 @@ def test_make_move_plan_partial_overwrite(tmpdir): moved_file_map=exp_moved_file_map, rewrite_actions=( ReWriteAction( - from_file=file_map_start.loc[2], + from_file=db.db_dir / file_map_start.loc[2], locator=pd.MultiIndex.from_frame( pd.DataFrame( [ @@ -226,10 +237,12 @@ def test_make_move_plan_partial_overwrite(tmpdir): columns=["scenario", "variable", "unit"], ) ), - 
to_file=exp_moved_file_map.loc[3], + to_file=db.db_dir / exp_moved_file_map.loc[3], ), ), - delete_paths=(file_map_start.loc[1], file_map_start.loc[2]), + delete_paths=( + db.db_dir / v for v in (file_map_start.loc[1], file_map_start.loc[2]) + ), ) res = make_move_plan( @@ -237,6 +250,7 @@ def test_make_move_plan_partial_overwrite(tmpdir): file_map_start=file_map_start, data_to_write=data_to_write, get_new_data_file_path=db.get_new_data_file_path, + db_dir=db.db_dir, ) assert_move_plan_equal(res, exp) diff --git a/tests/integration/database/test_integration_database_portability.py b/tests/integration/database/test_integration_database_portability.py new file mode 100644 index 0000000..e0537f8 --- /dev/null +++ b/tests/integration/database/test_integration_database_portability.py @@ -0,0 +1,105 @@ +""" +Tests of moving the database +""" + +from pathlib import Path + +import numpy as np +import pandas as pd +import pytest + +from pandas_openscm.db import ( + CSVDataBackend, + CSVIndexBackend, + FeatherDataBackend, + FeatherIndexBackend, + OpenSCMDB, + netCDFDataBackend, + netCDFIndexBackend, +) +from pandas_openscm.testing import assert_frame_alike + +pytest.importorskip("filelock") + + +@pytest.mark.parametrize( + "backend_data, backend_index", + ( + pytest.param( + FeatherDataBackend(), + FeatherIndexBackend(), + id="feather", + ), + pytest.param( + netCDFDataBackend(), + netCDFIndexBackend(), + id="netCDF", + ), + pytest.param( + CSVDataBackend(), + CSVIndexBackend(), + id="csv", + ), + ), +) +@pytest.mark.parametrize("provide_backend_data_to_class_method", (True, False)) +@pytest.mark.parametrize("provide_backend_index_to_class_method", (True, False)) +def test_move_db( # noqa: PLR0913 + provide_backend_index_to_class_method, + provide_backend_data_to_class_method, + backend_data, + backend_index, + tmpdir, + setup_pandas_accessor, +): + initial_db_dir = Path(tmpdir) / "initial" + other_db_dir = Path(tmpdir) / "other" + tar_archive = Path(tmpdir) / 
"tar_archive.tar.gz" + + db = OpenSCMDB( + db_dir=initial_db_dir, + backend_data=backend_data, + backend_index=backend_index, + ) + + df_timeseries_like = pd.DataFrame( + np.arange(12).reshape(4, 3), + columns=[2010, 2015, 2025], + index=pd.MultiIndex.from_tuples( + [ + ("scenario_a", "climate_model_a", "Temperature", "K"), + ("scenario_b", "climate_model_a", "Temperature", "K"), + ("scenario_b", "climate_model_b", "Temperature", "K"), + ("scenario_b", "climate_model_b", "Ocean Heat Uptake", "J"), + ], + names=["scenario", "climate_model", "variable", "unit"], + ), + ) + + db.save(df_timeseries_like, groupby=["scenario", "variable"]) + + # Create a tar archive (returns the archive path, even though it's also an input) + tar_archive = db.to_gzipped_tar_archive(tar_archive) + + # Expand elsewhere + from_gzipped_tar_archive_kwargs = {} + if provide_backend_data_to_class_method: + from_gzipped_tar_archive_kwargs["backend_data"] = backend_data + + if provide_backend_index_to_class_method: + from_gzipped_tar_archive_kwargs["backend_index"] = backend_index + + db_other = OpenSCMDB.from_gzipped_tar_archive( + tar_archive, db_dir=other_db_dir, **from_gzipped_tar_archive_kwargs + ) + + # Delete the original + db.delete() + + assert_frame_alike(df_timeseries_like, db_other.load(out_columns_type=int)) + + locator = pd.Index(["scenario_b"], name="scenario") + assert_frame_alike( + df_timeseries_like.openscm.mi_loc(locator), + db_other.load(locator, out_columns_type=int), + ) diff --git a/tests/integration/database/test_integration_database_save_load.py b/tests/integration/database/test_integration_database_save_load.py index d98034c..fb45dd1 100644 --- a/tests/integration/database/test_integration_database_save_load.py +++ b/tests/integration/database/test_integration_database_save_load.py @@ -43,7 +43,7 @@ def test_save_and_load_basic(tmpdir, db_data_backend, db_index_backend): df_timeseries_like = pd.DataFrame( np.arange(12).reshape(4, 3), - columns=[2010, 2015, 2025], + 
columns=pd.Index([2010, 2015, 2025], name="year"), index=pd.MultiIndex.from_tuples( [ ("scenario_a", "climate_model_a", "Temperature", "K"), @@ -70,7 +70,10 @@ def test_save_and_load_basic(tmpdir, db_data_backend, db_index_backend): df_timeseries_like.index, metadata_compare, exact="equiv", check_order=False ) - loaded = db.load(out_columns_type=df_timeseries_like.columns.dtype) + loaded = db.load( + out_columns_type=df_timeseries_like.columns.dtype, + out_columns_name=df_timeseries_like.columns.name, + ) assert_frame_alike(df_timeseries_like, loaded) diff --git a/tests/integration/test_io.py b/tests/integration/test_io.py index 3b5d9ca..f45d494 100644 --- a/tests/integration/test_io.py +++ b/tests/integration/test_io.py @@ -85,7 +85,7 @@ def test_load_timeseries_csv_lower_column_names(tmp_path, lower_column_names): @pytest.mark.parametrize( # Column type and value type are not the same # because columns are held as numpy arrays. - "out_column_type, exp_column_value_type", + "out_columns_type, exp_column_value_type", ( (int, np.int64), (float, np.float64), @@ -93,8 +93,8 @@ def test_load_timeseries_csv_lower_column_names(tmp_path, lower_column_names): (np.float32, np.float32), ), ) -def test_load_timeseries_csv_basic_out_column_type( - tmp_path, out_column_type, exp_column_value_type +def test_load_timeseries_csv_basic_out_columns_type( + tmp_path, out_columns_type, exp_column_value_type ): out_path = tmp_path / "test_load_timeseries_csv.csv" @@ -113,13 +113,46 @@ def test_load_timeseries_csv_basic_out_column_type( index_columns = ["variable", "scenario", "run", "unit"] loaded = load_timeseries_csv( - out_path, index_columns=index_columns, out_column_type=out_column_type + out_path, index_columns=index_columns, out_columns_type=out_columns_type ) assert loaded.index.names == index_columns assert all(isinstance(c, exp_column_value_type) for c in loaded.columns.values) +@pytest.mark.parametrize( + "out_columns_name, exp_columns_name", + ( + (None, None), + ("hi", 
"hi"), + ("time", "time"), + ), +) +def test_load_timeseries_csv_basic_out_columns_name( + tmp_path, out_columns_name, exp_columns_name +): + out_path = tmp_path / "test_load_timeseries_csv.csv" + + timepoints = np.arange(1990.0, 2010.0 + 1.0, dtype=int) + start = create_test_df( + variables=[(f"variable_{i}", "Mt") for i in range(5)], + n_scenarios=3, + n_runs=6, + timepoints=timepoints, + ) + assert start.columns.name is None + + start.to_csv(out_path) + + index_columns = ["variable", "scenario", "run", "unit"] + + loaded = load_timeseries_csv( + out_path, index_columns=index_columns, out_columns_name=out_columns_name + ) + + assert loaded.columns.name == exp_columns_name + + @pytest.mark.xfail(reason="Not implemented") def test_load_timeseries_csv_infer_index_cols(tmp_path): # Suggested cases here: diff --git a/tests/unit/database/test_database.py b/tests/unit/database/test_database.py index e7a699d..709456e 100644 --- a/tests/unit/database/test_database.py +++ b/tests/unit/database/test_database.py @@ -40,10 +40,31 @@ def test_available_backends_data(): def test_unavailable_data_backend(): - with pytest.raises(KeyError): + with pytest.raises( + KeyError, match=re.escape("option='junk' is not supported. Available options:") + ): DATA_BACKENDS.get_instance("junk") +def test_guess_backend_data(): + assert isinstance(DATA_BACKENDS.guess_backend("0.csv"), CSVDataBackend) + assert isinstance(DATA_BACKENDS.guess_backend("0.feather"), FeatherDataBackend) + assert isinstance(DATA_BACKENDS.guess_backend("0.in-mem"), InMemoryDataBackend) + assert isinstance(DATA_BACKENDS.guess_backend("0.nc"), netCDFDataBackend) + + +def test_guess_data_backend_error(): + with pytest.raises( + ValueError, + match=re.escape( + "Could not guess backend from data_file_name='0.junk'. 
" + "The file's extension does not match any of the available options: " + "known_options_and_extensions=" + ), + ): + DATA_BACKENDS.guess_backend("0.junk") + + def test_available_backends_index(): assert isinstance(INDEX_BACKENDS.get_instance("csv"), CSVIndexBackend) assert isinstance(INDEX_BACKENDS.get_instance("feather"), FeatherIndexBackend) @@ -52,10 +73,31 @@ def test_available_backends_index(): def test_unavailable_index_backend(): - with pytest.raises(KeyError): + with pytest.raises( + KeyError, match=re.escape("option='junk' is not supported. Available options:") + ): INDEX_BACKENDS.get_instance("junk") +def test_guess_backend_index(): + assert isinstance(INDEX_BACKENDS.guess_backend("0.csv"), CSVIndexBackend) + assert isinstance(INDEX_BACKENDS.guess_backend("0.feather"), FeatherIndexBackend) + assert isinstance(INDEX_BACKENDS.guess_backend("0.in-mem"), InMemoryIndexBackend) + assert isinstance(INDEX_BACKENDS.guess_backend("0.nc"), netCDFIndexBackend) + + +def test_guess_index_backend_error(): + with pytest.raises( + ValueError, + match=re.escape( + "Could not guess backend from index_file_name='index.junk'. 
" + "The file's extension does not match any of the available options: " + "known_options_and_extensions=" + ), + ): + INDEX_BACKENDS.guess_backend("index.junk") + + def test_filelock_not_available_default_initialisation(tmpdir): with patch.dict(sys.modules, {"filelock": None}): with pytest.raises( @@ -88,7 +130,7 @@ def test_get_existing_data_file_path(tmpdir): fp = db.get_new_data_file_path(file_id=10) # Assume the file gets written somewhere - fp.touch() + fp.abs.touch() with pytest.raises(FileExistsError): db.get_new_data_file_path(file_id=10) diff --git a/tests/unit/database/test_path_handling.py b/tests/unit/database/test_path_handling.py new file mode 100644 index 0000000..d1fdf6f --- /dev/null +++ b/tests/unit/database/test_path_handling.py @@ -0,0 +1,39 @@ +""" +Tests of `pandas_openscm.db.path_handling` +""" + +import re +from contextlib import nullcontext as does_not_raise +from pathlib import Path + +import pytest + +from pandas_openscm.db.path_handling import DBPath + + +@pytest.mark.parametrize( + "abs, rel_db, exp", + ( + (Path("/a/b/c/d.csv"), Path("d.csv"), does_not_raise()), + (Path("/a/b/c/d.csv"), Path("c/d.csv"), does_not_raise()), + ( + Path("/a/b/c/d.csv"), + Path("e/d.csv"), + pytest.raises( + AssertionError, + match="".join( + ( + re.escape("rel_db value, "), + ".*Path", + re.escape(r"('e/d.csv'), is not a sub-path of self.abs="), + ".*Path", + re.escape("('/a/b/c/d.csv')"), + ) + ), + ), + ), + ), +) +def test_rel_db_validator(abs, rel_db, exp): + with exp: + DBPath(abs=abs, rel_db=rel_db) diff --git a/uv.lock b/uv.lock index 55794ad..b4aef5b 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,4 @@ version = 1 -revision = 1 requires-python = ">=3.9" resolution-markers = [ "python_full_version >= '3.12'", @@ -2450,7 +2449,7 @@ wheels = [ [[package]] name = "pandas-openscm" -version = "0.4.2a1" +version = "0.4.3a1" source = { editable = "." 
} dependencies = [ { name = "attrs" }, @@ -2600,7 +2599,7 @@ tests-min = [ [package.metadata] requires-dist = [ { name = "attrs", specifier = ">=24.3.0" }, - { name = "filelock", marker = "extra == 'db'", specifier = ">=3.0.0" }, + { name = "filelock", marker = "extra == 'db'", specifier = ">=3.12.3" }, { name = "matplotlib", marker = "extra == 'plots'", specifier = ">=3.7.1" }, { name = "netcdf4", marker = "extra == 'db-full'", specifier = ">=1.7.2" }, { name = "numpy", specifier = ">=1.25.0" }, @@ -2613,7 +2612,6 @@ requires-dist = [ { name = "tqdm", marker = "extra == 'progress'", specifier = ">=4.0.0" }, { name = "xarray", marker = "extra == 'db-full'", specifier = ">=2024.7.0" }, ] -provides-extras = ["db", "db-full", "plots", "progress", "full"] [package.metadata.requires-dev] all-dev = [