Skip to content

Commit

Permalink
Significant logging improvements, specifically for parallel runs.
Browse files Browse the repository at this point in the history
  • Loading branch information
lzkelley committed Jun 18, 2024
1 parent 53951d2 commit bdfe7f7
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 89 deletions.
11 changes: 10 additions & 1 deletion holodeck/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@

# ---- Define Global Parameters

LOG_SUFFIX = '.log'
LOG_FILENAME_WITH_TIME_STAMP = False

class Parameters:
"""These are WMAP9 parameters, see: [WMAP9]_ Table 3, WMAP+BAO+H0
Expand Down Expand Up @@ -73,7 +75,14 @@ class Parameters:
# ---- Load logger

from . import logger # noqa
log = logger.get_logger(__name__, logging.WARNING) #: global root logger from `holodeck.logger`
log = logger.get_logger(__name__, level_stream=logging.WARNING) #: global root logger from `holodeck.logger`
log.setLevel(logging.WARNING)

def log_to_file(**kwargs):
logger.log_to_file(log, **kwargs)

def set_log_level(level):
log.setLevel(level)

# ---- Load cosmology instance

Expand Down
4 changes: 2 additions & 2 deletions holodeck/librarian/combine.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def sam_lib_combine(
lib_path = lib_tools.get_sam_lib_fname(path_output, gwb_only, library=library)
if lib_path.exists():
lvl = log.INFO if recreate else log.WARNING
log.log(lvl, f"combined library already exists: {lib_path}, run with `-r` to recreate.")
log.log(lvl, f"Combined library already exists: {lib_path}, run with `-r` to recreate.")
if not recreate:
return

Expand All @@ -135,7 +135,7 @@ def sam_lib_combine(

if path_pspace is None:
path_pspace = path_output
pspace, pspace_fname = lib_tools.load_pspace_from_path(path_pspace, log=log)
pspace, pspace_fname = lib_tools.load_pspace_from_path(path_pspace)
args, args_fname = holo.librarian.gen_lib.load_config_from_path(path_pspace, log=log)

log.info(f"loaded param space: {pspace} from '{pspace_fname}'")
Expand Down
127 changes: 68 additions & 59 deletions holodeck/librarian/gen_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from holodeck.constants import YR
import holodeck.librarian
import holodeck.librarian.combine
from holodeck import log
from holodeck.librarian import (
lib_tools, ARGS_CONFIG_FNAME, PSPACE_DOMAIN_EXTREMA, DIRNAME_LIBRARY_SIMS, DIRNAME_DOMAIN_SIMS
)
Expand All @@ -44,7 +45,7 @@
# FILES_COPY_TO_OUTPUT = [__file__, holo.librarian.__file__, holo.param_spaces.__file__]
FILES_COPY_TO_OUTPUT = []

comm = None
# comm = None


def main(): # noqa : ignore complexity warning
Expand Down Expand Up @@ -75,16 +76,19 @@ def main(): # noqa : ignore complexity warning
try:
from mpi4py import MPI
comm = MPI.COMM_WORLD
log.info(f"Loaded MPI communicator: {comm.rank=} {comm.size=} {log.comm_rank=}")
except ModuleNotFoundError as err:
comm = None
holo.log.error(f"failed to load `mpi4py` in {__file__}: {err}")
holo.log.error("`mpi4py` may not be included in the standard `requirements.txt` file.")
holo.log.error("Check if you have `mpi4py` installed, and if not, please install it.")
log.error(f"failed to load `mpi4py` in {__file__}: {err}")
log.error("`mpi4py` may not be included in the standard `requirements.txt` file.")
log.error("Check if you have `mpi4py` installed, and if not, please install it.")
raise err

# ---- setup arguments / settings, loggers, and outputs

if comm.rank == 0:
log.warning(f"Running {__file__} : {comm.rank=} {comm.size=} | {sys.argv=}")
log.debug("Setting up argparse...")
args = _setup_argparse()
else:
args = None
Expand All @@ -93,12 +97,12 @@ def main(): # noqa : ignore complexity warning
args = comm.bcast(args, root=0)

# setup log instance, separate for all processes
log = _setup_log(comm, args)
args.log = log
log.debug("Setting up log...")
_setup_log(comm, args)

if comm.rank == 0:

# get parameter-space class
# get parameter-space class (created new, or load previous save when `args.resume`)
space = _setup_param_space(args)

# copy certain files to output directory
Expand All @@ -114,14 +118,15 @@ def main(): # noqa : ignore complexity warning
if args.resume:
args, config_fname = load_config_from_path(args.output, log)
log.warning(f"Loaded configuration save from {config_fname}")
# `args.resume` may be set to `False` after loading from save; reset to True
args.resume = True
# Save parameter space and args/configuration to output directory
else:
space_fname = space.save(args.output)
log.info(f"saved parameter space {space} to {space_fname}")
log.info(f"Saved parameter space {space} to {space_fname}")

config_fname = _save_config(args)
log.info(f"saved configuration to {config_fname}")
log.info(f"Saved configuration to {config_fname}")

# ---- Split simulations for all processes

Expand Down Expand Up @@ -156,12 +161,11 @@ def main(): # noqa : ignore complexity warning
# If we've loaded a new `args`, then share to all processes from rank=0
if args.resume:
args = comm.bcast(args, root=0)
args.log = log

log.info(
f"param_space={args.param_space}, parameters={space.nparameters}, samples={args.nsamples}\n"
f"sam_shape={args.sam_shape}, nreals={args.nreals}\n"
f"nfreqs={args.nfreqs}, pta_dur={args.pta_dur} [yr]\n"
f"param_space={args.param_space}, parameters={space.nparameters}, samples={args.nsamples}, "
f"sam_shape={args.sam_shape}, nreals={args.nreals}, "
f"nfreqs={args.nfreqs}, pta_dur={args.pta_dur} [yr]"
)

# ---- distribute jobs to processors
Expand All @@ -174,11 +178,11 @@ def main(): # noqa : ignore complexity warning
# ---- iterate over each processors' jobs

beg = datetime.now()
log.info(f"beginning tasks at {beg}")
log.debug(f"beginning tasks at {beg}")
failures = 0
num_done = 0
for sim_num in iterator:
log.info(f"{comm.rank=} {sim_num=}")
log.debug(f"{comm.rank=} {sim_num=}")

# Domain: Vary only one parameter at a time to explore the domain
if args.domain:
Expand All @@ -203,7 +207,7 @@ def main(): # noqa : ignore complexity warning
for kk, vv in params.items():
msg.append(f"{kk}={vv:.4e}")
msg = ", ".join(msg)
log.info(msg)
log.debug(msg)

rv, _sim_fname = run_sam_at_pspace_params(args, space, sim_num, params)

Expand All @@ -226,9 +230,9 @@ def main(): # noqa : ignore complexity warning
comm.barrier()

if (comm.rank == 0):
log.info("Concatenating outputs into single file")
log.warning("Combining simulation files into single library file")
holo.librarian.combine.sam_lib_combine(args.output, log, library=(not args.domain))
log.info("Concatenation completed")
log.info("Library combination completed.")

return

Expand Down Expand Up @@ -274,25 +278,45 @@ def run_sam_at_pspace_params(args, space, pnum, params):
produced that contains a single key: 'fail'. This designates the file as a failure.
"""
log = args.log

