diff --git a/CHANGELOG.md b/CHANGELOG.md index 8fc818f7..860a8851 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ # 0.0.5 2024-04-XX - Fix bug in schema handling (compressor settings ignored) +- Move making ICF field partition directories into per-partition processing. + Remove progress on the init mkdirs step. # 0.0.4 2024-04-08 diff --git a/bio2zarr/vcf.py b/bio2zarr/vcf.py index 1889279c..10bbadd6 100644 --- a/bio2zarr/vcf.py +++ b/bio2zarr/vcf.py @@ -791,6 +791,8 @@ def __init__( for vcf_field in icf_metadata.fields: field_path = get_vcf_field_path(out_path, vcf_field) field_partition_path = field_path / f"p{partition_index}" + # Should be robust to running explode_partition twice. + field_partition_path.mkdir(exist_ok=True) transformer = VcfValueTransformer.factory(vcf_field, num_samples) self.field_writers[vcf_field.full_name] = IcfFieldWriter( vcf_field, @@ -890,14 +892,6 @@ def num_columns(self): return len(self.columns) -def mkdir_with_progress(path): - logger.debug(f"mkdir f{path}") - # NOTE we may have race-conditions here, I'm not sure. Hopefully allowing - # parents=True will take care of it. - path.mkdir(parents=True) - core.update_progress(1) - - class IntermediateColumnarFormatWriter: def __init__(self, path): self.path = pathlib.Path(path) @@ -940,7 +934,7 @@ def init( # dependencies as well. self.metadata.provenance = {"source": f"bio2zarr-{provenance.__version__}"} - self.mkdirs(worker_processes, show_progress=show_progress) + self.mkdirs() # Note: this is needed for the current version of the vcfzarr spec, but it's # probably going to be dropped. @@ -955,30 +949,14 @@ def init( json.dump(self.metadata.asdict(), f, indent=4) return self.num_partitions - def mkdirs(self, worker_processes=1, show_progress=False): - num_dirs = len(self.metadata.fields) * self.num_partitions - logger.info(f"Creating {num_dirs} directories") + def mkdirs(self): + num_dirs = len(self.metadata.fields) + logger.info(f"Creating {num_dirs} field directories") self.path.mkdir() self.wip_path.mkdir() - # Due to high latency batch system filesystems, we create all the directories in - # parallel - progress_config = core.ProgressConfig( - total=num_dirs, - units="dirs", - title="Mkdirs", - show=show_progress, - ) - with core.ParallelWorkManager( - worker_processes=worker_processes, progress_config=progress_config - ) as manager: - for field in self.metadata.fields: - col_path = get_vcf_field_path(self.path, field) - # Don't bother trying to count the intermediate directories towards - # progress - manager.submit(col_path.mkdir, parents=True) - for j in range(self.num_partitions): - part_path = col_path / f"p{j}" - manager.submit(mkdir_with_progress, part_path) + for field in self.metadata.fields: + col_path = get_vcf_field_path(self.path, field) + col_path.mkdir(parents=True) def load_partition_summaries(self): summaries = []