# Use Notebook to deploy Motus (via the wrapper read_counter.py) over a cluster with a Slurm resource manager and job scheduler.  
### Here, a job comprises a computer task performing taxonomic profiling of a MTG sample collected during the TARA Ocean expedition. 
### Based on this tool we can profile MTGs using a reference database of phylogenetically conserved marker genes across a great variety of planktonic groups.  This database was built using the following marker gene DBs:

The huge catalog of phytoplankton psbO marker gene sequences, which encodes the manganese-stabilising polypeptide of the photosystem II oxygen evolving complex, reported in this [paper](https://onlinelibrary.wiley.com/doi/epdf/10.1111/1755-0998.13592) and accessible [here](https://www.ebi.ac.uk/biostudies/studies/S-BSST761?query=A%20robust%20approach%20to%20estimate%20relative%20phytoplankton%20cell%20abundances%20from%20metagenomes).

The [MZGdb](https://metazoogene.org/MZGdb) database and most specifically the "All Plankton Combo" files contain all data from the All Zooplankton and the All Ichthyoplankton combined files. This database was described in this [paper](https://link.springer.com/article/10.1007/s00227-021-03887-y). Here we will focus on DNA sequences for the barcode region of mitochondrial cytochrome oxidase I (COI).  

The reference database was built in Notebook: `CustomDB_MTG_Taxa_Profiling_v1.0.ipynb`


In [1]:
import matplotlib as mpl
mpl.rcParams['figure.dpi']= 300
%matplotlib inline
import matplotlib.pyplot as plt
from matplotlib import cm, colors
from matplotlib.colors import ListedColormap
import seaborn as sns
import pandas as pd
sns.set_style('ticks')
import glob 
from subprocess import Popen, call, STDOUT, PIPE
import os
import numpy as np
import matplotlib

In [2]:
matplotlib.rcParams['savefig.dpi'] = 1000
matplotlib.rcParams['figure.dpi'] = 900
sns.set_style("whitegrid", {'axes.grid' : False})
sns.set_context("paper")
sns.set(font='serif')
# sns.set_style("white", {
#         "font.family": "serif",
#         "font.serif": ["Times", "Palatino", "serif"]
#     })
sns.set_style('ticks')

In [1]:
#Utility function
def runShell(cmd):
    Popen(cmd,shell=True,stdout=PIPE,stderr=PIPE).communicate()[0]

### To submit jobs to the cluster using this notebook follow the steps below

Job script template to submit on a per-sample basis.  

**NOTE:** For properly setting the following script you must:  
   - Activate dedicated conda environment within which the motus software was installed (MOTUS_ENV)
   - Provide full path to all things Motus profiler (BASE_DIR), which contains data/ code/, etc. 
   - Provide full path to read_counter wrapper (READ_COUNTER_PATH), which  calls and runs Motus on a customized DB
   
**NOTE2:** The /readonly prefix in the bash script below can be removed for most clusters.

In [7]:
job_script_template = '''
#!/bin/bash
#PBS -l nodes=1:ppn=1
#PBS -l mem=30gb
#PBS -l walltime=24:00:00

#Activate conda environment
source activate ${MOTUS_ENV}

#Run script to fetch clean MTG/MTX paired-end reads from ftp server
bash /dodrio/scratch/projects/starting_2022_048/Motus/scripts/fetch_merge_pe_reads.sh %(accession_id)s %(ftp2r1)s %(ftp2r2)s

MERGED_READS=${BASE_DIR}/data/TARA_metaGs/%(accession_id)s_merged_reads.fastq
export INPUT_FASTQ=/readonly${MERGED_READS}

#NOTE: ${BASE_DIR}/data/custom_db/DB == ${BASE_DIR}/data/custom_db/CustomPhytoZooPlanktonMGs.fna
CUSTOM_DB=/readonly${BASE_DIR}/data/custom_db/DB

OUTPUT_MAPPING=${BASE_DIR}/results/%(accession_id)s_mapped_reads.map

cd ${BASE_DIR}/scripts

${READ_COUNTER_PATH}/read_counter map -s ${INPUT_FASTQ} -y insert.raw_counts -db ${CUSTOM_DB} -o ${OUTPUT_MAPPING} -l 80 -t 48

#Remove merged reads after running humann3
rm ${MERGED_READS}

'''

In [8]:
#Read in DF containing info on metaGs and metaTs
metag_df = pd.read_csv('../../data/taxa_profiling_test_1/TARA_metaG_metadata.tsv',index_col='Unnamed: 0')

#ID samples by SAM accession ID
accessions_list = metag_df['sample_accession'].drop_duplicates().values

#Set directory where dedicated script will be run from
job_script_dir = '/dodrio/scratch/projects/starting_2022_048/Motus/scripts/'

Loop over each SAM accession ID in list *accessions_list* and create a dedicated job script file to submit to the cluster.  

**WARNING:** You might want to run only a subsample of the whole list of accessions in *accessions_list*, otherwise you might get the whole system overloaded!!!  

To potentially prevent your system from collapsing, the code below is commented. Uncomment for running the whole set of samples contained in *accessions_list*.

In [7]:
# #Set type of sample, keep in mind that we are dealing with paired MTG-MTT
# for accid in accessions_list: 
    
#     #FTPs to fetch paired end reads from
#     ftp2r1, ftp2r2 = metag_df.query('sample_accession=="{}"'.format(accid))['submitted_ftp'].values[0].split(';')

#     #Create sample specific job script, save to dedicated folder and submit job
#     job_submit_script = job_script_template % {'accession_id':accid, 'ftp2r1':ftp2r1, 'ftp2r2':ftp2r2}
#     script_name = os.path.join(job_script_dir,'job_motus_sample_{}.sh'.format(accid))

#     #Dump text into .sh file
#     with open(script_name,'w') as fid:
#         fid.write(job_submit_script)

#     #Submit job
#     runShell("qsub {}".format(script_name))

Check the structure of a job script

In [16]:
accid = accessions_list[0]
script_name = os.path.join(job_script_dir,'job_motus_sample_{}.sh'.format(accid))

!cat $script_name


#!/bin/bash
#PBS -l nodes=1:ppn=1
#PBS -l mem=30gb
#PBS -l walltime=24:00:00
#PBS -A starting_2022_048

#Activate conda environment
source activate /dodrio/scratch/projects/starting_2022_048/envs/motus

###Set base folder
BASE_DIR=/dodrio/scratch/projects/starting_2022_048/Motus

#Run script to fetch clean MTG/MTX paired-end reads from ftp server
bash /dodrio/scratch/projects/starting_2022_048/Motus/scripts/fetch_merge_pe_reads.sh SAMEA2731167 ftp.sra.ebi.ac.uk/vol1/run/ERR172/ERR1726802/AHX_BMGIOSF_4_1_C2FGHACXX.IND2_clean.fastq.gz ftp.sra.ebi.ac.uk/vol1/run/ERR172/ERR1726802/AHX_BMGIOSF_4_2_C2FGHACXX.IND2_clean.fastq.gz

MERGED_READS=${BASE_DIR}/data/TARA_metaGs/SAMEA2731167_merged_reads.fastq
export INPUT_FASTQ=/readonly${MERGED_READS}

#NOTE: ${BASE_DIR}/custom_db/DB == ${BASE_DIR}/custom_db/CustomPhytoZooPlanktonMGs.fna
CUSTOM_DB=/readonly${BASE_DIR}/custom_db/DB

OUTPUT_MAPPING=${BASE_DIR}/results/SAMEA2731167_mapped_reads.map

cd ${BASE_DIR}/scripts

/dodrio/scratch/projects/st

### Next, after running the whole set of samples contained in *accessions_list*, check which output files (e.g. SAMEA*_mapped_reads.map) have been successfuly run to completion

In [17]:
get_all_output_files = glob.glob("../../results/taxa_profiling_test_1/SAMEA*_mapped_reads.map")

success_runs = []
for ofn in get_all_output_files:
    with open(ofn,'r') as fid:
        l = fid.readlines()[1:]
        if(len(l)>10):
            success_runs.append(ofn.split("_mapped_reads.map")[0].split('/')[-1])
            
#Samples to re-run for metaGs            
sid_samples2rerun = list(set(accessions_list).difference(set(success_runs)))

# print("Total No. of jobs still pending for running: {}\n".format(len(sid_samples2rerun)))

print("No. of MTGs processed by Motus/read_counter: {}".format(len(success_runs)))

No. of MTGs processed by Motus/read_counter: 50
