diff --git a/bio2zarr/__main__.py b/bio2zarr/__main__.py
new file mode 100644
index 00000000..9334d8a8
--- /dev/null
+++ b/bio2zarr/__main__.py
@@ -0,0 +1,18 @@
+import click
+
+from . import cli
+
+@click.group()
+def top_level():
+    pass
+
+# Provide a single top-level interface to all of the functionality.
+# This probably isn't the recommended way of interacting, as we
+# install individual commands as console scripts. However, this
+# is handy for development and for those whose PATHs aren't set
+# up in the right way.
+top_level.add_command(cli.vcf2zarr)
+top_level.add_command(cli.plink2zarr)
+
+if __name__ == "__main__":
+    top_level()
diff --git a/bio2zarr/cli.py b/bio2zarr/cli.py
new file mode 100644
index 00000000..b6625749
--- /dev/null
+++ b/bio2zarr/cli.py
@@ -0,0 +1,136 @@
+import click
+import tabulate
+import coloredlogs
+
+from . import vcf
+
+# Common arguments/options
+verbose = click.option("-v", "--verbose", count=True, help="Increase verbosity")
+
+worker_processes = click.option(
+    "-p", "--worker-processes", type=int, default=1, help="Number of worker processes"
+)
+
+
+# Note: logging hasn't been implemented in the code at all; this is just
+# a first pass to try out some ways of doing things to see what works.
+def setup_logging(verbosity):
+    level = "WARNING"
+    if verbosity == 1:
+        level = "INFO"
+    elif verbosity >= 2:
+        level = "DEBUG"
+    # NOTE: I'm not that excited about coloredlogs, just trying it out
+    # as it is installed by cyvcf2 anyway. We will have some complicated
+    # stuff going on with threads and processes, so logs might not work
+    # so well anyway.
+    coloredlogs.install(level=level)
+
+
+@click.command
+@click.argument("vcfs", nargs=-1, required=True)
+@click.argument("out_path", type=click.Path())
+@verbose
+@worker_processes
+@click.option("-c", "--column-chunk-size", type=int, default=64)
+def explode(vcfs, out_path, verbose, worker_processes, column_chunk_size):
+    setup_logging(verbose)
+    vcf.explode(
+        vcfs,
+        out_path,
+        worker_processes=worker_processes,
+        column_chunk_size=column_chunk_size,
+        show_progress=True,
+    )
+
+
+@click.command
+@click.argument("columnarised", type=click.Path())
+@verbose
+def summarise(columnarised, verbose):
+    setup_logging(verbose)
+    pcvcf = vcf.PickleChunkedVcf.load(columnarised)
+    data = pcvcf.summary_table()
+    click.echo(tabulate.tabulate(data, headers="keys"))
+
+
+@click.command
+@click.argument("columnarised", type=click.Path())
+# @click.argument("specfile", type=click.Path())
+def genspec(columnarised):
+    stream = click.get_text_stream("stdout")
+    vcf.generate_spec(columnarised, stream)
+
+
+@click.command
+@click.argument("columnarised", type=click.Path())
+@click.argument("zarr_path", type=click.Path())
+@verbose
+@click.option("-s", "--conversion-spec", default=None)
+@worker_processes
+def to_zarr(columnarised, zarr_path, verbose, conversion_spec, worker_processes):
+    setup_logging(verbose)
+    vcf.to_zarr(
+        columnarised,
+        zarr_path,
+        conversion_spec,
+        worker_processes=worker_processes,
+        show_progress=True,
+    )
+
+
+@click.command(name="convert")
+@click.argument("vcfs", nargs=-1, required=True)
+@click.argument("out_path", type=click.Path())
+@verbose
+@worker_processes
+def convert_vcf(vcfs, out_path, verbose, worker_processes):
+    setup_logging(verbose)
+    vcf.convert_vcf(
+        vcfs, out_path, show_progress=True, worker_processes=worker_processes
+    )
+
+
+@click.command
+@click.argument("vcfs", nargs=-1, required=True)
+@click.argument("out_path", type=click.Path())
+def validate(vcfs, out_path):
+    vcf.validate(vcfs[0], out_path, show_progress=True)
+
+
+@click.group()
+def vcf2zarr():
+    pass
+
+
+vcf2zarr.add_command(explode)
+vcf2zarr.add_command(summarise)
+vcf2zarr.add_command(genspec)
+vcf2zarr.add_command(to_zarr)
+vcf2zarr.add_command(convert_vcf)
+vcf2zarr.add_command(validate)
+
+
+@click.command(name="convert")
+@click.argument("plink", type=click.Path())
+@click.argument("out_path", type=click.Path())
+@worker_processes
+@click.option("--chunk-width", type=int, default=None)
+@click.option("--chunk-length", type=int, default=None)
+def convert_plink(plink, out_path, worker_processes, chunk_width, chunk_length):
+    vcf.convert_plink(
+        plink,
+        out_path,
+        show_progress=True,
+        worker_processes=worker_processes,
+        chunk_width=chunk_width,
+        chunk_length=chunk_length,
+    )
+
+
+@click.group()
+def plink2zarr():
+    pass
+
+
+plink2zarr.add_command(convert_plink)
diff --git a/bio2zarr/vcf.py b/bio2zarr/vcf.py
index 8dae8d5a..905a9ad7 100644
--- a/bio2zarr/vcf.py
+++ b/bio2zarr/vcf.py
@@ -2,6 +2,8 @@
 import dataclasses
 import multiprocessing
 import functools
