Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp show: display running/queued state for experiments #6174

Merged
merged 14 commits into from
Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 26 additions & 6 deletions dvc/command/experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,13 @@ def _collect_rows(

new_checkpoint = True
for i, (rev, exp) in enumerate(experiments.items()):
queued = str(exp.get("queued") or "")
if exp.get("running"):
state = "Running"
elif exp.get("queued"):
state = "Queued"
else:
state = FILL_VALUE
executor = exp.get("executor", FILL_VALUE)
is_baseline = rev == "baseline"

if is_baseline:
Expand Down Expand Up @@ -189,10 +195,11 @@ def _collect_rows(
row = [
exp_name,
name_rev,
queued,
typ,
_format_time(exp.get("timestamp")),
parent,
state,
executor,
]
_extend_row(
row, metric_names, exp.get("metrics", {}).items(), precision
Expand Down Expand Up @@ -304,7 +311,15 @@ def experiments_table(

from dvc.compare import TabularData

headers = ["Experiment", "rev", "queued", "typ", "Created", "parent"]
headers = [
"Experiment",
"rev",
"typ",
"Created",
"parent",
"State",
"Executor",
]
td = TabularData(
lconcat(headers, metric_headers, param_headers), fill_value=FILL_VALUE
)
Expand Down Expand Up @@ -340,8 +355,7 @@ def prepare_exp_id(kwargs) -> "Text":
text.append(suff)

tree = experiment_types[typ]
queued = "*" if kwargs.get("queued") else ""
pref = (f"{tree} " if tree else "") + queued
pref = f"{tree} " if tree else ""
return Text(pref) + text


Expand Down Expand Up @@ -381,16 +395,22 @@ def show_experiments(
if no_timestamp:
td.drop("Created")

for col in ("State", "Executor"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible for other columns to be empty? If so, would we want to hide them, too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There shouldn't be any other columns in the table right now that can be empty

if td.is_empty(col):
td.drop(col)

row_styles = lmap(baseline_styler, td.column("typ"))

merge_headers = ["Experiment", "rev", "queued", "typ", "parent"]
merge_headers = ["Experiment", "rev", "typ", "parent"]
td.column("Experiment")[:] = map(prepare_exp_id, td.as_dict(merge_headers))
td.drop(*merge_headers[1:])

headers = {"metrics": metric_headers, "params": param_headers}
styles = {
"Experiment": {"no_wrap": True, "header_style": "black on grey93"},
"Created": {"header_style": "black on grey93"},
"State": {"header_style": "black on grey93"},
"Executor": {"header_style": "black on grey93"},
}
header_bg_colors = {"metrics": "cornsilk1", "params": "light_cyan1"}
styles.update(
Expand Down
4 changes: 4 additions & 0 deletions dvc/compare.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ def project(self, *col_names: str) -> None:
self.drop(*(set(self._keys) - set(col_names)))
self._keys = list(col_names)

def is_empty(self, col_name: str) -> bool:
col = self.column(col_name)
return not any(item != self._fill_value for item in col)

def to_csv(self) -> str:
import csv
from io import StringIO
Expand Down
115 changes: 91 additions & 24 deletions dvc/repo/experiments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class Experiments:
r"(?P<checkpoint>-checkpoint)?$"
)
EXEC_TMP_DIR = "exps"
EXEC_PID_DIR = os.path.join(EXEC_TMP_DIR, "run")

StashEntry = namedtuple(
"StashEntry", ["index", "rev", "baseline_rev", "branch", "name"]
Expand Down Expand Up @@ -588,26 +589,21 @@ def _reproduce_revs(
)

executors = self._init_executors(to_run)
exec_results = self._executors_repro(executors, **kwargs)

if keep_stash:
try:
exec_results = {}
exec_results.update(self._executors_repro(executors, **kwargs))
finally:
# only drop successfully run stashed experiments
to_drop = sorted(
(
stash_revs[rev][0]
for rev in exec_results
if rev in stash_revs
),
reverse=True,
)
else:
# drop all stashed experiments
to_drop = sorted(
(stash_revs[rev][0] for rev in to_run if rev in stash_revs),
reverse=True,
)
for index in to_drop:
self.stash.drop(index)
to_drop = [
entry.index
for rev, entry in to_run.items()
if (
entry.index is not None
and (not keep_stash or rev in exec_results)
)
]
for index in sorted(to_drop, reverse=True):
self.stash.drop(index)

result: Dict[str, str] = {}
for _, exp_result in exec_results.items():
Expand All @@ -623,6 +619,9 @@ def _init_executors(self, to_run):
base_tmp_dir = os.path.join(self.repo.tmp_dir, self.EXEC_TMP_DIR)
if not os.path.exists(base_tmp_dir):
makedirs(base_tmp_dir)
pid_dir = os.path.join(self.repo.tmp_dir, self.EXEC_PID_DIR)
if not os.path.exists(pid_dir):
makedirs(pid_dir)
for stash_rev, item in to_run.items():
self.scm.set_ref(EXEC_HEAD, item.rev)
self.scm.set_ref(EXEC_MERGE, stash_rev)
Expand All @@ -649,9 +648,10 @@ def _init_executors(self, to_run):

return executors

@unlocked_repo
def _executors_repro(
self, executors: dict, jobs: Optional[int] = 1
) -> Mapping[str, Mapping[str, str]]:
) -> Dict[str, Dict[str, str]]:
"""Run dvc repro for the specified BaseExecutors in parallel.

Returns:
Expand All @@ -667,6 +667,11 @@ def _executors_repro(
with ProcessPoolExecutor(max_workers=jobs) as workers:
futures = {}
for rev, executor in executors.items():
pidfile = os.path.join(
self.repo.tmp_dir,
self.EXEC_PID_DIR,
f"{rev}{executor.PIDFILE_EXT}",
)
future = workers.submit(
executor.reproduce,
executor.dvc_dir,
Expand All @@ -675,6 +680,8 @@ def _executors_repro(
name=executor.name,
rel_cwd=rel_cwd,
log_level=logger.getEffectiveLevel(),
pidfile=pidfile,
git_url=executor.git_url,
)
futures[future] = (rev, executor)

Expand All @@ -683,12 +690,15 @@ def _executors_repro(
except KeyboardInterrupt:
# forward SIGINT to any running executor processes and
# cancel any remaining futures
workers.shutdown(wait=False)
pids = {}
while not pid_q.empty():
rev, pid = pid_q.get()
pids[rev] = pid
for future, (rev, _) in futures.items():
if future.running():
# if future has already been started by the scheduler
# we still have to wait until it tells us its PID
while rev not in pids:
rev, pid = pid_q.get()
pids[rev] = pid
os.kill(pids[rev], signal.SIGINT)
elif not future.done():
future.cancel()
Expand Down Expand Up @@ -735,7 +745,10 @@ def on_diverged(ref: str, checkpoint: bool):
raise ExperimentExistsError(ref_info.name)

for ref in executor.fetch_exps(
self.scm, force=exec_result.force, on_diverged=on_diverged
self.scm,
executor.git_url,
force=exec_result.force,
on_diverged=on_diverged,
):
exp_rev = self.scm.get_ref(ref)
if exp_rev:
Expand All @@ -747,6 +760,8 @@ def on_diverged(ref: str, checkpoint: bool):
@unlocked_repo
def _workspace_repro(self) -> Mapping[str, str]:
"""Run the most recently stashed experiment in the workspace."""
from dvc.utils.fs import makedirs

from .executor.base import BaseExecutor

entry = first(self.stash_revs.values())
Expand All @@ -766,12 +781,19 @@ def _workspace_repro(self) -> Mapping[str, str]:
self.scm.remove_ref(EXEC_BRANCH)
try:
orig_checkpoint = self.scm.get_ref(EXEC_CHECKPOINT)
pid_dir = os.path.join(self.repo.tmp_dir, self.EXEC_PID_DIR)
if not os.path.exists(pid_dir):
makedirs(pid_dir)
pidfile = os.path.join(
pid_dir, f"workspace{BaseExecutor.PIDFILE_EXT}"
)
exec_result = BaseExecutor.reproduce(
None,
rev,
name=entry.name,
rel_cwd=relpath(os.getcwd(), self.scm.root_dir),
log_errors=False,
pidfile=pidfile,
)

if not exec_result.exp_hash:
Expand Down Expand Up @@ -861,6 +883,51 @@ def get_exact_name(self, rev: str):
return ExpRefInfo.from_ref(ref).name
return None

def get_running_exps(self) -> Dict[str, int]:
"""Return info for running experiments."""
from dvc.utils.serialize import load_yaml

from .executor.base import BaseExecutor, ExecutorInfo

result = {}
for pidfile in self.repo.fs.walk_files(
os.path.join(self.repo.tmp_dir, self.EXEC_PID_DIR)
):
rev, _ = os.path.splitext(os.path.basename(pidfile))

try:
info = ExecutorInfo.from_dict(load_yaml(pidfile))
if rev == "workspace":
# If we are appending to a checkpoint branch in a workspace
# run, show the latest checkpoint as running.
last_rev = self.scm.get_ref(EXEC_BRANCH)
if last_rev:
result[last_rev] = info.to_dict()
else:
result[rev] = info.to_dict()
else:
result[rev] = info.to_dict()
if info.git_url:

def on_diverged(_ref: str, _checkpoint: bool):
return False

for ref in BaseExecutor.fetch_exps(
self.scm,
info.git_url,
on_diverged=on_diverged,
):
logger.debug(
"Updated running experiment '%s'.", ref
)
last_rev = self.scm.get_ref(ref)
result[rev]["last"] = last_rev
if last_rev:
result[last_rev] = info.to_dict()
except OSError:
pass
return result

def apply(self, *args, **kwargs):
from dvc.repo.experiments.apply import apply

Expand Down