Skip to content

Commit

Permalink
Merge pull request #1 from basepi/daemon
Browse files Browse the repository at this point in the history
Fix up pulsar for non-beacon use
  • Loading branch information
jettero committed Jan 5, 2017
2 parents 60f66bc + e8d6bd0 commit 202fcfd
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 74 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ python-pygit2 to get gitfs working:
gitfs_remotes:
- https://github.com/hubblestack/hubble.git:
fileserver_backend:
- root
- roots
- git
```

Expand Down
83 changes: 12 additions & 71 deletions hubble/extmods/modules/pulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from __future__ import absolute_import
import collections
import fnmatch
import multiprocessing
import os
import re
import yaml
Expand Down Expand Up @@ -80,22 +79,12 @@ def _get_notifier():
return __context__['pulsar.notifier']


def beacon():
def process(configfile='salt://hubblestack_pulsar/hubblestack_pulsar_config.yaml',
verbose=False):
'''
Watch the configured files
Example pillar config
.. code-block:: yaml
beacons:
pulsar:
paths:
- /var/cache/salt/minion/files/base/hubblestack_pulsar/hubblestack_pulsar_config.yaml
refresh_interval: 300
verbose: False
Example yaml config on fileserver (targeted by pillar)
Example yaml config on fileserver:
.. code-block:: yaml
Expand All @@ -111,17 +100,10 @@ def beacon():
- /path/to/file/or/dir/exclude2
- /path/to/file/or/dir/regex[\d]*$:
regex: True
return:
splunk:
batch: True
slack:
batch: False # overrides the global setting
checksum: sha256
stats: True
batch: True
Note that if `batch: True`, the configured returner must support receiving
a list of events, rather than single one-off events.
refresh_interval: 300
verbose: False
The mask list can contain the following events (the default mask is create,
delete, and modify):
Expand Down Expand Up @@ -158,6 +140,11 @@ def beacon():
True, then changes will be discarded.
'''
config = __opts__.get('pulsar', {})
if isinstance(configfile, list):
config['paths'] = configfile
else:
config['paths'] = [configfile]
config['verbose'] = verbose
global CONFIG_STALENESS
global CONFIG
if config.get('verbose'):
Expand All @@ -176,17 +163,12 @@ def beacon():
config = CONFIG
else:
if config.get('verbose'):
log.debug('No cached config found for pulsar, retrieving fresh from disk.')
log.debug('No cached config found for pulsar, retrieving fresh from fileserver.')
new_config = config
if isinstance(config.get('paths'), list):
for path in config['paths']:
if 'salt://' in path:
log.error('Path {0} is not an absolute path. Please use a '
'scheduled cp.cache_file job to deliver the '
'config to the minion, then provide the '
'absolute path to the cached file on the minion '
'in the beacon config.'.format(path))
continue
path = __salt__['cp.cache_file'](path)
if os.path.isfile(path):
with open(path, 'r') as f:
new_config = _dict_update(new_config,
Expand Down Expand Up @@ -346,47 +328,6 @@ def beacon():

return ret

#if ret and 'return' in config:
# __opts__['grains'] = __grains__
# __opts__['pillar'] = __pillar__
# __returners__ = salt.loader.returners(__opts__, __salt__)
# return_config = config['return']
# if isinstance(return_config, salt.ext.six.string_types):
# tmp = {}
# for conf in return_config.split(','):
# tmp[conf] = None
# return_config = tmp
# for returner_mod in return_config:
# returner = '{0}.returner'.format(returner_mod)
# if returner not in __returners__:
# log.error('Could not find {0} returner for pulsar beacon'.format(config['return']))
# return ret
# batch_config = config.get('batch')
# if isinstance(return_config[returner_mod], dict) and return_config[returner_mod].get('batch'):
# batch_config = True
# if batch_config:
# transformed = []
# for item in ret:
# transformed.append({'return': item})
# if config.get('multiprocessing_return', True):
# p = multiprocessing.Process(target=__returners__[returner], args=(transformed,))
# p.daemon = True
# p.start()
# else:
# __returners__[returner](transformed)
# else:
# for item in ret:
# if config.get('multiprocessing_return', True):
# p = multiprocessing.Process(target=__returners__[returner], args=({'return': item},))
# p.daemon = True
# p.start()
# else:
# __returners__[returner]({'return': item})
# return []
#else:
# # Return event data
# return ret


def _dict_update(dest, upd, recursive_update=True, merge_lists=False):
'''
Expand Down
13 changes: 11 additions & 2 deletions hubble/extmods/returners/splunk_pulsar_return.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,17 @@

_max_content_bytes = 100000
http_event_collector_SSL_verify = False
http_event_collector_debug = True
http_event_collector_debug = False

log = logging.getLogger(__name__)

hec = None


def returner(ret):
if isinstance(ret, dict) and not ret.get('return'):
# Empty single return, let's not do any setup or anything
return
# Customized to split up the change events and send to Splunk.
opts = _get_options()
logging.info('Options: %s' % json.dumps(opts))
Expand All @@ -91,8 +94,14 @@ def returner(ret):
except IndexError:
fqdn_ip4 = __grains__['ipv4'][0]

alerts = []
for item in data:
alert = item['return']
events = item['return']
if not isinstance(events, list):
events = [events]
alerts.extend(events)

for alert in alerts:
event = {}
payload = {}
if('change' in alert): # Linux, normal pulsar
Expand Down

0 comments on commit 202fcfd

Please sign in to comment.