From 7e8a06c0021c7a3a6c06b77cce6f36aebb109c08 Mon Sep 17 00:00:00 2001
From: William Graf <7930703+willgraf@users.noreply.github.com>
Date: Fri, 12 Apr 2019 13:36:21 -0700
Subject: [PATCH] generalize secondary scaling

---
 autoscaler/autoscaler.py      | 168 ++++++++++++++++++++++------------
 autoscaler/autoscaler_test.py | 131 ++++++++++++++++++++++----
 2 files changed, 223 insertions(+), 76 deletions(-)

diff --git a/autoscaler/autoscaler.py b/autoscaler/autoscaler.py
index ca39d4e..fd36a8a 100644
--- a/autoscaler/autoscaler.py
+++ b/autoscaler/autoscaler.py
@@ -52,6 +52,11 @@ class Autoscaler(object):  # pylint: disable=useless-object-inheritance
     def __init__(self, redis_client, scaling_config, secondary_scaling_config,
                  backoff_seconds=1, deployment_delim=';', param_delim='|'):
+
+        if deployment_delim == param_delim:
+            raise ValueError('`deployment_delim` and `param_delim` must be '
+                             'different. Got "{}" and "{}".'.format(
+                                 deployment_delim, param_delim))
         self.redis_client = redis_client
         self.backoff_seconds = int(backoff_seconds)
         self.logger = logging.getLogger(str(self.__class__.__name__))
@@ -62,7 +67,7 @@ def __init__(self, redis_client, scaling_config, secondary_scaling_config,
             deployment_delim=deployment_delim,
             param_delim=param_delim)
 
-        self.autoscaled_deployments = self._get_secondary_autoscaling_params(
+        self.secondary_autoscaling_params = self._get_secondary_autoscaling_params(
             secondary_scaling_config=secondary_scaling_config.rstrip(),
             deployment_delim=deployment_delim,
             param_delim=param_delim)
@@ -74,28 +79,17 @@ def __init__(self, redis_client, scaling_config, secondary_scaling_config,
 
         self.managed_resource_types = {'deployment', 'job'}
 
-        self.tf_serving_pods = 0
-        self.new_tf_serving_pods = 0
+        self.previous_reference_pods = {}
 
     def _get_autoscaling_params(self, scaling_config,
                                 deployment_delim=';',
                                 param_delim='|'):
-        if deployment_delim == param_delim:
-            raise ValueError('`deployment_delim` and `param_delim` must be '
-                             'different. Got "{}" and "{}".'.format(
-                                 deployment_delim, param_delim))
-
         return [x.split(param_delim)
                 for x in scaling_config.split(deployment_delim)]
 
     def _get_secondary_autoscaling_params(self, secondary_scaling_config,
                                           deployment_delim=';',
                                           param_delim='|'):
-        if deployment_delim == param_delim:
-            raise ValueError('`deployment_delim` and `param_delim` must be '
-                             'different. Got "{}" and "{}".'.format(
-                                 deployment_delim, param_delim))
-
         secondary_autoscaling_params = [
             x.split(param_delim) for x in
             secondary_scaling_config.split(deployment_delim)
@@ -210,7 +204,7 @@ def patch_namespaced_job(self, name, namespace, body):
                 raise err
         return response
 
-    def get_current_pods(self, namespace, resource_type, deployment,
+    def get_current_pods(self, namespace, resource_type, name,
                          only_running=False):
         """Find the number of current pods deployed for the given resource"""
         if resource_type not in self.managed_resource_types:
@@ -221,7 +215,7 @@ def get_current_pods(self, namespace, resource_type, deployment,
         if resource_type == 'deployment':
             deployments = self.list_namespaced_deployment(namespace)
             for d in deployments:
-                if d.metadata.name == deployment:
+                if d.metadata.name == name:
                     if only_running:
                         current_pods = d.status.available_replicas
                     else:
@@ -231,7 +225,7 @@ def get_current_pods(self, namespace, resource_type, deployment,
         elif resource_type == 'job':
            jobs = self.list_namespaced_job(namespace)
            for j in jobs:
-                if j.metadata.name == deployment:
+                if j.metadata.name == name:
                     current_pods = j.spec.parallelism  # TODO: is this right?
                     break
 
@@ -240,36 +234,62 @@ def get_current_pods(self, namespace, resource_type, deployment,
         return int(current_pods)
 
-    def get_desired_pods(self, deployment, key, keys_per_pod,
+    def get_desired_pods(self, key, keys_per_pod,
                          min_pods, max_pods, current_pods):
-        if deployment in self.autoscaled_deployments:
-            extra_pods = self.new_tf_serving_pods * self.autoscaled_deployments[deployment]
-            desired_pods = current_pods + extra_pods
-        else:
-            desired_pods = self.redis_keys[key] // keys_per_pod
-
-        # set `desired_pods` to inside the max/min boundaries.
-        if desired_pods > max_pods:
-            desired_pods = max_pods
-        elif desired_pods < min_pods:
-            desired_pods = min_pods
-
-        # To avoid removing currently running pods, wait until all
-        # pods of the deployment are idle before scaling down.
-        if 0 < desired_pods < current_pods:
-            desired_pods = current_pods
+        desired_pods = self.redis_keys[key] // keys_per_pod
+
+        # clamp `desired_pods` to the max/min boundaries.
+        if desired_pods > max_pods:
+            desired_pods = max_pods
+        elif desired_pods < min_pods:
+            desired_pods = min_pods
+
+        # To avoid removing currently running pods, wait until all
+        # pods of the deployment are idle before scaling down.
+        if 0 < desired_pods < current_pods:
+            desired_pods = current_pods
 
         return desired_pods
 
-    def scale_deployments(self):
-        # some special logic surrounding the loop
-        # for tf-serving, the fixed-number deployment
-        tf_serving_pods = self.get_current_pods(
-            'deepcell', 'deployment', 'tf-serving-deployment',
-            only_running=True)
+    def get_secondary_desired_pods(self, reference_pods, pods_per_reference_pod,
+                                   min_pods, max_pods, current_pods):
+        desired_pods = current_pods + (pods_per_reference_pod * reference_pods)
 
-        self.new_tf_serving_pods = tf_serving_pods - self.tf_serving_pods
+        # clamp `desired_pods` to lie inside the max/min boundaries.
+        if desired_pods > max_pods:
+            desired_pods = max_pods
+        elif desired_pods < min_pods:
+            desired_pods = min_pods
 
+        # To avoid removing currently running pods, wait until all
+        # pods of the deployment are idle before scaling down.
+        # TODO: is this necessary?
+        if 0 < desired_pods < current_pods:
+            desired_pods = current_pods
+
+        return desired_pods
+
+    def scale_resource(self, desired_pods, current_pods,
+                       resource_type, namespace, name):
+        if desired_pods == current_pods:
+            return  # no scaling action is required
+
+        if resource_type == 'job':
+            # TODO: Find a suitable method for scaling jobs
+            body = {'spec': {'parallelism': desired_pods}}
+            res = self.patch_namespaced_job(name, namespace, body)
+
+        elif resource_type == 'deployment':
+            body = {'spec': {'replicas': desired_pods}}
+            res = self.patch_namespaced_deployment(name, namespace, body)
+
+        self.logger.info('Successfully scaled %s `%s` in namespace `%s` '
+                         'from %s to %s pods.', resource_type, name,
+                         namespace, current_pods, desired_pods)
+        return True
+
+    def scale_primary_resources(self):
+        """Scale each resource defined in `autoscaling_params`"""
         for entry in self.autoscaling_params:
             # entry schema: minPods maxPods keysPerPod namespace resource_type
             #               predict_or_train deployment
@@ -285,13 +305,13 @@ def scale_deployments(self):
                 self.logger.error('Autoscaling entry %s is malformed.', entry)
                 continue
 
-            self.logger.debug('Scaling `%s`', name)
+            self.logger.debug('Scaling %s `%s`', resource_type, name)
 
             current_pods = self.get_current_pods(
                 namespace, resource_type, name)
 
             desired_pods = self.get_desired_pods(
-                name, predict_or_train, keys_per_pod,
+                predict_or_train, keys_per_pod,
                 min_pods, max_pods, current_pods)
 
             self.logger.debug('%s `%s` in namespace `%s` has a current state '
@@ -299,25 +319,59 @@ def scale_deployments(self):
                               str(resource_type).capitalize(), name,
                               namespace, current_pods, desired_pods)
 
-            if desired_pods == current_pods:
-                continue  # no scaling action is required
+            self.scale_resource(desired_pods, current_pods,
+                                resource_type, namespace, name)
 
-            if resource_type == 'job':
-                # TODO: Find a suitable method for scaling jobs
-                body = {'spec': {'parallelism': desired_pods}}
-                res = self.patch_namespaced_job(name, namespace, body)
+    def scale_secondary_resources(self):
+        """Scale each resource defined in `secondary_autoscaling_params`"""
+        for entry in self.secondary_autoscaling_params:
+            # entry schema: redis-deployment|deployment|namespace|
+            #     tf-serving-deployment|deployment|namespace2|
+            #     podRatio|minPods|maxPods
+            try:
+                resource_name = str(entry[0])
+                resource_type = str(entry[1])
+                resource_namespace = str(entry[2])
+                reference_resource_name = str(entry[3])
+                reference_resource_type = str(entry[4])
+                reference_resource_namespace = str(entry[5])
+                pods_per_reference_pod = int(entry[6])
+                min_pods = int(entry[7])
+                max_pods = int(entry[8])
+            except (IndexError, ValueError):
+                self.logger.error('Secondary autoscaling entry %s is '
+                                  'malformed.', entry)
+                continue
+
+            self.logger.debug('Scaling secondary %s `%s`',
+                              resource_type, resource_name)
+
+            # keep track of how many reference pods we're working with
+            if resource_name not in self.previous_reference_pods:
+                self.previous_reference_pods[resource_name] = 0
+
+            current_reference_pods = self.get_current_pods(
+                reference_resource_namespace,
+                reference_resource_type,
+                reference_resource_name,
+                only_running=True)
 
-            elif resource_type == 'deployment':
-                body = {'spec': {'replicas': desired_pods}}
-                res = self.patch_namespaced_deployment(name, namespace, body)
+            new_reference_pods = current_reference_pods - \
+                self.previous_reference_pods[resource_name]
 
-            self.logger.info('Successfully scaled %s `%s` in namespace `%s` '
-                             'from %s to %s pods.', resource_type, name,
-                             namespace, current_pods, desired_pods)
+            self.previous_reference_pods[resource_name] = current_reference_pods
+
+            # compute desired pods for this deployment
+            current_pods = self.get_current_pods(
+                resource_namespace, resource_type, resource_name)
+
+            desired_pods = self.get_secondary_desired_pods(
+                new_reference_pods, pods_per_reference_pod,
+                min_pods, max_pods, current_pods)
 
-            # update number of tf-serving pods for next iteration of loop
-            self.tf_serving_pods = tf_serving_pods
+            # scale pods
+            self.scale_resource(desired_pods, current_pods, resource_type,
+                                resource_namespace, resource_name)
 
     def scale(self):
         self.tally_keys()
-        self.scale_deployments()
+        self.scale_primary_resources()
+        self.scale_secondary_resources()
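For illustration (not part of the patch): a minimal sketch of how a secondary scaling entry is parsed and how `get_secondary_desired_pods` derives a replica count. The config string, the `redis-consumer` name, and the numbers are hypothetical; only the nine-field `|`-delimited schema and the clamping rules come from the code above.

    # Hypothetical secondary scaling entry; schema from the patch:
    # name|type|namespace|refName|refType|refNamespace|podRatio|minPods|maxPods
    config = ('redis-consumer|deployment|deepcell|'
              'tf-serving-deployment|deployment|deepcell|2|1|10')
    entry = config.split('|')  # param_delim defaults to '|'

    ratio, min_pods, max_pods = int(entry[6]), int(entry[7]), int(entry[8])

    def secondary_desired_pods(reference_pods, ratio, min_pods, max_pods, current_pods):
        # Mirrors Autoscaler.get_secondary_desired_pods: grow by a fixed ratio
        # of newly started reference pods, clamp to [min_pods, max_pods], and
        # never drop below the current replica count.
        desired = current_pods + ratio * reference_pods
        desired = min(max(desired, min_pods), max_pods)
        if 0 < desired < current_pods:
            desired = current_pods
        return desired

    # Two new tf-serving pods came up since the last loop: 3 + 2 * 2 = 7.
    print(secondary_desired_pods(2, ratio, min_pods, max_pods, 3))  # 7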
diff --git a/autoscaler/autoscaler_test.py b/autoscaler/autoscaler_test.py
index 5d8287a..0424147 100644
--- a/autoscaler/autoscaler_test.py
+++ b/autoscaler/autoscaler_test.py
@@ -116,7 +116,7 @@ def list_namespaced_deployment(self, *args, **kwargs):
         return Bunch(items=[
             Bunch(spec=Bunch(replicas='4'),
                   metadata=Bunch(name='pod1'),
-                  status=Bunch(available_replicas='4')),
+                  status=Bunch(available_replicas=None)),
             Bunch(spec=Bunch(replicas='8'),
                   metadata=Bunch(name='pod2'),
                   status=Bunch(available_replicas='8')),
@@ -173,22 +173,44 @@ def test_get_desired_pods(self):
                                        backoff_seconds=0.01)
         scaler.redis_keys['predict'] = 10
         # desired_pods is > max_pods
-        desired_pods = scaler.get_desired_pods(
-            'tf-serving-deployment', 'predict', 2, 0, 2, 1)
+        desired_pods = scaler.get_desired_pods('predict', 2, 0, 2, 1)
         assert desired_pods == 2
         # desired_pods is < min_pods
-        desired_pods = scaler.get_desired_pods(
-            'tf-serving-deployment', 'predict', 5, 9, 10, 0)
+        desired_pods = scaler.get_desired_pods('predict', 5, 9, 10, 0)
         assert desired_pods == 9
         # desired_pods is in range
-        desired_pods = scaler.get_desired_pods(
-            'tf-serving-deployment', 'predict', 3, 0, 5, 1)
+        desired_pods = scaler.get_desired_pods('predict', 3, 0, 5, 1)
         assert desired_pods == 3
         # desired_pods is in range, current_pods exist
-        desired_pods = scaler.get_desired_pods(
-            'tf-serving-deployment', 'predict', 10, 0, 5, 3)
+        desired_pods = scaler.get_desired_pods('predict', 10, 0, 5, 3)
         assert desired_pods == 3
 
+    def test_get_secondary_desired_pods(self):
+        # args: reference_pods, pods_per_reference_pod,
+        #       min_pods, max_pods, current_pods
+        redis_client = DummyRedis(fail_tolerance=2)
+        scaler = autoscaler.Autoscaler(redis_client, 'None', 'None',
+                                       backoff_seconds=0.01)
+
+        # desired_pods is > max_pods
+        desired_pods = scaler.get_secondary_desired_pods(1, 10, 0, 4, 4)
+        assert desired_pods == 4
+        # desired_pods is < min_pods
+        desired_pods = scaler.get_secondary_desired_pods(1, 1, 2, 4, 0)
+        assert desired_pods == 2
+        # desired_pods is in range
+        desired_pods = scaler.get_secondary_desired_pods(1, 3, 0, 4, 0)
+        assert desired_pods == 3
+        # desired_pods is in range with current pods and max limit
+        desired_pods = scaler.get_secondary_desired_pods(1, 3, 0, 4, 2)
+        assert desired_pods == 4
+        # desired_pods is in range with current pods and no max limit
+        desired_pods = scaler.get_secondary_desired_pods(1, 3, 0, 10, 2)
+        assert desired_pods == 5
+        # desired_pods is less than current_pods, but current_pods > max_pods
+        desired_pods = scaler.get_secondary_desired_pods(1, 0, 0, 4, 10)
+        assert desired_pods == 10
+
     def test_list_namespaced_deployment(self):
         redis_client = DummyRedis(fail_tolerance=2)
         scaler = autoscaler.Autoscaler(redis_client, 'None', 'None',
@@ -245,6 +267,12 @@ def test_get_current_pods(self):
         deployed_pods = scaler.get_current_pods('ns', 'deployment', 'pod2')
         assert deployed_pods == 8
 
+        deployed_pods = scaler.get_current_pods('ns', 'deployment', 'pod2', True)
+        assert deployed_pods == 8
+
+        deployed_pods = scaler.get_current_pods('ns', 'deployment', 'pod1', True)
+        assert deployed_pods == 0
+
         deployed_pods = scaler.get_current_pods('ns', 'job', 'pod1')
         assert deployed_pods == 1
 
@@ -258,7 +286,7 @@ def test_tally_keys(self):
         scaler.tally_keys()
         assert scaler.redis_keys == {'predict': 2, 'train': 2}
 
-    def test_scale_deployments(self):
+    def test_scale_primary_resources(self):
         redis_client = DummyRedis(fail_tolerance=2)
         deploy_params = ['0', '1', '3', 'ns', 'deployment', 'predict', 'name']
         job_params = ['1', '2', '1', 'ns', 'job', 'train', 'name']
@@ -275,7 +303,7 @@ def test_scale_deployments(self):
                                        deployment_delim, param_delim)
         scaler.get_apps_v1_client = DummyKubernetes
         scaler.get_batch_v1_client = DummyKubernetes
-        scaler.scale_deployments()
+        scaler.scale_primary_resources()
 
         # not enough params will warn, but not raise (or autoscale)
         bad_params = ['0', '1', '3', 'ns', 'job', 'train']
@@ -284,7 +312,7 @@ def test_scale_deployments(self):
                                        deployment_delim, param_delim)
         scaler.get_apps_v1_client = DummyKubernetes
         scaler.get_batch_v1_client = DummyKubernetes
-        scaler.scale_deployments()
+        scaler.scale_primary_resources()
 
         # test bad resource_type
         with pytest.raises(ValueError):
@@ -294,7 +322,7 @@ def test_scale_deployments(self):
                                            deployment_delim, param_delim)
             scaler.get_apps_v1_client = DummyKubernetes
             scaler.get_batch_v1_client = DummyKubernetes
-            scaler.scale_deployments()
+            scaler.scale_primary_resources()
 
         # test good delimiters and scaling params, bad resource_type
         deploy_params = ['0', '5', '1', 'ns', 'deployment', 'predict', 'name']
@@ -309,17 +337,81 @@ def test_scale_deployments(self):
 
         scaler.get_apps_v1_client = DummyKubernetes
         scaler.get_batch_v1_client = DummyKubernetes
 
-        scaler.scale_deployments()
+        scaler.scale_primary_resources()
+
+        # test desired_pods == current_pods
+        scaler.get_desired_pods = lambda *x: 4
+        scaler.scale_primary_resources()
+
+        # same delimiter throws an error
+        with pytest.raises(ValueError):
+            param_delim = '|'
+            deployment_delim = '|'
+            p = deployment_delim.join([param_delim.join(p) for p in params])
+            autoscaler.Autoscaler(None, p, 'None', 0,
+                                  deployment_delim, param_delim)
+
+    def test_scale_secondary_resources(self):
+        # entry schema: redis-deployment|deployment|namespace|
+        #     tf-serving-deployment|deployment|namespace2|
+        #     podRatio|minPods|maxPods
+
+        redis_client = DummyRedis(fail_tolerance=2)
+        deploy_params = ['name', 'deployment', 'ns',
+                         'primary', 'deployment', 'ns',
+                         '7', '1', '10']
+        job_params = ['name', 'job', 'ns',
+                      'primary', 'job', 'ns',
+                      '7', '1', '10']
+
+        params = [deploy_params, job_params]
+
+        param_delim = '|'
+        deployment_delim = ';'
+
+        # non-integer values will warn, but not raise (or autoscale)
+        bad_params = ['name', 'deployment', 'ns',
+                      'primary', 'deployment', 'ns',
+                      'seven', '1', '10']
+        p = deployment_delim.join([param_delim.join(bad_params)])
+        scaler = autoscaler.Autoscaler(redis_client, 'None', p, 0,
+                                       deployment_delim, param_delim)
+        scaler.get_apps_v1_client = DummyKubernetes
+        scaler.get_batch_v1_client = DummyKubernetes
+        scaler.scale_secondary_resources()
+
+        # not enough params will warn, but not raise (or autoscale)
+        bad_params = ['0', '1', '3', 'ns', 'job', 'train']
+        p = deployment_delim.join([param_delim.join(bad_params)])
+        scaler = autoscaler.Autoscaler(redis_client, 'None', p, 0,
+                                       deployment_delim, param_delim)
+        scaler.get_apps_v1_client = DummyKubernetes
+        scaler.get_batch_v1_client = DummyKubernetes
+        scaler.scale_secondary_resources()
+
+        # primary-style entries are also malformed here and will warn,
+        # but not raise (or autoscale)
+        deploy_params = ['0', '5', '1', 'ns', 'deployment', 'predict', 'name']
+        job_params = ['1', '2', '1', 'ns', 'job', 'train', 'name']
+        params = [deploy_params, job_params]
+        p = deployment_delim.join([param_delim.join(p) for p in params])
+
+        scaler = autoscaler.Autoscaler(redis_client, 'None', p, 0,
+                                       deployment_delim,
+                                       param_delim)
+
+        scaler.get_apps_v1_client = DummyKubernetes
+        scaler.get_batch_v1_client = DummyKubernetes
+
+        scaler.scale_secondary_resources()
 
         # test desired_pods == current_pods
-        scaler.get_desired_pods = lambda *x: 4
-        scaler.scale_deployments()
+        scaler.get_secondary_desired_pods = lambda *x: 4
+        scaler.scale_secondary_resources()
 
         # same delimiter throws an error;
         with pytest.raises(ValueError):
             param_delim = '|'
             deployment_delim = '|'
             p = deployment_delim.join([param_delim.join(p) for p in params])
-            autoscaler.Autoscaler(None, p, 0, deployment_delim, param_delim)
+            autoscaler.Autoscaler(None, 'None', p, 0,
+                                  deployment_delim, param_delim)
 
     def test_scale(self):
         redis_client = DummyRedis(fail_tolerance=2)
@@ -333,12 +425,13 @@ def dummy_tally():
             global counter
             counter += 1
 
-        def dummy_scale_deployments():
+        def dummy_scale_resources():
             global counter
             counter += 1
 
         scaler.tally_keys = dummy_tally
-        scaler.scale_deployments = dummy_scale_deployments
+        scaler.scale_primary_resources = dummy_scale_resources
+        scaler.scale_secondary_resources = dummy_scale_resources
 
         scaler.scale()
-        assert counter == 2
+        assert counter == 3
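For illustration (not part of the patch): a worked sketch of the `previous_reference_pods` bookkeeping in `scale_secondary_resources`. The resource name, ratio, and pod counts are hypothetical; the idea from the patch is that each loop only the reference pods that became available since the previous iteration contribute extra replicas, with the result still clamped by `get_secondary_desired_pods`.

    # State kept on the Autoscaler instance across scaling loops.
    previous_reference_pods = {}
    ratio, min_pods, max_pods = 2, 1, 10

    def new_reference_pods(name, current_reference_pods):
        # First sighting of a resource: assume zero prior reference pods.
        if name not in previous_reference_pods:
            previous_reference_pods[name] = 0
        new = current_reference_pods - previous_reference_pods[name]
        previous_reference_pods[name] = current_reference_pods
        return new

    def clamp(desired, current):
        # Same clamping rules as get_secondary_desired_pods.
        desired = min(max(desired, min_pods), max_pods)
        return current if 0 < desired < current else desired

    # Loop 1: the reference deployment goes 0 -> 4 running pods,
    # while the secondary deployment currently has 1 replica.
    delta = new_reference_pods('redis-consumer', 4)  # 4 new reference pods
    print(clamp(1 + ratio * delta, 1))               # 1 + 2*4 = 9

    # Loop 2: the reference grows 4 -> 6, so only 2 pods are new.
    delta = new_reference_pods('redis-consumer', 6)  # 2 new reference pods
    print(clamp(9 + ratio * delta, 9))               # 9 + 2*2 = 13 -> clamped to 10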