Permalink
Browse files

Support to get pool information for the runtime

Add an administrative operation for getting the pool information for the
runtime, so that the admin user can check the capacity of the runtime and
scale up or scale down the pool accordingly.

Change-Id: Iec4536396c1c31a9e545b09c5f46b46dc6d79ae6
Story: 2002969
Task: 22975
  • Loading branch information...
lingxiankong committed Jul 13, 2018
1 parent bcdbfb6 commit 01d769fa629d24badb764a77bd2654dddbd26294
View
@@ -7,6 +7,7 @@
"runtime:create": "rule:context_is_admin",
"runtime:update": "rule:context_is_admin",
"runtime:delete": "rule:context_is_admin",
"runtime_pool:get_all": "rule:context_is_admin",
"function:get_all:all_projects": "rule:context_is_admin",
"function_worker:get_all": "rule:context_is_admin",
@@ -282,6 +282,16 @@ def sample(cls):
return sample
class RuntimePoolCapacity(Resource):
total = wsme.wsattr(int, readonly=True)
available = wsme.wsattr(int, readonly=True)
class RuntimePool(Resource):
name = wsme.wsattr(wtypes.text, readonly=True)
capacity = wsme.wsattr(RuntimePoolCapacity, readonly=True)
class Execution(Resource):
id = types.uuid
function_id = wsme.wsattr(types.uuid, mandatory=True)
@@ -34,6 +34,8 @@
class RuntimesController(rest.RestController):
_custom_actions = {'pool': ['GET']}
def __init__(self, *args, **kwargs):
self.engine_client = rpc.get_engine_client()
self.type = 'runtime'
@@ -162,3 +164,24 @@ def put(self, id, runtime):
runtime_db = db_api.update_runtime(id, values)
return resources.Runtime.from_db_obj(runtime_db)
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(resources.RuntimePool, types.uuid)
def pool(self, id):
"""Get the pool information for the runtime.
This operation should be admin only.
We don't check the runtime existence, because this function
also helps us to check the underlying pool even after the runtime
is already deleted.
"""
acl.enforce('runtime_pool:get_all', context.get_ctx())
LOG.info("Getting pool information for runtime %s.", id)
capacity = self.engine_client.get_runtime_pool(id)
pool_capacity = resources.RuntimePoolCapacity.from_dict(capacity)
return resources.RuntimePool.from_dict(
{"name": id, "capacity": pool_capacity}
)
@@ -84,6 +84,11 @@ def update_runtime(self, ctx, runtime_id, image=None, pre_image=None):
LOG.info('Rollbacked runtime %s.', runtime_id)
def get_runtime_pool(self, ctx, runtime_id):
LOG.info("Getting pool information for runtime %s", runtime_id)
return self.orchestrator.get_pool(runtime_id)
@tenacity.retry(
wait=tenacity.wait_fixed(1),
stop=tenacity.stop_after_attempt(30),
View
@@ -84,6 +84,11 @@ class DBEntityNotFoundError(DBError):
message = "Object not found"
class RuntimeNotFoundException(QinlingException):
http_code = 404
message = "Runtime not found"
class ApplicationContextNotFoundException(QinlingException):
http_code = 400
message = "Application context not found"
@@ -38,6 +38,10 @@ def delete_pool(self, name, **kwargs):
def update_pool(self, name, **kwargs):
raise NotImplementedError
@abc.abstractmethod
def get_pool(self, name, **kwargs):
raise NotImplementedError
@abc.abstractmethod
def prepare_execution(self, function_id, function_version, **kwargs):
raise NotImplementedError
@@ -103,6 +103,33 @@ def _wait_deployment_available(self, name):
return ret.status.replicas == ret.status.available_replicas
def get_pool(self, name):
total = 0
available = 0
try:
ret = self.v1extension.read_namespaced_deployment(
name,
namespace=self.conf.kubernetes.namespace
)
except Exception:
raise exc.RuntimeNotFoundException()
if not ret.status.replicas:
return {"total": total, "available": available}
total = ret.status.replicas
labels = {'runtime_id': name}
selector = common.convert_dict_to_string(labels)
ret = self.v1.list_namespaced_pod(
self.conf.kubernetes.namespace,
label_selector='!function_id,%s' % selector
)
available = len(ret.items)
return {"total": total, "available": available}
def create_pool(self, name, image):
deployment_body = self.deployment_template.render(
{
View
@@ -152,6 +152,14 @@ def update_runtime(self, id, image=None, pre_image=None):
pre_image=pre_image
)
@wrap_messaging_exception
def get_runtime_pool(self, runtime_id):
return self._client.prepare(topic=self.topic, server=None).call(
ctx.get_ctx(),
'get_runtime_pool',
runtime_id=runtime_id
)
@wrap_messaging_exception
def create_execution(self, execution_id, function_id, version, runtime_id,
input=None, is_sync=True):
@@ -156,3 +156,20 @@ def test_put_image_not_allowed(self, mock_update_runtime, mock_etcd_url):
self.assertEqual(403, resp.status_int)
mock_update_runtime.assert_not_called()
mock_etcd_url.assert_called_once_with(function_id)
@mock.patch('qinling.rpc.EngineClient.get_runtime_pool')
def test_get_runtime_pool(self, mock_get_pool):
mock_get_pool.return_value = {"total": 3, "available": 2}
resp = self.app.get('/v1/runtimes/%s/pool' % self.runtime_id)
expected = {
"capacity": {
"available": 2,
"total": 3
},
"name": self.runtime_id
}
self.assertEqual(200, resp.status_int)
self.assertEqual(expected, resp.json)
@@ -562,3 +562,11 @@ def test_scaledown_function_leaving_one_worker(
]
etcd_util_delete_workers_mock.assert_has_calls(expected)
self.assertEqual(3, etcd_util_delete_workers_mock.call_count)
def test_get_runtime_pool(self):
runtime = self.create_runtime()
runtime_id = runtime.id
self.default_engine.get_runtime_pool(mock.Mock(), runtime_id)
self.orchestrator.get_pool.assert_called_once_with(runtime_id)
Oops, something went wrong.

0 comments on commit 01d769f

Please sign in to comment.