Skip to content

Commit

Permalink
Merge 4b98ce1 into 1fdca87
Browse files Browse the repository at this point in the history
  • Loading branch information
gaocegege authored May 31, 2019
2 parents 1fdca87 + 4b98ce1 commit e5810a6
Show file tree
Hide file tree
Showing 23 changed files with 1,134 additions and 5 deletions.
8 changes: 8 additions & 0 deletions cmd/suggestion/bayesianoptimization/v1alpha2/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Runtime image for the v1alpha2 bayesian-optimization suggestion service.
FROM python:3

# Copy the whole repo under a GOPATH-style prefix so the Python packages
# (pkg.api..., pkg.suggestion...) resolve via PYTHONPATH below.
ADD . /usr/src/app/github.com/kubeflow/katib
WORKDIR /usr/src/app/github.com/kubeflow/katib/cmd/suggestion/bayesianoptimization/v1alpha2
RUN pip install --no-cache-dir -r requirements.txt
# Repo root plus the generated gRPC stub directory must both be importable.
ENV PYTHONPATH /usr/src/app/github.com/kubeflow/katib:/usr/src/app/github.com/kubeflow/katib/pkg/api/v1alpha2/python

ENTRYPOINT ["python", "main.py"]
14 changes: 14 additions & 0 deletions cmd/suggestion/bayesianoptimization/v1alpha2/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
- start the service

```
python suggestion/bayesian/main.py
```

- start the testing client

```
python suggestion/test_client.py
```

