Commit 2d838a8
add flux operator executor
Signed-off-by: vsoch <vsoch@users.noreply.github.com>
vsoch committed Mar 16, 2023
1 parent 1fb3cef commit 2d838a8
Showing 9 changed files with 621 additions and 23 deletions.
64 changes: 62 additions & 2 deletions .github/workflows/test-flux.yaml
@@ -1,9 +1,69 @@
name: Test Flux Executor
name: Test Flux Executors
on:
  pull_request: []

jobs:
  build:
  test-flux-operator-executor:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout the code
        uses: actions/checkout@v3

      - name: Install Snakemake
        run: |
          wget --quiet https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O ~/miniconda.sh
          /bin/bash ~/miniconda.sh -b -p /opt/conda
          # Install snakemake
          pip install .

      - name: Install Flux Operator SDK
        run: pip install fluxoperator

      - name: Setup Go
        uses: actions/setup-go@v3
        with:
          go-version: ^1.18

      - name: Start minikube
        uses: medyagh/setup-minikube@697f2b7aaed5f70bf2a94ee21a4ec3dde7b12f92 # v0.0.9

      - name: Create the namespace
        run: kubectl create namespace flux-operator

      - name: Pull Docker Containers to MiniKube
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        run: |
          export SHELL=/bin/bash
          eval $(minikube -p minikube docker-env)
          minikube ssh docker pull ghcr.io/rse-ops/atacseq-vanilla:app-latest

      - name: Prepare workflow in MiniKube
        run: |
          minikube ssh -- sudo apt-get update
          minikube ssh -- sudo apt-get install -y git wget
          minikube ssh -- git clone --depth 1 https://github.com/snakemake/snakemake-tutorial-data /tmp/workflow
          minikube ssh -- mkdir -p /tmp/workflow/scripts
          minikube ssh -- wget -O /tmp/workflow/scripts/plot-quals.py https://raw.githubusercontent.com/rse-ops/flux-hpc/main/snakemake/atacseq/scripts/plot-quals.py

      - name: Prepare workflow on Runner
        run: |
          git clone --depth 1 https://github.com/snakemake/snakemake-tutorial-data /tmp/workflow
          mkdir -p /tmp/workflow/scripts
          wget -O /tmp/workflow/scripts/plot-quals.py https://raw.githubusercontent.com/rse-ops/flux-hpc/main/snakemake/atacseq/scripts/plot-quals.py
          cp ./examples/flux/operator/Snakefile /tmp/workflow/Snakefile

      - name: Install Flux Operator
        run: |
          wget https://raw.githubusercontent.com/flux-framework/flux-operator/main/examples/dist/flux-operator.yaml
          kubectl apply -f flux-operator.yaml

      - name: Run Jobs
        run: |
          cd /tmp/workflow
          snakemake --cores 1 --jobs 1 --flux-operator

  test-flux-executor:
    runs-on: ubuntu-latest
    permissions:
      packages: read
90 changes: 85 additions & 5 deletions docs/executor_tutorial/flux.rst
@@ -1,13 +1,31 @@

.. _tutorial-flux:

Flux Tutorial
-------------
Flux Tutorials
==============

These tutorials will cover using the Flux and Flux Operator executors.

.. _Snakemake: http://snakemake.readthedocs.io
.. _Snakemake Remotes: https://snakemake.readthedocs.io/en/stable/snakefiles/remote_files.html
.. _Python: https://www.python.org/

`Flux-framework <https://flux-framework.org/>`_ is a flexible resource scheduler that works
on both high performance computing systems and cloud (e.g., Kubernetes).
Because it is a more modern scheduler (e.g., it has an official Python API), we classify it as a cloud resource.
Accordingly, we provide two tutorials, covering two modes of execution:

- Local Flux Executor: runs on a local cluster with a Flux Framework instance
- Flux Operator: spins up Kubernetes "MiniClusters" on demand to do the same


.. _tutorial-flux-executor:

Flux Executor
-------------

The Flux Executor issues commands to a Flux scheduler that is locally accessible
via the Flux Python bindings.
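
Under the hood, the executor talks to the scheduler through these bindings. As a rough
illustration (a hand-written sketch, not the executor's actual code, and assuming a
running Flux instance), submitting a single command looks like this:

.. code-block:: python

    import os

    import flux
    from flux.job import JobspecV1

    handle = flux.Flux()  # connect to the enclosing Flux instance
    jobspec = JobspecV1.from_command(
        command=["hostname"], num_tasks=1, cores_per_task=1
    )
    jobspec.environment = dict(os.environ)  # jobs do not inherit the environment by default
    jobid = flux.job.submit(handle, jobspec)
    print(flux.job.result(handle, jobid))  # block until the job completes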

Setup
:::::
@@ -16,9 +34,7 @@ To go through this tutorial, you need the following software installed:

- Docker


For this example, we will show you how to set up a "single node" local Flux container to interact with Snakemake. You can use the `Dockerfile in examples/flux <https://github.com/snakemake/snakemake/blob/main/examples/flux/Dockerfile>`_, which provides a container with both Flux and Snakemake.
Note that we install Snakemake from source and bind it to ``/home/fluxuser/snakemake`` so that you can develop it if desired.
First, build the container:

@@ -165,3 +181,67 @@ See the `flux documentation <https://flux-framework.readthedocs.io/en/latest/quickstart.html>`_
for more detail. For now, let's try interacting with Flux from Snakemake through the `Flux Python Bindings <https://flux-framework.readthedocs.io/projects/flux-workflow-examples/en/latest/job-submit-api/README.html>`_.

The code for this example is provided in `examples/flux <https://github.com/snakemake/snakemake/tree/main/examples/flux>`_.


.. _tutorial-flux-operator-executor:

Flux Operator Executor
----------------------

The Flux Operator Executor issues commands to a Kubernetes cluster to run your jobs on a set of
networked pods that host a running Flux instance, together called a
`MiniCluster <https://flux-framework.org/flux-operator/getting_started/custom-resource-definition.html>`_.

Setup
:::::

To go through this tutorial, you need the following additional software installed or accessible:

- Kubernetes (e.g., MiniKube or similar)
- kubectl

For this tutorial we will show you how to launch MiniKube and create a MiniCluster to run your jobs. We will
do this in two parts: the first assumes that all job steps require the same resources (and thus can run on the
same MiniCluster); the second assumes the steps need different resources, in which case we create more than one
MiniCluster. First, bring up MiniKube and create a namespace for your jobs:

.. code-block:: console

    $ minikube start
    $ kubectl create namespace flux-operator
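
Once the workflow below is running, you can watch the MiniCluster pods come up with
``kubectl get pods -n flux-operator`` or, as a sketch using the official Kubernetes
Python client (an optional aside, not required by the tutorial):

.. code-block:: python

    from kubernetes import client, config

    config.load_kube_config()  # use the current kubectl context (here, minikube)
    core = client.CoreV1Api()
    for pod in core.list_namespaced_pod(namespace="flux-operator").items:
        print(pod.metadata.name, pod.status.phase)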

Next, prepare the Snakemake tutorial data in a temporary directory, ``/tmp/workflow``:

.. code-block:: console

    $ git clone --depth 1 https://github.com/snakemake/snakemake-tutorial-data /tmp/workflow
    $ mkdir -p /tmp/workflow/scripts
    $ wget -O /tmp/workflow/scripts/plot-quals.py https://raw.githubusercontent.com/rse-ops/flux-hpc/main/snakemake/atacseq/scripts/plot-quals.py

The Snakefile can be found in the ``./examples/flux/operator`` directory of Snakemake:

.. code-block:: console

    $ cp ./examples/flux/operator/Snakefile /tmp/workflow/Snakefile

The main difference from the standard tutorial Snakefile is that it defines a container for each step. Since MiniKube is not as good at pulling containers as a production cluster, let's pull the container first:

.. code-block:: console

    $ minikube ssh docker pull ghcr.io/rse-ops/atacseq-vanilla:app-latest

Then, in a separate terminal, make sure your host ``/tmp/workflow`` is bound to the same
path in the MiniKube virtual machine:

.. code-block:: console

    $ minikube ssh -- mkdir -p /tmp/workflow
    $ minikube mount /tmp/workflow:/tmp/workflow

Finally, run the workflow from this directory, asking for the Flux Operator executor:

.. code-block:: console

    $ snakemake --cores 1 --jobs 1 --flux-operator
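
The same run can also be started from Python. This sketch uses the ``flux_operator``
and ``flux_operator_ns`` arguments added to the ``snakemake()`` API in this commit;
treating ``nodes`` as the API counterpart of ``--jobs`` is an assumption:

.. code-block:: python

    from snakemake import snakemake

    # Equivalent of: snakemake --cores 1 --jobs 1 --flux-operator
    success = snakemake(
        "Snakefile",
        cores=1,
        nodes=1,  # assumption: the API counterpart of --jobs
        flux_operator=True,
        flux_operator_ns="flux-operator",  # default; see --flux-operator-namespace
    )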
66 changes: 66 additions & 0 deletions examples/flux/operator/Snakefile
@@ -0,0 +1,66 @@
SAMPLES = ["A", "B"]


rule all:
    input:
        "plots/quals.svg"


rule bwa_map:
    input:
        "data/genome.fa",
        "data/samples/{sample}.fastq"
    output:
        "mapped_reads/{sample}.bam"
    container:
        "docker://ghcr.io/rse-ops/atacseq-vanilla:app-latest"
    shell:
        "bwa mem {input} | samtools view -Sb - > {output}"


rule samtools_sort:
    input:
        "mapped_reads/{sample}.bam"
    output:
        "sorted_reads/{sample}.bam"
    container:
        "docker://ghcr.io/rse-ops/atacseq-vanilla:app-latest"
    shell:
        "samtools sort -T sorted_reads/{wildcards.sample} "
        "-O bam {input} > {output}"


rule samtools_index:
    input:
        "sorted_reads/{sample}.bam"
    output:
        "sorted_reads/{sample}.bam.bai"
    container:
        "docker://ghcr.io/rse-ops/atacseq-vanilla:app-latest"
    shell:
        "samtools index {input}"


rule bcftools_call:
    input:
        fa="data/genome.fa",
        bam=expand("sorted_reads/{sample}.bam", sample=SAMPLES),
        bai=expand("sorted_reads/{sample}.bam.bai", sample=SAMPLES)
    output:
        "calls/all.vcf"
    container:
        "docker://ghcr.io/rse-ops/atacseq-vanilla:app-latest"
    shell:
        "bcftools mpileup -f {input.fa} {input.bam} | "
        "bcftools call -mv - > {output}"


rule plot_quals:
    input:
        "calls/all.vcf"
    container:
        "docker://ghcr.io/rse-ops/atacseq-vanilla:app-latest"
    output:
        "plots/quals.svg"
    script:
        "scripts/plot-quals.py"
32 changes: 31 additions & 1 deletion snakemake/__init__.py
@@ -178,6 +178,8 @@ def snakemake(
    container_image=None,
    k8s_cpu_scalar=1.0,
    flux=False,
    flux_operator=False,
    flux_operator_ns="flux-operator",
    tibanna=False,
    tibanna_sfn=None,
    google_lifesciences=False,
@@ -316,6 +318,7 @@
        container_image (str): Docker image to use, e.g., for Kubernetes.
        k8s_cpu_scalar (float): What proportion of each k8s node's CPUs are available to snakemake?
        flux (bool): Launch workflow to flux cluster.
        flux_operator (bool): Launch workflow using Flux Operator MiniCluster(s) (Kubernetes).
        default_remote_provider (str): default remote provider to use instead of local files (e.g. S3, GS)
        default_remote_prefix (str): prefix for default remote provider (e.g. name of the bucket).
        tibanna (bool): submit jobs to AWS cloud using Tibanna.
@@ -738,6 +741,8 @@ def snakemake(
            google_lifesciences_regions=google_lifesciences_regions,
            google_lifesciences_location=google_lifesciences_location,
            google_lifesciences_cache=google_lifesciences_cache,
            flux_operator=flux_operator,
            flux_operator_ns=flux_operator_ns,
            flux=flux,
            tes=tes,
            precommand=precommand,
@@ -797,6 +802,8 @@
            google_lifesciences_cache=google_lifesciences_cache,
            tes=tes,
            flux=flux,
            flux_operator=flux_operator,
            flux_operator_ns=flux_operator_ns,
            precommand=precommand,
            preemption_default=preemption_default,
            preemptible_rules=preemptible_rules,
@@ -2368,6 +2375,7 @@ def get_argument_parser(profile=None):

    group_cloud = parser.add_argument_group("CLOUD")
    group_flux = parser.add_argument_group("FLUX")
    group_flux_operator = parser.add_argument_group("FLUX_OPERATOR")
    group_kubernetes = parser.add_argument_group("KUBERNETES")
    group_tibanna = parser.add_argument_group("TIBANNA")
    group_google_life_science = parser.add_argument_group("GOOGLE_LIFE_SCIENCE")
@@ -2480,7 +2488,6 @@
        "contents, and kept in Google Cloud Storage. By default, the caches "
        "are deleted at the shutdown step of the workflow.",
    )

    group_flux.add_argument(
        "--flux",
        action="store_true",
@@ -2489,6 +2496,18 @@
        "If you don't have a shared filesystem, additionally specify --no-shared-fs.",
    )

    group_flux_operator.add_argument(
        "--flux-operator",
        action="store_true",
        help="Execute your workflow on a Flux MiniCluster using the Flux Operator.",
    )

    group_flux_operator.add_argument(
        "--flux-operator-namespace",
        default="flux-operator",
        help="Namespace for the Flux Operator MiniCluster(s).",
    )

    group_tes.add_argument(
        "--tes",
        metavar="URL",
@@ -2711,6 +2730,7 @@ def adjust_path(f):
        or args.google_lifesciences
        or args.drmaa
        or args.flux
        or args.flux_operator
    )
    no_exec = (
        args.print_compilation
@@ -2773,6 +2793,14 @@
        )
        sys.exit(1)

    # Singularity does not work on the flux operator
    if args.use_singularity and args.flux_operator:
        print(
            "Error: the --flux-operator does not support --use_singularity.",
            file=sys.stderr,
        )
        sys.exit(1)

    if args.singularity_prefix and not args.use_singularity:
        print(
            "Error: --use_singularity must be set if --singularity-prefix is set.",
@@ -3012,6 +3040,8 @@ def open_browser():
            container_image=args.container_image,
            k8s_cpu_scalar=args.k8s_cpu_scalar,
            flux=args.flux,
            flux_operator=args.flux_operator,
            flux_operator_ns=args.flux_operator_namespace,
            tibanna=args.tibanna,
            tibanna_sfn=args.tibanna_sfn,
            google_lifesciences=args.google_lifesciences,
2 changes: 2 additions & 0 deletions snakemake/executors/flux/__init__.py
@@ -0,0 +1,2 @@
from .flux import FluxExecutor
from .flux_operator import FluxOperatorExecutor
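
With this layout, both executors can be imported from one place, for example (a small usage sketch):

    from snakemake.executors.flux import FluxExecutor, FluxOperatorExecutor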
@@ -5,11 +5,9 @@

import os
import shlex
import sys
from collections import namedtuple

from snakemake.executors import ClusterExecutor, sleep
from snakemake.executors.common import format_cli_arg, join_cli_args
from snakemake.logging import logger
from snakemake.resources import DefaultResources
from snakemake.common import async_lock
@@ -36,7 +34,6 @@ def __init__(
        self,
        workflow,
        dag,
        cores,
        jobname="snakejob.{name}.{jobid}.sh",
        printreason=False,
        quiet=False,
