Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/omfiles/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Provides classes and utilities for reading, writing, and manipulating OM files."""

from . import types
from ._rust import OmFileReader, OmFileReaderAsync, OmFileWriter, OmVariable, _check_cpu_features
from ._rust import OmFileReader, OmFileReaderAsync, OmFileWriter, OmVariable, OmWriterVariable, _check_cpu_features

_check_cpu_features()

Expand All @@ -10,5 +10,6 @@
"OmFileReaderAsync",
"OmFileWriter",
"OmVariable",
"OmWriterVariable",
"types",
]
3 changes: 2 additions & 1 deletion python/omfiles/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This file is automatically generated by pyo3_stub_gen
# ruff: noqa: E501, F401, F403, F405

from omfiles._rust import OmFileReader, OmFileReaderAsync, OmFileWriter, OmVariable
from omfiles._rust import OmFileReader, OmFileReaderAsync, OmFileWriter, OmVariable, OmWriterVariable

from . import _rust

Expand All @@ -10,4 +10,5 @@ __all__ = [
"OmFileReaderAsync",
"OmFileWriter",
"OmVariable",
"OmWriterVariable",
]
90 changes: 67 additions & 23 deletions python/omfiles/_rust/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ __all__ = [
"OmFileReaderAsync",
"OmFileWriter",
"OmVariable",
"OmWriterVariable",
"RustPforCodec",
]

Expand Down Expand Up @@ -492,50 +493,68 @@ class OmFileWriter:
r"""
Check if the writer is closed.
"""
def __new__(cls, file_path: builtins.str) -> OmFileWriter:
def __new__(cls, file_path: builtins.str, metadata_placement: typing.Optional[builtins.str] = None) -> OmFileWriter:
r"""
Initialize an OmFileWriter.

Args:
file_path: Path where the .om file will be created
metadata_placement: (optional) Where to emit metadata; either "inline" to write
metadata entries immediately, or "tail" to defer emission
until close() so metadata is consolidated near the end of
the file (default: "tail").
"""
@staticmethod
def at_path(path: builtins.str) -> OmFileWriter:
def at_path(path: builtins.str, metadata_placement: typing.Optional[builtins.str] = None) -> OmFileWriter:
r"""
Initialize an OmFileWriter to write to a file at the specified path.

Args:
path: Path where the .om file will be created
metadata_placement: (optional) Where to emit metadata; either "inline" or
"tail" (see description in `__new__`).

Returns:
OmFileWriter: A new writer instance
"""
@staticmethod
def from_fsspec(fs_obj: typing.Any, path: builtins.str) -> OmFileWriter:
def from_fsspec(
fs_obj: typing.Any, path: builtins.str, metadata_placement: typing.Optional[builtins.str] = None
) -> OmFileWriter:
r"""
Create an OmFileWriter from a fsspec filesystem object.

Args:
fs_obj: A fsspec filesystem object that supports write operations
path: The path to the file within the file system
metadata_placement: (optional) Where to emit metadata; either "inline" or
"tail" (see description in `__new__`).

Returns:
OmFileWriter: A new writer instance
"""
def close(self, root_variable: OmVariable) -> None:
def close(self, root_variable: OmWriterVariable) -> None:
r"""
Finalize and close the .om file by writing the trailer with the root variable.
Finalize and close the .om file by writing the trailer with the resolved root variable.

In ``metadata_placement="tail"`` mode, metadata for arrays, scalars, and
groups is resolved and emitted during ``close()`` so that metadata is
consolidated near the end of the file. In ``metadata_placement="inline"``
mode, metadata is written immediately and child handles must already refer
to resolved variables from the same writer.

Args:
root_variable (:py:data:`omfiles.OmVariable`): The OmVariable that serves as the root/entry point of the file hierarchy.
All other variables should be accessible through this root variable.
root_variable (:py:data:`omfiles.OmWriterVariable`): The writer handle
that serves as the root/entry point of the file hierarchy.

Returns:
None on success.

