Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 29 additions & 21 deletions bio2zarr/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,19 @@
"-p", "--worker-processes", type=int, default=1, help="Number of worker processes"
)

# TODO help text
chunk_length = click.option(
# Note: -l and -w were chosen when these were called "width" and "length".
# possibly there are better letters now.
variants_chunk_size = click.option(
"-l",
"--chunk-length",
"--variants-chunk-size",
type=int,
default=None,
help="Chunk size in the variants dimension",
)

chunk_width = click.option(
samples_chunk_size = click.option(
"-w",
"--chunk-width",
"--samples-chunk-size",
type=int,
default=None,
help="Chunk size in the samples dimension",
Expand Down Expand Up @@ -96,8 +97,8 @@ def mkschema(if_path):
@click.argument("zarr_path", type=click.Path())
@verbose
@click.option("-s", "--schema", default=None, type=click.Path(exists=True))
@chunk_length
@chunk_width
@variants_chunk_size
@samples_chunk_size
@click.option(
"-V",
"--max-variant-chunks",
Expand All @@ -122,8 +123,8 @@ def encode(
zarr_path,
verbose,
schema,
chunk_length,
chunk_width,
variants_chunk_size,
samples_chunk_size,
max_variant_chunks,
max_memory,
worker_processes,
Expand All @@ -136,8 +137,8 @@ def encode(
if_path,
zarr_path,
schema,
chunk_length=chunk_length,
chunk_width=chunk_width,
variants_chunk_size=variants_chunk_size,
samples_chunk_size=samples_chunk_size,
max_v_chunks=max_variant_chunks,
worker_processes=worker_processes,
max_memory=max_memory,
Expand All @@ -148,20 +149,22 @@ def encode(
@click.command(name="convert")
@click.argument("vcfs", nargs=-1, required=True)
@click.argument("out_path", type=click.Path())
@chunk_length
@chunk_width
@variants_chunk_size
@samples_chunk_size
@verbose
@worker_processes
def convert_vcf(vcfs, out_path, chunk_length, chunk_width, verbose, worker_processes):
def convert_vcf(
vcfs, out_path, variants_chunk_size, samples_chunk_size, verbose, worker_processes
):
"""
Convert input VCF(s) directly to vcfzarr (not recommended for large files)
"""
setup_logging(verbose)
vcf.convert(
vcfs,
out_path,
chunk_length=chunk_length,
chunk_width=chunk_width,
variants_chunk_size=variants_chunk_size,
samples_chunk_size=samples_chunk_size,
show_progress=True,
worker_processes=worker_processes,
)
Expand Down Expand Up @@ -198,10 +201,15 @@ def vcf2zarr():
@click.argument("out_path", type=click.Path())
@worker_processes
@verbose
@chunk_length
@chunk_width
@variants_chunk_size
@samples_chunk_size
def convert_plink(
in_path, out_path, verbose, worker_processes, chunk_length, chunk_width
in_path,
out_path,
verbose,
worker_processes,
variants_chunk_size,
samples_chunk_size,
):
"""
In development; DO NOT USE!
Expand All @@ -212,8 +220,8 @@ def convert_plink(
out_path,
show_progress=True,
worker_processes=worker_processes,
chunk_width=chunk_width,
chunk_length=chunk_length,
samples_chunk_size=samples_chunk_size,
variants_chunk_size=variants_chunk_size,
)


Expand Down
10 changes: 5 additions & 5 deletions bio2zarr/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ def __init__(self, array, offset):
self.buffer_row = 0

@property
def chunk_length(self):
def variants_chunk_size(self):
return self.buff.shape[0]

def next_buffer_row(self):
if self.buffer_row == self.chunk_length:
if self.buffer_row == self.variants_chunk_size:
self.flush()
row = self.buffer_row
self.buffer_row += 1
Expand All @@ -112,7 +112,7 @@ def flush(self):
f"{self.array_offset}:{self.array_offset + self.buffer_row}"
f"{self.buff.nbytes / 2**20: .2f}Mb"
)
self.array_offset += self.chunk_length
self.array_offset += self.variants_chunk_size
self.buffer_row = 0


Expand All @@ -126,13 +126,13 @@ def sync_flush_2d_array(np_buffer, zarr_array, offset):
# incremental, and to avoid large memcopies in the underlying
# encoder implementations.
s = slice(offset, offset + np_buffer.shape[0])
chunk_width = zarr_array.chunks[1]
samples_chunk_size = zarr_array.chunks[1]
# TODO use zarr chunks here to support non-uniform chunking later
# and for simplicity
zarr_array_width = zarr_array.shape[1]
start = 0
while start < zarr_array_width:
stop = min(start + chunk_width, zarr_array_width)
stop = min(start + samples_chunk_size, zarr_array_width)
chunk_buffer = np_buffer[:, start:stop]
zarr_array[s, start:stop] = chunk_buffer
update_progress(chunk_buffer.nbytes)
Expand Down
26 changes: 13 additions & 13 deletions bio2zarr/plink.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ def encode_genotypes_slice(bed_path, zarr_path, start, stop):
gt = core.BufferedArray(root["call_genotype"], start)
gt_mask = core.BufferedArray(root["call_genotype_mask"], start)
gt_phased = core.BufferedArray(root["call_genotype_phased"], start)
chunk_length = gt.array.chunks[0]
variants_chunk_size = gt.array.chunks[0]
n = gt.array.shape[1]
assert start % chunk_length == 0
assert start % variants_chunk_size == 0

logger.debug(f"Reading slice {start}:{stop}")
chunk_start = start
while chunk_start < stop:
chunk_stop = min(chunk_start + chunk_length, stop)
chunk_stop = min(chunk_start + variants_chunk_size, stop)
logger.debug(f"Reading bed slice {chunk_start}:{chunk_stop}")
bed_chunk = bed.read(slice(chunk_start, chunk_stop), dtype=np.int8).T
logger.debug(f"Got bed slice {humanfriendly.format_size(bed_chunk.nbytes)}")
Expand Down Expand Up @@ -60,34 +60,34 @@ def convert(
*,
show_progress=False,
worker_processes=1,
chunk_length=None,
chunk_width=None,
variants_chunk_size=None,
samples_chunk_size=None,
):
bed = bed_reader.open_bed(bed_path, num_threads=1)
n = bed.iid_count
m = bed.sid_count
logging.info(f"Scanned plink with {n} samples and {m} variants")

# FIXME
if chunk_width is None:
chunk_width = 1000
if chunk_length is None:
chunk_length = 10_000
if samples_chunk_size is None:
samples_chunk_size = 1000
if variants_chunk_size is None:
variants_chunk_size = 10_000

store = zarr.DirectoryStore(zarr_path)
root = zarr.group(store=store, overwrite=True)

ploidy = 2
shape = [m, n]
chunks = [chunk_length, chunk_width]
chunks = [variants_chunk_size, samples_chunk_size]
dimensions = ["variants", "samples"]

a = root.array(
"sample_id",
bed.iid,
dtype="str",
compressor=core.default_compressor,
chunks=(chunk_width,),
chunks=(samples_chunk_size,),
)
a.attrs["_ARRAY_DIMENSIONS"] = ["samples"]
logger.debug(f"Encoded samples")
Expand All @@ -99,7 +99,7 @@ def convert(
bed.bp_position,
dtype=np.int32,
compressor=core.default_compressor,
chunks=(chunk_length,),
chunks=(variants_chunk_size,),
)
a.attrs["_ARRAY_DIMENSIONS"] = ["variants"]
logger.debug(f"encoded variant_position")
Expand All @@ -110,7 +110,7 @@ def convert(
alleles,
dtype="str",
compressor=core.default_compressor,
chunks=(chunk_length,),
chunks=(variants_chunk_size,),
)
a.attrs["_ARRAY_DIMENSIONS"] = ["variants", "alleles"]
logger.debug(f"encoded variant_allele")
Expand Down
50 changes: 26 additions & 24 deletions bio2zarr/vcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -1063,8 +1063,8 @@ def __post_init__(self):
@dataclasses.dataclass
class ZarrConversionSpec:
format_version: str
chunk_width: int
chunk_length: int
samples_chunk_size: int
variants_chunk_size: int
dimensions: list
sample_id: list
contig_id: list
Expand All @@ -1091,15 +1091,17 @@ def fromjson(s):
return ZarrConversionSpec.fromdict(json.loads(s))

@staticmethod
def generate(pcvcf, chunk_length=None, chunk_width=None):
def generate(pcvcf, variants_chunk_size=None, samples_chunk_size=None):
m = pcvcf.num_records
n = pcvcf.num_samples
# FIXME
if chunk_width is None:
chunk_width = 1000
if chunk_length is None:
chunk_length = 10_000
logger.info(f"Generating schema with chunks={chunk_length, chunk_width}")
if samples_chunk_size is None:
samples_chunk_size = 1000
if variants_chunk_size is None:
variants_chunk_size = 10_000
logger.info(
f"Generating schema with chunks={variants_chunk_size, samples_chunk_size}"
)
compressor = core.default_compressor.get_config()

def fixed_field_spec(
Expand All @@ -1112,7 +1114,7 @@ def fixed_field_spec(
shape=shape,
description="",
dimensions=dimensions,
chunks=[chunk_length],
chunks=[variants_chunk_size],
compressor=compressor,
)

Expand Down Expand Up @@ -1170,11 +1172,11 @@ def fixed_field_spec(
shape = [m]
prefix = "variant_"
dimensions = ["variants"]
chunks = [chunk_length]
chunks = [variants_chunk_size]
if field.category == "FORMAT":
prefix = "call_"
shape.append(n)
chunks.append(chunk_width),
chunks.append(samples_chunk_size),
dimensions.append("samples")
# TODO make an option to add in the empty extra dimension
if field.summary.max_number > 1:
Expand All @@ -1196,7 +1198,7 @@ def fixed_field_spec(
if gt_field is not None:
ploidy = gt_field.summary.max_number - 1
shape = [m, n]
chunks = [chunk_length, chunk_width]
chunks = [variants_chunk_size, samples_chunk_size]
dimensions = ["variants", "samples"]

colspecs.append(
Expand Down Expand Up @@ -1241,8 +1243,8 @@ def fixed_field_spec(
return ZarrConversionSpec(
# TODO do something systematic
format_version="0.1",
chunk_width=chunk_width,
chunk_length=chunk_length,
samples_chunk_size=samples_chunk_size,
variants_chunk_size=variants_chunk_size,
columns={col.name: col for col in colspecs},
dimensions=["variants", "samples", "ploidy", "alleles", "filters"],
sample_id=pcvcf.metadata.samples,
Expand Down Expand Up @@ -1431,7 +1433,7 @@ def encode_samples(self):
self.schema.sample_id,
dtype="str",
compressor=core.default_compressor,
chunks=(self.schema.chunk_width,),
chunks=(self.schema.samples_chunk_size,),
)
array.attrs["_ARRAY_DIMENSIONS"] = ["samples"]
logger.debug("Samples done")
Expand Down Expand Up @@ -1639,8 +1641,8 @@ def encode(
if_path,
zarr_path,
schema_path=None,
chunk_length=None,
chunk_width=None,
variants_chunk_size=None,
samples_chunk_size=None,
max_v_chunks=None,
max_memory=None,
worker_processes=1,
Expand All @@ -1650,12 +1652,12 @@ def encode(
if schema_path is None:
schema = ZarrConversionSpec.generate(
pcvcf,
chunk_length=chunk_length,
chunk_width=chunk_width,
variants_chunk_size=variants_chunk_size,
samples_chunk_size=samples_chunk_size,
)
else:
logger.info(f"Reading schema from {schema_path}")
if chunk_length is not None or chunk_width is not None:
if variants_chunk_size is not None or samples_chunk_size is not None:
raise ValueError("Cannot specify schema along with chunk sizes")
with open(schema_path, "r") as f:
schema = ZarrConversionSpec.fromjson(f.read())
Expand All @@ -1678,8 +1680,8 @@ def convert(
vcfs,
out_path,
*,
chunk_length=None,
chunk_width=None,
variants_chunk_size=None,
samples_chunk_size=None,
worker_processes=1,
show_progress=False,
# TODO add arguments to control location of tmpdir
Expand All @@ -1694,8 +1696,8 @@ def convert(
encode(
if_dir,
out_path,
chunk_length=chunk_length,
chunk_width=chunk_width,
variants_chunk_size=variants_chunk_size,
samples_chunk_size=samples_chunk_size,
worker_processes=worker_processes,
show_progress=show_progress,
)
Expand Down
Loading