# RNA-seq expression - Workflow

## Setup and global parameters

In [6]:
[global]
# The output directory for generated files.
parameter: cwd = path("output")
# Sample meta data list
parameter: samples = path
# Raw data directory, default to the same directory as sample list
parameter: data_dir = path(f"{samples:d}")
# For cluster jobs, number commands to run per job
parameter: job_size = 1
# Wall clock time expected
parameter: walltime = "5h"
# Memory expected
parameter: mem = "16G"
# Memory for Java virtual mechine (`picard`)
parameter: java_mem = "6G"
# Number of threads
parameter: numThreads = 8
# Software container option
parameter: container = ""
from sos.utils import expand_size
cwd = path(f'{cwd:a}')
## Whether the fasta/fastq file is compressed or not.
parameter: uncompressed = False
import os
import pandas as pd
## FIX: The way to get sample needs to be revamped to 1. Accomodate rf/fr as column 2. accomodate single end read (Only 2 fq/samples)
sample_inv = pd.read_csv(samples,"\t")
## Extract strand information if user have specified the strand
strand_inv = []

if "strand" in sample_inv.columns:
    strand_inv = sample_inv.strand.values.tolist()
    sample_inv = sample_inv.drop("strand" , axis = 1)
    stop_if(not all([x in ["fr", "rf", "unstranded","strand_missing"] for x in strand_inv ]), msg = "strand columns should only include ``fr``, ``rf``, ``strand_missing`` or ``unstranded``")
            
## Extract read_length if user have specified read_length
read_length = []
if "read_length" in sample_inv.columns:
    read_length = sample_inv.read_length.values.tolist()
    sample_inv = sample_inv.drop("read_length" , axis = 1)
    
    
## Extract sample_id
sample_inv_list = sample_inv.values.tolist()
sample_id = [x[0] for x in sample_inv_list]


    
## Get the file name for single/paired end data
file_inv = [x[1:] for x in sample_inv_list]
file_inv = [item for sublist in file_inv for item in sublist]

fastq = [f'{data_dir}/{x}' for x in file_inv]


for y in fastq:
        if not os.path.isfile(y):
            raise ValueError(f"File {y} does not exist")

if len(fastq) != len(set(fastq)):
        raise ValueError("Duplicated files are found (but should not be allowed) in fastq file list")

# Is the RNA-seq data pair-end
is_paired_end = 0 if len(fastq) == len(sample_id) else 1 
from sos.utils import env
env.logger.info(f'Input samples are {"paired-end" if is_paired_end else "single-end"} sequences.')

## Step 0: QC before alignment

In [7]:
[fastqc]
input: fastq, group_by =  1
output: f'{cwd}/{_input:bn}_fastqc.html',f'{cwd}/{_input:bn}_fastqc/fastqc_data.txt' 
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads
bash: expand= "${ }", stderr = f'{_output[0]:n}.stderr', stdout = f'{_output[0]:n}.stdout', container=container
    fastqc ${_input} -o ${_output[0]:d}
    unzip -o ${_output[0]:n}.zip -d ${cwd}

## Step 1: Remove adaptor through `fastp`

In [None]:
[fastp_trim_adaptor_1]
# sliding window setting
parameter: window_size = 4
parameter: required_quality = 20
# the mean quality requirement option for cut_front
parameter: leading = 20
# the mean quality requirement option for cut_tail
parameter: trailing = 20
# reads shorter than length_required will be discarded
parameter: min_len = 15
# Path to the reference adaptors
parameter: fasta_with_adapters_etc = path(".")
warn_if(fasta_with_adapters_etc.is_file(),msg = "Use input fasta and adaptor detection of paired-end read was disabled" )

input: fastq, group_by = is_paired_end + 1 , group_with = "sample_id"
output: [f'{cwd}/{path(x):bnn}.trimmed.fq.gz' for x in _input]
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads
bash:  container=container,expand= "${ }", stderr = f'{_output[0]:n}.stderr', stdout = f'{_output[0]:n}.stdout'
        fastp -i ${f'{_input[0]} -I {_input[1]}' if is_paired_end else _input} -o ${ f'{_output[0]} -O {_output[1]}' if is_paired_end else _output } \
            ${f'--adapter_fasta {fasta_with_adapters_etc}' if fasta_with_adapters_etc.is_file() else "--detect_adapter_for_pe"}  -V -h ${_output[0]:n}.html -j ${_output[0]:n}.json -w ${numThreads} \
            --length_required ${min_len}  -W ${window_size} -M ${required_quality} -5 -3 --cut_front_mean_quality ${leading} --cut_tail_mean_quality ${leading}

In [None]:
[fastp_trim_adaptor_2]
input: group_by = "all"
output: f'{cwd}/{samples:n}.trimmed.txt'
sample_inv_tmp = pd.read_csv(samples,"\t")
if is_paired_end:
    sample_inv_tmp.fq1 = [f'{x:r}' for x in _input][::2]
    sample_inv_tmp.fq2 = [f'{x:r}' for x in _input][1::2]
else:
    sample_inv_tmp.fq1 = [f'{x:r}' for x in _input]
sample_inv_tmp.to_csv(_output,sep = "\t")

## Step 1 Alternative: Remove adaptor through `Trimmomatic`