# ---- get output filename for this simulation, check if already exists

library_flag = not args.domain
sim_fname = lib_tools._get_sim_fname(args.output_sims, pnum, library=library_flag)

beg = datetime.now()
log.info(f"{pnum=} :: {sim_fname=} beginning at {beg}")
log.info(f"{pnum=} :: {params=} beginning at {beg}")
log.info(f"file exists: {sim_fname.is_file()} | '{sim_fname}'")

if sim_fname.exists():
log.info(f"File {sim_fname} already exists. {args.recreate=}")
temp = np.load(sim_fname)
data_keys = list(temp.keys())
log.info(f"Sim file already exists, {args.recreate=} | '{sim_fname}'")
data = np.load(sim_fname)
data_keys = list(data.keys())

if 'fail' in data_keys:
log.info("Existing file was a failure, re-attempting...")
log.info(f"Existing file was a failure, re-attempting... ({data_keys=})")

# skip existing files unless we specifically want to recreate them
elif not args.recreate:

# Make sure parameters are consistent with expectations
params_array = np.array([params[pn] for pn in space.param_names])
file_params = data['params']
file_param_names = data['param_names']
if not np.all([fpn == pn for fpn, pn in zip(file_param_names, space.param_names)]):
err = f"Mismatch between space param names and loaded parmeter names! {sim_fname=}"
log.exception(err)
log.exception(f"{space.param_names=}")
log.exception(f"{file_param_names=}")
raise RuntimeError(err)

if not np.allclose(file_params, params_array):
err = f"Mismatch between space param names and loaded parmeter names! {sim_fname=}"
log.exception(err)
log.exception(f"{space.param_names=}")
log.exception(f"{file_param_names=}")
raise RuntimeError(err)

return True, sim_fname

# ---- run Model
Expand All @@ -309,10 +333,11 @@ def run_sam_at_pspace_params(args, space, pnum, params):
)
data['params'] = np.array([params[pn] for pn in space.param_names])
data['param_names'] = space.param_names

rv = True
log.debug("Completed model successfully.")

except Exception as err:
log.exception(f"`run_model` FAILED on {pnum=}\n")
log.exception(f"`run_model` FAILED on {pnum=} with {params=}")
log.exception(err)
rv = False
# failed simulations get an output file with a single key: 'fail'
Expand Down Expand Up @@ -448,7 +473,6 @@ def _setup_argparse(*args, **kwargs):
# ---- Create output directories as needed

