In [2]:
import pathlib
import random
import string
import subprocess
from concurrent.futures import ProcessPoolExecutor, as_completed

import pandas as pd

In [3]:
def _run_pipe(producer_cmd, consumer_cmd):
    """Run ``producer_cmd | consumer_cmd`` and raise if EITHER side exits non-zero.

    A plain ``sh -c "a | b"`` only reports the exit status of ``b``, so a failing
    ``a`` (e.g. ``sort`` running out of disk) would otherwise go unnoticed and leave
    truncated output behind.
    """
    producer = subprocess.Popen(producer_cmd, stdout=subprocess.PIPE)
    try:
        consumer = subprocess.run(consumer_cmd, stdin=producer.stdout)
    finally:
        # Drop our copy of the read end so the producer gets EPIPE if the consumer died early.
        producer.stdout.close()
        producer_rc = producer.wait()
    if consumer.returncode != 0:
        raise subprocess.CalledProcessError(consumer.returncode, consumer_cmd)
    if producer_rc != 0:
        raise subprocess.CalledProcessError(producer_rc, producer_cmd)


def sort_and_split_large_fragment_files(
    path, prefix, sort=True, cpu=1, mem_gb_per_core=3, barcode_col=4, max_rows=100000000, temp_dir="/tmp"
):
    """
    Split a large gzipped fragment file into smaller gzipped chunks, optionally sorted by barcode.

    Chunks are written next to the input file as
    ``<path.parent>/<prefix>.<suffix>.fragments.tsv.gz``, where ``<suffix>`` is the
    chunk suffix generated by ``split`` (``aa``, ``ab``, ...). Every chunk is first moved
    in under a ``.gz_temp`` name and only renamed to its final name after all chunks
    have been moved, so a glob for ``*.fragments.tsv.gz`` never sees a partial set.

    Requires ``gunzip``, GNU ``sort`` (``--parallel`` / ``--compress-program``),
    ``split`` and ``pigz`` on ``PATH``.

    Parameters
    ----------
    path : str or pathlib.Path
        Path to the gzipped fragment file.
    prefix : str
        Prefix for the split files.
    sort : bool, optional
        Sort the fragments by barcode before splitting. Default is True.
    cpu : int, optional
        Number of threads for ``sort --parallel`` and ``pigz -p``. Default is 1.
    mem_gb_per_core : int, optional
        Memory in GB per core for sorting; ``sort -S`` gets ``cpu * mem_gb_per_core`` GB.
        Default is 3.
    barcode_col : int, optional
        1-based column index of the barcode in the fragment file. Default is 4.
    max_rows : int, optional
        Maximum number of rows for each split file. Default is 100,000,000.
    temp_dir : str, optional
        Directory in which a private scratch directory is created for the uncompressed,
        sorted and split intermediates (including ``sort`` spill files). The scratch
        directory is always removed, also when a step fails. Default is "/tmp".

    Returns
    -------
    None

    Raises
    ------
    subprocess.CalledProcessError
        If any external command fails, including ``sort`` inside the ``sort | split`` pipe.
    """
    import shutil
    import tempfile

    path = pathlib.Path(path)
    # Unique per call (safe with many concurrent workers) and removed in ``finally`` so
    # multi-GB intermediates never leak into temp_dir on failure.
    work_dir = pathlib.Path(tempfile.mkdtemp(dir=temp_dir))
    try:
        unzipped = work_dir / "fragments.tsv"
        chunk_dir = work_dir / "chunks"
        chunk_dir.mkdir()
        chunk_prefix = str(chunk_dir / f"{prefix}.")
        split_args = ["split", "-l", str(max_rows)]

        # Commands are argument lists (no shell), so paths with spaces or shell
        # metacharacters are passed through verbatim.
        with open(unzipped, "wb") as out:
            subprocess.run(["gunzip", "-c", str(path)], stdout=out, check=True)

        if sort:
            sort_cmd = [
                "sort",
                "-S", f"{int(cpu * mem_gb_per_core)}G",
                "-T", str(work_dir),
                "--compress-program", "gzip",
                "--parallel", str(cpu),
                f"-k{barcode_col},{barcode_col}",
                str(unzipped),
            ]
            _run_pipe(sort_cmd, [*split_args, "-", chunk_prefix])
        else:
            subprocess.run([*split_args, str(unzipped), chunk_prefix], check=True)
        # Free the (potentially very large) uncompressed copy before compressing the chunks.
        unzipped.unlink()

        chunks = sorted(chunk_dir.iterdir())
        # If split produced no chunks (e.g. empty input), pigz would fail with no file arguments.
        if chunks:
            subprocess.run(["pigz", "-p", str(cpu), *map(str, chunks)], check=True)

        staged = []
        for gz in sorted(chunk_dir.glob("*.gz")):
            staged_path = path.parent / f"{gz.name[:-len('.gz')]}.fragments.tsv.gz_temp"
            # shutil.move also works across filesystems (temp_dir vs. data directory).
            shutil.move(str(gz), str(staged_path))
            staged.append(staged_path)
        for staged_path in staged:
            staged_path.rename(staged_path.with_name(staged_path.name[:-len("_temp")]))
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)
    return

