Skip to content

Commit

Permalink
More attemptes to speed up remake.
Browse files Browse the repository at this point in the history
Speed up loading of remakefile only.
Disable ALL file checks! Only remake on task code changes.
  • Loading branch information
markmuetz committed Sep 27, 2023
1 parent 3f34cf7 commit 6e7f9c1
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 75 deletions.
7 changes: 4 additions & 3 deletions remake/global_timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ def __call__(self, key):
if self.curr_key is not None:
self.timers[(self.curr_key, key)].append(time - self.last_time)

self.last_time = time
self.curr_key = key
self.last_time = pd.Timestamp.now()

def __str__(self):
output = []
Expand All @@ -35,8 +35,9 @@ def __str__(self):
time_total_ms = np.sum(times_ms)
time_mean_ms = np.mean(times_ms)
time_std_ms = np.std(times_ms)
output.append((f'{k1} -> {k2}', f'{time_total_ms / 1e6:.2g}s', f'{time_mean_ms / 1e6:.2g}s', f'(+/- {time_std_ms / 1e6:.2g}s)'))
return f'{self.name}\n' + '=' * len(self.name) + '\n' + tabulate(output, headers=('tx', 'total', 'mean', 'std'))
count = len(times_ms)
output.append((f'{k1} -> {k2}', f'{time_total_ms / 1e6:.2g}s', f'{time_mean_ms / 1e6:.2g}s', f'(+/- {time_std_ms / 1e6:.2g}s)', f'{count}'))
return f'{self.name}\n' + '=' * len(self.name) + '\n' + tabulate(output, headers=('tx', 'total', 'mean', 'std', 'count'))

def start(self):
# TODO:
Expand Down
127 changes: 69 additions & 58 deletions remake/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from remake.flags import RemakeOn
from remake.util import sha1sum
from remake.global_timer import get_global_timer


logger = getLogger(__name__)
Expand Down Expand Up @@ -48,51 +49,61 @@ class NoMetadata(Exception):
class MetadataManager:
"""Creates and stores maps of PathMetadata and TaskMetadata"""
# Needed because it keeps track of all PathMetadata objs, and stops there being duplicate ones for inputs.
def __init__(self, task_control_name, dotremake_dir, content_checks):
def __init__(self, task_control_name, dotremake_dir, content_checks, track_files=False):
self.task_control_name = task_control_name
self.dotremake_dir = dotremake_dir
self.content_checks = content_checks
self.track_files = track_files
self.path_metadata_map = {}
self.task_metadata_map = {}

def create_task_metadata(self, task):
create_task_metadata_timer = get_global_timer('create_task_metadata_timer')
create_task_metadata_timer('4.4.0')

task_inputs_metadata_map = {}
task_outputs_metadata_map = {}
for input_path, special_input_path in zip(task.inputs.values(), task.special_inputs.values()):
if input_path not in self.path_metadata_map:
input_md = self._create_path_metadata(input_path, special_input_path)
else:
input_md = self.path_metadata_map[input_path]
task_inputs_metadata_map[input_path] = input_md

for output_path, special_output_path in zip(task.outputs.values(), task.special_outputs.values()):
if output_path not in self.path_metadata_map:
output_md = self._create_path_metadata(output_path, special_output_path)
else:
output_md = self.path_metadata_map[output_path]
task_outputs_metadata_map[output_path] = output_md

if self.track_files:
for input_path, special_input_path in zip(task.inputs.values(), task.special_inputs.values()):
if input_path not in self.path_metadata_map:
input_md = self._create_path_metadata(input_path, special_input_path)
else:
input_md = self.path_metadata_map[input_path]
task_inputs_metadata_map[input_path] = input_md
create_task_metadata_timer('4.4.1')

