Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "softcopy"
version = "0.0.2"
version = "0.0.10"
description = "Copies zarr archives from an acquisition frontend to a different disk, using filesystem watching and lockfiles to allow copying during acquisition."
authors = [{ name = "Seth Hinz", email = "sethhinz@me.com" }]
readme = "README.md"
Expand Down Expand Up @@ -34,6 +34,7 @@ dev-dependencies = [
"mypy>=0.991",
"ruff>=0.0.235",
"pytest-cov>=5.0.0",
"iohub>=0.2.0",
]

[build-system]
Expand Down
142 changes: 142 additions & 0 deletions softcopy/hcs_copier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import logging
import shutil
from pathlib import Path
from threading import Event, Thread

import iohub

from .copier import AbstractCopier
from .zarr_copier import ZarrCopier

# Configures the root logger (DEBUG level, timestamped format) as a side effect of importing this module.
# NOTE(review): logging.basicConfig is a no-op once the root logger has handlers, so a later basicConfig
# call made by the importing application may silently have no effect - confirm this is intended.
logging.basicConfig(level=logging.DEBUG, format="[%(asctime)s : %(levelname)s from %(name)s] %(message)s")
# Module-level logger; also used as the default `log` argument of HCSCopier.
LOG = logging.getLogger(__name__)


def get_all_zarr_paths(plate_path: Path) -> list[Path]:
    """Collect one filesystem path per image found in the plate at ``plate_path``.

    The plate is opened with ``iohub.open_ome_zarr`` and walked well by well,
    position by position, image by image. For every image the returned path is
    the image's attribute-store path joined with the parent of its attribute key.

    NOTE(review): this reads the private ``_attrs`` member of each image object;
    presumably the resulting path is the image array's directory - confirm
    against the iohub version in use.
    """
    opened_plate = iohub.open_ome_zarr(plate_path)
    return [
        Path(img._attrs.store.path) / Path(img._attrs.key).parent
        for _, well in opened_plate.wells()
        for _, position in well.positions()
        for _, img in position.images()
    ]


def create_metadata_path_mapping(source: Path, destination: Path) -> list[tuple[Path, Path]]:
    """Build the (source file, destination file) pairs for the plate's group metadata.

    For the plate root, every row, every well and every position inside each
    well, a pair is produced for both ``.zattrs`` and ``.zgroup``. The files are
    not checked for existence here; only the paths are computed.

    Ordering of the result: root pairs, then all rows, then each well
    immediately followed by that well's positions.
    """
    metadata_names = (".zattrs", ".zgroup")

    def pairs_for(src_dir: Path, dest_dir: Path) -> list[tuple[Path, Path]]:
        # One (src, dest) pair per metadata file name, in a fixed order.
        return [(src_dir / name, dest_dir / name) for name in metadata_names]

    opened_plate = iohub.open_ome_zarr(source)

    mappings = pairs_for(source, destination)

    for row_name, _ in opened_plate.rows():
        mappings.extend(pairs_for(source / row_name, destination / row_name))

    for well_name, well in opened_plate.wells():
        src_well_dir = source / well_name
        dest_well_dir = destination / well_name
        mappings.extend(pairs_for(src_well_dir, dest_well_dir))

        for position_name, _ in well.positions():
            mappings.extend(pairs_for(src_well_dir / position_name, dest_well_dir / position_name))

    return mappings


