Skip to content

Commit

Permalink
Merge pull request #46294 from AAbouZaid/kapacitor_enhancements
Browse files Browse the repository at this point in the history
Enhance Kapacitor module and state.
  • Loading branch information
Nicole Thomas committed Mar 8, 2018
2 parents ba4266f + b74c416 commit 65cee66
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 27 deletions.
48 changes: 40 additions & 8 deletions salt/modules/kapacitor.py
Expand Up @@ -6,6 +6,8 @@
parameters or as configuration settings in /etc/salt/minion on the relevant
minions::
kapacitor.unsafe_ssl: 'false'
kapacitor.protocol: 'http'
kapacitor.host: 'localhost'
kapacitor.port: 9092
Expand Down Expand Up @@ -40,6 +42,17 @@ def version():
return version


def _get_url():
'''
Get the kapacitor URL.
'''
protocol = __salt__['config.option']('kapacitor.protocol', 'http')
host = __salt__['config.option']('kapacitor.host', 'localhost')
port = __salt__['config.option']('kapacitor.port', 9092)

return '{0}://{1}:{2}'.format(protocol, host, port)


def get_task(name):
'''
Get a dict of data on a task.
Expand All @@ -53,15 +66,14 @@ def get_task(name):
salt '*' kapacitor.get_task cpu
'''
host = __salt__['config.option']('kapacitor.host', 'localhost')
port = __salt__['config.option']('kapacitor.port', 9092)
url = _get_url()

if version() < '0.13':
url = 'http://{0}:{1}/task?name={2}'.format(host, port, name)
task_url = '{0}/task?name={1}'.format(url, name)
else:
url = 'http://{0}:{1}/kapacitor/v1/tasks/{2}?skip-format=true'.format(host, port, name)
task_url = '{0}/kapacitor/v1/tasks/{1}?skip-format=true'.format(url, name)

response = salt.utils.http.query(url, status=True)
response = salt.utils.http.query(task_url, status=True)