In [1]:
[trimmomatic_trim_adaptor]
# Path to the software. Default set to using our rna_quantification.sif image
parameter: trimmomatic_jar = "/opt/Trimmomatic-0.39/trimmomatic-0.39.jar"
# Illumina clip setting
# Path to the reference adaptors
parameter: fasta_with_adapters_etc = path(".")
parameter: seed_mismatches = 2
parameter: palindrome_clip_threshold = 30
parameter: simple_clip_threshold = 10
# sliding window setting
parameter: window_size = 4
parameter: required_quality = 20
# Other settings
parameter: leading = 3
parameter: trailing = 3
parameter: min_len = 50
input: fastq, group_by = 2, group_with = "sample_id"
output: fq_1 = f'{cwd}/{_sample_id}_paired_{_input[0]:bn}.gz',
        fq_1_up = f'{cwd}/{_sample_id}_unpaired_{_input[0]:bn}.gz',
        fq_2 = f'{cwd}/{_sample_id}_paired_{_input[1]:bn}.gz',
        fq_2_up = f'{cwd}/{_sample_id}_unpaired_{_input[1]:bn}.gz'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads
bash: container=container, expand= "${ }", stderr = f'{_output[0]:n}.stderr', stdout = f'{_output[0]:n}.stdout'
    java -jar -Xmx${java_mem} ${trimmomatic_jar} PE -threads ${numThreads} \
        ${_input[0]} \
        ${_input[1]} \
        ${_output[0]} \
        ${_output[1]} \
        ${_output[2]} \
        ${_output[3]} \
        ILLUMINACLIP:${fasta_with_adapters_etc}:${seed_mismatches}:${palindrome_clip_threshold}:${simple_clip_threshold} \
        LEADING:${leading} TRAILING:${trailing} SLIDINGWINDOW:${window_size}:${required_quality} MINLEN:${min_len}

## Step 2: Alignment through `STAR`

In [1]:
[STAR_align_1]
# Reference gene model
parameter: gtf = path
# STAR indexing file
parameter: STAR_index = path
parameter: outFilterMultimapNmax = 20 
parameter: alignSJoverhangMin = 8 
parameter: alignSJDBoverhangMin = 1 
parameter: outFilterMismatchNmax = 999 
parameter: outFilterMismatchNoverLmax = 0.1
parameter: alignIntronMin = 20 
parameter: alignIntronMax = 1000000 
parameter: alignMatesGapMax = 1000000 
parameter: outFilterType =  "BySJout" 
parameter: outFilterScoreMinOverLread = 0.33 
parameter: outFilterMatchNminOverLread = 0.33 
parameter: limitSjdbInsertNsj = 1200000 
parameter: outSAMstrandField = "intronMotif" 
parameter: outFilterIntronMotifs = "None" 
parameter: alignSoftClipAtReferenceEnds = "Yes" 
parameter: quantMode = ["TranscriptomeSAM", "GeneCounts"]
parameter: outSAMattrRGline = ["ID:rg1", "SM:sm1"]
parameter: outSAMattributes = ["NH", "HI", "AS", "nM", "NM", "ch"] 
parameter: chimSegmentMin = 15 
parameter: chimJunctionOverhangMin = 15 
parameter: chimOutType = ["Junctions", "WithinBAM", "SoftClip"]
parameter: chimMainSegmentMultNmax = 1 
parameter: sjdbOverhang = 0
if int(mem.replace("G","")) <  40:
    print("Insufficent memory for STAR, changing to 40G")
    star_mem = '40G'
else:
    star_mem = mem
# This option is commented out because it will force the downstream analysis to use 40G, which significantlly slow down the process.
input: fastq,group_by = is_paired_end + 1,group_with = {"sample_id","read_length"}
output: cord_bam = f'{cwd}/{_sample_id}.Aligned.sortedByCoord.out.bam',
        trans_bam = f'{cwd}/{_sample_id}.Aligned.toTranscriptome.out.bam'
if sjdbOverhang == 0:
    print("Using read length specified in the sample list")
