In [1]:
import pandas as pd
import pathlib
from glob import glob
import numpy as np
import subprocess
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor, as_completed
import cooler
import joblib
import schicluster
# Directory of the installed schicluster package; the Snakefile templates used
# below are read from its `cool/` sub-directory.
PACKAGE_DIR = schicluster.__path__[0]

In [2]:
# Name of the cell group (cluster) handled by this run; it is used to locate the
# input cell table and to name the output directory and the merged cool file.
group_name = 'CS-PRNr-DR_En1_Sox2_Gaba.18mo'

In [3]:
# Where the output goes: bucket path that is the parent dir of all cluster cool files
output_bucket_dir='ecker-rachel-amb-datasets/female-amb/impute/merged-25k'
# Total available CPUs on the VM
cpu = 45
# Output dir name on the VM; its merged cool file is later copied under output_bucket_dir
outdir = f'{group_name}-25k-imputed-cool/'
# Number of cells per chunk
chunk_size = 200
# Matrix resolution passed to the Snakefiles (presumably bin size in bp - TODO confirm)
res = 25000

In [4]:
# Cell-type names are sanitised (spaces -> '_', '/' removed) in the same way for
# both the allow-list and the metadata column so that they can be matched.
use_cts = [
    name.replace(' ', '_').replace('/', '')
    for name in joblib.load('/data/metadata/m3c_use_cts')
]

meta = pd.read_csv('/data/metadata/240104_m3C_META.csv', index_col=0)
meta['AgingMajorType'] = meta['AgingMajorType'].map(
    lambda name: name.replace(' ', '_').replace('/', '')
)
# Keep only the cells whose type is in the allow-list.
meta = meta[meta['AgingMajorType'].isin(use_cts)]
# Group label is "<cell type>.<age>".
meta['AgingMajorType.Age'] = meta['AgingMajorType'] + '.' + meta['Age']

In [5]:
# Read the header-less cell table for this group (first column is the index),
# attach each cell's group label from the metadata by index alignment, and drop
# rows with any missing value.
table = (
    pd.read_csv(
        f'/data/tmp/cool_tables/{group_name}.tsv',
        sep='\t',
        index_col=0,
        header=None,
    )
    .assign(cluster=meta['AgingMajorType.Age'])
    .dropna()
)
table.columns = ['cell_url', 'cluster']
table

Unnamed: 0_level_0,cell_url,cluster
0,Unnamed: 1_level_1,Unnamed: 2_level_1
AMB_220719_18mo_13D_14C_2_P2-6-J11-K11,/data/female-amb/impute/25K/AMB_220719_18mo_13...,CS-PRNr-DR_En1_Sox2_Gaba.18mo
AMB_220719_18mo_13D_14C_2_P4-2-B6-M16,/data/female-amb/impute/25K/AMB_220719_18mo_13...,CS-PRNr-DR_En1_Sox2_Gaba.18mo
AMB_220719_18mo_8E_9H_8J_9J_2_P2-5-F6-E21,/data/female-amb/impute/25K/AMB_220719_18mo_8E...,CS-PRNr-DR_En1_Sox2_Gaba.18mo
AMB_220630_18mo_13D_14C_1_P2-5-I2-K21,/data/female-amb/impute/25K/AMB_220630_18mo_13...,CS-PRNr-DR_En1_Sox2_Gaba.18mo
AMB_220630_18mo_13D_14C_1_P3-3-C10-K6,/data/female-amb/impute/25K/AMB_220630_18mo_13...,CS-PRNr-DR_En1_Sox2_Gaba.18mo
...,...,...
AMB_220719_18mo_13D_14C_2_P3-5-J2-I21,/data/female-amb/impute/25K/AMB_220719_18mo_13...,CS-PRNr-DR_En1_Sox2_Gaba.18mo
AMB_220719_18mo_13D_14C_2_P2-1-J11-J14,/data/female-amb/impute/25K/AMB_220719_18mo_13...,CS-PRNr-DR_En1_Sox2_Gaba.18mo
AMB_220712_18mo_7H_8H_9G_2_P4-1-C7-I2,/data/female-amb/impute/25K/AMB_220712_18mo_7H...,CS-PRNr-DR_En1_Sox2_Gaba.18mo
AMB_220712_18mo_7H_8H_9G_2_P4-2-C7-E16,/data/female-amb/impute/25K/AMB_220712_18mo_7H...,CS-PRNr-DR_En1_Sox2_Gaba.18mo


