Skip to content

Commit

Permalink
ENH: automatic discovery and parsing for pluto.ini, idefix.ini, defin…
Browse files Browse the repository at this point in the history
…itions.h and definitions.hpp
  • Loading branch information
neutrinoceros committed Jan 8, 2023
1 parent 94f1c52 commit 4cee3bd
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 99 deletions.
2 changes: 1 addition & 1 deletion tests/test_vtk.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def test_pluto_over_units_override(pluto_vtk_file):
def test_pluto_wrong_definitions_header(pluto_vtk_file):
with pytest.raises(
FileNotFoundError,
match=("No such file 'definitions2.h'"),
match=(r".*No such file or directory: '.*definitions2\.h'"),
):
yt.load(pluto_vtk_file["path"], definitions_header="definitions2.h")

Expand Down
202 changes: 104 additions & 98 deletions yt_idefix/data_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import weakref
from abc import ABC, abstractmethod
from functools import cached_property
from pathlib import Path
from typing import Literal

import inifix
Expand Down Expand Up @@ -205,6 +206,8 @@ class IdefixDataset(Dataset, ABC):

_version_regexp = re.compile(r"v\d+\.\d+\.?\d*[-\w+]*")
_dataset_type: str # defined in subclasses
_default_definitions_header = "definitions.hpp"
_default_inifile = "idefix.ini"

