# Run MiXeR

Convenience notebook to run multiple sets of summary stats through MiXeR with handling of Slurm job dependencies.

## Using
To use, start jupyter from a machine which can submit jobs using Slurm (e.g., TSD `pXXX-appn-norment01`, or `pXXX-submit` nodes - replace `pXXX` with the project id below). 
An ssh tunnel must exist at a given port (here `8888`):

```
# on pXXX-appn-norment01:
jupyter notebook --no-browser --port=8888 (This command is for VM ware Horizon users) 
# (when the given port is busy due to previous sessions, try another port and change accordingly in the below command)

# directly on the login node: 
ssh -N -f -L localhost:8888:localhost:8888 $USER@pXXX-appn-norment01
# (This command is for thin link users)

# open your browser and connect to (or URL printed to terminal when starting jupyter notebook)
localhost:8888
```

The ssh tunnel step (`ssh -N -f -L ...`) may be omitted on other systems than TSD. 

NOTE: This code does not check for existing file output and may overwrite files already present in 
output dirs (`sumstats_mixer`, `logs`, `results`, `jobs`)

NOTE: There is a jupyter/jupyterlab installation in the `mixer.sif` container, but this does not expose the host's `sbatch` command required to submit jobs.
Start jupyter from the host machine instead, and install it if is missing (e.g., `pip install jupyter`) or load the corresponding module (`module load JupyterLab/3.5.0-GCCcore-11.3.0`) before invoking `jupyter notebook ...`.

In [None]:
# package imports
import os
import numpy as np
import pandas as pd
import scipy.stats as st
import subprocess as sp
import datetime
from joblib import Parallel, delayed

In [None]:
# define some common job params (modify as needed)
account = 'p33_norment'  # project account on TSD p33. 'p697_norment' on p697. Modify as needed
mixer_dir = '/ess/p33/cluster/github/comorment/mixer'  # path to clone of repository. 
# The above is for p33, should be '/ess/p697/data/durable/s3-api/github/comorment/mixer' on p697
singularity_module = 'singularity/3.7.1'  # module for singularity, modify as needed
cpus_per_task = 16  # number of CPUs per task
mem_per_cpu = '8000M'  # memory per CPU in MB
array = '1-2'  # NOTE: use '1-20' for production runs!

In [None]:
# define input files (assuming all are present in "traitfolder" directory).
# Can also be an absolute path.
# traitfolder = '/REF/sumstats'  # if running using jupyterlab in mixer.sif container
traitfolder = os.path.join(mixer_dir, 'reference', 'sumstats')

# Define primary and secondary traits. 
# It is possible to include multiple traits/files in each dictionary,
# which will allow running for with multiple traits. 
# Caution should be taken if the same traits/files exist in both dictionaries,
# as same vs same will be omitted
primary_traits = {
    'SCZ': 'clozuk_pgc2.meta.sumstats.txt.gz'
}

secondary_traits = {
    'INT': 'SavageJansen_2018_intelligence_metaanalysis.txt.gz'
}

In [None]:
# create output directories
output_dir = os.path.join('.', 'output')
os.makedirs(output_dir, exist_ok=True)

jobscripts_dir = os.path.join('.', 'jobscripts')
os.makedirs(jobscripts_dir, exist_ok=True)

logs_dir = os.path.join('.', 'logs')
os.makedirs(logs_dir, exist_ok=True)

### Prep input files

In [None]:
# rename some sumstats columns. 
# Need ['SNP', 'CHR', 'BP', 'A1', 'A2', 'N', 'Z']
sumstats_mixer = 'sumstat_mixer'
os.makedirs(sumstats_mixer, exist_ok=True)

mixercols = ['SNP', 'CHR', 'BP', 'A1', 'A2', 'N', 'Z']
# map columns to mixercols (edit as necessary)
mapper = {
    'RSID': 'SNP',
    'POS': 'BP',
    'EffectAllele': 'A1',
    'OtherAllele': 'A2',
    'Zscore': 'Z',
    'N_analyzed': 'N',
}

