Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add option to log subprocess output to files in DDP launcher. #33193

Closed
wants to merge 14 commits into from
65 changes: 54 additions & 11 deletions torch/distributed/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@

::

>>> torch.cuda.set_device(arg.local_rank) # before your code runs
>>> torch.cuda.set_device(args.local_rank) # before your code runs

or

::

>>> with torch.cuda.device(arg.local_rank):
>>> with torch.cuda.device(args.local_rank):
>>> # your code to run

3. In your training program, you are supposed to call the following function
Expand All @@ -111,8 +111,8 @@
::

model = torch.nn.parallel.DistributedDataParallel(model,
device_ids=[arg.local_rank],
output_device=arg.local_rank)
device_ids=[args.local_rank],
output_device=args.local_rank)

Please ensure that ``device_ids`` argument is set to be the only GPU device id
that your code will be operating on. This is generally the local rank of the
Expand Down Expand Up @@ -142,6 +142,8 @@
import os
from argparse import ArgumentParser, REMAINDER

node_local_rank_stdout_filename = "node_{}_local_rank_{}_stdout"
node_local_rank_stderr_filename = "node_{}_local_rank_{}_stderr"

def parse_args():
"""
Expand Down Expand Up @@ -185,6 +187,16 @@ def parse_args():
parser.add_argument("--no_python", default=False, action="store_true",
help="Do not prepend the training script with \"python\" - just exec "
"it directly. Useful when the script is not a Python script.")
parser.add_argument(
"--logdir",
default=None,
type=str,
help=f"""Relative path to write subprocess logs to. Passing in a relative
path will create a directory if needed, and write the stdout and stderr to files
{node_local_rank_stdout_filename} and {node_local_rank_stderr_filename}. Note that
successive runs with the same path to write logs to will overwrite existing logs,
so be sure to save logs as needed.""",
)

# positional
parser.add_argument("training_script", type=str,
Expand Down Expand Up @@ -220,6 +232,17 @@ def main():
"your application as needed. \n"
"*****************************************".format(current_env["OMP_NUM_THREADS"]))

if args.logdir:
# Possibly create the directory to write subprocess log output to.
if os.path.exists(args.logdir):
if not os.path.isdir(args.logdir):
raise ValueError("argument --logdir must be a path to a directory.")
else:
# create the relative directory
os.mkdir(os.path.join(os.getcwd(), args.logdir))

subprocess_file_handles = []

for local_rank in range(0, args.nproc_per_node):
# each process's rank
dist_rank = args.nproc_per_node * args.node_rank + local_rank
Expand All @@ -246,15 +269,35 @@ def main():

cmd.extend(args.training_script_args)

process = subprocess.Popen(cmd, env=current_env)
if args.logdir:
directory_path = os.path.join(os.getcwd(), args.logdir)
node_rank = args.node_rank
stdout_file_name = node_local_rank_stdout_filename.format(node_rank, local_rank)
stderr_file_name = node_local_rank_stderr_filename.format(node_rank, local_rank)
stdout_handle = open(os.path.join(directory_path, stdout_file_name), "w")
stderr_handle = open(os.path.join(directory_path, stderr_file_name), "w")
subprocess_file_handles.append((stdout_handle, stderr_handle))
stdout_name = stdout_handle.name
stderr_name = stderr_handle.name
print(f"""Note: Stdout and stderr for node {node_rank} rank {local_rank} will
be written to {stdout_name}, {stderr_name} respectively.""")

stdout_handle = None if not subprocess_file_handles else subprocess_file_handles[local_rank][0]
stderr_handle = None if not subprocess_file_handles else subprocess_file_handles[local_rank][1]
process = subprocess.Popen(cmd, env=current_env, stdout=stdout_handle, stderr=stderr_handle)
processes.append(process)

for process in processes:
process.wait()
if process.returncode != 0:
raise subprocess.CalledProcessError(returncode=process.returncode,
cmd=cmd)

try:
for process in processes:
process.wait()
if process.returncode != 0:
raise subprocess.CalledProcessError(returncode=process.returncode,
cmd=cmd)
finally:
# close open file descriptors
for (stdout_handle, stderr_handle) in subprocess_file_handles:
stdout_handle.close()
stderr_handle.close()

if __name__ == "__main__":
main()