add flux framework connector
Signed-off-by: vsoch <vsoch@users.noreply.github.com>
vsoch committed Apr 7, 2023
1 parent 1ab8a7c commit cd91c5c
Showing 14 changed files with 435 additions and 4 deletions.
3 changes: 0 additions & 3 deletions .dockerignore
@@ -91,6 +91,3 @@ venv/
 */*.swp
 */*/*.swp
 */*/*/*.swp
-
-# Project specific
-examples/
31 changes: 31 additions & 0 deletions .github/workflows/ci-tests.yaml
@@ -99,6 +99,37 @@ jobs:
             --mount type=bind,source="${TMPDIR:-/tmp}"/streamflow,target="/tmp/streamflow" \
             alphaunito/streamflow:latest \
             streamflow run /streamflow/project/streamflow.yml
+  test-flux:
+    runs-on: ubuntu-latest
+    permissions:
+      packages: read
+    strategy:
+      fail-fast: false
+      matrix:
+        container: ["fluxrm/flux-sched:focal"]
+    container:
+      image: ${{ matrix.container }}
+      options: "--platform=linux/amd64 --user root -it"
+    name: ${{ matrix.container }}
+    steps:
+      - name: Make Space
+        run: |
+          rm -rf /usr/share/dotnet
+          rm -rf /opt/ghc
+      - name: Checkout
+        uses: actions/checkout@v3
+
+      - name: Install Streamflow
+        run: |
+          pip install .
+          pip install lockfile
+          chown -R fluxuser .
+      - name: Start Flux and Test Workflow
+        run: |
+          su fluxuser
+          cd examples/flux
+          which streamflow
+          flux start streamflow run streamflow.yml
   static-checks:
     name: "StreamFlow static checks"
     runs-on: ubuntu-22.04
10 changes: 10 additions & 0 deletions docs/source/connector/flux.rst
@@ -0,0 +1,10 @@
=============
FluxConnector
=============

The `Flux Framework <https://flux-framework.org/>`_ connector runs jobs on a High Performance Computing (HPC) cluster managed by Flux. Although Flux also works in a local test container or a cloud environment and provides a Python SDK, this connector follows the design of the other queue manager connectors and uses an :ref:`SSHConnection <SSHConnection>` that points to a login node.

For a quick demo or tutorial, see our `example workflow <https://github.com/alpha-unito/streamflow/tree/master/examples/flux>`_.

.. jsonschema:: ../../../streamflow/config/schemas/v1.0/queue_manager.json
    :lift_description: true
1 change: 1 addition & 0 deletions docs/source/index.rst
@@ -55,6 +55,7 @@ For LaTeX users, the following BibTeX entry can be used:

    connector/docker.rst
    connector/docker-compose.rst
+   connector/flux.rst
    connector/helm3.rst
    connector/occam.rst
    connector/pbs.rst
20 changes: 20 additions & 0 deletions examples/flux/Dockerfile
@@ -0,0 +1,20 @@
FROM fluxrm/flux-sched:focal

# ubuntu base with Flux
# this allows for easy development of Flux connector

LABEL maintainer="Vanessasaurus <@vsoch>"

USER root
RUN apt-get update && \
    python3 -m pip install IPython

COPY . /code
WORKDIR /code

# Install in development (editable) mode in case the container is used for development
RUN pip install -e . && pip install lockfile && \
    chown -R fluxuser /code

# Ensure we enter the container as the fluxuser
USER fluxuser
87 changes: 87 additions & 0 deletions examples/flux/README.md
@@ -0,0 +1,87 @@
# Flux in Streamflow

From the root of the repository, build the container:

```bash
$ docker build -f examples/flux/Dockerfile -t streamflow-flux .
```

Shell into the container!

```bash
$ docker run -it streamflow-flux bash
$ whoami
```
```console
# fluxuser
```

Start a flux instance:

```bash
$ flux start --test-size=4 bash
```

Then go into the Flux example directory, and run the workflow.

```bash
$ cd examples/flux
$ streamflow run streamflow.yml
```

This compiles a program, runs it, and exits. The original example uses
`mpirun`; because Flux has MPI bindings, this example uses `flux run` instead.
StreamFlow prints the result to the screen:

```console
2023-04-02 19:35:18.426 INFO COMPLETED Workflow execution
{
    "result": {
        "basename": "mpi_output.log",
        "checksum": "sha1$8abcdbccb5d53018e69ac1c1849f50928a6c4669",
        "class": "File",
        "dirname": "/code/examples/flux/ecc301a4-6fad-4199-b792-c47caaf7a9da",
        "location": "file:///code/examples/flux/ecc301a4-6fad-4199-b792-c47caaf7a9da/mpi_output.log",
        "nameext": ".log",
        "nameroot": "mpi_output",
        "path": "/code/examples/flux/ecc301a4-6fad-4199-b792-c47caaf7a9da/mpi_output.log",
        "size": 271
    }
}
2023-04-02 19:35:18.428 INFO UNDEPLOYING dc-mpi
2023-04-02 19:35:18.443 INFO COMPLETED Undeployment of dc-mpi
```

The output directory is created in your working directory:

```bash
$ cat ecc301a4-6fad-4199-b792-c47caaf7a9da/mpi_output.log
```
```console
Hello I'm the server with id 1 on bff3d5c1b83d out of 2 I'm the server
Hello I'm the server with id 0 on bff3d5c1b83d out of 2 I'm the server
Total time (MPI) 1 is 0.000105146
Total time (MPI) 0 is 0.000120071
Total time (gtd) 1 is 0.299063
Total time (gtd) 0 is 0.29899
```
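
The swap from `mpirun` to `flux run` described above can be sketched as follows. This is a minimal illustration, not StreamFlow or cwltool code: `build_command` is a hypothetical helper that mimics how CWL orders `arguments` and input bindings by `position` and emits each `prefix` as a separate token (the CWL default), using the values from `cwl/clt/compile.cwl`, `cwl/clt/execute.cwl`, and `cwl/config.yml` in this example.

```python
import shlex
from pathlib import PurePosixPath


def build_command(base_command, bindings):
    """Order bindings by position and emit an optional prefix before each value.

    `bindings` is a list of (position, prefix_or_None, value) tuples.
    Hypothetical helper for illustration; not part of StreamFlow or cwltool.
    """
    argv = list(base_command)
    for _, prefix, value in sorted(bindings, key=lambda b: b[0]):
        if prefix is not None:
            argv.append(prefix)
        argv.append(str(value))
    return argv


source_file = PurePosixPath("data/cs.cxx")  # from cwl/config.yml
num_processes = 2                           # from cwl/config.yml
executable = source_file.stem               # CWL nameroot of cs.cxx: "cs"

# compile.cwl: mpicxx -O3 -o <nameroot> <source_file>
compile_cmd = build_command(
    ["mpicxx"],
    [(1, None, "-O3"), (2, "-o", executable), (3, None, source_file)],
)

# execute.cwl: flux run -n <num_processes> <executable_file>
execute_cmd = build_command(
    ["flux", "run"],
    [(1, "-n", num_processes), (4, None, executable)],
)

print(shlex.join(compile_cmd))  # mpicxx -O3 -o cs data/cs.cxx
print(shlex.join(execute_cmd))  # flux run -n 2 cs
```

A real CWL runner passes staged absolute paths rather than these relative names, and `execute.cwl` additionally redirects standard output to `mpi_output.log`.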

## Development

To work in development mode, bind-mount the repository into the container so
that changes made on your local machine are visible inside it (you might also
want to change the user to root in the Dockerfile):

```bash
$ docker run -it -v $PWD:/code streamflow-flux bash
```

Then install in editable mode:

```bash
$ pip install -e .
```

Note that MPI refuses to run as root in this example, so you will get an error
if you change the user to root. Also, because the queue managers run
asynchronously, interactive development is challenging.
21 changes: 21 additions & 0 deletions examples/flux/cwl/clt/compile.cwl
@@ -0,0 +1,21 @@
cwlVersion: v1.1
class: CommandLineTool
baseCommand: ["mpicxx"]
arguments:
  - position: 1
    valueFrom: '-O3'
  - position: 2
    valueFrom: '$(inputs.source_file.nameroot)'
    prefix: '-o'

inputs:
  source_file:
    type: File
    inputBinding:
      position: 3

outputs:
  executable_file:
    type: File
    outputBinding:
      glob: '$(inputs.source_file.nameroot)'
20 changes: 20 additions & 0 deletions examples/flux/cwl/clt/execute.cwl
@@ -0,0 +1,20 @@
cwlVersion: v1.1
class: CommandLineTool
requirements:
  ShellCommandRequirement: {}
baseCommand: ["flux", "run"]
stdout: mpi_output.log
inputs:
  num_processes:
    type: int
    inputBinding:
      position: 1
      prefix: '-n'
  executable_file:
    type: File
    inputBinding:
      position: 4

outputs:
  mpi_output:
    type: stdout
4 changes: 4 additions & 0 deletions examples/flux/cwl/config.yml
@@ -0,0 +1,4 @@
source_file:
  class: File
  path: data/cs.cxx
num_processes: 2
60 changes: 60 additions & 0 deletions examples/flux/cwl/data/cs.cxx
@@ -0,0 +1,60 @@
// Author: Marco Aldinucci
// Date: 13 May 2010
// Ex. 1-2, for PDS-physics class 2010

#include "mpi.h"
#include <iostream>
#include <string.h>
#include <stdlib.h>
#include <sys/time.h>
#include <unistd.h>

enum messages {msg_tag,eos_tag};


static inline const double diffmsec(const struct timeval & a,
                                    const struct timeval & b) {
    long sec = (a.tv_sec - b.tv_sec);
    long usec = (a.tv_usec - b.tv_usec);

    if(usec < 0) {
        --sec;
        usec += 1000000;
    }
    return ((double)(sec*1000)+ (double)usec/1000.0);
}

int main( int argc, char **argv )
{
    int myid,numprocs,namelen;
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    double t0,t1;
    struct timeval wt1,wt0;
    // MPI_Wtime cannot be called here
    gettimeofday(&wt0,NULL);
    MPI_Init(&argc,&argv );
    t0 = MPI_Wtime();
    //gettimeofday(&wt0,NULL);
    MPI_Comm_size(MPI_COMM_WORLD,&numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD,&myid);
    MPI_Get_processor_name(processor_name,&namelen);
    srand(time(NULL));


    // This is the server code
    // Note - I don't understand how this example works - with the previous
    // it hung forever, so I reduced to just printing the server id and exiting.
    int n_eos = 0;
    std::cout << "Hello I'm the server with id " << myid << " on " << processor_name
              << " out of " << numprocs << " I'm the server\n";

    MPI_Barrier(MPI_COMM_WORLD);
    t1 = MPI_Wtime();
    //gettimeofday(&wt1,NULL);
    MPI_Finalize();
    gettimeofday(&wt1,NULL);
    std::cout << "Total time (MPI) " << myid << " is " << t1-t0 << "\n";
    std::cout << "Total time (gtd) " << myid << " is " <<
        diffmsec(wt1,wt0)/1000 << "\n";
    return 0;
}
35 changes: 35 additions & 0 deletions examples/flux/cwl/main.cwl
@@ -0,0 +1,35 @@
#!/usr/bin/env cwl-runner
cwlVersion: v1.1
class: Workflow
$namespaces:
  sf: "https://streamflow.org/cwl#"

inputs:
  num_processes: int
  source_file: File

outputs:
  result:
    type: File
    outputSource: execute/mpi_output

steps:
  compile:
    run: clt/compile.cwl
    doc: |
      This step takes as input a C++ source file and compiles it with the MPI compiler. Its output is an executable
      linked with a proper MPI implementation.
    in:
      source_file: source_file
    out: [executable_file]

  ##############################################################

  execute:
    run: clt/execute.cwl
    doc: |
      This step runs the executable.
    in:
      executable_file: compile/executable_file
      num_processes: num_processes
    out: [mpi_output]
19 changes: 19 additions & 0 deletions examples/flux/streamflow.yml
@@ -0,0 +1,19 @@
#!/usr/bin/env streamflow
version: v1.0
workflows:
  master:
    type: cwl
    config:
      file: cwl/main.cwl
      settings: cwl/config.yml
    bindings:
      - step: /compile
        target:
          deployment: dc-mpi
      - step: /execute
        target:
          deployment: dc-mpi
deployments:
  dc-mpi:
    type: flux
    config: {}
7 changes: 6 additions & 1 deletion streamflow/deployment/connector/__init__.py
@@ -6,12 +6,17 @@
 from streamflow.deployment.connector.kubernetes import Helm3Connector
 from streamflow.deployment.connector.local import LocalConnector
 from streamflow.deployment.connector.occam import OccamConnector
-from streamflow.deployment.connector.queue_manager import PBSConnector, SlurmConnector
+from streamflow.deployment.connector.queue_manager import (
+    PBSConnector,
+    SlurmConnector,
+    FluxConnector,
+)
 from streamflow.deployment.connector.ssh import SSHConnector

 connector_classes = {
     "docker": DockerConnector,
     "docker-compose": DockerComposeConnector,
+    "flux": FluxConnector,
     "helm": Helm3Connector,
     "helm3": Helm3Connector,
     "local": LocalConnector,
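
The `connector_classes` change registers the new connector under the `flux` key, so a deployment declared with `type: flux` (as in `examples/flux/streamflow.yml`) can be mapped to `FluxConnector`. A minimal sketch of that lookup pattern follows. It uses stand-in classes rather than StreamFlow's real connectors, whose constructors are not shown in this diff, and `resolve_connector` is a hypothetical helper, not a StreamFlow function.

```python
class LocalConnector:
    """Stand-in class for illustration only."""


class FluxConnector:
    """Stand-in class for illustration only."""


# Registry keyed by the `type` string used in a deployment definition.
connector_classes = {
    "flux": FluxConnector,
    "local": LocalConnector,
}


def resolve_connector(deployment_type):
    """Return the class registered for `deployment_type` (hypothetical helper)."""
    try:
        return connector_classes[deployment_type]
    except KeyError:
        known = ", ".join(sorted(connector_classes))
        raise ValueError(
            f"Unknown deployment type {deployment_type!r}; known types: {known}"
        ) from None


# Mirrors the `deployments` section of examples/flux/streamflow.yml.
deployments = {"dc-mpi": {"type": "flux", "config": {}}}

for name, spec in deployments.items():
    print(name, "->", resolve_connector(spec["type"]).__name__)
```

Keeping the mapping in a single dictionary means that adding a connector only requires an import and one new entry, which is what this commit does.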