In [None]:
# convert files for MiXer (in parallel). May take a few minutes.
def process_sumstats(num_cores=2):
    def _process_sumstats(fname):
        print(f'processing {fname}')
        df = pd.read_csv(os.path.join(traitfolder, fname), delim_whitespace=True, low_memory=False)
        df.rename(columns=mapper, inplace=True)

        # fill in some missing info
        if fname == 'clozuk_pgc2.meta.sumstats.txt.gz':
            df['N'] = 4 / (1 / 40675 + 1 / 64643)
            df['Z'] = -st.norm.ppf(df['P'].values*0.5)*np.sign(df['OR'].values - 1).astype(np.float64)


        for col in mixercols:
            assert col in df.columns, f'col {col} is missing in {fname}'
        
        # make sure that all are upper case
        df['A1']=df['A1'].str.upper()
        df['A2']=df['A2'].str.upper()

        # MHC region:
        chr_6_mhc = (df['CHR'] == 6) & (df['BP'] > 26e6) & (df['BP'] < 34e6)
        
        # drop X chrom.
        # mixer crash on sumstats lines with CHR=X
        chr_X = df['CHR'] == 'X' 
        
        # and or
        exclude = chr_6_mhc | chr_X

        assert exclude.sum() == (chr_6_mhc.sum() + chr_X.sum()), 'mismatch in excluded positions'
        
        # save
        df.loc[~exclude, mixercols].to_csv(os.path.join(sumstats_mixer, fname), index=False, sep='\t')
    

    return Parallel(n_jobs=num_cores)(delayed(_process_sumstats)(fname) for fname in {**primary_traits, **secondary_traits}.values())

_ = process_sumstats(num_cores=2)

### Define jobscript contents

In [None]:
# generic SLURM header. Modify as needed.
slurm_header = '''#!/bin/sh
#SBATCH --job-name={job_name}
#SBATCH --account={account}
#SBATCH --open-mode=truncate
#SBATCH --output={log_file}
#SBATCH --error={log_file}
#SBATCH --time={time}
#SBATCH --cpus-per-task={cpus_per_task}
#SBATCH --mem-per-cpu={mem_per_cpu}
#SBATCH --array={array}

module purge
module load {singularity_module}

export MIXER={mixer_dir}
export SINGULARITY_BIND="$MIXER/reference:/REF:ro"
export SIF=$MIXER/singularity
export MIXER_COMMON_ARGS="--ld-file /REF/ldsc/1000G_EUR_Phase3_plink/1000G.EUR.QC.@.run4.ld --bim-file /REF/ldsc/1000G_EUR_Phase3_plink/1000G.EUR.QC.@.bim --threads 16"
export REP="rep${{SLURM_ARRAY_TASK_ID}}"
export EXTRACT="--extract /REF/ldsc/1000G_EUR_Phase3_plink/1000G.EUR.QC.prune_maf0p05_rand2M_r2p8.$REP.snps"

export PYTHON="singularity exec --home=$PWD:/home --cleanenv $SIF/mixer.sif python"

'''

In [None]:
fit1 = '''
$PYTHON /tools/mixer/precimed/mixer.py fit1 $MIXER_COMMON_ARGS $EXTRACT \
    --trait1-file {} \
    --out {}/{}.fit.$REP
'''

In [None]:
fit2 = '''
$PYTHON /tools/mixer/precimed/mixer.py fit2 $MIXER_COMMON_ARGS $EXTRACT \
    --trait1-file {} \
    --trait2-file {} \
    --trait1-params {}/{}.fit.$REP.json --trait2-params {}/{}.fit.$REP.json \
    --out {}/{}_vs_{}.fit.$REP
'''

In [None]:
test1 = '''
$PYTHON /tools/mixer/precimed/mixer.py test1 $MIXER_COMMON_ARGS \
    --trait1-file {} \
    --load-params {}/{}.fit.$REP.json \
    --out {}/{}.test.$REP
'''

