generalize secondary scaling
willgraf committed Apr 12, 2019
1 parent 614c319 commit 7e8a06c
Showing 2 changed files with 223 additions and 76 deletions.
168 changes: 111 additions & 57 deletions autoscaler/autoscaler.py
@@ -52,6 +52,11 @@ class Autoscaler(object):  # pylint: disable=useless-object-inheritance

def __init__(self, redis_client, scaling_config, secondary_scaling_config,
backoff_seconds=1, deployment_delim=';', param_delim='|'):

if deployment_delim == param_delim:
raise ValueError('`deployment_delim` and `param_delim` must be '
'different. Got "{}" and "{}".'.format(
deployment_delim, param_delim))
self.redis_client = redis_client
self.backoff_seconds = int(backoff_seconds)
self.logger = logging.getLogger(str(self.__class__.__name__))
@@ -62,7 +67,7 @@ def __init__(self, redis_client, scaling_config, secondary_scaling_config,
deployment_delim=deployment_delim,
param_delim=param_delim)

self.autoscaled_deployments = self._get_secondary_autoscaling_params(
self.secondary_autoscaling_params = self._get_secondary_autoscaling_params(
secondary_scaling_config=secondary_scaling_config.rstrip(),
deployment_delim=deployment_delim,
param_delim=param_delim)
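
Both configs arrive as single delimited strings: entries are separated by `deployment_delim` and the parameters inside each entry by `param_delim`. A minimal sketch of that parsing, using hypothetical entry values that follow the primary schema noted further down (minPods|maxPods|keysPerPod|namespace|resource_type|predict_or_train|name):

# Sketch of the delimiter-based config parsing; the entry values are
# hypothetical examples, not taken from the repository.
deployment_delim = ';'
param_delim = '|'

scaling_config = (
    '0|4|10|deepcell|deployment|predict|tf-serving-deployment;'
    '0|10|1|deepcell|job|train|training-job'
)

if deployment_delim == param_delim:
    raise ValueError('`deployment_delim` and `param_delim` must be different.')

autoscaling_params = [
    entry.split(param_delim)
    for entry in scaling_config.rstrip().split(deployment_delim)
]
# [['0', '4', '10', 'deepcell', 'deployment', 'predict', 'tf-serving-deployment'],
#  ['0', '10', '1', 'deepcell', 'job', 'train', 'training-job']]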
@@ -74,28 +79,17 @@ def __init__(self, redis_client, scaling_config, secondary_scaling_config,

self.managed_resource_types = {'deployment', 'job'}

self.tf_serving_pods = 0
self.new_tf_serving_pods = 0
self.previous_reference_pods = {}

def _get_autoscaling_params(self, scaling_config,
deployment_delim=';',
param_delim='|'):
if deployment_delim == param_delim:
raise ValueError('`deployment_delim` and `param_delim` must be '
'different. Got "{}" and "{}".'.format(
deployment_delim, param_delim))

return [x.split(param_delim)
for x in scaling_config.split(deployment_delim)]

def _get_secondary_autoscaling_params(self, secondary_scaling_config,
deployment_delim=';',
param_delim='|'):
if deployment_delim == param_delim:
raise ValueError('`deployment_delim` and `param_delim` must be '
'different. Got "{}" and "{}".'.format(
deployment_delim, param_delim))

secondary_autoscaling_params = [
x.split(param_delim) for x in
secondary_scaling_config.split(deployment_delim)
@@ -210,7 +204,7 @@ def patch_namespaced_job(self, name, namespace, body):
raise err
return response

def get_current_pods(self, namespace, resource_type, deployment,
def get_current_pods(self, namespace, resource_type, name,
only_running=False):
"""Find the number of current pods deployed for the given resource"""
if resource_type not in self.managed_resource_types:
@@ -221,7 +215,7 @@ def get_current_pods(self, namespace, resource_type, deployment,
if resource_type == 'deployment':
deployments = self.list_namespaced_deployment(namespace)
for d in deployments:
if d.metadata.name == deployment:
if d.metadata.name == name:
if only_running:
current_pods = d.status.available_replicas
else:
@@ -231,7 +225,7 @@ def get_current_pods(self, namespace, resource_type, deployment,
elif resource_type == 'job':
jobs = self.list_namespaced_job(namespace)
for j in jobs:
if j.metadata.name == deployment:
if j.metadata.name == name:
current_pods = j.spec.parallelism # TODO: is this right?
break

@@ -240,36 +234,62 @@ def get_current_pods(self, namespace, resource_type, deployment,

return int(current_pods)
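
As a standalone reference, counting pods for a deployment or job can be done with the official `kubernetes` Python client roughly as below. This is a sketch assuming in-cluster credentials, not the repository's wrapper methods.

# Standalone sketch of counting pods for a deployment or job with the
# official `kubernetes` client; the autoscaler wraps similar calls.
from kubernetes import client, config

config.load_incluster_config()  # or config.load_kube_config() outside a cluster

def current_pods(namespace, resource_type, name, only_running=False):
    if resource_type == 'deployment':
        for d in client.AppsV1Api().list_namespaced_deployment(namespace).items:
            if d.metadata.name == name:
                # available_replicas counts only pods that are ready
                return int((d.status.available_replicas if only_running
                            else d.spec.replicas) or 0)
    elif resource_type == 'job':
        for j in client.BatchV1Api().list_namespaced_job(namespace).items:
            if j.metadata.name == name:
                return int(j.spec.parallelism or 0)
    raise ValueError('Unknown resource: {}/{}'.format(resource_type, name))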

def get_desired_pods(self, deployment, key, keys_per_pod,
def get_desired_pods(self, key, keys_per_pod,
min_pods, max_pods, current_pods):
if deployment in self.autoscaled_deployments:
extra_pods = self.new_tf_serving_pods * self.autoscaled_deployments[deployment]
desired_pods = current_pods + extra_pods
else:
desired_pods = self.redis_keys[key] // keys_per_pod

# set `desired_pods` to inside the max/min boundaries.
if desired_pods > max_pods:
desired_pods = max_pods
elif desired_pods < min_pods:
desired_pods = min_pods

# To avoid removing currently running pods, wait until all
# pods of the deployment are idle before scaling down.
if 0 < desired_pods < current_pods:
desired_pods = current_pods
desired_pods = self.redis_keys[key] // keys_per_pod

# set `desired_pods` to inside the max/min boundaries.
if desired_pods > max_pods:
desired_pods = max_pods
elif desired_pods < min_pods:
desired_pods = min_pods

# To avoid removing currently running pods, wait until all
# pods of the deployment are idle before scaling down.
if 0 < desired_pods < current_pods:
desired_pods = current_pods

return desired_pods
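
The primary rule above reduces to integer division of outstanding Redis keys by `keys_per_pod`, clamped to the min/max bounds, with a guard against scaling down a resource that still has work in flight. A worked example with hypothetical numbers:

# Worked example of the primary scaling rule; all numbers are hypothetical.
redis_keys = 250       # outstanding work items for this queue
keys_per_pod = 10
min_pods, max_pods = 0, 20
current_pods = 30

desired_pods = redis_keys // keys_per_pod                   # 25
desired_pods = min(max(desired_pods, min_pods), max_pods)   # clamp -> 20

# never scale a busy resource down; wait until it drains to zero
if 0 < desired_pods < current_pods:
    desired_pods = current_pods                             # stays at 30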

def scale_deployments(self):
# some special logic surrounding the loop
# for tf-serving, the fixed-number deployment
tf_serving_pods = self.get_current_pods(
'deepcell', 'deployment', 'tf-serving-deployment',
only_running=True)
def get_secondary_desired_pods(self, reference_pods, pods_per_reference_pod,
min_pods, max_pods, current_pods):
desired_pods = current_pods + (pods_per_reference_pod * reference_pods)

self.new_tf_serving_pods = tf_serving_pods - self.tf_serving_pods
# trim `desired_pods` to lay inside the max/min boundaries.
if desired_pods > max_pods:
desired_pods = max_pods
elif desired_pods < min_pods:
desired_pods = min_pods

# To avoid removing currently running pods, wait until all
# pods of the deployment are idle before scaling down.
# TODO: is this necessary?
if 0 < desired_pods < current_pods:
desired_pods = current_pods

return desired_pods
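
The secondary rule instead grows the resource in proportion to newly observed reference pods. A worked example, again with hypothetical numbers:

# Worked example of the secondary scaling rule; all numbers are hypothetical.
new_reference_pods = 3          # reference deployment grew by 3 since last check
pods_per_reference_pod = 2
min_pods, max_pods = 1, 10
current_pods = 4

desired_pods = current_pods + pods_per_reference_pod * new_reference_pods  # 10
desired_pods = min(max(desired_pods, min_pods), max_pods)                  # 10

if 0 < desired_pods < current_pods:   # same scale-down guard as the primary rule
    desired_pods = current_pods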

def scale_resource(self, desired_pods, current_pods,
resource_type, namespace, name):
if desired_pods == current_pods:
return # no scaling action is required

if resource_type == 'job':
# TODO: Find a suitable method for scaling jobs
body = {'spec': {'parallelism': desired_pods}}
res = self.patch_namespaced_job(name, namespace, body)

elif resource_type == 'deployment':
body = {'spec': {'replicas': desired_pods}}
res = self.patch_namespaced_deployment(name, namespace, body)

self.logger.info('Successfully scaled %s `%s` in namespace `%s` '
'from %s to %s pods.', resource_type, name,
namespace, current_pods, desired_pods)
return True
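
For reference, the two patch bodies map onto the official `kubernetes` client roughly as in the sketch below; it assumes the client is already configured (see the earlier sketch) and is not the repository's wrapper code.

# Sketch of the two patch calls with the official `kubernetes` client.
from kubernetes import client

def scale(resource_type, name, namespace, desired_pods):
    if resource_type == 'deployment':
        body = {'spec': {'replicas': desired_pods}}
        return client.AppsV1Api().patch_namespaced_deployment(name, namespace, body)
    if resource_type == 'job':
        body = {'spec': {'parallelism': desired_pods}}
        return client.BatchV1Api().patch_namespaced_job(name, namespace, body)
    raise ValueError('Unsupported resource type: {}'.format(resource_type))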

def scale_primary_resources(self):
"""Scale each resource defined in `autoscaling_params`"""
for entry in self.autoscaling_params:
# entry schema: minPods maxPods keysPerPod namespace resource_type
# predict_or_train deployment
@@ -285,39 +305,73 @@ def scale_deployments(self):
self.logger.error('Autoscaling entry %s is malformed.', entry)
continue

self.logger.debug('Scaling `%s`', name)
self.logger.debug('Scaling %s `%s`', resource_type, name)

current_pods = self.get_current_pods(
namespace, resource_type, name)

desired_pods = self.get_desired_pods(
name, predict_or_train, keys_per_pod,
predict_or_train, keys_per_pod,
min_pods, max_pods, current_pods)

self.logger.debug('%s `%s` in namespace `%s` has a current state '
'of %s pods and a desired state of %s pods.',
str(resource_type).capitalize(), name,
namespace, current_pods, desired_pods)

if desired_pods == current_pods:
continue # no scaling action is required
self.scale_resource(desired_pods, current_pods,
resource_type, namespace, name)

if resource_type == 'job':
# TODO: Find a suitable method for scaling jobs
body = {'spec': {'parallelism': desired_pods}}
res = self.patch_namespaced_job(name, namespace, body)
def scale_secondary_resources(self):
for entry in self.secondary_autoscaling_params:
# redis-deployment|deployment|namespace|tf-serving-deployment|
# deployment|namespace2|podRatio|minPods|maxPods
try:
resource_name = str(entry[0])
resource_type = str(entry[1])
resource_namespace = str(entry[2])
reference_resource_name = str(entry[3])
reference_resource_type = str(entry[4])
reference_resource_namespace = str(entry[5])
pods_per_other_pod = int(entry[6])
min_pods = int(entry[7])
max_pods = int(entry[8])
except (IndexError, ValueError):
self.logger.error('Secondary autoscaling entry %s is '
'malformed.', entry)
continue

self.logger.debug('Scaling secondary %s `%s`',
resource_type, resource_name)

# keep track of how many reference pods we're working with
if not self.previous_reference_pods[resource_name]:
self.previous_reference_pods[resource_name] = 0

current_reference_pods = self.get_current_pods(
reference_resource_namespace,
reference_resource_type,
reference_resource_name,
only_running=True)

elif resource_type == 'deployment':
body = {'spec': {'replicas': desired_pods}}
res = self.patch_namespaced_deployment(name, namespace, body)
new_reference_pods = current_reference_pods - \
self.previous_reference_pods[resource_name]

self.logger.info('Successfully scaled %s `%s` in namespace `%s` '
'from %s to %s pods.', resource_type, name,
namespace, current_pods, desired_pods)
self.previous_reference_pods[resource_name] = new_reference_pods

# compute desired pods for this deployment
current_pods = self.get_current_pods(
resource_namespace, resource_type, resource_name)

desired_pods = self.get_secondary_desired_pods(
new_reference_pods, pods_per_other_pod,
min_pods, max_pods, current_pods)

# update number of tf-serving pods for next iteration of loop
self.tf_serving_pods = tf_serving_pods
# scale pods
self.scale_resource(desired_pods, current_pods, resource_type,
resource_namespace, resource_name)
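
The secondary loop keys its math off how many reference pods appeared since the previous pass. One way to sketch that bookkeeping outside the class (names and counts are illustrative; the diff above keeps the equivalent state on the instance in `self.previous_reference_pods`):

# Minimal sketch of reference-pod bookkeeping between scaling passes.
# Stores the latest observed count so the next pass can compute the delta.
previous_reference_pods = {}

def new_reference_pods(resource_name, current_reference_pods):
    previous = previous_reference_pods.get(resource_name, 0)
    previous_reference_pods[resource_name] = current_reference_pods
    return current_reference_pods - previous

print(new_reference_pods('redis-deployment', 3))  # 3 on the first pass
print(new_reference_pods('redis-deployment', 5))  # 2 new pods since last pass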

def scale(self):
self.tally_keys()
self.scale_deployments()
self.scale_primary_resources()
self.scale_secondary_resources()
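
A hypothetical end-to-end use of the class, following the two entry schemas from the comments above; the import path, Redis host, and config values are assumptions for illustration.

# Hypothetical usage; config values and connection details are assumptions.
import redis
from autoscaler.autoscaler import Autoscaler

# primary schema: minPods|maxPods|keysPerPod|namespace|resource_type|predict_or_train|name
scaling_config = '0|4|10|deepcell|deployment|predict|tf-serving-deployment'

# secondary schema: name|type|namespace|referenceName|referenceType|
#                   referenceNamespace|podsPerReferencePod|minPods|maxPods
secondary_scaling_config = (
    'redis-consumer-deployment|deployment|deepcell|'
    'tf-serving-deployment|deployment|deepcell|2|1|10'
)

autoscaler = Autoscaler(
    redis_client=redis.Redis(host='redis-master', port=6379),
    scaling_config=scaling_config,
    secondary_scaling_config=secondary_scaling_config,
    backoff_seconds=1,
)
autoscaler.scale()  # one pass: tally keys, scale primary, then secondary resources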