for output_path, special_output_path in zip(task.outputs.values(), task.special_outputs.values()):
if output_path not in self.path_metadata_map:
output_md = self._create_path_metadata(output_path, special_output_path)
else:
output_md = self.path_metadata_map[output_path]
task_outputs_metadata_map[output_path] = output_md
create_task_metadata_timer('4.4.2')
task_md = TaskMetadata(self.task_control_name, self.dotremake_dir,
task, task_inputs_metadata_map, task_outputs_metadata_map,
self.content_checks)
self.task_metadata_map[task] = task_md
create_task_metadata_timer('4.4.3')
return task_md

def check_task_status(self, task):
changed_paths = []
# Issue #34: only hits PathMetadata._load_metadata once now per path.
requires_rerun = RemakeOn.NOT_NEEDED
task_md = self.task_metadata_map[task]
for path in task.inputs.values():
if not path.exists():
task_md.rerun_reasons.append(('input_path_does_not_exist', path))
requires_rerun |= RemakeOn.MISSING_INPUT
continue
if self.content_checks:
path_md = self.path_metadata_map[path]
if path_md.compare_path_with_previous():
task_md.rerun_reasons.append(('input_path_metadata_has_changed', path))
requires_rerun |= RemakeOn.INPUTS_CHANGED
changed_paths.append(path)
if self.track_files:
for path in task.inputs.values():
if not path.exists():
task_md.rerun_reasons.append(('input_path_does_not_exist', path))
requires_rerun |= RemakeOn.MISSING_INPUT
continue
if self.content_checks:
path_md = self.path_metadata_map[path]
if path_md.compare_path_with_previous():
task_md.rerun_reasons.append(('input_path_metadata_has_changed', path))
requires_rerun |= RemakeOn.INPUTS_CHANGED
changed_paths.append(path)

task_md.generate_metadata()
requires_rerun = task_md.task_requires_rerun()
Expand All @@ -107,14 +118,15 @@ def _create_path_metadata(self, path, special_input_path):

class TaskMetadata:
def __init__(self, task_control_name, dotremake_dir, task,
inputs_metadata_map, outputs_metadata_map, content_checks):
inputs_metadata_map, outputs_metadata_map, content_checks, track_files=False):
self.task_control_name = task_control_name
self.dotremake_dir = dotremake_dir
self.metadata_dir = dotremake_dir / METADATA_VERSION
self.task = task
self.inputs_metadata_map = inputs_metadata_map
self.outputs_metadata_map = outputs_metadata_map
self.content_checks = content_checks
self.track_files = track_files

self.task_path_hash_key = self.task.path_hash_key()

Expand Down Expand Up @@ -214,29 +226,30 @@ def task_requires_rerun(self):
self.rerun_reasons.append(('task_has_not_been_run', None))
self.requires_rerun |= RemakeOn.NO_TASK_METADATA

logger.debug(' stat all files')
earliest_output_path_mtime = float('inf')
for output in self.task.outputs.values():
if not output.exists():
self.rerun_reasons.append(('output_path_does_not_exist', output))
self.requires_rerun |= RemakeOn.MISSING_OUTPUT
break
earliest_output_path_mtime = min(earliest_output_path_mtime,
output.lstat().st_mtime)
if not self.requires_rerun:
latest_input_path_mtime = 0
for input_path in self.task.inputs.values():
if not input_path.exists():
self.rerun_reasons.append(('input_path_does_not_exist', input_path))
self.requires_rerun |= RemakeOn.MISSING_INPUT
if self.track_files:
logger.debug(' stat all files')
earliest_output_path_mtime = float('inf')
for output in self.task.outputs.values():
if not output.exists():
self.rerun_reasons.append(('output_path_does_not_exist', output))
self.requires_rerun |= RemakeOn.MISSING_OUTPUT
break

latest_input_path_mtime = max(latest_input_path_mtime,
input_path.lstat().st_mtime)
if latest_input_path_mtime > earliest_output_path_mtime:
self.requires_rerun |= RemakeOn.OLDER_OUTPUT
self.rerun_reasons.append(('output_is_older_than_input', None))
logger.debug(' statted all files')
earliest_output_path_mtime = min(earliest_output_path_mtime,
output.lstat().st_mtime)
if not self.requires_rerun:
latest_input_path_mtime = 0
for input_path in self.task.inputs.values():
if not input_path.exists():
self.rerun_reasons.append(('input_path_does_not_exist', input_path))
self.requires_rerun |= RemakeOn.MISSING_INPUT
break