In [6]:
# Create the run directory, then one sub-directory per cluster holding that
# cluster's cell table (written without header and without the index).
run_dir = pathlib.Path(outdir)
run_dir.mkdir(exist_ok=True)
for cluster_name, cluster_df in table.groupby('cluster'):
    cluster_dir = run_dir / cluster_name
    cluster_dir.mkdir(exist_ok=True)
    cluster_df.to_csv(cluster_dir / 'cell_table.tsv', sep='\t', header=False, index=False)
    print(cluster_name, cluster_df.shape[0])


CS-PRNr-DR_En1_Sox2_Gaba.18mo 133


In [7]:
def prepare_dir(output_dir, chunk_df, resolution, chrom_size_path, template=None):
    """Create one chunk directory containing its cell table and Snakefile.

    Parameters
    ----------
    output_dir : pathlib.Path or str
        Directory to create for this chunk (missing parents are created too).
    chunk_df : pandas.DataFrame
        Cells of this chunk; only the index and the ``cell_url`` column are
        written to ``cell_table.csv`` (no header row).
    resolution : int
        Value written as ``resolution`` in the Snakefile header.
    chrom_size_path : pathlib.Path or str
        Value written (quoted) as ``chrom_size_path`` in the Snakefile header.
    template : str, optional
        Snakefile body appended after the parameter header. Defaults to the
        notebook-level ``GENERATE_MATRIX_CHUNK_TEMPLATE``.

    Returns
    -------
    None
    """
    if template is None:
        # Fall back to the chunk template loaded at notebook level.
        template = GENERATE_MATRIX_CHUNK_TEMPLATE

    output_dir = pathlib.Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    cell_table_path = str((output_dir / 'cell_table.csv').absolute())
    chunk_df[['cell_url']].to_csv(cell_table_path, header=False)

    # String values are wrapped in double quotes so that they are string
    # literals once written into the Snakefile.
    parameters = dict(cell_table_path=f'"{cell_table_path}"',
                      chrom_size_path=f'"{chrom_size_path}"',
                      resolution=resolution
                     )
    parameters_str = '\n'.join(f'{k} = {v}'
                               for k, v in parameters.items())

    with open(output_dir / 'Snakefile', 'w') as f:
        # Explicit newline keeps the last parameter line from fusing with the
        # first line of the template (same layout as the group Snakefile).
        f.write(parameters_str + '\n' + template)
    return


In [8]:
# Split every cluster into chunks of at most `chunk_size` cells and prepare one
# snakemake working directory per chunk.
total_chunk_dirs = []   # every chunk directory, in creation order
group_chunks = {}       # cluster name -> list of its chunk directories
output_dir = pathlib.Path(outdir).absolute()
chrom_size_path = pathlib.Path('/ref/m3C/mm10.main.nochrM.nochrY.chrom.sizes').absolute()

with open(f'{PACKAGE_DIR}/cool/Snakefile_chunk_template') as template_file:
    GENERATE_MATRIX_CHUNK_TEMPLATE = template_file.read()

for group, group_df in table.groupby('cluster'):
    group_chunks[group] = []
    # Positional slicing handles both small groups (a single chunk0) and large
    # ones, without adding a helper column to the groupby sub-frame.
    for chunk, start in enumerate(range(0, group_df.shape[0], chunk_size)):
        chunk_df = group_df.iloc[start:start + chunk_size]
        this_dir = output_dir / f'{group}_chunk{chunk}'
        prepare_dir(this_dir, chunk_df, resolution=res, chrom_size_path=chrom_size_path)
        total_chunk_dirs.append(this_dir)
        group_chunks[group].append(this_dir)

            

