Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 36 additions & 39 deletions automation/script/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,48 +50,29 @@ def experiment_run(self_module, i):
exp = {}

cur_dir = os.getcwd()
r = self_module.search(i.copy())

r = self_module._select_script(i)
if r['return'] > 0:
return r

lst = r['list']
if not lst:
return {'return': 1, 'error': 'No scripts were found'}

# Process each artifact
for artifact in sorted(lst, key=lambda x: x.meta.get('alias', '')):
meta, script_path = artifact.meta, artifact.path
tags, script_alias, script_uid = meta.get(
"tags", []), meta.get(
'alias', ''), meta.get(
'uid', '')

# Execute the experiment script
mlc_script_input = {
'action': 'run', 'target': 'script'
}
if exp:
for key in exp:
ii = {**mlc_script_input, **run_input}
if isinstance(exp[key], list):
for val in exp[key]:
ii[key] = val
r = run_script_and_tag_experiment(
ii,
self_module.action_object,
experiment_action,
tags,
extra_exp_tags,
meta,
skip_state_save,
logger)
if r['return'] > 0:
return r
elif isinstance(exp[key], dict):
return {
'return': 1, 'error': 'Dictionary inputs are not supported for mlc experiment script'}
else:
ii[key] = exp[key]
script = r['script']

meta, script_path = script.meta, script.path
tags, script_alias, script_uid = meta.get(
"tags", []), meta.get(
'alias', ''), meta.get(
'uid', '')

# Execute the experiment script
mlc_script_input = {
'action': 'run', 'target': 'script'
}
if exp:
for key in exp:
ii = {**mlc_script_input, **run_input}
if isinstance(exp[key], list):
for val in exp[key]:
ii[key] = val
r = run_script_and_tag_experiment(
ii,
self_module.action_object,
Expand All @@ -103,6 +84,22 @@ def experiment_run(self_module, i):
logger)
if r['return'] > 0:
return r
elif isinstance(exp[key], dict):
return {
'return': 1, 'error': 'Dictionary inputs are not supported for mlc experiment script'}
else:
ii[key] = exp[key]
r = run_script_and_tag_experiment(
ii,
self_module.action_object,
experiment_action,
tags,
extra_exp_tags,
meta,
skip_state_save,
logger)
if r['return'] > 0:
return r

return {'return': 0}

Expand Down
2 changes: 1 addition & 1 deletion automation/script/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -4669,7 +4669,7 @@ def _select_script(self, i):
sorted_list = sorted(lst, key=lambda x: x.meta.get('alias', ''))

# If quiet mode is off, prompt the user
if not i.get('quiet', False):
if not i.get('quiet', False) and len(sorted_list) > 1:
print("\nAvailable scripts:")
for idx, artifact in enumerate(sorted_list, 1):
meta = artifact.meta
Expand Down
221 changes: 39 additions & 182 deletions automation/script/remote_run.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from collections import defaultdict
import os
from mlc.main import ExperimentAction
import mlc.utils as utils
from mlc import utils
from utils import *
Expand Down Expand Up @@ -39,193 +38,51 @@ def remote_run(self_module, i):
return prune_result

run_input = prune_result['new_input']

r = convert_input(i)
mlc_run_cmd = run_input['mlc_run_cmd']

cur_dir = os.getcwd()
r = self_module.search(i.copy())
if r['return'] > 0:
return r

lst = r['list']
if not lst:
return {'return': 1, 'error': 'No scripts were found'}

# Process each artifact
for artifact in sorted(lst, key=lambda x: x.meta.get('alias', '')):
meta, script_path = artifact.meta, artifact.path
tags, script_alias, script_uid = meta.get(
"tags", []), meta.get(
'alias', ''), meta.get(
'uid', '')

# Execute the experiment script
mlc_script_input = {
'action': 'run', 'target': 'script'
}
if exp:
for key in exp:
ii = {**mlc_script_input, **run_input}
if isinstance(exp[key], list):
for val in exp[key]:
ii[key] = val
r = run_script_and_tag_experiment(
ii,
self_module.action_object,
experiment_action,
tags,
extra_exp_tags,
meta,
skip_state_save,
logger)
if r['return'] > 0:
return r
elif isinstance(exp[key], dict):
return {
'return': 1, 'error': 'Dictionary inputs are not supported for mlc experiment script'}
else:
ii[key] = exp[key]
r = run_script_and_tag_experiment(
ii,
self_module.action_object,
experiment_action,
tags,
extra_exp_tags,
meta,
skip_state_save,
logger)
if r['return'] > 0:
return r

return {'return': 0}


def run_script_and_tag_experiment(
ii, script_action, experiment_action, tags, extra_exp_tags, script_meta, skip_state_save, logger):

