In [1]:
import zarr
import xarray as xr
import sgkit as sg
import pandas as pd
import numcodecs
from Bio import SeqIO
from fsspec.implementations.zip import ZipFileSystem
import dask
import dask.array as da

# Classes
try:
    
    # newer versions of zarr
    
    from zarr.storage import KVStore
    
    class SafeStore(KVStore):
        
        def __getitem__(self, key):
            try:
                return self._mutable_mapping[key]
            except KeyError as e:
                # always raise a runtime error to ensure zarr propagates the exception
                raise RuntimeError(e)

        def __contains__(self, key):
            return key in self._mutable_mapping

                
except ImportError:
    
    # older versions of zarr

    class SafeStore(Mapping):

        ## This helps to ensure that no missing data are silently filled in.

        def __init__(self, store):
            self.store = store

    def __getitem__(self, key):
        try:
            return self.store[key]
        except KeyError as e:
            # Allow missing .zattrs to return empty JSON for attribute reading
            if key.endswith(".zattrs"):
                return b'{}'
            raise RuntimeError(f"Missing key in Zarr store: {key}") from e

        def __contains__(self, key):
            return key in self.store

        def __iter__(self):
            return iter(self.store)

        def __len__(self):
            return len(self.store)

In [2]:
# Get started
# Load paths and metadata
ref_path = '/QRISdata/Q6151/dennistpw/far_hin_1.x/data/reference/VectorBase-54_AfarautiFAR1_Genome.fasta'
output_dir_path = f'/scratch/user/uqtdenni/snp_genotypes_combined.zarr'

In [4]:
# Functions
def get_contigs(ref_path, length_threshold=1e5):
    # now let's get a list of the contigs that we are going to call over
    contig_lengths = {}
    for record in SeqIO.parse(ref_path, "fasta"):
        seq_id = record.id
        seq_length = len(record.seq)
        contig_lengths[seq_id] = seq_length
    
    # Filter to called contigs (over 1e5 bp)
    return {k: v for k, v in sorted(contig_lengths.items(), key=lambda item: item[1], reverse=True) if v > length_threshold}

def load_samples():
    
    ## Returns the sorted unique list of samples from the sample manifest

    # Read the samples from the file
    fofn = pd.read_table('/home/uqtdenni/far_hin_1.x/metadata/fofn.tsv')

    # Get the unique list of samples from the DataFrame and sort
    samples = fofn['zarr_name'].dropna().unique().tolist()
    return samples

def open_input(sample, seq_id, field):

    zip_file = f"/scratch/user/uqtdenni/snp_genotypes_zarr/{sample}.zarr.zip"
    
    zip_fs = ZipFileSystem(zip_file)

    # Map the full path to one contig
    contig_path = f"{sample}.zarr/{seq_id}"
    
    zarr_store = zip_fs.get_mapper(contig_path)

    z = zarr.open(zarr_store)

    #z = zarr_store[f"{field}"]

    #zarr_path = zarr_path_template.format(sample_id = s)
    #array_path = f'{sample}.zarr/{seq_id}/{field}'

    # open and load the array
    #root =  zarr.ZipStore(zarr_path)
    field = z[f'{field}'][:]

    return field