In [9]:
# One snakemake invocation per chunk directory, one command per line.
step1_cmds = [
    f'snakemake -d {chunk_dir} --snakefile {chunk_dir}/Snakefile -j 20 --rerun-incomplete'
    for chunk_dir in total_chunk_dirs
]
with open(output_dir / 'snakemake_cmd_step1.txt', 'w') as f:
    f.writelines(step1_cmd + '\n' for step1_cmd in step1_cmds)

In [10]:
# Header of the group-level Snakefile: one `name = value` line per parameter,
# with string values wrapped in double quotes.
scool_parameters = {
    'output_dir': f'"{output_dir}"',
    'chrom_size_path': f'"{chrom_size_path}"',
    'resolution': res,
}
parameters_str = '\n'.join(
    f'{name} = {value}' for name, value in scool_parameters.items()
)

with open(f'{PACKAGE_DIR}/cool/Snakefile_group_template') as template_file:
    GENERATE_MATRIX_GROUP_TEMPLATE = template_file.read()

# Group Snakefile = parameter header + template.
with open(output_dir / 'Snakefile', 'w') as snakefile:
    snakefile.write(parameters_str + '\n' + GENERATE_MATRIX_GROUP_TEMPLATE)

# Single step-2 command that runs the group Snakefile in the run directory.
step2_cmd = f'snakemake -d {output_dir} --snakefile {output_dir}/Snakefile -j 10 --rerun-incomplete'
with open(output_dir / 'snakemake_cmd_step2.txt', 'w') as cmd_file:
    cmd_file.write(step2_cmd + '\n')


In [11]:
def run_snakemake(cmd):
    """Run one shell command line and return it once it has succeeded.

    Parameters
    ----------
    cmd : str
        Complete command line; it is executed through the shell.

    Returns
    -------
    str
        The same ``cmd`` string, so the caller can report which command ended.

    Raises
    ------
    subprocess.CalledProcessError
        If the command exits with a non-zero status. Without this, a failed
        run would be reported as finished and later steps would still run.
    """
    subprocess.run(cmd, shell=True, check=True)
    return cmd
    

# step 1 snakemake

### for each chunk of cells, combine by chromosome.

In [12]:
# Load the step-1 commands, one per line, with trailing whitespace removed.
filename = f'{outdir}/snakemake_cmd_step1.txt'
with open(filename) as file:
    cmd_list = [line.rstrip() for line in file]
cmd_list

['snakemake -d /home/qzeng_salk_edu/project/240212-merge-imputed-cool/CS-PRNr-DR_En1_Sox2_Gaba.18mo-25k-imputed-cool/CS-PRNr-DR_En1_Sox2_Gaba.18mo_chunk0 --snakefile /home/qzeng_salk_edu/project/240212-merge-imputed-cool/CS-PRNr-DR_En1_Sox2_Gaba.18mo-25k-imputed-cool/CS-PRNr-DR_En1_Sox2_Gaba.18mo_chunk0/Snakefile -j 20 --rerun-incomplete']

In [13]:
# Submit every command to the process pool and report each one as it completes.
with ProcessPoolExecutor(cpu) as executor:
    futures = [executor.submit(run_snakemake, cmd=cmd) for cmd in cmd_list]

    for done in as_completed(futures):
        print(f'{done.result()} finished')
        

Building DAG of jobs...
Using shell: /usr/bin/bash
Provided cores: 20
Rules claiming more threads will be scaled down.
Job stats:
job             count
------------  -------
merge_Q            20
merge_chroms        1
summary             1
total              22

Select jobs to execute...

[Mon Feb 12 20:25:56 2024]
rule merge_Q:
    input: /home/qzeng_salk_edu/project/240212-merge-imputed-cool/CS-PRNr-DR_En1_Sox2_Gaba.18mo-25k-imputed-cool/CS-PRNr-DR_En1_Sox2_Gaba.18mo_chunk0/cell_table.csv
    output: chrX.Q.hdf
    jobid: 21
    reason: Missing output files: chrX.Q.hdf
    wildcards: chrom=chrX
    resources: tmpdir=/var/tmp