else:
    print("Using specified --sjdbOverhang as read length")
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = star_mem, cores = numThreads
bash: container=container, expand= "${ }", stderr = f'{_output[0]:n}.stderr', stdout = f'{_output[0]:n}.stdout'
    rm -rf ${cwd}/${_sample_id}.*.out.*.gz
    rm -rf ${cwd}/${_sample_id}._STARpass1
    set -e
    touch ${_output[0]:n}.star_start.timestamp
    STAR --runMode alignReads \
        --runThreadN ${numThreads} \
        --genomeDir  ${STAR_index} \
        --readFilesIn  ${_input:r} \
        --readFilesCommand ${"cat" if uncompressed else "zcat"} \
        --outFileNamePrefix ${_output[0]:nnnn}. \
        --outSAMstrandField ${outSAMstrandField} \
        --twopassMode Basic \
        --outFilterMultimapNmax ${outFilterMultimapNmax} \
        --alignSJoverhangMin ${alignSJoverhangMin} \
        --alignSJDBoverhangMin ${alignSJDBoverhangMin} \
        --outFilterMismatchNmax ${outFilterMismatchNmax} \
        --outFilterMismatchNoverLmax ${outFilterMismatchNoverLmax} \
        --alignIntronMin ${alignIntronMin} \
        --alignIntronMax ${alignIntronMax} \
        --alignMatesGapMax ${alignMatesGapMax} \
        --outFilterType ${outFilterType} \
        --outFilterScoreMinOverLread ${outFilterScoreMinOverLread} \
        --outFilterMatchNminOverLread ${outFilterMatchNminOverLread} \
        --limitSjdbInsertNsj ${limitSjdbInsertNsj} \
        --outFilterIntronMotifs ${outFilterIntronMotifs} \
        --alignSoftClipAtReferenceEnds ${alignSoftClipAtReferenceEnds} \
        --quantMode ${" ".join(quantMode)} \
        --outSAMtype BAM Unsorted \
        --outSAMunmapped Within \
        --genomeLoad NoSharedMemory \
        --chimSegmentMin ${chimSegmentMin} \
        --chimJunctionOverhangMin ${chimJunctionOverhangMin} \
        --chimOutType ${" ".join(chimOutType)} \
        --chimMainSegmentMultNmax ${chimMainSegmentMultNmax} \
        --chimOutJunctionFormat 0 \
        --outSAMattributes ${" ".join(outSAMattributes)} \
        --outSAMattrRGline ${" ".join(outSAMattrRGline)} \
        --sjdbOverhang ${sjdbOverhang if sjdbOverhang != 0 else _read_length} \
        --sjdbGTFfile ${gtf} \

    rm -r ${_output[0]:nnnn}._STARgenome
    rm -r ${_output[0]:nnnn}._STARtmp
    touch ${_output[0]:n}.sort_start.timestamp
    samtools sort --threads ${numThreads} -o ${_output[0]:nnnn}.Aligned.sortedByCoord.out.bam ${_output[0]:nnnn}.Aligned.out.bam # According to GTEX, this can help reducing the amount of memory consumption.
    rm ${_output[0]:nnnn}.Aligned.out.bam 
    samtools index ${_output[0]}

In [None]:
[strand_detected_1: shared="step_strand_detected"]
input: output_from("STAR_align_1")["trans_bam"], group_with = "sample_id"
import pandas as pd
ReadsPerGene = pd.read_csv(f'{cwd}/{_sample_id}.ReadsPerGene.out.tab' , sep = "\t" , header = None )
ReadsPerGene_list = ReadsPerGene.loc[3::,].sum(axis = 0, numeric_only = True).values.tolist()
strand_percentage = [x/ReadsPerGene_list[0] for x in ReadsPerGene_list]
print(f'for sample {_sample_id}')
if strand_percentage[1] > 0.9:
    print(f'Counts for the 1st read strand aligned with RNA is {strand_percentage[1]}, > 90% of aligned count')
    print('Data is likely FR/fr-secondstrand')
    strand_detected = "fr"
elif  strand_percentage[2] > 0.9:
    print(f'Counts for the 2nd read strand aligned with RNA is {strand_percentage[2]}, > 90% of aligned count')
    print('Data is likely RF/fr-firststrand')
    strand_detected = "rf"
elif max( strand_percentage[1],  strand_percentage[2]) < 0.6:
    print(f'Both {strand_percentage[1]} and  {strand_percentage[2]} are under 60% of reads explained by one direction')
    print('Data is likely unstranded')
    strand_detected = "unstranded"
else:
    print(f'Data does not fall into a likely stranded (max percent explained {max( strand_percentage[1],  strand_percentage[2])} > 0.9) or unstranded layout (max percent explained {max( strand_percentage[1],  strand_percentage[2])} < 0.6)')
    sys.exit('Exiting, please check your data and manually specified the ``--strand`` option as ``fr``, ``rf`` or ``unstranded``')

In [None]:
[strand_detected_2: shared="strand"]
input: group_by = "all"
parameter: strand = ""
import pandas as pd
if not strand:
    if len(strand_inv) > 0:
        strand = strand_inv
        for i in range(0,len(strand)):
            if strand[i] == "strand_missing":
                strand[i] = step_strand_detected[i]
        print(f'Using strand specified in the input samples list {strand}, replacing strand_missing with detected strand')
    else:
        warn_if(not all(x is step_strand_detected[0] for x in step_strand_detected), msg = "strands detected are different among samples, please check your protocol, we will use the detected strand for each samples")    
        strand = step_strand_detected
        print(f'Using detected strand for each samples {strand}')
else:
    stop_if(strand not in ["fr", "rf", "unstranded"], msg = "``--strand`` option should be ``fr``, ``rf`` or ``unstranded``")
    print(f'Using ``--strand`` overwrite option for all the samples {strand[0]}') 
    strand = [strand] * len(step_strand_detected)

## Step 3: Mark duplicates reads & QC through `Picard`

In [None]:
[picard_qc]
# Reference gene model
parameter: gtf = path
depends: sos_variable('strand')
# Path to flat reference file, for computing QC metric
parameter: ref_flat = path
# Path to the software. Default set to using our rna_quantification.sif image
parameter: picard_jar = "/opt/picard-tools/picard.jar"
# The fasta reference file used to generate star index
parameter: reference_fasta = path
# For the patterned flowcell models (HiSeq X), change to 2500
parameter: optical_distance = 100
picard_strand_dict = {"rf":"SECOND_READ_TRANSCRIPTION_STRAND","fr": "FIRST_READ_TRANSCRIPTION_STRAND","unstranded":"NONE" }
input: output_from("STAR_align_1"),group_by = 2, group_with = {"sample_id","strand"}
output: picard_metrics = f'{cwd}/{_sample_id}.alignment_summary_metrics',
        picard_rna_metrics = f'{cwd}/{_sample_id}.rna_metrics',
        md_bam = f'{cwd}/{_sample_id}.Aligned.sortedByCoord.out.md.bam',
        md_metrics = f'{cwd}/{_sample_id}.Aligned.sortedByCoord.md.metrics'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads
