# Tree Building Pipeline

Below scripts run through tree building on 61 Mtb isolates downloaded from the SRA.

In [1]:
import sys, os, subprocess, dendropy
import pandas as pd
from Bio import Entrez, SeqIO
import mtbvartools as vt
from mtbvartools.dasktools import startClient
from tqdm import tqdm

# add scripts folder to path
scripts_path = os.path.abspath('../scripts/')

# prepare environment path for subprocesses
conda_path = os.path.dirname(sys.executable)  # store conda path for shell execution
env = os.environ.copy()
env['PATH'] = conda_path + os.pathsep + env['PATH'] + os.pathsep + scripts_path

## Prepare for Variant Calling

Download H37Rv v3 genome from the NCBI and prepare indexes used by BWA and GATK.

In [2]:
accession = 'NC_000962.3'  # H37Rv v3 accession number
os.makedirs('genome', exist_ok=True)

with Entrez.efetch(db='nuccore', rettype='gbwithparts', retmode='text', id=accession) as handle, open(f'genome/{accession}.gb', 'w') as f:
    f.write(handle.read())
with Entrez.efetch(db='nuccore', rettype='fasta', retmode='text', id=accession) as handle, open(f'genome/{accession}.fasta', 'w') as f:
    f.write(handle.read())

In [3]:
subprocess.run(
    'bwa index genome/NC_000962.3.fasta',
    shell=True, env=env)
subprocess.run(
    'gatk CreateSequenceDictionary -R genome/NC_000962.3.fasta',
    shell=True, env=env)

[bwa_index] Pack FASTA... 0.05 sec
[bwa_index] Construct BWT for the packed sequence...
[bwa_index] 1.27 seconds elapse.
[bwa_index] Update BWT... 0.04 sec
[bwa_index] Pack forward-only FASTA... 0.04 sec
[bwa_index] Construct SA from BWT and Occ... 0.55 sec
[main] Version: 0.7.18-r1243-dirty
[main] CMD: bwa index genome/NC_000962.3.fasta
[main] Real time: 2.438 sec; CPU: 1.959 sec
Using GATK jar /n/home12/pculviner/.conda/envs/mtbvartools/share/gatk4-4.6.1.0-0/gatk-package-4.6.1.0-local.jar
Running:
    java -Dsamjdk.use_async_io_read_samtools=false -Dsamjdk.use_async_io_write_samtools=true -Dsamjdk.use_async_io_write_tribble=false -Dsamjdk.compression_level=2 -jar /n/home12/pculviner/.conda/envs/mtbvartools/share/gatk4-4.6.1.0-0/gatk-package-4.6.1.0-local.jar CreateSequenceDictionary -R genome/NC_000962.3.fasta
INFO	2025-03-02 16:00:09	CreateSequenceDictionary	Output dictionary will be written in /n/boslfs02/LABS/sfortune_lab/Lab/culviner/notebooks/250301_prepare_uploads/mtbvartools/t

Tool returned:
0


CompletedProcess(args='gatk CreateSequenceDictionary -R genome/NC_000962.3.fasta', returncode=0)

---
Load in SRA example accession numbers - the first strain, SRR10522783, is a canettii sample which is a common MTBC outgroup. Including an outgroup enables rooting of the tree.

For your own trees, canettii could also be a good choice for outgroup.

In [4]:
strain_list = pd.read_csv('tree_strains.txt').sra.values
print(strain_list, len(strain_list))

['SRR10522783' 'ERR2679299' 'ERR2446424' 'ERR2446415' 'ERR551110'
 'SRR7496532' 'SRR6397656' 'DRR185092' 'ERR3256257' 'ERR11044579'
 'SRR7496478' 'SRR6797503' 'SRR6930928' 'SRR2101286' 'SRR8217901'
 'SRR6046329' 'SRR6480571' 'SRR5266537' 'SRR5153716' 'ERR11068733'
 'DRR130114' 'SRR6807685' 'SRR8651614' 'SRR6856120' 'SRR8439315'
 'ERR3273187' 'ERR137210' 'DRR185016' 'SRR6152938' 'SRR6397378'
 'DRR185093' 'ERR2446119' 'ERR11067324' 'SRR2100524' 'ERR3275540'
 'ERR11051007' 'ERR11068227' 'ERR11044288' 'ERR11044351' 'SRR6964596'
 'ERR11068958' 'ERR2446390' 'SRR2100329' 'ERR11067544' 'ERR2514624'
 'ERR2513490' 'ERR1873433' 'ERR11049166' 'SRR6074074' 'ERR551391'
 'SRR6397635' 'ERR3275899' 'ERR11081805' 'SRR11033762' 'ERR3287431'
 'ERR11081108' 'ERR400371' 'ERR11042993' 'ERR11082629' 'ERR2514855'
 'ERR2446408'] 61


