Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies = [
"deltalake>=1.0.2",
"graphviz>=0.21",
"gitpython>=3.1.45",
"universal-pathlib>=0.3.8",
"starfix @ git+https://github.com/nauticalab/starfix-python.git@344617bc6f7fcabab5c011d5774ed47de33f21de",
"pygraphviz>=1.14",
"tzdata>=2024.1",
Expand Down
2 changes: 1 addition & 1 deletion src/orcapod/contexts/data/schemas/context_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@
"_config": {
"converters": [
{
"_class": "orcapod.types.semantic_types.PathStructConverter",
"_class": "orcapod.types.semantic_types.PythonPathStructConverter",
"_config": {}
}
]
Expand Down
9 changes: 8 additions & 1 deletion src/orcapod/contexts/data/v0.1.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@
"_class": "orcapod.semantic_types.semantic_registry.SemanticTypeRegistry",
"_config": {
"converters": {
"upath": {
"_class": "orcapod.semantic_types.semantic_struct_converters.UPathStructConverter",
"_config": {
"file_hasher": {"_ref": "file_hasher"}
}
},
"path": {
"_class": "orcapod.semantic_types.semantic_struct_converters.PathStructConverter",
"_class": "orcapod.semantic_types.semantic_struct_converters.PythonPathStructConverter",
"_config": {
"file_hasher": {"_ref": "file_hasher"}
}
Expand Down Expand Up @@ -52,6 +58,7 @@
[{"_type": "builtins.bytes"}, {"_class": "orcapod.hashing.semantic_hashing.builtin_handlers.BytesHandler", "_config": {}}],
[{"_type": "builtins.bytearray"}, {"_class": "orcapod.hashing.semantic_hashing.builtin_handlers.BytesHandler", "_config": {}}],
[{"_type": "pathlib.Path"}, {"_class": "orcapod.hashing.semantic_hashing.builtin_handlers.PathContentHandler", "_config": {"file_hasher": {"_ref": "file_hasher"}}}],
[{"_type": "upath.core.UPath"}, {"_class": "orcapod.hashing.semantic_hashing.builtin_handlers.UPathContentHandler", "_config": {"file_hasher": {"_ref": "file_hasher"}}}],
[{"_type": "uuid.UUID"}, {"_class": "orcapod.hashing.semantic_hashing.builtin_handlers.UUIDHandler", "_config": {}}],
[{"_type": "types.FunctionType"}, {"_class": "orcapod.hashing.semantic_hashing.builtin_handlers.FunctionHandler", "_config": {"function_info_extractor": {"_ref": "function_info_extractor"}}}],
[{"_type": "types.BuiltinFunctionType"}, {"_class": "orcapod.hashing.semantic_hashing.builtin_handlers.FunctionHandler", "_config": {"function_info_extractor": {"_ref": "function_info_extractor"}}}],
Expand Down
33 changes: 27 additions & 6 deletions src/orcapod/hashing/hash_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
from pathlib import Path

import xxhash
from upath import UPath

from orcapod.types import ContentHash
from orcapod.types import ContentHash, PathLike

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -43,9 +44,27 @@ def combine_hashes(
return combined_hash


def hash_file(file_path, algorithm="sha256", buffer_size=65536) -> ContentHash:
def _to_path(file_path: PathLike) -> Path | UPath:
    """Coerce a path-like value into a path object, leaving path objects alone.

    Args:
        file_path: A ``UPath``, a ``Path``, or any other value accepted by
            the ``Path`` constructor (for example a string).

    Returns:
        ``file_path`` itself when it is already a ``UPath`` or ``Path``
        instance, so a ``UPath`` is never downgraded to a plain ``Path``;
        otherwise ``Path(file_path)``.
    """
    # UPath is named explicitly, and first, so such instances are still
    # passed through untouched even if UPath ever stops being a
    # pathlib.Path subclass.
    is_path_object = isinstance(file_path, (UPath, Path))
    return file_path if is_path_object else Path(file_path)


def hash_file(file_path: PathLike, algorithm="sha256", buffer_size=65536) -> ContentHash:
"""Calculate the hash of a file using the specified algorithm.

Supports both local ``pathlib.Path`` and remote ``UPath`` objects.

Args:
file_path: Path to the file to hash.
algorithm: Hash algorithm to use — options include:
Expand All @@ -56,7 +75,9 @@ def hash_file(file_path, algorithm="sha256", buffer_size=65536) -> ContentHash:
A ContentHash with method set to the algorithm name and digest
containing the raw hash bytes.
"""
if not Path(file_path).is_file():
path = _to_path(file_path)

if not path.is_file():
raise FileNotFoundError(f"The file {file_path} does not exist")

# Hash the path string itself rather than file content
Expand All @@ -67,7 +88,7 @@ def hash_file(file_path, algorithm="sha256", buffer_size=65536) -> ContentHash:

if algorithm == "xxh64":
hasher = xxhash.xxh64()
with open(file_path, "rb") as file:
with path.open("rb") as file:
while True:
data = file.read(buffer_size)
if not data:
Expand All @@ -77,7 +98,7 @@ def hash_file(file_path, algorithm="sha256", buffer_size=65536) -> ContentHash:

if algorithm == "crc32":
crc = 0
with open(file_path, "rb") as file:
with path.open("rb") as file:
while True:
data = file.read(buffer_size)
if not data:
Expand All @@ -96,7 +117,7 @@ def hash_file(file_path, algorithm="sha256", buffer_size=65536) -> ContentHash:
f"Invalid algorithm: {algorithm}. Available algorithms: {valid_algorithms}, xxh64, crc32"
)

with open(file_path, "rb") as file:
with path.open("rb") as file:
while True:
data = file.read(buffer_size)
if not data:
Expand Down
42 changes: 42 additions & 0 deletions src/orcapod/hashing/semantic_hashing/builtin_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
knows how to process out of the box:

- PathContentHandler -- pathlib.Path: returns ContentHash of file content
- UPathContentHandler -- upath.UPath: returns ContentHash of file content (remote-aware)
- UUIDHandler -- uuid.UUID: canonical string representation
- BytesHandler -- bytes / bytearray: hex string representation
- FunctionHandler -- callable with __code__: via FunctionInfoExtractorProtocol
Expand Down Expand Up @@ -33,6 +34,8 @@
from typing import TYPE_CHECKING, Any
from uuid import UUID

from upath import UPath

from orcapod.types import PathLike, Schema

if TYPE_CHECKING:
Expand Down Expand Up @@ -96,6 +99,45 @@ def handle(self, obj: PathLike, hasher: "SemanticHasherProtocol") -> Any:
return self.file_hasher.hash_file(path)


class UPathContentHandler:
    """
    Content-hash handler for ``upath.UPath`` objects.

    The ``UPath`` instance is validated and then handed to the file hasher
    unchanged (it is never converted to a string or a ``pathlib.Path``),
    which is intended to keep remote filesystem semantics (e.g. S3, GCS)
    intact while the file content is hashed.

    Args:
        file_hasher: Any object exposing ``hash_file(path) -> ContentHash``
            (i.e. satisfying the FileContentHasherProtocol protocol).
    """

    def __init__(self, file_hasher: FileContentHasherProtocol) -> None:
        self.file_hasher = file_hasher

    @staticmethod
    def _require_regular_file(target: UPath) -> None:
        """Raise unless ``target`` exists and is not a directory."""
        if not target.exists():
            raise FileNotFoundError(
                f"UPathContentHandler: path does not exist: {target!r}. "
                "Paths must refer to existing files for content-based hashing."
            )
        if target.is_dir():
            raise IsADirectoryError(
                f"UPathContentHandler: path is a directory: {target!r}. "
                "Only regular files are supported for content-based hashing."
            )

    def handle(self, obj: Any, hasher: "SemanticHasherProtocol") -> Any:
        """Return the file hasher's result for the file that ``obj`` points to.

        Args:
            obj: The value to hash; must be a ``UPath`` instance.
            hasher: The calling semantic hasher. Not used by this handler.

        Returns:
            Whatever ``self.file_hasher.hash_file(obj)`` returns.

        Raises:
            TypeError: If ``obj`` is not a ``UPath``.
            FileNotFoundError: If the path does not exist.
            IsADirectoryError: If the path is a directory.
        """
        if isinstance(obj, UPath):
            # Existence is checked before the directory test, so a missing
            # path is always reported as FileNotFoundError.
            self._require_regular_file(obj)
            logger.debug("UPathContentHandler: hashing file content at %s", obj)
            return self.file_hasher.hash_file(obj)

        raise TypeError(
            f"UPathContentHandler: expected a UPath, got {type(obj)!r}. "
            "Use PathContentHandler for pathlib.Path objects."
        )


class UUIDHandler:
"""
Handler for uuid.UUID objects.
Expand Down
4 changes: 2 additions & 2 deletions src/orcapod/hashing/versioned_hashers.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,15 @@ def get_versioned_semantic_arrow_hasher(
from orcapod.hashing.arrow_hashers import StarfixArrowHasher
from orcapod.hashing.file_hashers import BasicFileHasher
from orcapod.semantic_types.semantic_registry import SemanticTypeRegistry
from orcapod.semantic_types.semantic_struct_converters import PathStructConverter
from orcapod.semantic_types.semantic_struct_converters import PythonPathStructConverter

# Build a default semantic registry populated with the standard converters.
# We use Any-typed locals here to side-step type-checker false positives
# that arise from the protocol definition of SemanticStructConverterProtocol having
# a slightly different hash_struct_dict signature than the concrete class.
registry: Any = SemanticTypeRegistry()
file_hasher = BasicFileHasher(algorithm="sha256")
path_converter: Any = PathStructConverter(file_hasher=file_hasher)
path_converter: Any = PythonPathStructConverter(file_hasher=file_hasher)
registry.register_converter("path", path_converter)

logger.debug(
Expand Down
128 changes: 89 additions & 39 deletions src/orcapod/semantic_types/semantic_struct_converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
making semantic types visible in schemas and preserved through operations.
"""

from abc import ABC, abstractmethod
from pathlib import Path
from typing import TYPE_CHECKING, Any

from upath import UPath

from orcapod.types import ContentHash
from orcapod.utils.lazy_module import LazyModule

Expand Down Expand Up @@ -74,68 +77,73 @@ def _compute_content_hash(self, content: bytes) -> ContentHash:
return ContentHash(method=f"{self.semantic_type_name}:sha256", digest=digest)


# Path-specific implementation
class PathStructConverter(SemanticStructConverterBase):
"""Converter for pathlib.Path objects to/from semantic structs of form { path: "/value/of/path"}"""
class PathStructConverterBase(SemanticStructConverterBase, ABC):
"""Base converter for file path types (Path and UPath).

def __init__(self, file_hasher: "FileContentHasherProtocol"):
super().__init__("path")
self._python_type = Path
self._file_hasher = file_hasher
Extracts the shared conversion logic since Path and UPath have
identical APIs for the operations we need (str conversion,
construction from string, ``read_bytes``).
"""

# Define the Arrow struct type for paths
self._arrow_struct_type = pa.struct(
[
pa.field("path", pa.large_string()),
]
)
def __init__(
self,
name: str,
path_type: type,
file_hasher: "FileContentHasherProtocol",
):
super().__init__(name)
self._python_type = path_type
self._field_name = name
self._file_hasher = file_hasher
self._arrow_struct_type = pa.struct([
pa.field(name, pa.large_string()),
])

@property
def python_type(self) -> type:
return self._python_type

@property
def arrow_struct_type(self) -> pa.StructType:
def arrow_struct_type(self) -> "pa.StructType":
return self._arrow_struct_type

def python_to_struct_dict(self, value: Path) -> dict[str, Any]:
"""Convert Path to struct dictionary."""
if not isinstance(value, Path):
raise TypeError(f"Expected Path, got {type(value)}")
@abstractmethod
def _make_path(self, path_str: str) -> Any:
"""Construct the appropriate path object from a string."""
...

return {
"path": str(value),
}
def python_to_struct_dict(self, value: Any) -> dict[str, Any]:
"""Convert path object to struct dictionary."""
if not isinstance(value, self._python_type):
raise TypeError(f"Expected {self._python_type.__name__}, got {type(value)}")
return {self._field_name: str(value)}

def struct_dict_to_python(self, struct_dict: dict[str, Any]) -> Path:
"""Convert struct dictionary back to Path."""
path_str = struct_dict.get("path")
def struct_dict_to_python(self, struct_dict: dict[str, Any]) -> Any:
"""Convert struct dictionary back to path object."""
path_str = struct_dict.get(self._field_name)
if path_str is None:
raise ValueError("Missing 'path' field in struct")

return Path(path_str)
raise ValueError(f"Missing '{self._field_name}' field in struct")
return self._make_path(path_str)

def can_handle_python_type(self, python_type: type) -> bool:
"""Check if this converter can handle the given Python type."""
return issubclass(python_type, Path)
return issubclass(python_type, self._python_type)

def can_handle_struct_type(self, struct_type: pa.StructType) -> bool:
def can_handle_struct_type(self, struct_type: "pa.StructType") -> bool:
"""Check if this converter can handle the given struct type."""
# Check if struct has the expected fields
for field in self._arrow_struct_type:
if (
field.name not in struct_type.names
or struct_type[field.name].type != field.type
):
return False

return True

def is_semantic_struct(self, struct_dict: dict[str, Any]) -> bool:
"""Check if a struct dictionary represents this semantic type."""
# TODO: infer this check based on identified struct type as defined in the __init__
return set(struct_dict.keys()) == {"path"} and isinstance(
struct_dict["path"], str
return (
set(struct_dict.keys()) == {self._field_name}
and isinstance(struct_dict[self._field_name], str)
)

def hash_struct_dict(
Expand All @@ -144,8 +152,8 @@ def hash_struct_dict(
"""Compute hash of a path semantic type by hashing the file content.

Args:
struct_dict: Dict with a "path" key containing a file path string.
add_prefix: If True, prefix with "path:sha256:...".
struct_dict: Dict with the path field containing a file path string.
add_prefix: If True, prefix with semantic type and algorithm info.

Returns:
Hash string of the file content.
Expand All @@ -154,15 +162,57 @@ def hash_struct_dict(
FileNotFoundError: If the path does not exist.
IsADirectoryError: If the path is a directory.
"""
path_str = struct_dict.get("path")
path_str = struct_dict.get(self._field_name)
if path_str is None:
raise ValueError("Missing 'path' field in struct dict")
raise ValueError(f"Missing '{self._field_name}' field in struct dict")

path = Path(path_str)
path = self._make_path(path_str)
if not path.exists():
raise FileNotFoundError(f"Path does not exist: {path}")
if path.is_dir():
raise IsADirectoryError(f"Path is a directory: {path}")

content_hash = self._file_hasher.hash_file(path)
return self._format_hash_string(content_hash.digest, add_prefix=add_prefix)


class PythonPathStructConverter(PathStructConverterBase):
    """Semantic struct converter for plain ``pathlib.Path`` objects.

    Registered under the name ``"path"``. Because ``UPath`` is a ``Path``
    subclass, this converter explicitly refuses ``UPath`` values and types
    so that they are left to ``UPathStructConverter`` and never claimed by
    both converters.
    """

    def __init__(self, file_hasher: "FileContentHasherProtocol"):
        super().__init__(name="path", path_type=Path, file_hasher=file_hasher)

    def _make_path(self, path_str: str) -> Path:
        """Build a ``Path`` from its string form."""
        return Path(path_str)

    def python_to_struct_dict(self, value: Any) -> dict[str, Any]:
        """Convert a ``Path`` to its struct dictionary.

        Raises:
            TypeError: If ``value`` is a ``UPath``. Any other value is passed
                to the base implementation, which performs its own type
                check.
        """
        if not isinstance(value, UPath):
            return super().python_to_struct_dict(value)

        raise TypeError(
            f"Expected Path (not UPath), got {type(value)}. "
            "Use UPathStructConverter for UPath instances."
        )

    def can_handle_python_type(self, python_type: type) -> bool:
        """Report whether ``python_type`` is a ``Path`` type other than ``UPath``.

        ``UPath`` and its subclasses yield ``False`` to avoid ambiguity with
        the UPath converter; every other ``Path`` subclass yields ``True``.
        """
        # The UPath exclusion is evaluated first, mirroring the precedence
        # of the two checks.
        return not issubclass(python_type, UPath) and issubclass(python_type, Path)


class UPathStructConverter(PathStructConverterBase):
    """Semantic struct converter for ``universal_pathlib.UPath`` objects.

    Registered under the name ``"upath"`` with ``UPath`` as its Python type.
    Only path construction is specialised here; all other conversion logic
    comes from ``PathStructConverterBase``.
    """

    def __init__(self, file_hasher: "FileContentHasherProtocol"):
        super().__init__(name="upath", path_type=UPath, file_hasher=file_hasher)

    def _make_path(self, path_str: str) -> UPath:
        """Build a ``UPath`` from its string form."""
        rebuilt = UPath(path_str)
        return rebuilt
Loading
Loading