bash: container=container, expand= "${ }", stderr = f'{_output[0]:n}.stderr', stdout = f'{_output[0]:n}.stdout'
        set -e
        touch ${_output[0]:n}.CollectMultipleMetrics_start.timestamp
        java -jar -Xmx${java_mem} ${picard_jar} CollectMultipleMetrics \
            -REFERENCE_SEQUENCE ${reference_fasta} \
            -PROGRAM CollectAlignmentSummaryMetrics \
            -PROGRAM CollectInsertSizeMetrics \
            -PROGRAM QualityScoreDistribution \
            -PROGRAM MeanQualityByCycle \
            -PROGRAM CollectBaseDistributionByCycle \
            -PROGRAM CollectGcBiasMetrics \
            -VALIDATION_STRINGENCY STRICT \
            -INPUT  ${_input["cord_bam"]} \
            -OUTPUT  ${_output[0]:n}

bash: container=container, expand= "${ }", stderr = f'{_output[-1]:n}.stderr', stdout = f'{_output[-1]:n}.stdout'
        set -e
        touch ${_output[2]:n}.MarkDuplicates_start.timestamp
        java -Xmx${java_mem} -jar ${picard_jar} MarkDuplicates \
            -I ${_input["cord_bam"]}  \
            -O ${_output[2]} \
            -PROGRAM_RECORD_ID null \
            -M ${_output[3]} \
            -TMP_DIR ${cwd}\
            -MAX_RECORDS_IN_RAM 500000 -SORTING_COLLECTION_SIZE_RATIO 0.25 \
            -ASSUME_SORT_ORDER coordinate \
            -TAGGING_POLICY DontTag \
            -OPTICAL_DUPLICATE_PIXEL_DISTANCE ${optical_distance} \
            -CREATE_INDEX true \
            -CREATE_MD5_FILE true \
            -VALIDATION_STRINGENCY STRICT \
            -REMOVE_SEQUENCING_DUPLICATES false \
            -REMOVE_DUPLICATES false 

bash: container=container, expand= "${ }", stderr = f'{_output[0]:n}.rna_metrics.stderr', stdout = f'{_output[0]:n}.rna_metrics.stderr'
        set -e
        # Get only line with rRNA and transcript_id
        cat ${gtf}| grep rRNA | grep transcript_id > ${gtf}.tmp.${_input["cord_bam"]:bnnnn}  # To Avoid data racing problem
        samtools view -H ${_input["cord_bam"]}  > ${_input["cord_bam"]}.RI  

python: container=container, expand= "${ }", stderr = f'{_output[0]:n}.rna_metrics.stderr', stdout = f'{_output[0]:n}.rna_metrics.stderr'
        import pandas as pd
        from collections import defaultdict
        chrom = []
        start = []
        end = []
        strand = []
        tag = []
        annotation_gtf = "${gtf}.tmp.${_input["cord_bam"]:bnnnn}"
        with open(annotation_gtf, 'r') as gtf:
            for row in gtf:
                row = row.strip().split('\t')
                if row[0][0]=='#' or row[2]!="transcript": continue # skip header
                chrom.append(row[0])
                start.append(row[3])
                end.append(row[4])
                strand.append(row[6])
                attributes = defaultdict()
                for a in row[8].replace('"', '').split(';')[:-1]:
                    kv = a.strip().split(' ')
                    if kv[0]!='tag':
                        attributes[kv[0]] = kv[1]
                    else:
                        attributes.setdefault('tags', []).append(kv[1])
                tag.append(attributes)
        transcript_id = [x["transcript_id"] for x in tag]
        RI = pd.DataFrame(data={'chr':chrom, 'start':start, 'end':end, 'strand':strand,'transcript_id' : transcript_id })
        RI.to_csv("${_input["cord_bam"]}.RI", index = 0, header = 0, mode = "a",sep = "\t" )

bash: container=container, expand= "${ }", stderr = f'{_output[0]:n}.rna_metrics.stderr', stdout = f'{_output[0]:n}.rna_metrics.stdout'
        set -e
        touch ${_output[0]:n}.CollectRnaSeqMetrics_start.timestamp
        java -jar -Xmx${java_mem} ${picard_jar} CollectRnaSeqMetrics \
            -REF_FLAT ${ref_flat} \
            -RIBOSOMAL_INTERVALS ${_input["cord_bam"]}.RI \
            -STRAND_SPECIFICITY ${picard_strand_dict[_strand]} \
            -CHART_OUTPUT ${_output[0]:n}.rna_metrics.pdf \
            -VALIDATION_STRINGENCY STRICT \
            -INPUT ${_input["cord_bam"]}  \
            -OUTPUT  ${_output[0]:n}.rna_metrics
        rm ${gtf}.tmp.${_input["cord_bam"]:bnnnn}

_input["cord_bam"].zap()

