diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index d9bd825..36ae168 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -16,10 +16,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - name: Set up Python 3.8 + - name: Set up Python 3.9 uses: actions/setup-python@v2 with: - python-version: 3.8 + python-version: 3.9 - name: Lint run: | diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f8a11a7..ed40386 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -72,3 +72,10 @@ repos: args: - --convention=numpy - --add-select=D417 + +- repo: https://github.com/pre-commit/mirrors-mypy + rev: v0.982 + hooks: + - id: mypy + additional_dependencies: [types-all] + exclude: conf.py$ diff --git a/news/66.misc b/news/66.misc new file mode 100644 index 0000000..94d9460 --- /dev/null +++ b/news/66.misc @@ -0,0 +1 @@ +Added type hints to the entire code base and a *mypy* hook to the *pre-commit* linters to identify potential typing issues. diff --git a/pyproject.toml b/pyproject.toml index 0219c16..2c34231 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -111,6 +111,9 @@ force_grid_wrap = 0 combine_as_imports = true line_length = 88 +[tool.mypy] +ignore_missing_imports = true + [tool.towncrier] directory = "news" package = "sequence" diff --git a/sequence/_grid.py b/sequence/_grid.py index aecd9a3..c944173 100644 --- a/sequence/_grid.py +++ b/sequence/_grid.py @@ -1,12 +1,13 @@ """Define the grid used for creating *Sequence* models.""" import os +import sys import numpy as np from landlab import RasterModelGrid -try: +if sys.version_info >= (3, 11): import tomllib -except ModuleNotFoundError: +else: import tomli as tomllib @@ -41,7 +42,7 @@ def __init__(self, n_cols: int, spacing: float = 100.0): self.at_grid["sea_level__elevation"] = 0.0 @classmethod - def from_toml(cls, filepath: os.PathLike[str]): + def from_toml(cls, filepath: os.PathLike[str]) -> "SequenceModelGrid": """Load a :class:`~SequenceModelGrid` from a *toml*-formatted file. Parameters @@ -53,7 +54,7 @@ def from_toml(cls, filepath: os.PathLike[str]): return SequenceModelGrid.from_dict(tomllib.load(fp)["sequence"]["grid"]) @classmethod - def from_dict(cls, params: dict): + def from_dict(cls, params: dict) -> "SequenceModelGrid": """Create a :class:`~SequenceModelGrid` from a `dict`. If possible, this alternate constructor simply passes diff --git a/sequence/bathymetry.py b/sequence/bathymetry.py index ac84206..2eb7ae6 100644 --- a/sequence/bathymetry.py +++ b/sequence/bathymetry.py @@ -3,10 +3,16 @@ This module contains *Landlab* components to read bathymetry into a `SequenceModelGrid`. """ +from os import PathLike +from typing import Union + import numpy as np from landlab import Component +from numpy.typing import NDArray from scipy import interpolate +from ._grid import SequenceModelGrid + class BathymetryReader(Component): """Landlab component that reads bathymetry from a file.""" @@ -26,7 +32,12 @@ class BathymetryReader(Component): } } - def __init__(self, grid, filepath=None, kind="linear", **kwds): + def __init__( + self, + grid: SequenceModelGrid, + filepath: Union[str, PathLike[str]], + kind: str = "linear", + ): """Generate a bathymetric profile from a file. Parameters @@ -40,7 +51,7 @@ def __init__(self, grid, filepath=None, kind="linear", **kwds): 'nearest', 'zero', 'slinear', 'quadratic', 'cubic'). Default is 'linear'. """ - super().__init__(grid, **kwds) + super().__init__(grid) data = np.loadtxt(filepath, delimiter=",", comments="#") self._bathymetry = interpolate.interp1d( @@ -56,18 +67,18 @@ def __init__(self, grid, filepath=None, kind="linear", **kwds): self.grid.add_zeros("topographic__elevation", at="node") @property - def x(self): + def x(self) -> NDArray[np.floating]: """Return the x-coordinates of the grid.""" return self.grid.x_of_node[self.grid.nodes_at_bottom_edge] @property - def z(self): + def z(self) -> NDArray[np.floating]: """Return the elevations along the grid.""" return self.grid.at_node["topographic__elevation"][ self.grid.nodes_at_bottom_edge ] - def run_one_step(self, dt=None): + def run_one_step(self, dt: float = None) -> None: """Update the grid's bathymetry. Parameters @@ -81,14 +92,14 @@ def run_one_step(self, dt=None): def _create_initial_profile( - x, - sl_plain=0.0008, - init_shore=19750.0, - hgt=15.0, - alpha=1 / 2000.0, - sl_sh=0.001, - wavebase=60.0, -): + x: NDArray[np.floating], + sl_plain: float = 0.0008, + init_shore: float = 19750.0, + hgt: float = 15.0, + alpha: float = 1 / 2000.0, + sl_sh: float = 0.001, + wavebase: float = 60.0, +) -> NDArray[np.floating]: # check shoreline is in array, else put in center of array if x[-1] < init_shore: diff --git a/sequence/cli.py b/sequence/cli.py index e12b183..34dbb58 100644 --- a/sequence/cli.py +++ b/sequence/cli.py @@ -3,13 +3,16 @@ import pathlib import re from io import StringIO -from typing import Any, Optional +from os import PathLike +from typing import Any, Iterable, Iterator, Optional, Union import numpy as np import rich_click as click import tomlkit as toml import yaml from landlab.core import load_params +from numpy.typing import ArrayLike +from tqdm import tqdm from .errors import MissingRequiredVariable from .input_reader import TimeVaryingConfig @@ -58,12 +61,12 @@ def err(message: Optional[str] = None, nl: bool = True, **styles: Any) -> None: _err(message, nl=nl, **styles) -def _contents_of_input_file(infile, set): +def _contents_of_input_file(infile: Union[str, PathLike[str]], set: str) -> str: params = _load_model_params( defaults=SequenceModel.DEFAULT_PARAMS, dotted_params=set ) - def as_csv(data, header=None): + def as_csv(data: ArrayLike, header: str = "") -> str: with StringIO() as fp: np.savetxt(fp, data, header=header, delimiter=",", fmt="%.1f") contents = fp.getvalue() @@ -82,10 +85,16 @@ def as_csv(data, header=None): [[0.0, 20.0], [100000.0, -80.0]], header="X [m], Elevation [m]" ), "sealevel.csv": as_csv( - [[0.0, 0.0], [200000, -10]], header="Time [y], Sea-Level Elevation [m]" + [[0.0, 0.0], [200000.0, -10.0]], header="Time [y], Sea-Level Elevation [m]" ), "subsidence.csv": as_csv( - [[0.0, 0], [30000.0, 0], [35000.0, 0], [50000.0, 0], [100000.0, 0]], + [ + [0.0, 0.0], + [30000.0, 0.0], + [35000.0, 0.0], + [50000.0, 0.0], + [100000.0, 0.0], + ], header="X [x], Subsidence Rate [m / y]", ), } @@ -94,10 +103,10 @@ def as_csv(data, header=None): section_params, default_flow_style=False ) - return contents[infile] + return contents[str(infile)] -def _time_from_filename(name): +def _time_from_filename(name: Union[str, PathLike[str]]) -> Union[int, None]: """Parse a time stamp from a file name. Parameters @@ -128,7 +137,9 @@ def _time_from_filename(name): return None -def _find_config_files(pathname): +def _find_config_files( + pathname: Union[str, PathLike[str]] +) -> tuple[list[int], list[str]]: """Find all of the time-varying config files for a simulation. Parameters @@ -146,30 +157,21 @@ def _find_config_files(pathname): toml_files = list(pathname.glob("sequence*.toml")) yaml_files = list(pathname.glob("sequence*.yaml")) - config_files = toml_files if toml_files else yaml_files + config_files = sorted(toml_files if toml_files else yaml_files) - items = [] + times: list[int] = [] + names: list[str] = [] for index, config_file in enumerate(config_files): time = _time_from_filename(config_file) if time is None: time = index - items.append((time, str(config_file))) - - return zip(*sorted(items)) - - -class _silent_progressbar: - def __init__(self, **kwds): - pass + times.append(time) + names.append(str(config_file)) - def __enter__(self): - return self + names = [name for _, name in sorted(zip(times, names))] + times.sort() - def __exit__(self, exc_type, exc_value, exc_traceback): - pass - - def update(self, inc): - pass + return times, names @click.group(chain=True) @@ -189,7 +191,7 @@ def update(self, inc): @click.option( "-v", "--verbose", is_flag=True, help="Also emit status messages to stderr." ) -def sequence(cd, silent, verbose) -> None: +def sequence(cd: str, silent: bool, verbose: bool) -> None: """# Sequence. Sequence is a modular 2D (i.e., profile) sequence stratigraphic model @@ -208,7 +210,7 @@ def sequence(cd, silent, verbose) -> None: "--with-citations", is_flag=True, help="print citations for components used" ) @click.pass_context -def run(ctx, with_citations, dry_run): +def run(ctx: Any, with_citations: bool, dry_run: bool) -> None: """Run a simulation. ## Examples @@ -271,12 +273,14 @@ def run(ctx, with_citations, dry_run): out("πŸ‘†πŸ‘†πŸ‘†These are the citations to useπŸ‘†πŸ‘†πŸ‘†") if not dry_run: - progressbar = _silent_progressbar if silent else click.progressbar + progressbar = tqdm( + total=int(model.clock.stop // model.clock.step), + desc=" ".join(["πŸš€", str(run_dir)]), + disable=True if silent else None, + ) + try: - with progressbar( - length=int(model.clock.stop // model.clock.step), - label=" ".join(["πŸš€", str(run_dir)]), - ) as bar: + with progressbar as bar: while 1: model.run_one_step() model.set_params(params.update(1)) @@ -311,14 +315,14 @@ def run(ctx, with_citations, dry_run): ), ) @click.option("--set", metavar="KEY=VALUE", multiple=True, help="Set model parameters") -def generate(infile, set): +def generate(infile: str, set: str) -> None: """Generate example input files.""" print(_contents_of_input_file(infile, set)) @sequence.command() @click.option("--set", multiple=True, help="Set model parameters") -def setup(set): +def setup(set: str) -> None: """Create a folder of input files for a simulation.""" # folder = pathlib.Path(destination) folder = pathlib.Path.cwd() @@ -354,7 +358,7 @@ def setup(set): @click.option( "-v", "--verbose", is_flag=True, help="Also emit status messages to stderr." ) -def plot(set, verbose): +def plot(set: str, verbose: bool) -> None: """Plot a Sequence output file.""" folder = pathlib.Path.cwd() @@ -380,7 +384,7 @@ def plot(set, verbose): raise click.Abort() -def _load_params_from_strings(values): +def _load_params_from_strings(values: Iterable[str]) -> dict[str, Any]: params = {} for param in values: dotted_name, value = param.split("=") @@ -389,8 +393,8 @@ def _load_params_from_strings(values): return params -def _dots_to_dict(name, value): - base = {} +def _dots_to_dict(name: str, value: Any) -> dict[str, Any]: + base: dict[str, Any] = {} level = base names = name.split(".") for k in names[:-1]: @@ -400,21 +404,23 @@ def _dots_to_dict(name, value): return base -def _dict_to_dots(d): - dots = [] +def _dict_to_dots(d: dict) -> list[str]: + dots: list[str] = [] for names in _walk_dict(d): dots.append(".".join(names[:-1]) + "=" + str(names[-1])) return dots -def _load_model_params(param_file=None, defaults=None, dotted_params=()): +def _load_model_params( + param_file: Optional[str] = None, + defaults: Optional[dict] = None, + dotted_params: Iterable[str] = (), +) -> dict[str, Any]: params = defaults or {} if param_file: params_from_file = load_params(param_file) - dotted_params = _dict_to_dots(params_from_file) + dotted_params - # for group in params.keys(): - # params[group].update(params_from_file.get(group, {})) + dotted_params = _dict_to_dots(params_from_file) + list(dotted_params) params_from_cl = _load_params_from_strings(dotted_params) for group in params.keys(): @@ -423,7 +429,7 @@ def _load_model_params(param_file=None, defaults=None, dotted_params=()): return params -def _walk_dict(indict, prev=None): +def _walk_dict(indict: Union[dict, Any], prev: Optional[list] = None) -> Iterator[Any]: prev = prev[:] if prev else [] if isinstance(indict, dict): @@ -432,9 +438,6 @@ def _walk_dict(indict, prev=None): yield from _walk_dict(value, [key] + prev) elif isinstance(value, list) or isinstance(value, tuple): yield prev + [key, value] - # for v in value: - # for d in _walk_dict(v, [key] + prev): - # yield d else: yield prev + [key, value] else: diff --git a/sequence/errors.py b/sequence/errors.py index 8570a08..cebf550 100644 --- a/sequence/errors.py +++ b/sequence/errors.py @@ -10,10 +10,10 @@ class SequenceError(Exception): class ShorelineError(SequenceError): """Raise this exception if there was an error locating the shoreline.""" - def __init__(self, msg): + def __init__(self, msg: str): self._msg = str(msg) - def __str__(self): + def __str__(self) -> str: """Return an error message in human-readable form.""" return self._msg @@ -21,10 +21,10 @@ def __str__(self): class ShelfEdgeError(SequenceError): """Raise this exception is there was an errors locating the shelf edge.""" - def __init__(self, msg): + def __init__(self, msg: str): self._msg = str(msg) - def __str__(self): + def __str__(self) -> str: """Return an error message in human-readable form.""" return self._msg @@ -32,9 +32,9 @@ def __str__(self): class MissingRequiredVariable(SequenceError): """Raise this exception if a required variable is missing from an output file.""" - def __init__(self, name): + def __init__(self, name: str): self._name = name - def __str_(self): + def __str_(self) -> str: """Return an error message in human-readable form, including the name of the missing variable.""" return f"{self._name!r}" diff --git a/sequence/fluvial.py b/sequence/fluvial.py index af507c3..d258359 100644 --- a/sequence/fluvial.py +++ b/sequence/fluvial.py @@ -6,6 +6,7 @@ import numpy as np from landlab import Component +from ._grid import SequenceModelGrid from .shoreline import find_shoreline @@ -45,15 +46,14 @@ class Fluvial(Component): def __init__( self, - grid, - sand_frac=0.5, - wave_base=60.0, - sediment_load=3.0, - sand_density=2650.0, - plain_slope=0.0008, - hemipelagic=0.0, - sea_level=0.0, - **kwds + grid: SequenceModelGrid, + sand_frac: float = 0.5, + wave_base: float = 60.0, + sediment_load: float = 3.0, + sand_density: float = 2650.0, + plain_slope: float = 0.0008, + hemipelagic: float = 0.0, + sea_level: float = 0.0, ): """Generate percent sand/mud for fluvial section. @@ -78,7 +78,7 @@ def __init__( sea_level: float, optional The current sea level (m). """ - super().__init__(grid, **kwds) + super().__init__(grid) # fixed parameters self.sand_grain = 0.001 # grain size = 1 mm @@ -106,7 +106,7 @@ def __init__( if "bedrock_surface__increment_of_elevation" not in grid.at_node: grid.add_zeros("bedrock_surface__increment_of_elevation", at="node") - def run_one_step(self, dt): + def run_one_step(self, dt: float) -> None: """Update the component one time step. Parameters diff --git a/sequence/input_reader.py b/sequence/input_reader.py index 20bc718..ca8945c 100644 --- a/sequence/input_reader.py +++ b/sequence/input_reader.py @@ -5,13 +5,17 @@ import inspect import pathlib import warnings +from os import PathLike +from typing import Any, Callable, Iterable, Optional, Sequence, TextIO, Tuple, Union import numpy as np import tomlkit as toml import yaml -def load_config(stream, fmt=None): +def load_config( + stream: Union[TextIO, str, PathLike[str]], fmt: Optional[str] = None +) -> Union[Iterable[Tuple[float, dict]], dict]: """Load model configuration from a file-like object. Parameters @@ -26,12 +30,15 @@ def load_config(stream, fmt=None): TimeVaryingConfig The, possibly time-varying, configuration. """ - if fmt is None and isinstance(stream, (str, pathlib.Path)): - fmt = pathlib.Path(stream).suffix[1:] + if fmt is None: + if isinstance(stream, (str, pathlib.Path)): + fmt = pathlib.Path(stream).suffix[1:] + else: + raise ValueError("unable to determine format") loader = TimeVaryingConfig.get_loader(fmt) - if isinstance(stream, (str, pathlib.Path)): + if isinstance(stream, (str, PathLike)): with open(stream) as fp: times_and_params = loader(fp) else: @@ -46,7 +53,7 @@ def load_config(stream, fmt=None): class TimeVaryingConfig: """A configuration dictionary that is able to change with time.""" - def __init__(self, times, dicts): + def __init__(self, times: Iterable[float], dicts: Iterable[dict]): """Create a time-varying configuration. Parameters @@ -78,15 +85,15 @@ def __init__(self, times, dicts): self._current = self._dicts[0] - def items(self): + def items(self) -> Iterable[tuple[float, dict]]: """Return the items of the current configuration.""" return self._current.items() - def as_dict(self): + def as_dict(self) -> dict: """Represent the current configuration as a `dict`.""" return _expand_dict(self._current) - def __call__(self, time): + def __call__(self, time: float) -> dict: """Return the configuration at a given time. Parameters @@ -104,10 +111,10 @@ def __call__(self, time): d.update(next_dict) return d - def _bisect_times(self, time): - return np.searchsorted(self._times, time, side="right") + def _bisect_times(self, time: float) -> int: + return int(np.searchsorted(self._times, time, side="right")) - def diff(self, start, stop): + def diff(self, start: float, stop: float) -> dict: """Return the difference between two different configurations. Parameters @@ -122,10 +129,13 @@ def diff(self, start, stop): dict The key/values that have changed between the two configurations. """ - start, stop = self(start), self(stop) - return {k: stop[k] for k, _ in set(stop.items()) - set(start.items())} + start_params, stop_params = self(start), self(stop) + return { + k: stop_params[k] + for k, _ in set(stop_params.items()) - set(start_params.items()) + } - def update(self, inc): + def update(self, inc: int) -> dict: """Update the configurations by a time step. Parameters @@ -140,7 +150,10 @@ def update(self, inc): new configuration. """ next_time = self._time + inc - prev, next_ = self._bisect_times([self._time, next_time]) + + prev = self._bisect_times(self._time) + next_ = self._bisect_times(next_time) + if next_ > prev: self._current = self(next_time) diff = _expand_dict(self.diff(self._time, next_time)) @@ -150,7 +163,7 @@ def update(self, inc): self._time = next_time return diff - def dump(self, fmt="toml"): + def dump(self, fmt: str = "toml") -> str: """Write the current configurations to a string. Parameters @@ -177,7 +190,11 @@ def dump(self, fmt="toml"): raise ValueError(f"unrecognized format: {fmt}") @classmethod - def from_files(cls, names, times=None): + def from_files( + cls, + names: Iterable[Union[str, PathLike[str]]], + times: Optional[Iterable[float]] = None, + ) -> "TimeVaryingConfig": """Load a configuration from a set of files. Parameters @@ -197,7 +214,9 @@ def from_files(cls, names, times=None): return cls(times, dicts) @classmethod - def from_file(cls, name, fmt=None): + def from_file( + cls, name: PathLike, fmt: Optional[str] = None + ) -> "TimeVaryingConfig": """Load a configuration from a file. Parameters @@ -220,7 +239,7 @@ def from_file(cls, name, fmt=None): return cls(*zip(*times_and_params)) @staticmethod - def load_yaml(stream): + def load_yaml(stream: TextIO) -> list[tuple[float, dict]]: """Load a configuration in *yaml*-format. Parameters @@ -243,7 +262,7 @@ def load_yaml(stream): return [(d.pop("_time", idx), d) for idx, d in enumerate(params)] @staticmethod - def load_toml(stream): + def load_toml(stream: TextIO) -> list[tuple[float, dict]]: """Load a configuration in *toml*-format. Parameters @@ -257,7 +276,7 @@ def load_toml(stream): The configurations and their associated times. """ - def _tomlkit_to_popo(d): + def _tomlkit_to_popo(d: dict) -> Any: try: result = getattr(d, "value") except AttributeError: @@ -296,7 +315,7 @@ def _tomlkit_to_popo(d): return [(d.pop("_time", idx), d) for idx, d in enumerate(params)] @staticmethod - def get_loader(fmt): + def get_loader(fmt: str) -> Callable[[TextIO], list[tuple[float, dict]]]: """Get a configuration loader for a given format. Parameters @@ -321,7 +340,7 @@ def get_loader(fmt): raise ValueError(f"unrecognized format: {fmt!r} (not on of {fmts!r})") @staticmethod - def get_supported_formats(): + def get_supported_formats() -> list[str]: """Return a list of supported configuration formats. Returns @@ -336,7 +355,7 @@ def get_supported_formats(): ] -def _flatten_dict(d, sep=None): +def _flatten_dict(d: dict, sep: Optional[str] = None) -> dict: """Flatten a dictionary so that each value has it's own key. Parameters @@ -369,7 +388,7 @@ def _flatten_dict(d, sep=None): return {sep.join(keys): value for keys, value in _walk_dict(d)} -def _add_flattened_item(keys, value, base=None): +def _add_flattened_item(keys: str, value: Any, base: Optional[dict] = None) -> None: expanded = {} if base is None else base parent, name = keys[:-1], keys[-1] @@ -382,17 +401,17 @@ def _add_flattened_item(keys, value, base=None): value = list(value) level[name] = value - return base - -def _expand_dict(flat_dict): - expanded = {} +def _expand_dict(flat_dict: dict[str, Any]) -> dict[str, Any]: + expanded: dict[str, Any] = {} for key, value in flat_dict.items(): _add_flattened_item(key, value, base=expanded) return expanded -def _walk_dict(indict, prev=None): +def _walk_dict( + indict: dict[str, Any], prev: Optional[Sequence[str]] = None +) -> Iterable: """Walk the elements of a dictionary. Parameters @@ -418,15 +437,18 @@ def _walk_dict(indict, prev=None): >>> sorted(_walk_dict({"foo": {"bar": {"baz": 0, "foo": "bar"}}, "bar": 1})) [(('bar',), 1), (('foo', 'bar', 'baz'), 0), (('foo', 'bar', 'foo'), 'bar')] """ - prev = tuple(prev) if prev else () + if prev is not None: + prev_dicts = list(prev) + else: + prev_dicts = [] if isinstance(indict, dict): for key, value in indict.items(): if isinstance(value, dict): - yield from _walk_dict(value, prev + (key,)) + yield from _walk_dict(value, prev_dicts + [key]) elif isinstance(value, list) or isinstance(value, tuple): - yield prev + (key,), value + yield tuple(prev_dicts + [key]), value else: - yield prev + (key,), value + yield tuple(prev_dicts + [key]), value else: yield indict diff --git a/sequence/netcdf.py b/sequence/netcdf.py index 40aa3b4..ca01fe2 100644 --- a/sequence/netcdf.py +++ b/sequence/netcdf.py @@ -2,8 +2,13 @@ import os import warnings from collections import defaultdict +from os import PathLike +from typing import Any, Iterable, Optional, Sequence, Union import netCDF4 as nc +from numpy.typing import NDArray + +from ._grid import SequenceModelGrid _NUMPY_TO_NETCDF_TYPE = { "float32": "f4", @@ -20,20 +25,26 @@ } -def _netcdf_var_name(name, at): +def _netcdf_var_name(name: str, at: str) -> str: """Get the name a field will be stored as in a netCDF file.""" return f"at_{at}:{name}" -def _netcdf_type(arr): +def _netcdf_type(arr: NDArray) -> str: """Get the netCDF type string for a numpy array.""" return _NUMPY_TO_NETCDF_TYPE[str(arr.dtype)] -def _create_grid_dimension(root, grid, at="node", ids=None): +def _create_grid_dimension( + root: Any, + grid: SequenceModelGrid, + at: str = "node", + ids: Optional[Union[slice, Iterable[int]]] = None, +) -> Any: """Create grid dimensions for a netcdf file.""" if ids is None: - ids = Ellipsis + # ids = Ellipsis + ids = slice(None) if at not in ("node", "link", "patch", "corner", "face", "cell", "grid"): raise ValueError(f"unknown grid location {at}") @@ -48,7 +59,12 @@ def _create_grid_dimension(root, grid, at="node", ids=None): return root -def _create_grid_coordinates(root, grid, at="node", ids=None): +def _create_grid_coordinates( + root: Any, + grid: SequenceModelGrid, + at: str = "node", + ids: Optional[Union[slice, Iterable[int]]] = None, +) -> Any: """Create x and y coordinates for a field location.""" _create_grid_dimension(root, grid, at=at, ids=ids) @@ -64,10 +80,16 @@ def _create_grid_coordinates(root, grid, at="node", ids=None): return root -def _set_grid_coordinates(root, grid, at="node", ids=None): +def _set_grid_coordinates( + root: Any, + grid: SequenceModelGrid, + at: str = "node", + ids: Optional[Union[slice, Iterable[int]]] = None, +) -> None: """Set the values for the coordinates of a field location.""" if ids is None: - ids = Ellipsis + ids = slice(None) + # ids = Ellipsis _create_grid_coordinates(root, grid, at=at, ids=ids) if at == "grid": @@ -78,12 +100,19 @@ def _set_grid_coordinates(root, grid, at="node", ids=None): root.variables[f"y_of_{at}"][:] = coords[ids, 1] -def _create_field(root, grid, at="node", names=None): +def _create_field( + root: Any, + grid: SequenceModelGrid, + at: str = "node", + names: Optional[Iterable[str]] = None, +) -> None: """Create variables at a field location(s).""" - names = names or grid[at] - dimensions = (at,) + if names is None: + names = grid[at] + + dimensions = [at] if "time" in root.dimensions: - dimensions = ("time",) + dimensions + dimensions = ["time"] + dimensions for name in names: netcdf_name = _netcdf_var_name(name, at) @@ -91,7 +120,13 @@ def _create_field(root, grid, at="node", names=None): root.createVariable(netcdf_name, _netcdf_type(grid[at][name]), dimensions) -def _set_field(root, grid, at="node", ids=None, names=None): +def _set_field( + root: Any, + grid: SequenceModelGrid, + at: str = "node", + ids: Optional[Union[slice, Iterable[int]]] = None, + names: Optional[Union[str, Iterable[str]]] = None, +) -> None: """Set values for variables at a field location(s).""" if isinstance(names, str): names = [names] @@ -108,16 +143,23 @@ def _set_field(root, grid, at="node", ids=None, names=None): if "time" in root.dimensions: n_times = len(root.dimensions["time"]) for name in names: - root.variables[_netcdf_var_name(name, at)][n_times - 1, :] = grid[at][name][ - ids - ] + if grid[at][name].ndim > 0: + values = grid[at][name][ids] + else: + values = grid[at][name] + root.variables[_netcdf_var_name(name, at)][n_times - 1, :] = values + else: for name in names: root.variables[_netcdf_var_name(name, at)][:] = grid[at][name][ids] -def _create_layers(root, grid, names=None): +def _create_layers( + root: Any, grid: SequenceModelGrid, names: Optional[Iterable[str]] = None +) -> None: """Create variables at grid layers.""" + if names is None: + names = [] for name in names: netcdf_name = _netcdf_var_name(name, "layer") if netcdf_name not in root.variables: @@ -130,7 +172,9 @@ def _create_layers(root, grid, names=None): root.createVariable(netcdf_name, "f8", ("layer", "cell")) -def _set_layers(root, grid, names=None): +def _set_layers( + root: Any, grid: SequenceModelGrid, names: Optional[Iterable[str]] = None +) -> None: """Set values for variables at a grid layers.""" if isinstance(names, str): names = [names] @@ -152,16 +196,18 @@ def _set_layers(root, grid, names=None): def to_netcdf( - grid, - filepath, - mode="w", - format="NETCDF4", - time=0.0, - at=None, - ids=None, - names=None, - with_layers=True, -): + grid: SequenceModelGrid, + filepath: Union[str, PathLike[str]], + mode: str = "w", + format: str = "NETCDF4", + time: float = 0.0, + at: Optional[Union[str, Sequence[str]]] = None, + ids: Optional[Union[dict[str, Iterable[int]], int, Iterable[int], slice]] = None, + names: Optional[ + Union[dict[str, Union[Iterable[str], None]], str, Iterable[str]] + ] = None, + with_layers: bool = True, +) -> None: """Write a grid and fields to a netCDF file. Parameters @@ -192,19 +238,44 @@ def to_netcdf( """ if with_layers and format != "NETCDF4": raise ValueError("Grid layers are only available with the NETCDF4 format.") + if at is None: + at = ["node", "link", "face", "cell", "grid"] - at = at or {"node", "link", "face", "cell", "grid"} if isinstance(at, str): at = [at] + if isinstance(names, str): + names = [names] + if isinstance(ids, int): + ids = [ids] + elif ids is None: + ids = slice(None) - if len(at) == 1: - if not isinstance(names, dict): - names = {at[0]: names} - if not isinstance(ids, dict): - ids = {at[0]: ids} - - names = defaultdict(lambda: None, names or {}) - ids = defaultdict(lambda: Ellipsis, ids or {}) + names_dict = defaultdict(list) + if not isinstance(names, dict): + for loc in at: + if names is None: + names_dict[loc] = list(grid[loc]) + else: + names_dict[loc] = list(set(names) & set(grid[loc])) + else: + for loc, names_ in names.items(): + if names_ is None: + names_dict[loc] = list(grid[loc]) + elif isinstance(names_, str): + names_dict[loc] = [names_] + else: + names_dict[loc] = list(names_) + + ids_dict: dict[str, Union[slice, Iterable[int]]] = defaultdict(lambda: slice(None)) + if not isinstance(ids, dict): + for loc in at: + ids_dict[loc] = ids + else: + for loc, ids_ in ids.items(): + if ids_ is None: + ids_dict[loc] = slice(None) + else: + ids_dict[loc] = ids_ if not os.path.isfile(filepath): mode = "w" @@ -215,13 +286,13 @@ def to_netcdf( root.createVariable("time", "f8", ("time",)) for loc in at: - _set_grid_coordinates(root, grid, at=loc, ids=ids[loc]) + _set_grid_coordinates(root, grid, at=loc, ids=ids_dict[loc]) n_times = len(root.dimensions["time"]) root.variables["time"][n_times] = time for loc in at: - _set_field(root, grid, at=loc, ids=ids[loc], names=names[loc]) + _set_field(root, grid, at=loc, ids=ids_dict[loc], names=names_dict[loc]) if with_layers: _set_layers(root, grid, names=None) diff --git a/sequence/output_writer.py b/sequence/output_writer.py index fbd5644..108dd98 100644 --- a/sequence/output_writer.py +++ b/sequence/output_writer.py @@ -1,9 +1,12 @@ """Write a `SequenceModelGrid` to a file.""" import errno import os +from os import PathLike +from typing import Iterable, Optional, Union from landlab import Component +from ._grid import SequenceModelGrid from .netcdf import to_netcdf @@ -12,13 +15,12 @@ class OutputWriter(Component): def __init__( self, - grid, - filepath=None, - interval=1, - fields=None, - clobber=False, - # clock=None, - rows=None, + grid: SequenceModelGrid, + filepath: Union[str, PathLike[str]], + interval: int = 1, + fields: Optional[Iterable[str]] = None, + clobber: bool = False, + rows: Optional[Iterable[str]] = None, ): """Create an output-file writer. @@ -38,12 +40,11 @@ def __init__( rows : iterable of int The rows of the grid to include in the file. """ - if filepath is None: - raise ValueError("filepath must be provided") + if fields is None: + fields = [] super().__init__(grid) - # self._clock = clock or TimeStepper() self._clobber = clobber self.interval = interval self.fields = fields @@ -57,7 +58,7 @@ def __init__( self._time = 0.0 self._step_count = 0 - def run_one_step(self, dt=None): + def run_one_step(self, dt: Optional[float] = None) -> None: """Update the writer by a time step. Parameters @@ -79,12 +80,12 @@ def run_one_step(self, dt=None): self._step_count += 1 @property - def filepath(self): + def filepath(self) -> Union[str, PathLike[str]]: """Return the path to the output file.""" return self._filepath @filepath.setter - def filepath(self, new_val): + def filepath(self, new_val: Union[str, PathLike[str]]) -> None: if os.path.isfile(new_val) and not self._clobber: raise RuntimeError("file exists") try: @@ -93,15 +94,15 @@ def filepath(self, new_val): if err.errno != errno.ENOENT: raise finally: - self._filepath = new_val + self._filepath = str(new_val) @property - def interval(self): + def interval(self) -> int: """Return the interval for which output will be written.""" return self._interval @interval.setter - def interval(self, new_val): + def interval(self, new_val: int) -> None: if new_val < 0: raise TypeError("interval not an integer") elif not isinstance(new_val, int): @@ -109,13 +110,10 @@ def interval(self, new_val): self._interval = new_val @property - def fields(self): + def fields(self) -> Iterable[str]: """Return the names of the fields to include in the output file.""" return self._fields @fields.setter - def fields(self, new_val): - if new_val is None: - self._fields = None - else: - self._fields = tuple(new_val) + def fields(self, new_val: Iterable[str]) -> None: + self._fields = tuple(new_val) diff --git a/sequence/plot.py b/sequence/plot.py index 5d0bc91..5ab7e8a 100644 --- a/sequence/plot.py +++ b/sequence/plot.py @@ -1,34 +1,38 @@ """Plot the layers of a `SequenceModelGrid`.""" from functools import partial +from os import PathLike +from typing import Any, Optional, Union import matplotlib.pyplot as plt import numpy as np import xarray as xr from matplotlib.patches import Patch +from numpy.typing import NDArray from scipy.interpolate import interp1d +from ._grid import SequenceModelGrid from .errors import MissingRequiredVariable def plot_layers( - elevation_at_layer, - x_of_stack=None, - x_of_shore_at_layer=None, - x_of_shelf_edge_at_layer=None, - color_water=(0.8, 1.0, 1.0), - color_land=(0.8, 1.0, 0.8), - color_shoreface=(0.8, 0.8, 0.0), - color_shelf=(0.75, 0.5, 0.5), - layer_line_width=0.5, - layer_line_color="k", - layer_start=0, - layer_stop=-1, - n_layers=5, - title=None, - x_label="Distance (m)", - y_label="Elevation (m)", - legend_location="lower left", -): + elevation_at_layer: NDArray[np.floating], + x_of_stack: Optional[NDArray[np.floating]] = None, + x_of_shore_at_layer: Optional[NDArray[np.floating]] = None, + x_of_shelf_edge_at_layer: Optional[NDArray[np.floating]] = None, + color_water: Union[tuple[float, float, float], str] = (0.8, 1.0, 1.0), + color_land: Union[tuple[float, float, float], str] = (0.8, 1.0, 0.8), + color_shoreface: Union[tuple[float, float, float], str] = (0.8, 0.8, 0.0), + color_shelf: Union[tuple[float, float, float], str] = (0.75, 0.5, 0.5), + layer_line_width: float = 0.5, + layer_line_color: Union[tuple[float, float, float], str] = "k", + layer_start: int = 0, + layer_stop: int = -1, + n_layers: int = 5, + title: Optional[str] = None, + x_label: str = "Distance (m)", + y_label: str = "Elevation (m)", + legend_location: str = "lower left", +) -> None: """Create a plot of sediment layers along a profile. Parameters @@ -76,7 +80,13 @@ def plot_layers( :func:`plot_grid` : Plot a `SequenceModelGrid`'s layers. """ if x_of_stack is None: - x_of_stack = np.arange(elevation_at_layer.shape[1]) + x_of_stack = np.arange(elevation_at_layer.shape[1], dtype=float) + + if x_of_shore_at_layer is None: + x_of_shore_at_layer = np.zeros(len(elevation_at_layer)) + + if x_of_shelf_edge_at_layer is None: + x_of_shelf_edge_at_layer = np.zeros(len(elevation_at_layer)) plot_land = bool(color_land) plot_shoreface = bool(color_shoreface) @@ -161,7 +171,7 @@ def plot_layers( plt.show() -def plot_grid(grid, **kwds): +def plot_grid(grid: SequenceModelGrid, **kwds: Any) -> None: """Plot a :class:`~SequenceModelGrid`. Parameters @@ -199,7 +209,7 @@ def plot_grid(grid, **kwds): ) -def plot_file(filename, **kwds): +def plot_file(filename: Union[str, PathLike], **kwds: Any) -> None: """Plot a `SequenceModelGrid` from a *Sequence* output file. Parameters @@ -243,7 +253,7 @@ def plot_file(filename, **kwds): ) -def _get_layers_to_plot(start, stop, num=-1): +def _get_layers_to_plot(start: int, stop: int, num: int = -1) -> Union[slice, None]: if num == 0: return None elif num < 0 or num > stop - start + 1: @@ -252,7 +262,13 @@ def _get_layers_to_plot(start, stop, num=-1): return slice(start, stop, step) -def _fill_between_layers(x, y, lower=None, upper=None, fc=None): +def _fill_between_layers( + x: NDArray, + y: NDArray, + lower: Optional[NDArray[np.integer]] = None, + upper: Optional[NDArray[np.integer]] = None, + fc: Optional[Union[tuple[float, float, float], str]] = None, +) -> None: n_layers = len(y) if lower is None: @@ -273,8 +289,12 @@ def _fill_between_layers(x, y, lower=None, upper=None, fc=None): def _outline_layer( - x, y_of_bottom_layer, y_of_top_layer, bottom_limits=None, top_limits=None -): + x: NDArray, + y_of_bottom_layer: NDArray, + y_of_top_layer: NDArray, + bottom_limits: Optional[tuple[Union[float, None], Union[float, None]]] = None, + top_limits: Optional[tuple[Union[float, None], Union[float, None]]] = None, +) -> tuple[NDArray, NDArray]: if bottom_limits is None: bottom_limits = (None, None) if top_limits is None: @@ -334,7 +354,9 @@ def _outline_layer( ) -def _interp_between_layers(x, y_of_bottom, y_of_top, kind="linear"): +def _interp_between_layers( + x: NDArray, y_of_bottom: NDArray, y_of_top: NDArray, kind: str = "linear" +) -> NDArray: x = np.asarray(x) y_of_top, y_of_bottom = np.asarray(y_of_top), np.asarray(y_of_bottom) diff --git a/sequence/sea_level.py b/sequence/sea_level.py index 772e6a1..d7335b0 100644 --- a/sequence/sea_level.py +++ b/sequence/sea_level.py @@ -3,10 +3,16 @@ This module contains *Landlab* components used for adjusting a grid's sea level. """ +from os import PathLike +from typing import Callable, Union + import numpy as np from landlab import Component +from numpy.typing import NDArray from scipy import interpolate +from ._grid import SequenceModelGrid + class SeaLevelTimeSeries(Component): """Modify sea level through a time series.""" @@ -28,12 +34,18 @@ class SeaLevelTimeSeries(Component): } } - def __init__(self, grid, filepath, kind="linear", start=0.0, **kwds): + def __init__( + self, + grid: SequenceModelGrid, + filepath: Union[str, PathLike[str]], + kind: str = "linear", + start: float = 0.0, + ): """Generate sea level values. Parameters ---------- - grid: ModelGrid + grid: SequenceModelGrid A landlab grid. filepath: str Name of csv-formatted sea-level file. @@ -44,7 +56,7 @@ def __init__(self, grid, filepath, kind="linear", start=0.0, **kwds): start : float, optional Set the initial time for the component. """ - super().__init__(grid, **kwds) + super().__init__(grid) self._filepath = filepath self._kind = kind @@ -55,7 +67,9 @@ def __init__(self, grid, filepath, kind="linear", start=0.0, **kwds): self._time = start @staticmethod - def _sea_level_interpolator(data, kind="linear"): + def _sea_level_interpolator( + data: NDArray[np.floating], kind: str = "linear" + ) -> Callable[[Union[float, NDArray]], NDArray]: return interpolate.interp1d( data[:, 0], data[:, 1], @@ -66,12 +80,12 @@ def _sea_level_interpolator(data, kind="linear"): ) @property - def filepath(self): + def filepath(self) -> Union[str, PathLike[str]]: """Return the path to the sea-level file.""" return self._filepath @filepath.setter - def filepath(self, new_path): + def filepath(self, new_path: Union[str, PathLike[str]]) -> None: self._filepath = new_path self._sea_level = SeaLevelTimeSeries._sea_level_interpolator( np.loadtxt(self._filepath, delimiter=","), kind=self._kind @@ -83,7 +97,7 @@ def time(self) -> float: return self._time @time.setter - def time(self, new_time: float): + def time(self, new_time: float) -> None: self._time = new_time def run_one_step(self, dt: float) -> None: @@ -103,20 +117,19 @@ class SinusoidalSeaLevel(SeaLevelTimeSeries): def __init__( self, - grid, - wave_length=1.0, - amplitude=1.0, - phase=0.0, - mean=0.0, - start=0.0, - linear=0.0, - **kwds + grid: SequenceModelGrid, + wave_length: float = 1.0, + amplitude: float = 1.0, + phase: float = 0.0, + mean: float = 0.0, + start: float = 0.0, + linear: float = 0.0, ): """Generate sea level values. Parameters ---------- - grid: ModelGrid + grid: SequenceModelGrid A landlab grid. wave_length : float, optional The wave length of the sea-level curve in [y]. @@ -132,7 +145,7 @@ def __init__( Linear trend of the sea-level curve with time [m / y]. """ wave_length /= 2.0 * np.pi - super(SeaLevelTimeSeries, self).__init__(grid, **kwds) + super(SeaLevelTimeSeries, self).__init__(grid) self._sea_level = ( lambda time: ( diff --git a/sequence/sediment_flexure.py b/sequence/sediment_flexure.py index 9853cf7..a1ebe65 100644 --- a/sequence/sediment_flexure.py +++ b/sequence/sediment_flexure.py @@ -2,6 +2,8 @@ import numpy as np from landlab.components.flexure import Flexure1D +from ._grid import SequenceModelGrid + class SedimentFlexure(Flexure1D): """*Landlab* component that deflects a `SequenceModelGrid` due sediment loading.""" @@ -65,12 +67,12 @@ class SedimentFlexure(Flexure1D): def __init__( self, - grid, - sand_density=2650, - mud_density=2720.0, - isostasytime=7000.0, - water_density=1030.0, - **kwds, + grid: SequenceModelGrid, + sand_density: float = 2650, + mud_density: float = 2720.0, + isostasytime: float = 7000.0, + water_density: float = 1030.0, + **kwds: dict, # **sediments, ): """Subside elevations due to sediment loading. @@ -121,11 +123,13 @@ def __init__( self.subs_pool = self.grid.zeros(at="node") @staticmethod - def _calc_bulk_density(grain_density, water_density, porosity): + def _calc_bulk_density( + grain_density: float, water_density: float, porosity: float + ) -> float: return grain_density * (1.0 - porosity) + water_density * porosity @staticmethod - def validate_density(density): + def validate_density(density: float) -> float: """Validate a density value. Parameters @@ -148,7 +152,7 @@ def validate_density(density): return density @staticmethod - def validate_isostasy_time(time): + def validate_isostasy_time(time: float) -> float: """Validate an isostasy time value. Parameters @@ -171,12 +175,12 @@ def validate_isostasy_time(time): return time @property - def sand_density(self): + def sand_density(self) -> float: """Return the density of sand.""" return self._sand_density @sand_density.setter - def sand_density(self, density): + def sand_density(self, density: float) -> None: # porosity = 40% self._sand_density = SedimentFlexure.validate_density(density) self._rho_sand = SedimentFlexure._calc_bulk_density( @@ -184,17 +188,17 @@ def sand_density(self, density): ) @property - def sand_bulk_density(self): + def sand_bulk_density(self) -> float: """Return the bulk density of sand.""" return self._rho_sand @property - def mud_density(self): + def mud_density(self) -> float: """Return teh density of mud.""" return self._mud_density @mud_density.setter - def mud_density(self, density): + def mud_density(self, density: float) -> None: # porosity = 65% self._mud_density = SedimentFlexure.validate_density(density) self._rho_mud = SedimentFlexure._calc_bulk_density( @@ -202,17 +206,17 @@ def mud_density(self, density): ) @property - def mud_bulk_density(self): + def mud_bulk_density(self) -> float: """Return the bulk density of mud.""" return self._rho_mud @property - def water_density(self): + def water_density(self) -> float: """Return the density of water.""" return self._water_density @water_density.setter - def water_density(self, density): + def water_density(self, density: float) -> None: self._water_density = SedimentFlexure.validate_density(density) self._rho_sand = SedimentFlexure._calc_bulk_density( self.sand_density, self.water_density, 0.4 @@ -221,7 +225,7 @@ def water_density(self, density): self.mud_density, self.water_density, 0.65 ) - def update(self): + def update(self) -> None: """Update the component by a single time step.""" if self._isostasytime > 0.0: isostasyfrac = 1 - np.exp(-1.0 * self._dt / self._isostasytime) @@ -264,7 +268,7 @@ def update(self): self.grid.at_node["bedrock_surface__elevation"] -= dz self.grid.at_node["topographic__elevation"] -= dz - def run_one_step(self, dt=100.0): + def run_one_step(self, dt: float = 100.0) -> None: """Update the component by a time step. Parameters diff --git a/sequence/sequence.py b/sequence/sequence.py index 0a0e952..a5e5f88 100644 --- a/sequence/sequence.py +++ b/sequence/sequence.py @@ -1,9 +1,13 @@ """*Sequence*'s main API for constructing sequence-stratigraphic models.""" +from typing import Any, Iterable, Optional + import numpy as np from landlab import Component from landlab.layers import EventLayers +from numpy.typing import ArrayLike, NDArray from tqdm import trange +from ._grid import SequenceModelGrid from .plot import plot_grid @@ -47,7 +51,12 @@ class Sequence(Component): }, } - def __init__(self, grid, time_step=100.0, components=None): + def __init__( + self, + grid: SequenceModelGrid, + time_step: float = 100.0, + components: Optional[Iterable] = None, + ): """Create a Sequence model. Parameters @@ -66,6 +75,7 @@ def __init__(self, grid, time_step=100.0, components=None): self._time = 0.0 self._time_step = time_step + self._n_archived_layers = 0 self.grid.at_layer_grid = EventLayers(1) @@ -83,11 +93,11 @@ def __init__(self, grid, time_step=100.0, components=None): self.add_layer(initial_sediment_thickness[self.grid.node_at_cell]) @property - def time(self): + def time(self) -> float: """Return the current model time (in years).""" return self._time - def update(self, dt=None): + def update(self, dt: Optional[float] = None) -> None: """Update the model of a given time step. Parameters @@ -107,7 +117,12 @@ def update(self, dt=None): self.grid.at_node["sediment_deposit__thickness"][self.grid.node_at_cell] ) - def run(self, until=None, dt=None, progress_bar=True): + def run( + self, + until: Optional[float] = None, + dt: Optional[float] = None, + progress_bar: bool = True, + ) -> None: """Run the model to a given time. Parameters @@ -130,7 +145,7 @@ def run(self, until=None, dt=None, progress_bar=True): for _ in trange(n_steps, desc="πŸš€", disable=not progress_bar): self.update(dt=min(dt, until - self._time)) - def layer_properties(self): + def layer_properties(self) -> dict[str, ArrayLike]: """Return the properties being tracked at each layer. Returns @@ -162,7 +177,7 @@ def layer_properties(self): return properties - def layer_reducers(self): + def layer_reducers(self) -> dict[str, Any]: """Return layer-reducers for each property. Returns @@ -182,7 +197,7 @@ def layer_reducers(self): return reducers - def add_layer(self, dz_at_cell): + def add_layer(self, dz_at_cell: NDArray[np.floating]) -> None: """Add a new layer to each cell. Properties @@ -208,11 +223,6 @@ def add_layer(self, dz_at_cell): x_of_shelf_edge=x_of_shelf_edge, ) - try: - self._n_archived_layers - except AttributeError: - self._n_archived_layers = 0 - if ( self.grid.event_layers.number_of_layers - self._n_archived_layers ) % 20 == 0: @@ -223,6 +233,6 @@ def add_layer(self, dz_at_cell): ) self._n_archived_layers += 1 - def plot(self): + def plot(self) -> None: """Plot the grid.""" plot_grid(self.grid) diff --git a/sequence/sequence_model.py b/sequence/sequence_model.py index 6312200..4a604bf 100644 --- a/sequence/sequence_model.py +++ b/sequence/sequence_model.py @@ -2,7 +2,7 @@ import warnings from collections import OrderedDict, defaultdict from collections.abc import Hashable, Iterable -from typing import Optional +from typing import Dict, Optional import numpy as np from compaction.landlab import Compact @@ -82,7 +82,7 @@ class SequenceModel: def __init__( self, - grid, + grid: SequenceModelGrid, clock: Optional[dict] = None, processes: Optional[dict] = None, output: Optional[dict] = None, @@ -104,17 +104,15 @@ def __init__( processes = {} self._grid = grid - self._clock = TimeStepper(**clock) - self._components = OrderedDict(processes) - if output is not None: self._components["output"] = OutputWriter(self._grid, **output) self.grid.at_grid["x_of_shore"] = np.nan self.grid.at_grid["x_of_shelf_edge"] = np.nan self.grid.at_grid["sea_level__elevation"] = 0.0 + self._n_archived_layers = 0 z0 = grid.at_node["bedrock_surface__elevation"] @@ -133,7 +131,7 @@ def __init__( pass @staticmethod - def load_grid(params: dict, bathymetry: Optional[dict] = None): + def load_grid(params: dict, bathymetry: Optional[dict] = None) -> SequenceModelGrid: """Load a `SequenceModelGrid`. Parameters @@ -155,7 +153,7 @@ def load_grid(params: dict, bathymetry: Optional[dict] = None): @staticmethod def load_processes( - grid, processes: Iterable[str], context: dict[str, dict] + grid: SequenceModelGrid, processes: Iterable[str], context: dict[str, dict] ) -> dict: """Instantiate processes. @@ -173,7 +171,7 @@ def load_processes( processes = list(processes) + ["fluvial"] if "shoreline" not in processes: processes = list(processes) + ["shoreline"] - params = defaultdict(dict) + params: Dict[str, dict] = defaultdict(dict) params.update( {process: context.get(process, {}).copy() for process in processes} ) @@ -207,21 +205,21 @@ def load_processes( return processes @property - def grid(self): + def grid(self) -> SequenceModelGrid: """Return the model's grid.""" return self._grid @property - def clock(self): + def clock(self) -> TimeStepper: """Return the model's clock.""" return self._clock @property - def components(self): + def components(self) -> tuple: """Return the name of enabled components.""" return tuple(self._components) - def set_params(self, params: dict[str, dict]): + def set_params(self, params: dict[str, dict]) -> None: """Update the parameters for the model's processes. Parameters @@ -234,7 +232,7 @@ def set_params(self, params: dict[str, dict]): for param, value in values.items(): setattr(c, param, value) - def run_one_step(self, dt: Optional[float] = None): + def run_one_step(self, dt: Optional[float] = None) -> None: """Run each component for one time step.""" dt = dt or self.clock.step self.clock.dt = dt @@ -242,7 +240,7 @@ def run_one_step(self, dt: Optional[float] = None): self.advance_components(dt) - def run(self): + def run(self) -> None: """Run the model until complete.""" try: while 1: @@ -250,7 +248,7 @@ def run(self): except StopIteration: pass - def advance_components(self, dt: float): + def advance_components(self, dt: float) -> None: """Update each of the components by a time step. Parameters @@ -285,11 +283,6 @@ def advance_components(self, dt: float): self.grid.event_layers.add(dz[self.grid.node_at_cell], **layer_properties) - try: - self._n_archived_layers - except AttributeError: - self._n_archived_layers = 0 - if ( self.grid.event_layers.number_of_layers - self._n_archived_layers ) % 20 == 0: @@ -305,7 +298,7 @@ def advance_components(self, dt: float): self._n_archived_layers += 1 -def _match_values(d1: dict, d2: dict, keys: Iterable[Hashable]): +def _match_values(d1: dict, d2: dict, keys: Iterable[Hashable]) -> None: """Match values between two dictionaries. Parameters diff --git a/sequence/shoreline.py b/sequence/shoreline.py index a9772b1..fa59340 100644 --- a/sequence/shoreline.py +++ b/sequence/shoreline.py @@ -4,11 +4,14 @@ shelf edge. """ import bisect +from typing import Optional import numpy as np from landlab import Component +from numpy.typing import NDArray from scipy import interpolate +from ._grid import SequenceModelGrid from .errors import ShelfEdgeError, ShorelineError @@ -54,7 +57,7 @@ class ShorelineFinder(Component): }, } - def __init__(self, grid, alpha: float = 0.0005): + def __init__(self, grid: SequenceModelGrid, alpha: float = 0.0005): """Create a shoreline finder. Parameters @@ -92,7 +95,7 @@ def alpha(self) -> float: """Return the *alpha* parameter used to calculate the shoreline.""" return self._alpha - def update(self): + def update(self) -> None: """Update the component one time step to find the new shoreline.""" x = self.grid.x_of_node[self.grid.node_at_cell] z = self.grid.at_node["topographic__elevation"][self.grid.node_at_cell] @@ -112,7 +115,7 @@ def update(self): self.grid.at_grid["x_of_shore"] = x_of_shore self.grid.at_grid["x_of_shelf_edge"] = x_of_shelf_edge - def run_one_step(self, dt=None): + def run_one_step(self, dt: Optional[float] = None) -> None: """Update the component on time step. Parameters @@ -124,7 +127,9 @@ def run_one_step(self, dt=None): self.update() -def find_shelf_edge_by_curvature(x, z, sea_level=0.0): +def find_shelf_edge_by_curvature( + x: NDArray[np.floating], z: NDArray[np.floating], sea_level: float = 0.0 +) -> float: """Find the x-coordinate of the shelf edge. The shelf edge is the location where the curvature of *sea-floor elevations* @@ -159,7 +164,12 @@ def find_shelf_edge_by_curvature(x, z, sea_level=0.0): # def find_shelf_edge(grid, x, wd, x_of_shore, sea_level=0.0, alpha = 0.0005): -def find_shelf_edge(x, dz, x_of_shore=0.0, alpha=0.0005): +def find_shelf_edge( + x: list[float], + dz: list[float], + x_of_shore: float = 0.0, + alpha: float = 0.0005, +) -> float: """Find the shelf edge based on deposit thickness. Parameters @@ -203,7 +213,12 @@ def find_shelf_edge(x, dz, x_of_shore=0.0, alpha=0.0005): return x[np.argmax(dz[ind_of_shore:]) + ind_of_shore] -def find_shoreline(x, z, sea_level=0.0, kind="cubic"): +def find_shoreline( + x: NDArray[np.floating], + z: NDArray[np.floating], + sea_level: float = 0.0, + kind: str = "cubic", +) -> float: """Find the shoreline of a profile. Parameters @@ -268,7 +283,9 @@ def find_shoreline(x, z, sea_level=0.0, kind="cubic"): return x_of_shoreline -def _find_shoreline_polyfit(x, z, sea_level=0.0): +def _find_shoreline_polyfit( + x: NDArray[np.floating], z: NDArray[np.floating], sea_level: float = 0.0 +) -> float: try: index_at_shore = find_shoreline_index(x, z, sea_level=sea_level) except ValueError: @@ -311,7 +328,9 @@ def _find_shoreline_polyfit(x, z, sea_level=0.0): return x_of_shoreline -def find_shoreline_index(x, z, sea_level=0.0): +def find_shoreline_index( + x: NDArray[np.floating], z: NDArray[np.floating], sea_level: float = 0.0 +) -> int: """Find the landward-index of the shoreline. Parameters diff --git a/sequence/submarine.py b/sequence/submarine.py index 8d9aa41..68b73c1 100644 --- a/sequence/submarine.py +++ b/sequence/submarine.py @@ -1,8 +1,11 @@ """Diffuse sediment along a profile.""" +from typing import Any + import numpy as np from landlab.components import LinearDiffuser from numpy.typing import NDArray +from ._grid import SequenceModelGrid from .shoreline import find_shoreline @@ -42,7 +45,7 @@ class SubmarineDiffuser(LinearDiffuser): def __init__( self, - grid, + grid: SequenceModelGrid, sea_level: float = 0.0, plain_slope: float = 0.0008, wave_base: float = 60.0, @@ -52,7 +55,7 @@ def __init__( sediment_load: float = 3.0, load_sealevel: float = 0.0, basin_width: float = 500000.0, - **kwds + **kwds: Any, ): """Diffuse the ocean bottom. @@ -113,7 +116,7 @@ def plain_slope(self) -> float: return self._plain_slope @plain_slope.setter - def plain_slope(self, value): + def plain_slope(self, value: float) -> None: self._plain_slope = float(value) self._ksh = self._load / self._plain_slope @@ -123,7 +126,7 @@ def wave_base(self) -> float: return self._wave_base @wave_base.setter - def wave_base(self, value): + def wave_base(self, value: float) -> None: self._wave_base = float(value) @property @@ -132,7 +135,7 @@ def shoreface_height(self) -> float: return self._shoreface_height @shoreface_height.setter - def shoreface_height(self, value): + def shoreface_height(self, value: float) -> None: self._shoreface_height = float(value) @property @@ -141,7 +144,7 @@ def alpha(self) -> float: return self._alpha @alpha.setter - def alpha(self, value): + def alpha(self, value: float) -> None: self._alpha = float(value) @property @@ -150,7 +153,7 @@ def shelf_slope(self) -> float: return self._shelf_slope @shelf_slope.setter - def shelf_slope(self, value): + def shelf_slope(self, value: float) -> None: self._shelf_slope = float(value) @property @@ -159,7 +162,7 @@ def sediment_load(self) -> float: return self._load0 @sediment_load.setter - def sediment_load(self, value): + def sediment_load(self, value: float) -> None: self._load0 = float(value) self._load = self._load0 * (1 + self._sea_level * self._load_sl) self._ksh = self._load / self._plain_slope @@ -170,7 +173,7 @@ def sediment_load(self, value): # return self._k0 @property - def load0(self): + def load0(self) -> float: """Return the sediment load entering the profile.""" return self._load0 @@ -190,10 +193,10 @@ def sea_level(self) -> float: return self.grid.at_grid["sea_level__elevation"] @sea_level.setter - def sea_level(self, sea_level): + def sea_level(self, sea_level: float) -> None: self.grid.at_grid["sea_level__elevation"] = sea_level - def calc_diffusion_coef(self, x_of_shore: float) -> NDArray[float]: + def calc_diffusion_coef(self, x_of_shore: float) -> NDArray[np.floating]: """Calculate and store diffusion coefficient values. Parameters diff --git a/sequence/subsidence.py b/sequence/subsidence.py index 6e0f2d8..b0e131a 100644 --- a/sequence/subsidence.py +++ b/sequence/subsidence.py @@ -1,10 +1,14 @@ """Subside a `SequenceModelGrid`.""" import os +from typing import Callable, Union import numpy as np from landlab import Component +from numpy.typing import NDArray from scipy import interpolate +from ._grid import SequenceModelGrid + class SubsidenceTimeSeries(Component): """A *Landlab* component that subsides a grid.""" @@ -32,7 +36,9 @@ class SubsidenceTimeSeries(Component): }, } - def __init__(self, grid, filepath: os.PathLike, kind: str = "linear"): + def __init__( + self, grid: SequenceModelGrid, filepath: os.PathLike, kind: str = "linear" + ): """Create a grid subsider from a time-series file. Parameters @@ -70,7 +76,9 @@ def __init__(self, grid, filepath: os.PathLike, kind: str = "linear"): self._time = 0.0 @staticmethod - def _subsidence_interpolator(data, kind="linear"): + def _subsidence_interpolator( + data: NDArray, kind: str = "linear" + ) -> Callable[[Union[float, NDArray]], NDArray]: return interpolate.interp1d( data[:, 0], data[:, 1], @@ -91,7 +99,7 @@ def filepath(self) -> str: return str(self._filepath) @filepath.setter - def filepath(self, new_path: os.PathLike): + def filepath(self, new_path: os.PathLike) -> None: self._filepath = new_path subsidence = SubsidenceTimeSeries._subsidence_interpolator( np.loadtxt(self._filepath, delimiter=",", comments="#"), kind=self._kind diff --git a/tests/test_config_files.py b/tests/test_config_files.py new file mode 100644 index 0000000..470c95f --- /dev/null +++ b/tests/test_config_files.py @@ -0,0 +1,66 @@ +import pathlib + +import pytest + +from sequence.cli import _find_config_files + + +@pytest.mark.parametrize("path", [".", pathlib.Path(".")]) +@pytest.mark.parametrize("ext", ["toml", "yaml"]) +def test_with_one_file(tmpdir, path, ext): + with tmpdir.as_cwd(): + pathlib.Path(f"sequence.{ext}").touch() + times, names = _find_config_files(path) + + assert times == [0] + assert names == [f"sequence.{ext}"] + + +@pytest.mark.parametrize("path", [".", pathlib.Path(".")]) +@pytest.mark.parametrize("ext", ["toml", "yaml"]) +def test_with_multiple_files_no_timestamp(tmpdir, path, ext): + files = [f"sequence-a.{ext}", f"sequence-b.{ext}", f"sequence-c.{ext}"] + with tmpdir.as_cwd(): + for name in files: + pathlib.Path(name).touch() + times, names = _find_config_files(path) + + assert times == [0, 1, 2] + assert names == files + + +@pytest.mark.parametrize("path", [".", pathlib.Path(".")]) +@pytest.mark.parametrize("ext", ["toml", "yaml"]) +def test_with_multiple_files_with_timestamp(tmpdir, path, ext): + files = [ + f"sequence-1.{ext}", + f"sequence-2.{ext}", + f"sequence-003.{ext}", + f"sequence-10.{ext}", + ] + with tmpdir.as_cwd(): + for name in files: + pathlib.Path(name).touch() + times, names = _find_config_files(path) + + assert times == [1, 2, 3, 10] + assert names == files + + +@pytest.mark.parametrize("path", [".", pathlib.Path(".")]) +def test_with_multiple_files_mixed_format(tmpdir, path): + toml_files = [ + "sequence-1.toml", + "sequence-003.toml", + "sequence-10.toml", + ] + yaml_files = [ + "sequence-2.yaml", + ] + with tmpdir.as_cwd(): + for name in toml_files + yaml_files: + pathlib.Path(name).touch() + times, names = _find_config_files(path) + + assert times == [1, 3, 10] + assert names == toml_files diff --git a/tests/test_input.py b/tests/test_input.py index 135fe60..967d77c 100644 --- a/tests/test_input.py +++ b/tests/test_input.py @@ -5,13 +5,34 @@ from sequence.input_reader import load_config -def test_load_config(datadir): +def test_load_config_from_pathlike(datadir): config = {} for fmt in ("yaml", "toml"): config[fmt] = load_config(datadir / f"sequence.{fmt}", fmt=fmt) assert config["yaml"] == config["toml"] +def test_load_config_from_str(datadir): + config = {} + for fmt in ("yaml", "toml"): + config[fmt] = load_config(str(datadir / f"sequence.{fmt}"), fmt=fmt) + assert config["yaml"] == config["toml"] + + +def test_load_config_from_stream(datadir): + config = {} + for fmt in ("yaml", "toml"): + with open(datadir / f"sequence.{fmt}") as fp: + config[fmt] = load_config(fp, fmt=fmt) + assert config["yaml"] == config["toml"] + + +def test_from_stream_without_format_keyword(datadir): + with open(datadir / "sequence.toml") as fp: + with pytest.raises(ValueError): + load_config(fp) + + @pytest.mark.parametrize("fmt", ("yaml", "toml")) def test_guess_format(datadir, fmt): filepath = datadir / f"sequence.{fmt}" diff --git a/tests/test_netcdf.py b/tests/test_netcdf.py index 783e3cf..7c75f24 100644 --- a/tests/test_netcdf.py +++ b/tests/test_netcdf.py @@ -1,8 +1,9 @@ import numpy as np +import pytest import xarray as xr -from landlab import RasterModelGrid from pytest import approx +from sequence import SequenceModelGrid from sequence.netcdf import to_netcdf @@ -14,7 +15,7 @@ def pytest_generate_tests(metafunc): def test_no_fields(tmpdir): - grid = RasterModelGrid((3, 4)) + grid = SequenceModelGrid(4) with tmpdir.as_cwd(): to_netcdf(grid, "test.nc") ds = xr.open_dataset("test.nc") @@ -24,7 +25,7 @@ def test_no_fields(tmpdir): def test_with_node_fields(tmpdir): - grid = RasterModelGrid((3, 4)) + grid = SequenceModelGrid(4) grid.at_node["z"] = np.arange(12) with tmpdir.as_cwd(): to_netcdf(grid, "test.nc") @@ -34,7 +35,7 @@ def test_with_node_fields(tmpdir): def test_append(tmpdir): - grid = RasterModelGrid((3, 4)) + grid = SequenceModelGrid(4) grid.at_node["z"] = np.arange(12) with tmpdir.as_cwd(): to_netcdf(grid, "test.nc") @@ -46,7 +47,7 @@ def test_append(tmpdir): def test_float_var(tmpdir): - grid = RasterModelGrid((3, 4)) + grid = SequenceModelGrid(4) grid.at_node["int_var"] = np.arange(12, dtype=int) grid.at_node["float_var"] = np.arange(12, dtype=float) with tmpdir.as_cwd(): @@ -56,32 +57,33 @@ def test_float_var(tmpdir): assert np.all(ds["at_node:float_var"] == approx(grid.at_node["float_var"][None, :])) -def test_with_names(tmpdir): - grid = RasterModelGrid((3, 4)) +@pytest.mark.parametrize( + "names", [{"node": ("var0",)}, {"node": "var0"}, "var0", ["var0"]] +) +def test_with_names(tmpdir, names): + grid = SequenceModelGrid(4) grid.at_node["var0"] = np.arange(12) grid.at_node["var1"] = np.arange(12) with tmpdir.as_cwd(): - # to_netcdf(grid, "test.nc", names=("var0",)) - to_netcdf(grid, "test.nc", names={"node": ("var0",)}) + to_netcdf(grid, "test.nc", names=names) ds = xr.open_dataset("test.nc") assert "at_node:var0" in ds.variables assert "at_node:var1" not in ds.variables def test_with_nodes(tmpdir): - grid = RasterModelGrid((3, 4)) + grid = SequenceModelGrid(4) grid.at_node["z"] = np.arange(12) # nodes = grid.nodes[1, :] nodes = grid.nodes[(1,)] with tmpdir.as_cwd(): - # to_netcdf(grid, "test.nc", nodes=nodes) to_netcdf(grid, "test.nc", ids={"node": nodes}) ds = xr.open_dataset("test.nc") assert np.all(ds["at_node:z"] == [[4, 5, 6, 7]]) def test_without_layers(tmpdir): - grid = RasterModelGrid((3, 4)) + grid = SequenceModelGrid(4) with tmpdir.as_cwd(): to_netcdf(grid, "test.nc") ds = xr.open_dataset("test.nc") @@ -89,7 +91,7 @@ def test_without_layers(tmpdir): def test_with_layers(tmpdir): - grid = RasterModelGrid((3, 4)) + grid = SequenceModelGrid(4) grid.event_layers.add(10.0, age=0.0, water_depth=np.arange(2)) with tmpdir.as_cwd(): to_netcdf(grid, "test.nc") @@ -101,7 +103,7 @@ def test_with_layers(tmpdir): def test_formats(tmpdir, format): - grid = RasterModelGrid((3, 4)) + grid = SequenceModelGrid(4) grid.at_node["z"] = np.arange(12.0) with tmpdir.as_cwd(): to_netcdf(grid, "test.nc", with_layers=False, format=format) @@ -111,7 +113,7 @@ def test_formats(tmpdir, format): def test_one_location(tmpdir): - grid = RasterModelGrid((3, 4)) + grid = SequenceModelGrid(4) grid.at_node["var0"] = np.arange(12.0) grid.at_node["var1"] = np.arange(12.0) * 10.0 with tmpdir.as_cwd():