Skip to content

Commit

Permalink
[openmpi] Introduce a sidecar container for inter-pod synchronization
Browse files Browse the repository at this point in the history
* openmpi-controller monitors the master pod's status and creates a semaphore file "term.sig" to signal openmpi-job to terminate
* openmpi-job is now decoupled from kubernetes
* openmpi-controller and openmpi-job shares a volume for inter-container communication
* openmpi-controller can be extended in the future to support data snapshot
  • Loading branch information
Jie Zhang committed Apr 23, 2018
1 parent 5eb0d72 commit 3abe08d
Show file tree
Hide file tree
Showing 12 changed files with 229 additions and 137 deletions.
2 changes: 2 additions & 0 deletions components/openmpi-controller/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.openmpi-controller
env
10 changes: 10 additions & 0 deletions components/openmpi-controller/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM python:3.6

USER root

ENV HOME /root

ADD requirements.txt $HOME
ADD controller $HOME/controller

RUN pip3 install -r $HOME/requirements.txt
11 changes: 11 additions & 0 deletions components/openmpi-controller/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# TODO: move to kubeflow
IMAGE=jiez/openmpi-controller
TAG=latest

build:
docker build --pull -t ${IMAGE}:${TAG} .

push: build
docker push ${IMAGE}:${TAG}

.PHONY: build push
Empty file.
63 changes: 63 additions & 0 deletions components/openmpi-controller/controller/controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from time import sleep
from pathlib import Path

from retrying import retry
from kubernetes import client, config
from kubernetes.client.rest import ApiException
from kubernetes.config.config_exception import ConfigException


SIG_DIR = '.openmpi-controller'
SIG_TERM = f'{SIG_DIR}/term.sig'
POD_MASTER = 'openmpi-master'
POLL_STATUS_INTERVAL = 10
TERMINATED_PHASES = ('Succeeded', 'Failed')


class Controller:
"""
Controller is a sidecar container that extends the "main" container (openmpi-job).
It communicates with the main container using a shared volume mounted at the working directory.
Right before it finishes its work, it creates a semaphore file "term.sig" to signal the main container to terminate.
"""

def __init__(self, namespace):
self.namespace = namespace
Path(SIG_DIR).mkdir()

def __enter__(self):
log('controller entered')
try:
config.load_incluster_config()
except ConfigException:
config.load_kube_config()

self.api = client.CoreV1Api()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
log('controller exited')
Path(SIG_TERM).touch()

def wait_master_terminated(self):
while True:
phase = self._get_master_phase()
log(f'{POD_MASTER} is in "{phase}" phase')
if phase in TERMINATED_PHASES:
break

sleep(POLL_STATUS_INTERVAL)

@retry(stop_max_attempt_number=5,
wait_exponential_multiplier=1000,
retry_on_exception=lambda e: isinstance(e, ApiException),)
def _get_master_phase(self):
pod = self.api.read_namespaced_pod(POD_MASTER, self.namespace)
return pod.status.phase


def log(msg):
print(msg, flush=True)



16 changes: 16 additions & 0 deletions components/openmpi-controller/controller/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from argparse import ArgumentParser

from controller import Controller


def main():
parser = ArgumentParser()
parser.add_argument('--namespace', type=str, required=True)
args = parser.parse_args()

with Controller(args.namespace) as ctl:
ctl.wait_master_terminated()


if __name__ == '__main__':
main()
2 changes: 2 additions & 0 deletions components/openmpi-controller/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
kubernetes==6.0.0
retrying==1.3.3
4 changes: 2 additions & 2 deletions kubeflow/openmpi/all.libsonnet
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
local assets = import "kubeflow/openmpi/assets.libsonnet";
local service = import "kubeflow/openmpi/service.libsonnet";
local workloads = import "kubeflow/openmpi/workloads.libsonnet";
local assets = import "kubeflow/openmpi/assets.libsonnet";