def setup_output_array(seq_id, field,  
                       cname='zstd', clevel=1, shuffle=-1, 
                       chunk_height=1000000, chunk_width=10, force=True): 
    
    ## Returns the complete-ness status of the data array at the specified output path,
    ##   along with the array itself and the gcs_output_store (for consolidating metadata).
    ## The sample_set, seq_id and field determine the output path according to the path template `output_pattern`.
    ## If a complete array does not yet exist at the specfied path, the example_arr is used to determine the shape, dtype and chunks of the output array.
    ## The length of `samples` is also used to determine the shape of the output array.
    ## The specfied or default chunk_height and chunk_width are also used to determine the output chunks.
    
    ##  https://numcodecs.readthedocs.io/en/stable/blosc.html
    ## `clevel` is the compression level for the compressor, i.e. numcodecs.Blosc, between 0 and 9
    ## `shuffle` is the shuffle option for the compressor, where -1 means autoshuffle, either bit- or byte-shuffle
    
    ## The `force` option causes existing data marked as `complete` to be overwritten, initially with zeros (as with incomplete data)
            
    # Wrap the local DirectoryStore with SafeStore
    local_store = SafeStore(zarr.DirectoryStore(output_dir_path))
    
    # Open or create the Zarr group using the safe store
    output_zarr_root = zarr.open_group(store=local_store, mode='a')
        
    ## Create the output subgroups, unless they already exist
    
    # Make a zarr subgroup of the root, named after the seq_id, e.g. "2L"
    seq_group = output_zarr_root.require_group(seq_id)
    
    # Make a zarr subgroup of the seq_group, named "calldata", e.g. "2L" > "calldata"
    #calldata_group = seq_group.require_group("calldata")
    
    ## Stay DRY
    
    # Get the field_name from the field string, e.g. 'AD' from 'calldata/AD', 'MQ' from 'variants/MQ'
    field_name = field#split("_")[1]
    
    # Set the completion flag to false
    # This mechanism will allow us to re-run the process to only create data that hasn't been flagged "complete"
    complete = False
    
    # If the field_name is already in the "calldata" zarr subgroup 
    if field_name in seq_group:
        
        # Get the data array for this field_name
        output_arr = seq_group[field]
        
        # Get the value of the "complete" flag, or default to False (perhaps unnecessary but explicit)
        complete = output_arr.attrs.get("complete", False)
        
        # If the data is marked complete and the force flag is not set
        if complete and not force:
            
            # Return the state of the complete flag (True) and the existing data array
            return complete, output_arr, gcs_output_store
    
    ## Determine output array characteristics
    
    # The output_shape should have the same number of rows as the input data shape,
    # and have the same number of columns as there are samples.
    # The other dimensions of the output_shape should should match the other dimensions in the input data.
    input_dtype, input_shape = check_array_setup(seq_id=seq_id, field=field)
    output_shape = (input_shape[0], len(samples)) + input_shape[2:]
    
    # The output data type should be the same as the input data type.
    output_dtype = input_dtype
    
    # Specify an appropriate chunk size.
    # Why this number? Big enough to give reasonable chunk sizes.
    target_chunk_size = 2**26
    
    # The output chunks should have the specified height and width,
    #   and have the same other chunk dimensions as the input data.
    output_chunks = (chunk_height, chunk_width) + output_shape[2:]
    
    # Choose a compressor for the output data
    compressor = numcodecs.Blosc(cname=cname, clevel=clevel, shuffle=shuffle)
    
    # Create an output array of zeros named after the field_name, with the determined shape, dtype and chunks.
    output_arr = seq_group.zeros(field_name, shape=output_shape, dtype=output_dtype,
                                      chunks=output_chunks, overwrite=True, compressor=compressor)
    
    # Return the complete-ness status of the output array, along with the output array itself (either blank or as it exists)
    return complete, output_arr, local_store

def check_array_setup(seq_id, field):
    
    # Returns information about data array for the first sample for the specified 
    # sample set, seq_id and field.

    z = open_input(sample=samples[0], seq_id=seq_id, field=field)
    return z.dtype, z.shape




def setup_input_array(seq_id, field):
    
    input_arrays = []
    dtype, shape = check_array_setup(seq_id=seq_id, field=field)
    
    for i, sample in enumerate(samples):
        
        # N.B., here we use dask delayed so we avoid opening any zipped zarr
        # files in the main notebook process, rather opening will only happen
        # within the workers. This should hopefully avoid running out of memory
        # in the main notebook process.
        z = dask.delayed(open_input)(sample=sample, seq_id=seq_id, field=field)
        a = da.from_delayed(z, shape=shape, dtype=dtype)
        
        if a.ndim == 1:
            # add a dimension to allow for concatenation along samples axis
            a = a[:, None]
        input_arrays.append(a)
            
    input_array = da.concatenate(input_arrays, axis=1)
    
    return input_array