class SlowCopier(Thread):
    """Background thread that repeatedly copies a fixed set of small files.

    Each pass walks ``path_mappings`` and copies every source file that has
    never been copied, or whose modification time is newer than the one
    recorded at its previous copy. Sources that do not exist yet are skipped
    and retried on the next pass. Passes repeat roughly every 0.1 seconds until
    ``join()`` is called.
    """

    def __init__(self, path_mappings: list[tuple[Path, Path]]):
        """Store the ``(source, destination)`` file pairs to keep in sync."""
        super().__init__()
        self.path_mappings = path_mappings
        self.stop_event = Event()
        # Source mtime observed just before the last successful copy, keyed by
        # destination path. None means "not copied yet".
        self.last_copy_times = {dest: None for _, dest in path_mappings}

    def _copy_if_newer(self, source: Path, destination: Path) -> None:
        """Copy ``source`` to ``destination`` if it changed since the last copy."""
        try:
            # Stat directly instead of exists()+stat() so a file that vanishes
            # in between is treated as "not there yet" rather than as an error.
            source_mtime = source.stat().st_mtime
        except (FileNotFoundError, NotADirectoryError):
            LOG.debug(f"Source file {source} does not exist yet.")
            return

        last_copy_time = self.last_copy_times[destination]
        if last_copy_time is not None and source_mtime <= last_copy_time:
            return

        shutil.copy2(source, destination)
        # Record the mtime read BEFORE the copy. If the writer touches the file
        # while it is being copied, its mtime will be newer than this value and
        # the next pass copies it again; re-reading the mtime after the copy
        # would mark that newer content as already copied.
        self.last_copy_times[destination] = source_mtime
        LOG.debug(f"Copied {source} to {destination}")

    def _copy_pass(self):
        """Run one best-effort pass over all mappings; errors are logged, not raised."""
        for source, destination in self.path_mappings:
            try:
                self._copy_if_newer(source, destination)
            except Exception as e:
                LOG.exception(f"Error copying {source} to {destination}: {e}")  # noqa: TRY401

    def run(self):
        """Thread body: copy passes until the stop event is set."""
        while not self.stop_event.is_set():
            self._copy_pass()
            self.stop_event.wait(0.1)  # Sleep for 0.1 seconds or until stop_event is set

    def join(self, timeout=None):
        """Signal the thread to stop, wait for it, then run a final copy pass.

        Args:
            timeout: Optional number of seconds to wait for the thread, with the
                same meaning as ``threading.Thread.join``. If the thread is
                still alive when the timeout expires, the final pass is skipped
                so that it cannot run concurrently with the thread's own pass.

        Safe to call when the thread was never started (for example when
        shutting down after a failed start); in that case only the final copy
        pass is performed.
        """
        self.stop_event.set()
        # Thread.join raises RuntimeError on a thread that was never started;
        # ident is None until start() has run.
        if self.ident is not None:
            super().join(timeout)  # Wait for the thread to finish
            if self.is_alive():
                return
        self._copy_pass()  # Final copy pass to ensure all files are copied


class HCSCopier(AbstractCopier):
    """Copier for a plate store: one ZarrCopier per image plus a metadata copier.

    On construction the plate at ``source`` is opened to enumerate its images;
    each image gets its own ``ZarrCopier`` targeting the same relative location
    under ``destination``. The ``.zattrs`` / ``.zgroup`` files of the plate
    hierarchy are copied separately by a ``SlowCopier`` thread.
    """

    _copiers: list[ZarrCopier]
    _metadata_copier: SlowCopier

    def __init__(
        self,
        source: Path,
        destination: Path,
        n_copy_procs: int,
        sleep_time: float = 0,
        wait_for_source: bool = True,
        log: logging.Logger = LOG,
    ):
        """Enumerate the plate's images and prepare one copier per image.

        Args:
            source: Path of the plate store to copy from.
            destination: Path to copy the plate into.
            n_copy_procs: Passed through to every ``ZarrCopier``.
            sleep_time: Passed through to every ``ZarrCopier``.
            wait_for_source: Passed through to every ``ZarrCopier``.
            log: Logger handed to the base class and to every ``ZarrCopier``.

        NOTE(review): the plate is opened here regardless of ``wait_for_source``,
        so construction presumably fails if ``source`` does not exist yet -
        confirm whether that is acceptable.
        """
        super().__init__(source, destination, n_copy_procs, sleep_time, log)

        image_paths = get_all_zarr_paths(source)
        log.debug("Image paths found under %s: %s", source, image_paths)
        self._copiers = []
        for image_path in image_paths:
            # Mirror the image's location relative to the plate root.
            image_destination = destination / image_path.relative_to(source)
            log.debug("Image %s will be copied to %s", image_path, image_destination)
            image_destination.mkdir(parents=True, exist_ok=True)
            zarr_copier = ZarrCopier(image_path, image_destination, n_copy_procs, sleep_time, wait_for_source, log)
            self._copiers.append(zarr_copier)

        path_map = create_metadata_path_mapping(source, destination)
        self._metadata_copier = SlowCopier(path_map)

    def start(self):
        """Start every image copier, then the metadata copier thread."""
        # Before starting, we make the parent directory that the image will be copied into.
        self._destination.mkdir(parents=True, exist_ok=True)
        for copier in self._copiers:
            copier.start()
        self._metadata_copier.start()

    def join(self):
        """Wait for every image copier, then for the metadata copier."""
        for copier in self._copiers:
            copier.join()
        self._metadata_copier.join()

    def stop(self):
        """Stop every image copier and shut down the metadata copier."""
        for copier in self._copiers:
            copier.stop()

        # stop() can be reached before the metadata thread was started (e.g. an
        # error or Ctrl-C during start()); joining an unstarted thread raises
        # RuntimeError, so only join once the thread has an ident.
        if self._metadata_copier.ident is not None:
            self._metadata_copier.join()
76 changes: 42 additions & 34 deletions softcopy/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@

import click
import psutil
import yaml
from psutil import AccessDenied