Raises:
ValueError: If the writer has already been closed
RuntimeError: If a thread lock error occurs or if there's an error writing to the file
ValueError: If the writer has already been closed or the handle belongs
to a different writer.
RuntimeError: If there is an error resolving deferred metadata or
writing the trailer.
"""
def write_array(
self,
Expand All @@ -545,8 +564,8 @@ class OmFileWriter:
add_offset: typing.Optional[builtins.float] = None,
compression: typing.Optional[builtins.str] = None,
name: typing.Optional[builtins.str] = None,
children: typing.Optional[typing.Sequence[OmVariable]] = None,
) -> OmVariable:
children: typing.Optional[typing.Sequence[OmWriterVariable]] = None,
) -> OmWriterVariable:
r"""
Write a numpy array to the .om file with specified chunking and scaling parameters.

Expand All @@ -567,8 +586,14 @@ class OmFileWriter:
name: Name of the variable to be written (default: "data")
children: List of child variables (default: [])

``write_array`` returns an :py:data:`omfiles.OmWriterVariable`, which is a
write-time handle used to build hierarchy relationships and to select the
root variable passed to ``close()``. It is not the same as
:py:data:`omfiles.OmVariable`, which represents already-materialized
metadata when reading.

Returns:
:py:data:`omfiles.OmVariable` representing the written group in the file structure
:py:data:`omfiles.OmWriterVariable` representing the written array in the file structure

Raises:
ValueError: If the data type is unsupported or if parameters are invalid
Expand All @@ -583,8 +608,8 @@ class OmFileWriter:
add_offset: typing.Optional[builtins.float] = None,
compression: typing.Optional[builtins.str] = None,
name: typing.Optional[builtins.str] = None,
children: typing.Optional[typing.Sequence[OmVariable]] = None,
) -> OmVariable:
children: typing.Optional[typing.Sequence[OmWriterVariable]] = None,
) -> OmWriterVariable:
r"""
Write an array to the .om file by streaming chunks from a Python iterator.

Expand All @@ -607,15 +632,15 @@ class OmFileWriter:
children: List of child variables (default: [])

Returns:
:py:data:`omfiles.OmVariable` representing the written array in the file structure
:py:data:`omfiles.OmWriterVariable` representing the written array in the file structure

Raises:
ValueError: If the dtype is unsupported or parameters are invalid
RuntimeError: If there's an error during compression or I/O
"""
def write_scalar(
self, value: typing.Any, name: builtins.str, children: typing.Optional[typing.Sequence[OmVariable]] = None
) -> OmVariable:
self, value: typing.Any, name: builtins.str, children: typing.Optional[typing.Sequence[OmWriterVariable]] = None
) -> OmWriterVariable:
r"""
Write a scalar value to the .om file.

Expand All @@ -625,28 +650,36 @@ class OmFileWriter:
name: Name of the scalar variable
children: List of child variables (default: None)

Child handles must come from the same writer. In ``metadata_placement="inline"``
mode they must already be resolved because metadata is emitted immediately.
In ``metadata_placement="tail"`` mode they may be resolved later during
``close()``.

Returns:
:py:data:`omfiles.OmVariable` representing the written scalar in the file structure
:py:data:`omfiles.OmWriterVariable` representing the written scalar in the file structure

Raises:
ValueError: If the value type is unsupported (e.g., booleans)
RuntimeError: If there's an error writing to the file
"""
def write_group(self, name: builtins.str, children: typing.Sequence[OmVariable]) -> OmVariable:
def write_group(self, name: builtins.str, children: typing.Sequence[OmWriterVariable]) -> OmWriterVariable:
r"""
Create a new group in the .om file.

This is essentially a variable with no data, which serves as a container for other variables.
This is essentially a variable with no data, which serves as a container
for other variables.

Args:
name: Name of the group
children: List of child variables
children: List of child variables from the same writer

Returns:
:py:data:`omfiles.OmVariable` representing the written group in the file structure
:py:data:`omfiles.OmWriterVariable` representing the written group in the file structure

Raises:
RuntimeError: If there's an error writing to the file
ValueError: If a child handle belongs to a different writer
RuntimeError: If inline metadata placement is requested before child
metadata has been resolved, or if there is an I/O error
"""