def __init__(
self,
Expand All @@ -218,12 +221,19 @@ def __init__(
geometry: Literal["cartesian", "spherical", "cylindrical", "polar"]
| None = None,
inifile: str | os.PathLike[str] | None = None,
definitions_header: str | os.PathLike[str] | None = None,
):
self._geometry_from_user = geometry

dt = type(self)._dataset_type
self.fluid_types += (dt,)

self._input_filename: str = os.fspath(filename)
self._inifile = self._get_meta_file(inifile, default=self._default_inifile)
self._definitions_header = self._get_meta_file(
definitions_header, default=self._default_definitions_header
)

super().__init__(
filename,
dataset_type=dt,
Expand All @@ -232,14 +242,31 @@ def __init__(
default_species_fields=default_species_fields,
)

self.inifile = inifile
self._parse_inifile()

self.storage_filename = None

# idefix does not support grid refinement
self.refine_by = 1

def _get_meta_file(
self, arg: str | os.PathLike[str] | None, /, *, default: str
) -> str:
root_dir = Path(self.directory)

if arg is not None:
if os.path.isabs(arg):
return os.fspath(arg)
else:
return str((root_dir / arg).absolute())

_, ext = os.path.splitext(default)
if (
len(candidates := list(root_dir.glob(f"*{ext}"))) == 1
and (file := candidates[0]).name == default
):
return str(file.absolute())
else:
return ""

def _parse_parameter_file(self):
# base method, intended to be subclassed
# parse the version hash
Expand All @@ -252,6 +279,8 @@ def _parse_parameter_file(self):
self.omega_matter = 0.0
self.hubble_constant = 0.0

self._parse_inifile()
self._parse_definitions_header()
self._setup_geometry()

def _setup_geometry(self) -> None:
Expand All @@ -277,10 +306,10 @@ def _setup_geometry(self) -> None:
self.geometry = from_file

def _parse_inifile(self) -> None:
if self.inifile is None:
if not self._inifile:
return

with open(self.inifile, "rb") as fh:
with open(self._inifile, "rb") as fh:
self.parameters.update(inifix.load(fh))
grid_ini = self.parameters["Grid"]

Expand All @@ -293,6 +322,14 @@ def _parse_inifile(self) -> None:
if any(_ != "u" for _ in vals[3::3]):
msg_elems.append(f"found non-uniform block(s) in direction {ax}")

@abstractmethod
def _parse_definitions_header(self) -> None:
if not self._definitions_header:
return

with open(self._definitions_header) as fh: # noqa F841
...

def _set_code_unit_attributes(self):
# This is where quantities are created that represent the various
# on-disk units. These are the currently available quantities which
Expand All @@ -313,15 +350,15 @@ def _set_code_unit_attributes(self):
# The following methods are frontend-specific

@abstractmethod
def _get_header(self) -> str:
def _read_data_header(self) -> str:
pass

def _get_code_version(self) -> str:
# take the last line of the header
# - in Idefix dumps there's only one line
# - in Vtk files (Idefix or Pluto), there are two,
# the first of which isn't code specific
header = self._get_header().splitlines()[-1]
header = self._read_data_header().splitlines()[-1]

regexp = self.__class__._version_regexp

Expand Down Expand Up @@ -350,7 +387,11 @@ def _is_valid(cls, filename, *args, **kwargs) -> bool:
else:
return cls._required_header_keyword in header

def _get_header(self) -> str:
def _parse_definitions_header(self) -> None:
# this method is required for IdefixDataset, but currently not used
return

def _read_data_header(self) -> str:
return vtk_io.read_header(self.parameter_filename)

def _parse_parameter_file(self):
Expand Down Expand Up @@ -402,11 +443,15 @@ def _is_valid(cls, filename, *args, **kwargs) -> bool:
except Exception:
return False

def _parse_definitions_header(self) -> None:
# this method is required for IdefixDataset, but currently not used
return

def _get_fields_metadata(self) -> tuple[IdefixFieldProperties, IdefixMetadata]:
# read everything except large arrays
return dmp_io.read_idefix_dmpfile(self.parameter_filename, skip_data=True)

def _get_header(self) -> str:
def _read_data_header(self) -> str:
return dmp_io.read_header(self.parameter_filename)

def _parse_parameter_file(self):
Expand Down Expand Up @@ -443,110 +488,71 @@ class PlutoVtkDataset(IdefixVtkDataset):
_dataset_type = "pluto-vtk"
_version_regexp = re.compile(r"\d+\.\d+\.?\d*[-\w+]*")
_required_header_keyword = "PLUTO"

def __init__(
self,
filename,
*,
dataset_type: str | None = None, # deleguated to child classes
units_override: dict[str, UnitLike] | None = None,
unit_system: Literal["cgs", "mks", "code"] = "cgs",
default_species_fields: Literal["neutral", "ionized"] | None = None,
# from here, frontend-specific arguments
geometry: Literal["cartesian", "spherical", "cylindrical", "polar"]
| None = None,
inifile: str | os.PathLike[str] | None = None,
definitions_header: str | None = None,
):
self._definitions_header: str | None
if definitions_header is not None:
self._definitions_header = os.fspath(definitions_header)
else:
self._definitions_header = None

super().__init__(
filename,
dataset_type=dataset_type,
units_override=units_override,
unit_system=unit_system,
geometry=geometry,
inifile=inifile,
default_species_fields=default_species_fields,
)
_default_definitions_header = "definitions.h"
_default_inifile = "pluto.ini"

def _parse_parameter_file(self):
self._parse_header_file()
super()._parse_parameter_file()
self._get_time()

def _parse_header_file(self):
# parse time from vtk.out
log_file = os.path.join(self.directory, "vtk.out")
if (match := re.search(r"\.(\d*)\.", self.parameter_filename)) is None:
raise RuntimeError(
f"Failed to parse output number from file name {self.parameter_filename}"
)
index = int(match.group(1))

# will be converted to actual unyt_quantity in _set_derived_attrs
self.current_time = -1

if not os.path.isfile(log_file):
ytLogger.warning("Missing log file %s, setting current_time = -1", log_file)
return

log_regexp = re.compile(rf"^{index}\s(\S+)")
with open(log_file) as fh:
for line in fh.readlines():
log_match = re.search(log_regexp, line)
if log_match:
self.current_time = float(log_match.group(1))
break
else:
ytLogger.warning(
"Failed to retrieve time from %s, setting current_time = -1",
log_file,
)

def _parse_definitions_header(self) -> None:
"""Read some metadata from header file 'definitions.h'."""
if not self._definitions_header:
ytLogger.warning(
"%s was not found. Code units will be set to 1.0 in cgs.",
self._default_definitions_header,
)
return

geom_regexp = re.compile(r"^\s*#define\s+GEOMETRY\s+([A-Z]+)")
unit_regexp = re.compile(r"^\s*#define\s+UNIT_(\w+)\s+(\S+)")
constexpr = re.compile(r"CONST_\w+")

# definitions.h is presumed to be along with data file
if self._definitions_header is None:
self._definitions_header = os.path.join(self.directory, "definitions.h")
elif not os.path.isfile(self._definitions_header):
raise FileNotFoundError(f"No such file {self._definitions_header!r}")

if os.path.isfile(self._definitions_header):
with open(self._definitions_header) as fh:
body = fh.read()
lines = C_io.strip_comments(body).split("\n")

for line in lines:
geom_match = re.fullmatch(geom_regexp, line)
if geom_match is not None:
self.parameters["geometry"] = geom_match.group(1).lower()
continue

unit_match = re.fullmatch(unit_regexp, line)
if unit_match is not None:
unit = unit_match.group(1).lower() + "_unit"
expr = unit_match.group(2)
expr = re.sub(constexpr, self._get_constants, expr)
self.parameters[unit] = eval(expr)
else:
warnings.warn(
f"Header file {self._definitions_header} couldn't be found. "
"The code units are set to be 1.0 in cgs by default."
)
with open(self._definitions_header) as fh:
body = fh.read()
lines = C_io.strip_comments(body).split("\n")

for line in lines:
if (geom_match := re.fullmatch(geom_regexp, line)) is not None:
self.parameters["geometry"] = geom_match.group(1).lower()
elif (unit_match := re.fullmatch(unit_regexp, line)) is not None:
unit = unit_match.group(1).lower() + "_unit"
expr = unit_match.group(2)
expr = re.sub(constexpr, self._get_constants, expr)
self.parameters[unit] = eval(expr)

def _get_constants(self, match: re.Match) -> str:
"""Replace matched constant string with its value"""
key = match.group()
return str(pluto_def_constants[key])

def _get_time(self):
"""Get current time from vtk.out."""
log_file = os.path.join(self.directory, "vtk.out")
match = re.search(r"\.(\d*)\.", self.parameter_filename)
if match is None:
raise RuntimeError(
f"Failed to parse output number from file name {self.parameter_filename}"
)
index = int(match.group(1))

# will be converted to actual unyt_quantity in _set_derived_attrs
self.current_time = -1
if os.path.isfile(log_file):
log_regexp = re.compile(rf"^{index}\s(\S+)")
with open(log_file) as fh:
for line in fh.readlines():
log_match = re.search(log_regexp, line)
if log_match:
self.current_time = float(log_match.group(1))
break
else:
ytLogger.warning(
"Failed to retrieve time from %s, setting current_time = -1",
log_file,
)
else:
ytLogger.warning("Missing log file %s, setting current_time = -1", log_file)

def _set_code_unit_attributes(self):
"""Conversion between physical units and code units."""

Expand Down

0 comments on commit 4cee3bd

Please sign in to comment.