# Table of Contents
* [Nextflow](#Nextflow)
    * [Tutorial](#Running-the-nextflow-tutorial)
    * [Requirements](#Requirements)
    * [script1.nf](#script1.nf)
    * [script2.nf](#script2.nf)
    * [script3.nf](#script3.nf)
    * [script4.nf](#script4.nf)
    * [script5.nf](#script5.nf)
    * [script6.nf](#script6.nf)
    * [script7.nf](#script7.nf)
    * [Own Scripts](#Using-custom-scripts)
    * [Executors](#Managing-execution-with-executors)
    * [Modularization](#Workflow-modularization)

# Nextflow

`Nextflow` is a workflow programming language, based on `Groovy`, based on `Java`. Therefore, it supports general programming as well as has specific constructs for specifying and running workflows. A introductory course on the language can be found at https://carpentries-incubator.github.io/workflows-nextflow/aio/index.html. In addition, the developers have prepared a tutorial which I have commented below.

# Running the nextflow tutorial

## Requirements

### Tutorial pulled from github

https://github.com/seqeralabs/nextflow-tutorial

### Docker 
Docker is required for running the workflow, as it uses images of various software such as `salmon`. Docker can be downloaded from Dockerhub and installed in Applications. Then a docker daemon, necessary for pulling images from Dockerhub, can be started by double-clicking on the Docker application.

### Nextflow scripting

**Note:** nextflow uses the `groovy` programming language (http://groovy-lang.org/documentation.html), which is based on `Java`.

Info on nextflow scripting can be found https://www.nextflow.io/docs/latest/script.html. 

## script1.nf

Looks like this:

```
/*                                                                                                                     
* pipeline input parameters                                                                                           
*/

params.reads = "$baseDir/data/ggal/gut_{1,2}.fq"                                                                       
params.transcript = "$baseDir/data/ggal/transcriptome.fa"                                                              
params.multiqc = "$baseDir/multiqc"                                                                                    
params.outdir = "$baseDir/output"                                                                                      
                                                                                                                       
/* println "reads: $params.reads" */                                                                                   
                                                                                                                       
log.info """\                                                                                                          
 R N A S E Q - N F   P I P E L I N E                                                                                   
 ===================================                                                                                   
 transcriptome: ${params.transcript}                                                                                   
 reads        : ${params.reads}                                                                                        
 outdir       : ${params.outdir}                                                                                       
 """                                                                                                                   
```
Parameters are defined as part of a *params* structure (which is a name scope), thus params. will be the common prefix of these parameters. **Note:** variable interpolation is allowed **within double-quoted strings**. That is, although both single and double quotes can be used to denote strings, variable interpolation is only possible with the latter.

`$baseDir` seems to be the directory from which the script is run. It is only one of the implicit variables defined in the global scope of the script, the list including:

* baseDir - directory where the main workflow script is located (deprecated in favor of projectDir).
* launchDir - directory where the workflow is run (requires version 20.04.0 or later).
* moduleDir - directory where a module script is located for DSL2 modules or the same as projectDir for a non-module script (requires version 20.04.0 or later).
* nextflow - dictionary-like object representing nextflow runtime information (have to check Nextflow metadata).
* params - dictionary-like object holding workflow parameters specified in the config file or as commandline options.
* projectDir - directory where the main script is located (requires version 20.04.0 or later).
* workDir - directory where tasks temporary files are created.
* workflow - dictionary-like object representing workflow runtime information (have to check Runtime metadata).

Once a parameter is defined, it can be used by name in the nextflow script, and it can be dereferenced within string variables as `${varname}`.

Comments have **C**-type syntax.

Two types of print commands were used in the script, `println` (print line) and `log.info`, which is used to print multi-line strings to standard output. The syntax uses triple quotes, `"""`, like in Python.

## script2.nf

Builds onto the first script, adding the following lines:

```
/*                                                                                                                     
 * define the `index` process that create a binary index                                                               
 * given the transcriptome file                                                                                       
 */                                                                                                                    
process index {                                                                                                                                                                                                                             
    input:                                                                                                             
    path transcriptome from params.transcript                                                                         
                                                                                                                       
    output:                                                                                                           
    path 'index' into index_ch                                                                                         
                                                                                                                       
    script:                                                                                                           
    """                                                                                                               
    salmon index --threads $task.cpus -t $transcriptome -i index                                                       
    """                                                                                                                                                                                                                                     
}                                                                                                                                                                                                                                           
index_ch.view { "Index found it $it" }                                                                                 

```

It shows how a `process` is defined, essentially as a block of code in Python, delimited by `{}`. Within the *process*, there are sections introduced by keywords, specifically
* input
* output
* script

#### Syntax of sections in the process definition:
```
input:
  <input qualifier> <input name> [from <source channel>] [attributes]
``` 
The qualifier specifies the type of data to be received, and can take the following values:
* val - allows the received input value to be accessed by its name in the process script, e.g. val x from num, where num is a channel
* env - define an environment variable in the process execution context, based on the value received from the channel.
* file - handle the received value as a file. If the same file name is to be used, irrespective of the values that come from the input channel, this name can be specified as follows `file query_file_name 'query.fa' from proteins` or `file 'query.fa' from proteins`. In this case, the execution is done in a separate environment, as if the files in the input channel were copied over into a file whose name does not change.
* path - interprets string values as the path location of the input file and automatically converts to a file object; more recently introduced than file
* stdin - forward the received value to stdin.
* tuple - allows grouping multiple parameters in a single parameter definition. The items can be of the following types: val(x), file(x), file(‘name’), file(x:’name’), stdin, env(x). This allows the values of the respective variables to be set based on the inputs from the channel.
* each - allows looping over items in a collection

**Example `input` section:**
```
path transcriptome from params.transcript
```
following the definition of the `params.transcript` variable:
```
params.transcript = "$baseDir/data/ggal/transcriptome.fa"
```
So the syntax is similar to a `foreach i in set_of_i's` in python, the variable *transcriptome* takes on the values from the *params.transcript* channel.  

**Example `output` section:**
```
path 'index' into index_ch
```
The *index_ch* channel is linked to a file called 'index', probably placed in the working directory. If the input channel had multiple files, the index file would probably be rewitten.

**Example script section** (in triple quotes):
```
salmon index --threads \\$task.cpus -t \\$transcriptome -i index 
``` 

Should run `salmon`, although from the script it is not clear where it gets it from. It is probably included in the docker image associated with this tutorial. First occurrence of *index* is the option to salmon indicating what function needs to be run. `$task.cpus` gives the value of the `cpus` directive for the current task. It's an interpolated variable defined implicitly by nextflow. `$transcriptome` is also interpolated from the input definition above, and the second occurrence of *index* is a file name, the same that occurs from the output definition above. Single quotes do not allow variable interpolation, while double quotes do (I presume triple quotes, like here, also allow for interpolation). So the script section provides the exact commandline that needs to be run.

#### Directives

`cpus` is an example of a proces *directive*. Directives have to be speciefied before any other blocks (like input, output etc.) and without using the assignment operator <kbd>=</kbd>. Example of using directives in the scope of a project:
```
process big_job {

  cpus 8
  executor 'sge'

  """
  blastp -query input_sequence -num_threads ${task.cpus}
  """
}
```
Available directives are:
* accelerator - e.g. "accelerator 4, type: 'nvidia-tesla-k80'"
* afterScript 
* beforeScript - e.g. "beforeScript 'source /cluster/bin/setup'"
* cache - allows caching of results 
* cpus - number of (logical) CPU required by the process’ task
* conda -  definition of the process dependencies using Conda, multiple can be specified separated by spaces, syntax can be conda channel:package
* container - allows to execute the process script in a Docker container if the host machine has a Docker daemon running.
* containerOptions
* clusterOptions
* disk
* echo
* errorStrategy
* executor - default is specified in nextflow.config, this directive allows overriding, many possibilties available, including local, sge, slurm, kubernetes. 
* ext
* label - allows selection of processes, for e.g. to specify a set of resources for all processes with a given label
* machineType
* maxErrors
* maxForks
* maxRetries
* memory
* module - works with Environment Modules to load modules necessary for the process
* penv
* pod
* publishDir - where the results are to be saved, e.g. `publishDir '/data/chunks', mode: 'copy', overwrite: false`
* queue - which queue to use with sge
* scratch
* stageInMode
* stageOutMode
* storeDir
* tag
* time
* validExitStatus

There is an instruction to use the `tree` command to inspect the structure of the *work* directory, but that seems to be a bash command not installed on my laptop.

There is also a `when` declaration that can be used to define a condition that must be verified in order to execute the process, e.g. 
```
when:
  proteins.name =~ /^BB11.*/ && type == 'nr'
```
This instruction is placed in the process block (as the `input`, `output` etc. sections) and it contains a condition. Pattern matching in `groovy` seems to be a rather lengthy topic. Took a while to figure out that to get expressions that **do not** match the pattern the unary negation operator needs to be used on the entire expression, e.g. 
```
when:
  !(proteins.name =~ /^BB11.*/ && type == 'nr')
```

## script3.nf

The main new concepts are introduced here:

```
/* read_pairs_ch = Channel .fromFilePairs(params.reads) */                                                             
/* equivalent */                                                                                                       
Channel.fromFilePairs(params.reads, checkIfExists: true).set { read_pairs_ch }                                         
                                                                                                                       
read_pairs_ch.view()                                                                                                   
                                                                                                                       
c = channel.from(['a','b','c'])                                                                                        
c.view()                                                                                                               
                                                                                                                       
d = channel.fromPath(params.reads)                                                                                     
d.view()                                                                                                               
```

The read files are defined like this:
```
params.reads = "$baseDir/data/ggal/gut_{1,2}.fq"
```
which results in two files matching the pattern. So the first line defines a *read_pairs_ch* initialized with pairs of files (here only one pair). The same initialization could be done as:
```
read_pairs_ch = Channel .fromFilePairs(params.reads)  

read_pairs_ch.view()
```
The second line prints the content of the channel, i.e. the two file names.

`fromFilePairs` is a function that should return tuples, in which the first element is the read pair prefix, and the second element the list of actual files. Why the prefix returned for this example is *gut* is not explained. It seems that it is found by removing the upstream directories as well as the underscore in the file name.

Changing the specification of the reads files in the commandline like this:
```
 ./nextflow run script3.nf --reads 'data/ggal/*_{1,2}.fq'
``` 
leads to the wild card resolving to tuples of distinct prefixes (matched by the pattern, here <kbd>*</kbd>) and associated file lists. It does not result in a flat list of files.

To get the code to produce an info message when no files are matching the provided pattern, one could use the following syntax (note the check, which may fail):
```
Channel.fromFilePairs(params.reads, checkIfExists: true).set { read_pairs_ch }
```

Channel creation can be done in other ways as well. There seem to be two types of channels:

1. queue channel - unidirectional FIFO queue from which items are consumed
2. value channel - single values that are not consumed, can be used by multiple processes

Queue channels are created by channel factory methods (**Note** both *Channel.* and *channel.* can be used):
* create - e.g. c = channel.create() 
* empty
* from - e.g. c = channel.from( 1, 2, 3, 4 ) - creates a channel that emits the specified values (numbers/strings/lists - if a single list is specified view() outputs its elements, if multiple lists are specified view() outputs them as individual lists)
* fromList - channel will emit elements of the list provided as argument
* fromPath - e.g. d = channel.from(params.reads) - generates a list of all files that match the glob pattern specified in params.reads
* fromFilePairs - demonstrated in the example
* fromSRA - queries the NCBI SRA database and returns a channel emitting the fastq files matching the specified criteria i.e project or accession number(s).
* value - creates a single value channel
* watchPath - creates a channel that watches for modifications (default: creation) of files satisfying some name pattern 

**Note** white spaces between parentheses and argument or between arguments don't seem to make any difference.

## script4.nf

Introduces a more complex process, depending on multiple inputs, in the quantification process. 
```
process index {                                                                                                       

    input:
    path transcriptome from params.transcript
    
    output:
    path 'index' into index_ch
    
    script:
    """
    salmon index --threads $task.cpus -t $transcriptome -i index
    """                                                                                                               
    
}

/*                                                                                                                     
 * Run Salmon to perform the quantification of expression using                                                       
 * the index and the matched read files                                                                               
 */                                                                                                                    
process quantification {                                                                                               
                                                                                                                       
    publishDir "QuantificationResults"                                                                                 
                                                                                                                       
    input:                                                                                                             
    path index from index_ch                                                                                           
    tuple val(pair_id), path(reads) from read_pairs_ch                                                                 
                                                                                                                       
    output:                                                                                                           
    path(pair_id) into quant_ch                                                                                                                                                                                                             
    script:                                                                                                            
    """                                                                                                               
    salmon quant --threads $task.cpus --libType=U -i $index -1 ${reads[0]} -2 ${reads[1]} -o $pair_id                 
    """                                                                                                               
}
```

The first input is the index, which comes from the *index_ch* channel created by the previous process. A variable of type path, named **index** is created from the *index_ch* channel. Then, the tuple definition is used to extract tuples of values from the *read_pairs_ch*, which was created to contain a dictionary indexed by tissue with samples files as values. These values are cast to specific data types, `val` (scalar) for the tissue name and `path` for the sample file names. It seems that the path data type casting is broadcasted, as the *reads* variable created is an array.

The output section specifies that and output channel *quant_ch* will be associated with a path variable containing the tissue name. This tissue name is used as file name in the salmon command as well. In this particular script, this channel is not used and the `output` declaration here seems superfluous (the workflow runs without it as well).

Checked the `publishDir` directive, it creates a symlink with the specified name in the project directory, linking to the results directory of the current run. To actually copy the results in the specified directory the `mode: copy` option needs to be used.

## script5.nf

Adds another process, *fastqc*, with similar structure to *quantification*. 
```
/* 
 * Run fastQC to check quality of reads files 
 */ 

process fastqc {
    tag "FASTQC on $sample_id" 
    
    input: 
    tuple val(sample_id), path(reads) from read_pairs_ch2 
    
    output:
    path("fastqc_${sample_id}_logs") into fastqc_ch 
    
    script:
    """
    mkdir fastqc_${sample_id}_logs
    fastqc -o fastqc_${sample_id}_logs -f fastq -q ${reads}
    """ 
}
```
However, it uses the same reads input channel, leading to error because the items from this channel are consumed by the *quantification* process. This can be fixed by making the fastqc script commands part of the script of the *quantification* process, that first uses the *read_pairs_ch* channel. However, the somewhat more elegant solution is to create a copy of the channel, changing the `set` for the `into` operator, and providing different names for the two channels. Syntax is important, `;` should be used between the two file names:
```
Channel                                                                                                                
    .fromFilePairs( params.reads, checkIfExists:true )                                                                 
    .into { read_pairs_ch; read_pairs_ch2 } 
```
Finally, modules give an even more elegant solution to the problem, using function call-parameters type of logic for reusing channels (see below).

Note also the `tag` directive that is used to give labels to executed processes, to make them easier identifiable in log files.

## script6.nf

Adds the *multiqc* process. It uses the `publishDir` directive to save the output of the process into a directory specified in the parameter definition section. This is used as a root dir for the multiqc_report.html file created by the process. The process uses the `mix` and `collect` operators chained together to get all the outputs of the *quantification* and *fastqc* process as a single input.
```
/*
 * Create a report using multiQC for the quantification
 * and fastqc processes
 */
 
process multiqc { 

    publishDir params.outdir, mode:'copy'
    
    input: 
    path('*') from quant_ch.mix(fastqc_ch).collect() 
    
    output:
    path('multiqc_report.html')
    
    script:
    """ 
    multiqc .
    """ 
}
```

quant_ch.mix(fastqx_ch) means mix the items of channel *fastqc_ch* into those of *quant_ch* (no specific order is guaranteed). *collect()* takes all the items, constructs and returns a list. This is necessary, because otherwise only one item is processed correctly. *multiqc* runs in a specified directory (here `.`) and collects all the data it can find in it. Not clear though how the items in the input channel feed into this directory.

## script7.nf

Adds a task to be done on the completion of the workflow:
```
workflow.onComplete {                                                                     
        log.info ( workflow.success ? "\nDone! Open the following report in your browser \
--> $params.outdir/multiqc_report.html\n" : "Oops .. something went wrong" )              
}                                                                                         
```                                                                                         
This also shows the `C++`-like syntax of a shorthand if-else statement.

If an email server is set up, an option can be used in the commandline of nextflow to send an email upon workflow completion (`-N email@address`).

## Using custom scripts

Custom scripts can be placed into a *bin* subdirectory of the project root directory. Then these scripts will be automatically added to the pipeline execution `PATH`. 

## Managing execution with executors

Nextflow supports various systems for job distribution and execution such as the Sun Grid Engine (sge) and Slurm. There is also a `local` executor for local execution. We will most likely use Slurm. 

### Example specifying execution of the workflow using Slurm
The `nextflow.config` file can be modified to include the following lines specifying relevant information for Slurm execution:

```
process.executor = 'slurm'
process.queue = 'short'
process.memory = '10 GB' 
process.time = '30 min'
process.cpus = 4 
```
Equivalently, the definition of this data structure can be done using a `struct`/dictionary-like syntax:
```
process {
    executor = 'slurm'
    queue = 'short'
    memory = '10 GB' 
    time = '30 min'
    cpus = 4 
```
Process-specific configuration can be generated using selectors such as `withLabel` and `withName`, which will result in the application of the respective configuration only to processes with the specified label or name.

More complex settings are also supported, e.g. 
```
process {
    withLabel: 'foo' { cpus = 2 }
    withLabel: '!foo' { cpus = 4 }
    withName: '!align.*' { queue = 'long' }
}
```
Here the config for different types of processes is specified in a single definition, in which not only patterns but also their negation is allowed for process selection.

Note that above we have set up configurations for *processes*. *Process* is just one of the `scopes` for which configurations can be defined. Many other `scopes` are available, see https://www.nextflow.io/docs/latest/config.html#config-scopes. 

Although in principle scope-specific configuration can be defined both in a config file and in the main nextflow script, I could not make this work for the `process` scope. For the `params` scope, I could only set it up in the config file. Including `process` scope definition in the main script gives a syntax error.

The configuration file can also be organized in `profiles` that can be selectively used via a commandline parameter, `-profile standard` or `-profile cluster` assuming that the *standard* and *cluster* *profiles* were specified in the config file.

### Workflows from github

A workflow can be also pulled from github and run, e.g. the tutorial being located at https://github.com/nextflow-io/rnaseq-nf it can be run as follows:

```
./nextflow run nextflow-io/rnaseq-nf -with-docker
```

### Conda support

Nextflow pipelines can automatically create and activate Conda environment. This can be done either on the command line: 
```
./nextflow run script7.nf -with-conda env.yml
```
Or the Conda environment can be specified in the config file `process.conda = "env.yml"`.

## Workflow modularization

Processes and workflows can be combined hierarchically, similar to python functions and modules. A simple version of this approach, covered in the tutorial, is the following. 

There is a first-level file, `rnaseq-tasks.nf` which includes the definition of the processes. This is done pretty much as in the other scripts shown in the tutorial, except that the input sections of the processes are defined parametrically. That is, only the name of the parameters are specified, but not their bindings to various channels.

The second-level script, `rnaseq-flow.nf`, includes the process definitions from `rnaseq-tasks.nf`, in a way similar to python module imports, e.g. 
```
include { index; quantification; fastqc; multiqc  } from './rnaseq-tasks.nf'              
```
Then, the analysis steps are specified in terms of these functions:
```
workflow rnaseqFlow {                                                                     
    // required inputs                                                                    
    take:                                                                                 
      transcriptome                                                                       
      read_files                                                                          
    // workflow implementation                                                            
    main:                                                                                 
      index(transcriptome)                                                                
      quantification(index.out, read_files)                                               
      fastqc(read_files)                                                                  
      multiqc( quantification.out.mix(fastqc.out).collect() )                             
}                                                                                         
```
There is a new concept, of a `workflow`, which takes some inputs and has a `main` section where the calls to the processes, with required parameters, are made. The workflow does not declare and it is implicitly to be the entry point of the application. Outputs of the workflow that should be visible in the outer scope can also be specified in an `emit` section, e.g.
```
workflow RNASEQ {
  take:
    transcriptome
    read_pairs_ch
 
  main: 
    INDEX(transcriptome)
    FASTQC(read_pairs_ch)
    QUANT(INDEX.out, read_pairs_ch)

  emit: 
     QUANT.out.mix(FASTQC.out).collect()
}
```
Some interesting things going on here: first, the parameters like read_files can be used multiple times, unlike channels which are consumed at their first use. This seems to be a feature added relatively recently to nextflow. Second, there output of a process can be accessed as <process_name>.out. Third, the outputs are treated as channels. These features come with the workflow definition functionality (DSL, currently DLS2 https://www.nextflow.io/blog/2020/dsl2-is-here.html). That's why the following command is included in the main script, which sets up the main inputs to the processes:
```
nextflow.enable.dsl=2

/* 
 * pipeline input parameters
 */
 
params.reads = "$baseDir/data/ggal/gut_{1,2}.fq"
params.transcript = "$baseDir/data/ggal/transcriptome.fa"
params.multiqc = "$baseDir/multiqc"
params.outdir = "results" 

log.info """\                                                                                                          
         R N A S E Q - N F   P I P E L I N E                                                                           
         ===================================                                                                           
         transcriptome: ${params.transcript}                                                                           
         reads        : ${params.reads}                                                                                
         outdir       : ${params.outdir}                                                                               
         """                                                                                                           
         .stripIndent() 
         
include { rnaseqFlow } from './rnaseq-flow.nf'

workflow {
    read_pairs_ch = Channel .fromFilePairs( params.reads, checkIfExists:true )
    rnaseqFlow( params.transcript, read_pairs_ch )
}
```

Third, a wrapper script is used to define global parameters such as input file names and to set up the appropriate input channel. The call to the workflow is then made with the transcriptome file name and channel with read files.