@typing.final
Expand All @@ -671,6 +704,17 @@ class OmVariable:
"""
def __repr__(self) -> builtins.str: ...

@typing.final
class OmWriterVariable:
r"""
Represents a variable handle during writing.
"""
@property
def name(self) -> builtins.str:
r"""
The name of the variable.
"""

@typing.final
class RustPforCodec:
r"""
Expand Down
8 changes: 4 additions & 4 deletions python/omfiles/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import numpy as np

from omfiles._rust import OmFileWriter, OmVariable
from omfiles._rust import OmFileWriter, OmWriterVariable

try:
import dask.array as da
Expand Down Expand Up @@ -78,8 +78,8 @@ def write_dask_array(
add_offset: float = 0.0,
compression: str = "pfor_delta_2d",
name: str = "data",
children: Optional[Sequence[OmVariable]] = None,
) -> OmVariable:
children: Optional[Sequence[OmWriterVariable]] = None,
) -> OmWriterVariable:
"""
Write a dask array to an OM file using streaming/incremental writes.

Expand Down Expand Up @@ -111,7 +111,7 @@ def write_dask_array(
children: Child variables (default: None).

Returns:
OmVariable representing the written array.
OmWriterVariable representing the written array.

Raises:
TypeError: If data is not a dask array.
Expand Down
12 changes: 12 additions & 0 deletions src/hierarchy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,15 @@ impl Into<OmOffsetSize> for OmVariable {
}
}
}

#[gen_stub_pyclass]
#[pyclass(from_py_object)]
#[derive(Clone)]
/// Represents a variable handle during writing.
pub(crate) struct OmWriterVariable {
#[pyo3(get)]
/// The name of the variable.
pub name: String,
// Not exposed to Python (no `#[pyo3(get)]`).
// NOTE(review): presumably identifies the writer that issued this handle, so
// that handles passed to a different writer can be rejected — confirm against
// the writer implementation.
pub(crate) writer_id: u64,
// Not exposed to Python (no `#[pyo3(get)]`).
// NOTE(review): presumably identifies this variable within its writer so that
// its metadata can be looked up/resolved later — confirm against the writer
// implementation.
pub(crate) variable_id: u64,
}
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ fn _rust(m: &Bound<PyModule>) -> PyResult<()> {
m.add_class::<writer::OmFileWriter>()?;
m.add_class::<reader_async::OmFileReaderAsync>()?;
m.add_class::<hierarchy::OmVariable>()?;
m.add_class::<hierarchy::OmWriterVariable>()?;
m.add_class::<codecs::RustPforCodec>()?;
m.add_function(wrap_pyfunction!(cpu_info::_check_cpu_features, m)?)?;

Expand All @@ -34,6 +35,7 @@ pyo3_stub_gen::reexport_module_members!(
"OmFileReader",
"OmFileReaderAsync",
"OmFileWriter",
"OmVariable"
"OmVariable",
"OmWriterVariable"
);
define_stub_info_gatherer!(stub_info);
4 changes: 2 additions & 2 deletions src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl OmFileReader {
/// Returns:
/// OmFileReader: OmFileReader instance.
#[staticmethod]
fn from_path(file_path: &str) -> PyResult<Self> {
pub(crate) fn from_path(file_path: &str) -> PyResult<Self> {
let file_handle = File::open(file_path)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyIOError, _>(e.to_string()))?;
let backend =
Expand All @@ -202,7 +202,7 @@ impl OmFileReader {
/// Returns:
/// OmFileReader: A new reader instance.
#[staticmethod]
fn from_fsspec(fs_obj: Py<PyAny>, path: String) -> PyResult<Self> {
pub(crate) fn from_fsspec(fs_obj: Py<PyAny>, path: String) -> PyResult<Self> {
Python::attach(|py| {
let bound_object = fs_obj.bind(py);

Expand Down
Loading
Loading