Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# 0.0.5 2024-04-XX

- Fix bug in schema handling (compressor settings ignored)
- Move creation of ICF field partition directories into per-partition processing.
Remove progress reporting from the init mkdirs step.

# 0.0.4 2024-04-08

Expand Down
40 changes: 9 additions & 31 deletions bio2zarr/vcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,8 @@ def __init__(
for vcf_field in icf_metadata.fields:
field_path = get_vcf_field_path(out_path, vcf_field)
field_partition_path = field_path / f"p{partition_index}"
# Should be robust to running explode_partition twice.
field_partition_path.mkdir(exist_ok=True)
transformer = VcfValueTransformer.factory(vcf_field, num_samples)
self.field_writers[vcf_field.full_name] = IcfFieldWriter(
vcf_field,
Expand Down Expand Up @@ -890,14 +892,6 @@ def num_columns(self):
return len(self.columns)


def mkdir_with_progress(path):
    """Create the directory ``path`` and advance the progress counter by one.

    The directory is created with ``parents=True``, so any missing ancestor
    directories are created as well. ``exist_ok`` is left at its default, so
    a ``FileExistsError`` is raised if ``path`` itself already exists.

    Args:
        path: A ``pathlib.Path``-like object exposing ``mkdir``.
    """
    # The original message contained a stray literal "f" before the
    # placeholder ("mkdir f{path}"), which logged e.g. "mkdir f/some/dir".
    logger.debug(f"mkdir {path}")
    # NOTE we may have race-conditions here, I'm not sure. Hopefully allowing
    # parents=True will take care of it.
    path.mkdir(parents=True)
    # Report one completed unit of work to the shared progress tracker.
    core.update_progress(1)


class IntermediateColumnarFormatWriter:
def __init__(self, path):
self.path = pathlib.Path(path)
Expand Down Expand Up @@ -940,7 +934,7 @@ def init(
# dependencies as well.
self.metadata.provenance = {"source": f"bio2zarr-{provenance.__version__}"}

self.mkdirs(worker_processes, show_progress=show_progress)
self.mkdirs()

# Note: this is needed for the current version of the vcfzarr spec, but it's
# probably going to be dropped.
Expand All @@ -955,30 +949,14 @@ def init(
json.dump(self.metadata.asdict(), f, indent=4)
return self.num_partitions

def mkdirs(self, worker_processes=1, show_progress=False):
num_dirs = len(self.metadata.fields) * self.num_partitions
logger.info(f"Creating {num_dirs} directories")
def mkdirs(self):
num_dirs = len(self.metadata.fields)
logger.info(f"Creating {num_dirs} field directories")
self.path.mkdir()
self.wip_path.mkdir()
# Due to high latency batch system filesystems, we create all the directories in
# parallel
progress_config = core.ProgressConfig(
total=num_dirs,
units="dirs",
title="Mkdirs",
show=show_progress,
)
with core.ParallelWorkManager(
worker_processes=worker_processes, progress_config=progress_config
) as manager:
for field in self.metadata.fields:
col_path = get_vcf_field_path(self.path, field)
# Don't bother trying to count the intermediate directories towards
# progress
manager.submit(col_path.mkdir, parents=True)
for j in range(self.num_partitions):
part_path = col_path / f"p{j}"
manager.submit(mkdir_with_progress, part_path)
for field in self.metadata.fields:
col_path = get_vcf_field_path(self.path, field)
col_path.mkdir(parents=True)

def load_partition_summaries(self):
summaries = []
Expand Down