
Commit

Merge 5e6f0c9 into ac9affd
dylanbannon committed May 28, 2019
2 parents ac9affd + 5e6f0c9 commit 86c7d8b
Showing 4 changed files with 97 additions and 360 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -8,4 +8,4 @@ RUN pip install -r requirements.txt

COPY . .

CMD ["python", "autoscale.py"]
CMD ["python", "scale.py"]
195 changes: 63 additions & 132 deletions autoscaler/autoscaler.py
@@ -28,7 +28,6 @@
from __future__ import division
from __future__ import print_function

import re
import timeit
import logging

@@ -41,32 +40,30 @@ class Autoscaler(object):
Args:
redis_client: Redis Client Connection object.
scaling_config: string, joined lists of autoscaling configurations
secondary_scaling_config: string, defines initial pod counts
deployment_delim: string, character delimiting deployment configs.
param_delim: string, character delimiting deployment config parameters.
"""

def __init__(self, redis_client, scaling_config, secondary_scaling_config,
deployment_delim=';', param_delim='|'):
def __init__(self,
redis_client,
scaling_config,
deployment_delim=';',
param_delim='|'):

if deployment_delim == param_delim:
raise ValueError('`deployment_delim` and `param_delim` must be '
'different. Got "{}" and "{}".'.format(
deployment_delim, param_delim))

self.redis_client = redis_client
self.logger = logging.getLogger(str(self.__class__.__name__))
self.completed_statuses = {'done', 'failed'}

self.autoscaling_params = self._get_primary_autoscaling_params(
self.autoscaling_params = self._get_autoscaling_params(
scaling_config=scaling_config.rstrip(),
deployment_delim=deployment_delim,
param_delim=param_delim)

self.secondary_autoscaling_params = self._get_secondary_autoscaling_params(
scaling_config=secondary_scaling_config.rstrip(),
deployment_delim=deployment_delim,
param_delim=param_delim)

self.redis_keys = {
'predict': 0,
'train': 0,
@@ -77,17 +74,16 @@ def __init__(self, redis_client, scaling_config, secondary_scaling_config,

self.reference_pods = {}

def _get_primary_autoscaling_params(self, scaling_config,
deployment_delim=';',
param_delim='|'):
def _get_autoscaling_params(self,
scaling_config,
deployment_delim=';',
param_delim='|'):
raw_params = [x.split(param_delim)
for x in scaling_config.split(deployment_delim)]

params = {}
for entry in raw_params:
try:
print(len(entry))
print(entry)
namespace_resource_type_name = (
str(entry[3]).strip(),
str(entry[4]).strip(),
@@ -110,33 +106,20 @@ def _get_primary_autoscaling_params(self, scaling_config,

return params

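For context, a minimal standalone sketch of the delimiter handling in `_get_autoscaling_params`. Only `entry[3]` (namespace) and `entry[4]` (resource type) are confirmed by the visible hunk; every other field below is an invented placeholder:

    deployment_delim = ';'
    param_delim = '|'

    # Hypothetical two-entry config string; all fields other than the
    # namespace and resource type are placeholders.
    scaling_config = 'a|b|c|deepcell|deployment|x;d|e|f|deepcell|job|y'

    raw_params = [x.split(param_delim)
                  for x in scaling_config.split(deployment_delim)]
    for entry in raw_params:
        print(entry[3].strip(), entry[4].strip())
    # deepcell deployment
    # deepcell job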
def _get_secondary_autoscaling_params(self, scaling_config,
deployment_delim=';',
param_delim='|'):
return [x.split(param_delim)
for x in scaling_config.split(deployment_delim)]

def tally_keys(self):
def tally_queues(self):
"""Update counts of all redis queues"""
start = timeit.default_timer()
# reset the key tallies to 0
for k in self.redis_keys:
self.redis_keys[k] = 0

self.logger.debug('Tallying keys in redis matching: `%s`',
', '.join(self.redis_keys.keys()))
for q in self.redis_keys:
self.logger.debug('Tallying items in queue `%s`.', q)

for key in self.redis_client.scan_iter(count=1000):
if any(re.match(k, key) for k in self.redis_keys):
if self.redis_client.type(key) != 'hash':
continue
num_items = self.redis_client.llen(q)

status = self.redis_client.hget(key, 'status')
processing_q = 'processing-{}:*'.format(q)
scan = self.redis_client.scan_iter(match=processing_q, count=1000)
num_in_progress = len([x for x in scan])

# add up each type of key that is "in-progress" or "new"
if status is not None and status not in self.completed_statuses:
for k in self.redis_keys:
if re.match(k, key):
self.redis_keys[k] += 1
self.redis_keys[q] = num_items + num_in_progress

self.logger.debug('Finished tallying redis keys in %s seconds.',
timeit.default_timer() - start)
@@ -220,10 +203,21 @@ def patch_namespaced_job(self, name, namespace, body):

def get_current_pods(self, namespace, resource_type, name,
only_running=False):
"""Find the number of current pods deployed for the given resource"""
"""Find the number of current pods deployed for the given resource.
Args:
name: str, name of the resource.
namespace: str, namespace of the resource.
resource_type: str, type of resource to count.
only_running: bool, Only count pods with status `Running`.
Returns:
int, number of pods for the given resource.
"""
if resource_type not in self.managed_resource_types:
raise ValueError('The resource_type of {} is unsuitable. Use either'
'`deployment` or `job`'.format(resource_type))
raise ValueError(
'`resource_type` must be one of {}. Got {}.'.format(
self.managed_resource_types, resource_type))

current_pods = 0
if resource_type == 'deployment':
@@ -235,8 +229,8 @@ def get_current_pods(self, namespace, resource_type, name,
else:
current_pods = d.spec.replicas

self.logger.debug("Deployment {} has {} pods".format(
name, current_pods))
self.logger.debug('Deployment %s has %s pods',
name, current_pods)
break

elif resource_type == 'job':
@@ -253,6 +247,7 @@

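For orientation, counting a deployment's pods with the official kubernetes Python client might look like the sketch below. The commit's own client wiring is folded out of this hunk, so treat this as an assumption rather than the code under review:

    from kubernetes import client, config

    config.load_incluster_config()  # use config.load_kube_config() outside a cluster
    apps = client.AppsV1Api()

    # 'deepcell' is the namespace that appears elsewhere in this diff.
    for d in apps.list_namespaced_deployment('deepcell').items:
        running = d.status.available_replicas or 0  # None until pods are Ready
        desired = d.spec.replicas
        print('{}: {} running / {} desired'.format(d.metadata.name, running, desired))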
def clip_pod_count(self, desired_pods, min_pods, max_pods, current_pods):
# set `desired_pods` to inside the max/min boundaries.
_original = desired_pods
if desired_pods > max_pods:
desired_pods = max_pods
elif desired_pods < min_pods:
@@ -263,6 +258,9 @@ def clip_pod_count(self, desired_pods, min_pods, max_pods, current_pods):
if 0 < desired_pods < current_pods:
desired_pods = current_pods

if desired_pods != _original:
self.logger.debug('Clipped pods from %s to %s',
_original, desired_pods)
return desired_pods

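A few illustrative calls, where `scaler` is a hypothetical Autoscaler instance, the numbers are invented, and the folded lines are assumed to only apply the `min_pods` floor:

    scaler.clip_pod_count(desired_pods=15, min_pods=1, max_pods=10, current_pods=3)
    # -> 10: capped at max_pods
    scaler.clip_pod_count(desired_pods=2, min_pods=1, max_pods=10, current_pods=5)
    # -> 5: a live resource is never scaled down to a smaller non-zero count
    scaler.clip_pod_count(desired_pods=0, min_pods=0, max_pods=10, current_pods=5)
    # -> 0: scaling all the way to zero bypasses the no-scale-down rule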
def get_desired_pods(self, key, keys_per_pod,
@@ -271,12 +269,6 @@
return self.clip_pod_count(desired_pods, min_pods,
max_pods, current_pods)

def get_secondary_desired_pods(self, reference_pods, pods_per_reference_pod,
min_pods, max_pods, current_pods):
desired_pods = current_pods + pods_per_reference_pod * reference_pods
return self.clip_pod_count(desired_pods, min_pods,
max_pods, current_pods)

def scale_resource(self, desired_pods, current_pods,
resource_type, namespace, name):
if desired_pods == current_pods:
@@ -296,52 +288,50 @@ def scale_resource(self, desired_pods, current_pods,
namespace, current_pods, desired_pods)
return True

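The patch helpers themselves are defined further up this file and folded out of the diff; with the official kubernetes client, scaling a deployment might look like this sketch (the names and target count are invented):

    from kubernetes import client

    apps = client.AppsV1Api()
    body = {'spec': {'replicas': 4}}  # hypothetical target count
    apps.patch_namespaced_deployment('tf-serving-deployment', 'deepcell', body)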
def scale_primary_resources(self):
def scale_resources(self):
"""Scale each resource defined in `autoscaling_params`"""
self.logger.debug("Scaling primary resources.")
for ((namespace, resource_type, name),
entries) in self.autoscaling_params.items():
# iterate through all entries with this
# (namespace, resource_type, name) entry. We sum up the current
# and desired pods over all entries
self.logger.debug('Scaling primary resources.')
for fingerprint, entries in self.autoscaling_params.items():
# iterate through all entries with this fingerprint
namespace, resource_type, name = fingerprint

self.logger.debug('Scaling %s `%s.%s` with %s config entries.',
resource_type, namespace, name, len(entries))

# Sum up the current and desired pods over all entries
current_pods = self.get_current_pods(namespace, resource_type, name)
desired_pods = 0

self.logger.debug("Scaling {}".format((namespace, resource_type, name)))

min_pods_for_all_entries = []
max_pods_for_all_entries = []

if entries == []: # this is the most conservative bound
self.logger.warning('%s `%s.%s` has no autoscaling entry.',
str(resource_type).capitalize(),
namespace, name)
continue

for entry in entries:
min_pods = entry["min_pods"]
max_pods = entry["max_pods"]
keys_per_pod = entry["keys_per_pod"]
prefix = entry["prefix"]
min_pods = entry['min_pods']
max_pods = entry['max_pods']
keys_per_pod = entry['keys_per_pod']
prefix = entry['prefix']

min_pods_for_all_entries.append(min_pods)
max_pods_for_all_entries.append(max_pods)

self.logger.debug("Inspecting entry {}.".format(entry))

desired_pods += self.get_desired_pods(prefix, keys_per_pod,
min_pods, max_pods,
current_pods)

self.logger.debug("desired_pods now = {}".format(desired_pods))

# this is the most conservative bound
if entries == []:
return

min_pods = max(min_pods_for_all_entries)
max_pods = min(max_pods_for_all_entries)

desired_pods = self.clip_pod_count(desired_pods, min_pods,
max_pods, current_pods)
self.logger.debug("desired_pods clamped to {}".format(desired_pods))

self.logger.debug('Scaling %s `%s`', resource_type, name)
if desired_pods > 0 and resource_type != 'job':
desired_pods = current_pods if current_pods > 0 else 1

self.logger.debug('%s `%s` in namespace `%s` has a current state '
'of %s pods and a desired state of %s pods.',
@@ -351,65 +341,6 @@ def scale_primary_resources(self):
self.scale_resource(desired_pods, current_pods,
resource_type, namespace, name)

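To make the conservative bound combination above concrete, a small sketch with invented numbers:

    # Two hypothetical entries watching the same (namespace, resource_type, name).
    entries = [
        {'min_pods': 1, 'max_pods': 10},
        {'min_pods': 2, 'max_pods': 8},
    ]

    # The largest minimum and the smallest maximum win.
    min_pods = max(e['min_pods'] for e in entries)  # 2
    max_pods = min(e['max_pods'] for e in entries)  # 8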
def scale_secondary_resources(self):
for entry in self.secondary_autoscaling_params:
# redis-consumer-deployment|deployment|deepcell|
# tf-serving-deployment|deployment|deepcell|7|0|200
try:
resource_name = str(entry[0]).strip()
resource_type = str(entry[1]).strip()
resource_namespace = str(entry[2]).strip()
reference_resource_name = str(entry[3]).strip()
reference_resource_type = str(entry[4]).strip()
reference_resource_namespace = str(entry[5]).strip()
pods_per_other_pod = int(entry[6])
min_pods = int(entry[7])
max_pods = int(entry[8])
except (IndexError, ValueError):
self.logger.error('Secondary autoscaling entry %s is '
'malformed.', entry)
continue

self.logger.debug('Scaling secondary %s `%s`',
resource_type, resource_name)

# keep track of how many reference pods we're working with
if resource_name not in self.reference_pods:
self.reference_pods[resource_name] = 0

current_reference_pods = self.get_current_pods(
reference_resource_namespace,
reference_resource_type,
reference_resource_name,
only_running=True)

new_reference_pods = current_reference_pods - \
self.reference_pods[resource_name]

# update reference pod count
self.reference_pods[resource_name] = current_reference_pods

self.logger.debug('Secondary scaling: %s `%s` references %s `%s` '
'which has %s pods (%s new pods).',
str(resource_type).capitalize(), resource_name,
reference_resource_type, reference_resource_name,
current_reference_pods, new_reference_pods)

# only scale secondary deployments if there are new reference pods
if new_reference_pods > 0:

# compute desired pods for this deployment
current_pods = self.get_current_pods(
resource_namespace, resource_type, resource_name)

desired_pods = self.get_secondary_desired_pods(
new_reference_pods, pods_per_other_pod,
min_pods, max_pods, current_pods)

self.scale_resource(desired_pods, current_pods, resource_type,
resource_namespace, resource_name)

def scale(self):
self.tally_keys()
self.scale_primary_resources()
self.scale_secondary_resources()
self.tally_queues()
self.scale_resources()
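For orientation, a hypothetical driver loop, assuming `autoscaler` is a configured Autoscaler instance; the real entrypoint (scale.py, per the Dockerfile change above) is not part of this diff, and the polling interval is an assumption:

    import time

    while True:
        autoscaler.scale()  # tally queues, then scale each configured resource
        time.sleep(10)      # hypothetical interval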