In [4]:
FRAGMENT_DIR = pathlib.Path("/tempdata/Domcke2020Science/")
FRAGMENT_SUFFIX = ".fragments.txt.gz"

# Sample name (file name without FRAGMENT_SUFFIX) -> fragment file path. Sorted by name so
# the order does not depend on the filesystem's directory-listing order.
frag_paths = pd.Series(
    {p.name[: -len(FRAGMENT_SUFFIX)]: p for p in FRAGMENT_DIR.glob(f"*{FRAGMENT_SUFFIX}")}
).sort_index()
frag_paths

sample_10_muscle      /tempdata/Domcke2020Science/sample_10_muscle.f...
sample_11_cerebrum    /tempdata/Domcke2020Science/sample_11_cerebrum...
sample_12_heart       /tempdata/Domcke2020Science/sample_12_heart.fr...
sample_13_placenta    /tempdata/Domcke2020Science/sample_13_placenta...
sample_14_heart       /tempdata/Domcke2020Science/sample_14_heart.fr...
                                            ...                        
sample_71_cerebrum    /tempdata/Domcke2020Science/sample_71_cerebrum...
sample_72_standard    /tempdata/Domcke2020Science/sample_72_standard...
sample_7_liver        /tempdata/Domcke2020Science/sample_7_liver.fra...
sample_8_lung         /tempdata/Domcke2020Science/sample_8_lung.frag...
sample_9_liver        /tempdata/Domcke2020Science/sample_9_liver.fra...
Length: 72, dtype: object

In [7]:
# Samples to process in this run.
# NOTE(review): this is the lexicographically sorted tail of the full `frag_paths` index,
# from "sample_42_heart" onward — presumably the batch left over from an earlier run; confirm.
use_sample = {
    "sample_42_heart",
    "sample_43_liver",
    "sample_44_lung",
    "sample_45_cerebrum",
    "sample_46_liver",
    "sample_47_lung",
    "sample_48_standard",
    "sample_49_stomach",
    "sample_4_lung",
    "sample_50_bonemarrow",
    "sample_51_gonad",
    "sample_52_pancreas",
    "sample_53_eye",
    "sample_54_thymus",
    "sample_55_eye",
    "sample_56_spleen",
    "sample_57_spleen",
    "sample_58_cerebellum",
    "sample_59_bonemarrow",
    "sample_5_kidney",
    "sample_60_thymus",
    "sample_61_pancreas",
    "sample_62_stomach",
    "sample_63_gonad",
    "sample_64_cerebrum",
    "sample_65_kidney",
    "sample_66_cerebrum",
    "sample_67_kidney",
    "sample_68_lung",
    "sample_69_cerebrum",
    "sample_6_cerebrum",
    "sample_70_lung",
    "sample_71_cerebrum",
    "sample_72_standard",
    "sample_7_liver",
    "sample_8_lung",
    "sample_9_liver",
}
# Fail loudly if a requested sample has no fragment file, instead of silently dropping it.
missing_samples = use_sample - set(frag_paths.index)
assert not missing_samples, f"no fragment file found for: {sorted(missing_samples)}"
frag_paths = frag_paths[frag_paths.index.isin(use_sample)]
frag_paths.size

37

In [None]:
# Concurrent samples. Each job runs `sort --parallel 4` with a 4 x 3 = 12 GB sort buffer,
# so 19 workers can use up to 76 sort threads and ~228 GB of sort buffer at once —
# size N_WORKERS to the host.
N_WORKERS = 19

# NOTE(review): the worker function is defined in this notebook's __main__; this relies on
# workers being forked. Under the "spawn"/"forkserver" start methods, ProcessPoolExecutor
# cannot locate functions defined interactively.
with ProcessPoolExecutor(N_WORKERS) as exe:
    future_to_sample = {
        exe.submit(
            sort_and_split_large_fragment_files,
            path=path,
            prefix=sample,
            sort=True,
            cpu=4,
            mem_gb_per_core=3,
            barcode_col=4,
            max_rows=100000000,
            temp_dir="/tmp",
        ): sample
        for sample, path in frag_paths.items()
    }
    # Collect every failure (tagged with its sample) rather than stopping at the first one,
    # which would hide which sample failed and whether others failed too.
    failures = {}
    for future in as_completed(future_to_sample):
        try:
            future.result()
        except Exception as err:
            failures[future_to_sample[future]] = err

if failures:
    detail = "\n".join(f"{sample}: {err!r}" for sample, err in sorted(failures.items()))
    raise RuntimeError(f"{len(failures)} of {len(future_to_sample)} samples failed:\n{detail}")