from .copier import AbstractCopier
from .ome_zarr_copier import OMEZarrCopier
from .hcs_copier import HCSCopier
from .zarr_copier import ZarrCopier

BOLD_SEQ = "\033[1m"
Expand All @@ -17,7 +15,8 @@


@click.command()
@click.argument("targets_file", type=click.File("r"))
@click.argument("source-path", type=click.Path(file_okay=False, dir_okay=True, path_type=Path))
@click.argument("dest-path", type=click.Path(file_okay=False, dir_okay=True, path_type=Path))
@click.option("--verbose", default=False, is_flag=True, help="print debug information while running")
@click.option("--nprocs", default=1, type=int, help="number of processes to use for copying")
@click.option(
Expand All @@ -31,7 +30,7 @@
default=True,
help="If the source does not exist when softcopy is started, wait for it to appear. If false, softcopy will crash if the source does not exist",
)
def main(targets_file, verbose, nprocs, sleep_time, wait_for_source):
def main(source_path, dest_path, verbose, nprocs, sleep_time, wait_for_source):
"""Tranfer data from source to destination as described in a yaml TARGETS_FILE. Uses low priority io to allow
data to be moved while the microscope is acquiring. The program is zarr-aware and can safely copy an archive
before it is finished being written to."""
Expand All @@ -40,40 +39,49 @@ def main(targets_file, verbose, nprocs, sleep_time, wait_for_source):
LOG.setLevel(log_level)
logging.basicConfig(format="[%(asctime)s : %(levelname)s from %(name)s] " + BOLD_SEQ + "%(message)s" + RESET_SEQ)

# Load the yaml at a normal io priority because it is small and likely not on
# the target disk
all_yaml = yaml.safe_load(targets_file)
targets = all_yaml["targets"]
LOG.debug(f"Number of targets: {len(targets)}")

# Now that we have the yaml, we floor our io priority. We are about to read zarr metadata, and even doing that
# at the wrong time could slow down the writer process!
set_low_io_priority()

copiers: list[AbstractCopier] = []

# TODO: actually run the copiers in parallel
try:
source_path = source_path.expanduser().absolute()
dest_path = dest_path.expanduser().absolute()
except Exception:
LOG.exception("Error expanding paths")
sys.exit(1)

filetype_map = {
".ome.zarr": HCSCopier,
".zarr": ZarrCopier,
}

source_type = source_path.suffix.lower()
if source_type not in filetype_map:
LOG.error(
f"Unsupported source file type: {source_path.suffix}. Supported types are: {list(filetype_map.keys())}"
)
sys.exit(1)

copier_class = filetype_map[source_type]
copier = copier_class(
source_path,
dest_path,
n_copy_procs=nprocs,
sleep_time=sleep_time,
wait_for_source=wait_for_source,
log=LOG.getChild("Copier"),
)

try:
for target_id, target in enumerate(targets):
source = Path(target["source"]).expanduser().absolute()
destination = Path(target["destination"]).expanduser().absolute()
# If the source ends with .ome.zarr, then infer ome mode for this entry:
is_ome = source.name.endswith(".ome.zarr")
copier_type = OMEZarrCopier if is_ome else ZarrCopier
copier = copier_type(
source, destination, nprocs, sleep_time, wait_for_source, LOG.getChild(f"Target {target_id}")
)
copiers.append(copier)
copier.start()

# Wait for all copiers to finish
for copier in copiers:
copier.join()
LOG.info(f"Starting copy from {source_path} to {dest_path} using {copier_class.__name__}")
copier.start()
copier.join()
LOG.info("Copy completed successfully.")
except KeyboardInterrupt:
LOG.info("Keyboard interrupt recieved, stopping all copiers")
for copier in copiers:
copier.stop()
LOG.info("Keyboard interrupt received, stopping copier.")
copier.stop()
except Exception:
LOG.exception("An error occurred during copying")
copier.stop()
sys.exit(1)


def set_low_io_priority():
Expand Down
80 changes: 0 additions & 80 deletions softcopy/ome_zarr_copier.py

This file was deleted.

4 changes: 2 additions & 2 deletions softcopy/zarr_copier.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def stop(self):
self._observer.stop()
self._observer.join()

def join(self):
def join(self): # noqa: C901
time_start = time.time()
estimated_files = np.prod(self._files_nd)

Expand Down Expand Up @@ -220,7 +220,7 @@ def print_copy_status():
# This is a very long io-bound method - so we need to be able to exit it with ctrl c
if self._stop.value == 1:
return

chunk_packed_name: PackedName = PackedName.from_index(chunk_index)
chunk_path = chunk_packed_name.get_path(
self._files_nd, self._destination, self._dimension_separator, self._zarr_format
Expand Down
Loading
Loading