Propose a new architecture with focus on scalability and robustness #360

Merged (2 commits) on Jun 21, 2021

284 changes: 284 additions & 0 deletions proposals/scalable-robust-operator.md

# A scalable and robust operator

Authors: @alculquicondor, @ahg-g

- [Motivation](#motivation)
- [Goals](#goals)
- [Background](#background)
- [Design](#design)
- [Alternatives Considered](#alternatives-considered)
- [Appendix](#appendix-prototype-objects)

## Motivation

A scalable MPI setup on Kubernetes is important for:
- Running jobs from multiple users in a single cluster.
- Running high-performance computing jobs that can require a large number of workers.

A robust MPI setup should be tolerant of failures, implementing retries while
keeping track of them.

## Goals

- Allow driver-to-worker control to scale by removing kube-apiserver from
the communication channel.
- Reduce the complexity of the controller by relying on Kubernetes workload
APIs for Pod creation and management.

## Background

The original design of the operator can be found as a
[proposal to the kubeflow community](https://github.com/kubeflow/community/blob/master/proposals/mpi-operator-proposal.md).
The latest release includes a v1alpha2 API and controller.
A v1 is under development. Something to highlight in this new version is the
replacement of the Job and StatefulSet by plain Pods, with the intent of
[tracking running Pods](https://github.com/kubeflow/mpi-operator/issues/201#issuecomment-827837831).

An MPIJob CRD describes the Job. Important fields include:
- The workers template
- The number of workers
- The launcher template, which should have an `mpirun` command.

The images are expected to have the MPI implementation binaries (such as
OpenMPI, MPICH or Intel MPI) and the user’s MPI executable.
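
For reference, a minimal MPIJob manifest with these fields might look roughly
like the sketch below. This is only an illustrative sketch, assuming the
`kubeflow.org/v1` API group and the `mpiReplicaSpecs`/`slotsPerWorker` field
names; the image and executable are placeholders:

```yaml
apiVersion: kubeflow.org/v1
kind: MPIJob
metadata:
  name: example-mpijob
spec:
  slotsPerWorker: 2           # slots per worker, e.g. number of CPUs or GPUs
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          containers:
          - name: launcher
            image: '<USER_IMAGE>'
            command: ['mpirun', '-np', '4', '<USER_EXECUTABLE>']   # 2 workers x 2 slots
    Worker:
      replicas: 2
      template:
        spec:
          containers:
          - name: worker
            image: '<USER_IMAGE>'
```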

A controller processes the MPIJob, starting a Job with the following steps:
1. Creates a ConfigMap, which contains:
- A script `kubexec.sh` that wraps `kubectl exec` and is used in place of
`ssh`. This script, before executing the command provided by `mpirun`,
transfers a file containing a mapping of pod names to IPs and appends it to
the worker’s `/etc/hosts`.

Note: The v1 controller no longer copies the pod-to-IP mapping file. The
OpenMPI implementation does the [routing](https://www.open-mpi.org/faq/?category=tcp#tcp-routability-1.3).
- The `hostfile` for `mpirun`, listing the worker pod names and number of
slots (which could be the number of CPUs or GPUs). This list is built
programmatically.
2. Creates a ServiceAccount+Role+RoleBinding for the launcher, which allows it
to:
- get/list/watch on Pods
- do pods/exec
This allows the launcher Pod to obtain details of the worker Pods and start
the process managers on them (see the Role sketch after this list).
3. If configured, creates a Volcano PodGroup.
4. Creates a StatefulSet for workers (plain pods in v1). The Pod template
includes:
- mounts for the ConfigMap.
- `sleep` as the command.
5. Creates the launcher Job (a plain Pod in v1). The Pod template includes:
- An init container, `kubectl-delivery`, described below.
- Environment variables for:
- replacing `ssh` with `kubexec.sh`
- the `hostfile` location
- Volumes for:
- The ConfigMap
- Sharing files from `kubectl-delivery`
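
As an illustration of step 2, the launcher's Role could look roughly like the
minimal sketch below. The object name is a placeholder and the exact rules are
defined by the controller, not by this document:

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: mpi-launcher        # placeholder name
rules:
# Read access to Pods, so the launcher can find and watch the workers.
- apiGroups: ['']
  resources: ['pods']
  verbs: ['get', 'list', 'watch']
# `kubectl exec` creates a pods/exec subresource, hence the `create` verb.
- apiGroups: ['']
  resources: ['pods/exec']
  verbs: ['create']
```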

The launcher Job, as previously mentioned, contains an init container:
`kubectl-delivery`. This is a Kubernetes controller that watches pods. It does
the following:

1. Copies `kubectl` from the image into the volume shared with the main container.
2. Waits for all Pods in the hostfile to be running.
3. Generates a file mapping pod names to IPs, in `/etc/hosts` format.

To update the status of an MPIJob, the controller uses the status of the
launcher Job. That is, when the launcher Job fails or succeeds, its status is
copied to the MPIJob. In v1, it bases the status on the termination condition of
the Pod.

### Analysis

The above architecture for MPI Jobs has the following problems:

- It doesn’t scale as we increase the number of workers, belonging to the same
or different jobs.
- Due to the use of `kubectl exec`, every worker spawn goes through
kube-apiserver. `mpirun` starts a daemon in each worker
(like [`orted`](https://www.open-mpi.org/doc/v3.0/man1/orted.1.php)).
This process handles the worker-to-worker communication, which happens
without the intervention of kube-apiserver. However, the `exec` connection
stays up for control during the entirety of the job.
- The `kubectl-delivery` controller does a full cache sync to be able to watch
Pods. This startup penalty increases with the number of pods in the cluster
and has to be paid for every job. The API calls also cause additional
stress on the apiserver as the number of launchers increases.
- The launcher pod has execution permissions on any other Pod in the
namespace, which can be a security concern.

> **@carmark (Member), May 18, 2021:** The launcher only has execution permission on the worker pods belonging to its job; you can find it in the code.
>
> **@alculquicondor (Author):** Ah, missed that. However, that doesn't scale as the number of workers increases (k8s objects have a size limit).
>
> **@alculquicondor (Author):** Updated.
>
> **Member:** If we replace `kubectl exec` with `ssh`, we can also support the elastic feature, so I do not think it is a problem for us, if I understand it correctly.

- The v1 controller doesn’t implement launcher pod retries, although there are
plans to do so. So the MPIJob behaves like a plain Pod in this version.

> **Member:** What are the use cases for launcher pod retry?
>
> **Reviewer:** Running jobs on spot/preemptible VMs.
>
> **Member:** From my perspective, I don't think the launcher pod should be scheduled to spot/preemptible VMs. For the worker pods, yes, because worker pods are stateless. However, the launcher pod seems to be a stateful instance, whose restart means a new job.
>
> **@alculquicondor (Author):** Still, when running the workers in a preemptible VM, if any of them fail, the entire job fails, including the launcher. So we need launcher retries.
>
> **Member:** @alculquicondor What MPI implementation are you expecting users to use after the launcher gets restarted, to make sure the workers are aware and can reconnect to the new launcher?
>
> **@alculquicondor (Author), May 18, 2021:** The process goes like this:
> 1. A worker terminates unexpectedly.
> 2. The launcher notices the worker failure, closes the rest of the ssh connections and terminates with a failure.
> 3. The launcher restarts and launches new ssh connections to the workers.
>
> This is independent of the MPI implementation.
>
> **Member:** From what I have observed in other HPC systems, like Slurm, for jobs that are not fault-tolerant, when some of the workers fail, the system does not retry via a launcher restart. Instead, the entire job is marked as failed and its resources are released. The system re-queues the failed job if a retry is demanded by the user and creates a new job. Such a process is compatible with the contemporary design of mpi-operator.
>
> **@alculquicondor (Author):** OTOH, it kind of goes against the declarative and fault-tolerant approach of k8s APIs, including the Job API. Retries are what Kubernetes users would expect, and if they don't need them, they could always disable them.
>
> **Reviewer:** I think it is useful if we make automatic retries an option that defaults to 0.

## Design

In order to address the problems of the existing architecture, we propose the
following changes:

- **The use of `ssh` instead of `kubectl exec`.**

This would avoid any pod-to-pod communication going through the apiserver and
doesn’t require giving execution permissions at the namespace level.
This can be achieved as follows:
- The controller can generate a single key and share it with the launcher and
worker pods through a Secret.
- The launcher and workers mount the Secret and set appropriate file
permissions.
- The workers run an SSH server instead of `sleep`.
- When encrypted communication is not a requirement, users could have the
choice to use `rsh` for faster communication.

- **The use of stable hostnames and a headless Service for the workers**
- This removes the need to query Pod IPs, as the Pods can discover each other
through DNS resolution. The hostfile can be generated statically by the
controller using the stable hostnames.
- Starting with k8s 1.22, we can use
[Indexed Jobs with stable hostnames](https://git.k8s.io/enhancements/keps/sig-apps/2214-indexed-job)
to delegate the pod management to Kubernetes. Additionally, this will give
us robust failure tracking so that we can give users control over retry
limits.
In the meantime, we can continue using plain Pods.
- Caveat 1: The Job controller doesn’t report the number of running Pods.
Instead, it reports active Pods, which include running and pending
(scheduling or starting). But Kubernetes SIG Apps is
[open to add a status field for running Pods](https://kubernetes.slack.com/archives/C18NZM5K9/p1619549430067400).
- Caveat 2: Horovod supports elastic workers, but the Kubernetes Job
doesn’t support changes to the completions field. This can be supported
starting from 1.23. In the meantime, we can replicate the behavior by
creating a new Job and doing Pod adoption.
- For Intel MPI, we also need a headless Service to front the launcher.
- **Revert to using the Job API for the launcher.**
- The Job controller handles retries when the launcher or any of the workers fail.
- Caveat 1 also applies: The Job controller doesn’t report if the Pod is running.
We can continue watching Pods in the meantime.
- With the above changes, **the following objects can be removed**:
- The ServiceAccount+Role+RoleBinding for the launcher.
- The `kubectl-delivery` init container in the launcher, as there is no need
to obtain IPs, speeding up startup time.

> **Member:** Do you mean the init container `kubectl-delivery` will be removed? If so, how could we make the launcher start after the workers?
>
> **@alculquicondor (Author), May 18, 2021:** There are 2 options:
> 1. Do nothing, and leave such handling to the gang scheduler. For this reason it is important that the launcher can do retries.
> 2. Do not create the launcher until all the worker pods are running, so everything is handled by the controller and there is no need for extra cache syncs.
>
> I prefer option 1.
>
> **Member:** I don't think so. You would like to speed up startup time, but it doesn't if you want the launcher to retry on failures. And the scheduler should not know about the job startup strategy.
>
> **Member:** At the same time, how do you verify the real reason the launcher failed with option 1? For example:
> - the workers do not start up, so the launcher fails and restarts
> - the training job fails, so the launcher fails
>
> **@alculquicondor (Author), May 19, 2021:** What do you mean by "job startup strategy"?
>
> As for differentiating failures, I don't think we can or should do that. There is another type of failure which is kind of a mix of the two you described: a worker pod gets evicted by the kubelet after the job already started. We cannot easily differentiate this one from a case where the user's code had a crash, for the purpose of retrying.
>
> But perhaps option 2 is reasonable. WDYT?
>
> **Reviewer:** If we want to support retries, then as a by-product we want option 1, correct?
>
> **@alculquicondor (Author):** We can still use option 2. It might make startups less flaky, but it's not a guarantee anyway. A worker can simply fail between the time it reported running and the time the launcher started running.
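
To make the proposed changes concrete, the sketch below shows how the workers
could eventually be expressed as an Indexed Job behind the headless Service,
running an SSH server instead of `sleep`. This is only an illustration under
the assumptions above (Kubernetes 1.22+ Indexed Jobs with stable hostnames);
the `sshd` command, mount paths and names are placeholders:

```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: mpi-workers
spec:
  completions: 3
  parallelism: 3
  completionMode: Indexed            # workers get stable hostnames mpi-workers-0, -1, -2
  template:
    spec:
      subdomain: mpi-workers         # matches the headless Service, so hostnames resolve via DNS
      restartPolicy: OnFailure
      containers:
      - name: worker
        image: '<USER_IMAGE>'
        command: ['/usr/sbin/sshd', '-De']   # SSH server instead of `sleep`
        volumeMounts:
        - name: ssh-auth
          mountPath: /mnt/ssh
          readOnly: true
      volumes:
      - name: ssh-auth
        secret:
          secretName: ssh-auth
```

The appendix below shows the equivalent prototype using a StatefulSet, since
Indexed Jobs are still an alpha feature.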


## Alternatives Considered

TBD from discussions

## Appendix: Prototype objects

It uses a StatefulSet in place of Indexed Jobs, as they are still an alpha
feature in Kubernetes.

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: mpi-config
data:
  hostfile: |
    mpi-workers-0.mpi-workers slots=3
    mpi-workers-1.mpi-workers slots=3
    mpi-workers-2.mpi-workers slots=3
```

```yaml
apiVersion: v1
kind: Secret
type: kubernetes.io/ssh-auth
data:
  ssh-privatekey: PRIVATE_KEY
  ssh-publickey: PUBLIC_KEY
```

```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: mpi-launcher
spec:
  template:
    spec:
      restartPolicy: OnFailure
      containers:
      - name: driver
        image: '<USER_IMAGE>'
        args:
        - 'mpirun'
        - '-np'
        - '9'
        - '<USER_EXECUTABLE>'
        env:
        - name: 'OMPI_MCA_orte_keep_fqdn_hostnames'
          value: 'true'
        - name: 'OMPI_MCA_orte_default_hostfile'
          value: '/home/mpiuser/config/hostfile'
        volumeMounts:
        - name: ssh-auth
          mountPath: /mnt/ssh
          readOnly: true
        - name: mpi-config
          mountPath: /home/mpiuser/config
          readOnly: true
      volumes:
      - name: mpi-config
        configMap:
          name: mpi-config
      - name: ssh-auth
        secret:
          secretName: ssh-auth
          items:
          - key: ssh-privatekey
            path: id_rsa
          - key: ssh-publickey
            path: id_rsa.pub
          - key: ssh-publickey
            path: authorized_keys
```

```yaml
apiVersion: v1
kind: Service
metadata:
  name: mpi-workers
spec:
  clusterIP: None
  selector:
    app: mpi-workers
```

```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: mpi-workers
spec:
  selector:
    matchLabels:
      app: mpi-workers
  serviceName: mpi-workers
  replicas: 3
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: mpi-workers
    spec:
      containers:
      - name: worker
        image: '<USER_IMAGE>'
        volumeMounts:
        - name: ssh-auth
          mountPath: /mnt/ssh
          readOnly: true
        - name: mpi-config
          mountPath: /home/mpiuser/config
          readOnly: true
      volumes:
      - name: mpi-config
        configMap:
          name: mpi-config
      - name: ssh-auth
        secret:
          secretName: ssh-auth
          items:
          - key: ssh-privatekey
            path: id_rsa
          - key: ssh-publickey
            path: id_rsa.pub
          - key: ssh-publickey
            path: authorized_keys
```

> **Member:** I am wondering how to support elastic mode with a StatefulSet? We may add/remove workers on the fly.
>
> **@alculquicondor (Author):** You just change `.spec.replicas`.