diff --git a/python/omfiles/__init__.py b/python/omfiles/__init__.py index 42f492c..163028a 100644 --- a/python/omfiles/__init__.py +++ b/python/omfiles/__init__.py @@ -1,7 +1,7 @@ """Provides classes and utilities for reading, writing, and manipulating OM files.""" from . import types -from ._rust import OmFileReader, OmFileReaderAsync, OmFileWriter, OmVariable, _check_cpu_features +from ._rust import OmFileReader, OmFileReaderAsync, OmFileWriter, OmVariable, OmWriterVariable, _check_cpu_features _check_cpu_features() @@ -10,5 +10,6 @@ "OmFileReaderAsync", "OmFileWriter", "OmVariable", + "OmWriterVariable", "types", ] diff --git a/python/omfiles/__init__.pyi b/python/omfiles/__init__.pyi index 926b8a6..a5e64cd 100644 --- a/python/omfiles/__init__.pyi +++ b/python/omfiles/__init__.pyi @@ -1,7 +1,7 @@ # This file is automatically generated by pyo3_stub_gen # ruff: noqa: E501, F401, F403, F405 -from omfiles._rust import OmFileReader, OmFileReaderAsync, OmFileWriter, OmVariable +from omfiles._rust import OmFileReader, OmFileReaderAsync, OmFileWriter, OmVariable, OmWriterVariable from . import _rust @@ -10,4 +10,5 @@ __all__ = [ "OmFileReaderAsync", "OmFileWriter", "OmVariable", + "OmWriterVariable", ] diff --git a/python/omfiles/_rust/__init__.pyi b/python/omfiles/_rust/__init__.pyi index 17e60f1..5191741 100644 --- a/python/omfiles/_rust/__init__.pyi +++ b/python/omfiles/_rust/__init__.pyi @@ -14,6 +14,7 @@ __all__ = [ "OmFileReaderAsync", "OmFileWriter", "OmVariable", + "OmWriterVariable", "RustPforCodec", ] @@ -492,50 +493,68 @@ class OmFileWriter: r""" Check if the writer is closed. """ - def __new__(cls, file_path: builtins.str) -> OmFileWriter: + def __new__(cls, file_path: builtins.str, metadata_placement: typing.Optional[builtins.str] = None) -> OmFileWriter: r""" Initialize an OmFileWriter. 
Args: file_path: Path where the .om file will be created + metadata_placement: (optional) Where to emit metadata; either "inline" to write + metadata entries immediately, or "tail" to defer emission + until close() so metadata is consolidated near the end of + the file (default: "tail"). """ @staticmethod - def at_path(path: builtins.str) -> OmFileWriter: + def at_path(path: builtins.str, metadata_placement: typing.Optional[builtins.str] = None) -> OmFileWriter: r""" Initialize an OmFileWriter to write to a file at the specified path. Args: path: Path where the .om file will be created + metadata_placement: (optional) Where to emit metadata; either "inline" or + "tail" (see description in `__new__`). Returns: OmFileWriter: A new writer instance """ @staticmethod - def from_fsspec(fs_obj: typing.Any, path: builtins.str) -> OmFileWriter: + def from_fsspec( + fs_obj: typing.Any, path: builtins.str, metadata_placement: typing.Optional[builtins.str] = None + ) -> OmFileWriter: r""" Create an OmFileWriter from a fsspec filesystem object. Args: fs_obj: A fsspec filesystem object that supports write operations path: The path to the file within the file system + metadata_placement: (optional) Where to emit metadata; either "inline" or + "tail" (see description in `__new__`). Returns: OmFileWriter: A new writer instance """ - def close(self, root_variable: OmVariable) -> None: + def close(self, root_variable: OmWriterVariable) -> None: r""" - Finalize and close the .om file by writing the trailer with the root variable. + Finalize and close the .om file by writing the trailer with the resolved root variable. + + In ``metadata_placement="tail"`` mode, metadata for arrays, scalars, and + groups is resolved and emitted during ``close()`` so that metadata is + consolidated near the end of the file. In ``metadata_placement="inline"`` + mode, metadata is written immediately and child handles must already refer + to resolved variables from the same writer. 
Args: - root_variable (:py:data:`omfiles.OmVariable`): The OmVariable that serves as the root/entry point of the file hierarchy. - All other variables should be accessible through this root variable. + root_variable (:py:data:`omfiles.OmWriterVariable`): The writer handle + that serves as the root/entry point of the file hierarchy. Returns: None on success. Raises: - ValueError: If the writer has already been closed - RuntimeError: If a thread lock error occurs or if there's an error writing to the file + ValueError: If the writer has already been closed or the handle belongs + to a different writer. + RuntimeError: If there is an error resolving deferred metadata or + writing the trailer. """ def write_array( self, @@ -545,8 +564,8 @@ class OmFileWriter: add_offset: typing.Optional[builtins.float] = None, compression: typing.Optional[builtins.str] = None, name: typing.Optional[builtins.str] = None, - children: typing.Optional[typing.Sequence[OmVariable]] = None, - ) -> OmVariable: + children: typing.Optional[typing.Sequence[OmWriterVariable]] = None, + ) -> OmWriterVariable: r""" Write a numpy array to the .om file with specified chunking and scaling parameters. @@ -567,8 +586,14 @@ class OmFileWriter: name: Name of the variable to be written (default: "data") children: List of child variables (default: []) + ``write_array`` returns an :py:data:`omfiles.OmWriterVariable`, which is a + write-time handle used to build hierarchy relationships and to select the + root variable passed to ``close()``. It is not the same as + :py:data:`omfiles.OmVariable`, which represents already-materialized + metadata when reading. 
+ Returns: - :py:data:`omfiles.OmVariable` representing the written group in the file structure + :py:data:`omfiles.OmWriterVariable` representing the written array in the file structure Raises: ValueError: If the data type is unsupported or if parameters are invalid @@ -583,8 +608,8 @@ class OmFileWriter: add_offset: typing.Optional[builtins.float] = None, compression: typing.Optional[builtins.str] = None, name: typing.Optional[builtins.str] = None, - children: typing.Optional[typing.Sequence[OmVariable]] = None, - ) -> OmVariable: + children: typing.Optional[typing.Sequence[OmWriterVariable]] = None, + ) -> OmWriterVariable: r""" Write an array to the .om file by streaming chunks from a Python iterator. @@ -607,15 +632,15 @@ class OmFileWriter: children: List of child variables (default: []) Returns: - :py:data:`omfiles.OmVariable` representing the written array in the file structure + :py:data:`omfiles.OmWriterVariable` representing the written array in the file structure Raises: ValueError: If the dtype is unsupported or parameters are invalid RuntimeError: If there's an error during compression or I/O """ def write_scalar( - self, value: typing.Any, name: builtins.str, children: typing.Optional[typing.Sequence[OmVariable]] = None - ) -> OmVariable: + self, value: typing.Any, name: builtins.str, children: typing.Optional[typing.Sequence[OmWriterVariable]] = None + ) -> OmWriterVariable: r""" Write a scalar value to the .om file. @@ -625,28 +650,36 @@ class OmFileWriter: name: Name of the scalar variable children: List of child variables (default: None) + Child handles must come from the same writer. In ``metadata_placement="inline"`` + mode they must already be resolved because metadata is emitted immediately. + In ``metadata_placement="tail"`` mode they may be resolved later during + ``close()``.
+ Returns: - :py:data:`omfiles.OmVariable` representing the written scalar in the file structure + :py:data:`omfiles.OmWriterVariable` representing the written scalar in the file structure Raises: ValueError: If the value type is unsupported (e.g., booleans) RuntimeError: If there's an error writing to the file """ - def write_group(self, name: builtins.str, children: typing.Sequence[OmVariable]) -> OmVariable: + def write_group(self, name: builtins.str, children: typing.Sequence[OmWriterVariable]) -> OmWriterVariable: r""" Create a new group in the .om file. - This is essentially a variable with no data, which serves as a container for other variables. + This is essentially a variable with no data, which serves as a container + for other variables. Args: name: Name of the group - children: List of child variables + children: List of child variables from the same writer Returns: - :py:data:`omfiles.OmVariable` representing the written group in the file structure + :py:data:`omfiles.OmWriterVariable` representing the written group in the file structure Raises: - RuntimeError: If there's an error writing to the file + ValueError: If a child handle belongs to a different writer + RuntimeError: If inline metadata placement is requested before child + metadata has been resolved, or if there is an I/O error """ @typing.final @@ -671,6 +704,17 @@ class OmVariable: """ def __repr__(self) -> builtins.str: ... +@typing.final +class OmWriterVariable: + r""" + Represents a variable handle during writing. + """ + @property + def name(self) -> builtins.str: + r""" + The name of the variable.
+ """ + @typing.final class RustPforCodec: r""" diff --git a/python/omfiles/dask.py b/python/omfiles/dask.py index ffcc303..4d94398 100644 --- a/python/omfiles/dask.py +++ b/python/omfiles/dask.py @@ -5,7 +5,7 @@ import numpy as np -from omfiles._rust import OmFileWriter, OmVariable +from omfiles._rust import OmFileWriter, OmWriterVariable try: import dask.array as da @@ -78,8 +78,8 @@ def write_dask_array( add_offset: float = 0.0, compression: str = "pfor_delta_2d", name: str = "data", - children: Optional[Sequence[OmVariable]] = None, -) -> OmVariable: + children: Optional[Sequence[OmWriterVariable]] = None, +) -> OmWriterVariable: """ Write a dask array to an OM file using streaming/incremental writes. @@ -111,7 +111,7 @@ def write_dask_array( children: Child variables (default: None). Returns: - OmVariable representing the written array. + OmWriterVariable representing the written array. Raises: TypeError: If data is not a dask array. diff --git a/src/hierarchy.rs b/src/hierarchy.rs index 05aaedd..94fa6f0 100644 --- a/src/hierarchy.rs +++ b/src/hierarchy.rs @@ -46,3 +46,15 @@ impl Into for OmVariable { } } } + +#[gen_stub_pyclass] +#[pyclass(from_py_object)] +#[derive(Clone)] +/// Represents a variable handle during writing. +pub(crate) struct OmWriterVariable { + #[pyo3(get)] + /// The name of the variable. 
+ pub name: String, + pub(crate) writer_id: u64, + pub(crate) variable_id: u64, +} diff --git a/src/lib.rs b/src/lib.rs index 637ffdc..3f5b793 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,6 +20,7 @@ fn _rust(m: &Bound) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_function(wrap_pyfunction!(cpu_info::_check_cpu_features, m)?)?; @@ -34,6 +35,7 @@ pyo3_stub_gen::reexport_module_members!( "OmFileReader", "OmFileReaderAsync", "OmFileWriter", - "OmVariable" + "OmVariable", + "OmWriterVariable" ); define_stub_info_gatherer!(stub_info); diff --git a/src/reader.rs b/src/reader.rs index b2c1934..f57ab86 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -185,7 +185,7 @@ impl OmFileReader { /// Returns: /// OmFileReader: OmFileReader instance. #[staticmethod] - fn from_path(file_path: &str) -> PyResult { + pub(crate) fn from_path(file_path: &str) -> PyResult { let file_handle = File::open(file_path) .map_err(|e| PyErr::new::(e.to_string()))?; let backend = @@ -202,7 +202,7 @@ impl OmFileReader { /// Returns: /// OmFileReader: A new reader instance. 
#[staticmethod] - fn from_fsspec(fs_obj: Py, path: String) -> PyResult { + pub(crate) fn from_fsspec(fs_obj: Py, path: String) -> PyResult { Python::attach(|py| { let bound_object = fs_obj.bind(py); diff --git a/src/writer.rs b/src/writer.rs index af28db3..263d57c 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -1,6 +1,6 @@ use crate::{ compression::PyCompressionType, errors::convert_omfilesrs_error, - fsspec_backend::FsSpecWriterBackend, hierarchy::OmVariable, + fsspec_backend::FsSpecWriterBackend, hierarchy::OmWriterVariable, }; use delegate::delegate; use numpy::{ @@ -9,7 +9,7 @@ use numpy::{ }; use omfiles_rs::{ traits::{OmFileArrayDataType, OmFileScalarDataType, OmFileWriterBackend}, - writer::{OmFileWriter as OmFileWriterRs, OmFileWriterArray}, + writer::{OmFileWriter as OmFileWriterRs, OmFileWriterArray, OmFileWriterArrayFinalized}, OmCompressionType, OmFilesError, OmOffsetSize, }; use pyo3::{ @@ -19,16 +19,25 @@ use pyo3::{ }; use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; use std::{ + collections::HashMap, fs::File, - sync::{Mutex, PoisonError}, + sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Mutex, PoisonError, + }, }; -/// Helper to convert OmOffsetSize to OmVariable -fn to_variable(name: &str, os: OmOffsetSize) -> OmVariable { - OmVariable { +static NEXT_WRITER_ID: AtomicU64 = AtomicU64::new(1); + +fn next_writer_id() -> u64 { + NEXT_WRITER_ID.fetch_add(1, Ordering::Relaxed) +} + +fn to_writer_variable(name: &str, writer_id: u64, variable_id: u64) -> OmWriterVariable { + OmWriterVariable { name: name.to_string(), - offset: os.offset, - size: os.size, + writer_id, + variable_id, } } @@ -118,7 +127,7 @@ impl<'a, 'py> Feeder<'a, 'py> { /// Resolved parameters shared by both `write_array` and `write_array_streaming`. 
struct WriteArrayParams<'a> { name: &'a str, - children: Vec, + children: Vec, scale_factor: f32, add_offset: f32, compression: OmCompressionType, @@ -127,22 +136,18 @@ struct WriteArrayParams<'a> { impl<'a> WriteArrayParams<'a> { fn from_options( name: Option<&'a str>, - children: Option>, + children: Option>, scale_factor: Option, add_offset: Option, compression: Option<&str>, ) -> PyResult { Ok(Self { name: name.unwrap_or("data"), - children: children - .unwrap_or_default() - .iter() - .map(Into::into) - .collect(), + children: children.unwrap_or_default(), scale_factor: scale_factor.unwrap_or(1.0), add_offset: add_offset.unwrap_or(0.0), compression: compression - .map(|s| PyCompressionType::from_str(s)) + .map(PyCompressionType::from_str) .transpose()? .unwrap_or(PyCompressionType::PforDelta2d) .to_omfilesrs(), @@ -150,11 +155,270 @@ impl<'a> WriteArrayParams<'a> { } } +enum DeferredVariableKind { + Scalar { + value: DeferredScalarValue, + children: Vec, + }, + Array { + array: DeferredArrayValue, + children: Vec, + }, + Group { + children: Vec, + }, +} + +#[derive(Clone)] +enum DeferredScalarValue { + String(String), + Float64(f64), + Float32(f32), + Int64(i64), + Int32(i32), + Int16(i16), + Int8(i8), + Uint64(u64), + Uint32(u32), + Uint16(u16), + Uint8(u8), +} + +enum DeferredArrayValue { + Float32(Option), + Float64(Option), + Int8(Option), + Uint8(Option), + Int16(Option), + Uint16(Option), + Int32(Option), + Uint32(Option), + Int64(Option), + Uint64(Option), +} + +impl DeferredArrayValue { + fn take_finalized(&mut self) -> Result { + match self { + Self::Float32(v) => v.take(), + Self::Float64(v) => v.take(), + Self::Int8(v) => v.take(), + Self::Uint8(v) => v.take(), + Self::Int16(v) => v.take(), + Self::Uint16(v) => v.take(), + Self::Int32(v) => v.take(), + Self::Uint32(v) => v.take(), + Self::Int64(v) => v.take(), + Self::Uint64(v) => v.take(), + } + .ok_or_else(|| { + OmFilesError::GenericError("Deferred array metadata was already consumed".to_string()) 
+ }) + } +} + +struct DeferredVariable { + name: String, + kind: DeferredVariableKind, + resolved: Option, +} + +struct WriterState { + writer: OmFileWriterRs, + next_variable_id: u64, + variables: HashMap, +} + +impl WriterState { + fn new(writer: OmFileWriterRs) -> Self { + Self { + writer, + next_variable_id: 1, + variables: HashMap::new(), + } + } + + fn allocate_variable_id(&mut self) -> u64 { + let variable_id = self.next_variable_id; + self.next_variable_id += 1; + variable_id + } + + fn register_resolved( + &mut self, + name: &str, + kind: DeferredVariableKind, + offset_size: OmOffsetSize, + ) -> u64 { + let variable_id = self.allocate_variable_id(); + self.variables.insert( + variable_id, + DeferredVariable { + name: name.to_string(), + kind, + resolved: Some(offset_size), + }, + ); + variable_id + } + + fn register_deferred(&mut self, name: &str, kind: DeferredVariableKind) -> u64 { + let variable_id = self.allocate_variable_id(); + self.variables.insert( + variable_id, + DeferredVariable { + name: name.to_string(), + kind, + resolved: None, + }, + ); + variable_id + } + + fn ensure_children_exist(&self, children: &[u64]) -> Result<(), OmFilesError> { + for child in children { + if !self.variables.contains_key(child) { + return Err(OmFilesError::GenericError(format!( + "Unknown child variable id {}", + child + ))); + } + } + Ok(()) + } + + fn resolved_children_inline( + &self, + children: &[u64], + ) -> Result, OmFilesError> { + let mut resolved = Vec::with_capacity(children.len()); + for child in children { + let variable = self.variables.get(child).ok_or_else(|| { + OmFilesError::GenericError(format!("Unknown child variable id {}", child)) + })?; + let offset_size = variable.resolved.clone().ok_or_else(|| { + OmFilesError::GenericError(format!( + "Child variable '{}' is not yet resolved for inline metadata placement", + variable.name + )) + })?; + resolved.push(offset_size); + } + Ok(resolved) + } + + fn resolve_variable( + &mut self, + variable_id: 
u64, + resolving: &mut Vec, + ) -> Result { + let Some(variable) = self.variables.get(&variable_id) else { + return Err(OmFilesError::GenericError(format!( + "Unknown variable id {}", + variable_id + ))); + }; + + if let Some(offset_size) = &variable.resolved { + return Ok(offset_size.clone()); + } + + if resolving.contains(&variable_id) { + return Err(OmFilesError::GenericError( + "Cycle detected in deferred variable hierarchy".to_string(), + )); + } + + resolving.push(variable_id); + + let child_ids = { + let variable = self.variables.get(&variable_id).ok_or_else(|| { + OmFilesError::GenericError(format!("Unknown variable id {}", variable_id)) + })?; + match &variable.kind { + DeferredVariableKind::Scalar { children, .. } => children.clone(), + DeferredVariableKind::Array { children, .. } => children.clone(), + DeferredVariableKind::Group { children } => children.clone(), + } + }; + + let mut resolved_children = Vec::with_capacity(child_ids.len()); + for child_id in child_ids { + resolved_children.push(self.resolve_variable(child_id, resolving)?); + } + + let resolved = { + let variable = self.variables.get_mut(&variable_id).ok_or_else(|| { + OmFilesError::GenericError(format!("Unknown variable id {}", variable_id)) + })?; + let name = variable.name.clone(); + + match &mut variable.kind { + DeferredVariableKind::Scalar { value, .. } => match value { + DeferredScalarValue::String(v) => { + self.writer + .write_scalar(v.clone(), &name, &resolved_children)? + } + DeferredScalarValue::Float64(v) => { + self.writer.write_scalar(*v, &name, &resolved_children)? + } + DeferredScalarValue::Float32(v) => { + self.writer.write_scalar(*v, &name, &resolved_children)? + } + DeferredScalarValue::Int64(v) => { + self.writer.write_scalar(*v, &name, &resolved_children)? + } + DeferredScalarValue::Int32(v) => { + self.writer.write_scalar(*v, &name, &resolved_children)? + } + DeferredScalarValue::Int16(v) => { + self.writer.write_scalar(*v, &name, &resolved_children)? 
+ } + DeferredScalarValue::Int8(v) => { + self.writer.write_scalar(*v, &name, &resolved_children)? + } + DeferredScalarValue::Uint64(v) => { + self.writer.write_scalar(*v, &name, &resolved_children)? + } + DeferredScalarValue::Uint32(v) => { + self.writer.write_scalar(*v, &name, &resolved_children)? + } + DeferredScalarValue::Uint16(v) => { + self.writer.write_scalar(*v, &name, &resolved_children)? + } + DeferredScalarValue::Uint8(v) => { + self.writer.write_scalar(*v, &name, &resolved_children)? + } + }, + DeferredVariableKind::Array { array, .. } => { + let finalized = array.take_finalized()?; + self.writer + .write_array(finalized, &name, &resolved_children)? + } + DeferredVariableKind::Group { .. } => { + self.writer.write_none(&name, &resolved_children)? + } + } + }; + + let variable = self.variables.get_mut(&variable_id).ok_or_else(|| { + OmFilesError::GenericError(format!("Unknown variable id {}", variable_id)) + })?; + variable.resolved = Some(resolved.clone()); + + resolving.pop(); + Ok(resolved) + } +} + /// A Python wrapper for the Rust OmFileWriter implementation. #[gen_stub_pyclass] #[pyclass] pub struct OmFileWriter { - writer: Mutex>>, + writer: Mutex>, + writer_id: u64, + deferred_tail_metadata: bool, + explicitly_closed: AtomicBool, } impl OmFileWriter { @@ -183,14 +447,46 @@ impl OmFileWriter { PyErr::new::(format!("Unsupported scalar data type: {}", type_name)) } - /// Helper method for safe writer access. 
- fn with_writer(&self, f: F) -> PyResult + fn invalid_metadata_placement_error(value: &str) -> PyErr { + PyErr::new::(format!("Unsupported metadata placement: {}", value)) + } + + fn validate_metadata_placement(metadata_placement: Option<&str>) -> PyResult { + let placement = metadata_placement.unwrap_or("tail"); + match placement { + "inline" | "tail" => Ok(placement.to_string()), + other => Err(Self::invalid_metadata_placement_error(other)), + } + } + + fn validate_writer_variable(&self, variable: &OmWriterVariable) -> PyResult<()> { + if variable.writer_id != self.writer_id { + return Err(PyErr::new::( + "Variable handle belongs to a different writer", + )); + } + Ok(()) + } + + fn validate_writer_variables(&self, variables: &[OmWriterVariable]) -> PyResult<()> { + for variable in variables { + self.validate_writer_variable(variable)?; + } + Ok(()) + } + + fn child_ids(&self, children: &[OmWriterVariable]) -> PyResult> { + self.validate_writer_variables(children)?; + Ok(children.iter().map(|child| child.variable_id).collect()) + } + + fn with_state(&self, f: F) -> PyResult where - F: FnOnce(&mut OmFileWriterRs) -> PyResult, + F: FnOnce(&mut WriterState) -> PyResult, { - let mut guard = self.writer.lock().map_err(|e| Self::lock_error(e))?; + let mut guard = self.writer.lock().map_err(Self::lock_error)?; match guard.as_mut() { - Some(writer) => f(writer), + Some(state) => f(state), None => Err(Self::closed_error()), } } @@ -207,56 +503,118 @@ impl OmFileWriter { chunks: Vec, params: &WriteArrayParams<'_>, feeder: Feeder<'_, '_>, - ) -> PyResult { - self.with_writer(|writer| { + ) -> PyResult { + let child_ids = self.child_ids(¶ms.children)?; + self.with_state(|state| { + state + .ensure_children_exist(&child_ids) + .map_err(convert_omfilesrs_error)?; + macro_rules! dispatch { - ($($variant:ident => $T:ty),+ $(,)?) => { + ($($variant:ident => $T:ty => $wrap:ident),+ $(,)?) 
=> { match element_type { $(OmElementType::$variant => { - let mut w = writer + let mut w = state + .writer .prepare_array::<$T>( - dimensions, chunks, params.compression, - params.scale_factor, params.add_offset, + dimensions, + chunks, + params.compression, + params.scale_factor, + params.add_offset, ) .map_err(convert_omfilesrs_error)?; feeder.feed::<$T>(&mut w)?; - w.finalize() + let finalized = w.finalize(); + if self.deferred_tail_metadata { + let variable_id = state.register_deferred( + params.name, + DeferredVariableKind::Array { + array: DeferredArrayValue::$wrap(Some(finalized)), + children: child_ids, + }, + ); + Ok(to_writer_variable(params.name, self.writer_id, variable_id)) + } else { + let resolved_children = state + .resolved_children_inline(&child_ids) + .map_err(convert_omfilesrs_error)?; + let offset_size = state + .writer + .write_array(finalized, params.name, &resolved_children) + .map_err(convert_omfilesrs_error)?; + let variable_id = state.register_resolved( + params.name, + DeferredVariableKind::Array { + array: DeferredArrayValue::$wrap(None), + children: child_ids, + }, + offset_size, + ); + Ok(to_writer_variable(params.name, self.writer_id, variable_id)) + } }),+ } }; } - let array_meta = dispatch! { - Float32 => f32, - Float64 => f64, - Int8 => i8, - Uint8 => u8, - Int16 => i16, - Uint16 => u16, - Int32 => i32, - Uint32 => u32, - Int64 => i64, - Uint64 => u64, - }; - - writer - .write_array(array_meta, params.name, ¶ms.children) - .map_err(convert_omfilesrs_error) - .map(|os| to_variable(params.name, os)) + dispatch! { + Float32 => f32 => Float32, + Float64 => f64 => Float64, + Int8 => i8 => Int8, + Uint8 => u8 => Uint8, + Int16 => i16 => Int16, + Uint16 => u16 => Uint16, + Int32 => i32 => Int32, + Uint32 => u32 => Uint32, + Int64 => i64 => Int64, + Uint64 => u64 => Uint64 + } }) } + /// Store a scalar immediately for inline metadata placement, or defer its + /// metadata emission until close-time for tail metadata placement. 
fn store_scalar( &self, value: T, name: &str, - children: &[OmOffsetSize], - ) -> PyResult { - self.with_writer(|writer| { - writer - .write_scalar(value, name, children) - .map_err(convert_omfilesrs_error) - .map(|os| to_variable(name, os)) + children: &[OmWriterVariable], + deferred_value: DeferredScalarValue, + ) -> PyResult { + let child_ids = self.child_ids(children)?; + self.with_state(|state| { + state + .ensure_children_exist(&child_ids) + .map_err(convert_omfilesrs_error)?; + + if self.deferred_tail_metadata { + let variable_id = state.register_deferred( + name, + DeferredVariableKind::Scalar { + value: deferred_value, + children: child_ids, + }, + ); + Ok(to_writer_variable(name, self.writer_id, variable_id)) + } else { + let resolved_children = state + .resolved_children_inline(&child_ids) + .map_err(convert_omfilesrs_error)?; + let offset_size = state + .writer + .write_scalar(value, name, &resolved_children) + .map_err(convert_omfilesrs_error)?; + let variable_id = state.register_resolved( + name, + DeferredVariableKind::Scalar { + value: deferred_value, + children: child_ids, + }, + offset_size, + ); + Ok(to_writer_variable(name, self.writer_id, variable_id)) + } }) } } @@ -268,24 +626,37 @@ impl OmFileWriter { /// /// Args: /// file_path: Path where the .om file will be created + /// metadata_placement: (optional) Where to emit metadata; either "inline" to write + /// metadata entries immediately, or "tail" to defer emission + /// until close() so metadata is consolidated near the end of + /// the file (default: "tail"). #[new] - fn new(file_path: &str) -> PyResult { - Self::at_path(file_path) + #[pyo3(signature = (file_path, metadata_placement=None))] + fn new(file_path: &str, metadata_placement: Option<&str>) -> PyResult { + Self::at_path(file_path, metadata_placement) } /// Initialize an OmFileWriter to write to a file at the specified path. 
/// /// Args: /// path: Path where the .om file will be created + /// metadata_placement: (optional) Where to emit metadata; either "inline" or + /// "tail" (see description in `__new__`). /// /// Returns: /// OmFileWriter: A new writer instance #[staticmethod] - fn at_path(path: &str) -> PyResult { + #[pyo3(signature = (path, metadata_placement=None))] + fn at_path(path: &str, metadata_placement: Option<&str>) -> PyResult { + let metadata_placement = Self::validate_metadata_placement(metadata_placement)?; + let deferred_tail_metadata = metadata_placement == "tail"; let file_handle = WriterBackendImpl::File(File::create(path)?); let writer = OmFileWriterRs::new(file_handle, 8 * 1024); Ok(Self { - writer: Mutex::new(Some(writer)), + writer: Mutex::new(Some(WriterState::new(writer))), + writer_id: next_writer_id(), + deferred_tail_metadata, + explicitly_closed: AtomicBool::new(false), }) } @@ -294,50 +665,87 @@ impl OmFileWriter { /// Args: /// fs_obj: A fsspec filesystem object that supports write operations /// path: The path to the file within the file system + /// metadata_placement: (optional) Where to emit metadata; either "inline" or + /// "tail" (see description in `__new__`). 
/// /// Returns: /// OmFileWriter: A new writer instance #[staticmethod] - fn from_fsspec(fs_obj: Py, path: String) -> PyResult { + #[pyo3(signature = (fs_obj, path, metadata_placement=None))] + fn from_fsspec( + fs_obj: Py, + path: String, + metadata_placement: Option<&str>, + ) -> PyResult { + let metadata_placement = Self::validate_metadata_placement(metadata_placement)?; + let deferred_tail_metadata = metadata_placement == "tail"; let fsspec_backend = WriterBackendImpl::FsSpec(FsSpecWriterBackend::new(fs_obj, path)?); let writer = OmFileWriterRs::new(fsspec_backend, 8 * 1024); Ok(Self { - writer: Mutex::new(Some(writer)), + writer: Mutex::new(Some(WriterState::new(writer))), + writer_id: next_writer_id(), + deferred_tail_metadata, + explicitly_closed: AtomicBool::new(false), }) } - /// Finalize and close the .om file by writing the trailer with the root variable. + /// Finalize and close the .om file by writing the trailer with the resolved root variable. + /// + /// In ``metadata_placement="tail"`` mode, metadata for arrays, scalars, and + /// groups is resolved and emitted during ``close()`` so that metadata is + /// consolidated near the end of the file. In ``metadata_placement="inline"`` + /// mode, metadata is written immediately and child handles must already refer + /// to resolved variables from the same writer. /// /// Args: - /// root_variable (:py:data:`omfiles.OmVariable`): The OmVariable that serves as the root/entry point of the file hierarchy. - /// All other variables should be accessible through this root variable. + /// root_variable (:py:data:`omfiles.OmWriterVariable`): The writer handle + /// that serves as the root/entry point of the file hierarchy. /// /// Returns: /// None on success. 
/// /// Raises: - /// ValueError: If the writer has already been closed - /// RuntimeError: If a thread lock error occurs or if there's an error writing to the file - fn close(&mut self, root_variable: OmVariable) -> PyResult<()> { - let mut guard = self.writer.lock().map_err(|e| Self::lock_error(e))?; - - if let Some(writer) = guard.as_mut() { - writer - .write_trailer(root_variable.into()) - .map_err(convert_omfilesrs_error)?; - // Take ownership and drop to ensure proper file closure - guard.take(); - } else { + /// ValueError: If the writer has already been closed or the handle belongs + /// to a different writer. + /// RuntimeError: If there is an error resolving deferred metadata or + /// writing the trailer. + fn close(&mut self, root_variable: OmWriterVariable) -> PyResult<()> { + self.validate_writer_variable(&root_variable)?; + let mut guard = self.writer.lock().map_err(Self::lock_error)?; + + let Some(state) = guard.as_mut() else { return Err(Self::closed_error()); - } + }; + let root_offset_size = if self.deferred_tail_metadata { + let mut resolving = Vec::new(); + state + .resolve_variable(root_variable.variable_id, &mut resolving) + .map_err(convert_omfilesrs_error)? + } else { + let variable = state + .variables + .get(&root_variable.variable_id) + .ok_or_else(|| PyErr::new::("Unknown root variable handle"))?; + variable + .resolved + .clone() + .ok_or_else(|| PyErr::new::("Root variable was not resolved"))? + }; + + state + .writer + .write_trailer(root_offset_size) + .map_err(convert_omfilesrs_error)?; + guard.take(); + self.explicitly_closed.store(true, Ordering::Relaxed); Ok(()) } /// Check if the writer is closed. 
#[getter] fn closed(&self) -> PyResult { - let guard = self.writer.lock().map_err(|e| Self::lock_error(e))?; + let guard = self.writer.lock().map_err(Self::lock_error)?; Ok(guard.is_none()) } @@ -360,8 +768,14 @@ impl OmFileWriter { /// name: Name of the variable to be written (default: "data") /// children: List of child variables (default: []) /// + /// ``write_array`` returns an :py:data:`omfiles.OmWriterVariable`, which is a + /// write-time handle used to build hierarchy relationships and to select the + /// root variable passed to ``close()``. It is not the same as + /// :py:data:`omfiles.OmVariable`, which represents already-materialized + /// metadata when reading. + /// /// Returns: - /// :py:data:`omfiles.OmVariable` representing the written group in the file structure + /// :py:data:`omfiles.OmWriterVariable` representing the written array in the file structure /// /// Raises: /// ValueError: If the data type is unsupported or if parameters are invalid @@ -377,8 +791,8 @@ impl OmFileWriter { add_offset: Option, compression: Option<&str>, name: Option<&str>, - children: Option>, - ) -> PyResult { + children: Option>, + ) -> PyResult { let params = WriteArrayParams::from_options(name, children, scale_factor, add_offset, compression)?; let element_type = OmElementType::from_numpy_dtype(&data.dtype())?; @@ -409,7 +823,7 @@ impl OmFileWriter { /// children: List of child variables (default: []) /// /// Returns: - /// :py:data:`omfiles.OmVariable` representing the written array in the file structure + /// :py:data:`omfiles.OmWriterVariable` representing the written array in the file structure /// /// Raises: /// ValueError: If the dtype is unsupported or parameters are invalid @@ -430,8 +844,8 @@ impl OmFileWriter { add_offset: Option, compression: Option<&str>, name: Option<&str>, - children: Option>, - ) -> PyResult { + children: Option>, + ) -> PyResult { let params = WriteArrayParams::from_options(name, children, scale_factor, add_offset, compression)?;
let element_type = OmElementType::from_numpy_dtype(dtype)?; @@ -449,8 +863,13 @@ impl OmFileWriter { /// name: Name of the scalar variable /// children: List of child variables (default: None) /// + /// Child handles must come from the same writer. In ``metadata_placement="inline"`` + /// mode they must already be resolved because metadata is emitted immediately. + /// In ``metadata_placement="tail"`` mode they may be resolved later during + /// ``close()``. + /// /// Returns: - /// :py:data:`omfiles.OmVariable` representing the written scalar in the file structure + /// :py:data:`omfiles.OmWriterVariable` representing the written scalar in the file structure /// /// Raises: /// ValueError: If the value type is unsupported (e.g., booleans) @@ -463,23 +882,22 @@ impl OmFileWriter { &mut self, value: &Bound, name: &str, - children: Option>, - ) -> PyResult { - let children: Vec = children - .unwrap_or_default() - .iter() - .map(Into::into) - .collect(); - + children: Option>, + ) -> PyResult { + let children = children.unwrap_or_default(); let py = value.py(); - // make an instance check against numpy scalar types macro_rules! check_numpy_type { - ($numpy:expr, $type_name:literal, $rust_type:ty) => { + ($numpy:expr, $type_name:literal, $rust_type:ty, $variant:ident) => { if let Ok(numpy_type) = $numpy.getattr($type_name) { if value.is_instance(&numpy_type)?
{ let scalar_value: $rust_type = value.call_method0("item")?.extract()?; - return self.store_scalar(scalar_value, name, &children); + return self.store_scalar( + scalar_value, + name, + &children, + DeferredScalarValue::$variant(scalar_value), + ); } } }; @@ -487,68 +905,102 @@ impl OmFileWriter { // Try to import numpy and check for numpy scalar types if let Ok(numpy) = py.import("numpy") { - check_numpy_type!(numpy, "int8", i8); - check_numpy_type!(numpy, "uint8", u8); - check_numpy_type!(numpy, "int16", i16); - check_numpy_type!(numpy, "uint16", u16); - check_numpy_type!(numpy, "int32", i32); - check_numpy_type!(numpy, "uint32", u32); - check_numpy_type!(numpy, "int64", i64); - check_numpy_type!(numpy, "uint64", u64); - check_numpy_type!(numpy, "float32", f32); - check_numpy_type!(numpy, "float64", f64); + check_numpy_type!(numpy, "int8", i8, Int8); + check_numpy_type!(numpy, "uint8", u8, Uint8); + check_numpy_type!(numpy, "int16", i16, Int16); + check_numpy_type!(numpy, "uint16", u16, Uint16); + check_numpy_type!(numpy, "int32", i32, Int32); + check_numpy_type!(numpy, "uint32", u32, Uint32); + check_numpy_type!(numpy, "int64", i64, Int64); + check_numpy_type!(numpy, "uint64", u64, Uint64); + check_numpy_type!(numpy, "float32", f32, Float32); + check_numpy_type!(numpy, "float64", f64, Float64); } - // Fall back to Python built-in types - let result = if let Ok(_value) = value.extract::() { - self.store_scalar(value.to_string(), name, &children)? + if let Ok(value) = value.extract::() { + self.store_scalar( + value.clone(), + name, + &children, + DeferredScalarValue::String(value), + ) } else if let Ok(value) = value.extract::() { - self.store_scalar(value, name, &children)? + self.store_scalar(value, name, &children, DeferredScalarValue::Float64(value)) } else if let Ok(value) = value.extract::() { - self.store_scalar(value, name, &children)? 
+ self.store_scalar(value, name, &children, DeferredScalarValue::Float32(value)) } else if let Ok(value) = value.extract::() { - self.store_scalar(value, name, &children)? + self.store_scalar(value, name, &children, DeferredScalarValue::Int64(value)) } else if let Ok(value) = value.extract::() { - self.store_scalar(value, name, &children)? + self.store_scalar(value, name, &children, DeferredScalarValue::Int32(value)) } else if let Ok(value) = value.extract::() { - self.store_scalar(value, name, &children)? + self.store_scalar(value, name, &children, DeferredScalarValue::Int16(value)) } else if let Ok(value) = value.extract::() { - self.store_scalar(value, name, &children)? + self.store_scalar(value, name, &children, DeferredScalarValue::Int8(value)) } else if let Ok(value) = value.extract::() { - self.store_scalar(value, name, &children)? + self.store_scalar(value, name, &children, DeferredScalarValue::Uint64(value)) } else if let Ok(value) = value.extract::() { - self.store_scalar(value, name, &children)? + self.store_scalar(value, name, &children, DeferredScalarValue::Uint32(value)) } else if let Ok(value) = value.extract::() { - self.store_scalar(value, name, &children)? + self.store_scalar(value, name, &children, DeferredScalarValue::Uint16(value)) } else if let Ok(value) = value.extract::() { - self.store_scalar(value, name, &children)? + self.store_scalar(value, name, &children, DeferredScalarValue::Uint8(value)) } else { - return Err(Self::unsupported_scalar_type_error(value.get_type())); - }; - Ok(result) + Err(Self::unsupported_scalar_type_error(value.get_type())) + } } /// Create a new group in the .om file. /// - /// This is essentially a variable with no data, which serves as a container for other variables. + /// This is essentially a variable with no data, which serves as a container + /// for other variables. 
/// /// Args: /// name: Name of the group - /// children: List of child variables + /// children: List of child variables from the same writer /// /// Returns: - /// :py:data:`omfiles.OmVariable` representing the written group in the file structure + /// :py:data:`omfiles.OmWriterVariable` representing the written group in the file structure /// /// Raises: - /// RuntimeError: If there's an error writing to the file - fn write_group(&mut self, name: &str, children: Vec) -> PyResult { - let children: Vec = children.iter().map(Into::into).collect(); - - self.with_writer(|writer| { - writer - .write_none(name, &children) - .map_err(convert_omfilesrs_error) - .map(|os| to_variable(name, os)) + /// ValueError: If a child handle belongs to a different writer + /// RuntimeError: If inline metadata placement is requested before child + /// metadata has been resolved, or if there is an I/O error + fn write_group( + &mut self, + name: &str, + children: Vec, + ) -> PyResult { + let child_ids = self.child_ids(&children)?; + self.with_state(|state| { + state + .ensure_children_exist(&child_ids) + .map_err(convert_omfilesrs_error)?; + + if self.deferred_tail_metadata { + let variable_id = state.register_deferred( + name, + DeferredVariableKind::Group { + children: child_ids, + }, + ); + Ok(to_writer_variable(name, self.writer_id, variable_id)) + } else { + let resolved_children = state + .resolved_children_inline(&child_ids) + .map_err(convert_omfilesrs_error)?; + let offset_size = state + .writer + .write_none(name, &resolved_children) + .map_err(convert_omfilesrs_error)?; + let variable_id = state.register_resolved( + name, + DeferredVariableKind::Group { + children: child_ids, + }, + offset_size, + ); + Ok(to_writer_variable(name, self.writer_id, variable_id)) + } }) } } @@ -556,6 +1008,11 @@ impl OmFileWriter { impl Drop for OmFileWriter { fn drop(&mut self) { if let Ok(mut guard) = self.writer.lock() { + if guard.is_some() && !self.explicitly_closed.load(Ordering::Relaxed) 
{ + eprintln!( + "Warning: OmFileWriter was dropped without calling close(); the OM file may be incomplete" + ); + } guard.take(); } } @@ -581,6 +1038,8 @@ impl OmFileWriterBackend for WriterBackendImpl { #[cfg(test)] mod tests { + use crate::reader::OmFileReader; + use super::*; use numpy::{ndarray::ArrayD, PyArrayDyn, PyArrayMethods}; use std::fs; @@ -601,7 +1060,7 @@ mod tests { } // Test parameters - let file_path = "test_data.om"; + let file_path = "test_write_array.om"; let dimensions = vec![10, 20]; let chunks = vec![5u64, 5]; @@ -609,7 +1068,7 @@ mod tests { let data = ArrayD::from_shape_fn(dimensions, |idx| (idx[0] + idx[1]) as f32); let py_array = PyArrayDyn::from_array(py, &data); - let mut file_writer = OmFileWriter::new(file_path).unwrap(); + let mut file_writer = OmFileWriter::new(file_path, None).unwrap(); // Write data let result = file_writer.write_array( @@ -623,7 +1082,12 @@ mod tests { ); assert!(result.is_ok()); - assert!(fs::metadata(file_path).is_ok()); + + let root = result.unwrap(); + file_writer.close(root)?; + + let reader = OmFileReader::from_path(file_path); + assert!(reader.is_ok()); // Clean up fs::remove_file(file_path).unwrap(); @@ -640,12 +1104,58 @@ mod tests { Python::attach(|py| -> Result<(), Box> { let fsspec = py.import("fsspec")?; let fs = fsspec.call_method1("filesystem", ("memory",))?; + let fs_py_any: Py = fs.into(); + + let file_path = "test_fsspec_writer.om"; - let _writer = OmFileWriter::from_fsspec(fs.into(), "test_file.om".to_string())?; + let mut writer = + OmFileWriter::from_fsspec(fs_py_any.clone_ref(py), file_path.to_string(), None)?; + let value = 0i32.into_pyobject(py)?; + let root = writer.write_scalar(&value, "zero_root", None)?; + writer.close(root)?; + + let reader = OmFileReader::from_fsspec(fs_py_any, file_path.to_string()); + assert!(reader.is_ok()); Ok(()) })?; Ok(()) } + + #[test] + fn test_resolve_variable_detects_cycle() { + let file_path = "test_cycle_detection.om"; + let file_handle = 
WriterBackendImpl::File(File::create(file_path).unwrap()); + let writer = OmFileWriterRs::new(file_handle, 8 * 1024); + let mut state = WriterState::new(writer); + + let child_id = state.register_deferred( + "child", + DeferredVariableKind::Group { + children: Vec::new(), + }, + ); + let root_id = state.register_deferred( + "root", + DeferredVariableKind::Group { + children: vec![child_id], + }, + ); + + if let Some(variable) = state.variables.get_mut(&child_id) { + variable.kind = DeferredVariableKind::Group { + children: vec![root_id], + }; + } + + let err = state + .resolve_variable(root_id, &mut Vec::new()) + .unwrap_err(); + assert!( + matches!(err, OmFilesError::GenericError(message) if message == "Cycle detected in deferred variable hierarchy") + ); + + let _ = fs::remove_file(file_path); + } } diff --git a/tests/test_read_write.py b/tests/test_read_write.py index 41504d9..688c7a7 100644 --- a/tests/test_read_write.py +++ b/tests/test_read_write.py @@ -180,6 +180,168 @@ def test_write_hierarchical_file(empty_temp_om_file): metadata_reader3.close() +def test_write_tail_metadata_at_end_of_file(empty_temp_om_file): + child_data = np.arange(4, dtype=np.float32).reshape(2, 2) + root_data = np.arange(16, dtype=np.float32).reshape(4, 4) + + writer = omfiles.OmFileWriter(empty_temp_om_file, metadata_placement="tail") + child_var = writer.write_array(child_data, chunks=[1, 1], name="child", scale_factor=10000.0) + metadata_var = writer.write_scalar(np.int32(7), name="metadata") + root_var = writer.write_array( + root_data, + chunks=[2, 2], + name="root", + scale_factor=10000.0, + children=[child_var, metadata_var], + ) + writer.close(root_var) + + with open(empty_temp_om_file, "rb") as f: + file_bytes = f.read() + + trailer_size = 20 + trailer = file_bytes[-trailer_size:] + metadata_region = file_bytes[:-trailer_size] + + child_name = b"child" + scalar_name = b"metadata" + root_name = b"root" + + child_pos = metadata_region.rfind(child_name) + scalar_pos = 
metadata_region.rfind(scalar_name) + root_pos = metadata_region.rfind(root_name) + + assert child_pos != -1 + assert scalar_pos != -1 + assert root_pos != -1 + + assert child_pos < scalar_pos < root_pos + + metadata_start = min(child_pos, scalar_pos, root_pos) + metadata_tail = metadata_region[metadata_start:] + + assert child_name in metadata_tail + assert scalar_name in metadata_tail + assert root_name in metadata_tail + + assert trailer != b"\x00" * trailer_size + assert trailer not in metadata_region + + +def test_write_inline_metadata_preserves_hierarchy(empty_temp_om_file): + child_data = np.arange(4, dtype=np.float32).reshape(2, 2) + root_data = np.arange(16, dtype=np.float32).reshape(4, 4) + + writer = omfiles.OmFileWriter(empty_temp_om_file, metadata_placement="inline") + child_var = writer.write_array(child_data, chunks=[1, 1], name="child", scale_factor=10000.0) + metadata_var = writer.write_scalar(np.int32(7), name="metadata") + root_var = writer.write_array( + root_data, + chunks=[2, 2], + name="root", + scale_factor=10000.0, + children=[child_var, metadata_var], + ) + writer.close(root_var) + + reader = omfiles.OmFileReader(empty_temp_om_file) + child_metadata = reader._get_flat_variable_metadata() + + child_reader = reader._init_from_variable(child_metadata["/root/child"]) + metadata_reader = reader._init_from_variable(child_metadata["/root/metadata"]) + + np.testing.assert_array_almost_equal(child_reader[:], child_data, decimal=4) + assert metadata_reader.read_scalar() == np.int32(7) + + metadata_reader.close() + child_reader.close() + reader.close() + + +def test_write_inline_group_preserves_children(empty_temp_om_file): + child1_data = np.arange(4, dtype=np.float32).reshape(2, 2) + child2_data = np.arange(6, dtype=np.float32).reshape(2, 3) + + writer = omfiles.OmFileWriter(empty_temp_om_file, metadata_placement="inline") + child1_var = writer.write_array(child1_data, chunks=[1, 1], name="child1", scale_factor=10000.0) + child2_var = 
writer.write_array(child2_data, chunks=[1, 3], name="child2", scale_factor=10000.0) + group_var = writer.write_group("root_group", children=[child1_var, child2_var]) + writer.close(group_var) + + reader = omfiles.OmFileReader(empty_temp_om_file) + assert reader.num_children == 2 + + child1_reader = reader.get_child_by_name("child1") + child2_reader = reader.get_child_by_name("child2") + + np.testing.assert_array_almost_equal(child1_reader[:], child1_data, decimal=4) + np.testing.assert_array_almost_equal(child2_reader[:], child2_data, decimal=4) + + child1_reader.close() + child2_reader.close() + reader.close() + + +def test_invalid_child_handle_from_different_writer_raises(empty_temp_om_file, empty_temp_om_file_2): + writer1 = omfiles.OmFileWriter(empty_temp_om_file, metadata_placement="tail") + writer2 = omfiles.OmFileWriter(empty_temp_om_file_2, metadata_placement="tail") + + foreign_child = writer2.write_scalar(np.int32(5), name="foreign") + + with pytest.raises(ValueError, match="different writer"): + writer1.write_group("root", children=[foreign_child]) + + +def test_invalid_root_handle_from_different_writer_raises(empty_temp_om_file, empty_temp_om_file_2): + writer1 = omfiles.OmFileWriter(empty_temp_om_file, metadata_placement="tail") + writer2 = omfiles.OmFileWriter(empty_temp_om_file_2, metadata_placement="tail") + + root_var = writer2.write_scalar(np.int32(5), name="root") + + with pytest.raises(ValueError, match="different writer"): + writer1.close(root_var) + + +def test_inline_mode_allows_resolved_child_order(empty_temp_om_file): + data = np.arange(4, dtype=np.float32).reshape(2, 2) + + writer = omfiles.OmFileWriter(empty_temp_om_file, metadata_placement="inline") + child_var = writer.write_array(data, chunks=[1, 1], name="child", scale_factor=10000.0) + root_var = writer.write_group("root", children=[child_var]) + writer.close(root_var) + + reader = omfiles.OmFileReader(empty_temp_om_file) + child_reader = reader.get_child_by_name("child") + 
np.testing.assert_array_almost_equal(child_reader[:], data, decimal=4) + child_reader.close() + reader.close() + + +def test_close_after_failed_close_still_allows_retry(empty_temp_om_file, empty_temp_om_file_2): + writer = omfiles.OmFileWriter(empty_temp_om_file, metadata_placement="tail") + foreign_writer = omfiles.OmFileWriter(empty_temp_om_file_2, metadata_placement="tail") + + valid_root = writer.write_scalar(np.int32(1), name="root") + foreign_root = foreign_writer.write_scalar(np.int32(2), name="foreign_root") + + with pytest.raises(ValueError, match="different writer"): + writer.close(foreign_root) + + writer.close(valid_root) + assert writer.closed + + +def test_drop_without_close_warns(empty_temp_om_file, capfd): + writer = omfiles.OmFileWriter(empty_temp_om_file, metadata_placement="tail") + _ = writer.write_scalar(np.int32(1), name="root") + del writer + gc.collect() + + captured = capfd.readouterr() + combined_output = f"{captured.out}\n{captured.err}".lower() + assert "warning: omfilewriter was dropped without calling close(); the om file may be incomplete" in combined_output + + @pytest.mark.asyncio async def test_read_async(temp_om_file): with await omfiles.OmFileReaderAsync.from_path(temp_om_file) as reader: