18 changes: 18 additions & 0 deletions bio2zarr/__main__.py
@@ -0,0 +1,18 @@
import click

from . import cli

@click.group()
def top_level():
pass

# Provide a single top-level interface to all of the functionality.
# This probably isn't the recommended way of interacting, as we
# install individual commands as console scripts. However, this
# is handy for development and for those whose PATHs aren't set
# up in the right way.
top_level.add_command(cli.vcf2zarr)
top_level.add_command(cli.plink2zarr)

if __name__ == "__main__":
top_level()
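The module above gives a single "python -m bio2zarr" entry point that wires the vcf2zarr and plink2zarr groups together. A minimal sketch of exercising that entry point in-process with click's test runner (the invocation is illustrative, not part of this change):

from click.testing import CliRunner

from bio2zarr.__main__ import top_level

# Run the combined CLI in-process; "--help" just lists the commands
# registered under the vcf2zarr group (explode, summarise, ...).
runner = CliRunner()
result = runner.invoke(top_level, ["vcf2zarr", "--help"])
print(result.output)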
136 changes: 136 additions & 0 deletions bio2zarr/cli.py
@@ -0,0 +1,136 @@
import click
import tabulate
import coloredlogs

from . import vcf

# Common arguments/options
verbose = click.option("-v", "--verbose", count=True, help="Increase verbosity")

worker_processes = click.option(
"-p", "--worker-processes", type=int, default=1, help="Number of worker processes"
)


# Note: logging hasn't been implemented in the code at all; this is just
# a first pass to try out a few approaches and see what works.
def setup_logging(verbosity):
level = "WARNING"
if verbosity == 1:
level = "INFO"
elif verbosity >= 2:
level = "DEBUG"
# NOTE: I'm not that excited about coloredlogs, just trying it out
# as it is installed by cyvcf2 anyway. We will have some complicated
# stuff going on with threads and processes, so logging might not
# work so well anyway.
coloredlogs.install(level=level)


@click.command
@click.argument("vcfs", nargs=-1, required=True)
@click.argument("out_path", type=click.Path())
@verbose
@worker_processes
@click.option("-c", "--column-chunk-size", type=int, default=64)
def explode(vcfs, out_path, verbose, worker_processes, column_chunk_size):
setup_logging(verbose)
vcf.explode(
vcfs,
out_path,
worker_processes=worker_processes,
column_chunk_size=column_chunk_size,
show_progress=True,
)


@click.command
@click.argument("columnarised", type=click.Path())
@verbose
def summarise(columnarised, verbose):
setup_logging(verbose)
pcvcf = vcf.PickleChunkedVcf.load(columnarised)
data = pcvcf.summary_table()
click.echo(tabulate.tabulate(data, headers="keys"))


@click.command
@click.argument("columnarised", type=click.Path())
# @click.argument("specfile", type=click.Path())
def genspec(columnarised):
stream = click.get_text_stream("stdout")
vcf.generate_spec(columnarised, stream)


@click.command
@click.argument("columnarised", type=click.Path())
@click.argument("zarr_path", type=click.Path())
@verbose
@click.option("-s", "--conversion-spec", default=None)
@worker_processes
def to_zarr(columnarised, zarr_path, verbose, conversion_spec, worker_processes):
setup_logging(verbose)
vcf.to_zarr(
columnarised,
zarr_path,
conversion_spec,
worker_processes=worker_processes,
show_progress=True,
)


@click.command(name="convert")
@click.argument("vcfs", nargs=-1, required=True)
@click.argument("out_path", type=click.Path())
@verbose
@worker_processes
def convert_vcf(vcfs, out_path, verbose, worker_processes):
setup_logging(verbose)
vcf.convert_vcf(
vcfs, out_path, show_progress=True, worker_processes=worker_processes
)


@click.command
@click.argument("vcfs", nargs=-1, required=True)
@click.argument("out_path", type=click.Path())
def validate(vcfs, out_path):
vcf.validate(vcfs[0], out_path, show_progress=True)


@click.group()
def vcf2zarr():
pass


vcf2zarr.add_command(explode)
vcf2zarr.add_command(summarise)
vcf2zarr.add_command(genspec)
vcf2zarr.add_command(to_zarr)
vcf2zarr.add_command(convert_vcf)
vcf2zarr.add_command(validate)


@click.command(name="convert")
@click.argument("plink", type=click.Path())
@click.argument("out_path", type=click.Path())
@worker_processes
@click.option("--chunk-width", type=int, default=None)
@click.option("--chunk-length", type=int, default=None)
def convert_plink(plink, out_path, worker_processes, chunk_width, chunk_length):
vcf.convert_plink(
plink,
out_path,
show_progress=True,
worker_processes=worker_processes,
chunk_width=chunk_width,
chunk_length=chunk_length,
)


@click.group()
def plink2zarr():
pass


plink2zarr.add_command(convert_plink)
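The module-level verbose and worker_processes names above are shared click options: click.option(...) returns a decorator, so a single definition can be stacked onto several commands and each command still receives the value as an ordinary parameter. A self-contained sketch of the same pattern, with a made-up command name for illustration:

import click

worker_processes = click.option(
    "-p", "--worker-processes", type=int, default=1, help="Number of worker processes"
)

@click.command()
@click.argument("path", type=click.Path())
@worker_processes
def demo(path, worker_processes):
    # The shared option arrives as a normal keyword argument.
    click.echo(f"would process {path} with {worker_processes} workers")

if __name__ == "__main__":
    demo()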
64 changes: 50 additions & 14 deletions bio2zarr/vcf.py
@@ -2,6 +2,8 @@
import dataclasses
import multiprocessing
import functools
import logging
import os
import threading
import pathlib
import time
@@ -13,7 +15,7 @@
import tempfile
from typing import Any

import humanize
import humanfriendly
import cyvcf2
import numcodecs
import numpy as np
@@ -23,6 +25,8 @@

import bed_reader

logger = logging.getLogger(__name__)

INT_MISSING = -1
INT_FILL = -2
STR_MISSING = "."
@@ -271,8 +275,10 @@ def make_field_def(name, vcf_type, vcf_number):
def scan_vcfs(paths, show_progress):
partitions = []
vcf_metadata = None
logger.info(f"Scanning {len(paths)} VCFs")
for path in tqdm.tqdm(paths, desc="Scan ", disable=not show_progress):
vcf = cyvcf2.VCF(path)
logger.debug(f"Scanning {path}")

filters = [
h["ID"]
@@ -459,8 +465,12 @@ def __repr__(self):
# TODO add class name
return repr({"path": str(self.path), **self.vcf_field.summary.asdict()})

def chunk_path(self, partition_index, chunk_index):
return self.path / f"p{partition_index}" / f"c{chunk_index}"

def write_chunk(self, partition_index, chunk_index, data):
path = self.path / f"p{partition_index}" / f"c{chunk_index}"
path = self.chunk_path(partition_index, chunk_index)
logger.debug(f"Start write: {path}")
pkl = pickle.dumps(data)
# NOTE assuming that reusing the same compressor instance
# from multiple threads is OK!
@@ -472,9 +482,10 @@ def write_chunk(self, partition_index, chunk_index, data):
self.vcf_field.summary.num_chunks += 1
self.vcf_field.summary.compressed_size += len(compressed)
self.vcf_field.summary.uncompressed_size += len(pkl)
logger.debug(f"Finish write: {path}")

def read_chunk(self, partition_index, chunk_index):
path = self.path / f"p{partition_index}" / f"c{chunk_index}"
path = self.chunk_path(partition_index, chunk_index)
with open(path, "rb") as f:
pkl = self.compressor.decode(f.read())
return pickle.loads(pkl), len(pkl)
@@ -615,6 +626,8 @@ def append(self, val):

def flush(self):
if len(self.buffer) > 0:
path = self.column.chunk_path(self.partition_index, self.chunk_index)
logger.debug(f"Schedule write: {path}")
future = self.executor.submit(
self.column.write_chunk,
self.partition_index,
@@ -649,7 +662,7 @@ def display_number(x):
return ret

def display_size(n):
return humanize.naturalsize(n, binary=True)
return humanfriendly.format_size(n)

data = []
for name, col in self.columns.items():
@@ -688,6 +701,10 @@ def num_partitions(self):
def num_samples(self):
return len(self.metadata.samples)

@property
def num_columns(self):
return len(self.columns)

def mkdirs(self):
self.path.mkdir()
for col in self.columns.values():
@@ -716,6 +733,10 @@ def convert(
partition.num_records for partition in vcf_metadata.partitions
)

logger.info(
f"Exploding {pcvcf.num_columns} columns {total_variants} variants "
f"{pcvcf.num_samples} samples"
)
global progress_counter
progress_counter = multiprocessing.Value("Q", 0)

@@ -774,6 +795,7 @@ def convert_partition(
partition = vcf_metadata.partitions[partition_index]
vcf = cyvcf2.VCF(partition.vcf_path)
futures = set()
logger.info(f"Start partition {partition_index} {partition.vcf_path}")

def service_futures(max_waiting=2 * flush_threads):
while len(futures) > max_waiting:
@@ -824,12 +846,7 @@ def service_futures(max_waiting=2 * flush_threads):
gt.append(variant.genotype.array())

for name, buff in info_fields:
val = None
try:
val = variant.INFO[name]
except KeyError:
pass
buff.append(val)
buff.append(variant.INFO.get(name, None))

for name, buff in format_fields:
val = None
@@ -841,11 +858,15 @@ def service_futures(max_waiting=2 * flush_threads):

service_futures()

# Note: an issue with updating the progress counter per variant like this
# is that we get a significant pause at the end while all the "small"
# fields get flushed. Possibly not much to be done about it.
with progress_counter.get_lock():
progress_counter.value += 1

for col in columns.values():
col.flush()
logger.info(f"VCF read finished; waiting on {len(futures)} chunk writes")
service_futures(0)

return summaries
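The partition workers above report progress through a module-level progress_counter, a multiprocessing.Value that is incremented under its own lock. A stripped-down sketch of that pattern (here the counter is passed to the workers explicitly rather than installed as a global, and all names are illustrative):

import multiprocessing

def work(counter, n):
    for _ in range(n):
        # get_lock() serialises updates coming from concurrent processes.
        with counter.get_lock():
            counter.value += 1

if __name__ == "__main__":
    counter = multiprocessing.Value("Q", 0)  # "Q" = unsigned 64-bit integer
    procs = [multiprocessing.Process(target=work, args=(counter, 10)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)  # 40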
@@ -1213,6 +1234,7 @@ def encode_alleles(self, pcvcf):
with progress_counter.get_lock():
for col in [ref_col, alt_col]:
progress_counter.value += col.vcf_field.summary.uncompressed_size
logger.debug("alleles done")

def encode_samples(self, pcvcf, sample_id, chunk_width):
if not np.array_equal(sample_id, pcvcf.metadata.samples):
@@ -1225,6 +1247,7 @@ def encode_samples(self, pcvcf, sample_id, chunk_width):
chunks=(chunk_width,),
)
array.attrs["_ARRAY_DIMENSIONS"] = ["samples"]
logger.debug("Samples done")

def encode_contig(self, pcvcf, contig_names, contig_lengths):
array = self.root.array(
@@ -1258,6 +1281,7 @@ def encode_contig(self, pcvcf, contig_names, contig_lengths):

with progress_counter.get_lock():
progress_counter.value += col.vcf_field.summary.uncompressed_size
logger.debug("Contig done")

def encode_filters(self, pcvcf, filter_names):
self.root.attrs["filters"] = filter_names
@@ -1285,6 +1309,7 @@ def encode_filters(self, pcvcf, filter_names):

with progress_counter.get_lock():
progress_counter.value += col.vcf_field.summary.uncompressed_size
logger.debug("Filters done")

def encode_id(self, pcvcf):
col = pcvcf.columns["ID"]
@@ -1305,14 +1330,21 @@ def encode_id(self, pcvcf):

with progress_counter.get_lock():
progress_counter.value += col.vcf_field.summary.uncompressed_size
logger.debug("ID done")

@staticmethod
def convert(
pcvcf, path, conversion_spec, *, worker_processes=1, show_progress=False
):
store = zarr.DirectoryStore(path)
# FIXME
sgvcf = SgvcfZarr(path)
path = pathlib.Path(path)
# TODO: we should do this as a future to avoid blocking
if path.exists():
shutil.rmtree(path)
write_path = path.with_suffix(path.suffix + f".{os.getpid()}.build")
store = zarr.DirectoryStore(write_path)
# FIXME, duplicating logic about the store
logger.info(f"Create zarr at {write_path}")
sgvcf = SgvcfZarr(write_path)
sgvcf.root = zarr.group(store=store, overwrite=True)
for variable in conversion_spec.variables[:]:
sgvcf.create_array(variable)
@@ -1373,11 +1405,14 @@ def convert(

flush_futures(futures)

zarr.consolidate_metadata(path)
# FIXME can't join the bar_thread because we never get to the correct
# number of bytes
# if bar_thread is not None:
# bar_thread.join()
zarr.consolidate_metadata(write_path)
# Atomic swap, now we've completely finished.
logger.info(f"Moving to final path {path}")
os.rename(write_path, path)


def sync_flush_array(np_buffer, zarr_array, offset):
@@ -1388,6 +1423,7 @@ def async_flush_array(executor, np_buffer, zarr_array, offset):
"""
Flush the specified chunk aligned buffer to the specified zarr array.
"""
logger.debug(f"Schedule flush {zarr_array} @ {offset}")
assert zarr_array.shape[1:] == np_buffer.shape[1:]
# print("sync", zarr_array, np_buffer)