def copy_data(input_arr, output_arr):
    
    # Convert the input array to the same chunks as the output array
    input_arr = input_arr.rechunk(output_arr.chunks)
    
    # Store the input array in the output array.
    # Don't lock the data stores while storing.
    # https://docs.dask.org/en/latest/array-api.html#dask.array.store
    input_arr.store(output_arr, lock=False)
    
    # Set the "complete" attribute on the output array to True, to mark completion
    output_arr.attrs['complete'] = True


def combine_zarr(seq_id, field, force=False):

    # Get the complete-ness status of the output array, along with the output array itself.
    complete, output_array, gcs_output_store = setup_output_array(
        seq_id=seq_id,
        field=field,
        force=force
    )
    
    # If the output array is already complete and the option to force its replacement is not set
    if complete and not force:
        
        # Don't copy over data for this sample set, because it's already complete and we shouldn't replace
        print('output exists, skipping', seq_id, field)
        return
    
    print('processing', seq_id, field)
    
    # Get the input array for this sample set, for this seq_id and field.
    input_array = setup_input_array(seq_id=seq_id, field=field)

    print(f"input chunks: {input_array.chunks}")
    print(f"output chunks: {output_array.chunks}")
    
    # Copy the input data (for the specfied samples) to the output array (for the specfied sample set)
    copy_data(input_array, output_array)

called_contigs = get_contigs(ref_path)
samples = load_samples()

#input_dir = Path("/scratch/user/uqtdenni/snp_genotypes_zarr/")
#input_zarr_paths = sorted(input_dir.glob("*.zarr"))  # Adjust pattern if needed

nsamples = len(samples)

for field in 'call_genotype', 'call_AD', 'call_GQ':
    for seq_id in called_contigs:

        # try this to clear cluster memory between arrays
        # client.restart()
        
        # Combine the source data for each sample in the sample set into one output zarr
        combine_zarr(
             seq_id=seq_id,
             field=field,
        )

RuntimeError: 'KI915040/call_genotype/.zattrs'

In [43]:
import zarr
import fsspec
import numpy as np

# Your open_input adapted to return the whole zarr group for a contig lazily:
def open_sample_contig(sample, contig):
    zip_file = f"/scratch/user/uqtdenni/snp_genotypes_zarr/{sample}.zarr.zip"
    
    zip_fs = ZipFileSystem(zip_file)

    # Map the full path to one contig
    contig_path = f"{sample}.zarr/{contig}"
    
    zarr_store = zip_fs.get_mapper(contig_path)

    z = xr.open_zarr(zarr_store, consolidated=False)

    
    return z

called_contigs = get_contigs(ref_path)
samples = load_samples()

# The field you want to merge (example)

# Dictionary to hold merged contig arrays
merged_contigs = {}

for contig in called_contigs:
    print(f"Processing contig: {contig}")
    # Open arrays for this contig from all samples
    datasets = []
    
    for sample in samples:
        ds = open_sample_contig(sample, contig)  # one-sample dataset
    
        # Confirm it has dimensions: variants, ploidy (and probably samples = 1)
        if "samples" not in ds.dims:
            # Add sample dimension manually if not present
            ds = ds.expand_dims("samples")
    
        else:
            # If samples dim exists but is size 1, make sure it's handled correctly
            ds = ds.isel(samples=0).expand_dims("samples")
    
        datasets.append(ds)

# Concatenate along the 'samples' dimension
merged = xr.concat(datasets, dim="samples")

# Now merged_contigs contains one concatenated array per contig,
# stacked with shape (nsamples, nsites, ...) depending on your data.

# Example: save merged contigs to disk or do further processing.


Processing contig: KI915083


FileNotFoundError: [Errno 2] No such file or directory: '/scratch/user/uqtdenni/snp_genotypes_zarr//scratch/user/uqtdenni/snp_genotypes_zarr/tor_wQLD_Por_13.zarr.zip.zarr.zip'

In [44]:
samples

['/scratch/user/uqtdenni/snp_genotypes_zarr/tor_wQLD_Por_13.zarr.zip',
 '/scratch/user/uqtdenni/snp_genotypes_zarr/ore_IJ98_12_5.zarr.zip',
 '/scratch/user/uqtdenni/snp_genotypes_zarr/pun_SI98-12-2.zarr.zip']

In [41]:
#41s for a million bp
%load_ext memory_magics
