58 changes: 47 additions & 11 deletions bio2zarr/vcf2zarr/vcz.py
@@ -34,7 +34,7 @@ def inspect(path):


@dataclasses.dataclass
-class ZarrColumnSpec:
+class ZarrArraySpec:
name: str
dtype: str
shape: tuple
@@ -54,7 +54,7 @@ def __post_init__(self):

@staticmethod
def new(**kwargs):
-spec = ZarrColumnSpec(
+spec = ZarrArraySpec(
**kwargs, compressor=DEFAULT_ZARR_COMPRESSOR.get_config(), filters=[]
)
spec._choose_compressor_settings()
@@ -94,7 +94,7 @@ def from_field(
dimensions.append("genotypes")
else:
dimensions.append(f"{vcf_field.category}_{vcf_field.name}_dim")
-return ZarrColumnSpec.new(
+return ZarrArraySpec.new(
vcf_field=vcf_field.full_name,
name=variable_name,
dtype=vcf_field.smallest_dtype(),
Expand Down Expand Up @@ -127,6 +127,23 @@ def _choose_compressor_settings(self):

self.compressor["shuffle"] = shuffle

@property
def chunk_nbytes(self):
"""
Returns the nbytes for a single chunk in this array.
"""
items = 1
dim = 0
for chunk_size in self.chunks:
size = min(chunk_size, self.shape[dim])
items *= size
dim += 1
# Include the full size of dimensions not covered by the chunks.
for size in self.shape[dim:]:
items *= size
dt = np.dtype(self.dtype)
return items * dt.itemsize

@property
def variant_chunk_nbytes(self):
"""
Expand Down Expand Up @@ -157,6 +174,24 @@ class VcfZarrSchema(core.JsonDataclass):
filters: list
fields: list

def validate(self):
"""
Checks that the schema is well-formed and within required limits.
"""
for field in self.fields:
# This is the Blosc max buffer size
if field.chunk_nbytes > 2147483647:
# TODO add some links to documentation here advising how to
# deal with PL values.
raise ValueError(
f"Field {field.name} chunks are too large "
f"({field.chunk_nbytes} > 2**31 - 1 bytes). "
"Either generate a schema and drop this field (if you don't "
"need it) or reduce the variant or sample chunk sizes."
)
# TODO other checks? There must be lots of ways people could mess
# up the schema leading to cryptic errors.

def field_map(self):
return {field.name: field for field in self.fields}

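To see why this check is needed, a back-of-the-envelope illustration (the chunk sizes and allele count are hypothetical; 2**31 - 1 bytes is Blosc's maximum buffer size):

# A diploid PL field stores one value per possible genotype: with 8 alleles
# that is 8 * 9 / 2 = 36 values per call, stored here as int32 (4 bytes).
variants_chunk_size = 10_000
samples_chunk_size = 2_000
values_per_call = 36
itemsize = 4
chunk_nbytes = variants_chunk_size * samples_chunk_size * values_per_call * itemsize
print(chunk_nbytes)              # 2_880_000_000
print(chunk_nbytes > 2**31 - 1)  # True: validate() would reject this field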
@@ -171,7 +206,7 @@ def fromdict(d):
ret.samples = [icf.Sample(**sd) for sd in d["samples"]]
ret.contigs = [icf.Contig(**sd) for sd in d["contigs"]]
ret.filters = [icf.Filter(**sd) for sd in d["filters"]]
-ret.fields = [ZarrColumnSpec(**sd) for sd in d["fields"]]
+ret.fields = [ZarrArraySpec(**sd) for sd in d["fields"]]
return ret

@staticmethod
@@ -192,7 +227,7 @@ def generate(icf, variants_chunk_size=None, samples_chunk_size=None):
)

def spec_from_field(field, variable_name=None):
-return ZarrColumnSpec.from_field(
+return ZarrArraySpec.from_field(
field,
num_samples=n,
num_variants=m,
@@ -204,7 +239,7 @@ def spec_from_field(field, variable_name=None):
def fixed_field_spec(
name, dtype, vcf_field=None, shape=(m,), dimensions=("variants",)
):
-return ZarrColumnSpec.new(
+return ZarrArraySpec.new(
vcf_field=vcf_field,
name=name,
dtype=dtype,
@@ -230,13 +265,13 @@ def fixed_field_spec(
),
fixed_field_spec(
name="variant_allele",
dtype="str",
dtype="O",
shape=(m, max_alleles),
dimensions=["variants", "alleles"],
),
fixed_field_spec(
name="variant_id",
dtype="str",
dtype="O",
),
fixed_field_spec(
name="variant_id_mask",
@@ -267,7 +302,7 @@ def fixed_field_spec(
chunks = [variants_chunk_size, samples_chunk_size]
dimensions = ["variants", "samples"]
colspecs.append(
-ZarrColumnSpec.new(
+ZarrArraySpec.new(
vcf_field=None,
name="call_genotype_phased",
dtype="bool",
@@ -280,7 +315,7 @@ def fixed_field_spec(
shape += [ploidy]
dimensions += ["ploidy"]
colspecs.append(
-ZarrColumnSpec.new(
+ZarrArraySpec.new(
vcf_field=None,
name="call_genotype",
dtype=gt_field.smallest_dtype(),
@@ -291,7 +326,7 @@ def fixed_field_spec(
)
)
colspecs.append(
-ZarrColumnSpec.new(
+ZarrArraySpec.new(
vcf_field=None,
name="call_genotype_mask",
dtype="bool",
@@ -447,6 +482,7 @@ def init(
self.icf = icf
if self.path.exists():
raise ValueError("Zarr path already exists") # NEEDS TEST
schema.validate()
partitions = VcfZarrPartition.generate_partitions(
self.icf.num_records,
schema.variants_chunk_size,
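With validate() called from init(), an oversized field is now reported before any encoding work begins. A hypothetical usage sketch following the error message's advice (the import path, ICF file name, and call_PL field name are assumptions for illustration):

from bio2zarr import vcf2zarr  # import path assumed

icf = vcf2zarr.IntermediateColumnarFormat("sample.icf")  # hypothetical ICF path
schema = vcf2zarr.VcfZarrSchema.generate(icf, variants_chunk_size=10_000)
# Drop a field we don't need instead of encoding oversized chunks:
schema.fields = [f for f in schema.fields if f.name != "call_PL"]  # hypothetical field
schema.validate()  # raises ValueError if any remaining chunk exceeds 2**31 - 1 bytes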
72 changes: 61 additions & 11 deletions tests/test_vcz.py
@@ -67,7 +67,7 @@ def test_not_enough_memory(self, tmp_path, icf_path, max_memory):
with pytest.raises(ValueError, match="Insufficient memory"):
vcf2zarr.encode(icf_path, zarr_path, max_memory=max_memory)

@pytest.mark.parametrize("max_memory", ["150KiB", "200KiB"])
@pytest.mark.parametrize("max_memory", ["315KiB", "500KiB"])
def test_not_enough_memory_for_two(
self, tmp_path, icf_path, zarr_path, caplog, max_memory
):
@@ -214,6 +214,55 @@ def get_field_dict(a_schema, name):
return field


class TestChunkNbytes:
@pytest.mark.parametrize(
("field", "value"),
[
("call_genotype", 54), # 9 * 3 * 2 * 1
("call_genotype_phased", 27),
("call_genotype_mask", 54),
("variant_position", 36), # 9 * 4
("variant_H2", 9),
("variant_AC", 18), # 9 * 2
# Object fields have an itemsize of 8
("variant_AA", 72), # 9 * 8
("variant_allele", 9 * 4 * 8),
],
)
def test_example_schema(self, schema, field, value):
field = schema.field_map()[field]
assert field.chunk_nbytes == value

def test_chunk_size(self, icf_path, tmp_path):
store = vcf2zarr.IntermediateColumnarFormat(icf_path)
schema = vcf2zarr.VcfZarrSchema.generate(
store, samples_chunk_size=2, variants_chunk_size=3
)
fields = schema.field_map()
assert fields["call_genotype"].chunk_nbytes == 3 * 2 * 2
assert fields["variant_position"].chunk_nbytes == 3 * 4
assert fields["variant_AC"].chunk_nbytes == 3 * 2


class TestValidateSchema:
@pytest.mark.parametrize("size", [2**31, 2**31 + 1, 2**32])
def test_chunk_too_large(self, schema, size):
schema = vcf2zarr.VcfZarrSchema.fromdict(schema.asdict())
field = schema.field_map()["variant_H2"]
field.shape = (size,)
field.chunks = (size,)
with pytest.raises(ValueError, match="Field variant_H2 chunks are too large"):
schema.validate()

@pytest.mark.parametrize("size", [2**31 - 1, 2**30])
def test_chunk_not_too_large(self, schema, size):
schema = vcf2zarr.VcfZarrSchema.fromdict(schema.asdict())
field = schema.field_map()["variant_H2"]
field.shape = (size,)
field.chunks = (size,)
schema.validate()


class TestDefaultSchema:
def test_format_version(self, schema):
assert schema.format_version == vcz_mod.ZARR_SCHEMA_FORMAT_VERSION
Expand Down Expand Up @@ -359,16 +408,17 @@ class TestVcfDescriptions:
def test_fields(self, schema, field, description):
assert schema.field_map()[field].description == description

-# This information is not in the schema yet,
-# https://github.com/sgkit-dev/vcf2zarr/issues/123
-# @pytest.mark.parametrize(
-# ("filt", "description"),
-# [
-# ("s50","Less than 50% of samples have data"),
-# ("q10", "Quality below 10"),
-# ])
-# def test_filters(self, schema, filt, description):
-# assert schema["filters"][field]["description"] == description
+@pytest.mark.parametrize(
+("filt", "description"),
+[
+("PASS", "All filters passed"),
+("s50", "Less than 50% of samples have data"),
+("q10", "Quality below 10"),
+],
+)
+def test_filters(self, schema, filt, description):
+d = {f.id: f.description for f in schema.filters}
+assert d[filt] == description


class TestVcfZarrWriterExample: