# Nextflow from Python

<a target="_blank" href="https://colab.research.google.com/github/seandavi/notebooks/blob/main/intro_to_unstructured.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

nextflow.py is a Python wrapper around the Nextflow pipeline framework. 
It lets you run Nextflow pipelines from Python code.

* <https://pypi.org/project/nextflowpy/>


In [1]:
%%capture
%pip install nextflowpy
%pip install pandas

### Nextflow installed?

The Nextflow command-line tool must be installed and available on your PATH.
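A quick way to confirm this from Python, before calling the wrapper, is to look the executable up on PATH with the standard library. This is a minimal sketch; `find_nextflow` is a hypothetical helper name, and it only checks that a `nextflow` executable can be found, not that it works.

```python
import shutil
from typing import Optional


def find_nextflow() -> Optional[str]:
    """Return the full path of the `nextflow` executable, or None if it is not on PATH."""
    return shutil.which("nextflow")


if __name__ == "__main__":
    path = find_nextflow()
    if path is None:
        print("nextflow is not on PATH; install it before running the cells below")
    else:
        print(f"Using nextflow at {path}")
```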

In [2]:
from nextflow import pipeline
import pandas as pd

In [3]:
mainnf = '''#!/usr/bin/env nextflow

process sayHello {
  input: 
    val x
  output:
    stdout
  script:
    """
    echo '$x world!'
    """
}

workflow {
  Channel.of('Bonjour', 'Ciao', 'Hello', 'Hola') | sayHello | view
}
'''
with open('main.nf', 'w') as mainnf_file:
    mainnf_file.write(mainnf)

In [4]:
execution = pipeline.run('main.nf')

In [5]:
execution.command

'nextflow -Duser.country=US run /Users/seandavis/Documents/git/notebooks/main.nf\n'

In [6]:
execution.duration

3.0

In [7]:
execution.process_executions

[<ProcessExecution from grave_elion: sayHello (1)>,
 <ProcessExecution from grave_elion: sayHello (4)>,
 <ProcessExecution from grave_elion: sayHello (3)>,
 <ProcessExecution from grave_elion: sayHello (2)>]
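The list above comes back in the order Nextflow reported the tasks (1, 4, 3, 2), not in input order. If you want them in task order, you can sort on the number in parentheses. This sketch assumes each ProcessExecution has a `name` attribute such as `sayHello (3)` (as in the attribute table at the end of this notebook); `task_index` and `sorted_by_task` are hypothetical helpers, and the objects in the usage example are stand-ins.

```python
import re
import sys
from types import SimpleNamespace
from typing import Iterable, List


def task_index(name: str) -> int:
    """Return N from a task name such as 'sayHello (N)'; names without an index sort last."""
    match = re.search(r"\((\d+)\)\s*$", name)
    return int(match.group(1)) if match else sys.maxsize


def sorted_by_task(process_executions: Iterable) -> List:
    """Sort process executions by numeric task index, then by name."""
    return sorted(process_executions, key=lambda pe: (task_index(pe.name), pe.name))


if __name__ == "__main__":
    # Stand-ins for ProcessExecution objects; only the `name` attribute is used
    pes = [SimpleNamespace(name=f"sayHello ({i})") for i in (1, 4, 3, 2)]
    print([pe.name for pe in sorted_by_task(pes)])
    # ['sayHello (1)', 'sayHello (2)', 'sayHello (3)', 'sayHello (4)']
```

Sorting on the parsed integer rather than the raw string keeps `sayHello (10)` after `sayHello (2)`.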

In [8]:
for pe in execution.process_executions:
    print(pe.bash)

#!/bin/bash -ue
echo 'Bonjour world!'

#!/bin/bash -ue
echo 'Hola world!'

#!/bin/bash -ue
echo 'Hello world!'

#!/bin/bash -ue
echo 'Ciao world!'



In [9]:
execution.started

1703938142.0

In [10]:
execution.started_string

'2023-12-30 07:09:02'
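In the version used here, `execution.started` is a Unix timestamp in seconds rather than a datetime object. If you need a datetime, the standard library can convert it. `to_datetime` is a hypothetical helper; passing a time zone avoids depending on the machine's local time zone.

```python
from datetime import datetime, timezone, tzinfo
from typing import Optional


def to_datetime(ts: float, tz: Optional[tzinfo] = None) -> datetime:
    """Convert a Unix timestamp in seconds to a datetime (naive local time if tz is None)."""
    return datetime.fromtimestamp(ts, tz=tz)


if __name__ == "__main__":
    started = 1703938142.0  # value of execution.started shown above
    print(to_datetime(started, timezone.utc))  # 2023-12-30 12:09:02+00:00
    print(to_datetime(started))  # local time; depends on the machine's time zone
```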

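This cell was re-run (note the execution count) after the polling example in the next section, so the log below comes from that later run (`prickly_montalcini`), whose pipeline writes `file.out` instead of printing to stdout; the last lines are the output paths printed by `view`.

In [18]: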
print(execution.stdout)

N E X T F L O W  ~  version 23.10.0
Launching `/Users/seandavis/Documents/git/notebooks/main.nf` [prickly_montalcini] DSL2 - revision: c3e999c295
[b0/2350fb] Submitted process > sayHello (3)
[32/e114c0] Submitted process > sayHello (4)
[79/eb0aa9] Submitted process > sayHello (1)
[5a/57b724] Submitted process > sayHello (2)
/Users/seandavis/Documents/git/notebooks/work/b0/2350fbd69de06d451967bc4480998c/file.out
/Users/seandavis/Documents/git/notebooks/work/32/e114c00542d11e43490f4a221b6f65/file.out
/Users/seandavis/Documents/git/notebooks/work/79/eb0aa9ac6928ee08a6ab8188ce88d4/file.out
/Users/seandavis/Documents/git/notebooks/work/5a/57b7240858f0da5b0ad2c3443d4296/file.out



In [12]:
execution.stderr

''

### Polling

`pipeline.run` runs the pipeline and blocks until it finishes, returning the completed Execution only at the end.

An alternative is `run_and_poll`, a generator that yields an Execution every few seconds, each one representing the state of the pipeline run at that moment:

```python
for execution in pipeline.run_and_poll('main.nf', sleep=2, run_path="./rundir", params={"param1": "123"}):
    print("Processing intermediate execution")
```

By default, an Execution is returned every second; adjust this with the `sleep` parameter. This is useful for tracking the progress of a pipeline while it runs.
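The general pattern behind a polling generator can be sketched with the standard library: start a child process, then yield a snapshot of its state every `sleep` seconds until it exits, plus one final snapshot. This is not nextflowpy's implementation; `poll_process` and `Snapshot` are hypothetical names, and a short Python sleep stands in for a Nextflow run.

```python
import subprocess
import sys
import time
from dataclasses import dataclass
from typing import Iterator, List, Optional


@dataclass
class Snapshot:
    """Hypothetical stand-in for an Execution: the run's state at one moment."""
    elapsed: float             # seconds since the process was started
    returncode: Optional[int]  # None while the process is still running


def poll_process(cmd: List[str], sleep: float = 1.0) -> Iterator[Snapshot]:
    """Start `cmd` and yield a Snapshot every `sleep` seconds until it exits."""
    start = time.monotonic()
    proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    while proc.poll() is None:
        yield Snapshot(time.monotonic() - start, None)
        time.sleep(sleep)
    # A final snapshot describing the completed run
    yield Snapshot(time.monotonic() - start, proc.returncode)


if __name__ == "__main__":
    # A short-lived child process standing in for a pipeline run
    cmd = [sys.executable, "-c", "import time; time.sleep(0.5)"]
    for snap in poll_process(cmd, sleep=0.1):
        state = "running" if snap.returncode is None else f"exited with {snap.returncode}"
        print(f"{snap.elapsed:.1f}s: {state}")
```

Because it is a generator, the caller decides what to do between snapshots; if the loop is abandoned early, the child process keeps running.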

In [13]:
mainnf = '''#!/usr/bin/env nextflow

process sayHello {
  input: 
    val x
  output:
    path 'file.out'
  script:
    """
    sleep 10
    echo '$x world!' > file.out
    """
}

workflow {
  Channel.of('Bonjour', 'Ciao', 'Hello', 'Hola') | sayHello | view
}
'''
with open('main.nf', 'w') as mainnf_file:
    mainnf_file.write(mainnf)

The cell below runs this pipeline, polling every 2 seconds and printing how many process executions have finished.

In [14]:
for execution in pipeline.run_and_poll('main.nf', sleep=2, params={"param1": "123"}):
    print("Processing intermediate execution")
    print(execution.process_executions)
    # A task counts as finished once its returncode is no longer an empty string
    tot = len(execution.process_executions)
    fin = sum(1 for pe in execution.process_executions if pe.returncode != '')
    print(f"{fin}/{tot}")
    if tot > 0:
        print(f"({fin/tot})")

Processing intermediate execution
[]
0/0
Processing intermediate execution
[<ProcessExecution from prickly_montalcini: sayHello (3)>, <ProcessExecution from prickly_montalcini: sayHello (4)>, <ProcessExecution from prickly_montalcini: sayHello (1)>, <ProcessExecution from prickly_montalcini: sayHello (2)>]
0/4
(0.0)
Processing intermediate execution
[<ProcessExecution from prickly_montalcini: sayHello (3)>, <ProcessExecution from prickly_montalcini: sayHello (4)>, <ProcessExecution from prickly_montalcini: sayHello (1)>, <ProcessExecution from prickly_montalcini: sayHello (2)>]
0/4
(0.0)
Processing intermediate execution
[<ProcessExecution from prickly_montalcini: sayHello (3)>, <ProcessExecution from prickly_montalcini: sayHello (4)>, <ProcessExecution from prickly_montalcini: sayHello (1)>, <ProcessExecution from prickly_montalcini: sayHello (2)>]
0/4
(0.0)
Processing intermediate execution
[<ProcessExecution from prickly_montalcini: sayHello (3)>, <ProcessExecution from prickly_montalc
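The counting logic in the loop can be pulled into a small reusable helper. This sketch keeps the notebook's assumption that a process execution's `returncode` is an empty string until the task finishes; `progress` and `Progress` are hypothetical names, and the usage example uses stand-in objects rather than real ProcessExecutions.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Iterable


@dataclass
class Progress:
    finished: int
    total: int

    @property
    def fraction(self) -> float:
        # Guard against the first poll, when no tasks have been submitted yet
        return self.finished / self.total if self.total else 0.0


def progress(process_executions: Iterable) -> Progress:
    """Count tasks with a return code; assumes returncode is '' until a task finishes."""
    pes = list(process_executions)
    done = sum(1 for pe in pes if pe.returncode != "")
    return Progress(finished=done, total=len(pes))


if __name__ == "__main__":
    # Stand-ins: two finished tasks and two still running
    pes = [SimpleNamespace(returncode=rc) for rc in ("0", "", "0", "")]
    p = progress(pes)
    print(f"{p.finished}/{p.total} ({p.fraction:.0%})")  # 2/4 (50%)
```

Inside the polling loop this would be called as `progress(execution.process_executions)`.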

### Executions

An Execution represents a single run of a pipeline. It has the following properties:

-   identifier - the unique ID of the run, generated by Nextflow.
-   started - when the pipeline started. In the version used in this notebook this is a Unix timestamp in seconds (e.g. 1703938142.0); started_string gives it as a formatted string.
-   finished - when the pipeline completed.
-   duration - how long the pipeline ran, in seconds (if finished).
-   status - the status Nextflow reports on completion.
-   command - the command used to run the pipeline.
-   stdout - the stdout of the execution process.
-   stderr - the stderr of the execution process.
-   log - the full text of the log file produced.
-   return_code - the exit code of the run, usually 0 or 1.
-   path - the path to the execution directory.
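A common first check after a run is whether it exited cleanly. The sketch below is a hypothetical `succeeded` helper that reads the exit code from an attribute (default `return_code`, as listed above) and accepts it as either an int or a string, since this notebook compares process return codes against `''`, which suggests strings. The attribute name is a parameter because the notebook itself reads process exit codes as `returncode`; stand-in objects are used in the example.

```python
from types import SimpleNamespace


def succeeded(execution, attr: str = "return_code") -> bool:
    """True if the exit code stored in `attr` is 0, whether stored as int or str."""
    code = getattr(execution, attr, None)
    if code is None:
        return False  # attribute missing, so there is no exit code to check
    return str(code).strip() == "0"


if __name__ == "__main__":
    print(succeeded(SimpleNamespace(return_code=0)))    # True
    print(succeeded(SimpleNamespace(return_code="1")))  # False
    print(succeeded(SimpleNamespace(returncode="0"), attr="returncode"))  # True
```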



In [15]:
pe1 = execution.process_executions[0]

An Execution also has a process_executions property, a list of ProcessExecution objects. Nextflow processes data by chaining together isolated 'processes', and each time a process runs (here sayHello ran four times, once per input) that run is represented by its own ProcessExecution. These have the following properties:

-   identifier - the unique ID generated by Nextflow, of the form xx/xxxxxx. The cells in this notebook read this ID from the hash attribute (e.g. b0/2350fb).
-   process - the name of the process that spawned the process execution.
-   name - the name of this specific process execution.
-   status - the status Nextflow reports on completion.
-   stdout - the stdout of the process execution.
-   stderr - the stderr of the process execution.
-   started - when the process execution started. In the version used here this is a Unix timestamp (e.g. 1703938149.997); started_string and started_dt give it as a string and as a Python datetime.
-   finished - when the process execution completed.
-   duration - how long the process execution took, in seconds.
-   return_code - the exit code of the process execution, usually 0 or 1. The cells in this notebook read it as returncode.
-   path - the local path to the process execution directory.
-   full_path - the absolute path to the process execution directory.
-   bash - the bash file contents generated for the process execution.
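To inspect every task at once, it can help to flatten the process executions into one record per task. This sketch is a hypothetical `process_table` helper; it uses `getattr` with a default so that a missing attribute becomes `None` instead of raising, and the default field names (`hash`, `name`, `status`, `duration`) are the ones used in this notebook. The objects in the example are stand-ins.

```python
from types import SimpleNamespace
from typing import Dict, Iterable, List, Sequence

DEFAULT_FIELDS = ("hash", "name", "status", "duration")


def process_table(process_executions: Iterable,
                  fields: Sequence[str] = DEFAULT_FIELDS) -> List[Dict[str, object]]:
    """One dict per process execution; missing attributes become None instead of raising."""
    return [{f: getattr(pe, f, None) for f in fields} for pe in process_executions]


if __name__ == "__main__":
    # Stand-ins for ProcessExecution objects; the second one has no duration
    pes = [
        SimpleNamespace(hash="b0/2350fb", name="sayHello (3)", status="COMPLETED", duration=10.123),
        SimpleNamespace(hash="32/e114c0", name="sayHello (4)", status="COMPLETED"),
    ]
    for row in process_table(pes):
        print(row)
```

The resulting list of dicts can be passed straight to `pd.DataFrame(...)`, as the notebook does with its own `records`.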


In [16]:
attributes = "process name hash status stdout stderr started started_string started_dt duration returncode bash"
records = []
for i in attributes.split():
    d={}
    d['name']=i
    d['value']=getattr(pe1,i)
    records.append(d)

Process executions can receive input files and create output files while they run. These can be retrieved as follows:

```python
process_execution.input_data()  # Full absolute paths
process_execution.input_data(include_path=False)  # Just file names
process_execution.all_output_data()  # Full absolute paths
process_execution.all_output_data(include_path=False)  # Just file names
```
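Building on these methods, one way to gather every task's outputs is a mapping from task name to output files. This sketch assumes `all_output_data` accepts `include_path` as shown above; `outputs_by_task` and `FakeProcessExecution` are hypothetical, and the fake class only mimics the two members the helper uses.

```python
from typing import Dict, Iterable, List


def outputs_by_task(process_executions: Iterable, include_path: bool = True) -> Dict[str, List[str]]:
    """Map each process execution's name to the output files it produced."""
    return {
        pe.name: list(pe.all_output_data(include_path=include_path))
        for pe in process_executions
    }


class FakeProcessExecution:
    """Hypothetical stand-in exposing `name` and `all_output_data` like a ProcessExecution."""

    def __init__(self, name: str, workdir: str, files: List[str]):
        self.name = name
        self._workdir = workdir
        self._files = files

    def all_output_data(self, include_path: bool = True) -> List[str]:
        if include_path:
            return [f"{self._workdir}/{f}" for f in self._files]
        return list(self._files)


if __name__ == "__main__":
    pes = [FakeProcessExecution("sayHello (1)", "/work/79/eb0aa9", ["file.out"])]
    print(outputs_by_task(pes, include_path=False))  # {'sayHello (1)': ['file.out']}
```

Keying by name relies on task names being unique within a run, which holds here because each sayHello task carries its own index.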

In [17]:
for i in ['input_data', 'all_output_data']:
    d={}
    d['name']=i
    d['value']=str(getattr(pe1,i)())
    records.append(d)
df = pd.DataFrame(records)
df

|   | name | value |
|---|------|-------|
| 0 | process | sayHello |
| 1 | name | sayHello (3) |
| 2 | hash | b0/2350fb |
| 3 | status | COMPLETED |
| 4 | stdout | |
| 5 | stderr | |
| 6 | started | 1703938149.997 |
| 7 | started_string | Dec-30 07:09:09.997 |
| 8 | started_dt | 2023-12-30 07:09:09.997000 |
| 9 | duration | 10.123 |