In [None]:
test2 = '''
$PYTHON /tools/mixer/precimed/mixer.py test2 $MIXER_COMMON_ARGS \
    --trait1-file {} \
    --trait2-file {} \
    --load-params {}/{}_vs_{}.fit.$REP.json \
    --out {}/{}_vs_{}.test.$REP
'''

In [None]:
combine1 = '''$PYTHON /tools/mixer/precimed/mixer_figures.py combine \
    --json {output_dir}/{trait}.{fit_or_test}.rep@.json  \
    --out {output_dir}/{trait}.{fit_or_test}'''
combine2 = '''$PYTHON /tools/mixer/precimed/mixer_figures.py combine \
    --json {output_dir}/{trait0}_vs_{trait1}.{fit_or_test}.rep@.json  \
    --out {output_dir}/{trait0}_vs_{trait1}.{fit_or_test}'''
fig_one = '''$PYTHON /tools/mixer/precimed/mixer_figures.py one \
    --json {output_dir}/{trait0}.{fit_or_test}.json {output_dir}/{trait1}.{fit_or_test}.json \
    --out {output_dir}/{trait0}_and_{trait1}.{fit_or_test} \
    --trait1 {trait0} {trait1} --statistic mean std --ext {ext}'''

fig_two = '''$PYTHON /tools/mixer/precimed/mixer_figures.py two \
    --json-fit {output_dir}/{trait0}_vs_{trait1}.fit.json \
    --json-test {output_dir}/{trait0}_vs_{trait1}.test.json \
    --out {output_dir}/{trait0}_vs_{trait1} --trait1 {trait0} --trait2 {trait1} \
    --statistic mean std --ext {ext}'''

### Create and submit jobscripts with handling of job dependencies

In [None]:
# fit1
fit1_job_ids = dict()
for trait, fname in {**primary_traits, **secondary_traits}.items():
    traitfile = os.path.join(sumstats_mixer, fname)
    assert os.path.isfile(traitfile), f'file {traitfile} is missing'
    
    # write job file
    dt = datetime.datetime.now().strftime("%Y%m%d_%H:%M:%S")
    logfile = os.path.join(logs_dir, f'mixer_fit1_{trait}_{dt}_%A_%a.txt')
    jobscript = slurm_header.format(job_name=f'mixer_fit1_{trait}',
                                    account=account,
                                    mixer_dir=mixer_dir,
                                    singularity_module=singularity_module,
                                    log_file=logfile, 
                                    time='07:00:00', 
                                    cpus_per_task=cpus_per_task, 
                                    mem_per_cpu=mem_per_cpu,
                                    array=array)
    jobscript += fit1.format(traitfile, output_dir, trait)
    fjob = os.path.join(jobscripts_dir, f'mixer_fit1_{trait}.job')
    with open(fjob, 'w') as f:
        f.writelines(jobscript)
    
    # submit job
    cmd = ' '.join(['sbatch', fjob])
    print(cmd)
    output = sp.getoutput(cmd)
    jobid = output.split(' ')[-1]
    fit1_job_ids.update({f'mixer_fit1_{trait}': jobid})

In [None]:
# test1
test1_job_ids = dict()
for trait, fname in {**primary_traits, **secondary_traits}.items():
    traitfile = os.path.join(sumstats_mixer, fname)
    assert os.path.isfile(traitfile), f'file {traitfile} is missing'
    
    # write job file
    dt = datetime.datetime.now().strftime("%Y%m%d")
    logfile = os.path.join(logs_dir, f'mixer_test1_{trait}_{dt}_%A_%a.txt')
    jobscript = slurm_header.format(job_name=f'mixer_test1_{trait}_{dt}', 
                                    account=account,
                                    mixer_dir=mixer_dir,
                                    singularity_module=singularity_module,
                                    log_file=logfile, 
                                    time='01:00:00', 
                                    cpus_per_task=cpus_per_task, 
                                    mem_per_cpu=mem_per_cpu,
                                    array=array)
    jobscript += test1.format(traitfile, output_dir, trait, output_dir, trait)
    fjob = os.path.join(jobscripts_dir, f'mixer_test1_{trait}.job')
    with open(fjob, 'w') as f:
        f.writelines(jobscript)
    
    # get job dependency
    jobdep = fit1_job_ids[f'mixer_fit1_{trait}']
    
    # submit job
    cmd = ' '.join(['sbatch', f'--dependency=afterok:{jobdep}', fjob])
    print(cmd, f'; depends on {jobdep}')
    output = sp.getoutput(cmd)
    jobid = output.split(' ')[-1]
    test1_job_ids.update({f'mixer_test1_{trait}': jobid})