[Mon Feb 12 20:25:56 2024]
rule merge_Q:
    input: /home/qzeng_salk_edu/project/240212-merge-imputed-cool/CS-PRNr-DR_En1_Sox2_Gaba.18mo-25k-imputed-cool/CS-PRNr-DR_En1_Sox2_Gaba.18mo_chunk0/cell_table.csv
    output: chr5.Q.hdf
    jobid: 6
    reason: Missing output files: chr5.Q.hdf
    wildcards: chrom=chr5
    resources: tmpdir=/var/tmp


[Mon Feb 12 20:25

Merging Q (imputed, before normalization) matrix.
Merging Q (imputed, before normalization) matrix.
Merging Q (imputed, before normalization) matrix.
Merging Q (imputed, before normalization) matrix.
Merging Q (imputed, before normalization) matrix.
Merging Q (imputed, before normalization) matrix.
Merging Q (imputed, before normalization) matrix.
Merging Q (imputed, before normalization) matrix.
Merging Q (imputed, before normalization) matrix.
Merging Q (imputed, before normalization) matrix.
Merging Q (imputed, before normalization) matrix.
Merging Q (imputed, before normalization) matrix.
Merging Q (imputed, before normalization) matrix.
Merging Q (imputed, before normalization) matrix.
Merging Q (imputed, before normalization) matrix.
Merging Q (imputed, before normalization) matrix.
Merging Q (imputed, before normalization) matrix.
Merging Q (imputed, before normalization) matrix.
Merging Q (imputed, before normalization) matrix.
Merging Q (imputed, before normalization) matrix.


[Mon Feb 12 20:27:23 2024]
Finished job 20.
1 of 22 steps (5%) done
[Mon Feb 12 20:27:23 2024]
Finished job 19.
2 of 22 steps (9%) done
[Mon Feb 12 20:27:23 2024]
Finished job 12.
3 of 22 steps (14%) done
[Mon Feb 12 20:27:23 2024]
Finished job 15.
4 of 22 steps (18%) done
[Mon Feb 12 20:27:23 2024]
Finished job 13.
5 of 22 steps (23%) done
[Mon Feb 12 20:27:23 2024]
Finished job 16.
6 of 22 steps (27%) done
[Mon Feb 12 20:27:24 2024]
Finished job 9.
7 of 22 steps (32%) done
[Mon Feb 12 20:27:24 2024]
Finished job 10.
8 of 22 steps (36%) done
[Mon Feb 12 20:27:24 2024]
Finished job 7.
9 of 22 steps (41%) done
[Mon Feb 12 20:27:24 2024]
Finished job 18.
10 of 22 steps (45%) done
[Mon Feb 12 20:27:24 2024]
Finished job 11.
11 of 22 steps (50%) done
[Mon Feb 12 20:27:24 2024]
Finished job 14.
12 of 22 steps (55%) done
[Mon Feb 12 20:27:24 2024]
Finished job 17.
13 of 22 steps (59%) done
[Mon Feb 12 20:27:24 2024]
Finished job 21.
14 of 22 steps (64%) done
[Mon Feb 12 20:27:24 2024]
Finish

snakemake -d /home/qzeng_salk_edu/project/240212-merge-imputed-cool/CS-PRNr-DR_En1_Sox2_Gaba.18mo-25k-imputed-cool/CS-PRNr-DR_En1_Sox2_Gaba.18mo_chunk0 --snakefile /home/qzeng_salk_edu/project/240212-merge-imputed-cool/CS-PRNr-DR_En1_Sox2_Gaba.18mo-25k-imputed-cool/CS-PRNr-DR_En1_Sox2_Gaba.18mo_chunk0/Snakefile -j 20 --rerun-incomplete finished


# step 2 snakemake

### merge all chunks for the cluster

In [14]:
# Load the step-2 commands, one per line, with trailing whitespace removed.
filename = f'{outdir}/snakemake_cmd_step2.txt'
with open(filename) as file:
    cmd_list = [line.rstrip() for line in file]
cmd_list

['snakemake -d /home/qzeng_salk_edu/project/240212-merge-imputed-cool/CS-PRNr-DR_En1_Sox2_Gaba.18mo-25k-imputed-cool --snakefile /home/qzeng_salk_edu/project/240212-merge-imputed-cool/CS-PRNr-DR_En1_Sox2_Gaba.18mo-25k-imputed-cool/Snakefile -j 10 --rerun-incomplete']

In [15]:

# Submit every command to the process pool and report each one as it completes.
with ProcessPoolExecutor(cpu) as executor:
    futures = [executor.submit(run_snakemake, cmd=cmd) for cmd in cmd_list]

    for done in as_completed(futures):
        print(f'{done.result()} finished')
        

Building DAG of jobs...
Using shell: /usr/bin/bash
Provided cores: 10
Rules claiming more threads will be scaled down.
Job stats:
job             count
------------  -------
merge_chunks        1
summary             1
total               2

Select jobs to execute...

[Mon Feb 12 20:27:32 2024]
rule merge_chunks:
    output: CS-PRNr-DR_En1_Sox2_Gaba.18mo/CS-PRNr-DR_En1_Sox2_Gaba.18mo.Q.cool
    jobid: 1
    reason: Missing output files: CS-PRNr-DR_En1_Sox2_Gaba.18mo/CS-PRNr-DR_En1_Sox2_Gaba.18mo.Q.cool
    wildcards: group=CS-PRNr-DR_En1_Sox2_Gaba.18mo
    resources: tmpdir=/var/tmp



Matrix Q generated
0
snakemake -d /home/qzeng_salk_edu/project/240212-merge-imputed-cool/CS-PRNr-DR_En1_Sox2_Gaba.18mo-25k-imputed-cool --snakefile /home/qzeng_salk_edu/project/240212-merge-imputed-cool/CS-PRNr-DR_En1_Sox2_Gaba.18mo-25k-imputed-cool/Snakefile -j 10 --rerun-incomplete finished


[Mon Feb 12 20:27:41 2024]
Finished job 1.
1 of 2 steps (50%) done
Select jobs to execute...

[Mon Feb 12 20:27:41 2024]
rule summary:
    input: CS-PRNr-DR_En1_Sox2_Gaba.18mo/CS-PRNr-DR_En1_Sox2_Gaba.18mo.Q.cool
    jobid: 0
    reason: Rules with a run or shell declaration but no output are always executed.
    resources: tmpdir=/var/tmp

[Mon Feb 12 20:27:41 2024]
Finished job 0.
2 of 2 steps (100%) done
Complete log: .snakemake/log/2024-02-12T202732.087540.snakemake.log


# generate mcool

In [16]:
# Upload the merged cool file and then a `.finish` marker to the output bucket.
# The marker is only ever written to the bucket, so its existence is checked
# there with gsutil rather than on the local filesystem.
tag_name = f'{group_name}.finish'
bucket_url = f'gs://{output_bucket_dir}/'   # trailing slash: treat as a directory
tag_path = f'{bucket_url}{tag_name}'
cool_path = pathlib.Path(outdir) / group_name / f'{group_name}.Q.cool'

# `gsutil -q stat` exits with status 0 when the object exists.
tag_exists = subprocess.run(['gsutil', '-q', 'stat', tag_path]).returncode == 0

if not tag_exists:
    # check=True aborts before the marker is uploaded if the cool copy failed,
    # so the marker always means the cool file is in the bucket.
    subprocess.run(['gsutil', '-m', 'cp', '-r', str(cool_path), bucket_url], check=True)
    pathlib.Path(tag_name).touch()
    subprocess.run(['gsutil', 'cp', tag_name, bucket_url], check=True)
else:
    print(f'{group_name} done')

Copying file://CS-PRNr-DR_En1_Sox2_Gaba.18mo-25k-imputed-cool//CS-PRNr-DR_En1_Sox2_Gaba.18mo/CS-PRNr-DR_En1_Sox2_Gaba.18mo.Q.cool [Content-Type=application/octet-stream]...
/ [1/1 files][ 64.7 MiB/ 64.7 MiB] 100% Done                                    
Operation completed over 1 objects/64.7 MiB.                                     
Copying file://CS-PRNr-DR_En1_Sox2_Gaba.18mo.finish [Content-Type=application/octet-stream]...
/ [1 files][    0.0 B/    0.0 B]                                                
Operation completed over 1 objects.                                              