In [None]:
[STAR_output]
input: output_from("picard_qc"),  group_by = "all"
depends: sos_variable("strand")
output: f'{cwd}/{samples:bn}_bam_list'
import pandas as pd
coord_bam_list = [f'{x:b}' for x in _input[2::4]]
SJ_list = [f'{x:bnnnnn}.SJ.out.tab' for x in _input[2::4]]
Trans_bam_list = [f'{x:bnnnn}.toTranscriptome.out.bam' for x in _input[2::4]]
print(SJ_list)
out = pd.DataFrame({"sample_id" : sample_id,"strand" : strand , "coord_bam_list" : coord_bam_list, "SJ_list" :SJ_list,"trans_bam_list" : Trans_bam_list  })
out.to_csv(_output,sep = "\t",index = False)

## Step 4: Post aligment QC through `RNA-SeQC`

In [10]:
[rnaseqc_call_1]
import os
import pandas as pd
parameter: bam_list = path
# Reference gene model
parameter: gtf = path
sample_inv = pd.read_csv(bam_list,"\t")

## Extract strand information if user have specified the strand
strand_list = sample_inv.strand.values.tolist()
stop_if(not all([x in ["fr", "rf", "unstranded"] for x in strand_list ]), msg = "strand columns should only include ``fr``, ``rf`` or ``unstranded``, please check the bam_list")     
## Extract sample_id
sample_id = sample_inv.sample_id.values.tolist()
    
## Get the file name for cood_bam data
coord_bam_list_inv = sample_inv.coord_bam_list.values.tolist()
coord_bam_list = [f'{cwd}/{x}' for x in coord_bam_list_inv ]
input:  coord_bam_list, group_by = 1, group_with = {"sample_id","strand_list"}
output: f'{cwd}/{_sample_id}.rnaseqc.gene_tpm.gct.gz',
        f'{cwd}/{_sample_id}.rnaseqc.gene_reads.gct.gz',
        f'{cwd}/{_sample_id}.rnaseqc.exon_reads.gct.gz',
        f'{cwd}/{_sample_id}.rnaseqc.metrics.tsv'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads
bash: container=container, expand= "${ }", stderr = f'{_output[0]:n}.stderr', stdout = f'{_output[0]:n}.stdout'
    cd ${cwd} && \
    run_rnaseqc.py \
        ${gtf:a} \
        ${_input:a} \
        ${_sample_id}.rnaseqc \
        -o ./ \
        ${("--stranded " + _strand_list) if _strand_list != "unstranded" else ""}

In [None]:
[rnaseqc_call_2]
input: group_by = "all"
output: f'{cwd}/{samples:bn}.rnaseqc.gene_tpm.gct.gz',
        f'{cwd}/{samples:bn}.rnaseqc.gene_readsCount.gct.gz',
        f'{cwd}/{samples:bn}.rnaseqc.exon_readsCount.gct.gz',
        f'{cwd}/{samples:bn}.rnaseqc.metrics.tsv'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads
python: container=container, expand= "${ }", stderr = f'{_output[0]:n}.stderr', stdout = f'{_output[0]:n}.stdout'
    import pandas as pd
    import os
    def make_gct(gct_path):
        # sample_name
        sample_name = ".".join(os.path.basename(gct_path).split(".")[:-4])
        # read_input
        pre_gct = pd.read_csv(gct_path,sep = "\t",
                              skiprows= 2,index_col="Name").drop("Description",axis = 1)
        pre_gct.index.name = "gene_ID"
        pre_gct.columns = [sample_name]
        return(pre_gct)

    def merge_gct(gct_path_list):
        gct = pd.DataFrame()
        for gct_path in gct_path_list:
            #check duplicated indels and remove them.
            gct_col = make_gct(gct_path)
            gct = gct.merge(gct_col,right_index=True,left_index = True,how = "outer")
        return gct

    input_list = [${_input:r,}]
    tpm_list = input_list[0::4]
    gc_list = input_list[1::4]
    ec_list = input_list[2::4]
    gct_path_list_list = [tpm_list,gc_list,ec_list]
    output_path = [${_output:r,}][0:3]
    for i in range(0,len(output_path)):
        output = merge_gct(gct_path_list_list[i])
        output.to_csv(output_path[i], sep = "\t")
    metrics_list = input_list[3::4]
    with open("${cwd}/${samples:bn}.rnaseqc.metrics_output_list", "w") as f:
        f.write('\n'.join(metrics_list))

bash: container=container, expand= "${ }", stderr = f'{_output[0]:n}.stderr', stdout = f'{_output[0]:n}.stdout'
    aggregate_rnaseqc_metrics.py  ${_output[3]:n}_output_list ${_output[3]:nn}

## Step 6: Quantify expression through `RSEM`

In [12]:
[rsem_call_1]
parameter: RSEM_index = path
parameter: max_frag_len = 1000
estimate_rspd = True

parameter: bam_list = path

sample_inv = pd.read_csv(bam_list,"\t")

## Extract strand information if user have specified the strand
strand_list = sample_inv.strand.values.tolist()
stop_if(not all([x in ["fr", "rf", "unstranded"] for x in strand_list ]), msg = "strand columns should only include ``fr``, ``rf`` or ``unstranded``, please check the bam_list")     
## Extract sample_id
sample_id = sample_inv.sample_id.values.tolist()
    
