
Commit

Merge 00be726 into f2180d0
enricozb committed May 22, 2019
2 parents f2180d0 + 00be726 commit 3c7af03
Showing 2 changed files with 134 additions and 29 deletions.
116 changes: 88 additions & 28 deletions autoscaler/autoscaler.py
@@ -57,28 +57,62 @@ def __init__(self, redis_client, scaling_config, secondary_scaling_config,
self.logger = logging.getLogger(str(self.__class__.__name__))
self.completed_statuses = {'done', 'failed'}

self.autoscaling_params = self._get_autoscaling_params(
self.autoscaling_params = self._get_primary_autoscaling_params(
scaling_config=scaling_config.rstrip(),
deployment_delim=deployment_delim,
param_delim=param_delim)

self.secondary_autoscaling_params = self._get_autoscaling_params(
self.secondary_autoscaling_params = self._get_secondary_autoscaling_params(
scaling_config=secondary_scaling_config.rstrip(),
deployment_delim=deployment_delim,
param_delim=param_delim)

self.redis_keys = {
'predict': 0,
'train': 0
'train': 0,
'track': 0
}

self.managed_resource_types = {'deployment', 'job'}

self.reference_pods = {}

def _get_autoscaling_params(self, scaling_config,
deployment_delim=';',
param_delim='|'):
def _get_primary_autoscaling_params(self, scaling_config,
deployment_delim=';',
param_delim='|'):
raw_params = [x.split(param_delim)
for x in scaling_config.split(deployment_delim)]

params = {}
for entry in raw_params:
try:
print(len(entry))
print(entry)
namespace_resource_type_name = (
str(entry[3]).strip(),
str(entry[4]).strip(),
str(entry[6]).strip(),
)

if namespace_resource_type_name not in params:
params[namespace_resource_type_name] = []

params[namespace_resource_type_name].append({
'min_pods': int(entry[0]),
'max_pods': int(entry[1]),
'keys_per_pod': int(entry[2]),
'prefix': str(entry[5]).strip(),
})

except (IndexError, ValueError):
self.logger.error('Autoscaling entry %s is malformed.', entry)
continue

return params

def _get_secondary_autoscaling_params(self, scaling_config,
deployment_delim=';',
param_delim='|'):
return [x.split(param_delim)
for x in scaling_config.split(deployment_delim)]
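
For reference, a minimal sketch of how a primary scaling config is expected to parse under this scheme. The field order (min|max|keys_per_pod|namespace|resource_type|prefix|name) mirrors the entry indices above; the deployment and namespace names used here are hypothetical.

# Hypothetical config following the schema above.
scaling_config = (
    '1|2|3|deepcell|deployment|predict|redis-consumer;'
    '4|5|6|deepcell|deployment|track|redis-consumer'
)

# Grouping mirrors _get_primary_autoscaling_params: entries that share a
# (namespace, resource_type, name) key end up in the same list.
parsed = {}
for raw in scaling_config.split(';'):
    fields = raw.split('|')
    key = (fields[3].strip(), fields[4].strip(), fields[6].strip())
    parsed.setdefault(key, []).append({
        'min_pods': int(fields[0]),
        'max_pods': int(fields[1]),
        'keys_per_pod': int(fields[2]),
        'prefix': fields[5].strip(),
    })

# parsed == {
#     ('deepcell', 'deployment', 'redis-consumer'): [
#         {'min_pods': 1, 'max_pods': 2, 'keys_per_pod': 3, 'prefix': 'predict'},
#         {'min_pods': 4, 'max_pods': 5, 'keys_per_pod': 6, 'prefix': 'track'},
#     ]
# }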

@@ -106,7 +140,7 @@ def tally_keys(self):

self.logger.debug('Finished tallying redis keys in %s seconds.',
timeit.default_timer() - start)
self.logger.info('Tallied redis keys: %s', self.redis_keys)
self.logger.info('In-progress or new redis keys: %s', self.redis_keys)

def get_apps_v1_client(self):
"""Returns Kubernetes API Client for AppsV1Api"""
@@ -133,6 +167,8 @@ def list_namespaced_deployment(self, namespace):
self.logger.debug('Found %s deployments in namespace `%s` in '
'%s seconds.', len(response.items), namespace,
timeit.default_timer() - t)
self.logger.debug('Specifically: %s',
[d.metadata.name for d in response.items])
return response.items

def list_namespaced_job(self, namespace):
@@ -145,7 +181,7 @@ def list_namespaced_job(self, namespace):
self.logger.error('%s when calling `list_namespaced_job`: %s',
type(err).__name__, err)
raise err
self.logger.debug('Found %s deployments in namespace `%s` in '
self.logger.debug('Found %s jobs in namespace `%s` in '
'%s seconds.', len(response.items), namespace,
timeit.default_timer() - t)
return response.items
@@ -198,6 +234,9 @@ def get_current_pods(self, namespace, resource_type, name,
current_pods = d.status.available_replicas
else:
current_pods = d.spec.replicas

self.logger.debug("Deployment {} has {} pods".format(
name, current_pods))
break

elif resource_type == 'job':
@@ -259,29 +298,50 @@ def scale_resource(self, desired_pods, current_pods,

def scale_primary_resources(self):
"""Scale each resource defined in `autoscaling_params`"""
for entry in self.autoscaling_params:
# entry schema: minPods maxPods keysPerPod namespace resource_type
# predict_or_train deployment
try:
min_pods = int(entry[0])
max_pods = int(entry[1])
keys_per_pod = int(entry[2])
namespace = str(entry[3]).strip()
resource_type = str(entry[4]).strip()
predict_or_train = str(entry[5]).strip()
name = str(entry[6]).strip()
except (IndexError, ValueError):
self.logger.error('Autoscaling entry %s is malformed.', entry)
continue
self.logger.debug("Scaling primary resources.")
for ((namespace, resource_type, name),
entries) in self.autoscaling_params.items():
# iterate through all entries that share this
# (namespace, resource_type, name) key and sum the
# desired pod counts across them

self.logger.debug('Scaling %s `%s`', resource_type, name)
current_pods = self.get_current_pods(namespace, resource_type, name)
desired_pods = 0

self.logger.debug("Scaling {}".format((namespace, resource_type, name)))

min_pods_for_all_entries = []
max_pods_for_all_entries = []

for entry in entries:
min_pods = entry["min_pods"]
max_pods = entry["max_pods"]
keys_per_pod = entry["keys_per_pod"]
prefix = entry["prefix"]

min_pods_for_all_entries.append(min_pods)
max_pods_for_all_entries.append(max_pods)

current_pods = self.get_current_pods(
namespace, resource_type, name)
self.logger.debug("Inspecting entry {}.".format(entry))

desired_pods = self.get_desired_pods(
predict_or_train, keys_per_pod,
min_pods, max_pods, current_pods)
desired_pods += self.get_desired_pods(prefix, keys_per_pod,
min_pods, max_pods,
current_pods)

self.logger.debug("desired_pods now = {}".format(desired_pods))

if not entries:
continue

# take the most conservative bound across all entries
min_pods = max(min_pods_for_all_entries)
max_pods = min(max_pods_for_all_entries)

desired_pods = self.clip_pod_count(desired_pods, min_pods,
max_pods, current_pods)
self.logger.debug("desired_pods clamped to {}".format(desired_pods))

self.logger.debug('Scaling %s `%s`', resource_type, name)

self.logger.debug('%s `%s` in namespace `%s` has a current state '
'of %s pods and a desired state of %s pods.',
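
To summarize the new aggregation in scale_primary_resources: desired pods are summed over every entry that targets the same (namespace, resource_type, name), and the total is then clamped with the most conservative bounds across those entries. A standalone sketch of that arithmetic follows; the ceiling-division demand formula and the simple clamp are assumptions standing in for get_desired_pods and clip_pod_count, whose exact behavior is not shown in this diff.

from typing import Dict, List

def aggregate_desired_pods(entries: List[dict], key_counts: Dict[str, int]) -> int:
    """Sum per-prefix demand, then clamp to the tightest bounds."""
    desired = 0
    for entry in entries:
        # assumed demand model: outstanding keys / keys each pod can handle
        keys = key_counts.get(entry['prefix'], 0)
        desired += -(-keys // entry['keys_per_pod'])  # ceiling division

    # the most conservative bound across all entries
    min_pods = max(e['min_pods'] for e in entries)
    max_pods = min(e['max_pods'] for e in entries)
    return max(min_pods, min(max_pods, desired))

# Two prefixes feeding the same deployment:
entries = [
    {'prefix': 'predict', 'keys_per_pod': 3, 'min_pods': 1, 'max_pods': 10},
    {'prefix': 'track', 'keys_per_pod': 6, 'min_pods': 0, 'max_pods': 4},
]
print(aggregate_desired_pods(entries, {'predict': 7, 'track': 6}))
# ceil(7/3) + ceil(6/6) = 3 + 1 = 4, clamped to [max(1,0), min(10,4)] -> 4
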
47 changes: 46 additions & 1 deletion autoscaler/autoscaler_test.py
@@ -138,6 +138,51 @@ def patch_namespaced_job(self, *_, **__):

class TestAutoscaler(object):

def test__get_primary_autoscaling_params(self):
primary_params = """
1|2|3|namespace|resource_1|predict|name_1;
4|5|6|namespace|resource_1|track|name_1;
7|8|9|namespace|resource_2|train|name_1
""".strip()
secondary_params = """
redis-consumer-deployment|deployment|deepcell|tf-serving-deployment|deployment|deepcell|7|0|0;
tracking-consumer-deployment|deployment|deepcell|tf-serving-deployment|deployment|deepcell|7|0|0;
data-processing-deployment|deployment|deepcell|tf-serving-deployment|deployment|deepcell|1|0|0
""".strip()

redis_client = DummyRedis()
scaler = autoscaler.Autoscaler(redis_client,
primary_params,
secondary_params)
print("printing primary the nsecondary")
print(scaler.autoscaling_params)
print(scaler.secondary_autoscaling_params)
print("done printing")
assert scaler.autoscaling_params == {
('namespace', 'resource_1', 'name_1'): [
{
"prefix": "predict",
"min_pods": 1,
"max_pods": 2,
"keys_per_pod": 3
},
{
"prefix": "track",
"min_pods": 4,
"max_pods": 5,
"keys_per_pod": 6
},
],
('namespace', 'resource_2', 'name_1'): [
{
"prefix": "train",
"min_pods": 7,
"max_pods": 8,
"keys_per_pod": 9
}
]
}

def test_get_desired_pods(self):
# key, keys_per_pod, min_pods, max_pods, current_pods
redis_client = DummyRedis()
@@ -248,7 +293,7 @@ def test_tally_keys(self):
redis_client = DummyRedis()
scaler = autoscaler.Autoscaler(redis_client, 'None', 'None')
scaler.tally_keys()
assert scaler.redis_keys == {'predict': 2, 'train': 2}
assert scaler.redis_keys == {'predict': 2, 'track': 0, 'train': 2}

def test_scale_primary_resources(self):
redis_client = DummyRedis()