## Parallel Run of Variant Calling Using SLURM

Run all above numbers, spawning workers. Can also run sequentially or using local parallelism. The pipeline can also be run on local fastq files by using the `--fastq-path` (give a single path for SE or a comma separated path for PE) option instead of the `--sra` option.

Fastq downsampling is critical to improve pipeline throughput as many bacterial samples from the SRA are massively over-sequenced. For trial run, a target depth of 30 is adequate, but for high quality trees, better to target ~100. There are diminishing returns above 100 and the breseq variant caller (the most computationally expensive step) begins to take a lot more time. Setting target depth to 0 will not downsample and use all reads. See `sra_variant_pipelinepy --help` for additional pre-filtering options (minimum depth, ignore lineages, minimum alignment) that cam be tweaked from defaults to avoid wasted commpute time on low-quality samples.

Logs for each sample will be stored in the `logs` directory. A frequent failure point for samples is SRA server instability - you might see error messages associated with `prefetch` if this is the case. The script will attempt to download each sample 3 times before giving up.

Results from each step are stored in the output folder corresponding to each sample in the output (`pipeline_results`) directory. For debugging, `--keep-intermediates` can be used, but for large numbers of samples this can use massive amounts of space as many intermediate files are not compressed to decrease resource use. If higher speed node storage is avalible on `tmp`, `--tmp-path` can be set to `/tmp/` and this will be used for temporary files. Note that a lot of disk I/O will still happen even off of `tmp` so if a path to a fast short term scratch disk is availible, ideally files should be output there and then results can be moved to cold storage.

In [5]:
client = startClient(
    n_workers=25,  # spawn 25 workers using SLURM - workers will accept new tasks as others complete
    use_local=False,  # can also do this on a single local process, but be sure not to spawn more workers than cores or local memory allows
    use_slurm=True,  # using SLRUM - below are SLRUM specific options
    log_dir='logs',  # outputs worker logs and below, logs per sample
    queue='sapphire',  # name of our queue, this will NOT be the same for all queues
    process_per_node=1,  # each worker will run separately, not multiple per node
    cores_per_process=2,  # SLURM processors to provide to process
    memory_per_process='8GB',  # SLURM memory to allocate per process
    walltime='4:00:00')  # maximum walltime before worker cancels

common_args = '\
--dir pipeline_results \
--threads 2 \
--tmp-path FALSE \
--genbank genome/NC_000962.3.gb \
--fasta genome/NC_000962.3.fasta \
--allowed-lineages any \
--target-depth 30'

futures = []
for sra in strain_list:
    futures.append(client.submit(
        subprocess.run, f'sra_variant_pipeline.py --sra {sra} --output {sra} {common_args} > logs/{sra}.out', shell=True, env=env))

outputs_list = []
for f in tqdm(futures):
    outputs_list.append(client.gather(f))
    f.release()

client.shutdown()  # this shuts down workers

2025-03-02 16:00:34 - Launching SLURMCluster client (25 workers x 2 CPU x 8GB x 4:00:00 @ sapphire with 1 workers / node)


100%|██████████| 61/61 [1:11:50<00:00, 70.67s/it]   
2025-03-02 17:12:31,983 - distributed.batched - INFO - Batched Comm Closed <TCP (closed)  local=tcp://10.31.147.62:34921 remote=tcp://10.31.147.62:35600>
Traceback (most recent call last):
  File "/n/home12/pculviner/.conda/envs/mtbvartools/lib/python3.12/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
             ^^^^^^^^^^
  File "/n/home12/pculviner/.conda/envs/mtbvartools/lib/python3.12/site-packages/tornado/gen.py", line 766, in run
    value = future.result()
            ^^^^^^^^^^^^^^^
  File "/n/home12/pculviner/.conda/envs/mtbvartools/lib/python3.12/site-packages/distributed/comm/tcp.py", line 263, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError


Collect results into a single csv file.

