Skip to content

Commit

Permalink
run slurm in docker-compose
Browse files Browse the repository at this point in the history
allows more than N=2

adapted and simplified from dask-jobqueue

test coverage for SlurmControllerLauncher
  • Loading branch information
minrk committed Oct 18, 2021
1 parent 73b431f commit 179f08c
Show file tree
Hide file tree
Showing 10 changed files with 236 additions and 43 deletions.
37 changes: 22 additions & 15 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ jobs:
cluster_type: mpi
- python: "3.7"
cluster_type: slurm
container: slurmctld
- python: "3.6"
tornado: "5.1.1"
- python: "3.7"
Expand Down Expand Up @@ -84,14 +85,10 @@ jobs:
- name: Set up slurm
if: ${{ matrix.cluster_type == 'slurm' }}
run: |
sudo rm -rf /var/lib/apt/lists
sudo apt-get update && sudo apt-get -f -y install && sudo apt-get -y install slurm-wlm
sudo cp ci/slurm/slurm.conf /etc/slurm-llnl/
sudo mkdir /var/spool/slurmctl
sudo mkdir /var/spool/slurmd
sudo service munge start
sudo service slurmd start
sudo service slurmctld start
export DOCKER_BUILDKIT=1
export COMPOSE_DOCKER_CLI_BUILD=1
cd ci/slurm
docker-compose up -d --build
- name: Install Python (conda) ${{ matrix.python }}
if: ${{ matrix.cluster_type == 'mpi' }}
Expand Down Expand Up @@ -124,10 +121,14 @@ jobs:
- name: Show environment
run: pip freeze

- name: Run tests in container ${{ matrix.container }}
if: ${{ matrix.container }}
run: echo "EXEC=docker exec -i ${{ matrix.container }}" >> $GITHUB_ENV

