Skip to content

Commit

Permalink
update shadho to work with the updated pyrameter
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffkinnison committed Aug 6, 2019
1 parent 79ef8c7 commit 9b7aed9
Showing 1 changed file with 46 additions and 35 deletions.
81 changes: 46 additions & 35 deletions shadho/shadho.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
Shadho
Driver class for local and distributed hyperparameter optimization.
"""
from .config import ShadhoConfig
from .configuration import ShadhoConfig
from .hardware import ComputeClass
from .managers import create_manager

Expand All @@ -20,6 +20,8 @@
import pyrameter
import scipy.stats

from pyrameter.optimizer import FMin


def shadho():
pass
Expand Down Expand Up @@ -83,15 +85,17 @@ class Shadho(object):
"""

def __init__(self, cmd, spec, method='random', backend=None, files=None, use_complexity=True,
use_priority=True, timeout=600, max_tasks=100,
await_pending=False, max_resubmissions=0):
def __init__(self, exp_key, cmd, spec, method='random', backend=None,
files=None, use_complexity=True, use_uncertainty=True,
timeout=600, max_tasks=100, await_pending=False,
max_resubmissions=0):
self.exp_key = exp_key
self.config = ShadhoConfig()
self.cmd = cmd
self.spec = spec
self.method = method
self.use_complexity = use_complexity
self.use_priority = use_priority
self.use_uncertainty = use_uncertainty
self.timeout = timeout if timeout is not None and timeout >= 0 \
else float('inf')
self.max_tasks = 2 * max_tasks
Expand Down Expand Up @@ -183,7 +187,7 @@ def add_compute_class(self, name, resource, value, max_tasks=100):
The maximum number of tasks to queue for this compute class,
default 100.
"""
cc = ComputeClass(name, resource, value, 2 * max_tasks)
cc = ComputeClass(name, resource, value, 2 * max_tasks, None)
self.ccs[cc.id] = cc

def run(self):
Expand All @@ -208,17 +212,17 @@ def run(self):
tmpdir=self.__tmpdir)

# Set up the backend hyperparameter generation and database
if not isinstance(self.backend, ComputeClass):
self.backend = pyrameter.build(self.spec,
method=self.method,
db=self.backend,
complexity_sort=self.use_complexity,
priority_sort=self.use_priority)
if not isinstance(self.backend, FMin):
self.backend = FMin(self.exp_key, self.spec, self.method,
self.backend)

# If no ComputeClass was created, create a dummy class.
if len(self.ccs) == 0:
cc = ComputeClass('all', None, None, self.max_tasks)
cc = ComputeClass('all', None, None, self.max_tasks, self.backend)
self.ccs[cc.id] = cc
else:
for cc in self.ccs.values():
cc.optimizer = self.backend.copy()

# Set up initial model/compute class assignments.
self.assign_to_ccs()
Expand All @@ -240,7 +244,7 @@ def run(self):
else:
self.failure(*result) # Resubmit if asked
# Checkpoint the results to file or DB at some frequency
if self.backend.result_count % 50 == 0:
if self.backend.trial_count % 50 == 0:
self.backend.save()
# Update the time for timeout check
elapsed = time.time() - start
Expand All @@ -264,11 +268,10 @@ def run(self):

# Save the results and print the optimal set of parameters to screen
self.backend.save()
opt = self.backend.optimal(mode='best')
key = list(opt.keys())[0]
print("Optimal result: {}".format(opt[key]['loss']))
print("With parameters: {}".format(opt[key]['values']))
print("And additional results: {}".format(opt[key]['results']))
opt = self.backend.optimum()
print("Optimal result: {}".format(opt.objective))
print("With parameters: {}".format(opt.parameter_dict))
print("And additional results: {}".format(opt.results))

def generate(self):
"""Generate hyperparameter values to test.
Expand Down Expand Up @@ -299,16 +302,18 @@ def generate(self):
# Generate enough hyperparameters to fill the queue
for i in range(n):
# Get bookkeeping ids and hyperparameter values
model_id, result_id, param = cc.generate()
trial = cc.generate()

