Skip to content

Commit

Permalink
pass queues to the autoscaler to scale different queue names (#29)
Browse files Browse the repository at this point in the history
* define redis_keys based on init arg "queues", a CSV string of many queues.

* parse env var QUEUES and pass as string to autoscaler.

* update tests for new init orders
  • Loading branch information
willgraf committed Sep 27, 2019
1 parent 3364ce5 commit 0d98d80
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 23 deletions.
8 changes: 3 additions & 5 deletions autoscaler/autoscaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class Autoscaler(object):
def __init__(self,
redis_client,
scaling_config,
queues='predict',
queue_delim=',',
deployment_delim=';',
param_delim='|'):

Expand All @@ -55,11 +57,7 @@ def __init__(self,
'different. Got "{}" and "{}".'.format(
deployment_delim, param_delim))

self.redis_keys = {
'predict': 0,
'train': 0,
'track': 0,
}
self.redis_keys = {q: 0 for q in queues.split(queue_delim)}

self.redis_client = redis_client
self.logger = logging.getLogger(str(self.__class__.__name__))
Expand Down
45 changes: 28 additions & 17 deletions autoscaler/autoscaler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def test__get_autoscaling_params(self):
def test_get_desired_pods(self):
# key, keys_per_pod, min_pods, max_pods, current_pods
redis_client = DummyRedis()
scaler = autoscaler.Autoscaler(redis_client, 'None', 'None')
scaler = autoscaler.Autoscaler(redis_client, 'None')
scaler.redis_keys['predict'] = 10
# desired_pods is > max_pods
desired_pods = scaler.get_desired_pods('predict', 2, 0, 2, 1)
Expand All @@ -159,23 +159,23 @@ def test_get_desired_pods(self):

def test_list_namespaced_deployment(self):
redis_client = DummyRedis()
scaler = autoscaler.Autoscaler(redis_client, 'None', 'None')
scaler = autoscaler.Autoscaler(redis_client, 'None')
# test ApiException is logged and thrown
scaler.get_apps_v1_client = lambda: DummyKubernetes(fail=True)
with pytest.raises(kubernetes.client.rest.ApiException):
scaler.list_namespaced_deployment('ns')

def test_list_namespaced_job(self):
redis_client = DummyRedis()
scaler = autoscaler.Autoscaler(redis_client, 'None', 'None')
scaler = autoscaler.Autoscaler(redis_client, 'None')
# test ApiException is logged and thrown
scaler.get_batch_v1_client = lambda: DummyKubernetes(fail=True)
with pytest.raises(kubernetes.client.rest.ApiException):
scaler.list_namespaced_job('ns')

def test_patch_namespaced_deployment(self):
redis_client = DummyRedis()
scaler = autoscaler.Autoscaler(redis_client, 'None', 'None')
scaler = autoscaler.Autoscaler(redis_client, 'None')
# test ApiException is logged and thrown
scaler.get_apps_v1_client = lambda: DummyKubernetes(fail=True)
with pytest.raises(kubernetes.client.rest.ApiException):
Expand All @@ -184,7 +184,7 @@ def test_patch_namespaced_deployment(self):

def test_patch_namespaced_job(self):
redis_client = DummyRedis()
scaler = autoscaler.Autoscaler(redis_client, 'None', 'None')
scaler = autoscaler.Autoscaler(redis_client, 'None')
# test ApiException is logged and thrown
scaler.get_batch_v1_client = lambda: DummyKubernetes(fail=True)
with pytest.raises(kubernetes.client.rest.ApiException):
Expand All @@ -193,7 +193,7 @@ def test_patch_namespaced_job(self):

def test_get_current_pods(self):
redis_client = DummyRedis()
scaler = autoscaler.Autoscaler(redis_client, 'None', 'None')
scaler = autoscaler.Autoscaler(redis_client, 'None')

scaler.get_apps_v1_client = DummyKubernetes
scaler.get_batch_v1_client = DummyKubernetes
Expand Down Expand Up @@ -222,7 +222,12 @@ def test_get_current_pods(self):

def test_tally_queues(self):
redis_client = DummyRedis()
scaler = autoscaler.Autoscaler(redis_client, 'None', 'None')
scaler = autoscaler.Autoscaler(redis_client, 'None', 'predict')
scaler.tally_queues()
assert scaler.redis_keys == {'predict': 8}

redis_client = DummyRedis()
scaler = autoscaler.Autoscaler(redis_client, 'None', 'predict,track,train')
scaler.tally_queues()
assert scaler.redis_keys == {'predict': 8, 'track': 6, 'train': 6}

Expand All @@ -239,17 +244,19 @@ def test_scale_resources(self):
# non-integer values will warn, but not raise (or autoscale)
bad_params = ['f0', 'f1', 'f3', 'ns', 'job', 'train', 'name']
p = deployment_delim.join([param_delim.join(bad_params)])
scaler = autoscaler.Autoscaler(redis_client, p, deployment_delim,
param_delim)
scaler = autoscaler.Autoscaler(redis_client, p,
deployment_delim=deployment_delim,
param_delim=param_delim)
scaler.get_apps_v1_client = DummyKubernetes
scaler.get_batch_v1_client = DummyKubernetes
scaler.scale_resources()

# not enough params will warn, but not raise (or autoscale)
bad_params = ['0', '1', '3', 'ns', 'job', 'train']
p = deployment_delim.join([param_delim.join(bad_params)])
scaler = autoscaler.Autoscaler(redis_client, p, deployment_delim,
param_delim)
scaler = autoscaler.Autoscaler(redis_client, p,
deployment_delim=deployment_delim,
param_delim=param_delim)
scaler.get_apps_v1_client = DummyKubernetes
scaler.get_batch_v1_client = DummyKubernetes
scaler.scale_resources()
Expand All @@ -258,8 +265,9 @@ def test_scale_resources(self):
with pytest.raises(ValueError):
bad_params = ['0', '1', '3', 'ns', 'bad_type', 'train', 'name']
p = deployment_delim.join([param_delim.join(bad_params)])
scaler = autoscaler.Autoscaler(redis_client, p, deployment_delim,
param_delim)
scaler = autoscaler.Autoscaler(redis_client, p,
deployment_delim=deployment_delim,
param_delim=param_delim)
scaler.get_apps_v1_client = DummyKubernetes
scaler.get_batch_v1_client = DummyKubernetes
scaler.scale_resources()
Expand All @@ -270,8 +278,9 @@ def test_scale_resources(self):
params = [deploy_params, job_params]
p = deployment_delim.join([param_delim.join(p) for p in params])

scaler = autoscaler.Autoscaler(redis_client, p, deployment_delim,
param_delim)
scaler = autoscaler.Autoscaler(redis_client, p,
deployment_delim=deployment_delim,
param_delim=param_delim)

scaler.get_apps_v1_client = DummyKubernetes
scaler.get_batch_v1_client = DummyKubernetes
Expand All @@ -293,11 +302,13 @@ def fail_to_scale(*_, **__):
param_delim = '|'
deployment_delim = '|'
p = deployment_delim.join([param_delim.join(p) for p in params])
autoscaler.Autoscaler(None, p, deployment_delim, param_delim)
scaler = autoscaler.Autoscaler(None, p,
deployment_delim=deployment_delim,
param_delim=param_delim)

def test_scale(self):
redis_client = DummyRedis()
scaler = autoscaler.Autoscaler(redis_client, 'None', 'None')
scaler = autoscaler.Autoscaler(redis_client, 'None')

global counter
counter = 0
Expand Down
3 changes: 2 additions & 1 deletion scale.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ def initialize_logger(debug_mode=True):

SCALER = autoscaler.Autoscaler(
redis_client=REDIS_CLIENT,
scaling_config=decouple.config('AUTOSCALING'))
scaling_config=decouple.config('AUTOSCALING'),
queues=decouple.config('QUEUES', 'predict,track,train'))

INTERVAL = decouple.config('INTERVAL', default=5, cast=int)

Expand Down

0 comments on commit 0d98d80

Please sign in to comment.