- name: Run ${{ matrix.cluster_type }} tests
if: ${{ matrix.cluster_type }}
run: |
pytest -ra -v --maxfail=2 --color=yes --cov=ipyparallel ipyparallel/tests/test_${{ matrix.cluster_type }}.py
${EXEC:-} pytest -ra -v --maxfail=2 --color=yes --cov=ipyparallel ipyparallel/tests/test_${{ matrix.cluster_type }}.py
- name: Run tests
if: ${{ ! matrix.cluster_type }}
Expand All @@ -136,6 +137,12 @@ jobs:
run: |
pytest -ra -v --maxfail=3 --color=yes --cov=ipyparallel ipyparallel/tests
- name: Fixup coverage permissions ${{ matrix.container }}
if: ${{ matrix.container }}
run: |
ls -l .coverage*
${EXEC} chmod -R a+rw .coverage*
- name: Submit codecov report
run: |
codecov
Expand All @@ -144,9 +151,9 @@ jobs:
if: ${{ matrix.cluster_type == 'slurm' && failure() }}
run: |
set -x
slurmd -C
ls -l
squeue
sinfo
scontrol show node=localhost
sudo cat /var/log/slurm-llnl/*
docker ps -a
docker logs slurmctld
docker exec -i slurmctld squeue --states=all
docker exec -i slurmctld sinfo
docker logs c1
docker logs c2
20 changes: 20 additions & 0 deletions ci/slurm/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# syntax = docker/dockerfile:1.2.1
# Single image used for every node in the test cluster (slurmctld + compute);
# the entrypoint decides which slurm daemon to start based on hostname.
FROM ubuntu:20.04

ENV DEBIAN_FRONTEND=noninteractive
# Cache apt downloads across builds; remove docker-clean so the cache
# mount actually retains packages.
RUN --mount=type=cache,target=/var/cache/apt \
rm -f /etc/apt/apt.conf.d/docker-clean \
&& apt-get update && apt-get -y install python3-pip slurm-wlm
ENV PIP_CACHE_DIR=/tmp/pip-cache
# Cache pip downloads the same way; test dependencies for the ipyparallel suite.
RUN --mount=type=cache,target=${PIP_CACHE_DIR} python3 -m pip install ipyparallel pytest-asyncio pytest-cov pytest-tornado
# Spool directories required by slurmctld / slurmd at startup.
RUN mkdir /var/spool/slurmctl \
&& mkdir /var/spool/slurmd
# Ubuntu's slurm-wlm reads its config from /etc/slurm-llnl.
COPY slurm.conf /etc/slurm-llnl/slurm.conf
COPY entrypoint.sh /entrypoint
ENV IPP_DISABLE_JS=1
ENTRYPOINT ["/entrypoint"]

# the mounted directory: the repository checkout is bind-mounted here by
# docker-compose, and PYTHONPATH makes the in-tree ipyparallel importable.
RUN mkdir /io
ENV PYTHONPATH=/io
WORKDIR "/io"
84 changes: 84 additions & 0 deletions ci/slurm/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
version: "2.2"

# Three-node test Slurm cluster: one controller (slurmctld) and two compute
# nodes (c1, c2), all built from the same image (ci/slurm/Dockerfile).
# All containers share munge keys, slurm config, a job dir, and the repo
# checkout at /io so tests run against the in-tree ipyparallel.
services:
  # Controller node; tests are run inside this container via `docker exec`.
  slurmctld:
    image: ipp-cluster:slurm
    build: .
    container_name: slurmctld
    hostname: slurmctld
    # Keep the container alive by tailing the daemon log; the entrypoint
    # starts munge + slurmctld before exec'ing this command.
    command:
      - tail
      - "-f"
      - /var/log/slurm-llnl/slurmctld.log
    volumes:
      - etc_munge:/etc/munge
      # NOTE(review): the image installs slurm.conf to /etc/slurm-llnl,
      # but this shared volume mounts /etc/slurm — confirm intended.
      - etc_slurm:/etc/slurm
      - slurm_jobdir:/data
      # NOTE(review): daemons log to /var/log/slurm-llnl (see command above),
      # while this volume shares /var/log/slurm — confirm intended.
      - var_log_slurm:/var/log/slurm
      # Repo root mounted read-write; PYTHONPATH=/io in the image.
      - ../..:/io
    expose:
      # slurmctld listen port (SlurmctldPort default).
      - "6817"
    networks:
      common-network:
        ipv4_address: 10.1.1.10

  # First compute node.
  c1:
    image: ipp-cluster:slurm
    build: .
    hostname: c1
    command:
      - tail
      - "-f"
      - /var/log/slurm-llnl/slurmd.log
    container_name: c1

    volumes:
      - etc_munge:/etc/munge
      - etc_slurm:/etc/slurm
      - slurm_jobdir:/data
      - var_log_slurm:/var/log/slurm
      - ../..:/io
    expose:
      # slurmd listen port (SlurmdPort default).
      - "6818"
    depends_on:
      - "slurmctld"
    networks:
      common-network:
        ipv4_address: 10.1.1.11

  # Second compute node (slurm.conf declares nodes c[1-2]).
  c2:
    image: ipp-cluster:slurm
    build: .
    command:
      - tail
      - "-f"
      - /var/log/slurm-llnl/slurmd.log
    hostname: c2
    container_name: c2
    volumes:
      - etc_munge:/etc/munge
      - etc_slurm:/etc/slurm
      - slurm_jobdir:/data
      - var_log_slurm:/var/log/slurm
      - ../..:/io
    expose:
      - "6818"
    depends_on:
      - "slurmctld"
    networks:
      common-network:
        ipv4_address: 10.1.1.12

# Named volumes shared by all nodes so munge credentials, config, job data,
# and logs are consistent across the cluster.
volumes:
  etc_munge:
  etc_slurm:
  slurm_jobdir:
  var_log_slurm:

# Private bridge network with static addresses so node hostnames resolve
# predictably for slurm.
networks:
  common-network:
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: 10.1.1.0/24
18 changes: 18 additions & 0 deletions ci/slurm/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash
# Container entrypoint for the docker-compose Slurm test cluster.
# Brings up munge, starts the slurm daemon matching this node's role
# (controller vs compute, decided by hostname), then execs the CMD.
set -ex

# set permissions on munge dir, may be mounted
chown -R munge:munge /etc/munge

echo "starting munge"
service munge start

echo "hostname=$(hostname)"
# Controller hostnames contain "slurmctl"; everything else is a compute node.
case "$(hostname)" in
  *slurmctl*)
    echo "starting slurmctld"
    service slurmctld start
    ;;
  *)
    echo "starting slurmd"
    service slurmd start
    ;;
esac

# Replace this shell with the container command (e.g. the log tail).
exec "$@"
12 changes: 9 additions & 3 deletions ci/slurm/slurm.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Put this file on all nodes of your cluster.
# See the slurm.conf man page for more information.
#
SlurmctldHost=localhost
SlurmctldHost=slurmctld
#
#MailProg=/bin/mail
MpiDefault=none
Expand Down Expand Up @@ -49,5 +49,11 @@ SlurmdLogFile=/var/log/slurm-llnl/slurmd.log
# COMPUTE NODES
# Note: CPUs apparently cannot be oversubscribed
# this can only run where at least 2 CPUs are available
NodeName=localhost NodeHostName=localhost NodeAddr=127.0.0.1 CPUs=2 State=UNKNOWN
PartitionName=part Nodes=localhost Default=YES MaxTime=INFINITE State=UP OverSubscribe=YES
#NodeName=localhost NodeHostName=localhost NodeAddr=127.0.0.1 CPUs=2 State=UNKNOWN
#PartitionName=part Nodes=localhost Default=YES MaxTime=INFINITE State=UP OverSubscribe=YES

# COMPUTE NODES
NodeName=c[1-2] RealMemory=4096 CPUs=2 State=UNKNOWN
#
# PARTITIONS
PartitionName=normal Default=yes Nodes=c[1-2] Priority=50 DefMemPerCPU=2048 Shared=NO MaxNodes=2 MaxTime=5-00:00:00 DefaultTime=5-00:00:00 State=UP
3 changes: 3 additions & 0 deletions ipyparallel/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,10 @@ def add_args(args):
)

else:
# copy to make sure change events fire
controller_args = list(controller_args)
add_args = controller_args.extend

if self.controller_ip:
add_args(['--ip=%s' % self.controller_ip])
if self.controller_location:
Expand Down
7 changes: 7 additions & 0 deletions ipyparallel/cluster/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -1988,6 +1988,13 @@ def signal(self, sig):
def get_output(self, remove=True):
return LocalProcessLauncher.get_output(self, remove=remove)

def poll(self):
    """Poll not implemented

    Batch-submitted jobs are not child processes of this launcher, so
    process-based polling does not apply; returning None reports the
    job as still running.
    Need to use `squeue` and friends to check job status
    """
    return None


class BatchControllerLauncher(BatchSystemLauncher, ControllerLauncher):
@default("program")
Expand Down
36 changes: 31 additions & 5 deletions ipyparallel/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@


@contextmanager
def temporary_ipython_dir():
def temporary_ipython_dir(prefix=None):
# FIXME: cleanup has issues on Windows
# this is *probably* a real bug of holding open files,
# but it is preventing feedback about test failures
td_obj = TemporaryDirectory(suffix=".ipython")
td_obj = TemporaryDirectory(suffix=".ipython", prefix=prefix)
td = td_obj.name

with mock.patch.dict(os.environ, {"IPYTHONDIR": td}):
Expand Down Expand Up @@ -103,17 +103,43 @@ def Context():


@pytest.fixture
def Cluster(request, ipython_dir, io_loop):
def engine_launcher_class():
"""override to test an alternate launcher"""
return 'local'


@pytest.fixture
def controller_launcher_class():
"""override to test an alternate launcher"""
return 'local'


@pytest.fixture
def cluster_config():
"""Override to set default cluster config"""
return Config()


@pytest.fixture
def Cluster(
request,
ipython_dir,
io_loop,
controller_launcher_class,
engine_launcher_class,
cluster_config,
):
"""Fixture for instantiating Clusters"""

def ClusterConstructor(**kwargs):
log = logging.getLogger(__file__)
log.setLevel(logging.DEBUG)
log.handlers = [logging.StreamHandler(sys.stdout)]
kwargs['log'] = log
engine_launcher_class = kwargs.get("engine_launcher_class")

cfg = kwargs.setdefault("config", Config())
kwargs.setdefault("controller_launcher_class", controller_launcher_class)
kwargs.setdefault("engine_launcher_class", engine_launcher_class)
cfg = kwargs.setdefault("config", cluster_config)
cfg.EngineLauncher.engine_args = ['--log-level=10']
cfg.ControllerLauncher.controller_args = ['--log-level=10']
kwargs.setdefault("controller_args", ['--ping=250'])
Expand Down
32 changes: 15 additions & 17 deletions ipyparallel/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ async def test_ipython_log(ipython):
assert c.log.handlers[0].stream is sys.stdout


@pytest.fixture
def engine_launcher_class():
return 'Local'


async def test_start_stop_controller(Cluster):
cluster = Cluster()
await cluster.start_controller()
Expand All @@ -86,7 +81,7 @@ async def test_start_stop_controller(Cluster):


async def test_start_stop_engines(Cluster, engine_launcher_class):
cluster = Cluster(engine_launcher_class=engine_launcher_class)
cluster = Cluster()
await cluster.start_controller()

n = 2
Expand All @@ -107,9 +102,9 @@ async def test_start_stop_engines(Cluster, engine_launcher_class):
await cluster.stop_controller()


async def test_start_stop_cluster(Cluster, engine_launcher_class):
async def test_start_stop_cluster(Cluster):
n = 2
cluster = Cluster(engine_launcher_class=engine_launcher_class, n=n)
cluster = Cluster(n=n)
await cluster.start_cluster()
controller = cluster.controller
assert controller is not None
Expand All @@ -125,8 +120,8 @@ async def test_start_stop_cluster(Cluster, engine_launcher_class):
@pytest.mark.skipif(
sys.platform.startswith("win"), reason="Signal tests don't pass on Windows yet"
)
async def test_signal_engines(request, Cluster, engine_launcher_class):
cluster = Cluster(engine_launcher_class=engine_launcher_class)
async def test_signal_engines(request, Cluster):
cluster = Cluster()
await cluster.start_controller()
engine_set_id = await cluster.start_engines(n=2)
rc = await cluster.connect_client()
Expand Down Expand Up @@ -157,9 +152,9 @@ async def test_signal_engines(request, Cluster, engine_launcher_class):
await cluster.stop_controller()


async def test_restart_engines(Cluster, engine_launcher_class):
async def test_restart_engines(Cluster):
n = 2
async with Cluster(engine_launcher_class=engine_launcher_class, n=n) as rc:
async with Cluster(n=n) as rc:
cluster = rc.cluster
engine_set_id = next(iter(cluster.engines))
engine_set = cluster.engines[engine_set_id]
Expand All @@ -175,9 +170,9 @@ async def test_restart_engines(Cluster, engine_launcher_class):
assert set(after_pids).intersection(before_pids) == set()


async def test_get_output(Cluster, engine_launcher_class):
async def test_get_output(Cluster):
n = 2
async with Cluster(engine_launcher_class=engine_launcher_class, n=n) as rc:
async with Cluster(n=n) as rc:
cluster = rc.cluster
engine_set_id = next(iter(cluster.engines))
engine_set = cluster.engines[engine_set_id]
Expand Down Expand Up @@ -281,15 +276,18 @@ async def test_cluster_manager():
m.remove_cluster("nosuchcluster")


async def test_to_from_dict(Cluster, engine_launcher_class):
cluster = Cluster(engine_launcher_class=engine_launcher_class, n=2)
async def test_to_from_dict(
Cluster,
):
cluster = Cluster(n=2)
print(cluster.config, cluster.controller_args)
async with cluster as rc:
d = cluster.to_dict()
cluster2 = ipp.Cluster.from_dict(d)
assert not cluster2.shutdown_atexit
assert cluster2.controller is not None
assert cluster2.controller.process.pid == cluster.controller.process.pid
if isinstance(cluster2.controller, ipp.cluster.launcher.LocalProcessLauncher):
assert cluster2.controller.process.pid == cluster.controller.process.pid
assert list(cluster2.engines) == list(cluster.engines)

es1 = next(iter(cluster.engines.values()))
Expand Down
Loading

0 comments on commit 179f08c

Please sign in to comment.