In [None]:
# fit2
fit2_job_ids = dict()
for trait0, fname0 in secondary_traits.items():
    traitfile0 = os.path.join(sumstats_mixer, fname0)
    for trait1, fname1 in primary_traits.items():
        if fname0 == fname1:
            continue
        traitfile1 = os.path.join(sumstats_mixer, fname1)
    
        # write job file
        jobname = f'mixer_fit2_{trait0}_vs_{trait1}'
        dt = datetime.datetime.now().strftime("%Y%m%d")
        logfile = os.path.join(logs_dir, f'{jobname}_{dt}_%A_%a.txt')
        jobscript = slurm_header.format(job_name=jobname, 
                                        account=account,
                                        mixer_dir=mixer_dir,
                                        singularity_module=singularity_module,
                                        log_file=logfile, 
                                        time='05:00:00', 
                                        cpus_per_task=cpus_per_task, 
                                        mem_per_cpu=mem_per_cpu,
                                        array=array)
        jobscript += fit2.format(traitfile0, traitfile1, 
                       output_dir, trait0, 
                       output_dir, trait1, 
                       output_dir, trait0, trait1)
        fjob = os.path.join(jobscripts_dir, f'mixer_fit2_{trait0}_vs_{trait1}.job')
        with open(fjob, 'w') as f:
            f.writelines(jobscript)

        # get job dependencies
        jobdeps = [fit1_job_ids[f'mixer_fit1_{trait0}'], 
                   fit1_job_ids[f'mixer_fit1_{trait1}'],
                   test1_job_ids[f'mixer_test1_{trait0}'],
                   test1_job_ids[f'mixer_test1_{trait1}'],
                   ]
        
        # submit job and get job ID:
        cmd = ' '.join(['sbatch',
                        '--dependency=afterok:{}'.format(','.join(jobdeps)),
                        fjob])
        print(cmd, f'; depends on {",".join(jobdeps)}')
        output = sp.getoutput(cmd)
        jobid = output.split(' ')[-1]
        fit2_job_ids.update({f'mixer_fit2_{trait0}_vs_{trait1}': jobid})    

In [None]:
# test2
test2_job_ids = dict()
for trait0, fname0 in secondary_traits.items():
    traitfile0 = os.path.join(sumstats_mixer, fname0)
    for trait1, fname1 in primary_traits.items():
        if fname0 == fname1:
            continue
        traitfile1 = os.path.join(sumstats_mixer, fname1)
    
        # write job file
        jobname = f'mixer_test2_{trait0}_vs_{trait1}'
        dt = datetime.datetime.now().strftime("%Y%m%d")
        logfile = os.path.join(logs_dir, f'{jobname}_{dt}_%A_%a.txt')

        jobscript = slurm_header.format(job_name=jobname, 
                                        account=account,
                                        mixer_dir=mixer_dir,
                                        singularity_module=singularity_module,
                                        log_file=logfile, 
                                        time='02:00:00', 
                                        cpus_per_task=cpus_per_task, 
                                        mem_per_cpu=mem_per_cpu,
                                        array=array)
        
        jobscript += test2.format(traitfile0, traitfile1, 
                       output_dir, trait0, trait1,
                       output_dir, trait0, trait1)
        fjob = os.path.join(jobscripts_dir, f'mixer_test2_{trait0}_vs_{trait1}.job')
        with open(fjob, 'w') as f:
            f.writelines(jobscript)

        # get job dependencies
        jobdeps = [fit2_job_ids[f'mixer_fit2_{trait0}_vs_{trait1}']
        ]
        
        # submit job and get job ID:
        cmd = ' '.join(['sbatch',
                        '--dependency=afterok:{}'.format(','.join(jobdeps)),
                        fjob])
        print(cmd, f'; depends on {",".join(jobdeps)}')
        output = sp.getoutput(cmd)
        jobid = output.split(' ')[-1]
        test2_job_ids.update({f'mixer_test2_{trait0}_vs_{trait1}': jobid})

