# Nextflow in Neurodesk: Minimal BIDS T1w QC
**Author:** Steffen Bollmann

**Date:** 13 Feb 2026


### Citation and Resources
#### Workflow engine
- Di Tommaso, P., Chatzou, M., Floden, E. W., Barja, P. P., Palumbo, E., & Notredame, C. (2017). Nextflow enables reproducible computational workflows. *Nature Biotechnology, 35*(4), 316-319. https://doi.org/10.1038/nbt.3820

#### Data access pattern used in this notebook
- OpenNeuro (example dataset IDs can be cloned via DataLad): https://openneuro.org
- Halchenko, Y. O., et al. (2021). DataLad: distributed system for joint management of code, data, and their relationship. *Journal of Open Source Software, 6*(63), 3262. https://doi.org/10.21105/joss.03262

#### Platform
- Neurodesk: https://www.neurodesk.org


## Table of contents
1. [What this notebook builds](#What-this-notebook-builds)
2. [Load Nextflow in Neurodesk](#Load-Nextflow-in-Neurodesk)
3. [Write a DSL2 pipeline](#Write-a-DSL2-pipeline)
4. [Create a tiny local BIDS dataset](#Create-a-tiny-local-BIDS-dataset)
5. [Run Nextflow and inspect outputs](#Run-Nextflow-and-inspect-outputs)
6. [Optional: OpenNeuro + DataLad helper script](#Optional:-OpenNeuro-+-DataLad-helper-script)
7. [Interpreting and extending this pipeline](#Interpreting-and-extending-this-pipeline)
8. [Dependencies and environment capture](#Dependencies-and-environment-capture)


## What this notebook builds

This notebook demonstrates a complete, beginner-friendly Nextflow workflow inside Neurodesk.

The pipeline takes BIDS-style T1w MRI files and performs a simple quality-control pass per subject:
- finds `*_T1w.nii` / `*_T1w.nii.gz` images in BIDS `anat/` folders,
- runs one process per subject to extract shape and voxel size,
- merges all per-subject tables into one summary TSV.
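
The merge step in the last bullet can be sketched in plain Python. This is only an illustration of the idea (keep one header, append all data rows); the pipeline itself does the merge with shell tools. The function name `merge_qc_tables` and the sample rows are hypothetical.

```python
from pathlib import Path
import tempfile


def merge_qc_tables(tables, summary_path):
    """Concatenate TSV files that share a header, writing the header only once."""
    header = None
    rows = []
    for table in sorted(tables):
        lines = Path(table).read_text(encoding="utf-8").splitlines()
        if not lines:
            continue  # skip empty files
        if header is None:
            header = lines[0]
        rows.extend(lines[1:])
    if header is None:
        raise ValueError("no non-empty QC tables to merge")
    Path(summary_path).write_text("\n".join([header, *rows]) + "\n", encoding="utf-8")
    return len(rows)


# Hypothetical per-subject tables, written to a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    tmp = Path(tmp)
    for sub in ["sub-01", "sub-02"]:
        (tmp / f"{sub}.qc.tsv").write_text(
            "id\tfile\tshape\tvoxel_size_mm\n"
            f"{sub}\t{sub}_T1w.nii.gz\t32x32x24\t1.000x1.000x1.200\n",
            encoding="utf-8",
        )
    n_rows = merge_qc_tables(tmp.glob("*.qc.tsv"), tmp / "t1w_qc_summary.tsv")
    summary_text = (tmp / "t1w_qc_summary.tsv").read_text(encoding="utf-8")
    print(summary_text)
```

Sorting the inputs makes the row order deterministic, which is convenient when comparing summaries between runs.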

To keep the notebook reliable for testing and teaching, the default run path uses a tiny synthetic BIDS dataset generated locally.

An optional section also writes a DataLad/OpenNeuro helper script for running the same pipeline on real public data.


## Load Nextflow in Neurodesk


In [None]:
from pathlib import Path
import importlib.util
import os
import shutil
import subprocess
import textwrap

try:
    import module
except ImportError:
    module = None
    print("Python `module` helper is not available in this kernel. Continuing with PATH checks.")


In [None]:
loaded_nextflow_module = None

if module is not None:
    # Try a small set of common module names in Neurodesk images.
    for candidate in ["nextflow", "nextflow/24.10.4", "nextflow/24.04.4", "nextflow/23.10.1"]:
        try:
            await module.load(candidate)
            loaded_nextflow_module = candidate
            print(f"Loaded module: {candidate}")
            break
        except Exception as exc:
            print(f"Could not load {candidate}: {exc}")

    await module.list()
else:
    print("Skipping module loading.")


In [None]:
def run_and_print(cmd):
    # Run a shell command without failing the notebook on non-zero exit.
    print("$", " ".join(cmd))
    proc = subprocess.run(cmd, text=True, capture_output=True)
    if proc.stdout.strip():
        print(proc.stdout.strip())
    if proc.stderr.strip():
        print(proc.stderr.strip())
    print(f"[exit code: {proc.returncode}]")
    return proc

nextflow_ok = shutil.which("nextflow") is not None
datalad_ok = shutil.which("datalad") is not None
python_ok = shutil.which("python3") is not None

print(f"nextflow in PATH: {nextflow_ok}")
print(f"datalad in PATH: {datalad_ok}")
print(f"python3 in PATH: {python_ok}")

if nextflow_ok:
    run_and_print(["nextflow", "-version"])
if datalad_ok:
    run_and_print(["datalad", "--version"])

nibabel_available = importlib.util.find_spec("nibabel") is not None
print(f"nibabel available in notebook kernel: {nibabel_available}")


## Write a DSL2 pipeline

The cell below writes a complete Nextflow script to disk.

Important points to notice:
- `Channel.fromPath(...)` searches for candidate T1w files.
- A BIDS-aware regex keeps only `sub-*/(ses-*/)?anat/*_T1w` paths.
- `T1wQcPerSubject` runs once per input image.
- `MergeQc` concatenates all per-subject QC files into one summary table.

The defaults point to a local demo directory, so the pipeline runs without any external data.
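
To make the path filter and ID extraction easier to experiment with, here is a Python sketch of the same two regular expressions. It is an approximation for exploration, not part of the pipeline; the helper names `is_bids_t1w` and `extract_id` and the example paths are hypothetical.

```python
import re
from pathlib import PurePosixPath

# Full-path filter: sub-*/(ses-*/)?anat/*_T1w.nii(.gz)
BIDS_T1W = re.compile(r".*/sub-[^/]+/(ses-[^/]+/)?anat/.*_T1w\.nii(\.gz)?$")
# ID taken from the file name: sub-XX, optionally followed by _ses-YY
ID_PATTERN = re.compile(r"(sub-[^_]+(?:_ses-[^_]+)?)")


def is_bids_t1w(path):
    """Return True if the whole path looks like a BIDS anat T1w image."""
    return BIDS_T1W.fullmatch(path) is not None


def extract_id(path):
    """Return 'sub-XX' or 'sub-XX_ses-YY' from the file name, else the file name."""
    name = PurePosixPath(path).name
    match = ID_PATTERN.search(name)
    return match.group(1) if match else name


examples = [
    "/data/sub-01/anat/sub-01_T1w.nii.gz",
    "/data/sub-01/ses-02/anat/sub-01_ses-02_T1w.nii",
    "/data/sub-01/func/sub-01_task-rest_bold.nii.gz",
    "/data/derivatives/sub-01_T1w.nii.gz",
]
for path in examples:
    print(is_bids_t1w(path), extract_id(path), path)
```

The filter requires the `anat/` folder to sit directly under `sub-*/` or `sub-*/ses-*/`, so T1w-named files elsewhere in the tree are ignored.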


In [None]:
ROOT_DIR = Path.cwd() / "nextflow_neuro_demo"
PIPELINE = ROOT_DIR / "example_neuroimaging_qc.nf"
LOCAL_BIDS = ROOT_DIR / "example_bids"
LOCAL_OUT = ROOT_DIR / "example_qc_results"

ROOT_DIR.mkdir(parents=True, exist_ok=True)

pipeline_text = textwrap.dedent(r"""#!/usr/bin/env nextflow

nextflow.enable.dsl = 2

params.bids = params.bids ?: "${projectDir}/example_bids"
params.outdir = params.outdir ?: "${projectDir}/example_qc_results"

/*
Example:
  nextflow run example_neuroimaging_qc.nf \
    --bids /path/to/bids_dataset \
    --outdir /path/to/output
*/

def t1wPatterns = [
    "${params.bids}/**/*_T1w.nii.gz",
    "${params.bids}/**/*_T1w.nii"
]

Channel
    .fromPath(t1wPatterns, type: "any", followLinks: true)
    .filter { t1w ->
        def path = t1w.toString()
        path ==~ /.*\/sub-[^\/]+\/(ses-[^\/]+\/)?anat\/.*_T1w\.nii(\.gz)?$/ && t1w.exists()
    }
    .ifEmpty {
        error "No readable T1w files found in BIDS anat folders under ${params.bids}. If this is an OpenNeuro/DataLad clone, run datalad get on *_T1w.nii.gz files first."
    }
    .map { t1w ->
        def matcher = (t1w.baseName =~ /(sub-[^_]+(?:_ses-[^_]+)?)/)
        def id = matcher ? matcher[0][1] : t1w.baseName
        tuple(id, t1w)
    }
    .set { t1w_files }

process T1wQcPerSubject {
    tag "${id}"
    publishDir "${params.outdir}/per_subject", mode: "copy"

    input:
    tuple val(id), path(t1w)

    output:
    path "${id}.qc.tsv"

    script:
    """
    python3 - << 'PY'
import nibabel as nib
from pathlib import Path

subject = "${id}"
t1w = Path("${t1w}")
img = nib.load(str(t1w))
shape = "x".join(str(v) for v in img.shape)
zooms = "x".join(f"{z:.3f}" for z in img.header.get_zooms()[:3])

with open(f"{subject}.qc.tsv", "w", encoding="utf-8") as f:
    # Escapes are doubled because Groovy processes the script string before Python sees it.
    f.write("id\\tfile\\tshape\\tvoxel_size_mm\\n")
    f.write(f"{subject}\\t{t1w.name}\\t{shape}\\t{zooms}\\n")
PY
    """
}

process MergeQc {
    publishDir "${params.outdir}", mode: "copy"

    input:
    path qc_tables

    output:
    path "t1w_qc_summary.tsv"

    script:
    """
    set -euo pipefail
    first=\$(ls *.qc.tsv | head -n 1)
    head -n 1 "\$first" > t1w_qc_summary.tsv
    tail -n +2 -q *.qc.tsv >> t1w_qc_summary.tsv
    """
}

workflow {
    T1wQcPerSubject(t1w_files)
    MergeQc(T1wQcPerSubject.out.collect())
}
""")

PIPELINE.write_text(pipeline_text, encoding="utf-8")

print(f"Pipeline written to: {PIPELINE}")
print(f"Default BIDS dir:    {LOCAL_BIDS}")
print(f"Default output dir:  {LOCAL_OUT}")


In [None]:
# Quick preview of the generated Nextflow script
print("\n".join(PIPELINE.read_text(encoding="utf-8").splitlines()[:60]))


## Create a tiny local BIDS dataset

This section avoids external downloads and gives you a deterministic test case.

If `nibabel` is present, we generate two fake T1w images:
- `sub-01/anat/sub-01_T1w.nii.gz`
- `sub-02/anat/sub-02_T1w.nii.gz`

These are sufficient to validate Nextflow channel selection and process fan-out.
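
The synthetic images are saved with a diagonal affine (1.0, 1.0, 1.2), so the QC table is expected to report a voxel size of 1.000x1.000x1.200. The sketch below shows where that value comes from: voxel sizes are the Euclidean norms of the first three affine columns. It uses NumPy only; the function name `voxel_sizes_from_affine` is hypothetical.

```python
import numpy as np


def voxel_sizes_from_affine(affine):
    """Return voxel sizes as the norms of the first three columns of a 4x4 affine."""
    affine = np.asarray(affine, dtype=float)
    return np.sqrt((affine[:3, :3] ** 2).sum(axis=0))


# Same scaling as the synthetic demo images: 1.0 x 1.0 x 1.2 mm voxels.
affine = np.diag([1.0, 1.0, 1.2, 1.0])
zooms = voxel_sizes_from_affine(affine)
voxel_size_mm = "x".join(f"{z:.3f}" for z in zooms)
print(voxel_size_mm)
```

Because the calculation uses column norms, it also holds for rotated affines, where the voxel size cannot simply be read off the diagonal.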


In [None]:
if not nibabel_available:
    print("nibabel is missing in this kernel. Skipping synthetic dataset creation.")
else:
    import json
    import numpy as np
    import nibabel as nib

    LOCAL_BIDS.mkdir(parents=True, exist_ok=True)

    dataset_description = {
        "Name": "Synthetic Nextflow Neurodesk Demo",
        "BIDSVersion": "1.8.0",
    }
    (LOCAL_BIDS / "dataset_description.json").write_text(
        json.dumps(dataset_description, indent=2) + "\n",
        encoding="utf-8",
    )

    rng = np.random.default_rng(seed=42)
    for subject in ["sub-01", "sub-02"]:
        anat_dir = LOCAL_BIDS / subject / "anat"
        anat_dir.mkdir(parents=True, exist_ok=True)

        data = (rng.random((32, 32, 24), dtype=np.float32) * 1000.0).astype("float32")
        affine = np.array([
            [1.0, 0.0, 0.0, 0.0],
            [0.0, 1.0, 0.0, 0.0],
            [0.0, 0.0, 1.2, 0.0],
            [0.0, 0.0, 0.0, 1.0],
        ])
        img = nib.Nifti1Image(data, affine)
        out_file = anat_dir / f"{subject}_T1w.nii.gz"
        nib.save(img, out_file)
        print(f"Wrote {out_file}")

    print("\nSynthetic BIDS dataset is ready.")


## Run Nextflow and inspect outputs

We run the pipeline against the local synthetic dataset.

The helper below prints the exit code instead of raising on failure, which keeps the notebook readable when tools are missing from a custom environment. In a real project, you would usually pass `check=True` to `subprocess.run` and fail fast.
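
For comparison, a fail-fast variant might look like the following sketch. The function name `run_checked` is hypothetical, and the two demo commands simply call the current Python interpreter so the example runs anywhere.

```python
import subprocess
import sys


def run_checked(cmd):
    """Run a command and raise CalledProcessError on a non-zero exit code."""
    print("$", " ".join(cmd))
    proc = subprocess.run(cmd, text=True, capture_output=True, check=True)
    return proc.stdout


# A command that succeeds.
ok_output = run_checked([sys.executable, "-c", "print('ok')"])

# A command that fails: check=True turns the exit code into an exception.
try:
    run_checked([sys.executable, "-c", "import sys; sys.exit(3)"])
    failed_code = 0
except subprocess.CalledProcessError as exc:
    failed_code = exc.returncode
    print(f"Command failed with exit code {exc.returncode}")
```

In a pipeline script you would normally let the exception propagate instead of catching it, so that later steps never run on incomplete outputs.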


In [None]:
def run_and_print_tail(cmd, tail_lines=40):
    print("$", " ".join(cmd))
    proc = subprocess.run(cmd, text=True, capture_output=True)

    stdout_lines = proc.stdout.strip().splitlines()
    stderr_lines = proc.stderr.strip().splitlines()

    if stdout_lines:
        print("--- stdout (tail) ---")
        print("\n".join(stdout_lines[-tail_lines:]))
    if stderr_lines:
        print("--- stderr (tail) ---")
        print("\n".join(stderr_lines[-tail_lines:]))

    print(f"[exit code: {proc.returncode}]")
    return proc

can_run_local_demo = nextflow_ok and python_ok and nibabel_available and (LOCAL_BIDS / "dataset_description.json").exists()

if can_run_local_demo:
    LOCAL_OUT.mkdir(parents=True, exist_ok=True)
    run_and_print_tail([
        "nextflow",
        "run",
        str(PIPELINE),
        "--bids",
        str(LOCAL_BIDS),
        "--outdir",
        str(LOCAL_OUT),
    ])
else:
    print("Skipping local Nextflow run. Missing one or more prerequisites:")
    print(f"  nextflow_ok={nextflow_ok}")
    print(f"  python_ok={python_ok}")
    print(f"  nibabel_available={nibabel_available}")
    print(f"  dataset_exists={(LOCAL_BIDS / 'dataset_description.json').exists()}")


In [None]:
summary_path = LOCAL_OUT / "t1w_qc_summary.tsv"

if summary_path.exists():
    print(f"QC summary found at: {summary_path}")
    print()
    print(summary_path.read_text(encoding="utf-8"))
else:
    print("QC summary file not found yet. If needed, inspect Nextflow logs above.")

per_subject_dir = LOCAL_OUT / "per_subject"
if per_subject_dir.exists():
    print("Per-subject files:")
    for p in sorted(per_subject_dir.glob("*.qc.tsv")):
        print(" -", p.name)


## Optional: OpenNeuro + DataLad helper script

The next cell writes a helper shell script that runs the same pipeline on a real OpenNeuro dataset.

It does the following:
- clones an OpenNeuro dataset with DataLad,
- downloads a limited number of T1w files,
- runs the same Nextflow pipeline,
- prints the merged QC summary.

By default we only *write* this script (no large network operations during notebook execution).

To run it from the notebook, set these environment variables in the kernel's environment:
- `RUN_OPENNEURO_DEMO=1`
- optional `OPENNEURO_DATASET_ID` (default `ds000030`)
- optional `OPENNEURO_MAX_T1W` (default `2`)
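
If you want to validate these settings in Python before launching the script, something like the sketch below could work. It mirrors the defaults listed above and the integer check done in the shell script; the function name `read_openneuro_settings` is hypothetical.

```python
import os


def read_openneuro_settings(env=None):
    """Read the demo settings from a mapping (defaults to os.environ) and validate them."""
    env = os.environ if env is None else env
    raw_max = env.get("OPENNEURO_MAX_T1W", "2")
    # Accept only plain ASCII digits, and require a value of at least 1.
    if not (raw_max.isascii() and raw_max.isdigit()) or int(raw_max) < 1:
        raise ValueError(f"OPENNEURO_MAX_T1W must be an integer >= 1 (current: {raw_max})")
    return {
        "run_demo": env.get("RUN_OPENNEURO_DEMO", "0") == "1",
        "dataset_id": env.get("OPENNEURO_DATASET_ID", "ds000030"),
        "max_t1w": int(raw_max),
    }


# With an empty mapping, the documented defaults apply.
defaults = read_openneuro_settings({})
print(defaults)

# Explicit overrides, passed as a plain dict instead of the real environment.
custom = read_openneuro_settings({"RUN_OPENNEURO_DEMO": "1", "OPENNEURO_MAX_T1W": "5"})
print(custom)
```

Passing a dict instead of reading `os.environ` directly keeps the function easy to test.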


In [None]:
RUN_SCRIPT = ROOT_DIR / "run_openneuro_nextflow_demo.sh"

run_script_text = textwrap.dedent(r"""#!/usr/bin/env bash
set -euo pipefail

ROOT_DIR="${1:-$PWD/nextflow_neuro_demo}"
DATASET_ID="${2:-${OPENNEURO_DATASET_ID:-ds000030}}"
MAX_T1W="${3:-${OPENNEURO_MAX_T1W:-2}}"

mkdir -p "${ROOT_DIR}"
PIPELINE="${ROOT_DIR}/example_neuroimaging_qc.nf"
BIDS_DIR="${ROOT_DIR}/${DATASET_ID}"
OUT_DIR="${ROOT_DIR}/example_qc_results"

require_cmd() {
    local cmd="$1"
    if ! command -v "${cmd}" >/dev/null 2>&1; then
        echo "Missing required command: ${cmd}" >&2
        exit 1
    fi
}

require_cmd datalad
require_cmd nextflow

if [ ! -f "${PIPELINE}" ]; then
    if [ -f /opt/neurodesktop/example_neuroimaging_qc.nf ]; then
        cp -f /opt/neurodesktop/example_neuroimaging_qc.nf "${PIPELINE}"
    elif [ -f "$(dirname "$0")/example_neuroimaging_qc.nf" ]; then
        cp -f "$(dirname "$0")/example_neuroimaging_qc.nf" "${PIPELINE}"
    else
        echo "Could not find example_neuroimaging_qc.nf. Write it first from the notebook cell." >&2
        exit 1
    fi
fi

if [ ! -d "${BIDS_DIR}/.datalad" ]; then
    echo "Cloning OpenNeuro dataset ${DATASET_ID} with DataLad..."
    datalad clone "///openneuro/${DATASET_ID}" "${BIDS_DIR}"
else
    echo "Using existing dataset clone at ${BIDS_DIR}"
fi

cd "${BIDS_DIR}"
echo "Installing OpenNeuro subdataset metadata (no file content yet)..."
datalad get -n -r .

mapfile -t t1w_candidates < <(
    find . \( -type f -o -type l \) \
        \( -path "./sub-*/anat/*_T1w.nii.gz" -o -path "./sub-*/ses-*/anat/*_T1w.nii.gz" \) \
        | sort
)

if [ "${#t1w_candidates[@]}" -eq 0 ]; then
    echo "No T1w files found in ${DATASET_ID}. Try another OpenNeuro dataset via OPENNEURO_DATASET_ID." >&2
    exit 1
fi

if ! [[ "${MAX_T1W}" =~ ^[0-9]+$ ]] || [ "${MAX_T1W}" -lt 1 ]; then
    echo "OPENNEURO_MAX_T1W must be an integer >= 1 (current: ${MAX_T1W})" >&2
    exit 1
fi

selected_t1w=("${t1w_candidates[@]:0:${MAX_T1W}}")
get_targets=("dataset_description.json")

for rel in "${selected_t1w[@]}"; do
    rel="${rel#./}"
    get_targets+=("${rel}")
    json="${rel%.nii.gz}.json"
    if [ -e "${json}" ] || [ -L "${json}" ]; then
        get_targets+=("${json}")
    fi
done

echo "Downloading ${#get_targets[@]} file(s) from OpenNeuro dataset ${DATASET_ID}..."
datalad get "${get_targets[@]}"

echo "Running Nextflow QC pipeline..."
nextflow run "${PIPELINE}" --bids "${BIDS_DIR}" --outdir "${OUT_DIR}"

echo
echo "Done. QC summary:"
cat "${OUT_DIR}/t1w_qc_summary.tsv"
""")

RUN_SCRIPT.write_text(run_script_text, encoding="utf-8")
RUN_SCRIPT.chmod(0o755)

print(f"Helper script written: {RUN_SCRIPT}")
print("Use it manually, for example:")
print(f"  bash {RUN_SCRIPT} {ROOT_DIR/'openneuro_demo'} ds000030 2")


In [None]:
run_openneuro_demo = os.environ.get("RUN_OPENNEURO_DEMO", "0") == "1"

if run_openneuro_demo:
    if not nextflow_ok or not datalad_ok:
        print("Cannot run OpenNeuro demo because nextflow or datalad is missing.")
    else:
        openneuro_root = ROOT_DIR / "openneuro_demo"
        dataset_id = os.environ.get("OPENNEURO_DATASET_ID", "ds000030")
        max_t1w = os.environ.get("OPENNEURO_MAX_T1W", "2")

        run_and_print_tail([
            "bash",
            str(RUN_SCRIPT),
            str(openneuro_root),
            dataset_id,
            max_t1w,
        ], tail_lines=80)
else:
    print("OpenNeuro/DataLad run skipped (set RUN_OPENNEURO_DEMO=1 to enable).")


## Interpreting and extending this pipeline

Ways to adapt this pattern in your own Neurodesk projects:
- Add more BIDS modalities (e.g., T2w, bold) by extending `t1wPatterns` or making separate channels.
- Replace the QC code block with your real per-subject analysis (FSL/AFNI/ANTs/SPM commands).
- Add process-level resources (`cpus`, `memory`, `time`) and executor config for HPC.
- Track provenance by storing pipeline files and run scripts with your dataset repository.

For bigger projects, split configuration into `nextflow.config` and keep `.nf` scripts focused on workflow logic.
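
As a rough illustration of that split, the sketch below writes a small `nextflow.config` in the same way the notebook writes the pipeline file. The resource values are placeholders, not recommendations, and the file goes to a temporary directory here; in the notebook you would write it next to the pipeline script, where Nextflow looks for it.

```python
from pathlib import Path
import tempfile
import textwrap

# Placeholder resource settings; adjust for your own cluster and data.
config_text = textwrap.dedent("""\
    process {
        cpus = 1
        memory = '2 GB'
        time = '1h'

        // Override the defaults for a single process by name.
        withName: T1wQcPerSubject {
            cpus = 2
        }
    }

    // Uncomment to submit jobs through a scheduler instead of running locally.
    // process.executor = 'slurm'
    """)

config_dir = Path(tempfile.mkdtemp())
config_path = config_dir / "nextflow.config"
config_path.write_text(config_text, encoding="utf-8")
print(config_path.read_text(encoding="utf-8"))
```

Keeping resources in the config file means the `.nf` script can stay unchanged when you move between a laptop and an HPC system.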


## Dependencies and environment capture

As in the Neurodesk notebook template, we capture environment details at the end. If `watermark` is unavailable, the notebook continues and prints a message.


In [None]:
try:
    %load_ext watermark
    %watermark
    %watermark --iversions
except Exception as exc:
    print("Could not run watermark extension:", exc)