{
all(params, env):: $.parts(params, env).all,
Expand All @@ -16,7 +16,7 @@ local assets = import "kubeflow/openmpi/assets.libsonnet";

all::
assets.all(updatedParams)
+ workloads.all(updatedParams)
+ service.all(updatedParams)
+ workloads.all(updatedParams),
},
}
4 changes: 2 additions & 2 deletions kubeflow/openmpi/assets.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
genHostfile(params)::
std.lines(
std.map(
function(index) "openmpi-worker-%(index)d.%(name)s.%(namespace)s%(slots)s" % {
function(index) "openmpi-worker-%(index)d.%(name)s.%(namespace)s slots=%(slots)d" % {
index: index,
name: params.name,
namespace: params.namespace,
slots: if params.gpus > 0 then " slots=%d" % params.gpus else ""
slots: if params.gpus > 1 then params.gpus else 1,
},
std.range(0, params.workers - 1)
)
Expand Down
86 changes: 20 additions & 66 deletions kubeflow/openmpi/assets/init.sh
Original file line number Diff line number Diff line change
@@ -1,73 +1,34 @@
set -exv

SCRIPT_DIR=$(cd $(dirname $0);pwd)
K8S_PODS_ENDPOINT="$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT/api/v1/namespaces/$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace)/pods"

# TOKEN should not be printed.
set +x
TOKEN_HEADER="Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)"
set -x

if [ $# -ne 5 ]; then
echo "illegal number of parameters"
exit 1
fi

role="$1"
workers="$2"
hostname="$3"
exec="$4"
master_pod="$5"

phase_of(){
local pod_name=$1
# TOKEN should not be printed.
set +x
curl -sL --insecure --header "$TOKEN_HEADER" \
https://${K8S_PODS_ENDPOINT}/${pod_name} \
| jq -r '.status.phase' 2>/dev/null
set -x
}

wait_workers_running() {
wait_mpi_ready() {
local max_retries=$1
local retries=0
local num_runnning_worker=0

until [ ${num_runnning_worker} -eq ${workers} ]; do

local num_runnning_worker=0
for worker in $(cat ${SCRIPT_DIR}/hostfile | cut -f 1 -d' '); do
local worker_pod=${worker%%.*}
echo -n "worker pod ${worker_pod}: "
phase=$(phase_of ${worker_pod})
echo $phase
if [ "$phase" = "Running" ]; then
num_runnning_worker=$((${num_runnning_worker} + 1))
fi
done
echo the number of running worker: ${num_runnning_worker}/${workers}
until mpiexec -n ${workers} --hostfile /kubeflow/openmpi/assets/hostfile --allow-run-as-root -q sh -c 'echo $(hostname) is ready'; do
sleep 10

retries=$(expr ${retries} + 1)
if [ -n "${max_retries}" ] && [ ${retries} -ge ${max_retries} ]; then
exit 124
else
sleep 1
fi
done
}

wait_master_done() {
local max_retries=$1
local retries=0
until [ $(phase_of ${master_pod}) = "Succeeded" -o $(phase_of ${master_pod}) = "Failed" ]; do
sleep 10;
retries=$(expr ${retries} + 1)
if [ -n "${max_retries}" ] && [ ${retries} -ge ${max_retries} ]; then
exit 124
fi
done
wait_controller_term() {
until [ -f /kubeflow/openmpi/data/.openmpi-controller/term.sig ]; do
sleep 10
done
}

if [ $# -ne 3 ]; then
echo "illegal number of parameters"
exit 1
fi

role="$1"
workers="$2"
exec="$3"

# Set up openmpi
mkdir -p /root/.openmpi
cp /kubeflow/openmpi/assets/mca-params.conf /root/.openmpi
Expand All @@ -79,23 +40,16 @@ cp /kubeflow/openmpi/secrets/id_rsa.pub /root/.ssh
cp /kubeflow/openmpi/secrets/authorized_keys /root/.ssh
cp /kubeflow/openmpi/assets/ssh_config /root/.ssh/config

# Install curl and jq
apt-get update && apt-get install -y curl jq

# Start sshd in daemon mode
/usr/sbin/sshd -e -f /kubeflow/openmpi/assets/sshd_config
sleep 10

# Start running the workloads.
echo running ${hostname}

exit_code=0
if [ "${role}" = "master" ]; then
wait_workers_running 30
# Run the exec command in master
wait_mpi_ready 30
sh -c "${exec}" || exit_code=$?
else
wait_master_done
wait_controller_term
fi

echo shutting down ${hostname}
exit ${exit_code}
1 change: 1 addition & 0 deletions kubeflow/openmpi/prototypes/openmpi.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// @optionalParam imagePullPolicy string IfNotPresent Image pull policy (either IfNotPresent or Always).
// @optionalParam gpus number 0 Number of GPUs per worker.
// @optionalParam schedulerName string default-scheduler scheduler name to use for the components.
// @optionalParam controllerImage string jiez/openmpi-controller:latest Docker image of the openmpi-controller.

local k = import "k.libsonnet";
local openmpi = import "kubeflow/openmpi/all.libsonnet";
Expand Down
Loading

0 comments on commit 3abe08d

Please sign in to comment.