Note: the testing client uses [Franke's function](http://www.sfu.ca/~ssurjano/franke2d.html) as the black-box objective; the maximum of Franke's function is approximately 1.22.
Empty file.
25 changes: 25 additions & 0 deletions cmd/suggestion/bayesianoptimization/v1alpha2/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import grpc
from concurrent import futures

import time

from pkg.api.v1alpha2.python import api_pb2_grpc
from pkg.suggestion.v1alpha2.bayesian_service import BayesianService

# Sleep quantum for the otherwise-idle main thread while the gRPC
# worker threads handle requests.
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
# Listen on all interfaces; 6789 matches the suggestion Service manifest.
DEFAULT_PORT = "0.0.0.0:6789"

def serve():
    """Start the bayesian-optimization Suggestion gRPC server and block forever.

    Registers a BayesianService servicer on an insecure port and keeps the
    process alive until KeyboardInterrupt, then stops with no grace period.
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    api_pb2_grpc.add_SuggestionServicer_to_server(BayesianService(), server)
    server.add_insecure_port(DEFAULT_PORT)
    print("Listening...")
    server.start()
    try:
        # server.start() does not block; park the main thread cheaply.
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == "__main__":
    serve()
9 changes: 9 additions & 0 deletions cmd/suggestion/bayesianoptimization/v1alpha2/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Dependencies for the bayesian-optimization suggestion service.
grpcio
duecredit
cloudpickle==0.5.6  # NOTE(review): pin reason undocumented - confirm before upgrading
numpy>=1.13.3
scikit-learn>=0.19.0
scipy>=0.19.1
forestci
protobuf
googleapis-common-protos
64 changes: 64 additions & 0 deletions examples/v1alpha2/bayseopt-example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Example katib Experiment driving the bayesianoptimization suggestion
# service against the MXNet MNIST training job.
apiVersion: "kubeflow.org/v1alpha2"
kind: Experiment
metadata:
  namespace: kubeflow
  labels:
    controller-tools.k8s.io: "1.0"
  name: bayseopt-example
spec:
  objective:
    type: maximize
    goal: 0.99
    objectiveMetricName: Validation-accuracy
    additionalMetricNames:
      - accuracy
  algorithm:
    algorithmName: bayesianoptimization
    algorithmSettings:
      - name: "burn_in"
        value: "5"
  parallelTrialCount: 3
  maxTrialCount: 12
  maxFailedTrialCount: 3
  parameters:
    - name: --lr
      parameterType: double
      feasibleSpace:
        min: "0.01"
        max: "0.03"
    - name: --num-layers
      parameterType: int
      feasibleSpace:
        min: "2"
        max: "5"
    - name: --optimizer
      parameterType: categorical
      feasibleSpace:
        list:
          - sgd
          - adam
          - ftrl
  trialTemplate:
    goTemplate:
      rawTemplate: |-
        apiVersion: batch/v1
        kind: Job
        metadata:
          name: {{.Trial}}
          namespace: {{.NameSpace}}
        spec:
          template:
            spec:
              containers:
              - name: {{.Trial}}
                image: katib/mxnet-mnist-example
                command:
                - "python"
                - "/mxnet/example/image-classification/train_mnist.py"
                - "--batch-size=64"
                {{- with .HyperParameters}}
                {{- range .}}
                - "{{.Name}}={{.Value}}"
                {{- end}}
                {{- end}}
              restartPolicy: Never
24 changes: 24 additions & 0 deletions manifests/v1alpha2/suggestion/bayesianoptimization/deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Deployment for the bayesianoptimization suggestion gRPC service.
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: katib-suggestion-bayesianoptimization
  namespace: kubeflow
  labels:
    app: katib
    component: suggestion-bayesianoptimization
spec:
  replicas: 1
  template:
    metadata:
      name: katib-suggestion-bayesianoptimization
      labels:
        app: katib
        component: suggestion-bayesianoptimization
    spec:
      containers:
        - name: katib-suggestion-bayesianoptimization
          image: katib/v1alpha2/suggestion-bayesianoptimization
          imagePullPolicy: IfNotPresent
          ports:
            # Must match DEFAULT_PORT in main.py and the Service manifest.
            - name: api
              containerPort: 6789
17 changes: 17 additions & 0 deletions manifests/v1alpha2/suggestion/bayesianoptimization/service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# ClusterIP Service exposing the suggestion gRPC port inside the cluster.
apiVersion: v1
kind: Service
metadata:
  name: katib-suggestion-bayesianoptimization
  namespace: kubeflow
  labels:
    app: katib
    component: suggestion-bayesianoptimization
spec:
  type: ClusterIP
  ports:
    - port: 6789
      protocol: TCP
      name: api
  selector:
    app: katib
    component: suggestion-bayesianoptimization
211 changes: 211 additions & 0 deletions pkg/suggestion/v1alpha2/bayesian_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
import random
import string

import grpc
import numpy as np

from pkg.api.v1alpha2.python import api_pb2
from pkg.api.v1alpha2.python import api_pb2_grpc
from pkg.suggestion.v1alpha2.bayesianoptimization.src.bayesian_optimization_algorithm import BOAlgorithm
from pkg.suggestion.v1alpha2.bayesianoptimization.src.algorithm_manager import AlgorithmManager
import logging
from logging import getLogger, StreamHandler, INFO, DEBUG

timeout = 10

class BayesianService(api_pb2_grpc.SuggestionServicer):
    """Suggestion gRPC service backed by bayesian optimization.

    Fetches the experiment spec, completed-trial history and extra
    algorithm settings from the katib-manager service, then delegates the
    optimization itself to AlgorithmManager / BOAlgorithm.
    """

    def __init__(self, logger=None):
        """Record the manager address and configure logging.

        Args:
            logger: optional pre-configured logger.  When None, a default
                INFO-level StreamHandler logger is created whose format
                string requires every log call to pass
                extra={"experiment_name": ...}.
        """
        self.manager_addr = "katib-manager"
        self.manager_port = 6789
        if logger is None:  # idiom: identity check against None
            self.logger = getLogger(__name__)
            FORMAT = '%(asctime)-15s Experiment %(experiment_name)s %(message)s'
            logging.basicConfig(format=FORMAT)
            handler = StreamHandler()
            handler.setLevel(INFO)
            self.logger.setLevel(INFO)
            self.logger.addHandler(handler)
            self.logger.propagate = False
        else:
            self.logger = logger

    def _get_experiment(self, name):
        """Return the Experiment proto named ``name`` from katib-manager."""
        channel = grpc.beta.implementations.insecure_channel(
            self.manager_addr, self.manager_port)
        with api_pb2.beta_create_Manager_stub(channel) as client:
            exp = client.GetExperiment(
                api_pb2.GetExperimentRequest(experiment_name=name), timeout)
            return exp.experiment

    def ValidateAlgorithmSettings(self, request, context):
        # No validation implemented yet: every settings combination is accepted.
        return api_pb2.ValidateAlgorithmSettingsReply()

    def GetSuggestions(self, request, context):
        """Return ``request.request_number`` new trial suggestions.

        Builds the search-space bounds and training history, runs the
        bayesian-optimization algorithm, and wraps each suggested point in
        a Trial proto.
        """
        service_params = self.parseParameters(request.experiment_name)
        experiment = self._get_experiment(request.experiment_name)
        X_train, y_train = self.getEvalHistory(
            request.experiment_name,
            experiment.spec.objective.objective_metric_name,
            service_params["burn_in"])

        algo_manager = AlgorithmManager(
            experiment_name=request.experiment_name,
            experiment=experiment,
            X_train=X_train,
            y_train=y_train,
            logger=self.logger,
        )

        lowerbound = np.array(algo_manager.lower_bound)
        upperbound = np.array(algo_manager.upper_bound)
        self.logger.debug("lowerbound: %r", lowerbound,
                          extra={"experiment_name": request.experiment_name})
        self.logger.debug("upperbound: %r", upperbound,
                          extra={"experiment_name": request.experiment_name})
        alg = BOAlgorithm(
            dim=algo_manager.dim,
            N=int(service_params["N"]),
            lowerbound=lowerbound,
            upperbound=upperbound,
            X_train=algo_manager.X_train,
            y_train=algo_manager.y_train,
            mode=service_params["mode"],
            trade_off=service_params["trade_off"],
            # todo: support length_scale with array type
            length_scale=service_params["length_scale"],
            noise=service_params["noise"],
            nu=service_params["nu"],
            kernel_type=service_params["kernel_type"],
            n_estimators=service_params["n_estimators"],
            max_features=service_params["max_features"],
            model_type=service_params["model_type"],
            logger=self.logger,
        )
        trials = []
        x_next_list = alg.get_suggestion(request.request_number)
        for x_next in x_next_list:
            x_next = x_next.squeeze()
            self.logger.debug("xnext: %r ", x_next, extra={
                "experiment_name": request.experiment_name})
            x_next = algo_manager.parse_x_next(x_next)
            x_next = algo_manager.convert_to_dict(x_next)
            trials.append(api_pb2.Trial(
                spec=api_pb2.TrialSpec(
                    experiment_name=request.experiment_name,
                    parameter_assignments=api_pb2.TrialSpec.ParameterAssignments(
                        assignments=[
                            api_pb2.ParameterAssignment(
                                name=x["name"],
                                value=str(x["value"]),
                            ) for x in x_next
                        ]
                    )
                )
            ))
        return api_pb2.GetSuggestionsReply(
            trials=trials
        )

    def getEvalHistory(self, experiment_name, obj_name, burn_in):
        """Collect (parameter_set, metric value) pairs from succeeded trials.

        Returns empty lists while at most ``burn_in`` observations exist,
        so the caller keeps sampling during the burn-in phase.
        """
        x_train = []
        y_train = []
        channel = grpc.beta.implementations.insecure_channel(
            self.manager_addr, self.manager_port)
        with api_pb2.beta_create_Manager_stub(channel) as client:
            trialsrep = client.GetTrialList(api_pb2.GetTrialListRequest(
                experiment_name=experiment_name
            ))
            for t in trialsrep.trials:
                if t.status.condition == api_pb2.TrialStatus.TrialConditionType.SUCCEEDED:
                    gwfrep = client.GetObservationLog(
                        api_pb2.GetObservationLogRequest(
                            trial_name=t.name,
                            metric_name=obj_name), timeout)
                    w = gwfrep.observation_log
                    # One (x, y) pair per logged metric value keeps the two
                    # training lists the same length.
                    for ml in w.metric_logs:
                        y_train.append(float(ml.metric.value))
                        x_train.append(w.parameter_set)
        # BUGFIX: extra key must be "experiment_name" to match the
        # '%(experiment_name)s' log format; "Experiment" raised KeyError
        # during formatting.
        self.logger.info("%d completed trials are found.",
                         len(x_train), extra={"experiment_name": experiment_name})
        if len(x_train) <= burn_in:
            x_train = []
            y_train = []
            self.logger.info("Trials will be sampled until %d trials for burn-in are completed.",
                             burn_in, extra={"experiment_name": experiment_name})
        else:
            self.logger.debug("Completed trials: %r", x_train,
                              extra={"experiment_name": experiment_name})

        return x_train, y_train

    def parseParameters(self, experiment_name):
        """Fetch extra algorithm settings and merge them over the defaults.

        Unknown setting names and unparsable values are logged as warnings
        and the default value is kept.
        """
        channel = grpc.beta.implementations.insecure_channel(
            self.manager_addr, self.manager_port)
        params = []
        with api_pb2.beta_create_Manager_stub(channel) as client:
            gsprep = client.GetAlgorithmExtraSettings(
                api_pb2.GetAlgorithmExtraSettingsRequest(experiment_name=experiment_name), timeout)
            params = gsprep.extra_algorithm_settings

        parsed_service_params = {
            "N": 100,
            "model_type": "gp",
            "max_features": "auto",
            "length_scale": 0.5,
            "noise": 0.0005,
            "nu": 1.5,
            "kernel_type": "matern",
            "n_estimators": 50,
            "mode": "pi",
            "trade_off": 0.01,
            "trial_hist": "",
            "burn_in": 10,
        }
        modes = ["pi", "ei"]
        model_types = ["gp", "rf"]
        kernel_types = ["matern", "rbf"]
        float_params = ("length_scale", "noise", "nu", "trade_off")
        int_params = ("N", "n_estimators", "burn_in")

        for param in params:
            if param.name not in parsed_service_params:
                self.logger.warning("Unknown Parameter name: %s ", param.name)
            elif param.name in float_params:
                try:
                    parsed_service_params[param.name] = float(param.value)
                except ValueError:
                    self.logger.warning(
                        "Parameter must be float for %s: %s back to default value",
                        param.name, param.value)
            elif param.name in int_params:
                try:
                    parsed_service_params[param.name] = int(param.value)
                except ValueError:
                    self.logger.warning(
                        "Parameter must be int for %s: %s back to default value",
                        param.name, param.value)
            # BUGFIX: the three enum-like checks below were inverted — only
            # *invalid* values were accepted and valid ones warned/ignored.
            elif param.name == "kernel_type":
                if param.value in kernel_types:
                    parsed_service_params[param.name] = param.value
                else:
                    self.logger.warning(
                        "Unknown Parameter for %s: %s back to default value",
                        param.name, param.value)
            elif param.name == "mode":
                if param.value in modes:
                    parsed_service_params[param.name] = param.value
                else:
                    self.logger.warning(
                        "Unknown Parameter for %s: %s back to default value",
                        param.name, param.value)
            elif param.name == "model_type":
                if param.value in model_types:
                    parsed_service_params[param.name] = param.value
                else:
                    self.logger.warning(
                        "Unknown Parameter for %s: %s back to default value",
                        param.name, param.value)
            else:
                # Remaining known settings are free-form strings
                # ("max_features", "trial_hist"); previously these were
                # silently ignored.
                parsed_service_params[param.name] = param.value

        return parsed_service_params
Empty file.
Loading

0 comments on commit e5810a6

Please sign in to comment.