output.mkdir(parents=True, exist_ok=True)
holo.utils.mpi_print(f"output path: {output}")
args.output = output

if args.domain:
Expand Down Expand Up @@ -479,7 +503,7 @@ def _setup_param_space(args):
For 'resume' runs, load a saved parameter-space instance.
"""
log = args.log
# log = args.log

# ---- Determine and load the parameter-space class

Expand Down Expand Up @@ -507,10 +531,11 @@ def _setup_param_space(args):

if args.resume:
# Load pspace object from previous save
log.info(f"{args.resume=} attempting to load pspace {space_class=} from {args.output=}")
space, space_fname = holo.librarian.load_pspace_from_path(args.output, space_class=space_class, log=log)
log.info(f"{args.resume=} : attempting to load pspace {space_class=} from {args.output=}")
space, space_fname = holo.librarian.load_pspace_from_path(args.output, space_class=space_class)
log.warning(f"Loaded param-space save from {space_fname}")
else:
log.info(f"Constructing a new parameter space from {space_class=} ({args.resume=})")
# we don't use standard samples when constructing a parameter-space 'domain'
nsamples = None if args.domain else args.nsamples
space = space_class(log, nsamples, args.sam_shape, args.seed)
Expand Down Expand Up @@ -543,7 +568,7 @@ def _save_config(args):
with open(fname, 'w') as out:
json.dump(config, out)

args.log.warning(f"Saved to {fname} - {holo.utils.get_file_size(fname)}")
log.warning(f"Saved to {fname} - {holo.utils.get_file_size(fname)}")

return fname

Expand All @@ -554,7 +579,7 @@ def load_config_from_path(path, log):
with open(fname, 'r') as inp:
config = json.load(inp)

log.info("Loaded configuration from {fname}")
log.info(f"Loaded configuration from {fname}")

pop_keys = [
'holodeck_version', 'holodeck_librarian_version', 'holodeck_git_hash', 'created'
Expand All @@ -572,23 +597,14 @@ def load_config_from_path(path, log):


def _setup_log(comm, args):
"""Setup up the logging module logger for output messaging.
Arguemnts
---------
comm
args

Returns
-------
log : ``logging.Logger`` instance
# ---- setup logger level

"""
beg = datetime.now()
log_lvl = args.verbose if comm.rank == 0 else holo.logger.ERROR
holo.set_log_level(log_lvl)

# ---- setup name of log file
# ---- set name of log file

str_time = f"{beg.strftime('%Y%m%d-%H%M%S')}"
# get the path to the directory containing the `holodeck` module
# e.g.: "/Users/lzkelley/Programs/nanograv/holodeck"
holo_parent = Path(holo.__file__).parent.parent
Expand All @@ -597,24 +613,16 @@ def _setup_log(comm, args):
log_name = Path(__file__).relative_to(holo_parent)
# e.g.: "holodeck.librarian.gen_lib"
log_name = ".".join(log_name.with_suffix("").parts)
# e.g.: "holodeck.librarian.gen_lib__20230918-140722"
log_name = f"{log_name}__{str_time}"
# e.g.: "_holodeck.librarian.gen_lib__20230918-140722__r0003"
if comm.rank > 0:
log_name = f"_{log_name}__r{comm.rank:04d}"

output = args.output_logs
fname = f"{output.joinpath(log_name)}.log"
holo.log_to_file(base_name=log_name, path=output)

# ---- setup logger

log_lvl = args.verbose if comm.rank == 0 else holo.logger.DEBUG
tostr = sys.stdout if comm.rank == 0 else False
log = holo.logger.get_logger(name=log_name, level_stream=log_lvl, tofile=fname, tostr=tostr)
log.info(f" Processor: rank={comm.rank=} / size={comm.size}")
log.info(f"Output path: {output}")
log.info(f" log: {fname}")
log.info(f" log: {log.filename}")
log.info(args)
return log

return


# ==============================================================================
Expand All @@ -626,7 +634,7 @@ def make_plots(args, data, sim_fname):
"""Generate diagnostic plots from the given simulation data and save to file.
"""
import matplotlib.pyplot as plt
log = args.log
# log = args.log
log.info("generating characteristic strain/psd plots")
log.info("generating strain plots")
plot_fname = args.output_plots.joinpath(sim_fname.name)
Expand Down Expand Up @@ -766,6 +774,7 @@ def make_pars_plot(fobs, hc_ss, hc_bg, sspar, bgpar):


if __name__ == "__main__":
holo.set_log_level(holo.log.WARNING)
main()

#! the below doesn't work for catching errors... maybe because of comm.barrier() calls?
Expand Down
Loading

0 comments on commit bdfe7f7

Please sign in to comment.