Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cell tracking #18

Merged
merged 6 commits into from
May 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 88 additions & 28 deletions autoscaler/autoscaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,28 +57,62 @@ def __init__(self, redis_client, scaling_config, secondary_scaling_config,
self.logger = logging.getLogger(str(self.__class__.__name__))
self.completed_statuses = {'done', 'failed'}

self.autoscaling_params = self._get_autoscaling_params(
self.autoscaling_params = self._get_primary_autoscaling_params(
scaling_config=scaling_config.rstrip(),
deployment_delim=deployment_delim,
param_delim=param_delim)

self.secondary_autoscaling_params = self._get_autoscaling_params(
self.secondary_autoscaling_params = self._get_secondary_autoscaling_params(
scaling_config=secondary_scaling_config.rstrip(),
deployment_delim=deployment_delim,
param_delim=param_delim)

self.redis_keys = {
'predict': 0,
'train': 0
'train': 0,
'track': 0
}

self.managed_resource_types = {'deployment', 'job'}

self.reference_pods = {}

def _get_autoscaling_params(self, scaling_config,
deployment_delim=';',
param_delim='|'):
def _get_primary_autoscaling_params(self, scaling_config,
deployment_delim=';',
param_delim='|'):
raw_params = [x.split(param_delim)
for x in scaling_config.split(deployment_delim)]

params = {}
for entry in raw_params:
try:
print(len(entry))
print(entry)
namespace_resource_type_name = (
str(entry[3]).strip(),
str(entry[4]).strip(),
str(entry[6]).strip(),
)

if namespace_resource_type_name not in params:
params[namespace_resource_type_name] = []

params[namespace_resource_type_name].append({
'min_pods': int(entry[0]),
'max_pods': int(entry[1]),
'keys_per_pod': int(entry[2]),
'prefix': str(entry[5]).strip(),
})

except (IndexError, ValueError):
self.logger.error('Autoscaling entry %s is malformed.', entry)
continue

return params

def _get_secondary_autoscaling_params(self, scaling_config,
deployment_delim=';',
param_delim='|'):
return [x.split(param_delim)
for x in scaling_config.split(deployment_delim)]

Expand Down Expand Up @@ -106,7 +140,7 @@ def tally_keys(self):

self.logger.debug('Finished tallying redis keys in %s seconds.',
timeit.default_timer() - start)
self.logger.info('Tallied redis keys: %s', self.redis_keys)
self.logger.info('In-progress or new redis keys: %s', self.redis_keys)

def get_apps_v1_client(self):
"""Returns Kubernetes API Client for AppsV1Api"""
Expand All @@ -133,6 +167,8 @@ def list_namespaced_deployment(self, namespace):
self.logger.debug('Found %s deployments in namespace `%s` in '
'%s seconds.', len(response.items), namespace,
timeit.default_timer() - t)
self.logger.debug('Specifically: %s',
[d.metadata.name for d in response.items])
return response.items

def list_namespaced_job(self, namespace):
Expand All @@ -145,7 +181,7 @@ def list_namespaced_job(self, namespace):
self.logger.error('%s when calling `list_namespaced_job`: %s',
type(err).__name__, err)
raise err
self.logger.debug('Found %s deployments in namespace `%s` in '
self.logger.debug('Found %s jobs in namespace `%s` in '
'%s seconds.', len(response.items), namespace,
timeit.default_timer() - t)
return response.items
Expand Down Expand Up @@ -198,6 +234,9 @@ def get_current_pods(self, namespace, resource_type, name,
current_pods = d.status.available_replicas
else:
current_pods = d.spec.replicas

self.logger.debug("Deployment {} has {} pods".format(
name, current_pods))
break

elif resource_type == 'job':
Expand Down Expand Up @@ -259,29 +298,50 @@ def scale_resource(self, desired_pods, current_pods,

def scale_primary_resources(self):
"""Scale each resource defined in `autoscaling_params`"""
for entry in self.autoscaling_params:
# entry schema: minPods maxPods keysPerPod namespace resource_type
# predict_or_train deployment
try:
min_pods = int(entry[0])
max_pods = int(entry[1])
keys_per_pod = int(entry[2])
namespace = str(entry[3]).strip()
resource_type = str(entry[4]).strip()
predict_or_train = str(entry[5]).strip()
name = str(entry[6]).strip()
except (IndexError, ValueError):
self.logger.error('Autoscaling entry %s is malformed.', entry)
continue
self.logger.debug("Scaling primary resources.")
for ((namespace, resource_type, name),
entries) in self.autoscaling_params.items():
# iterate through all entries with this
# (namespace, resource_type, name) entry. We sum up the current
# and desired pods over all entries

self.logger.debug('Scaling %s `%s`', resource_type, name)
current_pods = self.get_current_pods(namespace, resource_type, name)
desired_pods = 0

self.logger.debug("Scaling {}".format((namespace, resource_type, name)))

min_pods_for_all_entries = []
max_pods_for_all_entries = []

for entry in entries:
min_pods = entry["min_pods"]
max_pods = entry["max_pods"]
keys_per_pod = entry["keys_per_pod"]
prefix = entry["prefix"]

min_pods_for_all_entries.append(min_pods)
max_pods_for_all_entries.append(max_pods)

current_pods = self.get_current_pods(
namespace, resource_type, name)
self.logger.debug("Inspecting entry {}.".format(entry))

desired_pods = self.get_desired_pods(
predict_or_train, keys_per_pod,
min_pods, max_pods, current_pods)
desired_pods += self.get_desired_pods(prefix, keys_per_pod,
min_pods, max_pods,
current_pods)

self.logger.debug("desired_pods now = {}".format(desired_pods))

# this is the most conservative bound
if entries == []:
return

min_pods = max(min_pods_for_all_entries)
max_pods = min(max_pods_for_all_entries)

desired_pods = self.clip_pod_count(desired_pods, min_pods,
max_pods, current_pods)
self.logger.debug("desired_pods clamped to {}".format(desired_pods))

self.logger.debug('Scaling %s `%s`', resource_type, name)

self.logger.debug('%s `%s` in namespace `%s` has a current state '
'of %s pods and a desired state of %s pods.',
Expand Down
47 changes: 46 additions & 1 deletion autoscaler/autoscaler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,51 @@ def patch_namespaced_job(self, *_, **__):

class TestAutoscaler(object):

def test__get_primary_autoscaling_params(self):
primary_params = """
1|2|3|namespace|resource_1|predict|name_1;
4|5|6|namespace|resource_1|track|name_1;
7|8|9|namespace|resource_2|train|name_1
""".strip()
secondary_params = """
redis-consumer-deployment|deployment|deepcell|tf-serving-deployment|deployment|deepcell|7|0|0;
tracking-consumer-deployment|deployment|deepcell|tf-serving-deployment|deployment|deepcell|7|0|0;
data-processing-deployment|deployment|deepcell|tf-serving-deployment|deployment|deepcell|1|0|0
""".strip()

redis_client = DummyRedis()
scaler = autoscaler.Autoscaler(redis_client,
primary_params,
secondary_params)
print("printing primary the nsecondary")
print(scaler.autoscaling_params)
print(scaler.secondary_autoscaling_params)
print("done printing")
assert scaler.autoscaling_params == {
('namespace', 'resource_1', 'name_1'): [
{
"prefix": "predict",
"min_pods": 1,
"max_pods": 2,
"keys_per_pod": 3
},
{
"prefix": "track",
"min_pods": 4,
"max_pods": 5,
"keys_per_pod": 6
},
],
('namespace', 'resource_2', 'name_1'): [
{
"prefix": "train",
"min_pods": 7,
"max_pods": 8,
"keys_per_pod": 9
}
]
}

def test_get_desired_pods(self):
# key, keys_per_pod, min_pods, max_pods, current_pods
redis_client = DummyRedis()
Expand Down Expand Up @@ -248,7 +293,7 @@ def test_tally_keys(self):
redis_client = DummyRedis()
scaler = autoscaler.Autoscaler(redis_client, 'None', 'None')
scaler.tally_keys()
assert scaler.redis_keys == {'predict': 2, 'train': 2}
assert scaler.redis_keys == {'predict': 2, 'track': 0, 'train': 2}

def test_scale_primary_resources(self):
redis_client = DummyRedis()
Expand Down