# Create a new distributed task if values were generated
if param is not None:
if trial is not None:
# Encode info to map to db in the task tag
tag = '.'.join([result_id, model_id, cc_id])
tag = '.'.join([str(trial.id),
str(trial.searchspace().id),
cc_id])
self.manager.add_task(
self.cmd,
tag,
param,
trial.parameter_dict,
files=self.files,
resource=cc.resource,
value=cc.value)
Expand Down Expand Up @@ -337,12 +342,12 @@ def assign_to_ccs(self):
# If only one CC exists, do nothing; otherwise, update assignments
if len(self.ccs) == 1:
key = list(self.ccs.keys())[0]
self.ccs[key].model_group = self.backend
self.ccs[key].optimizer = self.backend
else:
# Sort models in the search by complexity, priority, or both and
# get the updated order.
self.backend.sort_models()
model_ids = [mid for mid in self.backend.model_ids]
self.backend.sort_spaces(use_complexity=self.use_complexity,
use_uncertainty=self.use_uncertainty)

# Clear the current assignments
for cc in self.ccs:
Expand All @@ -351,8 +356,11 @@ def assign_to_ccs(self):
# Determine if the number of compute classes or the number of
# model ids is larger
ccids = list(self.ccs.keys())
larger = model_ids if len(self.model_ids) >= len(ccids) else ccids
smaller = model_ids if larger == model_ids else ccids
larger = self.backend.searchspaces \
if len(self.backend.searchspaces) >= len(ccids) \
else ccids
smaller = ccids if larger == model_ids \
else self.backend.searchspaces

# Assign models to CCs such that each model is assigned to at
# least two CCs.
Expand All @@ -373,15 +381,15 @@ def assign_to_ccs(self):
# Add the model to the current CC. If i <= n, add the model to
# the next CC as well; if i > n, add to the previous CC.
if smaller[j] in self.ccs:
self.ccs[smaller[j]].add_model(self.backend[larger[i]])
self.ccs[smaller[j]].add_searchspace(larger[i])
if j < m:
self.ccs[smaller[j + 1]].add_model(
self.backend[larger[i]])
else:
self.ccs[smaller[j - 1]].add_model(
self.backend[larger[i]])
else:
self.ccs[larger[i]].add_model(self.backend[smaller[j]])
self.ccs[larger[i]].add_model(smaller[j])
if i < n:
self.ccs[larger[i + 1]].add_model(
self.backend[smaller[j]])
Expand Down Expand Up @@ -409,18 +417,20 @@ def success(self, tag, loss, results):
updated.
"""
# Get bookkeeping information from the task tag
result_id, model_id, ccid = tag.split('.')
trial_id, ss_id, ccid = tag.split('.')
results['compute_class'] = {
'id': ccid,
'name': self.ccs[ccid].name,
'value': self.ccs[ccid].value
}

# Update the DB with the result
self.backend.register_result(model_id, result_id, loss, results)
self.backend.register_result(ss_id, trial_id, loss, results)

# Reassign models to CCs at some frequency
if self.backend.result_count % 10 == 0:
n_completed = sum([1 for trial in self.backend.trials.values()
if trial.status.value == 3])
if n_completed % 10 == 0:
self.assign_to_ccs()

# Update the number of enqueued items
Expand All @@ -440,11 +450,12 @@ def failure(self, tag, resub):
potential worker dropout, etc.
"""
# Get bookkeeping information from the task tag
result_id, model_id, ccid = tag.split('.')
trial_id, ss_id, ccid = tag.split('.')

# Determine whether or not to resubmit
submissions, params = \
self.backend.register_result(model_id, result_id, None, {})
self.backend.register_result(ss_id, trial_id, objective=None,
results=None, errmsg='yes')

# Resubmit the task if it should be, otherwise update the number of
# enqueued items.
Expand Down

0 comments on commit 9b7aed9

Please sign in to comment.