diff --git a/bio2zarr/cli.py b/bio2zarr/cli.py index 4d574bd4..4d5d9274 100644 --- a/bio2zarr/cli.py +++ b/bio2zarr/cli.py @@ -14,18 +14,19 @@ "-p", "--worker-processes", type=int, default=1, help="Number of worker processes" ) -# TODO help text -chunk_length = click.option( +# Note: -l and -w were chosen when these were called "width" and "length". +# possibly there are better letters now. +variants_chunk_size = click.option( "-l", - "--chunk-length", + "--variants-chunk-size", type=int, default=None, help="Chunk size in the variants dimension", ) -chunk_width = click.option( +samples_chunk_size = click.option( "-w", - "--chunk-width", + "--samples-chunk-size", type=int, default=None, help="Chunk size in the samples dimension", @@ -96,8 +97,8 @@ def mkschema(if_path): @click.argument("zarr_path", type=click.Path()) @verbose @click.option("-s", "--schema", default=None, type=click.Path(exists=True)) -@chunk_length -@chunk_width +@variants_chunk_size +@samples_chunk_size @click.option( "-V", "--max-variant-chunks", @@ -122,8 +123,8 @@ def encode( zarr_path, verbose, schema, - chunk_length, - chunk_width, + variants_chunk_size, + samples_chunk_size, max_variant_chunks, max_memory, worker_processes, @@ -136,8 +137,8 @@ def encode( if_path, zarr_path, schema, - chunk_length=chunk_length, - chunk_width=chunk_width, + variants_chunk_size=variants_chunk_size, + samples_chunk_size=samples_chunk_size, max_v_chunks=max_variant_chunks, worker_processes=worker_processes, max_memory=max_memory, @@ -148,11 +149,13 @@ def encode( @click.command(name="convert") @click.argument("vcfs", nargs=-1, required=True) @click.argument("out_path", type=click.Path()) -@chunk_length -@chunk_width +@variants_chunk_size +@samples_chunk_size @verbose @worker_processes -def convert_vcf(vcfs, out_path, chunk_length, chunk_width, verbose, worker_processes): +def convert_vcf( + vcfs, out_path, variants_chunk_size, samples_chunk_size, verbose, worker_processes +): """ Convert input VCF(s) directly to vcfzarr (not recommended for large files) """ @@ -160,8 +163,8 @@ def convert_vcf(vcfs, out_path, chunk_length, chunk_width, verbose, worker_proce vcf.convert( vcfs, out_path, - chunk_length=chunk_length, - chunk_width=chunk_width, + variants_chunk_size=variants_chunk_size, + samples_chunk_size=samples_chunk_size, show_progress=True, worker_processes=worker_processes, ) @@ -198,10 +201,15 @@ def vcf2zarr(): @click.argument("out_path", type=click.Path()) @worker_processes @verbose -@chunk_length -@chunk_width +@variants_chunk_size +@samples_chunk_size def convert_plink( - in_path, out_path, verbose, worker_processes, chunk_length, chunk_width + in_path, + out_path, + verbose, + worker_processes, + variants_chunk_size, + samples_chunk_size, ): """ In development; DO NOT USE! @@ -212,8 +220,8 @@ def convert_plink( out_path, show_progress=True, worker_processes=worker_processes, - chunk_width=chunk_width, - chunk_length=chunk_length, + samples_chunk_size=samples_chunk_size, + variants_chunk_size=variants_chunk_size, ) diff --git a/bio2zarr/core.py b/bio2zarr/core.py index 8bbef809..00436294 100644 --- a/bio2zarr/core.py +++ b/bio2zarr/core.py @@ -86,11 +86,11 @@ def __init__(self, array, offset): self.buffer_row = 0 @property - def chunk_length(self): + def variants_chunk_size(self): return self.buff.shape[0] def next_buffer_row(self): - if self.buffer_row == self.chunk_length: + if self.buffer_row == self.variants_chunk_size: self.flush() row = self.buffer_row self.buffer_row += 1 @@ -112,7 +112,7 @@ def flush(self): f"{self.array_offset}:{self.array_offset + self.buffer_row}" f"{self.buff.nbytes / 2**20: .2f}Mb" ) - self.array_offset += self.chunk_length + self.array_offset += self.variants_chunk_size self.buffer_row = 0 @@ -126,13 +126,13 @@ def sync_flush_2d_array(np_buffer, zarr_array, offset): # incremental, and to avoid large memcopies in the underlying # encoder implementations. s = slice(offset, offset + np_buffer.shape[0]) - chunk_width = zarr_array.chunks[1] + samples_chunk_size = zarr_array.chunks[1] # TODO use zarr chunks here to support non-uniform chunking later # and for simplicity zarr_array_width = zarr_array.shape[1] start = 0 while start < zarr_array_width: - stop = min(start + chunk_width, zarr_array_width) + stop = min(start + samples_chunk_size, zarr_array_width) chunk_buffer = np_buffer[:, start:stop] zarr_array[s, start:stop] = chunk_buffer update_progress(chunk_buffer.nbytes) diff --git a/bio2zarr/plink.py b/bio2zarr/plink.py index 9569ffee..ce7e08f6 100644 --- a/bio2zarr/plink.py +++ b/bio2zarr/plink.py @@ -22,14 +22,14 @@ def encode_genotypes_slice(bed_path, zarr_path, start, stop): gt = core.BufferedArray(root["call_genotype"], start) gt_mask = core.BufferedArray(root["call_genotype_mask"], start) gt_phased = core.BufferedArray(root["call_genotype_phased"], start) - chunk_length = gt.array.chunks[0] + variants_chunk_size = gt.array.chunks[0] n = gt.array.shape[1] - assert start % chunk_length == 0 + assert start % variants_chunk_size == 0 logger.debug(f"Reading slice {start}:{stop}") chunk_start = start while chunk_start < stop: - chunk_stop = min(chunk_start + chunk_length, stop) + chunk_stop = min(chunk_start + variants_chunk_size, stop) logger.debug(f"Reading bed slice {chunk_start}:{chunk_stop}") bed_chunk = bed.read(slice(chunk_start, chunk_stop), dtype=np.int8).T logger.debug(f"Got bed slice {humanfriendly.format_size(bed_chunk.nbytes)}") @@ -60,8 +60,8 @@ def convert( *, show_progress=False, worker_processes=1, - chunk_length=None, - chunk_width=None, + variants_chunk_size=None, + samples_chunk_size=None, ): bed = bed_reader.open_bed(bed_path, num_threads=1) n = bed.iid_count @@ -69,17 +69,17 @@ def convert( logging.info(f"Scanned plink with {n} samples and {m} variants") # FIXME - if chunk_width is None: - chunk_width = 1000 - if chunk_length is None: - chunk_length = 10_000 + if samples_chunk_size is None: + samples_chunk_size = 1000 + if variants_chunk_size is None: + variants_chunk_size = 10_000 store = zarr.DirectoryStore(zarr_path) root = zarr.group(store=store, overwrite=True) ploidy = 2 shape = [m, n] - chunks = [chunk_length, chunk_width] + chunks = [variants_chunk_size, samples_chunk_size] dimensions = ["variants", "samples"] a = root.array( @@ -87,7 +87,7 @@ def convert( bed.iid, dtype="str", compressor=core.default_compressor, - chunks=(chunk_width,), + chunks=(samples_chunk_size,), ) a.attrs["_ARRAY_DIMENSIONS"] = ["samples"] logger.debug(f"Encoded samples") @@ -99,7 +99,7 @@ def convert( bed.bp_position, dtype=np.int32, compressor=core.default_compressor, - chunks=(chunk_length,), + chunks=(variants_chunk_size,), ) a.attrs["_ARRAY_DIMENSIONS"] = ["variants"] logger.debug(f"encoded variant_position") @@ -110,7 +110,7 @@ def convert( alleles, dtype="str", compressor=core.default_compressor, - chunks=(chunk_length,), + chunks=(variants_chunk_size,), ) a.attrs["_ARRAY_DIMENSIONS"] = ["variants", "alleles"] logger.debug(f"encoded variant_allele") diff --git a/bio2zarr/vcf.py b/bio2zarr/vcf.py index 6274b4f2..ed6e6b50 100644 --- a/bio2zarr/vcf.py +++ b/bio2zarr/vcf.py @@ -1063,8 +1063,8 @@ def __post_init__(self): @dataclasses.dataclass class ZarrConversionSpec: format_version: str - chunk_width: int - chunk_length: int + samples_chunk_size: int + variants_chunk_size: int dimensions: list sample_id: list contig_id: list @@ -1091,15 +1091,17 @@ def fromjson(s): return ZarrConversionSpec.fromdict(json.loads(s)) @staticmethod - def generate(pcvcf, chunk_length=None, chunk_width=None): + def generate(pcvcf, variants_chunk_size=None, samples_chunk_size=None): m = pcvcf.num_records n = pcvcf.num_samples # FIXME - if chunk_width is None: - chunk_width = 1000 - if chunk_length is None: - chunk_length = 10_000 - logger.info(f"Generating schema with chunks={chunk_length, chunk_width}") + if samples_chunk_size is None: + samples_chunk_size = 1000 + if variants_chunk_size is None: + variants_chunk_size = 10_000 + logger.info( + f"Generating schema with chunks={variants_chunk_size, samples_chunk_size}" + ) compressor = core.default_compressor.get_config() def fixed_field_spec( @@ -1112,7 +1114,7 @@ def fixed_field_spec( shape=shape, description="", dimensions=dimensions, - chunks=[chunk_length], + chunks=[variants_chunk_size], compressor=compressor, ) @@ -1170,11 +1172,11 @@ def fixed_field_spec( shape = [m] prefix = "variant_" dimensions = ["variants"] - chunks = [chunk_length] + chunks = [variants_chunk_size] if field.category == "FORMAT": prefix = "call_" shape.append(n) - chunks.append(chunk_width), + chunks.append(samples_chunk_size), dimensions.append("samples") # TODO make an option to add in the empty extra dimension if field.summary.max_number > 1: @@ -1196,7 +1198,7 @@ def fixed_field_spec( if gt_field is not None: ploidy = gt_field.summary.max_number - 1 shape = [m, n] - chunks = [chunk_length, chunk_width] + chunks = [variants_chunk_size, samples_chunk_size] dimensions = ["variants", "samples"] colspecs.append( @@ -1241,8 +1243,8 @@ def fixed_field_spec( return ZarrConversionSpec( # TODO do something systematic format_version="0.1", - chunk_width=chunk_width, - chunk_length=chunk_length, + samples_chunk_size=samples_chunk_size, + variants_chunk_size=variants_chunk_size, columns={col.name: col for col in colspecs}, dimensions=["variants", "samples", "ploidy", "alleles", "filters"], sample_id=pcvcf.metadata.samples, @@ -1431,7 +1433,7 @@ def encode_samples(self): self.schema.sample_id, dtype="str", compressor=core.default_compressor, - chunks=(self.schema.chunk_width,), + chunks=(self.schema.samples_chunk_size,), ) array.attrs["_ARRAY_DIMENSIONS"] = ["samples"] logger.debug("Samples done") @@ -1639,8 +1641,8 @@ def encode( if_path, zarr_path, schema_path=None, - chunk_length=None, - chunk_width=None, + variants_chunk_size=None, + samples_chunk_size=None, max_v_chunks=None, max_memory=None, worker_processes=1, @@ -1650,12 +1652,12 @@ def encode( if schema_path is None: schema = ZarrConversionSpec.generate( pcvcf, - chunk_length=chunk_length, - chunk_width=chunk_width, + variants_chunk_size=variants_chunk_size, + samples_chunk_size=samples_chunk_size, ) else: logger.info(f"Reading schema from {schema_path}") - if chunk_length is not None or chunk_width is not None: + if variants_chunk_size is not None or samples_chunk_size is not None: raise ValueError("Cannot specify schema along with chunk sizes") with open(schema_path, "r") as f: schema = ZarrConversionSpec.fromjson(f.read()) @@ -1678,8 +1680,8 @@ def convert( vcfs, out_path, *, - chunk_length=None, - chunk_width=None, + variants_chunk_size=None, + samples_chunk_size=None, worker_processes=1, show_progress=False, # TODO add arguments to control location of tmpdir @@ -1694,8 +1696,8 @@ def convert( encode( if_dir, out_path, - chunk_length=chunk_length, - chunk_width=chunk_width, + variants_chunk_size=variants_chunk_size, + samples_chunk_size=samples_chunk_size, worker_processes=worker_processes, show_progress=show_progress, ) diff --git a/tests/test_cli.py b/tests/test_cli.py index 05b96782..69b77ad6 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -9,124 +9,123 @@ class TestWithMocks: - def test_vcf_explode(self): + @mock.patch("bio2zarr.vcf.explode") + def test_vcf_explode(self, mocked): runner = ct.CliRunner(mix_stderr=False) - with mock.patch("bio2zarr.vcf.explode") as mocked: - result = runner.invoke( - cli.vcf2zarr, ["explode", "source", "dest"], catch_exceptions=False - ) - assert result.exit_code == 0 - assert len(result.stdout) == 0 - assert len(result.stderr) == 0 - mocked.assert_called_once_with( - ("source",), - "dest", - column_chunk_size=64, - worker_processes=1, - show_progress=True, - ) + result = runner.invoke( + cli.vcf2zarr, ["explode", "source", "dest"], catch_exceptions=False + ) + assert result.exit_code == 0 + assert len(result.stdout) == 0 + assert len(result.stderr) == 0 + mocked.assert_called_once_with( + ("source",), + "dest", + column_chunk_size=64, + worker_processes=1, + show_progress=True, + ) - def test_inspect(self): + @mock.patch("bio2zarr.vcf.inspect") + def test_inspect(self, mocked): runner = ct.CliRunner(mix_stderr=False) - with mock.patch("bio2zarr.vcf.inspect", return_value={}) as mocked: - result = runner.invoke( - cli.vcf2zarr, ["inspect", "path"], catch_exceptions=False - ) - assert result.exit_code == 0 - assert result.stdout == "\n" - assert len(result.stderr) == 0 - mocked.assert_called_once_with("path") + result = runner.invoke( + cli.vcf2zarr, ["inspect", "path"], catch_exceptions=False + ) + assert result.exit_code == 0 + assert result.stdout == "\n" + assert len(result.stderr) == 0 + mocked.assert_called_once_with("path") - def test_mkschema(self): + @mock.patch("bio2zarr.vcf.mkschema") + def test_mkschema(self, mocked): runner = ct.CliRunner(mix_stderr=False) - with mock.patch("bio2zarr.vcf.mkschema") as mocked: - result = runner.invoke( - cli.vcf2zarr, ["mkschema", "path"], catch_exceptions=False - ) - assert result.exit_code == 0 - assert len(result.stdout) == 0 - assert len(result.stderr) == 0 - # TODO figure out how to test that we call it with stdout from - # the CliRunner - # mocked.assert_called_once_with("path", stdout) - mocked.assert_called_once() - - def test_encode(self): + result = runner.invoke( + cli.vcf2zarr, ["mkschema", "path"], catch_exceptions=False + ) + assert result.exit_code == 0 + assert len(result.stdout) == 0 + assert len(result.stderr) == 0 + # TODO figure out how to test that we call it with stdout from + # the CliRunner + # mocked.assert_called_once_with("path", stdout) + mocked.assert_called_once() + + @mock.patch("bio2zarr.vcf.encode") + def test_encode(self, mocked): runner = ct.CliRunner(mix_stderr=False) - with mock.patch("bio2zarr.vcf.encode") as mocked: - result = runner.invoke( - cli.vcf2zarr, ["encode", "if_path", "zarr_path"], catch_exceptions=False - ) - assert result.exit_code == 0 - assert len(result.stdout) == 0 - assert len(result.stderr) == 0 - mocked.assert_called_once_with( - "if_path", - "zarr_path", - None, - chunk_length=None, - chunk_width=None, - max_v_chunks=None, - worker_processes=1, - max_memory=None, - show_progress=True, - ) + result = runner.invoke( + cli.vcf2zarr, ["encode", "if_path", "zarr_path"], catch_exceptions=False + ) + assert result.exit_code == 0 + assert len(result.stdout) == 0 + assert len(result.stderr) == 0 + mocked.assert_called_once_with( + "if_path", + "zarr_path", + None, + variants_chunk_size=None, + samples_chunk_size=None, + max_v_chunks=None, + worker_processes=1, + max_memory=None, + show_progress=True, + ) - def test_convert_vcf(self): + @mock.patch("bio2zarr.vcf.convert") + def test_convert_vcf(self, mocked): runner = ct.CliRunner(mix_stderr=False) - with mock.patch("bio2zarr.vcf.convert") as mocked: - result = runner.invoke( - cli.vcf2zarr, - ["convert", "vcf_path", "zarr_path"], - catch_exceptions=False, - ) - assert result.exit_code == 0 - assert len(result.stdout) == 0 - assert len(result.stderr) == 0 - mocked.assert_called_once_with( - ("vcf_path",), - "zarr_path", - chunk_length=None, - chunk_width=None, - worker_processes=1, - show_progress=True, - ) + result = runner.invoke( + cli.vcf2zarr, + ["convert", "vcf_path", "zarr_path"], + catch_exceptions=False, + ) + assert result.exit_code == 0 + assert len(result.stdout) == 0 + assert len(result.stderr) == 0 + mocked.assert_called_once_with( + ("vcf_path",), + "zarr_path", + variants_chunk_size=None, + samples_chunk_size=None, + worker_processes=1, + show_progress=True, + ) - def test_validate(self): + @mock.patch("bio2zarr.vcf.validate") + def test_validate(self, mocked): runner = ct.CliRunner(mix_stderr=False) - with mock.patch("bio2zarr.vcf.validate") as mocked: - result = runner.invoke( - cli.vcf2zarr, - ["validate", "vcf_path", "zarr_path"], - catch_exceptions=False, - ) - assert result.exit_code == 0 - assert len(result.stdout) == 0 - assert len(result.stderr) == 0 - mocked.assert_called_once_with( - "vcf_path", - "zarr_path", - show_progress=True, - ) + result = runner.invoke( + cli.vcf2zarr, + ["validate", "vcf_path", "zarr_path"], + catch_exceptions=False, + ) + assert result.exit_code == 0 + assert len(result.stdout) == 0 + assert len(result.stderr) == 0 + mocked.assert_called_once_with( + "vcf_path", + "zarr_path", + show_progress=True, + ) - def test_convert_plink(self): + @mock.patch("bio2zarr.plink.convert") + def test_convert_plink(self, mocked): runner = ct.CliRunner(mix_stderr=False) - with mock.patch("bio2zarr.plink.convert") as mocked: - result = runner.invoke( - cli.plink2zarr, ["convert", "in", "out"], catch_exceptions=False - ) - assert result.exit_code == 0 - assert len(result.stdout) == 0 - assert len(result.stderr) == 0 - mocked.assert_called_once_with( - "in", - "out", - worker_processes=1, - chunk_width=None, - chunk_length=None, - show_progress=True, - ) - + result = runner.invoke( + cli.plink2zarr, ["convert", "in", "out"], catch_exceptions=False + ) + assert result.exit_code == 0 + assert len(result.stdout) == 0 + assert len(result.stderr) == 0 + mocked.assert_called_once_with( + "in", + "out", + worker_processes=1, + samples_chunk_size=None, + variants_chunk_size=None, + show_progress=True, + ) class TestVcfPartition: diff --git a/tests/test_plink.py b/tests/test_plink.py index a99bd926..125fcb0f 100644 --- a/tests/test_plink.py +++ b/tests/test_plink.py @@ -199,7 +199,7 @@ def test_genotypes(self, ds): # @pytest.mark.xfail @pytest.mark.parametrize( - ["chunk_length", "chunk_width"], + ["variants_chunk_size", "samples_chunk_size"], [ (10, 1), (10, 10), @@ -210,15 +210,15 @@ def test_genotypes(self, ds): ) @pytest.mark.parametrize("worker_processes", [0, 1, 2]) def test_chunk_size( - self, ds, tmp_path, chunk_length, chunk_width, worker_processes + self, ds, tmp_path, variants_chunk_size, samples_chunk_size, worker_processes ): path = "tests/data/plink/plink_sim_10s_100v_10pmiss.bed" out = tmp_path / "example.zarr" plink.convert( path, out, - chunk_length=chunk_length, - chunk_width=chunk_width, + variants_chunk_size=variants_chunk_size, + samples_chunk_size=samples_chunk_size, worker_processes=worker_processes, ) ds2 = sg.load_dataset(out) @@ -227,7 +227,7 @@ def test_chunk_size( @pytest.mark.parametrize( - ["chunk_length", "chunk_width"], + ["variants_chunk_size", "samples_chunk_size"], [ (10, 1), (10, 10), @@ -237,14 +237,14 @@ def test_chunk_size( ], ) @pytest.mark.parametrize("worker_processes", [0]) -def test_by_validating(tmp_path, chunk_length, chunk_width, worker_processes): +def test_by_validating(tmp_path, variants_chunk_size, samples_chunk_size, worker_processes): path = "tests/data/plink/plink_sim_10s_100v_10pmiss.bed" out = tmp_path / "example.zarr" plink.convert( path, out, - chunk_length=chunk_length, - chunk_width=chunk_width, + variants_chunk_size=variants_chunk_size, + samples_chunk_size=samples_chunk_size, worker_processes=worker_processes, ) plink.validate(path, out) diff --git a/tests/test_vcf_examples.py b/tests/test_vcf_examples.py index 13d3c934..ce75f5b5 100644 --- a/tests/test_vcf_examples.py +++ b/tests/test_vcf_examples.py @@ -244,7 +244,7 @@ def test_no_genotypes(self, ds, tmp_path): xt.assert_equal(ds[col], ds2[col]) @pytest.mark.parametrize( - ["chunk_length", "chunk_width", "y_chunks", "x_chunks"], + ["variants_chunk_size", "samples_chunk_size", "y_chunks", "x_chunks"], [ (1, 1, (1, 1, 1, 1, 1, 1, 1, 1, 1), (1, 1, 1)), (2, 2, (2, 2, 2, 2, 1), (2, 1)), @@ -253,11 +253,14 @@ def test_no_genotypes(self, ds, tmp_path): ], ) def test_chunk_size( - self, ds, tmp_path, chunk_length, chunk_width, y_chunks, x_chunks + self, ds, tmp_path, variants_chunk_size, samples_chunk_size, y_chunks, x_chunks ): out = tmp_path / "example.vcf.zarr" vcf.convert( - [self.data_path], out, chunk_length=chunk_length, chunk_width=chunk_width + [self.data_path], + out, + variants_chunk_size=variants_chunk_size, + samples_chunk_size=samples_chunk_size, ) ds2 = sg.load_dataset(out) # print(ds2.call_genotype.values) @@ -316,20 +319,30 @@ def test_full_pipeline(self, ds, tmp_path, worker_processes): xt.assert_equal(ds, ds2) @pytest.mark.parametrize("max_v_chunks", [1, 2, 3]) - @pytest.mark.parametrize("chunk_length", [1, 2, 3]) - def test_max_v_chunks(self, ds, tmp_path, max_v_chunks, chunk_length): + @pytest.mark.parametrize("variants_chunk_size", [1, 2, 3]) + def test_max_v_chunks(self, ds, tmp_path, max_v_chunks, variants_chunk_size): exploded = tmp_path / "example.exploded" vcf.explode([self.data_path], exploded) out = tmp_path / "example.zarr" - vcf.encode(exploded, out, chunk_length=chunk_length, max_v_chunks=max_v_chunks) + vcf.encode( + exploded, + out, + variants_chunk_size=variants_chunk_size, + max_v_chunks=max_v_chunks, + ) ds2 = sg.load_dataset(out) - xt.assert_equal(ds.isel(variants=slice(None, chunk_length * max_v_chunks)), ds2) + xt.assert_equal( + ds.isel(variants=slice(None, variants_chunk_size * max_v_chunks)), ds2 + ) @pytest.mark.parametrize("worker_processes", [0, 1, 2]) def test_worker_processes(self, ds, tmp_path, worker_processes): out = tmp_path / "example.vcf.zarr" vcf.convert( - [self.data_path], out, chunk_length=3, worker_processes=worker_processes + [self.data_path], + out, + variants_chunk_size=3, + worker_processes=worker_processes, ) ds2 = sg.load_dataset(out) xt.assert_equal(ds, ds2) @@ -340,7 +353,7 @@ def test_inspect(self, tmp_path): vcf.convert( [self.data_path], out, - chunk_length=3, + variants_chunk_size=3, ) data = vcf.inspect(out) assert len(data) > 0