## Get the file name for trans_bam_list data
trans_bam_list_inv = sample_inv.trans_bam_list.values.tolist()
trans_bam_list = [f'{cwd}/{x}' for x in trans_bam_list_inv ]
input: trans_bam_list ,  group_by = 1, group_with = {"sample_id","strand_list"} 
output: f'{cwd}/{_sample_id}.rsem.isoforms.results', f'{cwd}/{_sample_id}.rsem.genes.results',f'{cwd}/{_sample_id}.rsem.stat/{_sample_id}.rsem.cnt'
task: trunk_workers = 1, walltime = walltime, mem = mem, cores = numThreads, trunk_size = job_size
bash: container=container, expand= "${ }", stderr = f'{_output[0]:n}.stderr', stdout = f'{_output[0]:n}.stdout'
    run_RSEM.py ${RSEM_index:a} ${_input:a} ${_sample_id} \
        -o ${_output[0]:d} \
        --max_frag_len ${max_frag_len} \
        --estimate_rspd ${'true' if estimate_rspd else 'false'} \
        --paired_end ${"true" if is_paired_end else "false"} \
        --is_stranded ${"true" if _strand_list != "unstranded" else "false"} \
        --threads ${numThreads}

In [None]:
[rsem_call_2]
input: group_by = "all"
output: f'{cwd}/{samples:bn}.rsem_transcripts_expected_count.txt.gz',
        f'{cwd}/{samples:bn}.rsem_transcripts_tpm.txt.gz',
        f'{cwd}/{samples:bn}.rsem_transcripts_fpkm.txt.gz',
        f'{cwd}/{samples:bn}.rsem_transcripts_isopct.txt.gz',
        f'{cwd}/{samples:bn}.rsem_genes_expected_count.txt.gz',
        f'{cwd}/{samples:bn}.rsem_genes_tpm.txt.gz',
        f'{cwd}/{samples:bn}.rsem_genes_fpkm.txt.gz',
        f'{cwd}/{samples:bn}.rsem.aggregated_quality.metrics.tsv'
task: trunk_workers = 1, walltime = walltime, mem = mem, cores = numThreads, trunk_size = job_size
python: container=container, expand= "${ }", stderr = f'{_output[0]:n}.stderr', stdout = f'{_output[0]:n}.stdout'
    input_list = [${_input:r,}]
    with open('${cwd}/${samples:bn}.rsem.isoforms_output_list', "w") as f:
        f.write('\n'.join(input_list[0::3]))
    with open('${cwd}/${samples:bn}.rsem.genes_output_list', "w") as f:
        f.write('\n'.join(input_list[1::3]))
bash: container=container, expand= "${ }", stderr = f'{_output[0]:n}.stderr', stdout = f'{_output[0]:n}.stdout'
         aggregate_rsem_results.py ${cwd}/${samples:bn}.rsem.isoforms_output_list {expected_count,TPM,FPKM,IsoPct} ${_output[0]:nnn}
         aggregate_rsem_results.py ${cwd}/${samples:bn}.rsem.genes_output_list {expected_count,TPM,FPKM} ${_output[1]:nnn} 

R:  container=container, expand= "${ }", stderr = f'{_output[-1]:n}.stderr', stdout = f'{_output[-1]:n}.stdout'
     readRSEM.cnt <- function (source) {
            # RSEM .cnt files gives statistics about the (transcriptome) alignment passed to RSEM:
            # Row 1: N0 (# unalignable reads);
            #        N1 (# alignable reads);
            #        N2 (# filtered reads due to too many alignments);
            #        N_tot (N0+N1+N2)
            # Row 2: nUnique (# reads aligned uniquely to a gene);
            #        nMulti (# reads aligned to multiple genes);
            #        nUncertain (# reads aligned to multiple locations in the given reference sequences, which include isoform-level multi-mapping reads)
            # Row 3: nHits (# total alignments);
            #        read_type (0: single-end read, no quality; 1: single-end read, with quality score; 2: paired-end read, no quality score; 3: paired-end read, with quality score)
            # Source: https://groups.google.com/forum/#!topic/rsem-users/usmPKgsC5LU
            # Note: N1 = nUnique + nMulti

            stopifnot(file.exists(source[1]))
            isDir <- file.info(source)$isdir
            if (isDir) {
                files <- system(paste("find", source, "-name \"*.rsem.cnt\""), intern=TRUE)
                stopifnot(length(files) > 0)
                samples <- gsub("_rsem.cnt", "", basename(files), fixed=TRUE)
            } else {
                files <- source
                samples <- gsub(".rsem.cnt", "", basename(files), fixed=TRUE)
            }
            metrics <- list()
            for (i in 1:length(files)) {
                m <- read.table(files[i], header=FALSE, sep=" ", comment.char="#", stringsAsFactors=FALSE, nrows=3, fill=TRUE)
                metrics[[i]] <- data.frame(Sample=samples[i],
                File=files[i],
                TotalReads=m[1, 4],
                AlignedReads=m[1, 2],
                UniquelyAlignedReads=m[2, 1],
                stringsAsFactors=FALSE)
            }
            metrics <- do.call(rbind, metrics)
            row.names(metrics) <- metrics$Sample

            return(metrics)
        }
        sourceRSEM = c(${_input:r,})
        sourceRSEM = sourceRSEM[seq(3,length(sourceRSEM),3)]
        metrics.RSEM = readRSEM.cnt(sourceRSEM)
        write.table(metrics.RSEM, file="${_output[-1]}",col.names=TRUE, row.names=FALSE, quote=FALSE)

## Step 7: summarize with MultiQC