latest_input_path_mtime = max(latest_input_path_mtime,
input_path.lstat().st_mtime)
if latest_input_path_mtime > earliest_output_path_mtime:
self.requires_rerun |= RemakeOn.OLDER_OUTPUT
self.rerun_reasons.append(('output_is_older_than_input', None))
logger.debug(' statted all files')

if not (self.requires_rerun & RemakeOn.NO_TASK_METADATA):
if self.new_metadata['task_source_sha1hex'] != self.metadata['task_source_sha1hex']:
Expand Down Expand Up @@ -277,33 +290,31 @@ def write_task_metadata(self):

class PathMetadata:
def __init__(self, task_control_name, dotremake_dir, path, special_input_path):
PathMetadata_timer = get_global_timer('PathMetadata_timer')
PathMetadata_timer('4.4.0.0')
self.task_control_name = task_control_name
self.dotremake_dir = dotremake_dir
self.path = path
if special_input_path:
self.special_input_path = special_input_path
else:
self.special_input_path = Path(*path.parts[1:])
self.relative_input_path = Path(*path.parts[1:])
PathMetadata_timer('4.4.0.1')
self.metadata_dir = dotremake_dir / METADATA_VERSION
self.file_metadata_dir = self.metadata_dir / 'file_metadata'

if special_input_path.is_absolute():
self.metadata_path = self.file_metadata_dir.joinpath(*(special_input_path.parent.parts[1:] +
(f'{path.name}.metadata',)))
else:
self.metadata_path = self.file_metadata_dir.joinpath(*(special_input_path.parent.parts +
(f'{path.name}.metadata',)))
self.metadata_path = self.file_metadata_dir.joinpath(*(relative_input_path.parent.parts +
(f'{path.name}.metadata',)))

self.task_metadata_path = self.file_metadata_dir.joinpath(*(path.parent.parts[1:] +
(f'{path.name}.created_by.task',)))

PathMetadata_timer('4.4.0.2')
self.metadata = {}
self.new_metadata = {'task_control_name': task_control_name}
self.task_metadata = {}

self.changes = []
self.metadata_has_changed = False
self.need_write = False
PathMetadata_timer('4.4.0.3')