In [6]:
def collectResults(target_directory, name_list):
    completed_list = []
    error_list = []
    unhandled_list = []
    completed = 0
    caught_error = 0
    unhandled_error = 0
    for name in tqdm(name_list):
        try:  # get completed timings
            completed_list.append(
                pd.read_csv(f'{target_directory}/{name}/results/{name}.results.csv', index_col=0).loc[name])
            completed += 1
        except FileNotFoundError:
            try:
                error_list.append(
                    pd.read_csv(f'{target_directory}/{name}/results/{name}.error.results.csv', index_col=0).loc[name])
                caught_error += 1
            except FileNotFoundError:
                unhandled_error += 1
                unhandled_list.append(name)
    completed_results = pd.concat(
        completed_list, axis=1).T
    try:
        error_results = pd.concat(
            error_list, axis=1).T
    except:
        error_results = None
    print(f'{completed} completed, {caught_error} caught errors, {unhandled_error} unhandled errors.')
    return completed_results, error_results, unhandled_list

In [7]:
results_df, _, _ = collectResults('pipeline_results', strain_list)
results_df.to_csv('variant_calling_results.csv')

100%|██████████| 61/61 [00:02<00:00, 22.74it/s]


61 completed, 0 caught errors, 0 unhandled errors.


## Prepare a SNP fasta file for tree building

Assuming all variant calling results appear to be of high quality, prepare an input datasheet for tree fasta generation.

Common sample failure modes are low depth (see `mindepth`), low coverage (see `coverage`), failure of TBProfiler to call a lineage (see `call`, note canettii is not called - this is expected behavior), mixed samples (see below), or poor breseq-mutect consensus (see below).

The pipeline runs a `TBProfiler` followed by a script to identify conflicting lineage calls which can be caused by mixed infections or contamination. If multiple lineage calls are made it will appear in `conflict_lineages`. The number of SNPs calling each conflicting lineage are denoted in `conflict_snps` - note that sometimes a single SNP will cause a conflicting call - this is may not a "real" mixed sample just a lineage arising SNP that arose de novo.

The variant script runs GATK's `mutect2` and `breseq` in consensus mode. SNPs called by `mutect2` are categorized into fixed (AF = 1), near fixed (AF > .9), mixed (.9 > AF > .1), or low alt (AF < .1). A high ratio of mixed to fixed + near fixed can be an indicator of mixed samples or poor quality sequencing. Finally, comparison of the Breseq SNP counts to Mutect2 SNP counts can be used to filter samples - if these values strongly disagree, it might be difficult to call variants with a high certainty.

In [8]:
results_inputs = pd.read_csv('variant_calling_results.csv', index_col=0)

datasheet_data = []
for index, rdata in results_inputs.iterrows():
    row_data = [
        index,
        f'{index}/results/{index}.breseq.vcf',
        f'{index}/results/{index}.miss.breseq.zarr']
    datasheet_data.append(row_data)

merge_df = pd.DataFrame(
    data=datasheet_data,
    columns=['label', 'vcf_path', 'miss_path'])

# run on all strains
merge_df.to_csv(f'tree_fasta_inputs.csv')

Run a batch command to generate fasta files. The script generates a pseudo-genome alignment of all locations with a SNP in at least one sample.

