diff --git a/shadho/shadho.py b/shadho/shadho.py
index e288702..ec5eac9 100644
--- a/shadho/shadho.py
+++ b/shadho/shadho.py
@@ -35,10 +35,17 @@ class Shadho(pyrameter.FMin):
 
     Parameters
    ----------
+    exp_key : str
+        Unique ID for this experiment, used when referencing existing results.
     cmd : str or function
         The command to run on remote workers or function to run locally.
     spec : dict
         The specification defining search spaces.
+    method : {'random', 'bayes', 'tpe', 'smac', 'pso'} or callable
+        The optimization method to use.
+    backend : str
+        Filepath (JSON) or URL (MongoDB, SQLite) of the backend storage used
+        to record search spaces, domain state, trials, etc.
     files : list of str or WQFile
         The files to send to remote workers for task execution.
     use_complexity : bool, optional
@@ -46,46 +53,22 @@ class Shadho(pyrameter.FMin):
     use_priority : bool, optional
         If True, use the priority heuristic to adjust search proportions.
     timeout : int, optional
-        Number of seconds to search for.
+        Number of seconds to search for. If None or a negative number is
+        passed, search with no timeout until ``max_tasks`` is reached.
+    max_tasks : int, optional
+        The maximum number of trials to run. If not passed, trials continue
+        until ``timeout``, or indefinitely when no timeout is set.
     max_queued_tasks : int, optional
         Number of tasks to queue at a time.
     await_pending : bool, optional
         If True, wait for all running tasks to complete after `timeout`.
         Default: False
+    max_evals : int, optional
+        The number of times to evaluate each generated set of hyperparameters.
     max_resubmissions : int, optional
         Maximum number of times to resubmit a particular parameterization for
         processing if task failure occurs. Default is not to resubmit.
 
-    Attributes
-    ----------
-    config : `shadho.config.ShadhoConfig`
-        Global configurations for shadho.
-    backend : `pyrameter.ModelGroup`
-        The data view/storage backend. This backend keeps track of all
-    manager : `shadho.managers.workqueue.WQManager`
-        The distributed task manager to use.
-    ccs : list of `shadho.hardware.ComputeClass`
-        The types of hardware to expect during optimization. If not supplied,
-        tasks are run on the first available worker.
-    use_complexity : bool
-        If True, use the complexity heuristic to adjust search proportions.
-    use_priority : bool
-        If True, use the priority heuristic to adjust search proportions.
-    timeout : int
-        Number of seconds to search for.
-    max_queued_tasks : int
-        Maximum number of tasks to enqueue at a time.
-    await_pending : bool
-        If True, wait for all running tasks to complete after `timeout`.
-    max_evals : int
-        The maximum number of evaluations for a grid point in grid search. Only
-        used when the hyperparameter search has no infinite (continuous)
-        domains. If ``None``, search until ``timeout``; otherwise, stop early
-        once every grid point has been evaluated ``max_evals`` times.
-    max_resubmissions: int
-        Maximum number of times to resubmit a particular parameterization for
-        processing if task failure occurs. Default is not to resubmit.
-
     Notes
     -----
     To enable configuration after initialization, ``backend`` and ``manager``
@@ -96,18 +79,15 @@ class Shadho(pyrameter.FMin):
     def __init__(self, exp_key, cmd, spec, method='random', backend='results.json',
                  files=None, use_complexity=True, use_uncertainty=True,
                  timeout=600, max_tasks=None, max_queued_tasks=100, await_pending=False,
-                 max_evals=None, max_resubmissions=0, save_frequency=10,
+                 max_evals=1, max_resubmissions=0, save_frequency=10,
                  hyperparameters_per_task=1):
         super().__init__(exp_key, spec, method, backend, max_evals=max_evals)
-        # self.exp_key = exp_key
         self.config = ShadhoConfig()
         self.cmd = cmd
         if not isinstance(cmd, str):
             self.config.manager = 'local'
         else:
             self.config.workqueue.name = exp_key
-        # self.spec = spec
-        # self.method = method
         self.use_complexity = use_complexity
         self.use_uncertainty = use_uncertainty
         self.timeout = timeout if timeout is not None and timeout >= 0 \
@@ -116,7 +96,6 @@ def __init__(self, exp_key, cmd, spec, method='random', backend='results.json',
             else float('inf')
         self.max_queued_tasks = max_queued_tasks
         self.max_resubmissions = max_resubmissions
-        # self.max_evals = max_evals
         self.await_pending = await_pending
         self.save_frequency = save_frequency
         self.hyperparameters_per_task = hyperparameters_per_task \
@@ -210,20 +189,13 @@ def add_compute_class(self, name, resource, value, max_queued_tasks=100):
             The maximum number of tasks to queue for this compute class,
             default 100.
         """
-        cc = ComputeClass(name, resource, value, 2 * max_queued_tasks, None)
-        # cc.optimizer.method = self.method
+        cc = ComputeClass(name, resource, value, min(self.max_tasks, max_queued_tasks))
         self.ccs[cc.id] = cc
 
     def load(self):
-        max_evals = self.max_evals
+        max_tasks = self.max_tasks
         super().load()
-        self.max_evals = max_evals + len(self.trials)
-
-    # def plot_objective(self, show=True, save=False, filename=None):
-    #     try:
-    #         self.backend.plot_objective()
-    #     except AttributeError:
-    #         print('Could not find trials! Have you run a search or loaded results?')
+        self.max_tasks = max_tasks + len(self.trials)
 
     def run(self):
         """Search hyperparameter values on remote workers.
@@ -246,25 +218,19 @@ def run(self):
                 config=self.config,
                 tmpdir=self.__tmpdir)
 
-        # Set up the backend hyperparameter generation and database
-        # if not isinstance(self.backend, FMin):
-        #     self.backend = FMin(self.exp_key, self.spec, self.method,
-        #                         self.backend, max_evals=self.max_evals)
-
         # If no ComputeClass was created, create a dummy class.
         if len(self.ccs) == 0:
-            cc = ComputeClass('all', None, None, self.max_queued_tasks, super())
+            cc = ComputeClass('all', None, None, min(self.max_tasks, self.max_queued_tasks))
             self.ccs[cc.id] = cc
         else:
             for cc in self.ccs.values():
                 cc.optimizer = self.copy()
+                cc.max_queued_tasks = max(cc.max_queued_tasks // len(self.ccs), 1)
 
         # Set up initial model/compute class assignments.
        self.assign_to_ccs()
 
        self.start = time.time()
-        elapsed = 0
-        exhausted = False
        completed_tasks = 0
        try:
            # Run the search until timeout or until all tasks complete
@@ -285,14 +251,9 @@ def run(self):
                    # Checkpoint the results to file or DB at some frequency
                    if self.trial_count % self.save_frequency == 0:
                        self.save()
-                    # Update the time for timeout check
-                    # elapsed = time.time() - start
-                    # exhausted = all([ss.done for ss in self.backend.searchspaces])
-                    # exhausted = all([ss.done for ss in self.searchspaces])
                else:
                    break
 
-        # self.backend.save()
        self.save()
 
        # If requested, continue the loop until all tasks return
@@ -304,7 +265,6 @@ def run(self):
                        self.success(*result)
                    else:
                        self.failure(*result)
-            # self.backend.save()
            self.save()
 
        # On keyboard interrupt, save any results and clean up
@@ -321,8 +281,6 @@ def run(self):
 
    def done(self):
        elapsed = time.time() - self.start
-        # completed_tasks = sum([int(t.objective is not None)
-        #                        for t in self.trials.values()])
        exhausted = all([space.done(self.max_tasks)
                         for space in self.searchspaces])
        return elapsed >= self.timeout or exhausted
@@ -351,14 +309,15 @@ def generate(self):
        for cc_id in self.ccs:
            cc = self.ccs[cc_id]
            n = cc.max_queued_tasks - cc.current_tasks
 
            # Generate enough hyperparameters to fill the queue
            for i in range(n):
                # Get bookkeeping ids and hyperparameter values
                if self.hyperparameters_per_task == 1:
-                    trial = cc.generate()
+                    trial = super().generate(searchspaces=cc.searchspaces)
 
                    if isinstance(trial, Trial):
+                        self.trials[trial.id] = trial
                        # Encode info to map to db in the task tag
                        tag = '.'.join([str(trial.id),
                                        str(trial.searchspace.id),
@@ -372,6 +332,7 @@ def generate(self):
                            value=cc.value)
                    elif isinstance(trial, list) and len(trial) > 0:
                        for t in trial:
+                            self.trials[t.id] = t
                            tag = '.'.join([str(t.id),
                                            str(t.searchspace.id),
                                            cc_id])
@@ -383,23 +344,23 @@ def generate(self):
                            resource=cc.resource,
                            value=cc.value)
-                elif self.hyperparameters_per_task > 1:
-                    trial = list(itertools.chain.from_iterable([cc.generate() for _ in range(self.hyperparameters_per_task)]))
-
-                    if not any([t is None for t in trial]) or len(trial) > 0:
-                        # Encode info to map to db in the task tag
-                        tag = '.'.join(['@'.join([str(t.id) for t in trial]),
-                                        str(trial[0].searchspace().id),
-                                        cc_id])
-                        parameters = [t.parameter_dict for t in trial]
-                        self.manager.add_task(
-                            self.cmd,
-                            tag,
-                            parameters,
-                            files=self.files,
-                            resource=cc.resource,
-                            value=cc.value)
-
+                # elif self.hyperparameters_per_task > 1:
+                #     trial = list(itertools.chain.from_iterable([cc.generate() for _ in range(self.hyperparameters_per_task)]))
+
+                #     if not any([t is None for t in trial]) or len(trial) > 0:
+                #         self.trials.update({t.id: t for t in trial})
+                #         # Encode info to map to db in the task tag
+                #         tag = '.'.join(['@'.join([str(t.id) for t in trial]),
+                #                         str(trial[0].searchspace().id),
+                #                         cc_id])
+                #         parameters = [t.parameter_dict for t in trial]
+                #         self.manager.add_task(
+                #             self.cmd,
+                #             tag,
+                #             parameters,
+                #             files=self.files,
+                #             resource=cc.resource,
+                #             value=cc.value)
                # Create a new distributed task if values were generated
                stop = False  # Ensure that the search continues
@@ -442,11 +403,6 @@ def assign_to_ccs(self):
        # Determine if the number of compute classes or the number of
        # model ids is larger
        ccids = list(self.ccs.keys())
-        # larger = self.backend.searchspaces \
-        #     if len(self.backend.searchspaces) >= len(ccids) \
-        #     else ccids
-        # smaller = ccids if larger == model_ids \
-        #     else self.backend.searchspaces
        larger = self.searchspaces \
            if len(self.searchspaces) >= len(ccids) \
            else ccids
@@ -474,22 +430,27 @@ def assign_to_ccs(self):
                if smaller[j] in self.ccs:
                    self.ccs[smaller[j]].add_searchspace(larger[i])
                    if j < m:
-                        self.ccs[smaller[j + 1]].add_model(
-                            self.backend[larger[i]])
+                        self.ccs[smaller[j + 1]].add_searchspace(
+                            self.searchspaces[larger[i]])
                    else:
-                        self.ccs[smaller[j - 1]].add_model(
-                            self.backend[larger[i]])
+                        self.ccs[smaller[j - 1]].add_searchspace(
+                            self.searchspaces[larger[i]])
                else:
-                    self.ccs[larger[i]].add_model(smaller[j])
+                    self.ccs[larger[i]].add_searchspace(smaller[j])
                    if i < n:
-                        self.ccs[larger[i + 1]].add_model(
-                            self.backend[smaller[j]])
+                        self.ccs[larger[i + 1]].add_searchspace(
+                            self.searchspaces[smaller[j]])
                    else:
-                        self.ccs[larger[i - 1]].add_model(
-                            self.backend[smaller[j]])
+                        self.ccs[larger[i - 1]].add_searchspace(
+                            self.searchspaces[smaller[j]])
        elif len(self.ccs) == 0:
-            cc = ComputeClass('all', None, None, self.max_queued_tasks, super())
+            cc = ComputeClass('all', None, None, min(self.max_tasks, self.max_queued_tasks))
            self.ccs[cc.id] = cc
+            cc.add_searchspace(self.searchspaces)
+        else:
+            cc = list(self.ccs.values())[0]
+            cc.clear()
+            cc.add_searchspace(self.searchspaces)
 
    def success(self, tag, loss, results):
        """Handle successful task completion.
@@ -580,112 +541,3 @@ def failure(self, tag, resub):
                    value=cc.value)
            else:
                self.ccs[ccid].current_tasks -= 1
-
-
-# class PopulationShadho(Shadho):
-#     def __init__(self, exp_key, cmd, spec, generations=50, method='random',
-#                  backend='results.json', files=None, use_complexity=True,
-#                  use_uncertainty=True, timeout=600, max_queued_tasks=100,
-#                  await_pending=False, max_evals=None, max_resubmissions=0,
-#                  save_frequency=10, hyperparameters_per_task=1):
-#         super().__init__(
-#             exp_key,
-#             cmd,
-#             spec,
-#             method='random',
-#             backend='results.json',
-#             files=None,
-#             use_complexity=True,
-#             use_uncertainty=True,
-#             timeout=600,
-#             max_queued_tasks=100,
-#             await_pending=False,
-#             max_evals=None,
-#             max_resubmissions=0,
-#             save_frequency=10,
-#             hyperparameters_per_task=1)
-
-#         self.generations = generations
-
-#     def run(self):
-#         # Set up the task manager as defined in `shadho.managers`
-#         if not hasattr(self, 'manager'):
-#             self.manager = create_manager(
-#                 manager_type=self.config.manager,
-#                 config=self.config,
-#                 tmpdir=self.__tmpdir)
-
-#         # Set up the backend hyperparameter generation and database
-#         # if not isinstance(self.backend, FMin):
-#         #     self.backend = FMin(self.exp_key, self.spec, self.method,
-#         #                         self.backend, max_evals=self.max_evals)
-
-#         # If no ComputeClass was created, create a dummy class.
-#         if len(self.ccs) == 0:
-#             cc = ComputeClass('all', None, None, self.max_queued_tasks, self)
-#             self.ccs[cc.id] = cc
-#         else:
-#             for cc in self.ccs.values():
-#                 cc.optimizer = self.backend.copy()
-
-#         # Set up intial model/compute class assignments.
-#         self.assign_to_ccs()
-
-#         start = time.time()
-#         elapsed = 0
-#         generation = 0
-#         exhausted = False
-#         try:
-#             # Run the search until timeout or until all tasks complete
-#             while elapsed < self.timeout and generation < self.generations:
-#                 # Generate hyperparameters and a flag to continue or stop
-#                 stop = self.generate()
-#                 if not stop:
-#                     # Run another task and await results
-#                     pop_size = len(stop)
-#                     result = self.manager.run_task()
-#                     if result is not None:
-#                         # If a task returned post-process as a success or fail
-#                         if len(result) == 3:
-#                             self.success(*result)  # Store and move on
-#                         else:
-#                             self.failure(*result)  # Resubmit if asked
-#                     # Checkpoint the results to file or DB at some frequency
-#                     # if self.backend.trial_count % self.save_frequency == 0:
-#                     if self.trial_count % self.save_frequency == 0:
-#                         # self.backend.save()
-#                         self.save()
-#                     # Update the time for timeout check
-#                     elapsed = time.time() - start
-#                     # exhausted = all([ss.done for ss in self.backend.searchspaces])
-#                     exhausted = all([ss.done for ss in self.searchspaces])
-#                 else:
-#                     break
-
-#             # self.backend.save()
-#             self.save()
-
-#             # If requested, continue the loop until all tasks return
-#             if self.await_pending:
-#                 while not self.manager.empty():
-#                     result = self.manager.run_task()
-#                     if result is not None:
-#                         if len(result) == 3:
-#                             self.success(*result)
-#                         else:
-#                             self.failure(*result)
-#                 # self.backend.save()
-#                 self.save()
-
-#         # On keyboard interrupt, save any results and clean up
-#         except KeyboardInterrupt:
-#             if hasattr(self, '__tmpdir') and self.__tmpdir is not None:
-#                 os.rmdir(self.__tmpdir)
-
-#         # Save the results and print the optimal set of parameters to screen
-#         # self.backend.save()
-#         self.save()
-#         # self.backend.summary()
-#         self.summary()
-#         # return self.backend.to_dataframes()
-#         return self.to_dataframes()
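
Example usage (illustrative only, not part of the patch): a minimal sketch of driving the updated constructor with the parameters documented in the revised docstring. The ``sin`` objective and search space are hypothetical placeholders, and the snippet assumes the package's public ``Shadho`` and ``spaces`` imports; passing a callable as ``cmd`` selects the local manager, per the ``__init__`` logic in this diff.

    import math

    from shadho import Shadho, spaces

    # Hypothetical objective: shadho passes the sampled hyperparameters to a
    # callable cmd as a dict and treats the return value as the loss.
    def sin(params):
        return math.sin(params['x'])

    # Hypothetical search space with one continuous domain.
    space = {'x': spaces.uniform(0.0, 2.0 * math.pi)}

    if __name__ == '__main__':
        opt = Shadho(
            'sin_example',           # exp_key: unique experiment ID
            sin,                     # cmd: a callable, so tasks run locally
            space,                   # spec: the search space definition
            method='random',         # or 'bayes', 'tpe', 'smac', 'pso', or a callable
            backend='results.json',  # JSON filepath; MongoDB/SQLite URLs also accepted
            timeout=60,              # stop after 60 seconds...
            max_tasks=500)           # ...or after 500 trials, whichever comes first
        opt.run()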