if response['status'] == 404:
return None
Expand Down Expand Up @@ -89,7 +101,11 @@ def _run_cmd(cmd):
Run a Kapacitor task and return a dictionary of info.
'''
ret = {}
result = __salt__['cmd.run_all'](cmd)
env_vars = {
'KAPACITOR_URL': _get_url(),
'KAPACITOR_UNSAFE_SSL': __salt__['config.option']('kapacitor.unsafe_ssl', 'false'),
}
result = __salt__['cmd.run_all'](cmd, env=env_vars)

if result.get('stdout'):
ret['stdout'] = result['stdout']
Expand All @@ -104,7 +120,8 @@ def define_task(name,
tick_script,
task_type='stream',
database=None,
retention_policy='default'):
retention_policy='default',
dbrps=None):
'''
Define a task. Serves as both create/update.
Expand All @@ -117,6 +134,13 @@ def define_task(name,
task_type
Task type. Defaults to 'stream'
dbrps
A list of databases and retention policies in "dbname"."rpname" format
to fetch data from. For backward compatibility, the value of
'database' and 'retention_policy' will be merged as part of dbrps.
.. versionadded:: Fluorine
database
Which database to fetch data from. Defaults to None, which will use the
default database in InfluxDB.
Expand All @@ -143,8 +167,16 @@ def define_task(name,
if task_type:
cmd += ' -type {0}'.format(task_type)

if not dbrps:
dbrps = []

if database and retention_policy:
cmd += ' -dbrp {0}.{1}'.format(database, retention_policy)
dbrp = '{0}.{1}'.format(database, retention_policy)
dbrps.append(dbrp)

if dbrps:
for dbrp in dbrps:
cmd += ' -dbrp {0}'.format(dbrp)

return _run_cmd(cmd)

Expand Down
33 changes: 23 additions & 10 deletions salt/states/kapacitor.py
Expand Up @@ -6,6 +6,8 @@
parameters or as configuration settings in /etc/salt/minion on the relevant
minions::
kapacitor.unsafe_ssl: 'false'
kapacitor.protocol: 'http'
kapacitor.host: 'localhost'
kapacitor.port: 9092
Expand All @@ -32,7 +34,8 @@ def task_present(name,
task_type='stream',
database=None,
retention_policy='default',
enable=True):
enable=True,
dbrps=None):
'''
Ensure that a task is present and up-to-date in Kapacitor.
Expand All @@ -45,6 +48,13 @@ def task_present(name,
task_type
Task type. Defaults to 'stream'
dbrps
A list of databases and retention policies in "dbname"."rpname" format
to fetch data from. For backward compatibility, the value of
'database' and 'retention_policy' will be merged as part of dbrps.
.. versionadded:: Fluorine
database
Which database to fetch data from. Defaults to None, which will use the
default database in InfluxDB.
Expand All @@ -61,6 +71,12 @@ def task_present(name,

task = __salt__['kapacitor.get_task'](name)
old_script = task['script'] if task else ''
if not dbrps:
dbrps = []
if database and retention_policy:
dbrp = '{0}.{1}'.format(database, retention_policy)
dbrps.append(dbrp)
task_dbrps = [{'db': dbrp[0], 'rp': dbrp[1]} for dbrp in (dbrp.split('.') for dbrp in dbrps)]

if tick_script.startswith('salt://'):
script_path = __salt__['cp.cache_file'](tick_script, __env__)
Expand All @@ -73,7 +89,7 @@ def task_present(name,
is_up_to_date = task and (
old_script == new_script and
task_type == task['type'] and
task['dbrps'] == [{'db': database, 'rp': retention_policy}]
task['dbrps'] == task_dbrps
)

if is_up_to_date:
Expand All @@ -88,7 +104,8 @@ def task_present(name,
script_path,
task_type=task_type,
database=database,
retention_policy=retention_policy
retention_policy=retention_policy,
dbrps=dbrps
)
ret['result'] = result['success']
if not ret['result']:
Expand All @@ -109,13 +126,9 @@ def task_present(name,
ret['changes']['type'] = task_type
comments.append('Task type updated')

if not task or task['dbrps'][0]['db'] != database:
ret['changes']['db'] = database
comments.append('Task database updated')

if not task or task['dbrps'][0]['rp'] != retention_policy:
ret['changes']['rp'] = retention_policy
comments.append('Task retention policy updated')
if not task or task['dbrps'] != task_dbrps:
ret['changes']['dbrps'] = task_dbrps
comments.append('Task dbrps updated')

if enable:
if task and task['enabled']:
Expand Down
12 changes: 8 additions & 4 deletions tests/unit/modules/test_kapacitor.py
Expand Up @@ -14,6 +14,10 @@


class KapacitorTestCase(TestCase, LoaderModuleMockMixin):
env = {
'KAPACITOR_UNSAFE_SSL': 'false',
'KAPACITOR_URL': 'http://localhost:9092'
}

def setup_loader_modules(self):
return {
Expand Down Expand Up @@ -51,22 +55,22 @@ def test_define_task(self):
with patch.dict(kapacitor.__salt__, {'cmd.run_all': cmd_mock}):
kapacitor.define_task('taskname', '/tmp/script.tick')
cmd_mock.assert_called_once_with('kapacitor define taskname '
'-tick /tmp/script.tick -type stream')
'-tick /tmp/script.tick -type stream', env=self.__class__.env)

def test_enable_task(self):
cmd_mock = Mock(return_value={'retcode': 0})
with patch.dict(kapacitor.__salt__, {'cmd.run_all': cmd_mock}):
kapacitor.enable_task('taskname')
cmd_mock.assert_called_once_with('kapacitor enable taskname')
cmd_mock.assert_called_once_with('kapacitor enable taskname', env=self.__class__.env)

def test_disable_task(self):
cmd_mock = Mock(return_value={'retcode': 0})
with patch.dict(kapacitor.__salt__, {'cmd.run_all': cmd_mock}):
kapacitor.disable_task('taskname')
cmd_mock.assert_called_once_with('kapacitor disable taskname')
cmd_mock.assert_called_once_with('kapacitor disable taskname', env=self.__class__.env)

def test_delete_task(self):
cmd_mock = Mock(return_value={'retcode': 0})
with patch.dict(kapacitor.__salt__, {'cmd.run_all': cmd_mock}):
kapacitor.delete_task('taskname')
cmd_mock.assert_called_once_with('kapacitor delete tasks taskname')
cmd_mock.assert_called_once_with('kapacitor delete tasks taskname', env=self.__class__.env)
14 changes: 9 additions & 5 deletions tests/unit/states/test_kapacitor.py
Expand Up @@ -17,6 +17,7 @@ def _present(name='testname',
task_type='stream',
database='testdb',
retention_policy='default',
dbrps=None,
enable=True,
task=None,
define_result=True,
Expand Down Expand Up @@ -49,12 +50,13 @@ def _present(name='testname',
}):
with patch('salt.utils.files.fopen', mock_open(read_data=script)) as open_mock:
retval = kapacitor.task_present(name, tick_script, task_type=task_type,
database=database, retention_policy=retention_policy, enable=enable)
database=database, retention_policy=retention_policy, enable=enable, dbrps=dbrps)

return retval, get_mock, define_mock, enable_mock, disable_mock


def _task(script='testscript', enabled=True, task_type='stream', db='testdb', rp='default'):
def _task(script='testscript', enabled=True, task_type='stream',
db='testdb', rp='default'):
return {
'script': script,
'enabled': enabled,
Expand All @@ -73,10 +75,11 @@ def setup_loader_modules(self):
}

def test_task_present_new_task(self):
ret, get_mock, define_mock, enable_mock, _ = _present()
ret, get_mock, define_mock, enable_mock, _ = _present(dbrps=['testdb2.default_rp'])
get_mock.assert_called_once_with('testname')
define_mock.assert_called_once_with('testname', '/tmp/script.tick',
database='testdb', retention_policy='default', task_type='stream')
database='testdb', retention_policy='default',
task_type='stream', dbrps=['testdb2.default_rp', 'testdb.default'])
enable_mock.assert_called_once_with('testname')
self.assertIn('TICKscript diff', ret['changes'])
self.assertIn('enabled', ret['changes'])
Expand All @@ -86,7 +89,8 @@ def test_task_present_existing_task_updated_script(self):
ret, get_mock, define_mock, enable_mock, _ = _present(task=_task(script='oldscript'))
get_mock.assert_called_once_with('testname')
define_mock.assert_called_once_with('testname', '/tmp/script.tick',
database='testdb', retention_policy='default', task_type='stream')
database='testdb', retention_policy='default',
task_type='stream', dbrps=['testdb.default'])
self.assertEqual(False, enable_mock.called)
self.assertIn('TICKscript diff', ret['changes'])
self.assertNotIn('enabled', ret['changes'])
Expand Down

0 comments on commit 65cee66

Please sign in to comment.