current_path = os.path.abspath(os.getcwd())
experiment_meta = {}
recursion_spaces = ''
exp_tags = tags + extra_exp_tags
ssi = {'action': 'update',
'target': 'experiment',
'script_alias': script_meta['alias'],
'script_uid': script_meta['uid'],
'tags': ','.join(exp_tags),
'extra_tags': ",".join(extra_exp_tags),
'meta': experiment_meta,
'force': True}

r = experiment_action.access(ssi)
r = self_module._select_script(i)
if r['return'] > 0:
return r

experiment = r['list'][0]

logger.debug(
recursion_spaces +
' - Changing to {}'.format(experiment.path))

os.chdir(experiment.path)
# Get current datetime in YYYY-MM-DD_HH-MM-SS format
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

# Create a folder name using the timestamp
folder_name = f"run_{timestamp}"

# Create the directory
os.makedirs(folder_name, exist_ok=True)
os.chdir(folder_name)

if not skip_state_save:
ssi = {'action': 'run',
'target': 'script',
'tags': 'save,system,state',
'outfile': 'system_state_before.json',
'quiet': True
}
r = script_action.access(ssi)
if r['return'] > 0:
return r

start_time = time.time()
r = script_action.access(ii)
script = r['script']

meta, script_path = script.meta, script.path
tags, script_alias, script_uid = meta.get(
"tags", []), meta.get(
'alias', ''), meta.get(
'uid', '')

# Execute the experiment script
mlc_script_input = {
'action': 'run', 'target': 'script'
}

run_cmds = []
remote_mlc_python_venv = i.get('remote_python_venv', 'mlcflow')
run_cmds.append(f". {remote_mlc_python_venv}/bin/activate")
if i.get('remote_pull_mlc_repos', False):
run_cmds.append("mlc pull repo")

script_run_cmd = " ".join(mlc_run_cmd.split(" ")[3:])
run_cmds.append(f"mlcr {script_run_cmd}")

remote_inputs = {}
for key in ["host", "port", "user", "client_refresh",
"password", "skip_host_verify", "ssh_key_file"]:
if i.get(f"remote_{key}"):
remote_inputs[key] = i[f"remote_{key}"]

# Execute the remote command
mlc_remote_input = {
'action': 'run', 'target': 'script', 'tags': 'remote,run,cmds,ssh',
'script_tags': i.get('tags'), 'run_cmds': run_cmds,
**remote_inputs
}

r = self_module.action_object.access(mlc_remote_input)
if r['return'] > 0:
return r

end_time = time.time()
elapsed = end_time - start_time
time_taken_string = format_elapsed(elapsed)
logger.info(f"Time taken: {time_taken_string}")

if not skip_state_save:
ssi['outfile'] = 'system_state_after.json'
r = script_action.access(ssi)
if r['return'] > 0:
return r

'''
exp_tags = tags
ii = {'action': 'update',
'target': 'experiment',
'uid': experiment.meta['uid'],
'meta': experiment.meta,
'script_alias': meta['alias'],
'replace_lists': True, # To replace tags
'tags': ','.join(exp_tags)}

r = experiment_action.access(ii)
if r['return'] > 0:
return r
'''
os.chdir(current_path)
logger.info(
f"Experiment entry saved at: {os.path.join(experiment.path, folder_name)}")

return {'return': 0, 'experiment': experiment, 'folder_name': folder_name}

elif seconds < 3600:
mins, secs = divmod(seconds, 60)
return f"{int(mins)} minutes {secs:.1f} seconds"
else:
hours, remainder = divmod(seconds, 3600)
mins, secs = divmod(remainder, 60)
return f"{int(hours)} hours {int(mins)} minutes {secs:.1f} seconds"


def parse_value(val):
if isinstance(val, list):
return [parse_value(v) for v in val]

val = str(val)

# Handle range inputs like 2:10 or 2:10:2
if ':' in val:
parts = val.split(':')
try:
parts = list(map(int, parts))
if len(parts) == 2:
return list(range(parts[0], parts[1] + 1))
elif len(parts) == 3:
return list(range(parts[0], parts[1] + 1, parts[2]))
except ValueError:
pass # Not a valid range, fall through

# Convert to int if possible
if val.isdigit():
return int(val)

return val


def convert_input(input_dict):
output = defaultdict(dict)

for key, value in input_dict.items():
if '.' in key:
main_key, sub_key = key.split('.', 1)
output[main_key][sub_key] = parse_value(value)
elif isinstance(value, dict):
output[key].update({k: parse_value(v) for k, v in value.items()})

return dict(output)
return {'return': 0}
Loading