Commit 992d261

update to use traditional snakemake model with a default kubernetes container and allow conda, etc.

Signed-off-by: vsoch <vsoch@users.noreply.github.com>
vsoch committed Mar 24, 2023
1 parent ce09c61 commit 992d261
Showing 3 changed files with 59 additions and 35 deletions.

docs/executor_tutorial/flux.rst (50 additions, 27 deletions)

Setup
:::::

To go through this tutorial, you need the following additional software installed or accessible:

- Kubernetes (e.g., kind or an actual cluster)
- kubectl
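
If you don't already have kind, the `kind quick start <https://kind.sigs.k8s.io/docs/user/quick-start/>`_ documents a few install options. As a sketch (the release version pinned here is an assumption, not part of this commit):

.. code-block:: console

    $ curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.17.0/kind-linux-amd64
    $ chmod +x ./kind
    $ sudo mv ./kind /usr/local/bin/kind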

For this tutorial we will show you how to launch kind and create a MiniCluster to run your jobs. We will
demonstrate running with conda, and then with Singularity.

Tutorial Workflow with Conda
::::::::::::::::::::::::::::

First, prepare the Snakemake tutorial data in a temporary directory, ``/tmp/workflow``.

.. code-block:: console

    # (data preparation commands collapsed in the diff view)

The Snakefile can be found in the ``./examples/flux/operator`` directory of Snakemake:

.. code-block:: console

    $ cp ./examples/flux/operator/Snakefile /tmp/workflow/Snakefile
The main difference is that it has a container defined for each step.
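
As a rough sketch of what a containerized step can look like (this rule body is an assumption based on the standard Snakemake tutorial, not copied from this commit):

.. code-block:: python

    rule bwa_map:
        input:
            "data/genome.fa",
            "data/samples/A.fastq",
        output:
            "mapped_reads/A.bam",
        container:
            "docker://ghcr.io/rse-ops/mamba:app-mamba"
        shell:
            "bwa mem {input} | samtools view -Sb - > {output}"
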
Let's create a cluster now with `kind <https://kind.sigs.k8s.io/>`_. You'll need this config file
to mount the workflow directory to the same location:

.. code-block:: yaml

    apiVersion: kind.x-k8s.io/v1alpha4
    kind: Cluster
    nodes:
    - role: control-plane
      extraMounts:
      - hostPath: /tmp/workflow
        containerPath: /tmp/workflow

And create the cluster, targeting the config:

.. code-block:: console

    $ kind create cluster --config kind-config.yaml
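
To confirm the cluster is up (standard kind and kubectl checks, not part of this commit; kind's default context is named ``kind-kind``):

.. code-block:: console

    $ kubectl cluster-info --context kind-kind
    $ kubectl get nodes
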
You'll need to install the Flux Operator! This is the easiest way:

.. code-block:: console

    $ wget https://raw.githubusercontent.com/flux-framework/flux-operator/main/examples/dist/flux-operator.yaml
    $ kubectl apply -f flux-operator.yaml
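
To sanity check that the operator is running (the ``operator-system`` namespace is what the dist manifest used at the time of writing; treat it as an assumption):

.. code-block:: console

    $ kubectl get pods -n operator-system
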
And create the flux-operator namespace:

.. code-block:: console

    $ kubectl create namespace flux-operator

For advanced users, if you shell into your control plane, you should see the files (also on the host!):

.. code-block:: console

    $ docker exec -it kind-control-plane bash
    root@kind-control-plane:/# ls /tmp/workflow/
    Dockerfile  README.md  Snakefile  config.yaml  data  environment.yaml  scripts

Finally, load the snakemake image into kind:

.. code-block:: console

    $ docker pull ghcr.io/rse-ops/mamba:app-mamba
    $ kind load docker-image ghcr.io/rse-ops/mamba:app-mamba
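
To confirm the image landed on the node, ``crictl`` inside the kind node can list images (an optional check, not from this commit):

.. code-block:: console

    $ docker exec -it kind-control-plane crictl images | grep mamba
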
Run the Workflow
^^^^^^^^^^^^^^^^

Finally, run the workflow from the ``/tmp/workflow`` directory on your host, and ask for
the Flux Operator.

.. code-block:: console

    $ snakemake --cores 1 --jobs 1 --flux-operator --use-conda

And you'll see the jobs run! When it's done (and you see outputs) try deleting everything,
and then running again, allowing more than one job to run at once.

.. code-block:: console

    $ rm -rf calls/ mapped_reads/ sorted_reads/ plots/
    $ snakemake --cores 1 --jobs 2 --flux-operator --use-conda

You'll notice the workflow moving faster, because we have submitted more than one
job at once! Note that we discourage using MiniKube: the conda environments create
a lot of tiny files, and in practice the workflow will not finish in any reasonable
amount of time.

How does it work?
:::::::::::::::::

snakemake/__init__.py (1 addition, 1 deletion)

In ``get_argument_parser`` (around line 2509):

.. code-block:: python

        metavar="IMAGE",
        help="Docker image to use, e.g., when submitting jobs to the Flux MiniCluster "
        "on Kubernetes. It must contain a Flux install and have active user root. "
        "Defaults to ghcr.io/rse-ops/mamba:app-mamba",
    )

    group_tes.add_argument(
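
Note the trailing space inside each ``help`` fragment: adjacent Python string literals are concatenated at parse time with no separator, so a missing space would render as "MiniClusteron Kubernetes". A quick illustration (not from the commit):

.. code-block:: python

    # Adjacent literals join with nothing between them
    s = "Flux MiniCluster" "on Kubernetes"
    assert s == "Flux MiniClusteron Kubernetes"
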
snakemake/executors/flux/flux_operator.py (8 additions, 7 deletions)

Near the top of the module (around line 36):

.. code-block:: python

    fluxuser = "fluxuser"
    socket = "local:///run/flux/local"


    # Wrap the MiniCluster to add extra execution logic
    class MiniClusterExec(FluxMiniCluster):
        def execute(self, command, print_result=False, quiet=True):
            """
            Wrap the kubectl_exec to add logic to issue to the broker instance.
            """

In ``cleanup_miniclusters`` (around line 162), a leftover ``IPython.embed()`` debugging call is removed:

.. code-block:: python

            logger.warning(f"WARNING: {name} is not a known MiniCluster")
            return

        to_delete = [name] if name else list(self._clusters.keys())
        for name in to_delete:
            logger.info(f"Deleting MiniCluster {name}")

In ``__init__`` (around line 223), the default container image now uses consistent double quotes:

.. code-block:: python

        )

        # Set the default container image
        self.container_image = container_image or "ghcr.io/rse-ops/mamba:app-mamba"

        # Attach variables for easy access
        self.workdir = os.path.realpath(os.path.dirname(self.workflow.persistence.path))

In ``ensure_minicluster`` (around line 355), the container spec gains a named "data" volume bound at the working directory, plus an optional privileged security context for singularity (an old TODO about capacity planning is dropped):

.. code-block:: python

        if self.ctrl.has_cluster(uid):
            return self.ctrl.get_cluster(uid), uid

        # The container has a named volume "data" bound at the working directory path
        # This is the working directory that must be present in the MiniCluster
        container = {
            "image": assignment["image"],
            "run_flux": True,
            "volumes": {"data": {"path": self.workdir}},
            "flux_user": {"name": fluxuser},
        }

        # Privileged if we want to run singularity
        # This doesn't currently work.
        if self.workflow.use_singularity:
            container["securityContext"] = {"privileged": True}

        # The MiniCluster is expecting /tmp/workflow to be bound on the node
        minicluster = {
            "namespace": self.ctrl.namespace,
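
Once the executor is creating clusters, you can watch them from the kubectl side (the ``miniclusters`` resource name comes from the Flux Operator CRD; treat the exact name as an assumption):

.. code-block:: console

    $ kubectl get miniclusters -n flux-operator
    $ kubectl get pods -n flux-operator --watch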
