From 9c36d0382c4f5e2c8a2f24fc91721d6493ed2cfe Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Fri, 16 Feb 2024 19:37:23 +0000 Subject: [PATCH 01/10] Move vcf2zarr file to cli --- vcf2zarr.py => bio2zarr/cli.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename vcf2zarr.py => bio2zarr/cli.py (100%) diff --git a/vcf2zarr.py b/bio2zarr/cli.py similarity index 100% rename from vcf2zarr.py rename to bio2zarr/cli.py From 1583758cc3cc65833213e848bed3a2b3537afce5 Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Fri, 16 Feb 2024 21:55:59 +0000 Subject: [PATCH 02/10] Rough outline of CLI --- bio2zarr/__main__.py | 18 ++++++++++++++++++ bio2zarr/cli.py | 41 ++++++++++++++++++++++------------------- 2 files changed, 40 insertions(+), 19 deletions(-) create mode 100644 bio2zarr/__main__.py diff --git a/bio2zarr/__main__.py b/bio2zarr/__main__.py new file mode 100644 index 00000000..9334d8a8 --- /dev/null +++ b/bio2zarr/__main__.py @@ -0,0 +1,18 @@ +import click + +from . import cli + +@click.group() +def top_level(): + pass + +# Provide a single top-level interface to all of the functionality. +# This probably isn't the recommended way of interacting, as we +# install individual commands as console scripts. However, this +# is handy for development and for those whose PATHs aren't set +# up in the right way. +top_level.add_command(cli.vcf2zarr) +top_level.add_command(cli.plink2zarr) + +if __name__ == "__main__": + top_level() diff --git a/bio2zarr/cli.py b/bio2zarr/cli.py index 3ed3287d..846be7c4 100644 --- a/bio2zarr/cli.py +++ b/bio2zarr/cli.py @@ -1,10 +1,8 @@ -import json - import click -import yaml import tabulate -import bio2zarr.vcf as cnv # fixme +# import bio2zarr.vcf as cnv # fixme +from . import vcf as cnv @click.command @@ -49,14 +47,15 @@ def to_zarr(columnarised, zarr_path, conversion_spec, worker_processes): zarr_path, conversion_spec, worker_processes=worker_processes, - show_progress=True) + show_progress=True, + ) -@click.command +@click.command(name="convert") @click.argument("vcfs", nargs=-1, required=True) @click.argument("out_path", type=click.Path()) @click.option("-p", "--worker-processes", type=int, default=1) -def convert(vcfs, out_path, worker_processes): +def convert_vcf(vcfs, out_path, worker_processes): cnv.convert_vcf( vcfs, out_path, show_progress=True, worker_processes=worker_processes ) @@ -69,7 +68,20 @@ def validate(vcfs, out_path): cnv.validate(vcfs[0], out_path, show_progress=True) -@click.command +@click.group() +def vcf2zarr(): + pass + + +vcf2zarr.add_command(explode) +vcf2zarr.add_command(summarise) +vcf2zarr.add_command(genspec) +vcf2zarr.add_command(to_zarr) +vcf2zarr.add_command(convert_vcf) +vcf2zarr.add_command(validate) + + +@click.command(name="convert") @click.argument("plink", type=click.Path()) @click.argument("out_path", type=click.Path()) @click.option("-p", "--worker-processes", type=int, default=1) @@ -87,17 +99,8 @@ def convert_plink(plink, out_path, worker_processes, chunk_width, chunk_length): @click.group() -def cli(): +def plink2zarr(): pass -cli.add_command(explode) -cli.add_command(summarise) -cli.add_command(genspec) -cli.add_command(to_zarr) -cli.add_command(convert) -cli.add_command(validate) -cli.add_command(convert_plink) - -if __name__ == "__main__": - cli() +plink2zarr.add_command(convert_plink) From b832d9b6c04b3577d2adc357a50c5c188628a786 Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Fri, 16 Feb 2024 22:00:51 +0000 Subject: [PATCH 03/10] Add console scripts and update requirements --- setup.cfg | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/setup.cfg b/setup.cfg index 0a240ba0..641c3cd3 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,12 +29,21 @@ python_requires = >=3.8 install_requires = numpy zarr >= 2.10.0, != 2.11.0, != 2.11.1, != 2.11.2 + click + tabulate + humanize + tqdm cyvcf2 bed_reader setup_requires = setuptools >= 41.2 setuptools_scm +[options.entry_points] +console_scripts = + vcf2zarr = bio2zarr.cli:vcf2zarr + plink2zarr = bio2zarr.cli:plink2zarr + [flake8] ignore = # whitespace before ':' - doesn't work well with black From adb6cc352d5f4778516ff4b9c59c86d8ab3a5407 Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Fri, 16 Feb 2024 22:13:47 +0000 Subject: [PATCH 04/10] Swap humanize for humanfriendly We already depend on it for tabulate --- bio2zarr/vcf.py | 4 ++-- setup.cfg | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/bio2zarr/vcf.py b/bio2zarr/vcf.py index 8dae8d5a..88e6c612 100644 --- a/bio2zarr/vcf.py +++ b/bio2zarr/vcf.py @@ -13,7 +13,7 @@ import tempfile from typing import Any -import humanize +import humanfriendly import cyvcf2 import numcodecs import numpy as np @@ -649,7 +649,7 @@ def display_number(x): return ret def display_size(n): - return humanize.naturalsize(n, binary=True) + return humanfriendly.format_size(n) data = [] for name, col in self.columns.items(): diff --git a/setup.cfg b/setup.cfg index 641c3cd3..b48e6464 100644 --- a/setup.cfg +++ b/setup.cfg @@ -31,7 +31,8 @@ install_requires = zarr >= 2.10.0, != 2.11.0, != 2.11.1, != 2.11.2 click tabulate - humanize + # Already required by tabulate, but good to be explicit + humanfriendly tqdm cyvcf2 bed_reader From 412a7fa91fb9a95b0ba1e27545cc2a9a0c6842c3 Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Fri, 16 Feb 2024 22:25:48 +0000 Subject: [PATCH 05/10] Make dependencies explicit --- setup.cfg | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/setup.cfg b/setup.cfg index b48e6464..6dd3ba93 100644 --- a/setup.cfg +++ b/setup.cfg @@ -31,9 +31,10 @@ install_requires = zarr >= 2.10.0, != 2.11.0, != 2.11.1, != 2.11.2 click tabulate - # Already required by tabulate, but good to be explicit - humanfriendly tqdm + humanfriendly + # cyvcf2 also pulls in coloredlogs and click + # colouredlogs pulls in humanfriendly cyvcf2 bed_reader setup_requires = From c87aebcec65958c2e77619f5c9422e0f8bc7aaba Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Fri, 16 Feb 2024 23:09:24 +0000 Subject: [PATCH 06/10] Infrastructure for CLI tests --- bio2zarr/cli.py | 54 +++++++++++++++++++++++++++++++++++-------------- bio2zarr/vcf.py | 4 ++++ 2 files changed, 43 insertions(+), 15 deletions(-) diff --git a/bio2zarr/cli.py b/bio2zarr/cli.py index 846be7c4..cd4a454b 100644 --- a/bio2zarr/cli.py +++ b/bio2zarr/cli.py @@ -1,17 +1,39 @@ import click import tabulate +import coloredlogs -# import bio2zarr.vcf as cnv # fixme -from . import vcf as cnv +from . import vcf + +# Common arguments/options +verbose = click.option("-v", "--verbose", count=True, help="Increase verbosity") + +worker_processes = click.option( + "-p", "--worker-processes", type=int, default=1, help="Number of worker processes" +) + + +# Note: logging hasn't been implemented in the code at all, this is just +# a first pass to try out some ways of doing things to see what works. +def setup_logging(verbosity): + level = "WARNING" + if verbosity == 1: + level = "INFO" + elif verbosity >= 2: + level = "DEBUG" + # NOTE: I'm not that excited about coloredlogs, just trying it out + # as it is installed by cyvcf2 anyway. We will have some complicated + # stuff doing on with threads and processes, to logs might not work + # so well anyway. + coloredlogs.install(level=level) @click.command @click.argument("vcfs", nargs=-1, required=True) @click.argument("out_path", type=click.Path()) -@click.option("-p", "--worker-processes", type=int, default=1) +@worker_processes @click.option("-c", "--column-chunk-size", type=int, default=64) def explode(vcfs, out_path, worker_processes, column_chunk_size): - cnv.explode( + vcf.explode( vcfs, out_path, worker_processes=worker_processes, @@ -22,10 +44,12 @@ def explode(vcfs, out_path, worker_processes, column_chunk_size): @click.command @click.argument("columnarised", type=click.Path()) -def summarise(columnarised): - pcvcf = cnv.PickleChunkedVcf.load(columnarised) +@verbose +def summarise(columnarised, verbose): + setup_logging(verbose) + pcvcf = vcf.PickleChunkedVcf.load(columnarised) data = pcvcf.summary_table() - print(tabulate.tabulate(data, headers="keys")) + click.echo(tabulate.tabulate(data, headers="keys")) @click.command @@ -33,16 +57,16 @@ def summarise(columnarised): # @click.argument("specfile", type=click.Path()) def genspec(columnarised): stream = click.get_text_stream("stdout") - cnv.generate_spec(columnarised, stream) + vcf.generate_spec(columnarised, stream) @click.command @click.argument("columnarised", type=click.Path()) @click.argument("zarr_path", type=click.Path()) @click.option("-s", "--conversion-spec", default=None) -@click.option("-p", "--worker-processes", type=int, default=1) +@worker_processes def to_zarr(columnarised, zarr_path, conversion_spec, worker_processes): - cnv.to_zarr( + vcf.to_zarr( columnarised, zarr_path, conversion_spec, @@ -54,9 +78,9 @@ def to_zarr(columnarised, zarr_path, conversion_spec, worker_processes): @click.command(name="convert") @click.argument("vcfs", nargs=-1, required=True) @click.argument("out_path", type=click.Path()) -@click.option("-p", "--worker-processes", type=int, default=1) +@worker_processes def convert_vcf(vcfs, out_path, worker_processes): - cnv.convert_vcf( + vcf.convert_vcf( vcfs, out_path, show_progress=True, worker_processes=worker_processes ) @@ -65,7 +89,7 @@ def convert_vcf(vcfs, out_path, worker_processes): @click.argument("vcfs", nargs=-1, required=True) @click.argument("out_path", type=click.Path()) def validate(vcfs, out_path): - cnv.validate(vcfs[0], out_path, show_progress=True) + vcf.validate(vcfs[0], out_path, show_progress=True) @click.group() @@ -84,11 +108,11 @@ def vcf2zarr(): @click.command(name="convert") @click.argument("plink", type=click.Path()) @click.argument("out_path", type=click.Path()) -@click.option("-p", "--worker-processes", type=int, default=1) +@worker_processes @click.option("--chunk-width", type=int, default=None) @click.option("--chunk-length", type=int, default=None) def convert_plink(plink, out_path, worker_processes, chunk_width, chunk_length): - cnv.convert_plink( + vcf.convert_plink( plink, out_path, show_progress=True, diff --git a/bio2zarr/vcf.py b/bio2zarr/vcf.py index 88e6c612..dce19de5 100644 --- a/bio2zarr/vcf.py +++ b/bio2zarr/vcf.py @@ -2,6 +2,7 @@ import dataclasses import multiprocessing import functools +import logging import threading import pathlib import time @@ -23,6 +24,8 @@ import bed_reader +logger = logging.getLogger(__name__) + INT_MISSING = -1 INT_FILL = -2 STR_MISSING = "." @@ -640,6 +643,7 @@ def __init__(self, path, metadata): for col in self.columns.values(): col.num_partitions = self.num_partitions col.num_records = self.num_records + logger.info(f"Loaded PickleChunkedVcf from {path}") def summary_table(self): def display_number(x): From 2ea8f60d046b8a1edfdc862298726e676571dee5 Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Fri, 16 Feb 2024 23:11:25 +0000 Subject: [PATCH 07/10] Fix error in makefile --- validation-data/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validation-data/Makefile b/validation-data/Makefile index 7d540d78..d745befe 100644 --- a/validation-data/Makefile +++ b/validation-data/Makefile @@ -6,7 +6,7 @@ all: 1kg_2020_chr20.bcf.csi \ 1kg_2020_others.vcf.gz.tbi \ 1kg_p3_all_chr1.bcf.csi \ 1kg_p3_all_chr1.vcf.gz.tbi\ - 1kg_p1_all_chr6.vcf.bcf.csi\ + 1kg_p1_all_chr6.bcf.csi\ 1kg_p1_all_chr6.vcf.gz.tbi .PRECIOUS: %.bcf From 5d6eaa4d435b072af73f22c8d44bfc43399b608a Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Sun, 18 Feb 2024 22:15:18 +0000 Subject: [PATCH 08/10] Add basic logging for explode --- bio2zarr/cli.py | 4 +++- bio2zarr/vcf.py | 34 +++++++++++++++++++++++++--------- 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/bio2zarr/cli.py b/bio2zarr/cli.py index cd4a454b..db4053f3 100644 --- a/bio2zarr/cli.py +++ b/bio2zarr/cli.py @@ -30,9 +30,11 @@ def setup_logging(verbosity): @click.command @click.argument("vcfs", nargs=-1, required=True) @click.argument("out_path", type=click.Path()) +@verbose @worker_processes @click.option("-c", "--column-chunk-size", type=int, default=64) -def explode(vcfs, out_path, worker_processes, column_chunk_size): +def explode(vcfs, out_path, verbose, worker_processes, column_chunk_size): + setup_logging(verbose) vcf.explode( vcfs, out_path, diff --git a/bio2zarr/vcf.py b/bio2zarr/vcf.py index dce19de5..c7b0ee48 100644 --- a/bio2zarr/vcf.py +++ b/bio2zarr/vcf.py @@ -274,8 +274,10 @@ def make_field_def(name, vcf_type, vcf_number): def scan_vcfs(paths, show_progress): partitions = [] vcf_metadata = None + logger.info(f"Scanning {len(paths)} VCFs") for path in tqdm.tqdm(paths, desc="Scan ", disable=not show_progress): vcf = cyvcf2.VCF(path) + logger.debug(f"Scanning {path}") filters = [ h["ID"] @@ -462,8 +464,12 @@ def __repr__(self): # TODO add class name return repr({"path": str(self.path), **self.vcf_field.summary.asdict()}) + def chunk_path(self, partition_index, chunk_index): + return self.path / f"p{partition_index}" / f"c{chunk_index}" + def write_chunk(self, partition_index, chunk_index, data): - path = self.path / f"p{partition_index}" / f"c{chunk_index}" + path = self.chunk_path(partition_index, chunk_index) + logger.debug(f"Start write: {path}") pkl = pickle.dumps(data) # NOTE assuming that reusing the same compressor instance # from multiple threads is OK! @@ -475,9 +481,10 @@ def write_chunk(self, partition_index, chunk_index, data): self.vcf_field.summary.num_chunks += 1 self.vcf_field.summary.compressed_size += len(compressed) self.vcf_field.summary.uncompressed_size += len(pkl) + logger.debug(f"Finish write: {path}") def read_chunk(self, partition_index, chunk_index): - path = self.path / f"p{partition_index}" / f"c{chunk_index}" + path = self.chunk_path(partition_index, chunk_index) with open(path, "rb") as f: pkl = self.compressor.decode(f.read()) return pickle.loads(pkl), len(pkl) @@ -618,6 +625,8 @@ def append(self, val): def flush(self): if len(self.buffer) > 0: + path = self.column.chunk_path(self.partition_index, self.chunk_index) + logger.debug(f"Schedule write: {path}") future = self.executor.submit( self.column.write_chunk, self.partition_index, @@ -643,7 +652,6 @@ def __init__(self, path, metadata): for col in self.columns.values(): col.num_partitions = self.num_partitions col.num_records = self.num_records - logger.info(f"Loaded PickleChunkedVcf from {path}") def summary_table(self): def display_number(x): @@ -692,6 +700,10 @@ def num_partitions(self): def num_samples(self): return len(self.metadata.samples) + @property + def num_columns(self): + return len(self.columns) + def mkdirs(self): self.path.mkdir() for col in self.columns.values(): @@ -720,6 +732,10 @@ def convert( partition.num_records for partition in vcf_metadata.partitions ) + logger.info( + f"Exploding {pcvcf.num_columns} columns {total_variants} variants " + f"{pcvcf.num_samples} samples" + ) global progress_counter progress_counter = multiprocessing.Value("Q", 0) @@ -778,6 +794,7 @@ def convert_partition( partition = vcf_metadata.partitions[partition_index] vcf = cyvcf2.VCF(partition.vcf_path) futures = set() + logger.info(f"Start partition {partition_index} {partition.vcf_path}") def service_futures(max_waiting=2 * flush_threads): while len(futures) > max_waiting: @@ -828,12 +845,7 @@ def service_futures(max_waiting=2 * flush_threads): gt.append(variant.genotype.array()) for name, buff in info_fields: - val = None - try: - val = variant.INFO[name] - except KeyError: - pass - buff.append(val) + buff.append(variant.INFO.get(name, None)) for name, buff in format_fields: val = None @@ -845,11 +857,15 @@ def service_futures(max_waiting=2 * flush_threads): service_futures() + # Note: an issue with updating the progress per variant here like this + # is that we get a significant pause at the end of the counter while + # all the "small" fields get flushed. Possibly not much to be done about it. with progress_counter.get_lock(): progress_counter.value += 1 for col in columns.values(): col.flush() + logger.info(f"VCF read finished; waiting on {len(futures)} chunk writes") service_futures(0) return summaries From 5657dd40e0a3e2affee0419cf6f8d03f8f749c81 Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Sun, 18 Feb 2024 22:36:20 +0000 Subject: [PATCH 09/10] Add rough logging for to_zarr step --- bio2zarr/cli.py | 8 ++++++-- bio2zarr/vcf.py | 7 +++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/bio2zarr/cli.py b/bio2zarr/cli.py index db4053f3..b6625749 100644 --- a/bio2zarr/cli.py +++ b/bio2zarr/cli.py @@ -65,9 +65,11 @@ def genspec(columnarised): @click.command @click.argument("columnarised", type=click.Path()) @click.argument("zarr_path", type=click.Path()) +@verbose @click.option("-s", "--conversion-spec", default=None) @worker_processes -def to_zarr(columnarised, zarr_path, conversion_spec, worker_processes): +def to_zarr(columnarised, zarr_path, verbose, conversion_spec, worker_processes): + setup_logging(verbose) vcf.to_zarr( columnarised, zarr_path, @@ -80,8 +82,10 @@ def to_zarr(columnarised, zarr_path, conversion_spec, worker_processes): @click.command(name="convert") @click.argument("vcfs", nargs=-1, required=True) @click.argument("out_path", type=click.Path()) +@verbose @worker_processes -def convert_vcf(vcfs, out_path, worker_processes): +def convert_vcf(vcfs, out_path, verbose, worker_processes): + setup_logging(verbose) vcf.convert_vcf( vcfs, out_path, show_progress=True, worker_processes=worker_processes ) diff --git a/bio2zarr/vcf.py b/bio2zarr/vcf.py index c7b0ee48..14c0aec0 100644 --- a/bio2zarr/vcf.py +++ b/bio2zarr/vcf.py @@ -1233,6 +1233,7 @@ def encode_alleles(self, pcvcf): with progress_counter.get_lock(): for col in [ref_col, alt_col]: progress_counter.value += col.vcf_field.summary.uncompressed_size + logger.debug("alleles done") def encode_samples(self, pcvcf, sample_id, chunk_width): if not np.array_equal(sample_id, pcvcf.metadata.samples): @@ -1245,6 +1246,7 @@ def encode_samples(self, pcvcf, sample_id, chunk_width): chunks=(chunk_width,), ) array.attrs["_ARRAY_DIMENSIONS"] = ["samples"] + logger.debug(f"Samples done") def encode_contig(self, pcvcf, contig_names, contig_lengths): array = self.root.array( @@ -1278,6 +1280,7 @@ def encode_contig(self, pcvcf, contig_names, contig_lengths): with progress_counter.get_lock(): progress_counter.value += col.vcf_field.summary.uncompressed_size + logger.debug(f"Contig done") def encode_filters(self, pcvcf, filter_names): self.root.attrs["filters"] = filter_names @@ -1305,6 +1308,7 @@ def encode_filters(self, pcvcf, filter_names): with progress_counter.get_lock(): progress_counter.value += col.vcf_field.summary.uncompressed_size + logger.debug(f"Filters done") def encode_id(self, pcvcf): col = pcvcf.columns["ID"] @@ -1325,6 +1329,7 @@ def encode_id(self, pcvcf): with progress_counter.get_lock(): progress_counter.value += col.vcf_field.summary.uncompressed_size + logger.debug(f"ID done") @staticmethod def convert( @@ -1332,6 +1337,7 @@ def convert( ): store = zarr.DirectoryStore(path) # FIXME + logger.info(f"Create zarr at {path}") sgvcf = SgvcfZarr(path) sgvcf.root = zarr.group(store=store, overwrite=True) for variable in conversion_spec.variables[:]: @@ -1408,6 +1414,7 @@ def async_flush_array(executor, np_buffer, zarr_array, offset): """ Flush the specified chunk aligned buffer to the specified zarr array. """ + logger.debug(f"Schededule flush {zarr_array} @ {offset}") assert zarr_array.shape[1:] == np_buffer.shape[1:] # print("sync", zarr_array, np_buffer) From 64f0c36f1a8dc8ba68e1df153e168b0cd903cfb5 Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Sun, 18 Feb 2024 22:59:53 +0000 Subject: [PATCH 10/10] Add logging for encode, plus basic safety --- bio2zarr/vcf.py | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/bio2zarr/vcf.py b/bio2zarr/vcf.py index 14c0aec0..905a9ad7 100644 --- a/bio2zarr/vcf.py +++ b/bio2zarr/vcf.py @@ -3,6 +3,7 @@ import multiprocessing import functools import logging +import os import threading import pathlib import time @@ -1246,7 +1247,7 @@ def encode_samples(self, pcvcf, sample_id, chunk_width): chunks=(chunk_width,), ) array.attrs["_ARRAY_DIMENSIONS"] = ["samples"] - logger.debug(f"Samples done") + logger.debug("Samples done") def encode_contig(self, pcvcf, contig_names, contig_lengths): array = self.root.array( @@ -1280,7 +1281,7 @@ def encode_contig(self, pcvcf, contig_names, contig_lengths): with progress_counter.get_lock(): progress_counter.value += col.vcf_field.summary.uncompressed_size - logger.debug(f"Contig done") + logger.debug("Contig done") def encode_filters(self, pcvcf, filter_names): self.root.attrs["filters"] = filter_names @@ -1308,7 +1309,7 @@ def encode_filters(self, pcvcf, filter_names): with progress_counter.get_lock(): progress_counter.value += col.vcf_field.summary.uncompressed_size - logger.debug(f"Filters done") + logger.debug("Filters done") def encode_id(self, pcvcf): col = pcvcf.columns["ID"] @@ -1329,16 +1330,21 @@ def encode_id(self, pcvcf): with progress_counter.get_lock(): progress_counter.value += col.vcf_field.summary.uncompressed_size - logger.debug(f"ID done") + logger.debug("ID done") @staticmethod def convert( pcvcf, path, conversion_spec, *, worker_processes=1, show_progress=False ): - store = zarr.DirectoryStore(path) - # FIXME - logger.info(f"Create zarr at {path}") - sgvcf = SgvcfZarr(path) + path = pathlib.Path(path) + # TODO: we should do this as a future to avoid blocking + if path.exists(): + shutil.rmtree(path) + write_path = path.with_suffix(path.suffix + f".{os.getpid()}.build") + store = zarr.DirectoryStore(write_path) + # FIXME, duplicating logic about the store + logger.info(f"Create zarr at {write_path}") + sgvcf = SgvcfZarr(write_path) sgvcf.root = zarr.group(store=store, overwrite=True) for variable in conversion_spec.variables[:]: sgvcf.create_array(variable) @@ -1399,11 +1405,14 @@ def convert( flush_futures(futures) - zarr.consolidate_metadata(path) # FIXME can't join the bar_thread because we never get to the correct # number of bytes # if bar_thread is not None: # bar_thread.join() + zarr.consolidate_metadata(write_path) + # Atomic swap, now we've completely finished. + logger.info(f"Moving to final path {path}") + os.rename(write_path, path) def sync_flush_array(np_buffer, zarr_array, offset): @@ -1414,7 +1423,7 @@ def async_flush_array(executor, np_buffer, zarr_array, offset): """ Flush the specified chunk aligned buffer to the specified zarr array. """ - logger.debug(f"Schededule flush {zarr_array} @ {offset}") + logger.debug(f"Schedule flush {zarr_array} @ {offset}") assert zarr_array.shape[1:] == np_buffer.shape[1:] # print("sync", zarr_array, np_buffer)