+import logging
+import os
 import threading
 import pathlib
 import time
@@ -13,7 +15,7 @@
 import tempfile
 from typing import Any
 
-import humanize
+import humanfriendly
 import cyvcf2
 import numcodecs
 import numpy as np
@@ -23,6 +25,8 @@
 
 import bed_reader
 
+logger = logging.getLogger(__name__)
+
 INT_MISSING = -1
 INT_FILL = -2
 STR_MISSING = "."
@@ -271,8 +275,10 @@ def make_field_def(name, vcf_type, vcf_number):
 def scan_vcfs(paths, show_progress):
     partitions = []
     vcf_metadata = None
+    logger.info(f"Scanning {len(paths)} VCFs")
     for path in tqdm.tqdm(paths, desc="Scan ", disable=not show_progress):
         vcf = cyvcf2.VCF(path)
+        logger.debug(f"Scanning {path}")
 
         filters = [
             h["ID"]
@@ -459,8 +465,12 @@ def __repr__(self):
         # TODO add class name
         return repr({"path": str(self.path), **self.vcf_field.summary.asdict()})
 
+    def chunk_path(self, partition_index, chunk_index):
+        return self.path / f"p{partition_index}" / f"c{chunk_index}"
+
     def write_chunk(self, partition_index, chunk_index, data):
-        path = self.path / f"p{partition_index}" / f"c{chunk_index}"
+        path = self.chunk_path(partition_index, chunk_index)
+        logger.debug(f"Start write: {path}")
         pkl = pickle.dumps(data)
         # NOTE assuming that reusing the same compressor instance
         # from multiple threads is OK!
@@ -472,9 +482,10 @@ def write_chunk(self, partition_index, chunk_index, data):
         self.vcf_field.summary.num_chunks += 1
         self.vcf_field.summary.compressed_size += len(compressed)
         self.vcf_field.summary.uncompressed_size += len(pkl)
+        logger.debug(f"Finish write: {path}")
 
     def read_chunk(self, partition_index, chunk_index):
-        path = self.path / f"p{partition_index}" / f"c{chunk_index}"
+        path = self.chunk_path(partition_index, chunk_index)
         with open(path, "rb") as f:
             pkl = self.compressor.decode(f.read())
             return pickle.loads(pkl), len(pkl)
@@ -615,6 +626,8 @@ def append(self, val):
 
     def flush(self):
         if len(self.buffer) > 0:
+            path = self.column.chunk_path(self.partition_index, self.chunk_index)
+            logger.debug(f"Schedule write: {path}")
             future = self.executor.submit(
                 self.column.write_chunk,
                 self.partition_index,
@@ -649,7 +662,7 @@ def display_number(x):
            return ret
 
        def display_size(n):
-           return humanize.naturalsize(n, binary=True)
+           return humanfriendly.format_size(n)
 
        data = []
        for name, col in self.columns.items():
@@ -688,6 +701,10 @@ def num_partitions(self):
     def num_samples(self):
         return len(self.metadata.samples)
 
+    @property
+    def num_columns(self):
+        return len(self.columns)
+
     def mkdirs(self):
         self.path.mkdir()
         for col in self.columns.values():
@@ -716,6 +733,10 @@ def convert(
             partition.num_records for partition in vcf_metadata.partitions
         )
 
+        logger.info(
+            f"Exploding {pcvcf.num_columns} columns {total_variants} variants "
+            f"{pcvcf.num_samples} samples"
+        )
         global progress_counter
         progress_counter = multiprocessing.Value("Q", 0)
 
@@ -774,6 +795,7 @@ def convert_partition(
     partition = vcf_metadata.partitions[partition_index]
     vcf = cyvcf2.VCF(partition.vcf_path)
     futures = set()
+    logger.info(f"Start partition {partition_index} {partition.vcf_path}")
 
     def service_futures(max_waiting=2 * flush_threads):
         while len(futures) > max_waiting:
@@ -824,12 +846,7 @@ def service_futures(max_waiting=2 * flush_threads):
             gt.append(variant.genotype.array())
 
         for name, buff in info_fields:
-            val = None
-            try:
-                val = variant.INFO[name]
-            except KeyError:
-                pass
-            buff.append(val)
+            buff.append(variant.INFO.get(name, None))
 
         for name, buff in format_fields:
             val = None
@@ -841,11 +858,15 @@ def service_futures(max_waiting=2 * flush_threads):
 
         service_futures()
 
+        # Note: an issue with updating the progress per variant here like this
+        # is that we get a significant pause at the end of the counter while
+        # all the "small" fields get flushed. Possibly not much to be done about it.
         with progress_counter.get_lock():
             progress_counter.value += 1
 
     for col in columns.values():
         col.flush()
+    logger.info(f"VCF read finished; waiting on {len(futures)} chunk writes")
     service_futures(0)
 
     return summaries
@@ -1213,6 +1234,7 @@ def encode_alleles(self, pcvcf):
         with progress_counter.get_lock():
             for col in [ref_col, alt_col]:
                 progress_counter.value += col.vcf_field.summary.uncompressed_size
+        logger.debug("alleles done")
 
     def encode_samples(self, pcvcf, sample_id, chunk_width):
         if not np.array_equal(sample_id, pcvcf.metadata.samples):
@@ -1225,6 +1247,7 @@ def encode_samples(self, pcvcf, sample_id, chunk_width):
             chunks=(chunk_width,),
         )
         array.attrs["_ARRAY_DIMENSIONS"] = ["samples"]
+        logger.debug("Samples done")
 
     def encode_contig(self, pcvcf, contig_names, contig_lengths):
         array = self.root.array(
@@ -1258,6 +1281,7 @@ def encode_contig(self, pcvcf, contig_names, contig_lengths):
 
         with progress_counter.get_lock():
             progress_counter.value += col.vcf_field.summary.uncompressed_size
+        logger.debug("Contig done")
 
     def encode_filters(self, pcvcf, filter_names):
         self.root.attrs["filters"] = filter_names
@@ -1285,6 +1309,7 @@ def encode_filters(self, pcvcf, filter_names):
 
         with progress_counter.get_lock():
             progress_counter.value += col.vcf_field.summary.uncompressed_size
+        logger.debug("Filters done")
 
     def encode_id(self, pcvcf):
         col = pcvcf.columns["ID"]
@@ -1305,14 +1330,21 @@ def encode_id(self, pcvcf):
 
         with progress_counter.get_lock():
             progress_counter.value += col.vcf_field.summary.uncompressed_size
+        logger.debug("ID done")
 
     @staticmethod
     def convert(
         pcvcf, path, conversion_spec, *, worker_processes=1, show_progress=False
     ):
-        store = zarr.DirectoryStore(path)
-        # FIXME
-        sgvcf = SgvcfZarr(path)
+        path = pathlib.Path(path)
+        # TODO: we should do this as a future to avoid blocking
+        if path.exists():
+            shutil.rmtree(path)
+        write_path = path.with_suffix(path.suffix + f".{os.getpid()}.build")
+        store = zarr.DirectoryStore(write_path)
+        # FIXME, duplicating logic about the store
+        logger.info(f"Create zarr at {write_path}")
+        sgvcf = SgvcfZarr(write_path)
         sgvcf.root = zarr.group(store=store, overwrite=True)
         for variable in conversion_spec.variables[:]:
             sgvcf.create_array(variable)
@@ -1373,11 +1405,14 @@ def convert(
             flush_futures(futures)
 
-        zarr.consolidate_metadata(path)
 
         # FIXME can't join the bar_thread because we never get to the correct
         # number of bytes
         # if bar_thread is not None:
         #     bar_thread.join()
+        zarr.consolidate_metadata(write_path)
+        # Atomic swap, now we've completely finished.
+        logger.info(f"Moving to final path {path}")
+        os.rename(write_path, path)
 
 
 def sync_flush_array(np_buffer, zarr_array, offset):
@@ -1388,6 +1423,7 @@ def async_flush_array(executor, np_buffer, zarr_array, offset):
     """
     Flush the specified chunk aligned buffer to the specified zarr array.
""" + logger.debug(f"Schedule flush {zarr_array} @ {offset}") assert zarr_array.shape[1:] == np_buffer.shape[1:] # print("sync", zarr_array, np_buffer) diff --git a/setup.cfg b/setup.cfg index 0a240ba0..6dd3ba93 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,12 +29,23 @@ python_requires = >=3.8 install_requires = numpy zarr >= 2.10.0, != 2.11.0, != 2.11.1, != 2.11.2 + click + tabulate + tqdm + humanfriendly + # cyvcf2 also pulls in coloredlogs and click + # colouredlogs pulls in humanfriendly cyvcf2 bed_reader setup_requires = setuptools >= 41.2 setuptools_scm +[options.entry_points] +console_scripts = + vcf2zarr = bio2zarr.cli:vcf2zarr + plink2zarr = bio2zarr.cli:plink2zarr + [flake8] ignore = # whitespace before ':' - doesn't work well with black diff --git a/validation-data/Makefile b/validation-data/Makefile index 7d540d78..d745befe 100644 --- a/validation-data/Makefile +++ b/validation-data/Makefile @@ -6,7 +6,7 @@ all: 1kg_2020_chr20.bcf.csi \ 1kg_2020_others.vcf.gz.tbi \ 1kg_p3_all_chr1.bcf.csi \ 1kg_p3_all_chr1.vcf.gz.tbi\ - 1kg_p1_all_chr6.vcf.bcf.csi\ + 1kg_p1_all_chr6.bcf.csi\ 1kg_p1_all_chr6.vcf.gz.tbi .PRECIOUS: %.bcf diff --git a/vcf2zarr.py b/vcf2zarr.py deleted file mode 100644 index 3ed3287d..00000000 --- a/vcf2zarr.py +++ /dev/null @@ -1,103 +0,0 @@ -import json - -import click -import yaml -import tabulate - -import bio2zarr.vcf as cnv # fixme - - -@click.command -@click.argument("vcfs", nargs=-1, required=True) -@click.argument("out_path", type=click.Path()) -@click.option("-p", "--worker-processes", type=int, default=1) -@click.option("-c", "--column-chunk-size", type=int, default=64) -def explode(vcfs, out_path, worker_processes, column_chunk_size): - cnv.explode( - vcfs, - out_path, - worker_processes=worker_processes, - column_chunk_size=column_chunk_size, - show_progress=True, - ) - - -@click.command -@click.argument("columnarised", type=click.Path()) -def summarise(columnarised): - pcvcf = cnv.PickleChunkedVcf.load(columnarised) - data = pcvcf.summary_table() - print(tabulate.tabulate(data, headers="keys")) - - -@click.command -@click.argument("columnarised", type=click.Path()) -# @click.argument("specfile", type=click.Path()) -def genspec(columnarised): - stream = click.get_text_stream("stdout") - cnv.generate_spec(columnarised, stream) - - -@click.command -@click.argument("columnarised", type=click.Path()) -@click.argument("zarr_path", type=click.Path()) -@click.option("-s", "--conversion-spec", default=None) -@click.option("-p", "--worker-processes", type=int, default=1) -def to_zarr(columnarised, zarr_path, conversion_spec, worker_processes): - cnv.to_zarr( - columnarised, - zarr_path, - conversion_spec, - worker_processes=worker_processes, - show_progress=True) - - -@click.command -@click.argument("vcfs", nargs=-1, required=True) -@click.argument("out_path", type=click.Path()) -@click.option("-p", "--worker-processes", type=int, default=1) -def convert(vcfs, out_path, worker_processes): - cnv.convert_vcf( - vcfs, out_path, show_progress=True, worker_processes=worker_processes - ) - - -@click.command -@click.argument("vcfs", nargs=-1, required=True) -@click.argument("out_path", type=click.Path()) -def validate(vcfs, out_path): - cnv.validate(vcfs[0], out_path, show_progress=True) - - -@click.command -@click.argument("plink", type=click.Path()) -@click.argument("out_path", type=click.Path()) -@click.option("-p", "--worker-processes", type=int, default=1) -@click.option("--chunk-width", type=int, default=None) -@click.option("--chunk-length", type=int, 
-def convert_plink(plink, out_path, worker_processes, chunk_width, chunk_length):
-    cnv.convert_plink(
-        plink,
-        out_path,
-        show_progress=True,
-        worker_processes=worker_processes,
-        chunk_width=chunk_width,
-        chunk_length=chunk_length,
-    )
-
-
-@click.group()
-def cli():
-    pass
-
-
-cli.add_command(explode)
-cli.add_command(summarise)
-cli.add_command(genspec)
-cli.add_command(to_zarr)
-cli.add_command(convert)
-cli.add_command(validate)
-cli.add_command(convert_plink)
-
-if __name__ == "__main__":
-    cli()