def _load_metadata(self):
if self.metadata_path.exists():
Expand Down
16 changes: 8 additions & 8 deletions remake/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class Task(BaseTask):
def __init__(self, task_ctrl, func, inputs, outputs,
*, force=False, depends_on=tuple()):
task_init_timer = get_global_timer(str(self.__class__) + '__init__')
task_init_timer(0)
task_init_timer('2.0')
super().__init__(task_ctrl)
# self.remake_on = True
self.depends_on_sources = []
Expand All @@ -73,11 +73,11 @@ def __init__(self, task_ctrl, func, inputs, outputs,
Task.task_func_cache[depend_obj] = depend_func_source

self.depends_on = depends_on
task_init_timer(1)
task_init_timer('2.1')

if not callable(func):
raise ValueError(f'{func} is not callable')
task_init_timer(2)
task_init_timer('2.2')

self.func = func
if self.func in Task.task_func_cache:
Expand Down Expand Up @@ -111,23 +111,23 @@ def __init__(self, task_ctrl, func, inputs, outputs,

if not outputs:
raise Exception('outputs must be set')
task_init_timer(3)
task_init_timer('2.3')

task_init_timer(3.1)
task_init_timer('2.3.1')
#self.inputs = {k: Path(v).absolute() for k, v in inputs.items()}
#self.outputs = {k: Path(v).absolute() for k, v in outputs.items()}
self.inputs = inputs
self.outputs = outputs
task_init_timer(3.2)
task_init_timer('2.3.2')
self.special_inputs = map_special_paths(self.task_ctrl.special_paths, self.inputs)
self.special_outputs = map_special_paths(self.task_ctrl.special_paths, self.outputs)
task_init_timer(3.3)
task_init_timer('2.3.3')
self.result = None
self.rerun_on_mtime = True
self.tmp_outputs = {}
self.logger = None
self._path_hash_key = None
task_init_timer(4)
task_init_timer('2.4')

def __repr__(self):
return str(self)
Expand Down
14 changes: 13 additions & 1 deletion remake/task_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from remake.flags import RemakeOn
from remake.executor import SingleprocExecutor, MultiprocExecutor, SlurmExecutor
from remake.remake_exceptions import CyclicDependency
from remake.global_timer import get_global_timer

logger = getLogger(__name__)

Expand Down Expand Up @@ -222,23 +223,32 @@ def remaining_tasks(self):

@check_finalized(False)
def add(self, task):
task_ctrl_add_timer = get_global_timer('task_ctrl_add')
task_ctrl_add_timer(4.0)

for output in task.outputs.values():
if output in self.output_task_map:
raise Exception(f'Trying to add {output} twice')
task_ctrl_add_timer(4.1)

task_path_hash_key = task.path_hash_key()
task_ctrl_add_timer(4.2)
if task_path_hash_key in self.task_from_path_hash_key:
raise Exception(f'Trying to add {task} twice')
self.task_from_path_hash_key[task_path_hash_key] = task

task_ctrl_add_timer(4.3)
self.tasks.append(task)
for input_path in task.inputs.values():
self.input_task_map[input_path].append(task)
for output in task.outputs.values():
self.output_task_map[output] = task
task_ctrl_add_timer(4.4)

task_md = self.metadata_manager.create_task_metadata(task)
task_ctrl_add_timer(4.5)
task.add_metadata(task_md)
task_ctrl_add_timer(4.6)

return task

Expand Down Expand Up @@ -383,6 +393,7 @@ def finalize(self):
return self

def _assign_tasks(self):
task_ctrl_assign_task_timer = get_global_timer('task_ctrl_assign_task_timer')
# Assign each task to one of four groups:
# cannot_run: not possible to run task (missing inputs).
# completed: task has been run and does not need to be rerun.
Expand All @@ -393,7 +404,8 @@ def _assign_tasks(self):
logger.debug(f' assign task: {task}')
requires_rerun = self.task_requires_rerun(task)

if task.can_run():
# if task.can_run():
if not (requires_rerun & self.remake_on):
status = 'completed'
if task.force:
status = 'pending'
Expand Down
23 changes: 18 additions & 5 deletions remake/task_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,24 @@ def __new__(mcs, clsname, bases, attrs):
remake.task_ctrl.add(task)
loop_timer(5)
print(loop_timer)
task_init_timer = get_global_timer(str(newcls) + '__init__')
print(task_init_timer)
cond_input_timer = get_global_timer('cond_input_timer')
print(cond_input_timer)
cond_input_timer.reset()

task_ctrl_add_timer = get_global_timer('task_ctrl_add')
print(task_ctrl_add_timer)
task_ctrl_add_timer.reset()

create_task_metadata_timer = get_global_timer('create_task_metadata_timer')
print(create_task_metadata_timer)
create_task_metadata_timer.reset()

PathMetadata_timer = get_global_timer('PathMetadata_timer')
print(PathMetadata_timer)
PathMetadata_timer.reset()

# task_init_timer = get_global_timer(str(newcls) + '__init__')
# print(task_init_timer)
# cond_input_timer = get_global_timer('cond_input_timer')
# print(cond_input_timer)
# cond_input_timer.reset()
else:
logger.debug(f' creating instance of {clsname}')
inputs = create_inputs_fn(**{})
Expand Down

0 comments on commit 6e7f9c1

Please sign in to comment.