In [None]:
[rsem_call_3,rnaseqc_call_3]
output: f'{cwd}/{samples:bn}.multiqc_report.html'
task: trunk_workers = 1, walltime = walltime, mem = mem, cores = numThreads, trunk_size = job_size
report: output = f"{_output:n}.multiqc_config.yml"
  extra_fn_clean_exts:
      - '_rsem'
  fn_ignore_dirs:
      - '*_STARpass1'
bash:  container=container,expand= "${ }", stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout' 
    multiqc ${_input:d} -v -n ${_output:b} -o ${_output:d} -c ${_output:n}.multiqc_config.yml

In [None]:
[rsem_call_4, rnaseqc_call_4]
# Path to flat reference file, for computing QC metric
output: f'{cwd}/{samples:b}.picard.aggregated_quality.metrics.tsv'
task: trunk_workers = 1, walltime = walltime, mem = mem, cores = numThreads, trunk_size = job_size
R: container=container, expand= "${ }", stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout'
    ## Define Function
    readPicard.alignment_summary_metrics <- function (source) {
      stopifnot(length(source) == 1)
      stopifnot(file.exists(source))
      isDir <- file.info(source)$isdir
      if (isDir) {
        files <- system(paste("find -L", source, "-name \"*.alignment_summary_metrics\""), intern=TRUE)
        stopifnot(length(files) > 0)
        samples <- gsub(".alignment_summary_metrics", "", basename(files), fixed=TRUE)
      } else {
        files <- source
        samples <- gsub(".alignment_summary_metrics", "", basename(files), fixed=TRUE)
      }
    
      metrics <- list()
      for (i in 1:length(files)) {
        m <- read.table(files[i], header=TRUE, sep="\t", comment.char="#", stringsAsFactors=FALSE, nrows=2)
        metrics[[i]] <- data.frame(Sample=samples[i],
                                   File=files[i],
                                   PF_READS=sum(m$PF_READS[1:2]),
                                   PF_READS_ALIGNED=sum(m$PF_READS_ALIGNED[1:2]),
                                   PCT_PF_READS_ALIGNED=sum(m$PF_READS_ALIGNED[1:2])/sum(m$PF_READS[1:2]),
                                   stringsAsFactors=FALSE)
      }
      metrics <- do.call(rbind, metrics)
      row.names(metrics) <- metrics$Sample
    
      return(metrics)
    }

    readPicard.rna_metrics <- function(source) {

      stopifnot(length(source) == 1)
      stopifnot(file.exists(source))
      isDir <- file.info(source)$isdir
      if (isDir) {
        files <- system(paste("find -L", source, "-name \"*.rna_metrics\""), intern=TRUE)
        stopifnot(length(files) > 0)
        samples <- gsub(".rna_metrics", "", basename(files), fixed=TRUE)
      } else {
        files <- source
        samples <- gsub(".rna_metrics", "", basename(files), fixed=TRUE)
      }

      metrics <- list()
      for (i in 1:length(files)) {
        m <- read.table(files[i], header=TRUE, sep="\t", comment.char="#", stringsAsFactors=FALSE, nrows=1)
        metrics[[i]] <- data.frame(Sample=samples[i],
                                   File=files[i],
                                   PCT_RIBOSOMAL_BASES=m$PCT_RIBOSOMAL_BASES,
                                   PCT_CODING_BASES=m$PCT_CODING_BASES,
                                   PCT_UTR_BASES=m$PCT_UTR_BASES,
                                   PCT_INTRONIC_BASES=m$PCT_INTRONIC_BASES,
                                   PCT_INTERGENIC_BASES=m$PCT_INTERGENIC_BASES,
                                   PCT_MRNA_BASES=m$PCT_MRNA_BASES,
                                   PCT_USABLE_BASES=m$PCT_USABLE_BASES,
                                   MEDIAN_CV_COVERAGE=m$MEDIAN_CV_COVERAGE,
                                   MEDIAN_5PRIME_BIAS=m$MEDIAN_5PRIME_BIAS,
                                   MEDIAN_3PRIME_BIAS=m$MEDIAN_3PRIME_BIAS,
                                   MEDIAN_5PRIME_TO_3PRIME_BIAS=m$MEDIAN_5PRIME_TO_3PRIME_BIAS,
                                   stringsAsFactors=FALSE)
      }
      metrics <- do.call(rbind, metrics)
      row.names(metrics) <- metrics$Sample
    
      return(metrics)
    }


    readPicard.duplicate_metrics <- function(source) {

      stopifnot(length(source) == 1)
      stopifnot(file.exists(source))
      isDir <- file.info(source)$isdir
      if (isDir) {
        files <- system(paste("find -L", source, "-name \"*.Aligned.sortedByCoord.md.metrics\""), intern=TRUE)
        stopifnot(length(files) > 0)
        samples <- gsub(".Aligned.sortedByCoord.md.metrics", "", basename(files), fixed=TRUE)
      } else {
        files <- source
        samples <- gsub(".Aligned.sortedByCoord.md.metrics", "", basename(files), fixed=TRUE)
      }

      metrics <- list()
      for (i in 1:length(files)) {
        m <- read.table(files[i], header=TRUE, sep="\t", comment.char="#", stringsAsFactors=FALSE, nrows=1)
        metrics[[i]] <- data.frame(Sample=samples[i],
                                   File=files[i],
                                   PERCENT_DUPLICATION=m$PERCENT_DUPLICATION,
                                   ESTIMATED_LIBRARY_SIZE=m$ESTIMATED_LIBRARY_SIZE,
                                   stringsAsFactors=FALSE)
      }
      metrics <- do.call(rbind, metrics)
      row.names(metrics) <- metrics$Sample
    
      return(metrics)
    }

    readPicard.wgs_metrics <- function (source) {

      stopifnot(length(source) == 1)
      stopifnot(file.exists(source))
      isDir <- file.info(source)$isdir
      if (isDir) {
        files <- system(paste("find -L", source, "-name \"*.wgs_metrics\""), intern=TRUE)
        stopifnot(length(files) > 0)
        samples <- gsub(".wgs_metrics", "", basename(files), fixed=TRUE)
      } else {
        files <- source
        samples <- gsub(".wgs_metrics", "", basename(files), fixed=TRUE)
      }

      metrics <- list()
      for (i in 1:length(files)) {
        m <- read.table(files[i], header=TRUE, sep="\t", comment.char="#", stringsAsFactors=FALSE, nrows=1)
        metrics[[i]] <- data.frame(Sample=samples[i],
                                   File=files[i],
                                   MEDIAN_COVERAGE=m$MEDIAN_COVERAGE,
                                   MAD_COVERAGE=m$MAD_COVERAGE,
                                   PCT_EXC_DUPE=m$PCT_EXC_DUPE,
                                   PCT_EXC_TOTAL=m$PCT_EXC_TOTAL,
                                   stringsAsFactors=FALSE)
      }
      metrics <- do.call(rbind, metrics)
      row.names(metrics) <- metrics$Sample

      return(metrics)
    }


    readPicard.insert_size_metrics <- function (source) {

      stopifnot(length(source) == 1)
      stopifnot(file.exists(source))
      isDir <- file.info(source)$isdir
      if (isDir) {
        files <- system(paste("find -L", source, "-name \"*.insert_size_metrics\""), intern=TRUE)
        stopifnot(length(files) > 0)
        samples <- gsub(".insert_size_metrics", "", basename(files), fixed=TRUE)
      } else {
        files <- source
        samples <- gsub(".insert_size_metrics", "", basename(files), fixed=TRUE)
      }

      metrics <- list()
      for (i in 1:length(files)) {
        m <- read.table(files[i], header=TRUE, sep="\t", comment.char="#", stringsAsFactors=FALSE, nrows=1)
        metrics[[i]] <- data.frame(Sample=samples[i],
                                   File=files[i],
                                   MEDIAN_INSERT_SIZE=m$MEDIAN_INSERT_SIZE,
                                   MODE_INSERT_SIZE=m$MODE_INSERT_SIZE,
                                   MEDIAN_ABSOLUTE_DEVIATION=m$MEDIAN_ABSOLUTE_DEVIATION,
                                   stringsAsFactors=FALSE)
      }
      metrics <- do.call(rbind, metrics)
      row.names(metrics) <- metrics$Sample
    
      return(metrics)
    }

    readPicard.gc_bias.summary_metrics <- function (source) {

      stopifnot(length(source) == 1)
      stopifnot(file.exists(source))
      isDir <- file.info(source)$isdir
      if (isDir) {
        files <- system(paste("find -L", source, "-name \"*.gc_bias.summary_metrics\""), intern=TRUE)
        stopifnot(length(files) > 0)
        samples <- gsub(".gc_bias.summary_metrics", "", basename(files), fixed=TRUE)
      } else {
        files <- source
        samples <- gsub(".gc_bias.summary_metrics", "", basename(files), fixed=TRUE)
      }

      metrics <- list()
      for (i in 1:length(files)) {
        m <- read.table(files[i], header=TRUE, sep="\t", comment.char="#", stringsAsFactors=FALSE, nrows=1)
        metrics[[i]] <- data.frame(Sample=samples[i],
                                   File=files[i],
                                   AT_DROPOUT=m$AT_DROPOUT,
                                   GC_DROPOUT=m$GC_DROPOUT,
                                   stringsAsFactors=FALSE)
      }
      metrics <- do.call(rbind, metrics)
      row.names(metrics) <- metrics$Sample

      return(metrics)
    }




    readPicard <- function(source) {
      metrics.aln <- readPicard.alignment_summary_metrics(source)
      metrics.rna <- readPicard.rna_metrics(source)
      metrics.dup <- readPicard.duplicate_metrics(source)

      stopifnot(all(row.names(metrics.aln) %in% row.names(metrics.rna)) &
                all(row.names(metrics.rna) %in% row.names(metrics.dup)) &
                all(row.names(metrics.dup) %in% row.names(metrics.aln)))

      metrics.aln$File <- NULL
      metrics.rna$File <- NULL
      metrics.dup$File <- NULL
      metrics.rna$Sample <- NULL
      metrics.dup$Sample <- NULL

      metrics <- cbind(metrics.aln, metrics.rna[row.names(metrics.aln), ])
      metrics <- cbind(metrics, metrics.dup[row.names(metrics.aln), ])

      return(metrics)
    }

    ## Execution  
    sourcePicard = ${_input[-1]:dr}

    Picard_qualityMetrics <- readPicard(sourcePicard)
    write.table(Picard_qualityMetrics, file="${_output}",col.names=TRUE, row.names=FALSE, quote=FALSE)