Parameterized Dockerfiles #66

Merged 38 commits on Nov 11, 2021

Commits
ffc6235
DDP Enhancements
ravi-mosaicml Nov 4, 2021
d2bf826
Logging composer at the debug level in tests
ravi-mosaicml Nov 4, 2021
349fd7c
Removed old run_long
ravi-mosaicml Nov 4, 2021
7625775
Merge branch 'ravi/log_debug_in_tests' into ravi/ddp_enhancements
ravi-mosaicml Nov 4, 2021
1b3cd48
Logging DDP to tempfile
ravi-mosaicml Nov 4, 2021
71c170d
Fixed formatting
ravi-mosaicml Nov 4, 2021
1ee16cf
Added coverage options
ravi-mosaicml Nov 4, 2021
eee6676
Merge branch 'ravi/log_debug_in_tests' into ravi/ddp_enhancements
ravi-mosaicml Nov 4, 2021
16dc8b3
Updated coverage and fixed toml
ravi-mosaicml Nov 4, 2021
30a7af4
Merge branch 'ravi/log_debug_in_tests' into ravi/ddp_enhancements
ravi-mosaicml Nov 4, 2021
8e443b9
Increased timeout for test_load
ravi-mosaicml Nov 5, 2021
08674cf
Merge branch 'ravi/log_debug_in_tests' into ravi/ddp_enhancements
ravi-mosaicml Nov 5, 2021
9404613
Changed sources to include so coverage reports grab the correct files
ravi-mosaicml Nov 5, 2021
091e9fa
Merge branch 'ravi/log_debug_in_tests' into ravi/ddp_enhancements
ravi-mosaicml Nov 5, 2021
0521fd4
Printing subprocess stdout and stderr
ravi-mosaicml Nov 5, 2021
7b02046
Fixing package data
ravi-mosaicml Nov 5, 2021
5c3aa24
Merge branch 'ravi/log_debug_in_tests' into ravi/ddp_enhancements
ravi-mosaicml Nov 5, 2021
ac6d40e
Docker build matrix WIP
ravi-mosaicml Nov 5, 2021
51c4bfb
Added docker/composer build matrix
ravi-mosaicml Nov 5, 2021
a3a5968
Fixed dockerfile builds
ravi-mosaicml Nov 5, 2021
c20962f
Lowered minimum required python version to 3.7
ravi-mosaicml Nov 5, 2021
4b0b40f
Updated to python3
ravi-mosaicml Nov 5, 2021
d93fc55
Fixed docker/composer/build_matrix.sh
ravi-mosaicml Nov 5, 2021
c02a366
Removed build matrix
ravi-mosaicml Nov 6, 2021
540b7dc
Fixed setup.py
ravi-mosaicml Nov 6, 2021
2d82a35
Merge branch 'dev' into ravi/log_debug_in_tests
ravi-mosaicml Nov 6, 2021
e49ec60
Including the composer package, printing warning to stderr, pyright c…
ravi-mosaicml Nov 6, 2021
6a85ba7
Merge branch 'ravi/log_debug_in_tests' into ravi/ddp_enhancements
ravi-mosaicml Nov 6, 2021
9264cb4
Merge branch 'ravi/ddp_enhancements' into ravi/docker_matrix
ravi-mosaicml Nov 6, 2021
8677ce9
Updated readme
ravi-mosaicml Nov 6, 2021
0deb5fb
Merge branch 'dev' into ravi/docker_matrix
ravi-mosaicml Nov 9, 2021
4670b15
Addressed PR comments
ravi-mosaicml Nov 9, 2021
f3924f1
Added license header
ravi-mosaicml Nov 9, 2021
6efb612
Rebasing diff on dev instead of docker matrix
ravi-mosaicml Nov 9, 2021
46078d0
Merge branch 'ravi/support_py37' into ravi/docker_matrix
ravi-mosaicml Nov 9, 2021
e6424b3
Added venv and wheel
ravi-mosaicml Nov 9, 2021
7f8d628
Fixed typo in dockerfile
ravi-mosaicml Nov 9, 2021
896c770
Merge branch 'dev' into ravi/docker_matrix
ravi-mosaicml Nov 11, 2021

Files changed

4 changes: 2 additions & 2 deletions composer/core/logging/logger.py
@@ -48,13 +48,13 @@ class Logger:
The global :class:`~composer.core.state.State` object.
backends (Sequence[BaseLoggerBackend]):
A sequence of
:class:`~composer.core.logging.base_backend.BaseLoggerBackend`\s
:class:`~composer.core.logging.base_backend.BaseLoggerBackend`\\s
to which logging calls will be sent.

Attributes:
backends (Sequence[BaseLoggerBackend]):
A sequence of
:class:`~composer.core.logging.base_backend.BaseLoggerBackend`\s
:class:`~composer.core.logging.base_backend.BaseLoggerBackend`\\s
to which logging calls will be sent.
"""

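An editorial aside on the two-character change above: in a regular (non-raw) docstring, `\s` is an unrecognized escape sequence, so CPython emits a DeprecationWarning (a SyntaxWarning on newer interpreters) even though the backslash survives; doubling it, as this hunk does, silences the warning and still renders a literal `\s` for Sphinx. A minimal illustration, not part of the PR, with made-up function names:

```python
# Editorial illustration only -- not taken from the diff.
def escaped():
    """Ends with a literal backslash-s: BaseLoggerBackend\\s"""

def raw():
    r"""Equivalent, using a raw docstring: BaseLoggerBackend\s"""

# Both docstrings end with the same two characters: a backslash followed by 's'.
assert escaped.__doc__[-2:] == raw.__doc__[-2:] == "\\s"
```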
Empty file added composer/py.typed
Empty file.
100 changes: 71 additions & 29 deletions composer/trainer/ddp.py
@@ -3,16 +3,19 @@
from __future__ import annotations

import collections.abc
import datetime
import logging
import os
import signal
import subprocess
import sys
import tempfile
import time
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
from threading import Thread
from typing import Callable, Iterator, List, Optional, Sequence, TypeVar, cast
from typing import Callable, Iterator, List, Optional, Sequence, Set, TypeVar, cast

import torch
import torch.distributed
@@ -82,16 +85,19 @@ def __init__(self,
num_nodes: int,
backend: str,
fork_rank_0: bool,
timeout: float,
find_unused_parameters: bool = False):
self.hparams = DDPHparams(
store=store_hparams,
node_rank=node_rank,
num_nodes=num_nodes,
fork_rank_0=fork_rank_0,
timeout=timeout,
)
self.nproc_per_node = nproc_per_node
self.world_size = num_nodes * nproc_per_node
self.num_nodes = num_nodes
self.node_rank = node_rank
self.store_hparams = store_hparams
self.last_return_code: Optional[int] = None
self.backend = backend
self.fork_rank_0 = fork_rank_0
self.processes: List[subprocess.Popen[str]] = []
self.killed_pids: Set[int] = set() # track which pids have been killed
self.find_unused_parameters = find_unused_parameters

if backend == 'nccl':
@@ -103,6 +109,10 @@ def __init__(self,
if not torch.distributed.is_nccl_available():
raise ValueError('Requested NCCL backend not available in torch.distributed')

@property
def world_size(self) -> int:
return self.hparams.num_nodes * self.nproc_per_node

def barrier(self) -> None:
if torch.distributed.is_available():
torch.distributed.barrier()
@@ -157,7 +167,7 @@ def all_gather_object(self, obj: TObj) -> List[TObj]:
def launch(self, state: State, loop: Callable[[], None]):
if os.environ.get("RANK") is None:
os.environ["WORLD_SIZE"] = str(self.world_size)
logger.info("Starting DDP on node_rank(%d) with world_size(%d)", self.node_rank, self.world_size)
logger.info("Starting DDP on node_rank(%d) with world_size(%d)", self.hparams.node_rank, self.world_size)

if torch.distributed.is_available():
# Adapted from torch.distributed.launch
@@ -168,14 +178,14 @@ def launch(self, state: State, loop: Callable[[], None]):
# TODO omp num threads -- this parameter needs to be auto-tuned
for local_rank in range(self.nproc_per_node):
# each process's rank
global_rank = self.nproc_per_node * self.node_rank + local_rank
global_rank = self.nproc_per_node * self.hparams.node_rank + local_rank
current_env["RANK"] = str(global_rank)

if local_rank == 0 and not self.fork_rank_0:
if local_rank == 0 and not self.hparams.fork_rank_0:
os.environ["RANK"] = str(global_rank)
else:
logger.info("Launching process for global_rank(%d) on node_rank(%d)", global_rank,
self.node_rank)
self.hparams.node_rank)
# spawn the processes
cmd = [
sys.executable,
@@ -192,35 +202,36 @@ def launch(self, state: State, loop: Callable[[], None]):
process = subprocess.Popen(
cmd,
env=current_env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdout=tempfile.TemporaryFile(),
stderr=tempfile.TemporaryFile(),
text=True,
)
self.processes.append(process)
if self.fork_rank_0:
if self.hparams.fork_rank_0:
self.monitor()
return
else:
Thread(target=self.monitor, daemon=True).start()
else:
if self.world_size != 1:
raise ValueError("Must have world size == 1 when torch.distributed is not available")
if self.node_rank != 0:
if self.hparams.node_rank != 0:
raise ValueError("Must have a node_rank == 0 when torch.distributed is not available")
os.environ["RANK"] = "0"
# We are now on the correct process
global_rank = int(os.environ["RANK"])
assert global_rank // self.world_size == self.node_rank
assert global_rank // self.world_size == self.hparams.node_rank
assert os.environ["WORLD_SIZE"] == str(
self.world_size
), f"os.environ['WORLD_SIZE']({os.environ['WORLD_SIZE']}) != self.world_size({self.world_size})"
is_main = global_rank == 0
if torch.distributed.is_available():
logger.info("Initializing ddp: GLOBAL_RANK: %s, WORLD_SIZE: %s", global_rank, self.world_size)
store = self.store_hparams.initialize_object(is_main, state.world_size)
store = self.hparams.store.initialize_object(is_main, state.world_size)
torch.distributed.init_process_group(self.backend,
rank=global_rank,
world_size=self.world_size,
timeout=datetime.timedelta(seconds=self.hparams.timeout),
store=store)
assert torch.distributed.is_initialized()
assert state.is_rank_set, "state.is_rank_set should be set after torch.distributed is initialized"
@@ -271,7 +282,10 @@ def monitor(self) -> None:
# return code of 0 implies clean exit
# return code of -9 implies sigkill, presumably from
# cleanup() in the main process
if process.returncode not in (0, -9):
if process.pid in self.killed_pids or process.returncode == 0:
# exited cleanly
finished_processes.append(process)
else:
if process.stdout is None:
output = ""
else:
@@ -287,24 +301,50 @@ def monitor(self) -> None:
output=output,
stderr=stderr,
)
if self.fork_rank_0:
if self.hparams.fork_rank_0:
raise exc
else:
logger.exception("Error in subprocess", exc_info=exc)
sys.exit(1)
else:
# exited cleanly
finished_processes.append(process)
error_msg = [
"Error in subprocess",
"----------Subprocess STDOUT----------",
exc.output,
"----------Subprocess STDERR----------",
exc.stderr,
]
logger.exception("\n".join(error_msg), exc_info=exc)
sys.exit(process.returncode)
alive_processes = set(alive_processes) - set(finished_processes)
time.sleep(1)

def cleanup(self) -> None:
for process in self.processes:
logger.info("Killing subprocess %s", process.pid)
try:
process.kill()
except Exception:
pass
if process.returncode is None:
logger.info("Killing subprocess %s with SIGTERM", process.pid)
self.killed_pids.add(process.pid)
try:
os.killpg(process.pid, signal.SIGTERM)
except ProcessLookupError:
pass
current_time = datetime.datetime.now()
while datetime.datetime.now() - current_time < datetime.timedelta(seconds=5):
all_finished = True
for process in self.processes:
if process.returncode is None:
all_finished = False
break
if all_finished:
break
time.sleep(0.1)

for process in self.processes:
if process.returncode is None:
logger.error("Killing subprocess %s with SIGKILL", process.pid)
self.killed_pids.add(process.pid)
try:
os.killpg(process.pid, signal.SIGKILL)
except ProcessLookupError:
pass

if torch.distributed.is_initialized():
torch.distributed.destroy_process_group()

@@ -350,6 +390,7 @@ class DDPHparams(hp.Hparams):
doc="Whether to fork the local rank 0 process, or use the existing process for rank 0 training.",
default=False,
)
timeout: float = hp.optional(doc="Timeout, in seconds, for initializing the DDP process group.", default=5.0)

def initialize_object(self, nproc_per_node: int, backend: str, find_unused_parameters: bool) -> DDP:
return DDP(
@@ -360,4 +401,5 @@ def initialize_object(self, nproc_per_node: int, backend: str, find_unused_param
num_nodes=self.num_nodes,
fork_rank_0=self.fork_rank_0,
find_unused_parameters=find_unused_parameters,
timeout=self.timeout,
)
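Worth noting about the cleanup() rewrite above: the blanket process.kill() is replaced with a two-phase shutdown that sends SIGTERM to each live worker's process group, waits up to five seconds, and only then falls back to SIGKILL. The sketch below is an editorial illustration of that pattern, not the PR's code; it assumes each worker is launched as its own process-group leader (start_new_session=True, POSIX only), which is what makes os.killpg on the child's pid valid, and the names launch_workers and grace_seconds are invented for the example.

```python
import os
import signal
import subprocess
import sys
import time
from typing import List


def launch_workers(n: int) -> List[subprocess.Popen]:
    # Each worker gets its own session (and therefore its own process group),
    # so killpg(pid, ...) addresses that worker and any children it spawns.
    cmd = [sys.executable, "-c", "import time; time.sleep(60)"]
    return [subprocess.Popen(cmd, start_new_session=True) for _ in range(n)]


def cleanup(processes: List[subprocess.Popen], grace_seconds: float = 5.0) -> None:
    # Phase 1: ask politely with SIGTERM.
    for proc in processes:
        if proc.poll() is None:
            try:
                os.killpg(proc.pid, signal.SIGTERM)
            except ProcessLookupError:
                pass  # already gone

    # Bounded grace period, mirroring the five-second loop in the diff.
    deadline = time.monotonic() + grace_seconds
    while time.monotonic() < deadline and any(p.poll() is None for p in processes):
        time.sleep(0.1)

    # Phase 2: force-kill anything still running.
    for proc in processes:
        if proc.poll() is None:
            try:
                os.killpg(proc.pid, signal.SIGKILL)
            except ProcessLookupError:
                pass


if __name__ == "__main__":
    cleanup(launch_workers(2))
```

The grace period matters because monitor() no longer whitelists return code -9; instead, pids that cleanup() killed are recorded in killed_pids and treated as expected exits.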
9 changes: 7 additions & 2 deletions composer/trainer/trainer.py
@@ -90,6 +90,8 @@ class Trainer:
(default: ``TCPStoreHparams("127.0.0.1", 43297)``)
fork_rank_0 (bool, optional): True to fork the rank 0 process in distributed data parallel,
False to not. (default: ``True``)
ddp_timeout (float, optional): Timeout, in seconds, for initializing the DDP process group.
(default: ``5.0``)
seed (int, optional): The seed used in randomization. When not provided a random seed
will be created. (default: ``None``)
deterministic_mode (bool, optional): Run the model deterministically. Experimental. Performance
@@ -148,6 +150,7 @@ def __init__(
# ddp hparams
ddp_store_hparams: Optional[StoreHparams] = None,
fork_rank_0: bool = False,
ddp_timeout: float = 5.0,

# Randomness
seed: Optional[int] = None,
@@ -201,6 +204,7 @@ def __init__(
backend=self.device.ddp_backend,
fork_rank_0=fork_rank_0,
find_unused_parameters=find_unused_parameters,
timeout=ddp_timeout,
)

self.state = State(max_epochs=max_epochs,
@@ -338,8 +342,9 @@ def create_from_hparams(cls, hparams: TrainerHparams) -> Trainer:
timeout=hparams.dataloader.timeout,

# ddp hparams
ddp_store_hparams=ddp.store_hparams,
fork_rank_0=ddp.fork_rank_0,
ddp_store_hparams=ddp.hparams.store,
fork_rank_0=ddp.hparams.fork_rank_0,
ddp_timeout=ddp.hparams.timeout,

# Randomness
seed=seed,
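For context on the ddp_timeout plumbing above: the float passed to the Trainer ends up as the timeout argument to torch.distributed.init_process_group, converted to a datetime.timedelta in DDP.launch (see the ddp.py hunk earlier in this diff). A minimal sketch of just that call follows; the backend, rank, world size, and store arguments are placeholders, and only the timedelta conversion and the 5.0-second default are taken from the PR.

```python
import datetime

import torch.distributed as dist


def init_with_timeout(backend: str,
                      rank: int,
                      world_size: int,
                      store: dist.Store,
                      timeout_seconds: float = 5.0) -> None:
    # The float timeout is converted to a timedelta before being handed to torch,
    # as the new DDP.launch code does; 5.0 mirrors the new DDPHparams.timeout default.
    dist.init_process_group(
        backend,
        rank=rank,
        world_size=world_size,
        timeout=datetime.timedelta(seconds=timeout_seconds),
        store=store,
    )
```

In the trainer, the store would come from ddp_store_hparams (a TCPStoreHparams by default, per the docstring above), so a TCPStore or FileStore instance is what gets passed in.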
36 changes: 24 additions & 12 deletions docker/Makefile
@@ -1,16 +1,28 @@
IMAGE_NAME ?= mosaicml/composer
BUILD_TYPE ?= base

ifeq (${BUILD_TYPE}, base)
# Base dependency build tagged with 'latest'
IMAGE_TAG = latest
else
# Otherwise just use the BUILD_TYPE
IMAGE_TAG = ${BUILD_TYPE}
endif
BASE_IMAGE ?= 'nvidia/cuda:11.3.1-cudnn8-runtime-ubuntu20.04'
PYTHON_VERSION ?='3.9'
CUDA_VERSION_TAG ?= 'cu113'
PYTORCH_VERSION ?= '1.10.0'
TORCHVISION_VERSION ?= '0.11.1'
PYTORCH_TAG ?= mosaicml/pytorch
COMPOSER_TAG ?= mosaicml/composer
COMPOSER_EXTRA_DEPS ?= base

.PHONY: build

build:
docker build .. -t $(IMAGE_NAME):${IMAGE_TAG} -f composer/Dockerfile --build-arg COMPOSER_LIB_DEP=${BUILD_TYPE}
base:
cd pytorch && docker build -t $(PYTORCH_TAG) \
--build-arg BASE_IMAGE=$(BASE_IMAGE) \
--build-arg CUDA_VERSION_TAG=$(CUDA_VERSION_TAG) \
--build-arg PYTHON_VERSION=$(PYTHON_VERSION) \
--build-arg PYTORCH_VERSION=$(PYTORCH_VERSION) \
--build-arg TORCHVISION_VERSION=$(TORCHVISION_VERSION) \
.

composer: base
cd .. && docker build -t $(COMPOSER_TAG) \
--build-arg BASE_IMAGE=$(PYTORCH_TAG) \
--build-arg COMPOSER_EXTRA_DEPS=$(COMPOSER_EXTRA_DEPS) \
-f docker/composer/Dockerfile \
.

build: composer
12 changes: 6 additions & 6 deletions docker/README.md
@@ -4,10 +4,10 @@ To simplify environment setup for the MosaicML `Composer` library, we provide a
leverage.

All Docker images come preinstalled with the following packages:
* Ubuntu 18.04
* Python 3.8.0
* Nvidia CUDA 11.1.1
* PyTorch 1.9.0
* Ubuntu 20.04
* Python
* Pytorch
* Nvidia CUDA (except if using a CPU-only flavor of pytorch)

Additional dependencies are installed as required by the `composer` library and particular flavor (see below)

@@ -50,8 +50,8 @@ docker pull mosaicml/composer:all
make

# Build 'dev' image
BUILD_TYPE=dev make
COMPOSER_EXTRA_DEPS=dev make

# Build 'all' image
BUILD_TYPE=all make
COMPOSER_EXTRA_DEPS=all make
```