The above for loop stores the breseq VCF file as well as an array of breseq "miss" locations - places where breseq decided data was too low quality to make a call. To avoid reference bias (where poor quality sequencing results default to looking like the reference, in our case a L4.9 strain, H37Rv), the fasta generation script masks low quality sites from a given sample and fully removes SNPs in regions that were called as poor quality in a fraction of samples (see `--miss-threshold`). As an added step to remove biases, we use a mapping quality filter that was previously calculated (https://pubmed.ncbi.nlm.nih.gov/35020793/, see `--mask`).

In [9]:
threads = 10
path = env['PATH']
work_dir = 'tree_output'

os.makedirs(work_dir, exist_ok=True)
cmd = f'\
    sbatch -c {threads} -p sapphire -t 2:00:00 --mem=20G -o {work_dir}/fasta-%j.out --wrap="\
    export PATH={path} && \
    write_snp_fastas.py \
    --input-csv tree_fasta_inputs.csv \
    --input-fasta genome/NC_000962.3.fasta \
    --inputs-dir pipeline_results \
    --out-dir {work_dir} \
    --output snp_fasta \
    --mask rlc_plus_lowmap_marin.bed \
    --miss-threshold 0.1 \
    --local-threads {threads}"'
subprocess.run(cmd, shell=True)

Submitted batch job 5285859


CompletedProcess(args='    sbatch -c 10 -p sapphire -t 2:00:00 --mem=20G -o tree_output/fasta-%j.out --wrap="    export PATH=/n/home12/pculviner/.conda/envs/mtbvartools/bin:/n/home12/pculviner/.conda/envs/jlab/bin:/n/sw/Mambaforge-22.11.1-4/condabin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/opt/puppetlabs/bin:/n/home12/pculviner/.local/bin:/n/home12/pculviner/bin:/n/boslfs02/LABS/sfortune_lab/Lab/culviner/bin:/n/boslfs02/LABS/sfortune_lab/Lab/culviner/notebooks/250301_prepare_uploads/mtbvartools/scripts &&     write_snp_fastas.py     --input-csv tree_fasta_inputs.csv     --input-fasta genome/NC_000962.3.fasta     --inputs-dir pipeline_results     --out-dir tree_output     --output snp_fasta     --mask rlc_plus_lowmap_marin.bed     --miss-threshold 0.1     --local-threads 10"', returncode=0)

Fasta generation outputs show the number of invariant A,T,G,C for input into ascertainment bias.

In [10]:
pd.read_csv('tree_output/snp_fasta.results.csv', index_col=0)

Unnamed: 0,snp_fasta
n_samples,61.0
genome_len,4411532.0
miss_threshold,0.1
pass_miss_threshold_sites,4162647.0
variant_sites,34091.0
minimum_variant_strains_to_consider,1.0
considered_variant_sites,34091.0
bed_mask_sites,276750.0
passing_variant_sites,31027.0
passing_output_sites,31027.0


## Tree building with IQTree

Due to clonal inheritance and mimimal genomic diversity most sites in the Mtb genome are invariant. To generate proper branch lengths, we provide IQTree with the number of constant sites output by the fasta generation script.

In [11]:
# ascertainment bias ordered: A, C, G, T
fconst = '699061,1315112,1311900,699337'
threads = 20
memory = 30
work_dir = 'tree_output'
input_fasta = f'{work_dir}/snp_fasta.fasta'
path = env['PATH']

cmd = f'\
    sbatch -c {threads} -p sapphire -t 4:00:00 --mem={memory}G -o {work_dir}/tree-%j.out --wrap="\
    export PATH={path} && export OMP_NUM_THREADS={threads} && \
    module load Mambaforge/22.11.1-fasrc01 && conda activate mtb_isolates && \
    iqtree -T {threads} -fconst {fconst} -s {input_fasta} --model GTR -bb 1000 --prefix {work_dir}/iqtree"'
subprocess.run(
    cmd, shell=True)

Submitted batch job 5285891


CompletedProcess(args='    sbatch -c 20 -p sapphire -t 4:00:00 --mem=30G -o tree_output/tree-%j.out --wrap="    export PATH=/n/home12/pculviner/.conda/envs/mtbvartools/bin:/n/home12/pculviner/.conda/envs/jlab/bin:/n/sw/Mambaforge-22.11.1-4/condabin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/opt/puppetlabs/bin:/n/home12/pculviner/.local/bin:/n/home12/pculviner/bin:/n/boslfs02/LABS/sfortune_lab/Lab/culviner/bin:/n/boslfs02/LABS/sfortune_lab/Lab/culviner/notebooks/250301_prepare_uploads/mtbvartools/scripts && export OMP_NUM_THREADS=20 &&     module load Mambaforge/22.11.1-fasrc01 && conda activate mtb_isolates &&     iqtree -T 20 -fconst 699061,1315112,1311900,699337 -s tree_output/snp_fasta.fasta --model GTR -bb 1000 --prefix tree_output/iqtree"', returncode=0)

Canettii outgroup (SRR10522783) is extremely divergent relative to MTBC. To enable easy plotting in software like itol (https://itol.embl.de/), we root at canettii's parent node and prune out canettii.

In [12]:
outgroup = 'SRR10522783'
work_dir = 'tree_output'
tree_label = 'iqtree'

# load the tree
tree = dendropy.Tree.get_from_path(
    f'{work_dir}/{tree_label}.contree', 'newick')
# root at outgroup
tree.reroot_at_node(
    tree.find_node_with_taxon_label(outgroup).parent_node)
# ladderize
tree.ladderize()
tree.write(
    path=f'{work_dir}/{tree_label}.rooted.nwk',
    schema='newick')
tree.prune_taxa_with_labels([outgroup])
tree.write(
    path=f'{work_dir}/{tree_label}.pruned.nwk',
    schema='newick')