In [None]:
# write plotting job file
jobname = 'mixer_plots'
dt = datetime.datetime.now().strftime("%Y%m%d")
logfile = os.path.join(logs_dir, f'{jobname}_{dt}_%A_%a.txt')
jobscript = slurm_header.format(job_name=jobname, 
                                account=account,
                                mixer_dir=mixer_dir,
                                singularity_module=singularity_module,
                                log_file=logfile, 
                                time='01:00:00', 
                                cpus_per_task=2, 
                                mem_per_cpu=mem_per_cpu,
                                array='1')
fjob = os.path.join(jobscripts_dir, f'{jobname}.job')

# job script content
tasklist = []

# combine1
tasklist += ['# combine univariate'] 
for fit_or_test in ['fit', 'test']:
    for trait in {**primary_traits, **secondary_traits}.keys():
        tasklist += [combine1.format(output_dir=output_dir, 
                                     trait=trait, 
                                     fit_or_test=fit_or_test)]
    tasklist += ['\n']

# combine2
tasklist += ['# combine bivariate'] 
for fit_or_test in ['fit', 'test']:
    for trait0 in secondary_traits.keys():
        for trait1 in primary_traits.keys():
            if trait0 == trait1:
                continue
            tasklist += [combine2.format(output_dir=output_dir, 
                                         trait0=trait0, 
                                         trait1=trait1, 
                                         fit_or_test=fit_or_test)]
    tasklist += ['\n']

# fig_one
tasklist += ['# mixer_figure one'] 
for fit_or_test in ['fit', 'test']:
    for trait0 in secondary_traits.keys():
        for trait1 in primary_traits.keys():
            if trait0 == trait1:
                continue
            for ext in ['png', 'svg']:
                tasklist += [fig_one.format(output_dir=output_dir, 
                               trait0=trait0, 
                               trait1=trait1, 
                               fit_or_test=fit_or_test, 
                               ext=ext)]
tasklist += ['\n']

# fig_two
tasklist += ['# mixer_figure two'] 
for trait0 in secondary_traits.keys():
    for trait1 in primary_traits.keys():
        if trait0 == trait1:
            continue
        for ext in ['png', 'svg']:
            tasklist += [fig_two.format(output_dir=output_dir, 
                           trait0=trait0, 
                           trait1=trait1, 
                           ext=ext)]
tasklist += ['\n']

with open(fjob, 'w') as f:
    f.writelines(jobscript + '\n'.join(tasklist))


# get job dependencies
jobdeps = []
for trait0 in secondary_traits.keys():
    for trait1 in primary_traits.keys():
        if trait0 == trait1:
            continue
        jobdeps += [test2_job_ids[f'mixer_test2_{trait0}_vs_{trait1}']]
if len(jobdeps) == 0:
    cmd = ' '.join(['sbatch', fjob])
    print(cmd)
else:    
    cmd = ' '.join(['sbatch',
                    '--dependency=afterok:{}'.format(','.join(jobdeps)),
                    fjob])
    print(cmd, f'; depends on {",".join(jobdeps)}')

output = sp.getoutput(cmd)
jobid = output.split(' ')[-1]

In [None]:
# uncomment to check the status of submitted jobs
# !squeue -u $USER

In [None]:
# uncomment to check that there are no errors in logs (like missing output files):
# !grep rror logs/*.txt